This commit is contained in:
wangchuxiao
2023-03-06 16:23:16 +08:00
parent 8143d589aa
commit 23c24308f7
7 changed files with 70 additions and 78 deletions
+4 -14
View File
@@ -10,7 +10,6 @@ import (
"OpenIM/pkg/common/log"
"OpenIM/pkg/common/tracelog"
pbMsg "OpenIM/pkg/proto/msg"
sdkws "OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils"
"context"
"encoding/json"
@@ -23,6 +22,7 @@ type ModifyMsgConsumerHandler struct {
modifyMsgConsumerGroup *kfk.MConsumerGroup
extendMsgDatabase controller.ExtendMsgDatabase
extendSetMsgModel unRelationTb.ExtendMsgSetModel
}
func NewModifyMsgConsumerHandler(database controller.ExtendMsgDatabase) *ModifyMsgConsumerHandler {
@@ -66,7 +66,6 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama
if !isReactionFromCache {
continue
}
ctx := context.Background()
tracelog.SetOperationID(ctx, operationID)
if msgDataToMQ.MsgData.ContentType == constant.ReactionMessageModifier {
notification := &apistruct.ReactionMessageModifierNotification{}
@@ -85,7 +84,7 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama
ClientMsgID: notification.ClientMsgID,
MsgFirstModifyTime: notification.MsgFirstModifyTime,
}
for _, v := range notification.SuccessReactionExtensionList {
for _, v := range notification.SuccessReactionExtensions {
reactionExtensionList[v.TypeKey] = unRelationTb.KeyValueModel{
TypeKey: v.TypeKey,
Value: v.Value,
@@ -98,16 +97,7 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama
continue
}
} else {
var reactionExtensionList = make(map[string]*sdkws.KeyValue)
for _, v := range notification.SuccessReactionExtensionList {
reactionExtensionList[v.TypeKey] = &sdkws.KeyValue{
TypeKey: v.TypeKey,
Value: v.Value,
LatestUpdateTime: v.LatestUpdateTime,
}
}
// is already modify
if err := mmc.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, notification.SourceID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, reactionExtensionList); err != nil {
if err := mmc.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, notification.SourceID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions)); err != nil {
log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
}
}
@@ -116,7 +106,7 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama
if err := json.Unmarshal(msgDataToMQ.MsgData.Content, notification); err != nil {
continue
}
if err := mmc.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, notification.SourceID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, notification.SuccessReactionExtensionList); err != nil {
if err := mmc.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, notification.SourceID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions)); err != nil {
log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
}
}
@@ -65,7 +65,7 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMs
}
if tag {
log.NewInfo(operationID, "msg_transfer msg persisting", string(msg))
if err = pc.chatLogInterface.CreateChatLog(msgFromMQ); err != nil {
if err = pc.chatLogDatabase.CreateChatLog(msgFromMQ); err != nil {
log.NewError(operationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String())
return
}