msg handle

This commit is contained in:
Gordon
2022-06-16 12:21:49 +08:00
parent 310997f978
commit 5e51eebe8a
6 changed files with 32 additions and 17 deletions
+2
View File
@@ -22,6 +22,7 @@ var (
persistentCH PersistentConsumerHandler
historyCH OnlineHistoryConsumerHandler
producer *kafka.Producer
producerToMongo *kafka.Producer
cmdCh chan Cmd2Value
onlineTopicStatus int
w *sync.Mutex
@@ -43,6 +44,7 @@ func Init() {
statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic)
}
func Run() {
//register mysqlConsumerHandler to
@@ -155,11 +155,17 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
}
}
func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
hashCode := getHashCode(aggregationID)
channelID := hashCode % ChannelNum
log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID)
//go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}}
pid, offset, err := producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID)
if err != nil {
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
} else {
// log.NewWarn(m.OperationID, "sendMsgToKafka client msgID ", m.MsgData.ClientMsgID)
}
//hashCode := getHashCode(aggregationID)
//channelID := hashCode % ChannelNum
//log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID)
////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
//och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}}
}
func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
for {