mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-19 00:09:02 +08:00
proto modify
This commit is contained in:
@@ -2,27 +2,54 @@ package msgtransfer
|
||||
|
||||
import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/db/cache"
|
||||
"OpenIM/pkg/common/db/controller"
|
||||
"OpenIM/pkg/common/db/relation"
|
||||
relationTb "OpenIM/pkg/common/db/table/relation"
|
||||
"OpenIM/pkg/common/db/unrelation"
|
||||
"OpenIM/pkg/common/prome"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type MsgTransfer struct {
|
||||
persistentCH PersistentConsumerHandler // 聊天记录持久化到mysql的消费者 订阅的topic: ws2ms_chat
|
||||
historyCH OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topic:ws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送, 发消息到msg_to_mongo topic持久化
|
||||
historyMongoCH OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息,以及处理删除通知消息删除的 订阅的topic: msg_to_mongo
|
||||
modifyCH ModifyMsgConsumerHandler // 负责消费修改消息通知的consumer, 订阅的topic: msg_to_modify
|
||||
persistentCH *PersistentConsumerHandler // 聊天记录持久化到mysql的消费者 订阅的topic: ws2ms_chat
|
||||
historyCH *OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topic:ws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送, 发消息到msg_to_mongo topic持久化
|
||||
historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息,以及处理删除通知消息删除的 订阅的topic: msg_to_mongo
|
||||
modifyCH *ModifyMsgConsumerHandler // 负责消费修改消息通知的consumer, 订阅的topic: msg_to_modify
|
||||
}
|
||||
|
||||
func NewMsgTransfer() *MsgTransfer {
|
||||
msgTransfer := &MsgTransfer{}
|
||||
msgTransfer.persistentCH.Init()
|
||||
msgTransfer.historyCH.Init()
|
||||
msgTransfer.historyMongoCH.Init()
|
||||
msgTransfer.modifyCH.Init()
|
||||
if config.Config.Prometheus.Enable {
|
||||
msgTransfer.initPrometheus()
|
||||
func StartTransfer(prometheusPort int) error {
|
||||
db, err := relation.NewGormDB()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return msgTransfer
|
||||
if err := db.AutoMigrate(&relationTb.ChatLogModel{}); err != nil {
|
||||
return err
|
||||
}
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mongo, err := unrelation.NewMongo()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cacheModel := cache.NewCacheModel(rdb)
|
||||
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
|
||||
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())
|
||||
|
||||
chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db))
|
||||
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel)
|
||||
msgDatabase := controller.NewMsgDatabase(msgDocModel, cacheModel)
|
||||
|
||||
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase)
|
||||
msgTransfer.initPrometheus()
|
||||
return msgTransfer.Start(prometheusPort)
|
||||
}
|
||||
|
||||
func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase, extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.MsgDatabase) *MsgTransfer {
|
||||
return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase),
|
||||
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)}
|
||||
}
|
||||
|
||||
func (m *MsgTransfer) initPrometheus() {
|
||||
@@ -36,19 +63,18 @@ func (m *MsgTransfer) initPrometheus() {
|
||||
prome.NewMsgInsertMongoFailedCounter()
|
||||
}
|
||||
|
||||
func (m *MsgTransfer) Run(promePort int) {
|
||||
func (m *MsgTransfer) Start(prometheusPort int) error {
|
||||
if config.Config.ChatPersistenceMysql {
|
||||
go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&m.persistentCH)
|
||||
go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH)
|
||||
} else {
|
||||
fmt.Println("msg transfer not start mysql consumer")
|
||||
}
|
||||
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&m.historyCH)
|
||||
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&m.historyMongoCH)
|
||||
go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(&m.modifyCH)
|
||||
go func() {
|
||||
err := prome.StartPrometheusSrv(promePort)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyCH)
|
||||
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyMongoCH)
|
||||
go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(m.modifyCH)
|
||||
err := prome.StartPrometheusSrv(prometheusPort)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -25,10 +25,13 @@ type ModifyMsgConsumerHandler struct {
|
||||
extendMsgDatabase controller.ExtendMsgDatabase
|
||||
}
|
||||
|
||||
func (mmc *ModifyMsgConsumerHandler) Init() {
|
||||
mmc.modifyMsgConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToModify.Topic},
|
||||
config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.ConsumerGroupID.MsgToModify)
|
||||
func NewModifyMsgConsumerHandler(database controller.ExtendMsgDatabase) *ModifyMsgConsumerHandler {
|
||||
return &ModifyMsgConsumerHandler{
|
||||
modifyMsgConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToModify.Topic},
|
||||
config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.ConsumerGroupID.MsgToModify),
|
||||
extendMsgDatabase: database,
|
||||
}
|
||||
}
|
||||
|
||||
func (ModifyMsgConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
@@ -38,7 +41,8 @@ func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi
|
||||
for msg := range claim.Messages() {
|
||||
log.NewDebug("", "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
|
||||
if len(msg.Value) != 0 {
|
||||
mmc.ModifyMsg(msg, string(msg.Key), sess)
|
||||
ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg)
|
||||
mmc.ModifyMsg(ctx, msg, string(msg.Key), sess)
|
||||
} else {
|
||||
log.Error("", "msg get from kafka but is nil", msg.Key)
|
||||
}
|
||||
@@ -47,29 +51,30 @@ func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
log.NewInfo("msg come here ModifyMsg!!!", "", "msg", string(cMsg.Value), msgKey)
|
||||
msgFromMQ := pbMsg.MsgDataToModifyByMQ{}
|
||||
operationID := tracelog.GetOperationID(ctx)
|
||||
err := proto.Unmarshal(cMsg.Value, &msgFromMQ)
|
||||
if err != nil {
|
||||
log.NewError(msgFromMQ.TriggerID, "msg_transfer Unmarshal msg err", "msg", string(cMsg.Value), "err", err.Error())
|
||||
return
|
||||
}
|
||||
log.Debug(msgFromMQ.TriggerID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String())
|
||||
for _, msgDataToMQ := range msgFromMQ.MessageList {
|
||||
for _, msgDataToMQ := range msgFromMQ.Messages {
|
||||
isReactionFromCache := utils.GetSwitchFromOptions(msgDataToMQ.MsgData.Options, constant.IsReactionFromCache)
|
||||
if !isReactionFromCache {
|
||||
continue
|
||||
}
|
||||
ctx := context.Background()
|
||||
tracelog.SetOperationID(ctx, msgDataToMQ.OperationID)
|
||||
tracelog.SetOperationID(ctx, operationID)
|
||||
if msgDataToMQ.MsgData.ContentType == constant.ReactionMessageModifier {
|
||||
notification := &apistruct.ReactionMessageModifierNotification{}
|
||||
if err := json.Unmarshal(msgDataToMQ.MsgData.Content, notification); err != nil {
|
||||
continue
|
||||
}
|
||||
if notification.IsExternalExtensions {
|
||||
log.NewInfo(msgDataToMQ.OperationID, "msg:", notification, "this is external extensions")
|
||||
log.NewInfo(operationID, "msg:", notification, "this is external extensions")
|
||||
continue
|
||||
}
|
||||
if !notification.IsReact {
|
||||
@@ -89,7 +94,7 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msg
|
||||
}
|
||||
|
||||
if err := mmc.extendMsgDatabase.InsertExtendMsg(ctx, notification.SourceID, notification.SessionType, &extendMsg); err != nil {
|
||||
log.NewError(msgDataToMQ.OperationID, "MsgFirstModify InsertExtendMsg failed", notification.SourceID, notification.SessionType, extendMsg, err.Error())
|
||||
log.NewError(operationID, "MsgFirstModify InsertExtendMsg failed", notification.SourceID, notification.SessionType, extendMsg, err.Error())
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
@@ -103,7 +108,7 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msg
|
||||
}
|
||||
// is already modify
|
||||
if err := mmc.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, notification.SourceID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, reactionExtensionList); err != nil {
|
||||
log.NewError(msgDataToMQ.OperationID, "InsertOrUpdateReactionExtendMsgSet failed")
|
||||
log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
|
||||
}
|
||||
}
|
||||
} else if msgDataToMQ.MsgData.ContentType == constant.ReactionMessageDeleter {
|
||||
@@ -112,7 +117,7 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msg
|
||||
continue
|
||||
}
|
||||
if err := mmc.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, notification.SourceID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, notification.SuccessReactionExtensionList); err != nil {
|
||||
log.NewError(msgDataToMQ.OperationID, "InsertOrUpdateReactionExtendMsgSet failed")
|
||||
log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,13 +3,11 @@ package msgtransfer
|
||||
import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/constant"
|
||||
"OpenIM/pkg/common/db/cache"
|
||||
"OpenIM/pkg/common/db/controller"
|
||||
"OpenIM/pkg/common/kafka"
|
||||
"OpenIM/pkg/common/log"
|
||||
"OpenIM/pkg/common/tracelog"
|
||||
pbMsg "OpenIM/pkg/proto/msg"
|
||||
pbPush "OpenIM/pkg/proto/push"
|
||||
"OpenIM/pkg/statistics"
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
@@ -57,10 +55,11 @@ type OnlineHistoryRedisConsumerHandler struct {
|
||||
producerToMongo *kafka.Producer
|
||||
|
||||
msgDatabase controller.MsgDatabase
|
||||
cache cache.Cache
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) Init() {
|
||||
func NewOnlineHistoryRedisConsumerHandler(database controller.MsgDatabase) *OnlineHistoryRedisConsumerHandler {
|
||||
var och OnlineHistoryRedisConsumerHandler
|
||||
och.msgDatabase = database
|
||||
och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
|
||||
go och.MessagesDistributionHandle()
|
||||
for i := 0; i < ChannelNum; i++ {
|
||||
@@ -73,8 +72,8 @@ func (och *OnlineHistoryRedisConsumerHandler) Init() {
|
||||
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
|
||||
|
||||
statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
|
||||
return &och
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
||||
@@ -242,17 +241,17 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) sendMessageToPushMQ(ctx context.Context, message *pbMsg.MsgDataToMQ, pushToUserID string) {
|
||||
mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}
|
||||
pid, offset, err := och.producerToPush.SendMessage(ctx, &mqPushMsg, mqPushMsg.PushToUserID)
|
||||
mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: message.MsgData, SourceID: pushToUserID}
|
||||
pid, offset, err := och.producerToPush.SendMessage(ctx, &mqPushMsg, mqPushMsg.SourceID)
|
||||
if err != nil {
|
||||
log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
|
||||
log.Error(tracelog.GetOperationID(ctx), "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) {
|
||||
if len(messages) > 0 {
|
||||
pid, offset, err := och.producerToModify.SendMessage(ctx, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID)
|
||||
pid, offset, err := och.producerToModify.SendMessage(ctx, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}, aggregationID)
|
||||
if err != nil {
|
||||
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
|
||||
}
|
||||
@@ -261,7 +260,7 @@ func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context.
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) {
|
||||
if len(messages) > 0 {
|
||||
pid, offset, err := och.producerToMongo.SendMessage(ctx, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID)
|
||||
pid, offset, err := och.producerToMongo.SendMessage(ctx, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}, aggregationID)
|
||||
if err != nil {
|
||||
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package msgtransfer
|
||||
import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/constant"
|
||||
"OpenIM/pkg/common/db/cache"
|
||||
"OpenIM/pkg/common/db/controller"
|
||||
kfk "OpenIM/pkg/common/kafka"
|
||||
"OpenIM/pkg/common/log"
|
||||
@@ -19,66 +18,68 @@ import (
|
||||
type OnlineHistoryMongoConsumerHandler struct {
|
||||
historyConsumerGroup *kfk.MConsumerGroup
|
||||
msgDatabase controller.MsgDatabase
|
||||
cache cache.Cache
|
||||
}
|
||||
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) Init() {
|
||||
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
|
||||
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgDatabase) *OnlineHistoryMongoConsumerHandler {
|
||||
mc := &OnlineHistoryMongoConsumerHandler{
|
||||
historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo),
|
||||
msgDatabase: database,
|
||||
}
|
||||
return mc
|
||||
}
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Context, cMsg *sarama.ConsumerMessage, msgKey string, session sarama.ConsumerGroupSession) {
|
||||
msg := cMsg.Value
|
||||
msgFromMQ := pbMsg.MsgDataToMongoByMQ{}
|
||||
operationID := tracelog.GetOperationID(ctx)
|
||||
err := proto.Unmarshal(msg, &msgFromMQ)
|
||||
if err != nil {
|
||||
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
|
||||
return
|
||||
}
|
||||
log.Info(msgFromMQ.TriggerID, "BatchInsertChat2DB userID: ", msgFromMQ.AggregationID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq)
|
||||
ctx := context.Background()
|
||||
tracelog.SetOperationID(ctx, msgFromMQ.TriggerID)
|
||||
log.Info(operationID, "BatchInsertChat2DB userID: ", msgFromMQ.AggregationID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq)
|
||||
//err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq)
|
||||
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.LastSeq)
|
||||
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages, msgFromMQ.LastSeq)
|
||||
if err != nil {
|
||||
log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
|
||||
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
|
||||
}
|
||||
//err = db.DB.DeleteMessageFromCache(msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.GetTriggerID())
|
||||
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList)
|
||||
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages)
|
||||
if err != nil {
|
||||
log.NewError(msgFromMQ.TriggerID, "remove cache msg from redis err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
|
||||
log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
|
||||
}
|
||||
for _, v := range msgFromMQ.MessageList {
|
||||
for _, v := range msgFromMQ.Messages {
|
||||
if v.MsgData.ContentType == constant.DeleteMessageNotification {
|
||||
tips := sdkws.TipsComm{}
|
||||
DeleteMessageTips := sdkws.DeleteMessageTips{}
|
||||
err := proto.Unmarshal(v.MsgData.Content, &tips)
|
||||
if err != nil {
|
||||
log.NewError(msgFromMQ.TriggerID, "tips unmarshal err:", err.Error(), v.String())
|
||||
log.NewError(operationID, "tips unmarshal err:", err.Error(), v.String())
|
||||
continue
|
||||
}
|
||||
err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
|
||||
if err != nil {
|
||||
log.NewError(msgFromMQ.TriggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
|
||||
log.NewError(operationID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
|
||||
continue
|
||||
}
|
||||
if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, DeleteMessageTips.UserID, DeleteMessageTips.Seqs); err != nil {
|
||||
log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", DeleteMessageTips.UserID, DeleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
|
||||
log.NewError(operationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", DeleteMessageTips.UserID, DeleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
|
||||
log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
|
||||
for msg := range claim.Messages() {
|
||||
log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
|
||||
if len(msg.Value) != 0 {
|
||||
mc.handleChatWs2Mongo(msg, string(msg.Key), sess)
|
||||
ctx := mc.historyConsumerGroup.GetContextFromMsg(msg)
|
||||
mc.handleChatWs2Mongo(ctx, msg, string(msg.Key), sess)
|
||||
} else {
|
||||
log.Error("", "mongo msg get from kafka but is nil", msg.Key)
|
||||
}
|
||||
|
||||
@@ -12,8 +12,10 @@ import (
|
||||
"OpenIM/pkg/common/db/controller"
|
||||
kfk "OpenIM/pkg/common/kafka"
|
||||
"OpenIM/pkg/common/log"
|
||||
"OpenIM/pkg/common/tracelog"
|
||||
pbMsg "OpenIM/pkg/proto/msg"
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/golang/protobuf/proto"
|
||||
@@ -21,26 +23,30 @@ import (
|
||||
|
||||
type PersistentConsumerHandler struct {
|
||||
persistentConsumerGroup *kfk.MConsumerGroup
|
||||
chatLogInterface controller.ChatLogDatabase
|
||||
chatLogDatabase controller.ChatLogDatabase
|
||||
}
|
||||
|
||||
func (pc *PersistentConsumerHandler) Init() {
|
||||
pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql)
|
||||
func NewPersistentConsumerHandler(database controller.ChatLogDatabase) *PersistentConsumerHandler {
|
||||
return &PersistentConsumerHandler{
|
||||
persistentConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql),
|
||||
chatLogDatabase: database,
|
||||
}
|
||||
}
|
||||
|
||||
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
msg := cMsg.Value
|
||||
operationID := tracelog.GetOperationID(ctx)
|
||||
log.NewInfo("msg come here mysql!!!", "", "msg", string(msg), msgKey)
|
||||
var tag bool
|
||||
msgFromMQ := pbMsg.MsgDataToMQ{}
|
||||
err := proto.Unmarshal(msg, &msgFromMQ)
|
||||
if err != nil {
|
||||
log.NewError(msgFromMQ.OperationID, "msg_transfer Unmarshal msg err", "msg", string(msg), "err", err.Error())
|
||||
log.NewError(operationID, "msg_transfer Unmarshal msg err", "msg", string(msg), "err", err.Error())
|
||||
return
|
||||
}
|
||||
log.Debug(msgFromMQ.OperationID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String())
|
||||
log.Debug(operationID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String())
|
||||
//Control whether to store history messages (mysql)
|
||||
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
|
||||
//Only process receiver data
|
||||
@@ -58,23 +64,22 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMes
|
||||
tag = true
|
||||
}
|
||||
if tag {
|
||||
log.NewInfo(msgFromMQ.OperationID, "msg_transfer msg persisting", string(msg))
|
||||
log.NewInfo(operationID, "msg_transfer msg persisting", string(msg))
|
||||
if err = pc.chatLogInterface.CreateChatLog(msgFromMQ); err != nil {
|
||||
log.NewError(msgFromMQ.OperationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String())
|
||||
log.NewError(operationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
claim sarama.ConsumerGroupClaim) error {
|
||||
func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
||||
for msg := range claim.Messages() {
|
||||
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
|
||||
if len(msg.Value) != 0 {
|
||||
pc.handleChatWs2Mysql(msg, string(msg.Key), sess)
|
||||
ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg)
|
||||
pc.handleChatWs2Mysql(ctx, msg, string(msg.Key), sess)
|
||||
} else {
|
||||
log.Error("", "msg get from kafka but is nil", msg.Key)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user