Files
open-im-server/internal/msgtransfer/online_history_msg_handler.go
T

328 lines
13 KiB
Go
Raw Normal View History

2023-02-09 20:36:34 +08:00
package msgtransfer
2021-05-26 19:24:25 +08:00
import (
2023-03-22 18:35:21 +08:00
"context"
2023-05-10 15:21:08 +08:00
"strconv"
"strings"
2023-04-18 14:43:54 +08:00
"sync"
"time"
2023-05-10 17:18:04 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
2023-03-16 10:46:06 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
2023-03-21 12:28:21 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
2023-04-28 18:33:33 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
2023-03-16 10:46:06 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
2021-05-26 19:24:25 +08:00
"github.com/Shopify/sarama"
2023-05-04 15:54:04 +08:00
"github.com/go-redis/redis"
2023-05-10 20:27:39 +08:00
"google.golang.org/protobuf/proto"
2021-05-26 19:24:25 +08:00
)
2023-02-15 15:52:32 +08:00
const ConsumerMsgs = 3
2023-04-28 18:38:12 +08:00
const SourceMessages = 4
2023-02-15 15:52:32 +08:00
const MongoMessages = 5
const ChannelNum = 100
2022-05-19 12:25:46 +08:00
type MsgChannelValue struct {
2023-05-04 15:06:23 +08:00
conversationID string //maybe userID or super groupID
ctx context.Context
ctxMsgList []*ContextMsg
2022-05-20 14:42:49 +08:00
}
2023-02-15 15:52:32 +08:00
2022-05-20 14:42:49 +08:00
type TriggerChannelValue struct {
2023-03-22 18:35:21 +08:00
ctx context.Context
cMsgList []*sarama.ConsumerMessage
2022-05-19 12:25:46 +08:00
}
2023-02-15 15:52:32 +08:00
2022-05-11 18:33:48 +08:00
type Cmd2Value struct {
2022-05-11 20:49:47 +08:00
Cmd int
2022-05-11 18:33:48 +08:00
Value interface{}
}
2023-03-22 18:35:21 +08:00
type ContextMsg struct {
2023-04-28 18:39:21 +08:00
message *sdkws.MsgData
2023-03-22 18:35:21 +08:00
ctx context.Context
}
2023-02-15 15:52:32 +08:00
2022-06-16 20:35:27 +08:00
type OnlineHistoryRedisConsumerHandler struct {
2023-02-15 15:52:32 +08:00
historyConsumerGroup *kafka.MConsumerGroup
2022-05-20 13:33:38 +08:00
chArrays [ChannelNum]chan Cmd2Value
2022-05-25 18:46:53 +08:00
msgDistributionCh chan Cmd2Value
2023-02-15 15:52:32 +08:00
singleMsgSuccessCount uint64
singleMsgFailedCount uint64
singleMsgSuccessCountMutex sync.Mutex
singleMsgFailedCountMutex sync.Mutex
2023-05-08 12:39:45 +08:00
msgDatabase controller.CommonMsgDatabase
2023-04-28 18:33:33 +08:00
conversationRpcClient *rpcclient.ConversationClient
groupRpcClient *rpcclient.GroupClient
2021-05-26 19:24:25 +08:00
}
2023-05-17 11:42:25 +08:00
func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationClient, groupRpcClient *rpcclient.GroupClient) *OnlineHistoryRedisConsumerHandler {
2023-03-03 17:42:26 +08:00
var och OnlineHistoryRedisConsumerHandler
och.msgDatabase = database
2022-05-25 18:46:53 +08:00
och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
go och.MessagesDistributionHandle()
2022-05-20 15:51:37 +08:00
for i := 0; i < ChannelNum; i++ {
2022-05-25 22:42:44 +08:00
och.chArrays[i] = make(chan Cmd2Value, 50)
2022-05-20 15:51:37 +08:00
go och.Run(i)
}
2023-04-28 18:33:33 +08:00
och.conversationRpcClient = conversationRpcClient
2023-05-17 11:42:25 +08:00
och.groupRpcClient = groupRpcClient
2023-02-15 15:52:32 +08:00
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
2021-05-26 19:24:25 +08:00
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
2022-06-16 20:35:27 +08:00
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
2023-03-16 10:20:40 +08:00
//statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
2023-03-03 17:42:26 +08:00
return &och
2022-05-11 18:33:48 +08:00
}
2023-02-15 15:52:32 +08:00
2022-06-16 20:35:27 +08:00
func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
2022-05-19 12:25:46 +08:00
for {
select {
2022-05-20 13:33:38 +08:00
case cmd := <-och.chArrays[channelID]:
2022-05-19 12:25:46 +08:00
switch cmd.Cmd {
2023-04-28 18:38:12 +08:00
case SourceMessages:
2022-05-19 12:25:46 +08:00
msgChannelValue := cmd.Value.(MsgChannelValue)
2023-03-22 18:35:21 +08:00
ctxMsgList := msgChannelValue.ctxMsgList
ctx := msgChannelValue.ctx
2023-05-04 15:06:23 +08:00
log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "conversationID", msgChannelValue.conversationID)
2023-05-16 18:37:14 +08:00
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList)
2023-05-10 18:00:05 +08:00
log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList),
"storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList))
2023-05-16 19:08:43 +08:00
och.handleMsg(ctx, utils.GetChatConversationIDByMsg(ctxMsgList[0].message), storageMsgList, notStorageMsgList)
och.handleNotification(ctx, utils.GetNotificationConversationID(ctxMsgList[0].message), storageNotificationList, notStorageNotificationList)
2023-05-04 15:06:23 +08:00
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.conversationID, modifyMsgList); err != nil {
log.ZError(ctx, "msg to modify mq error", err, "conversationID", msgChannelValue.conversationID, "modifyMsgList", modifyMsgList)
2022-05-19 12:25:46 +08:00
}
}
}
}
}
2023-04-26 18:57:41 +08:00
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,
2023-05-16 18:37:14 +08:00
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
2023-04-28 18:39:21 +08:00
isStorage := func(msg *sdkws.MsgData) bool {
options2 := utils.Options(msg.Options)
2023-04-26 18:57:41 +08:00
if options2.IsHistory() {
return true
} else {
2023-05-04 15:06:23 +08:00
// if !(!options2.IsSenderSync() && conversationID == msg.MsgData.SendID) {
// return false
// }
return false
2023-04-26 18:57:41 +08:00
}
}
for _, v := range totalMsgs {
2023-04-28 18:39:21 +08:00
options := utils.Options(v.message.Options)
2023-05-10 17:48:26 +08:00
if !options.IsNotNotification() {
2023-05-17 18:44:55 +08:00
// clone msg from notificationMsg
2023-04-26 18:57:41 +08:00
if options.IsSendMsg() {
2023-05-17 18:44:55 +08:00
msg := proto.Clone(v.message).(*sdkws.MsgData)
2023-04-26 18:57:41 +08:00
// 消息
2023-05-10 17:48:26 +08:00
if v.message.Options != nil {
2023-05-17 18:44:55 +08:00
msg.Options = utils.NewMsgOptions()
2023-05-10 17:48:26 +08:00
}
2023-05-17 18:44:55 +08:00
if options.IsOfflinePush() {
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithOfflinePush(false))
msg.Options = utils.WithOptions(utils.Options(msg.Options), utils.WithOfflinePush(true))
}
if options.IsUnreadCount() {
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithUnreadCount(false))
msg.Options = utils.WithOptions(utils.Options(msg.Options), utils.WithUnreadCount(true))
}
storageMsgList = append(storageMsgList, msg)
2023-04-26 18:57:41 +08:00
}
2023-05-17 18:44:55 +08:00
if isStorage(v.message) {
storageNotificatoinList = append(storageNotificatoinList, v.message)
2023-04-26 18:57:41 +08:00
} else {
2023-05-17 18:44:55 +08:00
notStorageNotificationList = append(notStorageNotificationList, v.message)
2023-04-26 18:57:41 +08:00
}
} else {
if isStorage(v.message) {
storageMsgList = append(storageMsgList, v.message)
} else {
notStorageMsgList = append(notStorageMsgList, v.message)
}
}
2023-04-28 18:39:21 +08:00
if v.message.ContentType == constant.ReactionMessageModifier || v.message.ContentType == constant.ReactionMessageDeleter {
2023-04-26 18:57:41 +08:00
modifyMsgList = append(modifyMsgList, v.message)
}
}
return
}
2023-05-16 19:08:43 +08:00
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
2023-05-04 15:06:23 +08:00
och.toPushTopic(ctx, conversationID, notStorageList)
2023-04-28 18:33:33 +08:00
if len(storageList) > 0 {
2023-05-08 12:39:45 +08:00
lastSeq, _, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
2023-04-28 18:33:33 +08:00
if err != nil {
2023-05-04 15:06:23 +08:00
log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID, "storageList", storageList)
2023-04-28 18:33:33 +08:00
return
}
2023-05-16 18:37:14 +08:00
log.ZDebug(ctx, "success to next topic", "conversationID", conversationID)
2023-05-04 15:06:23 +08:00
och.msgDatabase.MsgToMongoMQ(ctx, conversationID, storageList, lastSeq)
och.toPushTopic(ctx, conversationID, storageList)
2023-04-28 18:33:33 +08:00
}
2023-04-26 18:57:41 +08:00
}
2023-05-04 15:54:04 +08:00
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) {
2023-04-28 18:33:33 +08:00
for _, v := range msgs {
2023-05-04 15:06:23 +08:00
och.msgDatabase.MsgToPushMQ(ctx, conversationID, v)
2023-04-28 18:33:33 +08:00
}
2023-04-26 18:57:41 +08:00
}
2023-05-16 19:08:43 +08:00
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
2023-05-04 15:06:23 +08:00
och.toPushTopic(ctx, conversationID, notStorageList)
2023-04-26 18:57:41 +08:00
if len(storageList) > 0 {
2023-05-08 12:39:45 +08:00
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
2023-05-10 11:49:55 +08:00
if err != nil && errs.Unwrap(err) != redis.Nil {
2023-05-08 12:39:45 +08:00
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList)
och.singleMsgFailedCountMutex.Lock()
och.singleMsgFailedCount += uint64(len(storageList))
och.singleMsgFailedCountMutex.Unlock()
return
}
if isNewConversation {
2023-05-05 21:30:32 +08:00
if storageList[0].SessionType == constant.SuperGroupChatType {
2023-05-04 15:06:23 +08:00
log.ZInfo(ctx, "group chat first create conversation", "conversationID", conversationID)
2023-05-04 17:27:29 +08:00
userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, storageList[0].GroupID)
if err != nil {
2023-05-08 12:39:45 +08:00
log.ZWarn(ctx, "get group member ids error", err, "conversationID", conversationID)
2023-05-04 17:27:29 +08:00
} else {
if err := och.conversationRpcClient.GroupChatFirstCreateConversation(ctx, storageList[0].GroupID, userIDs); err != nil {
2023-05-08 12:39:45 +08:00
log.ZWarn(ctx, "single chat first create conversation error", err, "conversationID", conversationID)
2023-05-04 17:27:29 +08:00
}
2023-04-28 18:33:33 +08:00
}
2023-05-05 21:30:32 +08:00
} else {
2023-05-04 17:27:29 +08:00
if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, storageList[0].RecvID, storageList[0].SendID); err != nil {
2023-05-08 12:39:45 +08:00
log.ZWarn(ctx, "single chat first create conversation error", err, "conversationID", conversationID)
2023-04-28 18:33:33 +08:00
}
}
}
2023-05-08 12:39:45 +08:00
log.ZDebug(ctx, "success incr to next topic")
2023-04-28 18:33:33 +08:00
och.singleMsgSuccessCountMutex.Lock()
och.singleMsgSuccessCount += uint64(len(storageList))
och.singleMsgSuccessCountMutex.Unlock()
2023-05-04 15:06:23 +08:00
och.msgDatabase.MsgToMongoMQ(ctx, conversationID, storageList, lastSeq)
och.toPushTopic(ctx, conversationID, storageList)
2023-04-26 18:57:41 +08:00
}
2023-04-28 18:33:33 +08:00
}
2022-06-16 20:35:27 +08:00
func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
2022-05-25 18:46:53 +08:00
for {
2023-03-22 18:35:21 +08:00
aggregationMsgs := make(map[string][]*ContextMsg, ChannelNum)
2022-05-25 18:46:53 +08:00
select {
case cmd := <-och.msgDistributionCh:
switch cmd.Cmd {
case ConsumerMsgs:
triggerChannelValue := cmd.Value.(TriggerChannelValue)
2023-03-22 18:35:21 +08:00
ctx := triggerChannelValue.ctx
2023-02-15 15:52:32 +08:00
consumerMessages := triggerChannelValue.cMsgList
2022-05-25 18:46:53 +08:00
//Aggregation map[userid]message list
2023-03-22 19:38:35 +08:00
log.ZDebug(ctx, "batch messages come to distribution center", "length", len(consumerMessages))
2022-05-25 18:46:53 +08:00
for i := 0; i < len(consumerMessages); i++ {
2023-03-22 18:35:21 +08:00
ctxMsg := &ContextMsg{}
2023-04-28 18:39:21 +08:00
var msgFromMQ sdkws.MsgData
2022-05-25 18:46:53 +08:00
err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ)
if err != nil {
2023-03-22 18:35:21 +08:00
log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
2022-05-25 18:46:53 +08:00
return
}
2023-05-10 15:21:08 +08:00
var arr []string
for i, header := range consumerMessages[i].Headers {
arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value))
}
log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), "header", strings.Join(arr, ", "))
2023-03-22 18:35:21 +08:00
ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
ctxMsg.message = &msgFromMQ
log.ZDebug(ctx, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key))
2023-04-26 15:10:20 +08:00
//aggregationMsgs[string(consumerMessages[i].Key)] = append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg)
2022-05-28 18:10:08 +08:00
if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok {
2023-03-22 18:35:21 +08:00
oldM = append(oldM, ctxMsg)
2022-05-28 18:10:08 +08:00
aggregationMsgs[string(consumerMessages[i].Key)] = oldM
2022-05-25 18:46:53 +08:00
} else {
2023-03-22 18:35:21 +08:00
m := make([]*ContextMsg, 0, 100)
m = append(m, ctxMsg)
2022-05-28 18:10:08 +08:00
aggregationMsgs[string(consumerMessages[i].Key)] = m
2022-05-25 18:46:53 +08:00
}
}
2023-03-22 19:38:35 +08:00
log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs))
2023-05-04 15:06:23 +08:00
for conversationID, v := range aggregationMsgs {
2022-05-25 18:46:53 +08:00
if len(v) >= 0 {
2023-05-04 15:06:23 +08:00
hashCode := utils.GetHashCode(conversationID)
2022-05-25 18:46:53 +08:00
channelID := hashCode % ChannelNum
2023-05-04 15:06:23 +08:00
log.ZDebug(ctx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "conversationID", conversationID)
och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{conversationID: conversationID, ctxMsgList: v, ctx: ctx}}
2022-05-25 18:46:53 +08:00
}
}
}
}
2022-05-19 12:25:46 +08:00
}
2021-05-26 19:24:25 +08:00
}
2023-03-22 19:39:20 +08:00
func (och *OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
return nil
}
2022-05-20 13:33:38 +08:00
2023-02-15 15:52:32 +08:00
func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
2022-05-22 21:08:57 +08:00
for {
if sess == nil {
2023-04-18 14:43:54 +08:00
log.ZWarn(context.Background(), "sess == nil, waiting", nil)
2022-05-22 21:08:57 +08:00
time.Sleep(100 * time.Millisecond)
} else {
break
}
2022-05-21 10:19:02 +08:00
}
2022-05-25 20:29:32 +08:00
rwLock := new(sync.RWMutex)
2023-03-22 18:47:29 +08:00
log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
2022-05-25 18:46:53 +08:00
cMsg := make([]*sarama.ConsumerMessage, 0, 1000)
2023-04-26 15:10:20 +08:00
t := time.NewTicker(time.Millisecond * 100)
2022-05-25 20:29:32 +08:00
go func() {
2022-05-25 21:15:17 +08:00
for {
select {
case <-t.C:
if len(cMsg) > 0 {
rwLock.Lock()
ccMsg := make([]*sarama.ConsumerMessage, 0, 1000)
for _, v := range cMsg {
ccMsg = append(ccMsg, v)
}
cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
rwLock.Unlock()
split := 1000
2023-05-10 17:18:04 +08:00
ctx := mcontext.NewCtx(utils.OperationIDGenerator())
2023-05-10 17:21:58 +08:00
ctx = mcontext.WithTriggerIDContext(ctx, utils.OperationIDGenerator())
2023-03-22 19:38:35 +08:00
log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg))
2022-05-25 21:15:17 +08:00
for i := 0; i < len(ccMsg)/split; i++ {
//log.Debug()
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
2023-03-22 18:35:21 +08:00
ctx: ctx, cMsgList: ccMsg[i*split : (i+1)*split]}}
2022-05-25 21:15:17 +08:00
}
if (len(ccMsg) % split) > 0 {
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
2023-03-22 18:35:21 +08:00
ctx: ctx, cMsgList: ccMsg[split*(len(ccMsg)/split):]}}
2022-05-25 21:15:17 +08:00
}
2023-03-22 19:38:35 +08:00
log.ZDebug(ctx, "timer trigger msg consumer end", "length", len(ccMsg))
2022-05-25 21:03:48 +08:00
}
2022-05-25 18:46:53 +08:00
}
}
2022-05-25 20:29:32 +08:00
}()
2022-05-25 21:23:10 +08:00
for msg := range claim.Messages() {
rwLock.Lock()
2022-06-08 17:11:17 +08:00
if len(msg.Value) != 0 {
cMsg = append(cMsg, msg)
}
2022-05-25 21:23:10 +08:00
rwLock.Unlock()
sess.MarkMessage(msg, "")
}
2021-05-26 19:24:25 +08:00
return nil
}