This commit is contained in:
wangchuxiao
2023-03-13 15:39:47 +08:00
parent 6187b15a88
commit eb946ad181
18 changed files with 82 additions and 128 deletions
@@ -10,7 +10,6 @@ import (
pbMsg "OpenIM/pkg/proto/msg"
"OpenIM/pkg/statistics"
"OpenIM/pkg/utils"
"context"
"fmt"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
@@ -50,9 +49,9 @@ type OnlineHistoryRedisConsumerHandler struct {
singleMsgSuccessCountMutex sync.Mutex
singleMsgFailedCountMutex sync.Mutex
producerToPush *kafka.Producer
producerToModify *kafka.Producer
producerToMongo *kafka.Producer
//producerToPush *kafka.Producer
//producerToModify *kafka.Producer
//producerToMongo *kafka.Producer
msgDatabase controller.MsgDatabase
}
@@ -66,9 +65,9 @@ func NewOnlineHistoryRedisConsumerHandler(database controller.MsgDatabase) *Onli
och.chArrays[i] = make(chan Cmd2Value, 50)
go och.Run(i)
}
och.producerToPush = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
och.producerToModify = kafka.NewKafkaProducer(config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.MsgToModify.Topic)
och.producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic)
//och.producerToPush = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
//och.producerToModify = kafka.NewKafkaProducer(config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.MsgToModify.Topic)
//och.producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic)
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
@@ -108,7 +107,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
}
}
if len(modifyMsgList) > 0 {
och.sendMessageToModifyMQ(ctx, msgChannelValue.aggregationID, triggerID, modifyMsgList)
och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.aggregationID, triggerID, modifyMsgList)
}
log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragePushMsgList))
if len(storageMsgList) > 0 {
@@ -122,17 +121,18 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
och.singleMsgSuccessCountMutex.Lock()
och.singleMsgSuccessCount += uint64(len(storageMsgList))
och.singleMsgSuccessCountMutex.Unlock()
och.SendMessageToMongoCH(ctx, msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq)
och.msgDatabase.MsgToMongoMQ(ctx, msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq)
for _, v := range storageMsgList {
och.sendMessageToPushMQ(ctx, v, msgChannelValue.aggregationID)
och.msgDatabase.MsgToPushMQ(ctx, msgChannelValue.aggregationID, v)
}
for _, x := range notStoragePushMsgList {
och.sendMessageToPushMQ(ctx, x, msgChannelValue.aggregationID)
for _, v := range notStoragePushMsgList {
och.msgDatabase.MsgToPushMQ(ctx, msgChannelValue.aggregationID, v)
}
}
} else {
for _, v := range notStoragePushMsgList {
och.sendMessageToPushMQ(ctx, v, msgChannelValue.aggregationID)
och.msgDatabase.MsgToPushMQ(ctx, msgChannelValue.aggregationID, v)
}
}
}
@@ -239,30 +239,3 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
}
return nil
}
func (och *OnlineHistoryRedisConsumerHandler) sendMessageToPushMQ(ctx context.Context, message *pbMsg.MsgDataToMQ, pushToUserID string) {
mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: message.MsgData, SourceID: pushToUserID}
pid, offset, err := och.producerToPush.SendMessage(ctx, mqPushMsg.SourceID, &mqPushMsg)
if err != nil {
log.Error(tracelog.GetOperationID(ctx), "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
}
return
}
func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) {
if len(messages) > 0 {
pid, offset, err := och.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages, TriggerID: triggerID})
if err != nil {
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
}
}
}
func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) {
if len(messages) > 0 {
pid, offset, err := och.producerToMongo.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages, TriggerID: triggerID})
if err != nil {
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
}
}
}