mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-08 11:05:59 +08:00
refactor: Refactor rpc call && auto gen rpc_call code (#2969)
* refactor: rpcclient * chore: err * fix: err * fix: err * fix: err * feat: change api
This commit is contained in:
@@ -23,16 +23,17 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
"github.com/go-redis/redis"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/tools/batcher"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
pbconv "github.com/openimsdk/protocol/conversation"
|
||||
"github.com/openimsdk/protocol/group"
|
||||
"github.com/openimsdk/protocol/rpccall"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/openimsdk/tools/log"
|
||||
@@ -69,14 +70,11 @@ type OnlineHistoryRedisConsumerHandler struct {
|
||||
redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage]
|
||||
|
||||
msgTransferDatabase controller.MsgTransferDatabase
|
||||
conversationRpcClient *rpcclient.ConversationRpcClient
|
||||
groupRpcClient *rpcclient.GroupRpcClient
|
||||
conversationUserHasReadChan chan *userHasReadSeq
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase,
|
||||
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) {
|
||||
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) {
|
||||
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -103,8 +101,6 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont
|
||||
}
|
||||
b.Do = och.do
|
||||
och.redisMessageBatches = b
|
||||
och.conversationRpcClient = conversationRpcClient
|
||||
och.groupRpcClient = groupRpcClient
|
||||
och.historyConsumerGroup = historyConsumerGroup
|
||||
|
||||
return &och, err
|
||||
@@ -285,22 +281,32 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
|
||||
case constant.ReadGroupChatType:
|
||||
log.ZDebug(ctx, "group chat first create conversation", "conversationID",
|
||||
conversationID)
|
||||
userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, msg.GroupID)
|
||||
|
||||
userIDs, err := rpccall.ExtractField(ctx, group.GetGroupMemberUserIDsCaller.Invoke,
|
||||
&group.GetGroupMemberUserIDsReq{
|
||||
GroupID: msg.GroupID,
|
||||
}, (*group.GetGroupMemberUserIDsResp).GetUserIDs)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "get group member ids error", err, "conversationID",
|
||||
conversationID)
|
||||
} else {
|
||||
log.ZInfo(ctx, "GetGroupMemberIDs end")
|
||||
|
||||
if err := och.conversationRpcClient.GroupChatFirstCreateConversation(ctx,
|
||||
msg.GroupID, userIDs); err != nil {
|
||||
if err := pbconv.CreateGroupChatConversationsCaller.Execute(ctx, &pbconv.CreateGroupChatConversationsReq{
|
||||
UserIDs: userIDs,
|
||||
GroupID: msg.GroupID,
|
||||
}); err != nil {
|
||||
log.ZWarn(ctx, "single chat first create conversation error", err,
|
||||
"conversationID", conversationID)
|
||||
}
|
||||
}
|
||||
case constant.SingleChatType, constant.NotificationChatType:
|
||||
if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, msg.RecvID,
|
||||
msg.SendID, conversationID, msg.SessionType); err != nil {
|
||||
if err := pbconv.CreateSingleChatConversationsCaller.Execute(ctx, &pbconv.CreateSingleChatConversationsReq{
|
||||
RecvID: msg.RecvID,
|
||||
SendID: msg.SendID,
|
||||
ConversationID: conversationID,
|
||||
ConversationType: msg.SessionType,
|
||||
}); err != nil {
|
||||
log.ZWarn(ctx, "single chat or notification first create conversation error", err,
|
||||
"conversationID", conversationID, "sessionType", msg.SessionType)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user