This commit is contained in:
wangchuxiao
2023-05-08 12:39:45 +08:00
parent b34d1e7ee7
commit b95496df73
36 changed files with 969 additions and 981 deletions
+6 -3
View File
@@ -2,6 +2,7 @@ package msgtransfer
import (
"fmt"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
@@ -38,7 +39,9 @@ func StartTransfer(prometheusPort int) error {
if err != nil {
return err
}
client, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, config.Config.Zookeeper.UserName, config.Config.Zookeeper.Password)
client, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
openKeeper.WithFreq(time.Hour), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.UserName,
config.Config.Zookeeper.Password), openKeeper.WithRoundRobin(), openKeeper.WithTimeout(10))
if err != nil {
return err
}
@@ -50,7 +53,7 @@ func StartTransfer(prometheusPort int) error {
extendMsgCache := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt())
chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db))
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient()))
msgDatabase := controller.NewMsgDatabase(msgDocModel, msgModel)
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel)
notificationDatabase := controller.NewNotificationDatabase(notificationDocModel, notificationModel)
conversationRpcClient := rpcclient.NewConversationClient(client)
@@ -60,7 +63,7 @@ func StartTransfer(prometheusPort int) error {
}
func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase,
extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.MsgDatabase, notificationDatabase controller.NotificationDatabase,
extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.CommonMsgDatabase, notificationDatabase controller.NotificationDatabase,
conversationRpcClient *rpcclient.ConversationClient) *MsgTransfer {
return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient),
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase, notificationDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)}
@@ -11,7 +11,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
@@ -55,12 +54,12 @@ type OnlineHistoryRedisConsumerHandler struct {
singleMsgSuccessCountMutex sync.Mutex
singleMsgFailedCountMutex sync.Mutex
msgDatabase controller.MsgDatabase
msgDatabase controller.CommonMsgDatabase
conversationRpcClient *rpcclient.ConversationClient
groupRpcClient *rpcclient.GroupClient
}
func NewOnlineHistoryRedisConsumerHandler(database controller.MsgDatabase, conversationRpcClient *rpcclient.ConversationClient) *OnlineHistoryRedisConsumerHandler {
func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationClient) *OnlineHistoryRedisConsumerHandler {
var och OnlineHistoryRedisConsumerHandler
och.msgDatabase = database
och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
@@ -143,7 +142,7 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(conversation
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
och.toPushTopic(ctx, conversationID, notStorageList)
if len(storageList) > 0 {
lastSeq, err := och.msgDatabase.NotificationBatchInsertChat2Cache(ctx, conversationID, storageList)
lastSeq, _, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
if err != nil {
log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID, "storageList", storageList)
return
@@ -163,31 +162,8 @@ func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, c
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
och.toPushTopic(ctx, conversationID, notStorageList)
if len(storageList) > 0 {
currentMaxSeq, err := och.msgDatabase.GetMaxSeq(ctx, conversationID)
if err == redis.Nil {
if storageList[0].SessionType == constant.SuperGroupChatType {
log.ZInfo(ctx, "group chat first create conversation", "conversationID", conversationID)
userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, storageList[0].GroupID)
if err != nil {
log.ZError(ctx, "get group member ids error", err, "conversationID", conversationID)
} else {
if err := och.conversationRpcClient.GroupChatFirstCreateConversation(ctx, storageList[0].GroupID, userIDs); err != nil {
log.ZError(ctx, "single chat first create conversation error", err, "conversationID", conversationID)
}
}
} else {
if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, storageList[0].RecvID, storageList[0].SendID); err != nil {
log.ZError(ctx, "single chat first create conversation error", err, "conversationID", conversationID)
}
}
}
if err != nil && err != redis.Nil {
prome.Inc(prome.SeqGetFailedCounter)
log.ZError(ctx, "get max seq err", err, "conversationID", conversationID)
return
}
prome.Inc(prome.SeqGetSuccessCounter)
lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList, currentMaxSeq)
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
if err != nil && err != redis.Nil {
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList)
och.singleMsgFailedCountMutex.Lock()
@@ -195,7 +171,25 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, con
och.singleMsgFailedCountMutex.Unlock()
return
}
log.ZDebug(ctx, "success to next topic")
if isNewConversation {
if storageList[0].SessionType == constant.SuperGroupChatType {
log.ZInfo(ctx, "group chat first create conversation", "conversationID", conversationID)
userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, storageList[0].GroupID)
if err != nil {
log.ZWarn(ctx, "get group member ids error", err, "conversationID", conversationID)
} else {
if err := och.conversationRpcClient.GroupChatFirstCreateConversation(ctx, storageList[0].GroupID, userIDs); err != nil {
log.ZWarn(ctx, "single chat first create conversation error", err, "conversationID", conversationID)
}
}
} else {
if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, storageList[0].RecvID, storageList[0].SendID); err != nil {
log.ZWarn(ctx, "single chat first create conversation error", err, "conversationID", conversationID)
}
}
}
log.ZDebug(ctx, "success incr to next topic")
och.singleMsgSuccessCountMutex.Lock()
och.singleMsgSuccessCount += uint64(len(storageList))
och.singleMsgSuccessCountMutex.Unlock()
@@ -20,11 +20,11 @@ import (
type OnlineHistoryMongoConsumerHandler struct {
historyConsumerGroup *kfk.MConsumerGroup
msgDatabase controller.MsgDatabase
msgDatabase controller.CommonMsgDatabase
notificationDatabase controller.NotificationDatabase
}
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgDatabase, notificationDatabase controller.NotificationDatabase) *OnlineHistoryMongoConsumerHandler {
func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase, notificationDatabase controller.NotificationDatabase) *OnlineHistoryMongoConsumerHandler {
mc := &OnlineHistoryMongoConsumerHandler{
historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic},