mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-04-28 14:29:19 +08:00
fix: performance issues with Kafka caused by encapsulating the MQ interface (#3485)
This commit is contained in:
@@ -134,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase,config)
|
||||
historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase, config)
|
||||
|
||||
msgTransfer := &MsgTransfer{
|
||||
historyConsumer: historyConsumer,
|
||||
@@ -161,8 +161,8 @@ func (m *MsgTransfer) Start(ctx context.Context) error {
|
||||
}()
|
||||
|
||||
go func() {
|
||||
fn := func(ctx context.Context, key string, value []byte) error {
|
||||
m.historyMongoHandler.HandleChatWs2Mongo(ctx, key, value)
|
||||
fn := func(msg mq.Message) error {
|
||||
m.historyMongoHandler.HandleChatWs2Mongo(msg)
|
||||
return nil
|
||||
}
|
||||
for {
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/openimsdk/tools/mq"
|
||||
|
||||
"sync"
|
||||
"time"
|
||||
@@ -77,6 +78,7 @@ type ConsumerMessage struct {
|
||||
Ctx context.Context
|
||||
Key string
|
||||
Value []byte
|
||||
Raw mq.Message
|
||||
}
|
||||
|
||||
func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.Conn, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) {
|
||||
@@ -113,6 +115,11 @@ func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.
|
||||
b.Do = och.do
|
||||
och.redisMessageBatches = b
|
||||
|
||||
och.redisMessageBatches.OnComplete = func(lastMessage *ConsumerMessage, totalCount int) {
|
||||
lastMessage.Raw.Mark()
|
||||
lastMessage.Raw.Commit()
|
||||
}
|
||||
|
||||
return &och, nil
|
||||
}
|
||||
func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[ConsumerMessage]) {
|
||||
@@ -388,10 +395,10 @@ func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Conte
|
||||
return mcontext.SetOperationID(ctx, allMessageOperationID)
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(ctx context.Context, key string, value []byte) error { // a instance in the consumer group
|
||||
err := och.redisMessageBatches.Put(ctx, &ConsumerMessage{Ctx: ctx, Key: key, Value: value})
|
||||
func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(msg mq.Message) error { // a instance in the consumer group
|
||||
err := och.redisMessageBatches.Put(msg.Context(), &ConsumerMessage{Ctx: msg.Context(), Key: msg.Key(), Value: msg.Value(), Raw: msg})
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "put msg to error", err, "key", key, "value", value)
|
||||
log.ZWarn(msg.Context(), "put msg to error", err, "key", msg.Key(), "value", msg.Value())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -15,12 +15,12 @@
|
||||
package msgtransfer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
"github.com/openimsdk/tools/mq"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
pbmsg "github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/tools/log"
|
||||
"google.golang.org/protobuf/proto"
|
||||
@@ -40,7 +40,10 @@ func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabas
|
||||
}
|
||||
}
|
||||
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Context, key string, msg []byte) {
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) {
|
||||
ctx := val.Context()
|
||||
key := val.Key()
|
||||
msg := val.Value()
|
||||
msgFromMQ := pbmsg.MsgDataToMongoByMQ{}
|
||||
err := proto.Unmarshal(msg, &msgFromMQ)
|
||||
if err != nil {
|
||||
@@ -58,6 +61,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont
|
||||
prommetrics.MsgInsertMongoFailedCounter.Inc()
|
||||
} else {
|
||||
prommetrics.MsgInsertMongoSuccessCounter.Inc()
|
||||
val.Mark()
|
||||
}
|
||||
|
||||
for _, msgData := range msgFromMQ.MsgData {
|
||||
|
||||
Reference in New Issue
Block a user