Merge branch 'errcode' of github.com:OpenIMSDK/Open-IM-Server into errcode

This commit is contained in:
wangchuxiao
2023-05-04 15:53:58 +08:00
31 changed files with 2175 additions and 663 deletions
+11 -10
View File
@@ -2,8 +2,6 @@ package msgtransfer
import (
"fmt"
"sync"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
@@ -43,25 +41,28 @@ func StartTransfer(prometheusPort int) error {
if err != nil {
return err
}
cacheModel := cache.NewCacheModel(rdb)
msgModel := cache.NewMsgCacheModel(rdb)
notificationModel := cache.NewNotificationCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
notificationDocModel := unrelation.NewNotificationMongoDriver(mongo.GetDatabase())
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())
extendMsgCache := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt())
chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db))
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient()))
msgDatabase := controller.NewMsgDatabase(msgDocModel, cacheModel)
msgDatabase := controller.NewMsgDatabase(msgDocModel, msgModel)
notificationDatabase := controller.NewNotificationDatabase(notificationDocModel, notificationModel)
conversationRpcClient := rpcclient.NewConversationClient(client)
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, conversationRpcClient)
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, notificationDatabase, conversationRpcClient)
msgTransfer.initPrometheus()
return msgTransfer.Start(prometheusPort)
}
func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase,
extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.MsgDatabase,
extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.MsgDatabase, notificationDatabase controller.MsgDatabase,
conversationRpcClient *rpcclient.ConversationClient) *MsgTransfer {
return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient),
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)}
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase, notificationDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)}
}
func (m *MsgTransfer) initPrometheus() {
@@ -76,8 +77,8 @@ func (m *MsgTransfer) initPrometheus() {
}
func (m *MsgTransfer) Start(prometheusPort int) error {
var wg sync.WaitGroup
wg.Add(4)
//var wg sync.WaitGroup
//wg.Add(4)
fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
if config.Config.ChatPersistenceMysql {
go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH)
@@ -91,6 +92,6 @@ func (m *MsgTransfer) Start(prometheusPort int) error {
if err != nil {
return err
}
wg.Wait()
//wg.Wait()
return nil
}
@@ -5,6 +5,9 @@ import (
"sync"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/go-redis/redis"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
@@ -18,8 +21,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/Shopify/sarama"
"github.com/go-redis/redis/v8"
"google.golang.org/protobuf/proto"
"github.com/golang/protobuf/proto"
)
const ConsumerMsgs = 3
@@ -43,7 +45,7 @@ type Cmd2Value struct {
Value interface{}
}
type ContextMsg struct {
message *pbMsg.MsgDataToMQ
message *sdkws.MsgData
ctx context.Context
}
@@ -101,9 +103,9 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
}
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(conversationID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*pbMsg.MsgDataToMQ) {
isStorage := func(msg *pbMsg.MsgDataToMQ) bool {
options2 := utils.Options(msg.MsgData.Options)
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(sourceID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*pbMsg.MsgDataToMQ) {
isStorage := func(msg *sdkws.MsgData) bool {
options2 := utils.Options(msg.Options)
if options2.IsHistory() {
return true
} else {
@@ -115,13 +117,13 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(conversation
return false
}
for _, v := range totalMsgs {
options := utils.Options(v.message.MsgData.Options)
options := utils.Options(v.message.Options)
if options.IsNotification() {
// 原通知
notificationMsg := proto.Clone(v.message).(*pbMsg.MsgDataToMQ)
if options.IsSendMsg() {
// 消息
v.message.MsgData.Options = utils.WithOptions(utils.Options(v.message.MsgData.Options), utils.WithNotification(false), utils.WithSendMsg(false))
v.message.Options = utils.WithOptions(utils.Options(v.message.MsgData.Options), utils.WithNotification(false), utils.WithSendMsg(false))
storageMsgList = append(storageMsgList, v.message)
}
if isStorage(notificationMsg) {
@@ -136,7 +138,7 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(conversation
notStorageMsgList = append(notStorageMsgList, v.message)
}
}
if v.message.MsgData.ContentType == constant.ReactionMessageModifier || v.message.MsgData.ContentType == constant.ReactionMessageDeleter {
if v.message.ContentType == constant.ReactionMessageModifier || v.message.ContentType == constant.ReactionMessageDeleter {
modifyMsgList = append(modifyMsgList, v.message)
}
}
@@ -251,7 +253,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
log.ZDebug(ctx, "batch messages come to distribution center", "length", len(consumerMessages))
for i := 0; i < len(consumerMessages); i++ {
ctxMsg := &ContextMsg{}
msgFromMQ := pbMsg.MsgDataToMQ{}
var msgFromMQ sdkws.MsgData
err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ)
if err != nil {
log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
@@ -2,6 +2,10 @@ package msgtransfer
import (
"context"
"errors"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
@@ -10,8 +14,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/Shopify/sarama"
"google.golang.org/protobuf/proto"
)
@@ -19,14 +21,16 @@ import (
type OnlineHistoryMongoConsumerHandler struct {
historyConsumerGroup *kfk.MConsumerGroup
msgDatabase controller.MsgDatabase
notificationDatabase controller.NotificationDatabase
}
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgDatabase) *OnlineHistoryMongoConsumerHandler {
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgDatabase, notificationDatabase controller.NotificationDatabase) *OnlineHistoryMongoConsumerHandler {
mc := &OnlineHistoryMongoConsumerHandler{
historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic},
config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo),
msgDatabase: database,
msgDatabase: database,
notificationDatabase: notificationDatabase,
}
return mc
}
@@ -40,31 +44,54 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
return
}
log.Info(operationID, "BatchInsertChat2DB userID: ", msgFromMQ.ConversationID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq)
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.Messages, msgFromMQ.LastSeq)
if err != nil {
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.Messages, msgFromMQ.ConversationID, msgFromMQ.TriggerID)
log.Info(operationID, "BatchInsertChat2DB userID: ", msgFromMQ.AggregationID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq)
if len(msgFromMQ.MsgData) == 0 {
log.ZError(ctx, "msgFromMQ.MsgData is empty", errors.New("msgFromMQ.MsgData is empty"), "cMsg", cMsg)
return
}
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.ConversationID, msgFromMQ.Messages)
if err != nil {
log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.Messages, msgFromMQ.ConversationID, msgFromMQ.TriggerID)
}
for _, v := range msgFromMQ.Messages {
if v.MsgData.ContentType == constant.DeleteMessageNotification {
tips := sdkws.TipsComm{}
DeleteMessageTips := sdkws.DeleteMessageTips{}
err := proto.Unmarshal(v.MsgData.Content, &tips)
if err != nil {
log.NewError(operationID, "tips unmarshal err:", err.Error(), v.String())
continue
isNotification := msgFromMQ.MsgData[0].Options[constant.IsNotification]
if isNotification {
err = mc.notificationDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil {
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.MsgData, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
err = mc.notificationDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.MsgData)
if err != nil {
log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.MsgData, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
for _, v := range msgFromMQ.MsgData {
if v.ContentType == constant.DeleteMessageNotification {
deleteMessageTips := sdkws.DeleteMessageTips{}
err := proto.Unmarshal(v.Content, &deleteMessageTips)
if err != nil {
log.NewError(operationID, "tips unmarshal err:", err.Error(), v.String())
continue
}
if totalUnExistSeqs, err := mc.notificationDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", deleteMessageTips.UserID, deleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
}
}
err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
if err != nil {
log.NewError(operationID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
continue
}
if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, DeleteMessageTips.UserID, DeleteMessageTips.Seqs); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", DeleteMessageTips.UserID, DeleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
}
} else {
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil {
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.MsgData, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.MsgData)
if err != nil {
log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.MsgData, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
for _, v := range msgFromMQ.MsgData {
if v.ContentType == constant.DeleteMessageNotification {
deleteMessageTips := sdkws.DeleteMessageTips{}
err := proto.Unmarshal(v.Content, &deleteMessageTips)
if err != nil {
log.NewError(operationID, "tips unmarshal err:", err.Error(), v.String())
continue
}
if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", deleteMessageTips.UserID, deleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
}
}
}
}