Files
open-im-server/internal/msg_transfer/logic/init.go
T

78 lines
2.5 KiB
Go
Raw Normal View History

2021-05-26 19:24:25 +08:00
package logic
import (
"Open_IM/pkg/common/config"
2022-05-11 18:33:48 +08:00
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka"
2022-09-12 19:32:24 +08:00
promePkg "Open_IM/pkg/common/prometheus"
2022-05-11 18:33:48 +08:00
"Open_IM/pkg/statistics"
2022-04-20 18:53:23 +08:00
"fmt"
2022-05-11 18:33:48 +08:00
"sync"
2021-05-26 19:24:25 +08:00
)
2022-05-11 20:49:47 +08:00
const (
	// OnlineTopicVacancy and OnlineTopicBusy are the values stored in
	// onlineTopicStatus; Init sets the status to OnlineTopicVacancy.
	OnlineTopicVacancy = 0
	OnlineTopicBusy    = 1

	// Msg, ConsumerMsgs, AggregationMessages and MongoMessages are
	// distinct integer identifiers.
	// NOTE(review): presumably these tag the commands carried on cmdCh
	// (Cmd2Value) — confirm against the consumer handlers.
	Msg                 = 2
	ConsumerMsgs        = 3
	AggregationMessages = 4
	MongoMessages       = 5

	// ChannelNum is fixed at 100.
	// NOTE(review): presumably the number of worker channels used by the
	// history consumer — confirm at the use sites.
	ChannelNum = 100
)
2022-05-11 18:33:48 +08:00
2021-05-26 19:24:25 +08:00
var (
2022-05-11 18:33:48 +08:00
persistentCH PersistentConsumerHandler
2022-06-16 20:35:27 +08:00
historyCH OnlineHistoryRedisConsumerHandler
historyMongoCH OnlineHistoryMongoConsumerHandler
2022-05-11 18:33:48 +08:00
producer *kafka.Producer
2022-06-16 12:21:49 +08:00
producerToMongo *kafka.Producer
2022-05-11 18:33:48 +08:00
cmdCh chan Cmd2Value
2022-05-11 20:49:47 +08:00
onlineTopicStatus int
2022-05-11 18:33:48 +08:00
w *sync.Mutex
singleMsgSuccessCount uint64
groupMsgCount uint64
singleMsgFailedCount uint64
2022-05-18 08:06:05 +08:00
singleMsgSuccessCountMutex sync.Mutex
2021-05-26 19:24:25 +08:00
)
func Init() {
2022-05-11 18:33:48 +08:00
cmdCh = make(chan Cmd2Value, 10000)
2022-05-11 18:38:58 +08:00
w = new(sync.Mutex)
2022-09-14 21:11:40 +08:00
initPrometheus()
2022-09-09 16:26:57 +08:00
persistentCH.Init() // ws2mschat save mysql
historyCH.Init(cmdCh) //
2022-09-09 17:17:10 +08:00
historyMongoCH.Init()
2022-05-11 20:49:47 +08:00
onlineTopicStatus = OnlineTopicVacancy
2022-05-23 11:57:41 +08:00
//offlineHistoryCH.Init(cmdCh)
2022-05-11 18:33:48 +08:00
statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
2021-05-26 19:24:25 +08:00
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
2022-06-16 12:21:49 +08:00
producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic)
2021-05-26 19:24:25 +08:00
}
2022-09-09 01:10:06 +08:00
func Run(promethuesPort int) {
2021-05-26 19:24:25 +08:00
//register mysqlConsumerHandler to
2022-04-20 18:53:23 +08:00
if config.Config.ChatPersistenceMysql {
go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
} else {
fmt.Println("not start mysql consumer")
}
2021-05-26 19:24:25 +08:00
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
2022-06-17 10:37:43 +08:00
go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH)
2022-05-21 19:17:31 +08:00
//go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
2022-09-12 19:32:24 +08:00
go func() {
err := promePkg.StartPromeSrv(promethuesPort)
if err != nil {
panic(err)
}
}()
2022-05-11 18:33:48 +08:00
}
2022-05-11 20:49:47 +08:00
func SetOnlineTopicStatus(status int) {
2022-05-11 18:33:48 +08:00
w.Lock()
defer w.Unlock()
onlineTopicStatus = status
}
2022-05-11 20:49:47 +08:00
func GetOnlineTopicStatus() int {
2022-05-11 18:33:48 +08:00
w.Lock()
defer w.Unlock()
return onlineTopicStatus
2021-05-26 19:24:25 +08:00
}