Files
open-im-server/internal/msg_transfer/logic/init.go
T

68 lines
2.2 KiB
Go
Raw Normal View History

2021-05-26 19:24:25 +08:00
package logic
import (
"Open_IM/pkg/common/config"
2022-05-11 18:33:48 +08:00
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka"
2022-05-11 18:54:22 +08:00
"Open_IM/pkg/common/log"
2022-05-11 18:33:48 +08:00
"Open_IM/pkg/statistics"
2022-04-20 18:53:23 +08:00
"fmt"
2022-05-11 18:33:48 +08:00
"sync"
2021-05-26 19:24:25 +08:00
)
2022-05-11 20:49:47 +08:00
const OnlineTopicBusy = 1
const OnlineTopicVacancy = 0
2022-05-19 12:25:46 +08:00
const Msg = 2
2022-05-20 13:33:38 +08:00
const ConsumerMsgs = 3
const UserMessages = 4
2022-05-24 21:14:39 +08:00
const MongoMessages = 5
2022-05-25 19:01:36 +08:00
const ChannelNum = 100
2022-05-11 18:33:48 +08:00
2021-05-26 19:24:25 +08:00
var (
2022-05-11 18:33:48 +08:00
persistentCH PersistentConsumerHandler
historyCH OnlineHistoryConsumerHandler
offlineHistoryCH OfflineHistoryConsumerHandler
producer *kafka.Producer
cmdCh chan Cmd2Value
2022-05-11 20:49:47 +08:00
onlineTopicStatus int
2022-05-11 18:33:48 +08:00
w *sync.Mutex
singleMsgSuccessCount uint64
groupMsgCount uint64
singleMsgFailedCount uint64
2022-05-18 08:06:05 +08:00
singleMsgSuccessCountMutex sync.Mutex
2021-05-26 19:24:25 +08:00
)
func Init() {
2022-05-11 18:33:48 +08:00
cmdCh = make(chan Cmd2Value, 10000)
2022-05-11 18:38:58 +08:00
w = new(sync.Mutex)
2021-05-26 19:24:25 +08:00
persistentCH.Init()
2022-05-11 18:33:48 +08:00
historyCH.Init(cmdCh)
2022-05-11 20:49:47 +08:00
onlineTopicStatus = OnlineTopicVacancy
2022-05-11 18:54:22 +08:00
log.Debug("come msg transfer ts", config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline, config.Config.Kafka.Ws2mschatOffline.Topic)
2022-05-23 11:57:41 +08:00
//offlineHistoryCH.Init(cmdCh)
2022-05-11 18:33:48 +08:00
statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
2021-05-26 19:24:25 +08:00
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
}
func Run() {
//register mysqlConsumerHandler to
2022-04-20 18:53:23 +08:00
if config.Config.ChatPersistenceMysql {
go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
} else {
fmt.Println("not start mysql consumer")
}
2021-05-26 19:24:25 +08:00
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
2022-05-21 19:17:31 +08:00
//go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
2022-05-11 18:33:48 +08:00
}
2022-05-11 20:49:47 +08:00
func SetOnlineTopicStatus(status int) {
2022-05-11 18:33:48 +08:00
w.Lock()
defer w.Unlock()
onlineTopicStatus = status
}
2022-05-11 20:49:47 +08:00
func GetOnlineTopicStatus() int {
2022-05-11 18:33:48 +08:00
w.Lock()
defer w.Unlock()
return onlineTopicStatus
2021-05-26 19:24:25 +08:00
}