|
|
|
@@ -12,20 +12,25 @@ import (
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
|
|
|
|
|
pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation"
|
|
|
|
|
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
|
|
|
|
"github.com/Shopify/sarama"
|
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const ConsumerMsgs = 3
|
|
|
|
|
const AggregationMessages = 4
|
|
|
|
|
const SourceMessages = 4
|
|
|
|
|
const MongoMessages = 5
|
|
|
|
|
const ChannelNum = 100
|
|
|
|
|
|
|
|
|
|
type MsgChannelValue struct {
|
|
|
|
|
aggregationID string //maybe userID or super groupID
|
|
|
|
|
ctx context.Context
|
|
|
|
|
ctxMsgList []*ContextMsg
|
|
|
|
|
sourceID string //maybe userID or super groupID
|
|
|
|
|
ctx context.Context
|
|
|
|
|
ctxMsgList []*ContextMsg
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type TriggerChannelValue struct {
|
|
|
|
@@ -52,10 +57,12 @@ type OnlineHistoryRedisConsumerHandler struct {
|
|
|
|
|
singleMsgSuccessCountMutex sync.Mutex
|
|
|
|
|
singleMsgFailedCountMutex sync.Mutex
|
|
|
|
|
|
|
|
|
|
msgDatabase controller.MsgDatabase
|
|
|
|
|
msgDatabase controller.MsgDatabase
|
|
|
|
|
conversationRpcClient *rpcclient.ConversationClient
|
|
|
|
|
groupRpcClient *rpcclient.GroupClient
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewOnlineHistoryRedisConsumerHandler(database controller.MsgDatabase) *OnlineHistoryRedisConsumerHandler {
|
|
|
|
|
func NewOnlineHistoryRedisConsumerHandler(database controller.MsgDatabase, conversationRpcClient *rpcclient.ConversationClient) *OnlineHistoryRedisConsumerHandler {
|
|
|
|
|
var och OnlineHistoryRedisConsumerHandler
|
|
|
|
|
och.msgDatabase = database
|
|
|
|
|
och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
|
|
|
|
@@ -64,6 +71,7 @@ func NewOnlineHistoryRedisConsumerHandler(database controller.MsgDatabase) *Onli
|
|
|
|
|
och.chArrays[i] = make(chan Cmd2Value, 50)
|
|
|
|
|
go och.Run(i)
|
|
|
|
|
}
|
|
|
|
|
och.conversationRpcClient = conversationRpcClient
|
|
|
|
|
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
|
|
|
|
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
|
|
|
|
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
|
|
|
|
@@ -76,21 +84,16 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
|
|
|
|
select {
|
|
|
|
|
case cmd := <-och.chArrays[channelID]:
|
|
|
|
|
switch cmd.Cmd {
|
|
|
|
|
case AggregationMessages:
|
|
|
|
|
case SourceMessages:
|
|
|
|
|
msgChannelValue := cmd.Value.(MsgChannelValue)
|
|
|
|
|
ctxMsgList := msgChannelValue.ctxMsgList
|
|
|
|
|
ctx := msgChannelValue.ctx
|
|
|
|
|
storageMsgList := make([]*sdkws.MsgData, 0, 80)
|
|
|
|
|
notStorageMsgList := make([]*sdkws.MsgData, 0, 80)
|
|
|
|
|
storageNotificationList := make([]*sdkws.MsgData, 0, 80)
|
|
|
|
|
notStorageNotificationList := make([]*sdkws.MsgData, 0, 80)
|
|
|
|
|
modifyMsgList := make([]*sdkws.MsgData, 0, 80)
|
|
|
|
|
log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "aggregationID", msgChannelValue.aggregationID)
|
|
|
|
|
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList = och.getPushStorageMsgList(msgChannelValue.aggregationID, ctxMsgList)
|
|
|
|
|
och.handleMsg(ctx, msgChannelValue.aggregationID, storageMsgList, notStorageMsgList)
|
|
|
|
|
och.handleNotification(ctx, msgChannelValue.aggregationID, storageNotificationList, notStorageNotificationList)
|
|
|
|
|
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.aggregationID, modifyMsgList); err != nil {
|
|
|
|
|
log.ZError(ctx, "msg to modify mq error", err, "aggregationID", msgChannelValue.aggregationID, "modifyMsgList", modifyMsgList)
|
|
|
|
|
log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "sourceID", msgChannelValue.sourceID)
|
|
|
|
|
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(msgChannelValue.sourceID, ctxMsgList)
|
|
|
|
|
och.handleMsg(ctx, msgChannelValue.sourceID, storageMsgList, notStorageMsgList)
|
|
|
|
|
och.handleNotification(ctx, msgChannelValue.sourceID, storageNotificationList, notStorageNotificationList)
|
|
|
|
|
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.sourceID, modifyMsgList); err != nil {
|
|
|
|
|
log.ZError(ctx, "msg to modify mq error", err, "sourceID", msgChannelValue.sourceID, "modifyMsgList", modifyMsgList)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -98,13 +101,13 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(aggregationID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(sourceID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*pbMsg.MsgDataToMQ) {
|
|
|
|
|
isStorage := func(msg *sdkws.MsgData) bool {
|
|
|
|
|
options2 := utils.Options(msg.Options)
|
|
|
|
|
if options2.IsHistory() {
|
|
|
|
|
return true
|
|
|
|
|
} else {
|
|
|
|
|
if !(!options2.IsSenderSync() && aggregationID == msg.SendID) {
|
|
|
|
|
if !(!options2.IsSenderSync() && sourceID == msg.MsgData.SendID) {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -114,10 +117,10 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(aggregationI
|
|
|
|
|
options := utils.Options(v.message.Options)
|
|
|
|
|
if options.IsNotification() {
|
|
|
|
|
// 原通知
|
|
|
|
|
notificationMsg := proto.Clone(v.message).(*sdkws.MsgData)
|
|
|
|
|
notificationMsg := proto.Clone(v.message).(*pbMsg.MsgDataToMQ)
|
|
|
|
|
if options.IsSendMsg() {
|
|
|
|
|
// 消息
|
|
|
|
|
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithNotification(false), utils.WithSendMsg(false))
|
|
|
|
|
v.message.Options = utils.WithOptions(utils.Options(v.message.MsgData.Options), utils.WithNotification(false), utils.WithSendMsg(false))
|
|
|
|
|
storageMsgList = append(storageMsgList, v.message)
|
|
|
|
|
}
|
|
|
|
|
if isStorage(notificationMsg) {
|
|
|
|
@@ -149,27 +152,90 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) handle(ctx context.Context, aggregationID string, storageList, notStorageList []*sdkws.MsgData, cacheAndIncr func(ctx context.Context, sourceID string, msgList []*sdkws.MsgData) (int64, error)) {
|
|
|
|
|
if len(storageList) > 0 {
|
|
|
|
|
lastSeq, err := cacheAndIncr(ctx, aggregationID, storageList)
|
|
|
|
|
lastSeq, err := och.msgDatabase.NotificationBatchInsertChat2Cache(ctx, sourceID, storageList)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZError(ctx, "notification batch insert to redis error", err, "sourceID", sourceID, "storageList", storageList)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
och.msgDatabase.MsgToMongoMQ(ctx, sourceID, storageList, lastSeq)
|
|
|
|
|
och.toPushTopic(ctx, sourceID, storageList)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, sourceID string, msgs []*pbMsg.MsgDataToMQ) {
|
|
|
|
|
for _, v := range msgs {
|
|
|
|
|
och.msgDatabase.MsgToPushMQ(ctx, sourceID, v)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, sourceID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
|
|
|
|
|
och.toPushTopic(ctx, sourceID, notStorageList)
|
|
|
|
|
if len(storageList) > 0 {
|
|
|
|
|
var currentMaxSeq int64
|
|
|
|
|
var err error
|
|
|
|
|
if storageList[0].MsgData.SessionType == constant.SuperGroupChatType {
|
|
|
|
|
currentMaxSeq, err = och.msgDatabase.GetGroupMaxSeq(ctx, sourceID)
|
|
|
|
|
if err == redis.Nil {
|
|
|
|
|
if err := och.GroupChatFirstCreateConversation(ctx, storageList[0].MsgData); err != nil {
|
|
|
|
|
log.ZError(ctx, "single chat first create conversation error", err, "sourceID", sourceID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
currentMaxSeq, err = och.msgDatabase.GetUserMaxSeq(ctx, sourceID)
|
|
|
|
|
if err == redis.Nil {
|
|
|
|
|
if err := och.SingleChatFirstCreateConversation(ctx, storageList[0].MsgData); err != nil {
|
|
|
|
|
log.ZError(ctx, "single chat first create conversation error", err, "sourceID", sourceID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if err != nil && err != redis.Nil {
|
|
|
|
|
prome.Inc(prome.SeqGetFailedCounter)
|
|
|
|
|
log.ZError(ctx, "get max seq err", err, "sourceID", sourceID)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
prome.Inc(prome.SeqGetSuccessCounter)
|
|
|
|
|
lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, sourceID, storageList, currentMaxSeq)
|
|
|
|
|
if err != nil && err != redis.Nil {
|
|
|
|
|
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList)
|
|
|
|
|
och.singleMsgFailedCountMutex.Lock()
|
|
|
|
|
och.singleMsgFailedCount += uint64(len(storageList))
|
|
|
|
|
och.singleMsgFailedCountMutex.Unlock()
|
|
|
|
|
} else {
|
|
|
|
|
och.singleMsgSuccessCountMutex.Lock()
|
|
|
|
|
och.singleMsgSuccessCount += uint64(len(storageList))
|
|
|
|
|
och.singleMsgSuccessCountMutex.Unlock()
|
|
|
|
|
och.msgDatabase.MsgToMongoMQ(ctx, aggregationID, storageList, lastSeq)
|
|
|
|
|
for _, v := range storageList {
|
|
|
|
|
och.msgDatabase.MsgToPushMQ(ctx, aggregationID, v)
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
och.singleMsgSuccessCountMutex.Lock()
|
|
|
|
|
och.singleMsgSuccessCount += uint64(len(storageList))
|
|
|
|
|
och.singleMsgSuccessCountMutex.Unlock()
|
|
|
|
|
och.msgDatabase.MsgToMongoMQ(ctx, sourceID, storageList, lastSeq)
|
|
|
|
|
och.toPushTopic(ctx, sourceID, storageList)
|
|
|
|
|
}
|
|
|
|
|
if len(notStorageList) > 0 {
|
|
|
|
|
for _, v := range notStorageList {
|
|
|
|
|
och.msgDatabase.MsgToPushMQ(ctx, aggregationID, v)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) SingleChatFirstCreateConversation(ctx context.Context, msg *sdkws.MsgData) error {
|
|
|
|
|
conversation := new(pbConversation.Conversation)
|
|
|
|
|
conversation.ConversationType = constant.SingleChatType
|
|
|
|
|
conversation2 := proto.Clone(conversation).(*pbConversation.Conversation)
|
|
|
|
|
conversation.OwnerUserID = msg.SendID
|
|
|
|
|
conversation.UserID = msg.RecvID
|
|
|
|
|
conversation.ConversationID = utils.GetConversationIDBySessionType(msg.RecvID, constant.SingleChatType)
|
|
|
|
|
conversation2.OwnerUserID = msg.RecvID
|
|
|
|
|
conversation2.UserID = msg.SendID
|
|
|
|
|
conversation2.ConversationID = utils.GetConversationIDBySessionType(msg.SendID, constant.SingleChatType)
|
|
|
|
|
log.ZDebug(ctx, "create single conversation", "conversation", conversation, "conversation2", conversation2)
|
|
|
|
|
return och.conversationRpcClient.CreateConversationsWithoutNotification(ctx, []*pbConversation.Conversation{conversation, conversation2})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) GroupChatFirstCreateConversation(ctx context.Context, msg *sdkws.MsgData) error {
|
|
|
|
|
userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, msg.GroupID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
var conversations []*pbConversation.Conversation
|
|
|
|
|
for _, v := range userIDs {
|
|
|
|
|
conversation := pbConversation.Conversation{ConversationType: constant.SuperGroupChatType, GroupID: msg.GroupID, OwnerUserID: v, ConversationID: utils.GetConversationIDBySessionType(v, constant.SuperGroupChatType)}
|
|
|
|
|
conversations = append(conversations, &conversation)
|
|
|
|
|
}
|
|
|
|
|
log.ZDebug(ctx, "create group conversation", "conversations", conversations)
|
|
|
|
|
return och.conversationRpcClient.CreateConversationsWithoutNotification(ctx, conversations)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
|
|
|
|
@@ -206,12 +272,12 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs))
|
|
|
|
|
for aggregationID, v := range aggregationMsgs {
|
|
|
|
|
for sourceID, v := range aggregationMsgs {
|
|
|
|
|
if len(v) >= 0 {
|
|
|
|
|
hashCode := utils.GetHashCode(aggregationID)
|
|
|
|
|
hashCode := utils.GetHashCode(sourceID)
|
|
|
|
|
channelID := hashCode % ChannelNum
|
|
|
|
|
log.ZDebug(ctx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "aggregationID", aggregationID)
|
|
|
|
|
och.chArrays[channelID] <- Cmd2Value{Cmd: AggregationMessages, Value: MsgChannelValue{aggregationID: aggregationID, ctxMsgList: v, ctx: ctx}}
|
|
|
|
|
log.ZDebug(ctx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "sourceID", sourceID)
|
|
|
|
|
och.chArrays[channelID] <- Cmd2Value{Cmd: AggregationMessages, Value: MsgChannelValue{sourceID: sourceID, ctxMsgList: v, ctx: ctx}}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|