refactor: ctx message update

This commit is contained in:
Gordon
2023-05-19 20:17:50 +08:00
parent 73c74a929b
commit 5f571f962f
@@ -90,7 +90,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
msgChannelValue := cmd.Value.(MsgChannelValue) msgChannelValue := cmd.Value.(MsgChannelValue)
ctxMsgList := msgChannelValue.ctxMsgList ctxMsgList := msgChannelValue.ctxMsgList
ctx := msgChannelValue.ctx ctx := msgChannelValue.ctx
log.ZDebug(withAggregationCtx(ctx, ctxMsgList), "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "uniqueKey", msgChannelValue.uniqueKey) log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "uniqueKey", msgChannelValue.uniqueKey)
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList) storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList)
log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList), log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList),
"storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList)) "storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList))
@@ -259,8 +259,9 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
if len(v) >= 0 { if len(v) >= 0 {
hashCode := utils.GetHashCode(uniqueKey) hashCode := utils.GetHashCode(uniqueKey)
channelID := hashCode % ChannelNum channelID := hashCode % ChannelNum
log.ZDebug(withAggregationCtx(ctx, v), "generate channelID", "hashCode", hashCode, "channelID", channelID, "uniqueKey", uniqueKey) newCtx := withAggregationCtx(ctx, v)
och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{uniqueKey: uniqueKey, ctxMsgList: v, ctx: ctx}} log.ZDebug(newCtx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "uniqueKey", uniqueKey)
och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{uniqueKey: uniqueKey, ctxMsgList: v, ctx: newCtx}}
} }
} }
} }
@@ -269,9 +270,14 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
} }
func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Context { func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Context {
var allMessageOperationID string var allMessageOperationID string
for _, v := range values { for i, v := range values {
if opid := mcontext.GetOperationID(v.ctx); opid != "" { if opid := mcontext.GetOperationID(v.ctx); opid != "" {
allMessageOperationID += "$" + opid if i == 0 {
allMessageOperationID += opid
} else {
allMessageOperationID += "$" + opid
}
} }
} }
return mcontext.SetOperationID(ctx, allMessageOperationID) return mcontext.SetOperationID(ctx, allMessageOperationID)