kafka key

This commit is contained in:
wangchuxiao
2023-05-19 19:43:43 +08:00
parent ca2eee17bc
commit 5fab4d3138
15 changed files with 501 additions and 667 deletions
+5 -8
View File
@@ -53,15 +53,14 @@ func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi
}
func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
log.NewInfo("msg come here ModifyMsg!!!", "", "msg", string(cMsg.Value), msgKey)
msgFromMQ := pbMsg.MsgDataToModifyByMQ{}
operationID := mcontext.GetOperationID(ctx)
err := proto.Unmarshal(cMsg.Value, &msgFromMQ)
if err != nil {
log.NewError(msgFromMQ.TriggerID, "msg_transfer Unmarshal msg err", "msg", string(cMsg.Value), "err", err.Error())
log.ZError(ctx, "msg_transfer Unmarshal msg err", err, "msg", string(cMsg.Value))
return
}
log.Debug(msgFromMQ.TriggerID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String())
log.ZDebug(ctx, "proto.Unmarshal MsgDataToMQ", "msgs", msgFromMQ.String())
for _, msg := range msgFromMQ.Messages {
isReactionFromCache := utils.GetSwitchFromOptions(msg.Options, constant.IsReactionFromCache)
if !isReactionFromCache {
@@ -74,7 +73,6 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama
continue
}
if notification.IsExternalExtensions {
log.NewInfo(operationID, "msg:", notification, "this is external extensions")
continue
}
if !notification.IsReact {
@@ -94,12 +92,12 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama
}
if err := mmc.extendMsgDatabase.InsertExtendMsg(ctx, notification.ConversationID, notification.SessionType, &extendMsg); err != nil {
log.NewError(operationID, "MsgFirstModify InsertExtendMsg failed", notification.ConversationID, notification.SessionType, extendMsg, err.Error())
// log.ZError(ctx, "MsgFirstModify InsertExtendMsg failed", notification.ConversationID, notification.SessionType, extendMsg, err.Error())
continue
}
} else {
if err := mmc.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, notification.ConversationID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions)); err != nil {
log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
// log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
}
}
} else if msg.ContentType == constant.ReactionMessageDeleter {
@@ -108,9 +106,8 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama
continue
}
if err := mmc.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, notification.ConversationID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions)); err != nil {
log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
// log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
}
}
}
}