Files
open-im-server/internal/msg_transfer/logic/init.go
T

60 lines
2.0 KiB
Go
Raw Normal View History

2021-05-26 19:24:25 +08:00
package logic
import (
"Open_IM/pkg/common/config"
2022-05-11 18:33:48 +08:00
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka"
2022-05-11 18:54:22 +08:00
"Open_IM/pkg/common/log"
2022-05-11 18:33:48 +08:00
"Open_IM/pkg/statistics"
2022-04-20 18:53:23 +08:00
"fmt"
2022-05-11 18:33:48 +08:00
"sync"
2021-05-26 19:24:25 +08:00
)
2022-05-11 18:33:48 +08:00
const OnlineTopicBusy = "Busy"
const OnlineTopicVacancy = "Vacancy"
2021-05-26 19:24:25 +08:00
var (
2022-05-11 18:33:48 +08:00
persistentCH PersistentConsumerHandler
historyCH OnlineHistoryConsumerHandler
offlineHistoryCH OfflineHistoryConsumerHandler
producer *kafka.Producer
cmdCh chan Cmd2Value
onlineTopicStatus string
w *sync.Mutex
singleMsgSuccessCount uint64
groupMsgCount uint64
singleMsgFailedCount uint64
2021-05-26 19:24:25 +08:00
)
func Init() {
2022-05-11 18:33:48 +08:00
cmdCh = make(chan Cmd2Value, 10000)
2022-05-11 18:38:58 +08:00
w = new(sync.Mutex)
2021-05-26 19:24:25 +08:00
persistentCH.Init()
2022-05-11 18:33:48 +08:00
historyCH.Init(cmdCh)
2022-05-11 18:54:22 +08:00
log.Debug("come msg transfer ts", config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline, config.Config.Kafka.Ws2mschatOffline.Topic)
2022-05-11 18:33:48 +08:00
offlineHistoryCH.Init(cmdCh)
statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
2021-05-26 19:24:25 +08:00
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
}
func Run() {
//register mysqlConsumerHandler to
2022-04-20 18:53:23 +08:00
if config.Config.ChatPersistenceMysql {
go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
} else {
fmt.Println("not start mysql consumer")
}
2021-05-26 19:24:25 +08:00
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
2022-05-11 18:33:48 +08:00
go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
}
func SetOnlineTopicStatus(status string) {
w.Lock()
defer w.Unlock()
onlineTopicStatus = status
}
func GetOnlineTopicStatus() string {
w.Lock()
defer w.Unlock()
return onlineTopicStatus
2021-05-26 19:24:25 +08:00
}