This commit is contained in:
withchao
2023-04-28 18:39:21 +08:00
parent 45ad252a44
commit 7a7a912352
9 changed files with 1287 additions and 525 deletions
+4 -8
View File
@@ -2,8 +2,6 @@ package msgtransfer
import (
"fmt"
"sync"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
@@ -44,15 +42,16 @@ func StartTransfer(prometheusPort int) error {
chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db))
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient()))
msgDatabase := controller.NewMsgDatabase(msgDocModel, cacheModel)
notificationDatabase := controller.NewNotificationDatabase(msgDocModel, cacheModel) // todo
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase)
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, notificationDatabase)
msgTransfer.initPrometheus()
return msgTransfer.Start(prometheusPort)
}
func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase, extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.MsgDatabase) *MsgTransfer {
func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase, extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.MsgDatabase, notificationDatabase controller.NotificationDatabase) *MsgTransfer {
return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase),
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)}
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase, notificationDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)}
}
func (m *MsgTransfer) initPrometheus() {
@@ -67,8 +66,6 @@ func (m *MsgTransfer) initPrometheus() {
}
func (m *MsgTransfer) Start(prometheusPort int) error {
var wg sync.WaitGroup
wg.Add(4)
fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
if config.Config.ChatPersistenceMysql {
go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH)
@@ -82,6 +79,5 @@ func (m *MsgTransfer) Start(prometheusPort int) error {
if err != nil {
return err
}
wg.Wait()
return nil
}
@@ -2,6 +2,7 @@ package msgtransfer
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"sync"
"time"
@@ -11,7 +12,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
@@ -38,7 +38,7 @@ type Cmd2Value struct {
Value interface{}
}
type ContextMsg struct {
message *pbMsg.MsgDataToMQ
message *sdkws.MsgData
ctx context.Context
}
@@ -80,11 +80,11 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
msgChannelValue := cmd.Value.(MsgChannelValue)
ctxMsgList := msgChannelValue.ctxMsgList
ctx := msgChannelValue.ctx
storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
notStorageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
storageNotificationList := make([]*pbMsg.MsgDataToMQ, 0, 80)
notStorageNotificationList := make([]*pbMsg.MsgDataToMQ, 0, 80)
modifyMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
storageMsgList := make([]*sdkws.MsgData, 0, 80)
notStorageMsgList := make([]*sdkws.MsgData, 0, 80)
storageNotificationList := make([]*sdkws.MsgData, 0, 80)
notStorageNotificationList := make([]*sdkws.MsgData, 0, 80)
modifyMsgList := make([]*sdkws.MsgData, 0, 80)
log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "aggregationID", msgChannelValue.aggregationID)
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList = och.getPushStorageMsgList(msgChannelValue.aggregationID, ctxMsgList)
och.handleMsg(ctx, msgChannelValue.aggregationID, storageMsgList, notStorageMsgList)
@@ -98,26 +98,26 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
}
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(aggregationID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*pbMsg.MsgDataToMQ) {
isStorage := func(msg *pbMsg.MsgDataToMQ) bool {
options2 := utils.Options(msg.MsgData.Options)
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(aggregationID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
isStorage := func(msg *sdkws.MsgData) bool {
options2 := utils.Options(msg.Options)
if options2.IsHistory() {
return true
} else {
if !(!options2.IsSenderSync() && aggregationID == msg.MsgData.SendID) {
if !(!options2.IsSenderSync() && aggregationID == msg.SendID) {
return false
}
}
return false
}
for _, v := range totalMsgs {
options := utils.Options(v.message.MsgData.Options)
options := utils.Options(v.message.Options)
if options.IsNotification() {
// 原通知
notificationMsg := proto.Clone(v.message).(*pbMsg.MsgDataToMQ)
notificationMsg := proto.Clone(v.message).(*sdkws.MsgData)
if options.IsSendMsg() {
// 消息
v.message.MsgData.Options = utils.WithOptions(utils.Options(v.message.MsgData.Options), utils.WithNotification(false), utils.WithSendMsg(false))
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithNotification(false), utils.WithSendMsg(false))
storageMsgList = append(storageMsgList, v.message)
}
if isStorage(notificationMsg) {
@@ -132,22 +132,22 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(aggregationI
notStorageMsgList = append(notStorageMsgList, v.message)
}
}
if v.message.MsgData.ContentType == constant.ReactionMessageModifier || v.message.MsgData.ContentType == constant.ReactionMessageDeleter {
if v.message.ContentType == constant.ReactionMessageModifier || v.message.ContentType == constant.ReactionMessageDeleter {
modifyMsgList = append(modifyMsgList, v.message)
}
}
return
}
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, aggregationID string, storageList, notStorageList []*sdkws.MsgData) {
och.handle(ctx, aggregationID, storageList, notStorageList, och.msgDatabase.BatchInsertChat2Cache)
}
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, aggregationID string, storageList, notStorageList []*sdkws.MsgData) {
och.handle(ctx, aggregationID, storageList, notStorageList, och.msgDatabase.NotificationBatchInsertChat2Cache)
}
func (och *OnlineHistoryRedisConsumerHandler) handle(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ, cacheAndIncr func(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error)) {
func (och *OnlineHistoryRedisConsumerHandler) handle(ctx context.Context, aggregationID string, storageList, notStorageList []*sdkws.MsgData, cacheAndIncr func(ctx context.Context, sourceID string, msgList []*sdkws.MsgData) (int64, error)) {
if len(storageList) > 0 {
lastSeq, err := cacheAndIncr(ctx, aggregationID, storageList)
if err != nil {
@@ -186,7 +186,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
log.ZDebug(ctx, "batch messages come to distribution center", "length", len(consumerMessages))
for i := 0; i < len(consumerMessages); i++ {
ctxMsg := &ContextMsg{}
msgFromMQ := pbMsg.MsgDataToMQ{}
var msgFromMQ sdkws.MsgData
err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ)
if err != nil {
log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
@@ -2,6 +2,9 @@ package msgtransfer
import (
"context"
"errors"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
@@ -10,8 +13,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
)
@@ -19,14 +20,16 @@ import (
type OnlineHistoryMongoConsumerHandler struct {
historyConsumerGroup *kfk.MConsumerGroup
msgDatabase controller.MsgDatabase
notificationDatabase controller.NotificationDatabase
}
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgDatabase) *OnlineHistoryMongoConsumerHandler {
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgDatabase, notificationDatabase controller.NotificationDatabase) *OnlineHistoryMongoConsumerHandler {
mc := &OnlineHistoryMongoConsumerHandler{
historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic},
config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo),
msgDatabase: database,
msgDatabase: database,
notificationDatabase: notificationDatabase,
}
return mc
}
@@ -41,30 +44,53 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
return
}
log.Info(operationID, "BatchInsertChat2DB userID: ", msgFromMQ.AggregationID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq)
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages, msgFromMQ.LastSeq)
if err != nil {
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
if len(msgFromMQ.MsgData) == 0 {
log.ZError(ctx, "msgFromMQ.MsgData is empty", errors.New("msgFromMQ.MsgData is empty"), "cMsg", cMsg)
return
}
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages)
if err != nil {
log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
for _, v := range msgFromMQ.Messages {
if v.MsgData.ContentType == constant.DeleteMessageNotification {
tips := sdkws.TipsComm{}
DeleteMessageTips := sdkws.DeleteMessageTips{}
err := proto.Unmarshal(v.MsgData.Content, &tips)
if err != nil {
log.NewError(operationID, "tips unmarshal err:", err.Error(), v.String())
continue
isNotification := msgFromMQ.MsgData[0].Options[constant.IsNotification]
if isNotification {
err = mc.notificationDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil {
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.MsgData, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
err = mc.notificationDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.MsgData)
if err != nil {
log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.MsgData, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
for _, v := range msgFromMQ.MsgData {
if v.ContentType == constant.DeleteMessageNotification {
deleteMessageTips := sdkws.DeleteMessageTips{}
err := proto.Unmarshal(v.Content, &deleteMessageTips)
if err != nil {
log.NewError(operationID, "tips unmarshal err:", err.Error(), v.String())
continue
}
if totalUnExistSeqs, err := mc.notificationDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", deleteMessageTips.UserID, deleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
}
}
err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
if err != nil {
log.NewError(operationID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
continue
}
if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, DeleteMessageTips.UserID, DeleteMessageTips.Seqs); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", DeleteMessageTips.UserID, DeleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
}
} else {
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil {
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.MsgData, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.MsgData)
if err != nil {
log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.MsgData, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
for _, v := range msgFromMQ.MsgData {
if v.ContentType == constant.DeleteMessageNotification {
deleteMessageTips := sdkws.DeleteMessageTips{}
err := proto.Unmarshal(v.Content, &deleteMessageTips)
if err != nil {
log.NewError(operationID, "tips unmarshal err:", err.Error(), v.String())
continue
}
if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", deleteMessageTips.UserID, deleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
}
}
}
}