This commit is contained in:
wangchuxiao
2023-05-12 20:05:25 +08:00
parent cc20c7af4b
commit b64e9af92f
9 changed files with 43 additions and 1469 deletions
@@ -4,14 +4,12 @@ import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/Shopify/sarama"
"google.golang.org/protobuf/proto"
@@ -20,16 +18,14 @@ import (
type OnlineHistoryMongoConsumerHandler struct {
historyConsumerGroup *kfk.MConsumerGroup
msgDatabase controller.CommonMsgDatabase
notificationDatabase controller.NotificationDatabase
}
func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase, notificationDatabase controller.NotificationDatabase) *OnlineHistoryMongoConsumerHandler {
func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase) *OnlineHistoryMongoConsumerHandler {
mc := &OnlineHistoryMongoConsumerHandler{
historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic},
config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo),
msgDatabase: database,
notificationDatabase: notificationDatabase,
msgDatabase: database,
}
return mc
}
@@ -37,7 +33,6 @@ func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase,
func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Context, cMsg *sarama.ConsumerMessage, conversationID string, session sarama.ConsumerGroupSession) {
msg := cMsg.Value
msgFromMQ := pbMsg.MsgDataToMongoByMQ{}
operationID := mcontext.GetOperationID(ctx)
err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil {
log.ZError(ctx, "unmarshall failed", err, "conversationID", conversationID, "len", len(msg))
@@ -48,52 +43,28 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
return
}
log.ZInfo(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.MsgData)
isNotification := msgFromMQ.MsgData[0].Options[constant.IsNotification]
if isNotification {
err = mc.notificationDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil {
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.MsgData, msgFromMQ.ConversationID, msgFromMQ.TriggerID)
}
err = mc.notificationDatabase.DeleteMessageFromCache(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData)
if err != nil {
log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.MsgData, msgFromMQ.ConversationID, msgFromMQ.TriggerID)
}
for _, v := range msgFromMQ.MsgData {
if v.ContentType == constant.DeleteMessageNotification {
deleteMessageTips := sdkws.DeleteMessageTips{}
err := proto.Unmarshal(v.Content, &deleteMessageTips)
if err != nil {
log.NewError(operationID, "tips unmarshal err:", err.Error(), v.String())
continue
}
if totalUnExistSeqs, err := mc.notificationDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", deleteMessageTips.UserID, deleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
}
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil {
log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID, "triggerID", msgFromMQ.TriggerID)
}
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData)
if err != nil {
log.ZError(ctx, "remove cache msg from redis err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID, "triggerID", msgFromMQ.TriggerID)
}
for _, v := range msgFromMQ.MsgData {
if v.ContentType == constant.DeleteMessageNotification {
deleteMessageTips := sdkws.DeleteMessageTips{}
err := proto.Unmarshal(v.Content, &deleteMessageTips)
if err != nil {
log.ZError(ctx, "tips unmarshal err:", err, "msg", msg)
continue
}
}
} else {
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil {
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.MsgData, msgFromMQ.ConversationID, msgFromMQ.TriggerID)
}
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData)
if err != nil {
log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.MsgData, msgFromMQ.ConversationID, msgFromMQ.TriggerID)
}
for _, v := range msgFromMQ.MsgData {
if v.ContentType == constant.DeleteMessageNotification {
deleteMessageTips := sdkws.DeleteMessageTips{}
err := proto.Unmarshal(v.Content, &deleteMessageTips)
if err != nil {
log.NewError(operationID, "tips unmarshal err:", err.Error(), v.String())
continue
}
if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", deleteMessageTips.UserID, deleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
}
if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil {
log.ZError(ctx, "DelMsgBySeqs", err, "userIDs", deleteMessageTips.UserID, "seqs", deleteMessageTips.Seqs, "totalUnExistSeqs", totalUnExistSeqs)
}
}
}
}
func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }