proto modify

This commit is contained in:
wangchuxiao
2023-03-02 12:00:31 +08:00
parent fc009c7c6c
commit d991947d4c
21 changed files with 344 additions and 3153 deletions
@@ -242,10 +242,8 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
}
func (och *OnlineHistoryRedisConsumerHandler) sendMessageToPushMQ(ctx context.Context, message *pbMsg.MsgDataToMQ, pushToUserID string) {
log.Info(message.OperationID, utils.GetSelfFuncName(), "msg ", message.String(), pushToUserID)
rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}
mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}
pid, offset, err := och.producerToPush.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID)
pid, offset, err := och.producerToPush.SendMessage(ctx, &mqPushMsg, mqPushMsg.PushToUserID)
if err != nil {
log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
}
@@ -254,7 +252,7 @@ func (och *OnlineHistoryRedisConsumerHandler) sendMessageToPushMQ(ctx context.Co
func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) {
if len(messages) > 0 {
pid, offset, err := och.producerToModify.SendMessage(&pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID)
pid, offset, err := och.producerToModify.SendMessage(ctx, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID)
if err != nil {
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
}
@@ -263,7 +261,7 @@ func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context.
func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) {
if len(messages) > 0 {
pid, offset, err := och.producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID)
pid, offset, err := och.producerToMongo.SendMessage(ctx, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID)
if err != nil {
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
}