This commit is contained in:
wangchuxiao
2023-02-23 17:28:57 +08:00
parent 2359d7ab37
commit 4fc00109ef
21 changed files with 87 additions and 139 deletions
@@ -56,8 +56,8 @@ type OnlineHistoryRedisConsumerHandler struct {
producerToModify *kafka.Producer
producerToMongo *kafka.Producer
msgInterface controller.MsgInterface
cache cache.Cache
msgDatabase controller.MsgDatabase
cache cache.Cache
}
func (och *OnlineHistoryRedisConsumerHandler) Init() {
@@ -113,7 +113,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
}
log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragePushMsgList))
if len(storageMsgList) > 0 {
lastSeq, err := och.msgInterface.BatchInsertChat2Cache(ctx, msgChannelValue.aggregationID, storageMsgList)
lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, msgChannelValue.aggregationID, storageMsgList)
if err != nil {
och.singleMsgFailedCountMutex.Lock()
och.singleMsgFailedCount += uint64(len(storageMsgList))
@@ -18,7 +18,7 @@ import (
type OnlineHistoryMongoConsumerHandler struct {
historyConsumerGroup *kfk.MConsumerGroup
msgInterface controller.MsgInterface
msgDatabase controller.MsgDatabase
cache cache.Cache
}
@@ -39,12 +39,12 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con
ctx := context.Background()
tracelog.SetOperationID(ctx, msgFromMQ.TriggerID)
//err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq)
err = mc.msgInterface.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.LastSeq)
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.LastSeq)
if err != nil {
log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
//err = db.DB.DeleteMessageFromCache(msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.GetTriggerID())
err = mc.msgInterface.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList)
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList)
if err != nil {
log.NewError(msgFromMQ.TriggerID, "remove cache msg from redis err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
@@ -62,8 +62,8 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con
log.NewError(msgFromMQ.TriggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
continue
}
if totalUnExistSeqs, err := mc.msgInterface.DelMsgBySeqs(ctx, DeleteMessageTips.UserID, DeleteMessageTips.SeqList); err != nil {
log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, DeleteMessageTips.UserID, DeleteMessageTips.Seqs); err != nil {
log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", DeleteMessageTips.UserID, DeleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
}
}