merge code

This commit is contained in:
withchao
2024-12-27 17:34:56 +08:00
parent d3b2587743
commit 2676295a4c
42 changed files with 69 additions and 1342 deletions
-25
View File
@@ -23,11 +23,6 @@ import (
"os/signal"
"syscall"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/tools/utils/network"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
@@ -38,7 +33,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw"
@@ -87,20 +81,6 @@ func Start(ctx context.Context, index int, config *Config) error {
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
if config.Discovery.Enable == conf.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{
config.MsgTransfer.GetConfigFileName(),
config.RedisConfig.GetConfigFileName(),
config.MongodbConfig.GetConfigFileName(),
config.KafkaConfig.GetConfigFileName(),
config.Share.GetConfigFileName(),
config.WebhooksConfig.GetConfigFileName(),
config.Discovery.GetConfigFileName(),
conf.LogConfigFileName,
})
cm.Watch(ctx)
}
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil {
return err
@@ -153,11 +133,6 @@ func (m *MsgTransfer) Start(index int, config *Config) error {
if config.MsgTransfer.Prometheus.Enable {
go func() {
defer func() {
if r := recover(); r != nil {
log.ZPanic(m.ctx, "MsgTransfer Start Panic", errs.ErrPanic(r))
}
}()
prometheusPort, err := datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index)
if err != nil {
netErr = err
@@ -29,10 +29,8 @@ import (
"github.com/IBM/sarama"
"github.com/go-redis/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/tools/batcher"
"github.com/openimsdk/protocol/constant"
pbconv "github.com/openimsdk/protocol/conversation"
@@ -72,8 +70,6 @@ type OnlineHistoryRedisConsumerHandler struct {
redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage]
msgTransferDatabase controller.MsgTransferDatabase
conversationRpcClient *rpcclient.ConversationRpcClient
groupRpcClient *rpcclient.GroupRpcClient
conversationUserHasReadChan chan *userHasReadSeq
wg sync.WaitGroup
@@ -87,11 +83,11 @@ func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.
if err != nil {
return nil, err
}
groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group)
groupConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Group)
if err != nil {
return nil, err
}
conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation)
conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation)
if err != nil {
return nil, err
}
@@ -119,11 +115,9 @@ func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.
}
b.Do = och.do
och.redisMessageBatches = b
och.conversationRpcClient = conversationRpcClient
och.groupRpcClient = groupRpcClient
och.historyConsumerGroup = historyConsumerGroup
return &och, err
return &och, nil
}
func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[sarama.ConsumerMessage]) {
ctx = mcontext.WithTriggerIDContext(ctx, val.TriggerID())