Mirror of https://github.com/openimsdk/open-im-server.git (synced 2026-05-14 14:05:59 +08:00)
refactor: 3.7.0 code conventions. (#2148)
* Script Refactoring (×12)
* feat: add code lint (×2)
* Script Refactoring (×17)
* feat: code format
* Script Refactoring (×3)
* Adjust MinIO configuration settings
* Adjust configuration settings (×2)
* refactor: config change.
* refactor: webhooks update.
* Adjust configuration settings
* refactor: webhooks update.
* Adjust configuration settings (×3)
* feat: s3 api addr
* refactor: webhooks update.
* Adjust configuration settings (×7)
* refactor: webhooks update.
* refactor: kafka update.
* Simplify the Docker Compose configuration, remove unnecessary environment variables, and eliminate the gateway service.
* refactor: kafka update. (×2)
* Simplify the Docker Compose configuration, remove unnecessary environment variables, and eliminate the gateway service. (×2)
* Windows can compile and run. (×2)
* refactor: kafka update.
* feat: msg cache split
* refactor: webhooks update (×2)
* refactor: friends update
* refactor: group update
* refactor: third update
* refactor: api update
* refactor: crontab update
* refactor: msggateway update
* mage (×2)
* refactor: all module update.
* check
* refactor: all module update.
* load config (×4)
* refactor: all module update. (×5)
* Optimize Docker configuration and script.
* refactor: all module update.
* Optimize Docker configuration and script. (×3)
* refactor: all module update.
* Optimize Docker configuration and script.
* refactor: all module update. (×2)
* Optimize Docker configuration and script. (×6)
* update tools (×2)
* Optimize Docker configuration and script. (×11)
* update protocol
* Optimize Docker configuration and script. (×2)
* refactor: all module update.
* Optimize Docker configuration and script. (×4)
* refactor: api remove token auth by redis directly.
* Code Refactoring
* refactor: websocket auth change to call rpc of auth.
* refactor: kick online user and remove token change to call auth rpc. (×2)
* refactor: remove msggateway redis.
* refactor: cmd update. (×26)
* refactor webhook
* refactor: cmd update. (×4)
* refactor webhook
* refactor: cmd update. (×2)
* fix: runtime: goroutine stack exceeds
* refactor: cmd update.
* refactor notification (×2)
* refactor
* refactor: cmd update. (×2)
* refactor (×3)
* protojson (×3)
* go mod
* wrapperspb
* refactor: cmd update. (×3)
* refactor: context update.
* refactor: websocket update info. (×4)
* refactor: api name change.
* refactor: debug info. (×3)
* fix: update file
* refactor (×2)
* refactor: debug info. (×6)
* fix: callback update. (×2)
* refactor
* fix: update message.
* fix: msg cache timeout.
* refactor (×2)
* fix: push update. (×7)
* fix: websocket handle error remove when upgrade error.

---------

Co-authored-by: skiffer-git <44203734@qq.com>
Co-authored-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
Co-authored-by: withchao <993506633@qq.com>
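The thrust of the convention change is visible in the msgtransfer diff below: the single shared *config.GlobalConfig parameter is replaced by a small per-binary Config struct whose sections each expose a Build() method that yields the configuration the matching client library needs. A minimal sketch of that pattern, not the repo's actual types (the YAML tags and the Build return value here are assumptions):

```go
package main

import (
	"context"
	"fmt"
)

// Redis is one config section; Build converts it into what the redis
// client expects. In the real repo such sections live in pkg/common/config.
type Redis struct {
	Address        []string `yaml:"address"`
	Password       string   `yaml:"password"`
	EnablePipeline bool     `yaml:"enablePipeline"`
}

// Sketch only: the real Build presumably returns a client config struct.
func (r *Redis) Build() string {
	return fmt.Sprintf("redis %v", r.Address)
}

// Config mirrors the per-binary struct added in this commit: only the
// sections the msgtransfer binary actually consumes.
type Config struct {
	RedisConfig Redis
}

// Start mirrors the new entrypoint shape: context, instance index, config.
func Start(ctx context.Context, index int, cfg *Config) error {
	fmt.Println("connecting:", cfg.RedisConfig.Build(), "instance", index)
	return nil
}

func main() {
	cfg := Config{RedisConfig: Redis{Address: []string{"localhost:16379"}, EnablePipeline: true}}
	_ = Start(context.Background(), 0, &cfg)
}
```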
@@ -16,23 +16,26 @@ package msgtransfer
 import (
 	"context"
-	"errors"
 	"fmt"
+	"github.com/openimsdk/tools/db/mongoutil"
+	"github.com/openimsdk/tools/db/redisutil"
+	"github.com/openimsdk/tools/utils/datautil"
 	"net/http"
 	"os"
 	"os/signal"
 	"syscall"
 
-	"github.com/OpenIMSDK/tools/errs"
-	"github.com/OpenIMSDK/tools/mw"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
-	"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
+	"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
 	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
 	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
-	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
+	"github.com/openimsdk/tools/errs"
+	"github.com/openimsdk/tools/log"
+	"github.com/openimsdk/tools/mw"
+	"github.com/openimsdk/tools/system/program"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/collectors"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -50,59 +53,67 @@ type MsgTransfer struct {
 	// and handle the deletion notification message deleted subscriptions topic: msg_to_mongo
 	ctx    context.Context
 	cancel context.CancelFunc
-	config *config.GlobalConfig
 }
 
-func StartTransfer(config *config.GlobalConfig, prometheusPort int) error {
-	rdb, err := cache.NewRedis(config)
-	if err != nil {
-		return err
-	}
-
-	mongo, err := unrelation.NewMongo(config)
-	if err != nil {
-		return err
-	}
-
-	if err = mongo.CreateMsgIndex(); err != nil {
-		return err
-	}
-	client, err := kdisc.NewDiscoveryRegister(config)
-	if err != nil {
-		return err
-	}
-
-	if err := client.CreateRpcRootNodes(config.GetServiceNames()); err != nil {
-		return err
-	}
-
-	client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
-	msgModel := cache.NewMsgCacheModel(rdb, config)
-	msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase(config.Mongo.Database))
-	msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, config)
-	if err != nil {
-		return err
-	}
-	conversationRpcClient := rpcclient.NewConversationRpcClient(client, config)
-	groupRpcClient := rpcclient.NewGroupRpcClient(client, config)
-	msgTransfer, err := NewMsgTransfer(config, msgDatabase, &conversationRpcClient, &groupRpcClient)
-	if err != nil {
-		return err
-	}
-	return msgTransfer.Start(prometheusPort, config)
+type Config struct {
+	MsgTransfer     config.MsgTransfer
+	RedisConfig     config.Redis
+	MongodbConfig   config.Mongo
+	KafkaConfig     config.Kafka
+	ZookeeperConfig config.ZooKeeper
+	Share           config.Share
+	WebhooksConfig  config.Webhooks
+}
 
-func NewMsgTransfer(
-	config *config.GlobalConfig,
-	msgDatabase controller.CommonMsgDatabase,
-	conversationRpcClient *rpcclient.ConversationRpcClient,
-	groupRpcClient *rpcclient.GroupRpcClient,
-) (*MsgTransfer, error) {
-	historyCH, err := NewOnlineHistoryRedisConsumerHandler(config, msgDatabase, conversationRpcClient, groupRpcClient)
+func Start(ctx context.Context, index int, config *Config) error {
+	log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPorts",
+		config.MsgTransfer.Prometheus.Ports, "index", index)
+	mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
+	if err != nil {
+		return err
+	}
+	rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
+	if err != nil {
+		return err
+	}
+	client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
+	if err != nil {
+		return err
+	}
+
+	if err := client.CreateRpcRootNodes(config.Share.RpcRegisterName.GetServiceNames()); err != nil {
+		return err
+	}
+
+	client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
+	//todo MsgCacheTimeout
+	msgModel := cache.NewMsgCache(rdb, config.RedisConfig.EnablePipeline)
+	seqModel := cache.NewSeqCache(rdb)
+	msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
+	if err != nil {
+		return err
+	}
+	msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.KafkaConfig)
+	if err != nil {
+		return err
+	}
+	conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
+	groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
+	msgTransfer, err := NewMsgTransfer(&config.KafkaConfig, msgDatabase, &conversationRpcClient, &groupRpcClient)
+	if err != nil {
+		return err
+	}
+	return msgTransfer.Start(index, config)
+}
+
+func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDatabase,
+	conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) {
+	historyCH, err := NewOnlineHistoryRedisConsumerHandler(kafkaConf, msgDatabase, conversationRpcClient, groupRpcClient)
 	if err != nil {
 		return nil, err
 	}
-	historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(config, msgDatabase)
+	historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(kafkaConf, msgDatabase)
 	if err != nil {
 		return nil, err
 	}
@@ -110,14 +121,13 @@ func NewMsgTransfer(
 	return &MsgTransfer{
 		historyCH:      historyCH,
 		historyMongoCH: historyMongoCH,
-		config:         config,
 	}, nil
 }
 
-func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig) error {
-	fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
-	if prometheusPort <= 0 {
-		return errs.Wrap(errors.New("prometheusPort not correct"))
+func (m *MsgTransfer) Start(index int, config *Config) error {
+	prometheusPort, err := datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index)
+	if err != nil {
+		return err
 	}
 	m.ctx, m.cancel = context.WithCancel(context.Background())
 
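Start now derives its Prometheus port from the instance index rather than receiving it as a parameter, so several msgtransfer instances can run off one config that lists a port per instance. A self-contained sketch of that indexed lookup; getElemByIndex is a local stand-in for the tools library's datautil.GetElemByIndex, whose exact behavior may differ:

```go
package main

import (
	"errors"
	"fmt"
)

// getElemByIndex mimics what datautil.GetElemByIndex appears to do in the
// diff: pick the element for this instance, failing on out-of-range.
func getElemByIndex(elems []int, index int) (int, error) {
	if index < 0 || index >= len(elems) {
		return 0, errors.New("index out of range for configured ports")
	}
	return elems[index], nil
}

func main() {
	ports := []int{12020, 12021, 12022} // one prometheus port per instance
	for index := 0; index < 4; index++ {
		port, err := getElemByIndex(ports, index)
		if err != nil {
			fmt.Println("instance", index, "error:", err)
			continue
		}
		fmt.Println("instance", index, "serves /metrics on port", port)
	}
}
```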
@@ -129,17 +139,17 @@ func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig) err
 	go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH)
 	go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH)
 
-	if config.Prometheus.Enable {
+	if config.MsgTransfer.Prometheus.Enable {
 		go func() {
 			proreg := prometheus.NewRegistry()
 			proreg.MustRegister(
 				collectors.NewGoCollector(),
 			)
-			proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", config)...)
+			proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", &config.Share)...)
 			http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg}))
 			err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil)
 			if err != nil && err != http.ErrServerClosed {
-				netErr = errs.Wrap(err, fmt.Sprintf("prometheus start err: %d", prometheusPort))
+				netErr = errs.WrapMsg(err, "prometheus start error", "prometheusPort", prometheusPort)
 				netDone <- struct{}{}
 			}
 		}()
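The metrics block keeps the same shape before and after: a dedicated prometheus.Registry is populated and served over promhttp, so only explicitly registered collectors are exported rather than everything on the global default registry. A runnable sketch of just that wiring (the port number is illustrative):

```go
package main

import (
	"fmt"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Dedicated registry, as in the diff: nothing is exported unless
	// registered here explicitly.
	reg := prometheus.NewRegistry()
	reg.MustRegister(collectors.NewGoCollector())

	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
	fmt.Println("metrics on :12020/metrics")
	if err := http.ListenAndServe(":12020", nil); err != nil && err != http.ErrServerClosed {
		fmt.Println("prometheus start error:", err)
	}
}
```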
@@ -149,7 +159,7 @@ func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig) err
 	signal.Notify(sigs, syscall.SIGTERM)
 	select {
 	case <-sigs:
-		util.SIGTERMExit()
+		program.SIGTERMExit()
 		// graceful close kafka client.
 		m.cancel()
 		m.historyCH.historyConsumerGroup.Close()
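Shutdown stays cooperative: SIGTERM cancels the root context, the consumer goroutines drain out, and only then is the Kafka client closed. A minimal sketch of the same choreography with a stand-in consumer goroutine (the two-second timer exists only so the example terminates on its own):

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGTERM)

	// Stand-in for the kafka consumer loop: exits when ctx is cancelled.
	done := make(chan struct{})
	go func() {
		<-ctx.Done()
		fmt.Println("consumer group closed")
		close(done)
	}()

	select {
	case <-sigs:
		cancel() // graceful close, mirroring m.cancel() in the diff
	case <-time.After(2 * time.Second):
		cancel() // demo fallback so the sketch finishes without a signal
	}
	<-done
}
```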
@@ -23,18 +23,19 @@ import (
 	"time"
 
 	"github.com/IBM/sarama"
-	"github.com/OpenIMSDK/protocol/constant"
-	"github.com/OpenIMSDK/protocol/sdkws"
-	"github.com/OpenIMSDK/tools/errs"
-	"github.com/OpenIMSDK/tools/log"
-	"github.com/OpenIMSDK/tools/mcontext"
-	"github.com/OpenIMSDK/tools/utils"
-	"github.com/go-redis/redis"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
-	"github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
 	"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
 	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
+	"github.com/openimsdk/protocol/constant"
+	"github.com/openimsdk/protocol/sdkws"
+	"github.com/openimsdk/tools/errs"
+	"github.com/openimsdk/tools/log"
+	"github.com/openimsdk/tools/mcontext"
+	"github.com/openimsdk/tools/mq/kafka"
+	"github.com/openimsdk/tools/utils/idutil"
+	"github.com/openimsdk/tools/utils/stringutil"
 	"google.golang.org/protobuf/proto"
 )
@@ -80,12 +81,12 @@ type OnlineHistoryRedisConsumerHandler struct {
 	groupRpcClient *rpcclient.GroupRpcClient
 }
 
-func NewOnlineHistoryRedisConsumerHandler(
-	config *config.GlobalConfig,
-	database controller.CommonMsgDatabase,
-	conversationRpcClient *rpcclient.ConversationRpcClient,
-	groupRpcClient *rpcclient.GroupRpcClient,
-) (*OnlineHistoryRedisConsumerHandler, error) {
+func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase,
+	conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) {
+	historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic})
+	if err != nil {
+		return nil, err
+	}
 	var och OnlineHistoryRedisConsumerHandler
 	och.msgDatabase = database
 	och.msgDistributionCh = make(chan Cmd2Value) // no buffer channel
@@ -96,32 +97,7 @@ func NewOnlineHistoryRedisConsumerHandler(
 	}
 	och.conversationRpcClient = conversationRpcClient
 	och.groupRpcClient = groupRpcClient
-	var err error
-
-	var tlsConfig *kafka.TLSConfig
-	if config.Kafka.TLS != nil {
-		tlsConfig = &kafka.TLSConfig{
-			CACrt:              config.Kafka.TLS.CACrt,
-			ClientCrt:          config.Kafka.TLS.ClientCrt,
-			ClientKey:          config.Kafka.TLS.ClientKey,
-			ClientKeyPwd:       config.Kafka.TLS.ClientKeyPwd,
-			InsecureSkipVerify: false,
-		}
-	}
-
-	och.historyConsumerGroup, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{
-		KafkaVersion:   sarama.V2_0_0_0,
-		OffsetsInitial: sarama.OffsetNewest,
-		IsReturnErr:    false,
-		UserName:       config.Kafka.Username,
-		Password:       config.Kafka.Password,
-	}, []string{config.Kafka.LatestMsgToRedis.Topic},
-		config.Kafka.Addr,
-		config.Kafka.ConsumerGroupID.MsgToRedis,
-		tlsConfig,
-	)
-	// statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d
-	// second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
+	och.historyConsumerGroup = historyConsumerGroup
 	return &och, err
 }
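Both consumer handlers now build their group through a single call: the shared Kafka config section carries the topics and group IDs, and a Build()-style method produces the client configuration, replacing the hand-assembled sarama settings and the optional TLS block. A sketch of the shape this implies; the types and method names here are illustrative, not the tools library's real API:

```go
package main

import "fmt"

// Kafka mirrors the fields the diff reads: topics and group IDs live in
// one shared section instead of per-consumer config blocks.
type Kafka struct {
	Address        []string
	Username       string
	Password       string
	ToRedisTopic   string
	ToRedisGroupID string
	ToMongoTopic   string
	ToMongoGroupID string
}

// BuildConsumerConfig is a stand-in for kafkaConf.Build(): one place that
// turns the section into whatever the client library needs.
func (k *Kafka) BuildConsumerConfig() map[string]any {
	return map[string]any{"brokers": k.Address, "user": k.Username}
}

// newMConsumerGroup stands in for kafka.NewMConsumerGroup in the diff.
func newMConsumerGroup(conf map[string]any, groupID string, topics []string) error {
	fmt.Printf("join group %q on %v for topics %v\n", groupID, conf["brokers"], topics)
	return nil
}

func main() {
	k := Kafka{Address: []string{"localhost:19094"}, ToRedisTopic: "toRedis",
		ToRedisGroupID: "redis", ToMongoTopic: "toMongo", ToMongoGroupID: "mongo"}
	// One call per handler, both driven by the same shared section.
	_ = newMConsumerGroup(k.BuildConsumerConfig(), k.ToRedisGroupID, []string{k.ToRedisTopic})
	_ = newMConsumerGroup(k.BuildConsumerConfig(), k.ToMongoGroupID, []string{k.ToMongoTopic})
}
```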
@@ -265,22 +241,13 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(
 	}
 }
 
-func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(
-	ctx context.Context,
-	key, conversationID string,
-	msgs []*sdkws.MsgData,
-) {
+func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData) {
 	for _, v := range msgs {
 		och.msgDatabase.MsgToPushMQ(ctx, key, conversationID, v) // nolint: errcheck
-
 	}
 }
 
-func (och *OnlineHistoryRedisConsumerHandler) handleMsg(
-	ctx context.Context,
-	key, conversationID string,
-	storageList, notStorageList []*sdkws.MsgData,
-) {
+func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
 	och.toPushTopic(ctx, key, conversationID, notStorageList)
 	if len(storageList) > 0 {
 		lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
@@ -290,7 +257,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(
 		}
 		if isNewConversation {
 			switch storageList[0].SessionType {
-			case constant.SuperGroupChatType:
+			case constant.ReadGroupChatType:
 				log.ZInfo(ctx, "group chat first create conversation", "conversationID",
 					conversationID)
 				userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, storageList[0].GroupID)
@@ -349,14 +316,8 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
 				for i, header := range consumerMessages[i].Headers {
 					arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value))
 				}
-				log.ZInfo(
-					ctx,
-					"consumer.kafka.GetContextWithMQHeader",
-					"len",
-					len(consumerMessages[i].Headers),
-					"header",
-					strings.Join(arr, ", "),
-				)
+				log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers),
+					"header", strings.Join(arr, ", "))
 				ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
 				ctxMsg.message = msgFromMQ
 				log.ZDebug(
@@ -381,7 +342,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
 		log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs))
 		for uniqueKey, v := range aggregationMsgs {
 			if len(v) >= 0 {
-				hashCode := utils.GetHashCode(uniqueKey)
+				hashCode := stringutil.GetHashCode(uniqueKey)
 				channelID := hashCode % ChannelNum
 				newCtx := withAggregationCtx(ctx, v)
 				log.ZDebug(
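MessagesDistributionHandle hashes each aggregation key onto one of ChannelNum worker channels, so all messages for a given conversation land on the same worker and per-conversation ordering is preserved; only the hash helper moved here (utils to stringutil). A small sketch of that routing, using FNV-1a as a stand-in for GetHashCode, whose actual algorithm may differ:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

const channelNum = 4 // mirrors ChannelNum in the handler

// getHashCode stands in for stringutil.GetHashCode.
func getHashCode(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return h.Sum64()
}

func main() {
	for _, conversationID := range []string{"si_a_b", "sg_team", "si_a_c", "sg_team"} {
		channelID := getHashCode(conversationID) % channelNum
		// the same conversation always maps to the same channel/worker
		fmt.Printf("%-8s -> channel %d\n", conversationID, channelID)
	}
}
```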
@@ -472,7 +433,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
 			rwLock.Unlock()
 
 			start := time.Now()
-			ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator())
+			ctx := mcontext.WithTriggerIDContext(context.Background(), idutil.OperationIDGenerator())
 			log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(buffer))
 			for i := 0; i < len(buffer)/split; i++ {
 				och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
@@ -18,42 +18,22 @@ import (
 	"context"
 
 	"github.com/IBM/sarama"
-	pbmsg "github.com/OpenIMSDK/protocol/msg"
-	"github.com/OpenIMSDK/tools/log"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
-	kfk "github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
+	pbmsg "github.com/openimsdk/protocol/msg"
+	"github.com/openimsdk/tools/log"
+	"github.com/openimsdk/tools/mq/kafka"
 	"google.golang.org/protobuf/proto"
 )
 
 type OnlineHistoryMongoConsumerHandler struct {
-	historyConsumerGroup *kfk.MConsumerGroup
+	historyConsumerGroup *kafka.MConsumerGroup
 	msgDatabase          controller.CommonMsgDatabase
 }
 
-func NewOnlineHistoryMongoConsumerHandler(config *config.GlobalConfig, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
-	var tlsConfig *kfk.TLSConfig
-	if config.Kafka.TLS != nil {
-		tlsConfig = &kfk.TLSConfig{
-			CACrt:              config.Kafka.TLS.CACrt,
-			ClientCrt:          config.Kafka.TLS.ClientCrt,
-			ClientKey:          config.Kafka.TLS.ClientKey,
-			ClientKeyPwd:       config.Kafka.TLS.ClientKeyPwd,
-			InsecureSkipVerify: false,
-		}
-	}
-	historyConsumerGroup, err := kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
-		KafkaVersion:   sarama.V2_0_0_0,
-		OffsetsInitial: sarama.OffsetNewest,
-		IsReturnErr:    false,
-		UserName:       config.Kafka.Username,
-		Password:       config.Kafka.Password,
-	}, []string{config.Kafka.MsgToMongo.Topic},
-		config.Kafka.Addr,
-		config.Kafka.ConsumerGroupID.MsgToMongo,
-		tlsConfig,
-	)
+func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
+	historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic})
 	if err != nil {
 		return nil, err
 	}
@@ -65,12 +45,7 @@ func NewOnlineHistoryMongoConsumerHandler(config *config.GlobalConfig, database
 	return mc, nil
 }
 
-func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(
-	ctx context.Context,
-	cMsg *sarama.ConsumerMessage,
-	key string,
-	session sarama.ConsumerGroupSession,
-) {
+func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Context, cMsg *sarama.ConsumerMessage, key string, session sarama.ConsumerGroupSession) {
 	msg := cMsg.Value
 	msgFromMQ := pbmsg.MsgDataToMongoByMQ{}
 	err := proto.Unmarshal(msg, &msgFromMQ)
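The mongo handler's job is unchanged by the refactor: each Kafka record value is a protobuf-encoded pbmsg.MsgDataToMongoByMQ that gets unmarshaled before the batch insert. A tiny round-trip sketch using a well-known wrapper type in place of the generated message (the real handler logs and skips on decode errors):

```go
package main

import (
	"fmt"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/wrapperspb"
)

func main() {
	// Producer side: encode the payload that would be put on the topic.
	value, err := proto.Marshal(wrapperspb.String("msg payload"))
	if err != nil {
		panic(err)
	}

	// Consumer side, as in handleChatWs2Mongo: decode cMsg.Value.
	var decoded wrapperspb.StringValue
	if err := proto.Unmarshal(value, &decoded); err != nil {
		fmt.Println("decode err:", err) // handler would log and skip here
		return
	}
	fmt.Println("decoded:", decoded.GetValue())
}
```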