refactor: db cache batch refactor and batch consume message. (#2325)

* refactor: cmd update.

* refactor: msg transfer refactor.

* refactor: msg transfer refactor.

* refactor: msg transfer refactor.

* fix: read prometheus port when flag set to enable and prevent failure during startup.

* fix: notification has counted unread counts bug fix.

* fix: merge opensource code into local.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* refactor: delete message and message batch use lua.

* fix: add protective measures against memory overflow.
This commit is contained in:
OpenIM-Gordon
2024-06-03 11:24:37 +08:00
committed by GitHub
parent 67fe13f089
commit 973442e3d3
31 changed files with 1147 additions and 1333 deletions
+144 -319
View File
@@ -16,51 +16,34 @@ package msgtransfer
import (
"context"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/IBM/sarama"
"github.com/go-redis/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/tools/batcher"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/utils/idutil"
"github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/protobuf/proto"
"strconv"
"strings"
"time"
)
const (
ConsumerMsgs = 3
SourceMessages = 4
MongoMessages = 5
ChannelNum = 100
size = 500
mainDataBuffer = 500
subChanBuffer = 50
worker = 50
interval = 100 * time.Millisecond
)
type MsgChannelValue struct {
uniqueKey string
ctx context.Context
ctxMsgList []*ContextMsg
}
type TriggerChannelValue struct {
ctx context.Context
cMsgList []*sarama.ConsumerMessage
}
type Cmd2Value struct {
Cmd int
Value any
}
type ContextMsg struct {
message *sdkws.MsgData
ctx context.Context
@@ -68,13 +51,8 @@ type ContextMsg struct {
type OnlineHistoryRedisConsumerHandler struct {
historyConsumerGroup *kafka.MConsumerGroup
chArrays [ChannelNum]chan Cmd2Value
msgDistributionCh chan Cmd2Value
// singleMsgSuccessCount uint64
// singleMsgFailedCount uint64
// singleMsgSuccessCountMutex sync.Mutex
// singleMsgFailedCountMutex sync.Mutex
redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage]
msgDatabase controller.CommonMsgDatabase
conversationRpcClient *rpcclient.ConversationRpcClient
@@ -83,89 +61,82 @@ type OnlineHistoryRedisConsumerHandler struct {
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase,
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) {
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, true)
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, false)
if err != nil {
return nil, err
}
var och OnlineHistoryRedisConsumerHandler
och.msgDatabase = database
och.msgDistributionCh = make(chan Cmd2Value) // no buffer channel
go och.MessagesDistributionHandle()
for i := 0; i < ChannelNum; i++ {
och.chArrays[i] = make(chan Cmd2Value, 50)
go och.Run(i)
b := batcher.New[sarama.ConsumerMessage](
batcher.WithSize(size),
batcher.WithWorker(worker),
batcher.WithInterval(interval),
batcher.WithDataBuffer(mainDataBuffer),
batcher.WithSyncWait(true),
batcher.WithBuffer(subChanBuffer),
)
b.Sharding = func(key string) int {
hashCode := stringutil.GetHashCode(key)
return int(hashCode) % och.redisMessageBatches.Worker()
}
b.Key = func(consumerMessage *sarama.ConsumerMessage) string {
return string(consumerMessage.Key)
}
b.Do = och.do
och.redisMessageBatches = b
och.conversationRpcClient = conversationRpcClient
och.groupRpcClient = groupRpcClient
och.historyConsumerGroup = historyConsumerGroup
return &och, err
}
func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[sarama.ConsumerMessage]) {
ctx = mcontext.WithTriggerIDContext(ctx, val.TriggerID())
ctxMessages := och.parseConsumerMessages(ctx, val.Val())
ctx = withAggregationCtx(ctx, ctxMessages)
log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages),
"key", val.Key())
func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
for cmd := range och.chArrays[channelID] {
switch cmd.Cmd {
case SourceMessages:
msgChannelValue := cmd.Value.(MsgChannelValue)
ctxMsgList := msgChannelValue.ctxMsgList
ctx := msgChannelValue.ctx
log.ZDebug(
ctx,
"msg arrived channel",
"channel id",
channelID,
"msgList length",
len(ctxMsgList),
"uniqueKey",
msgChannelValue.uniqueKey,
)
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(
ctxMsgList,
)
log.ZDebug(
ctx,
"msg lens",
"storageMsgList",
len(storageMsgList),
"notStorageMsgList",
len(notStorageMsgList),
"storageNotificationList",
len(storageNotificationList),
"notStorageNotificationList",
len(notStorageNotificationList),
"modifyMsgList",
len(modifyMsgList),
)
conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message)
conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMsgList[0].message)
och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList)
och.handleNotification(
ctx,
msgChannelValue.uniqueKey,
conversationIDNotification,
storageNotificationList,
notStorageNotificationList,
)
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.uniqueKey, conversationIDNotification, modifyMsgList); err != nil {
log.ZError(ctx, "msg to modify mq error", err, "uniqueKey", msgChannelValue.uniqueKey, "modifyMsgList", modifyMsgList)
}
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList :=
och.categorizeMessageLists(ctxMessages)
log.ZDebug(ctx, "number of categorized messages", "storageMsgList", len(storageMsgList), "notStorageMsgList",
len(notStorageMsgList), "storageNotificationList", len(storageNotificationList), "notStorageNotificationList",
len(notStorageNotificationList))
conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMessages[0].message)
conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMessages[0].message)
och.handleMsg(ctx, val.Key(), conversationIDMsg, storageMsgList, notStorageMsgList)
och.handleNotification(ctx, val.Key(), conversationIDNotification, storageNotificationList, notStorageNotificationList)
}
func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg {
var ctxMessages []*ContextMsg
for i := 0; i < len(consumerMessages); i++ {
ctxMsg := &ContextMsg{}
msgFromMQ := &sdkws.MsgData{}
err := proto.Unmarshal(consumerMessages[i].Value, msgFromMQ)
if err != nil {
log.ZWarn(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
continue
}
var arr []string
for i, header := range consumerMessages[i].Headers {
arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value))
}
log.ZDebug(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers),
"header", strings.Join(arr, ", "))
ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
ctxMsg.message = msgFromMQ
log.ZDebug(ctx, "message parse finish", "message", msgFromMQ, "key",
string(consumerMessages[i].Key))
ctxMessages = append(ctxMessages, ctxMsg)
}
return ctxMessages
}
// Get messages/notifications stored message list, not stored and pushed message list.
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(
totalMsgs []*ContextMsg,
) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
isStorage := func(msg *sdkws.MsgData) bool {
options2 := msgprocessor.Options(msg.Options)
if options2.IsHistory() {
return true
}
// if !(!options2.IsSenderSync() && conversationID == msg.MsgData.SendID) {
// return false
// }
return false
}
func (och *OnlineHistoryRedisConsumerHandler) categorizeMessageLists(totalMsgs []*ContextMsg) (storageMsgList,
notStorageMsgList, storageNotificationList, notStorageNotificationList []*ContextMsg) {
for _, v := range totalMsgs {
options := msgprocessor.Options(v.message.Options)
if !options.IsNotNotification() {
@@ -185,176 +156,106 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(
msgprocessor.WithOfflinePush(false),
msgprocessor.WithUnreadCount(false),
)
storageMsgList = append(storageMsgList, msg)
ctxMsg := &ContextMsg{
message: msg,
ctx: v.ctx,
}
storageMsgList = append(storageMsgList, ctxMsg)
}
if isStorage(v.message) {
storageNotificatoinList = append(storageNotificatoinList, v.message)
if options.IsHistory() {
storageNotificationList = append(storageNotificationList, v)
} else {
notStorageNotificationList = append(notStorageNotificationList, v.message)
notStorageNotificationList = append(notStorageNotificationList, v)
}
} else {
if isStorage(v.message) {
storageMsgList = append(storageMsgList, v.message)
if options.IsHistory() {
storageMsgList = append(storageMsgList, v)
} else {
notStorageMsgList = append(notStorageMsgList, v.message)
notStorageMsgList = append(notStorageMsgList, v)
}
}
if v.message.ContentType == constant.ReactionMessageModifier ||
v.message.ContentType == constant.ReactionMessageDeleter {
modifyMsgList = append(modifyMsgList, v.message)
}
}
return
}
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(
ctx context.Context,
key, conversationID string,
storageList, notStorageList []*sdkws.MsgData,
) {
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) {
och.toPushTopic(ctx, key, conversationID, notStorageList)
if len(storageList) > 0 {
lastSeq, _, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
if err != nil {
log.ZError(
ctx,
"notification batch insert to redis error",
err,
"conversationID",
conversationID,
"storageList",
storageList,
)
return
}
log.ZDebug(ctx, "success to next topic", "conversationID", conversationID)
err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq)
if err != nil {
log.ZError(ctx, "MsgToMongoMQ error", err)
}
och.toPushTopic(ctx, key, conversationID, storageList)
var storageMessageList []*sdkws.MsgData
for _, msg := range storageList {
storageMessageList = append(storageMessageList, msg.message)
}
}
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData) {
for _, v := range msgs {
och.msgDatabase.MsgToPushMQ(ctx, key, conversationID, v) // nolint: errcheck
}
}
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
och.toPushTopic(ctx, key, conversationID, notStorageList)
if len(storageList) > 0 {
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
if len(storageMessageList) > 0 {
msg := storageMessageList[0]
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
if err != nil && errs.Unwrap(err) != redis.Nil {
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList)
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
return
}
if isNewConversation {
switch storageList[0].SessionType {
switch msg.SessionType {
case constant.ReadGroupChatType:
log.ZInfo(ctx, "group chat first create conversation", "conversationID",
conversationID)
userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, storageList[0].GroupID)
userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, msg.GroupID)
if err != nil {
log.ZWarn(ctx, "get group member ids error", err, "conversationID",
conversationID)
} else {
if err := och.conversationRpcClient.GroupChatFirstCreateConversation(ctx,
storageList[0].GroupID, userIDs); err != nil {
msg.GroupID, userIDs); err != nil {
log.ZWarn(ctx, "single chat first create conversation error", err,
"conversationID", conversationID)
}
}
case constant.SingleChatType, constant.NotificationChatType:
if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, storageList[0].RecvID,
storageList[0].SendID, conversationID, storageList[0].SessionType); err != nil {
if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, msg.RecvID,
msg.SendID, conversationID, msg.SessionType); err != nil {
log.ZWarn(ctx, "single chat or notification first create conversation error", err,
"conversationID", conversationID, "sessionType", storageList[0].SessionType)
"conversationID", conversationID, "sessionType", msg.SessionType)
}
default:
log.ZWarn(ctx, "unknown session type", nil, "sessionType",
storageList[0].SessionType)
msg.SessionType)
}
}
log.ZDebug(ctx, "success incr to next topic")
err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq)
err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
if err != nil {
log.ZError(ctx, "MsgToMongoMQ error", err)
log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID",
conversationID, "storageList", storageMessageList, "lastSeq", lastSeq)
}
och.toPushTopic(ctx, key, conversationID, storageList)
}
}
func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
for {
aggregationMsgs := make(map[string][]*ContextMsg, ChannelNum)
select {
case cmd := <-och.msgDistributionCh:
switch cmd.Cmd {
case ConsumerMsgs:
triggerChannelValue := cmd.Value.(TriggerChannelValue)
ctx := triggerChannelValue.ctx
consumerMessages := triggerChannelValue.cMsgList
// Aggregation map[userid]message list
log.ZDebug(ctx, "batch messages come to distribution center", "length", len(consumerMessages))
for i := 0; i < len(consumerMessages); i++ {
ctxMsg := &ContextMsg{}
msgFromMQ := &sdkws.MsgData{}
err := proto.Unmarshal(consumerMessages[i].Value, msgFromMQ)
if err != nil {
log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
continue
}
var arr []string
for i, header := range consumerMessages[i].Headers {
arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value))
}
log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers),
"header", strings.Join(arr, ", "))
ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
ctxMsg.message = msgFromMQ
log.ZDebug(
ctx,
"single msg come to distribution center",
"message",
msgFromMQ,
"key",
string(consumerMessages[i].Key),
)
// aggregationMsgs[string(consumerMessages[i].Key)] =
// append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg)
if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok {
oldM = append(oldM, ctxMsg)
aggregationMsgs[string(consumerMessages[i].Key)] = oldM
} else {
m := make([]*ContextMsg, 0, 100)
m = append(m, ctxMsg)
aggregationMsgs[string(consumerMessages[i].Key)] = m
}
}
log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs))
for uniqueKey, v := range aggregationMsgs {
if len(v) >= 0 {
hashCode := stringutil.GetHashCode(uniqueKey)
channelID := hashCode % ChannelNum
newCtx := withAggregationCtx(ctx, v)
log.ZDebug(
newCtx,
"generate channelID",
"hashCode",
hashCode,
"channelID",
channelID,
"uniqueKey",
uniqueKey,
)
och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{uniqueKey: uniqueKey, ctxMsgList: v, ctx: newCtx}}
}
}
}
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, key, conversationID string,
storageList, notStorageList []*ContextMsg) {
och.toPushTopic(ctx, key, conversationID, notStorageList)
var storageMessageList []*sdkws.MsgData
for _, msg := range storageList {
storageMessageList = append(storageMessageList, msg.message)
}
if len(storageMessageList) > 0 {
lastSeq, _, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
if err != nil {
log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID,
"storageList", storageMessageList)
return
}
log.ZDebug(ctx, "success to next topic", "conversationID", conversationID)
err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
if err != nil {
log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID",
conversationID, "storageList", storageMessageList, "lastSeq", lastSeq)
}
och.toPushTopic(ctx, key, conversationID, storageList)
}
}
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(_ context.Context, key, conversationID string, msgs []*ContextMsg) {
for _, v := range msgs {
och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message)
}
}
@@ -377,106 +278,30 @@ func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSess
return nil
}
func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim,
) error { // a instance in the consumer group
for {
if sess == nil {
log.ZWarn(context.Background(), "sess == nil, waiting", nil)
time.Sleep(100 * time.Millisecond)
} else {
break
}
}
func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
log.ZInfo(context.Background(), "online new session msg come", "highWaterMarkOffset",
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
var (
split = 1000
rwLock = new(sync.RWMutex)
messages = make([]*sarama.ConsumerMessage, 0, 1000)
ticker = time.NewTicker(time.Millisecond * 100)
wg = sync.WaitGroup{}
running = new(atomic.Bool)
)
running.Store(true)
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ticker.C:
// if the buffer is empty and running is false, return loop.
if len(messages) == 0 {
if !running.Load() {
return
}
continue
}
rwLock.Lock()
buffer := make([]*sarama.ConsumerMessage, 0, len(messages))
buffer = append(buffer, messages...)
// reuse slice, set cap to 0
messages = messages[:0]
rwLock.Unlock()
start := time.Now()
ctx := mcontext.WithTriggerIDContext(context.Background(), idutil.OperationIDGenerator())
log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(buffer))
for i := 0; i < len(buffer)/split; i++ {
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
ctx: ctx, cMsgList: buffer[i*split : (i+1)*split],
}}
}
if (len(buffer) % split) > 0 {
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
ctx: ctx, cMsgList: buffer[split*(len(buffer)/split):],
}}
}
log.ZDebug(ctx, "timer trigger msg consumer end",
"length", len(buffer), "time_cost", time.Since(start),
)
och.redisMessageBatches.OnComplete = func(lastMessage *sarama.ConsumerMessage, totalCount int) {
session.MarkMessage(lastMessage, "")
session.Commit()
}
for {
select {
case msg, ok := <-claim.Messages():
if !ok {
return nil
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for running.Load() {
select {
case msg, ok := <-claim.Messages():
if !ok {
running.Store(false)
return
}
if len(msg.Value) == 0 {
continue
}
rwLock.Lock()
messages = append(messages, msg)
rwLock.Unlock()
sess.MarkMessage(msg, "")
case <-sess.Context().Done():
running.Store(false)
return
if len(msg.Value) == 0 {
continue
}
err := och.redisMessageBatches.Put(context.Background(), msg)
if err != nil {
log.ZWarn(context.Background(), "put msg to error", err, "msg", msg)
}
case <-session.Context().Done():
return nil
}
}()
wg.Wait()
return nil
}
}