fix: process add errors wrap. (#1862)

* fix: process add errors wrap.

* fix: process add errors wrap.
This commit is contained in:
OpenIM-Gordon
2024-02-02 10:11:13 +08:00
committed by GitHub
parent 55ca661d13
commit c55e03dc70
21 changed files with 143 additions and 111 deletions
+29 -20
View File
@@ -15,8 +15,10 @@
package msgtransfer
import (
"context"
"errors"
"fmt"
"github.com/OpenIMSDK/tools/errs"
"log"
"net/http"
"sync"
@@ -69,40 +71,47 @@ func StartTransfer(prometheusPort int) error {
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
msgModel := cache.NewMsgCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel)
if err != nil {
return err
}
conversationRpcClient := rpcclient.NewConversationRpcClient(client)
groupRpcClient := rpcclient.NewGroupRpcClient(client)
msgTransfer := NewMsgTransfer(msgDatabase, &conversationRpcClient, &groupRpcClient)
msgTransfer, err := NewMsgTransfer(msgDatabase, &conversationRpcClient, &groupRpcClient)
if err != nil {
return err
}
return msgTransfer.Start(prometheusPort)
}
func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) *MsgTransfer {
return &MsgTransfer{
historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient),
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase),
func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) {
historyCH, err := NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient)
if err != nil {
return nil, err
}
historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(msgDatabase)
if err != nil {
return nil, err
}
return &MsgTransfer{
historyCH: historyCH,
historyMongoCH: historyMongoCH,
}, nil
}
func (m *MsgTransfer) Start(prometheusPort int) error {
ctx := context.Background()
var wg sync.WaitGroup
wg.Add(1)
fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
if prometheusPort <= 0 {
return errors.New("prometheusPort not correct")
return errs.Wrap(errors.New("prometheusPort not correct"))
}
if config.Config.ChatPersistenceMysql {
// go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH)
} else {
fmt.Println("msg transfer not start mysql consumer")
}
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyCH)
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyMongoCH)
// go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(m.modifyCH)
/*err := prome.StartPrometheusSrv(prometheusPort)
if err != nil {
return err
}*/
////////////////////////////
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyCH)
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyMongoCH)
if config.Config.Prometheus.Enable {
reg := prometheus.NewRegistry()
reg.MustRegister(
@@ -87,7 +87,7 @@ func NewOnlineHistoryRedisConsumerHandler(
database controller.CommonMsgDatabase,
conversationRpcClient *rpcclient.ConversationRpcClient,
groupRpcClient *rpcclient.GroupRpcClient,
) *OnlineHistoryRedisConsumerHandler {
) (*OnlineHistoryRedisConsumerHandler, error) {
var och OnlineHistoryRedisConsumerHandler
och.msgDatabase = database
och.msgDistributionCh = make(chan Cmd2Value) // no buffer channel
@@ -98,14 +98,15 @@ func NewOnlineHistoryRedisConsumerHandler(
}
och.conversationRpcClient = conversationRpcClient
och.groupRpcClient = groupRpcClient
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{
var err error
och.historyConsumerGroup, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Config.Kafka.LatestMsgToRedis.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
// statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d
// second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
return &och
return &och, err
}
func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
@@ -34,16 +34,21 @@ type OnlineHistoryMongoConsumerHandler struct {
msgDatabase controller.CommonMsgDatabase
}
func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase) *OnlineHistoryMongoConsumerHandler {
mc := &OnlineHistoryMongoConsumerHandler{
historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Config.Kafka.MsgToMongo.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo),
msgDatabase: database,
func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
historyConsumerGroup, err := kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Config.Kafka.MsgToMongo.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
if err != nil {
return nil, err
}
return mc
mc := &OnlineHistoryMongoConsumerHandler{
historyConsumerGroup: historyConsumerGroup,
msgDatabase: database,
}
return mc, nil
}
func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(