feat: Optimizing RPC call (#2993)

* pb

* fix: Modifying other fields while setting IsPrivateChat does not take effect

* fix: quote message error revoke

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* rpc client

* rpc client

* rpc client

* rpc client

* rpc client

* rpc client

* rpc client

* rpc client
This commit is contained in:
chao
2024-12-24 10:51:38 +08:00
committed by OpenIM-Robot
parent b26b0a422c
commit de42eb1f11
70 changed files with 1462 additions and 785 deletions
+11 -9
View File
@@ -17,6 +17,7 @@ package auth
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
@@ -28,7 +29,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
pbauth "github.com/openimsdk/protocol/auth"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msggateway"
@@ -45,6 +45,7 @@ type authServer struct {
userRpcClient *rpcclient.UserRpcClient
RegisterCenter discovery.SvcDiscoveryRegistry
config *Config
userClient *rpcli.UserClient
}
type Config struct {
@@ -59,7 +60,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User)
if err != nil {
return err
}
pbauth.RegisterAuthServer(server, &authServer{
userRpcClient: &userRpcClient,
RegisterCenter: client,
@@ -70,7 +74,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
config.Share.MultiLogin,
config.Share.IMAdminUserID,
),
config: config,
config: config,
userClient: rpcli.NewUserClient(userConn),
})
return nil
}
@@ -86,7 +91,7 @@ func (s *authServer) GetAdminToken(ctx context.Context, req *pbauth.GetAdminToke
}
if _, err := s.userRpcClient.GetUserInfo(ctx, req.UserID); err != nil {
if err := s.userClient.CheckUser(ctx, []string{req.UserID}); err != nil {
return nil, err
}
@@ -115,7 +120,7 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR
if authverify.IsManagerUserID(req.UserID, s.config.Share.IMAdminUserID) {
return nil, errs.ErrNoPermission.WrapMsg("don't get Admin token")
}
if _, err := s.userRpcClient.GetUserInfo(ctx, req.UserID); err != nil {
if err := s.userClient.CheckUser(ctx, []string{req.UserID}); err != nil {
return nil, err
}
token, err := s.authDatabase.CreateToken(ctx, req.UserID, int(req.PlatformID))
@@ -156,10 +161,7 @@ func (s *authServer) parseToken(ctx context.Context, tokensString string) (claim
return nil, servererrs.ErrTokenNotExist.Wrap()
}
func (s *authServer) ParseToken(
ctx context.Context,
req *pbauth.ParseTokenReq,
) (resp *pbauth.ParseTokenResp, err error) {
func (s *authServer) ParseToken(ctx context.Context, req *pbauth.ParseTokenReq) (resp *pbauth.ParseTokenResp, err error) {
resp = &pbauth.ParseTokenResp{}
claims, err := s.parseToken(ctx, req.Token)
if err != nil {
+39 -29
View File
@@ -16,8 +16,7 @@ package conversation
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"sort"
"time"
@@ -27,12 +26,12 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/sdkws"
@@ -53,6 +52,10 @@ type conversationServer struct {
conversationNotificationSender *ConversationNotificationSender
config *Config
userClient *rpcli.UserClient
msgClient *rpcli.MsgClient
groupClient *rpcli.GroupClient
}
type Config struct {
@@ -78,17 +81,27 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User)
if err != nil {
return err
}
groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group)
if err != nil {
return err
}
msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg)
if err != nil {
return err
}
msgClient := rpcli.NewMsgClient(msgConn)
localcache.InitLocalCache(&config.LocalCacheConfig)
pbconversation.RegisterConversationServer(server, &conversationServer{
msgRpcClient: &msgRpcClient,
user: &userRpcClient,
conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, &msgRpcClient),
groupRpcClient: &groupRpcClient,
conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient),
conversationDatabase: controller.NewConversationDatabase(conversationDB,
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, redis.GetRocksCacheOptions(), conversationDB), mgocli.GetTx()),
userClient: rpcli.NewUserClient(userConn),
groupClient: rpcli.NewGroupClient(groupConn),
msgClient: msgClient,
})
return nil
}
@@ -125,13 +138,12 @@ func (c *conversationServer) GetSortedConversationList(ctx context.Context, req
if len(conversations) == 0 {
return nil, errs.ErrRecordNotFound.Wrap()
}
maxSeqs, err := c.msgRpcClient.GetMaxSeqs(ctx, conversationIDs)
maxSeqs, err := c.msgClient.GetMaxSeqs(ctx, conversationIDs)
if err != nil {
return nil, err
}
chatLogs, err := c.msgRpcClient.GetMsgByConversationIDs(ctx, conversationIDs, maxSeqs)
chatLogs, err := c.msgClient.GetMsgByConversationIDs(ctx, conversationIDs, maxSeqs)
if err != nil {
return nil, err
}
@@ -141,7 +153,7 @@ func (c *conversationServer) GetSortedConversationList(ctx context.Context, req
return nil, err
}
hasReadSeqs, err := c.msgRpcClient.GetHasReadSeqs(ctx, req.UserID, conversationIDs)
hasReadSeqs, err := c.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.UserID)
if err != nil {
return nil, err
}
@@ -230,7 +242,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
}
if req.Conversation.ConversationType == constant.WriteGroupChatType {
groupInfo, err := c.groupRpcClient.GetGroupInfo(ctx, req.Conversation.GroupID)
groupInfo, err := c.groupClient.GetGroupInfo(ctx, req.Conversation.GroupID)
if err != nil {
return nil, err
}
@@ -434,14 +446,14 @@ func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, r
return nil, err
}
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID)
if _, err := c.msgRpcClient.Client.SetUserConversationMaxSeq(ctx, &pbmsg.SetUserConversationMaxSeqReq{ConversationID: conversationID, OwnerUserID: req.UserIDs, MaxSeq: 0}); err != nil {
if err := c.msgClient.SetUserConversationMaxSeq(ctx, conversationID, req.UserIDs, 0); err != nil {
return nil, err
}
return &pbconversation.CreateGroupChatConversationsResp{}, nil
}
func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbconversation.SetConversationMaxSeqReq) (*pbconversation.SetConversationMaxSeqResp, error) {
if _, err := c.msgRpcClient.Client.SetUserConversationMaxSeq(ctx, &pbmsg.SetUserConversationMaxSeqReq{ConversationID: req.ConversationID, OwnerUserID: req.OwnerUserID, MaxSeq: req.MaxSeq}); err != nil {
if err := c.msgClient.SetUserConversationMaxSeq(ctx, req.ConversationID, req.OwnerUserID, req.MaxSeq); err != nil {
return nil, err
}
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID,
@@ -455,7 +467,7 @@ func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbc
}
func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) {
if _, err := c.msgRpcClient.Client.SetUserConversationMinSeq(ctx, &pbmsg.SetUserConversationMinSeqReq{ConversationID: req.ConversationID, OwnerUserID: req.OwnerUserID, MinSeq: req.MinSeq}); err != nil {
if err := c.msgClient.SetUserConversationMin(ctx, req.ConversationID, req.OwnerUserID, req.MinSeq); err != nil {
return nil, err
}
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID,
@@ -565,7 +577,7 @@ func (c *conversationServer) getConversationInfo(
}
}
if len(sendIDs) != 0 {
sendInfos, err := c.user.GetUsersInfo(ctx, sendIDs)
sendInfos, err := c.userClient.GetUsersInfo(ctx, sendIDs)
if err != nil {
return nil, err
}
@@ -574,7 +586,7 @@ func (c *conversationServer) getConversationInfo(
}
}
if len(groupIDs) != 0 {
groupInfos, err := c.groupRpcClient.GetGroupInfos(ctx, groupIDs, false)
groupInfos, err := c.groupClient.GetGroupsInfo(ctx, groupIDs)
if err != nil {
return nil, err
}
@@ -762,23 +774,22 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
continue
}
rcpReq := &pbmsg.GetLastMessageSeqByTimeReq{ConversationID: conversation.ConversationID, Time: req.Timestamp - conversation.MsgDestructTime}
resp, err := pbmsg.GetLastMessageSeqByTime.Invoke(ctx, rcpReq)
seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-conversation.MsgDestructTime)
if err != nil {
return nil, err
}
if resp.Seq <= 0 {
log.ZDebug(ctx, "ClearUserConversationMsg GetLastMessageSeqByTime seq <= 0", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "seq", resp.Seq)
if seq <= 0 {
log.ZDebug(ctx, "ClearUserConversationMsg GetLastMessageSeqByTime seq <= 0", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "seq", seq)
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, -1, latestMsgDestructTime); err != nil {
return nil, err
}
continue
}
resp.Seq++
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, resp.Seq, latestMsgDestructTime); err != nil {
seq++
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, seq, latestMsgDestructTime); err != nil {
return nil, err
}
log.ZDebug(ctx, "ClearUserConversationMsg set min seq", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "seq", resp.Seq, "msgDestructTime", conversation.MsgDestructTime)
log.ZDebug(ctx, "ClearUserConversationMsg set min seq", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "seq", seq, "msgDestructTime", conversation.MsgDestructTime)
}
return &pbconversation.ClearUserConversationMsgResp{Count: int32(len(conversations))}, nil
}
@@ -788,8 +799,7 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c
"latest_msg_destruct_time": latestMsgDestructTime,
}
if minSeq >= 0 {
req := &pbmsg.SetUserConversationMinSeqReq{ConversationID: conversationID, OwnerUserID: []string{ownerUserID}, MinSeq: minSeq}
if _, err := pbmsg.SetUserConversationMinSeqCaller.Invoke(ctx, req); err != nil {
if err := c.msgClient.SetUserConversationMin(ctx, conversationID, []string{ownerUserID}, minSeq); err != nil {
return err
}
update["min_seq"] = minSeq
+7 -3
View File
@@ -16,9 +16,11 @@ package conversation
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/notification"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws"
)
@@ -27,8 +29,10 @@ type ConversationNotificationSender struct {
*rpcclient.NotificationSender
}
func NewConversationNotificationSender(conf *config.Notification, msgRpcClient *rpcclient.MessageRpcClient) *ConversationNotificationSender {
return &ConversationNotificationSender{rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient(msgRpcClient))}
func NewConversationNotificationSender(conf *config.Notification, msgClient *rpcli.MsgClient) *ConversationNotificationSender {
return &ConversationNotificationSender{rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
return msgClient.SendMsg(ctx, req)
}))}
}
// SetPrivate invote.
+49 -64
View File
@@ -17,6 +17,7 @@ package group
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"math/big"
"math/rand"
"strconv"
@@ -36,9 +37,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/grouphash"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/open-im-server/v3/pkg/notification/grouphash"
"github.com/openimsdk/protocol/constant"
pbconversation "github.com/openimsdk/protocol/conversation"
pbgroup "github.com/openimsdk/protocol/group"
@@ -58,13 +57,13 @@ import (
type groupServer struct {
pbgroup.UnimplementedGroupServer
db controller.GroupDatabase
user rpcclient.UserRpcClient
notification *GroupNotificationSender
conversationRpcClient rpcclient.ConversationRpcClient
msgRpcClient rpcclient.MessageRpcClient
config *Config
webhookClient *webhook.Client
db controller.GroupDatabase
notification *NotificationSender
config *Config
webhookClient *webhook.Client
userClient *rpcli.UserClient
msgClient *rpcli.MsgClient
conversationClient *rpcli.ConversationClient
}
type Config struct {
@@ -99,32 +98,33 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
var gs groupServer
database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
gs.db = database
gs.user = userRpcClient
gs.notification = NewGroupNotificationSender(
database,
&msgRpcClient,
&userRpcClient,
&conversationRpcClient,
config,
func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {
users, err := userRpcClient.GetUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
}
return datautil.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil
},
)
//userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
//msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
//conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User)
if err != nil {
return err
}
msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg)
if err != nil {
return err
}
conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation)
if err != nil {
return err
}
gs := groupServer{
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
userClient: rpcli.NewUserClient(userConn),
msgClient: rpcli.NewMsgClient(msgConn),
conversationClient: rpcli.NewConversationClient(conversationConn),
}
gs.db = controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
gs.notification = NewNotificationSender(gs.db, config, gs.userClient, gs.msgClient, gs.conversationClient)
localcache.InitLocalCache(&config.LocalCacheConfig)
gs.conversationRpcClient = conversationRpcClient
gs.msgRpcClient = msgRpcClient
gs.config = config
gs.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
pbgroup.RegisterGroupServer(server, &gs)
return nil
}
@@ -168,19 +168,6 @@ func (g *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error
return nil
}
func (g *groupServer) GetPublicUserInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.PublicUserInfo, error) {
if len(userIDs) == 0 {
return map[string]*sdkws.PublicUserInfo{}, nil
}
users, err := g.user.GetPublicUserInfos(ctx, userIDs)
if err != nil {
return nil, err
}
return datautil.SliceToMapAny(users, func(e *sdkws.PublicUserInfo) (string, *sdkws.PublicUserInfo) {
return e.UserID, e
}), nil
}
func (g *groupServer) IsNotFound(err error) bool {
return errs.ErrRecordNotFound.Is(specialerror.ErrCode(errs.Unwrap(err)))
}
@@ -222,7 +209,6 @@ func (g *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
return nil, errs.ErrArgs.WrapMsg("no group owner")
}
if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, g.config.Share.IMAdminUserID); err != nil {
return nil, err
}
userIDs := append(append(req.MemberUserIDs, req.AdminUserIDs...), req.OwnerUserID)
@@ -235,7 +221,7 @@ func (g *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
return nil, errs.ErrArgs.WrapMsg("group member repeated")
}
userMap, err := g.user.GetUsersInfoMap(ctx, userIDs)
userMap, err := g.userClient.GetUsersInfoMap(ctx, userIDs)
if err != nil {
return nil, err
}
@@ -386,7 +372,7 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
return nil, servererrs.ErrDismissedAlready.WrapMsg("group dismissed checking group status found it dismissed")
}
userMap, err := g.user.GetUsersInfoMap(ctx, req.InvitedUserIDs)
userMap, err := g.userClient.GetUsersInfoMap(ctx, req.InvitedUserIDs)
if err != nil {
return nil, err
}
@@ -697,7 +683,7 @@ func (g *groupServer) GetGroupApplicationList(ctx context.Context, req *pbgroup.
userIDs = append(userIDs, gr.UserID)
}
userIDs = datautil.Distinct(userIDs)
userMap, err := g.user.GetPublicUserInfoMap(ctx, userIDs)
userMap, err := g.userClient.GetUsersInfoMap(ctx, userIDs)
if err != nil {
return nil, err
}
@@ -809,7 +795,7 @@ func (g *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup
} else if !g.IsNotFound(err) {
return nil, err
}
if _, err := g.user.GetPublicUserInfo(ctx, req.FromUserID); err != nil {
if err := g.userClient.CheckUser(ctx, []string{req.FromUserID}); err != nil {
return nil, err
}
var member *model.GroupMember
@@ -853,7 +839,7 @@ func (g *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup
}
func (g *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) (*pbgroup.JoinGroupResp, error) {
user, err := g.user.GetUserInfo(ctx, req.InviterUserID)
user, err := g.userClient.GetUserInfo(ctx, req.InviterUserID)
if err != nil {
return nil, err
}
@@ -959,12 +945,12 @@ func (g *groupServer) QuitGroup(ctx context.Context, req *pbgroup.QuitGroupReq)
}
func (g *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error {
conevrsationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
maxSeq, err := g.msgRpcClient.GetConversationMaxSeq(ctx, conevrsationID)
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
maxSeq, err := g.msgClient.GetConversationMaxSeq(ctx, conversationID)
if err != nil {
return err
}
return g.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq)
return g.conversationClient.SetConversationMaxSeq(ctx, conversationID, userIDs, maxSeq)
}
func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInfoReq) (*pbgroup.SetGroupInfoResp, error) {
@@ -1040,7 +1026,7 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
return
}
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification}
if err := g.conversationRpcClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil {
if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil {
log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation)
}
}()
@@ -1153,8 +1139,7 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI
}
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification}
if err := g.conversationRpcClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil {
if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil {
log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation)
}
}()
@@ -1306,7 +1291,7 @@ func (g *groupServer) GetGroupMembersCMS(ctx context.Context, req *pbgroup.GetGr
}
func (g *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbgroup.GetUserReqApplicationListReq) (*pbgroup.GetUserReqApplicationListResp, error) {
user, err := g.user.GetPublicUserInfo(ctx, req.UserID)
user, err := g.userClient.GetUserInfo(ctx, req.UserID)
if err != nil {
return nil, err
}
@@ -1762,7 +1747,7 @@ func (g *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req *
return nil, servererrs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ","))
}
userMap, err := g.user.GetPublicUserInfoMap(ctx, req.UserIDs)
userMap, err := g.userClient.GetUsersInfoMap(ctx, req.UserIDs)
if err != nil {
return nil, err
}
@@ -1793,7 +1778,7 @@ func (g *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req *
ownerUserID = owner.UserID
}
var userInfo *sdkws.PublicUserInfo
var userInfo *sdkws.UserInfo
if user, ok := userMap[e.UserID]; !ok {
userInfo = user
}
@@ -1839,7 +1824,7 @@ func (g *groupServer) GetSpecifiedUserGroupRequestInfo(ctx context.Context, req
return nil, err
}
userInfos, err := g.user.GetPublicUserInfos(ctx, []string{req.UserID})
userInfos, err := g.userClient.GetUsersInfo(ctx, []string{req.UserID})
if err != nil {
return nil, err
}
+68 -74
View File
@@ -18,6 +18,9 @@ import (
"context"
"errors"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
@@ -26,8 +29,8 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/open-im-server/v3/pkg/notification"
"github.com/openimsdk/open-im-server/v3/pkg/notification/common_user"
"github.com/openimsdk/protocol/constant"
pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/msg"
@@ -47,36 +50,38 @@ const (
adminReceiver
)
func NewGroupNotificationSender(
db controller.GroupDatabase,
msgRpcClient *rpcclient.MessageRpcClient,
userRpcClient *rpcclient.UserRpcClient,
conversationRpcClient *rpcclient.ConversationRpcClient,
config *Config,
fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error),
) *GroupNotificationSender {
return &GroupNotificationSender{
NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)),
getUsersInfo: fn,
func NewNotificationSender(db controller.GroupDatabase, config *Config, userClient *rpcli.UserClient, msgClient *rpcli.MsgClient, conversationClient *rpcli.ConversationClient) *NotificationSender {
return &NotificationSender{
NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig,
rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
return msgClient.SendMsg(ctx, req)
}),
rpcclient.WithUserRpcClient(userClient.GetUserInfo),
),
getUsersInfo: func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error) {
users, err := userClient.GetUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
}
return datautil.Slice(users, func(e *sdkws.UserInfo) common_user.CommonUser { return e }), nil
},
db: db,
config: config,
conversationRpcClient: conversationRpcClient,
msgRpcClient: msgRpcClient,
msgClient: msgClient,
conversationClient: conversationClient,
}
}
type GroupNotificationSender struct {
type NotificationSender struct {
*rpcclient.NotificationSender
getUsersInfo func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)
db controller.GroupDatabase
config *Config
conversationRpcClient *rpcclient.ConversationRpcClient
msgRpcClient *rpcclient.MessageRpcClient
getUsersInfo func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error)
db controller.GroupDatabase
config *Config
msgClient *rpcli.MsgClient
conversationClient *rpcli.ConversationClient
}
func (g *GroupNotificationSender) PopulateGroupMember(ctx context.Context, members ...*model.GroupMember) error {
func (g *NotificationSender) PopulateGroupMember(ctx context.Context, members ...*model.GroupMember) error {
if len(members) == 0 {
return nil
}
@@ -91,7 +96,7 @@ func (g *GroupNotificationSender) PopulateGroupMember(ctx context.Context, membe
if err != nil {
return err
}
userMap := make(map[string]notification.CommonUser)
userMap := make(map[string]common_user.CommonUser)
for i, user := range users {
userMap[user.GetUserID()] = users[i]
}
@@ -111,7 +116,7 @@ func (g *GroupNotificationSender) PopulateGroupMember(ctx context.Context, membe
return nil
}
func (g *GroupNotificationSender) getUser(ctx context.Context, userID string) (*sdkws.PublicUserInfo, error) {
func (g *NotificationSender) getUser(ctx context.Context, userID string) (*sdkws.PublicUserInfo, error) {
users, err := g.getUsersInfo(ctx, []string{userID})
if err != nil {
return nil, err
@@ -127,7 +132,7 @@ func (g *GroupNotificationSender) getUser(ctx context.Context, userID string) (*
}, nil
}
func (g *GroupNotificationSender) getGroupInfo(ctx context.Context, groupID string) (*sdkws.GroupInfo, error) {
func (g *NotificationSender) getGroupInfo(ctx context.Context, groupID string) (*sdkws.GroupInfo, error) {
gm, err := g.db.TakeGroup(ctx, groupID)
if err != nil {
return nil, err
@@ -148,7 +153,7 @@ func (g *GroupNotificationSender) getGroupInfo(ctx context.Context, groupID stri
return convert.Db2PbGroupInfo(gm, ownerUserID, num), nil
}
func (g *GroupNotificationSender) getGroupMembers(ctx context.Context, groupID string, userIDs []string) ([]*sdkws.GroupMemberFullInfo, error) {
func (g *NotificationSender) getGroupMembers(ctx context.Context, groupID string, userIDs []string) ([]*sdkws.GroupMemberFullInfo, error) {
members, err := g.db.FindGroupMembers(ctx, groupID, userIDs)
if err != nil {
return nil, err
@@ -164,7 +169,7 @@ func (g *GroupNotificationSender) getGroupMembers(ctx context.Context, groupID s
return res, nil
}
func (g *GroupNotificationSender) getGroupMemberMap(ctx context.Context, groupID string, userIDs []string) (map[string]*sdkws.GroupMemberFullInfo, error) {
func (g *NotificationSender) getGroupMemberMap(ctx context.Context, groupID string, userIDs []string) (map[string]*sdkws.GroupMemberFullInfo, error) {
members, err := g.getGroupMembers(ctx, groupID, userIDs)
if err != nil {
return nil, err
@@ -176,7 +181,7 @@ func (g *GroupNotificationSender) getGroupMemberMap(ctx context.Context, groupID
return m, nil
}
func (g *GroupNotificationSender) getGroupMember(ctx context.Context, groupID string, userID string) (*sdkws.GroupMemberFullInfo, error) {
func (g *NotificationSender) getGroupMember(ctx context.Context, groupID string, userID string) (*sdkws.GroupMemberFullInfo, error) {
members, err := g.getGroupMembers(ctx, groupID, []string{userID})
if err != nil {
return nil, err
@@ -187,7 +192,7 @@ func (g *GroupNotificationSender) getGroupMember(ctx context.Context, groupID st
return members[0], nil
}
func (g *GroupNotificationSender) getGroupOwnerAndAdminUserID(ctx context.Context, groupID string) ([]string, error) {
func (g *NotificationSender) getGroupOwnerAndAdminUserID(ctx context.Context, groupID string) ([]string, error) {
members, err := g.db.FindGroupMemberRoleLevels(ctx, groupID, []int32{constant.GroupOwner, constant.GroupAdmin})
if err != nil {
return nil, err
@@ -199,7 +204,7 @@ func (g *GroupNotificationSender) getGroupOwnerAndAdminUserID(ctx context.Contex
return datautil.Slice(members, fn), nil
}
func (g *GroupNotificationSender) groupMemberDB2PB(member *model.GroupMember, appMangerLevel int32) *sdkws.GroupMemberFullInfo {
func (g *NotificationSender) groupMemberDB2PB(member *model.GroupMember, appMangerLevel int32) *sdkws.GroupMemberFullInfo {
return &sdkws.GroupMemberFullInfo{
GroupID: member.GroupID,
UserID: member.UserID,
@@ -216,7 +221,7 @@ func (g *GroupNotificationSender) groupMemberDB2PB(member *model.GroupMember, ap
}
}
/* func (g *GroupNotificationSender) getUsersInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error) {
/* func (g *NotificationSender) getUsersInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error) {
users, err := g.getUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
@@ -228,11 +233,11 @@ func (g *GroupNotificationSender) groupMemberDB2PB(member *model.GroupMember, ap
return result, nil
} */
func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws.GroupMemberFullInfo, groupID string) (err error) {
func (g *NotificationSender) fillOpUser(ctx context.Context, opUser **sdkws.GroupMemberFullInfo, groupID string) (err error) {
return g.fillOpUserByUserID(ctx, mcontext.GetOpUserID(ctx), opUser, groupID)
}
func (g *GroupNotificationSender) fillOpUserByUserID(ctx context.Context, userID string, opUser **sdkws.GroupMemberFullInfo, groupID string) error {
func (g *NotificationSender) fillOpUserByUserID(ctx context.Context, userID string, opUser **sdkws.GroupMemberFullInfo, groupID string) error {
if opUser == nil {
return errs.ErrInternalServer.WrapMsg("**sdkws.GroupMemberFullInfo is nil")
}
@@ -276,7 +281,7 @@ func (g *GroupNotificationSender) fillOpUserByUserID(ctx context.Context, userID
return nil
}
func (g *GroupNotificationSender) setVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string) {
func (g *NotificationSender) setVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string) {
versions := versionctx.GetVersionLog(ctx).Get()
for _, coll := range versions {
if coll.Name == collName && coll.Doc.DID == id {
@@ -287,7 +292,7 @@ func (g *GroupNotificationSender) setVersion(ctx context.Context, version *uint6
}
}
func (g *GroupNotificationSender) setSortVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string, sortVersion *uint64) {
func (g *NotificationSender) setSortVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string, sortVersion *uint64) {
versions := versionctx.GetVersionLog(ctx).Get()
for _, coll := range versions {
if coll.Name == collName && coll.Doc.DID == id {
@@ -302,7 +307,7 @@ func (g *GroupNotificationSender) setSortVersion(ctx context.Context, version *u
}
}
func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) {
func (g *NotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) {
var err error
defer func() {
if err != nil {
@@ -316,7 +321,7 @@ func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context,
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupCreatedNotification, tips)
}
func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context, tips *sdkws.GroupInfoSetTips) {
func (g *NotificationSender) GroupInfoSetNotification(ctx context.Context, tips *sdkws.GroupInfoSetTips) {
var err error
defer func() {
if err != nil {
@@ -330,7 +335,7 @@ func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context,
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNotification, tips, rpcclient.WithRpcGetUserName())
}
func (g *GroupNotificationSender) GroupInfoSetNameNotification(ctx context.Context, tips *sdkws.GroupInfoSetNameTips) {
func (g *NotificationSender) GroupInfoSetNameNotification(ctx context.Context, tips *sdkws.GroupInfoSetNameTips) {
var err error
defer func() {
if err != nil {
@@ -344,7 +349,7 @@ func (g *GroupNotificationSender) GroupInfoSetNameNotification(ctx context.Conte
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNameNotification, tips)
}
func (g *GroupNotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Context, tips *sdkws.GroupInfoSetAnnouncementTips) {
func (g *NotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Context, tips *sdkws.GroupInfoSetAnnouncementTips) {
var err error
defer func() {
if err != nil {
@@ -358,7 +363,7 @@ func (g *GroupNotificationSender) GroupInfoSetAnnouncementNotification(ctx conte
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetAnnouncementNotification, tips, rpcclient.WithRpcGetUserName())
}
func (g *GroupNotificationSender) JoinGroupApplicationNotification(ctx context.Context, req *pbgroup.JoinGroupReq) {
func (g *NotificationSender) JoinGroupApplicationNotification(ctx context.Context, req *pbgroup.JoinGroupReq) {
var err error
defer func() {
if err != nil {
@@ -386,7 +391,7 @@ func (g *GroupNotificationSender) JoinGroupApplicationNotification(ctx context.C
}
}
func (g *GroupNotificationSender) MemberQuitNotification(ctx context.Context, member *sdkws.GroupMemberFullInfo) {
func (g *NotificationSender) MemberQuitNotification(ctx context.Context, member *sdkws.GroupMemberFullInfo) {
var err error
defer func() {
if err != nil {
@@ -403,7 +408,7 @@ func (g *GroupNotificationSender) MemberQuitNotification(ctx context.Context, me
g.Notification(ctx, mcontext.GetOpUserID(ctx), member.GroupID, constant.MemberQuitNotification, tips)
}
func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) {
func (g *NotificationSender) GroupApplicationAcceptedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) {
var err error
defer func() {
if err != nil {
@@ -436,7 +441,7 @@ func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx conte
}
}
func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) {
func (g *NotificationSender) GroupApplicationRejectedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) {
var err error
defer func() {
if err != nil {
@@ -469,7 +474,7 @@ func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx conte
}
}
func (g *GroupNotificationSender) GroupOwnerTransferredNotification(ctx context.Context, req *pbgroup.TransferGroupOwnerReq) {
func (g *NotificationSender) GroupOwnerTransferredNotification(ctx context.Context, req *pbgroup.TransferGroupOwnerReq) {
var err error
defer func() {
if err != nil {
@@ -500,7 +505,7 @@ func (g *GroupNotificationSender) GroupOwnerTransferredNotification(ctx context.
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupOwnerTransferredNotification, tips)
}
func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context, tips *sdkws.MemberKickedTips) {
func (g *NotificationSender) MemberKickedNotification(ctx context.Context, tips *sdkws.MemberKickedTips) {
var err error
defer func() {
if err != nil {
@@ -514,7 +519,7 @@ func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context,
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips)
}
func (g *GroupNotificationSender) GroupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, invitedOpUserID string, entrantUserID ...string) error {
func (g *NotificationSender) GroupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, invitedOpUserID string, entrantUserID ...string) error {
var err error
defer func() {
if err != nil {
@@ -524,20 +529,15 @@ func (g *GroupNotificationSender) GroupApplicationAgreeMemberEnterNotification(c
if !g.config.RpcConfig.EnableHistoryForNewMembers {
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
maxSeq, err := g.msgRpcClient.GetConversationMaxSeq(ctx, conversationID)
maxSeq, err := g.msgClient.GetConversationMaxSeq(ctx, conversationID)
if err != nil {
return err
}
if _, err = g.msgRpcClient.SetUserConversationsMinSeq(ctx, &msg.SetUserConversationsMinSeqReq{
UserIDs: entrantUserID,
ConversationID: conversationID,
Seq: maxSeq,
}); err != nil {
if err := g.msgClient.SetUserConversationsMinSeq(ctx, conversationID, entrantUserID, maxSeq); err != nil {
return err
}
}
if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, groupID, entrantUserID); err != nil {
if err := g.conversationClient.CreateGroupChatConversations(ctx, groupID, entrantUserID); err != nil {
return err
}
@@ -573,7 +573,7 @@ func (g *GroupNotificationSender) GroupApplicationAgreeMemberEnterNotification(c
return nil
}
func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) error {
func (g *NotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) error {
var err error
defer func() {
if err != nil {
@@ -583,23 +583,17 @@ func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, g
if !g.config.RpcConfig.EnableHistoryForNewMembers {
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
maxSeq, err := g.msgRpcClient.GetConversationMaxSeq(ctx, conversationID)
maxSeq, err := g.msgClient.GetConversationMaxSeq(ctx, conversationID)
if err != nil {
return err
}
if _, err = g.msgRpcClient.SetUserConversationsMinSeq(ctx, &msg.SetUserConversationsMinSeqReq{
UserIDs: []string{entrantUserID},
ConversationID: conversationID,
Seq: maxSeq,
}); err != nil {
if err := g.msgClient.SetUserConversationsMinSeq(ctx, conversationID, []string{entrantUserID}, maxSeq); err != nil {
return err
}
}
if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, groupID, []string{entrantUserID}); err != nil {
if err := g.conversationClient.CreateGroupChatConversations(ctx, groupID, []string{entrantUserID}); err != nil {
return err
}
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
@@ -620,7 +614,7 @@ func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, g
return nil
}
func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) {
func (g *NotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) {
var err error
defer func() {
if err != nil {
@@ -633,7 +627,7 @@ func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupDismissedNotification, tips)
}
func (g *GroupNotificationSender) GroupMemberMutedNotification(ctx context.Context, groupID, groupMemberUserID string, mutedSeconds uint32) {
func (g *NotificationSender) GroupMemberMutedNotification(ctx context.Context, groupID, groupMemberUserID string, mutedSeconds uint32) {
var err error
defer func() {
if err != nil {
@@ -661,7 +655,7 @@ func (g *GroupNotificationSender) GroupMemberMutedNotification(ctx context.Conte
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberMutedNotification, tips)
}
func (g *GroupNotificationSender) GroupMemberCancelMutedNotification(ctx context.Context, groupID, groupMemberUserID string) {
func (g *NotificationSender) GroupMemberCancelMutedNotification(ctx context.Context, groupID, groupMemberUserID string) {
var err error
defer func() {
if err != nil {
@@ -686,7 +680,7 @@ func (g *GroupNotificationSender) GroupMemberCancelMutedNotification(ctx context
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberCancelMutedNotification, tips)
}
func (g *GroupNotificationSender) GroupMutedNotification(ctx context.Context, groupID string) {
func (g *NotificationSender) GroupMutedNotification(ctx context.Context, groupID string) {
var err error
defer func() {
if err != nil {
@@ -714,7 +708,7 @@ func (g *GroupNotificationSender) GroupMutedNotification(ctx context.Context, gr
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMutedNotification, tips)
}
func (g *GroupNotificationSender) GroupCancelMutedNotification(ctx context.Context, groupID string) {
func (g *NotificationSender) GroupCancelMutedNotification(ctx context.Context, groupID string) {
var err error
defer func() {
if err != nil {
@@ -742,7 +736,7 @@ func (g *GroupNotificationSender) GroupCancelMutedNotification(ctx context.Conte
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupCancelMutedNotification, tips)
}
func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Context, groupID, groupMemberUserID string) {
func (g *NotificationSender) GroupMemberInfoSetNotification(ctx context.Context, groupID, groupMemberUserID string) {
var err error
defer func() {
if err != nil {
@@ -767,7 +761,7 @@ func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Con
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips)
}
func (g *GroupNotificationSender) GroupMemberSetToAdminNotification(ctx context.Context, groupID, groupMemberUserID string) {
func (g *NotificationSender) GroupMemberSetToAdminNotification(ctx context.Context, groupID, groupMemberUserID string) {
var err error
defer func() {
if err != nil {
@@ -791,7 +785,7 @@ func (g *GroupNotificationSender) GroupMemberSetToAdminNotification(ctx context.
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToAdminNotification, tips)
}
func (g *GroupNotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx context.Context, groupID, groupMemberUserID string) {
func (g *NotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx context.Context, groupID, groupMemberUserID string) {
var err error
defer func() {
if err != nil {
+2 -73
View File
@@ -2,23 +2,13 @@ package msg
import (
"context"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/wrapperspb"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/idutil"
"github.com/openimsdk/tools/utils/stringutil"
"golang.org/x/sync/errgroup"
"strings"
)
// hard delete in Database.
// DestructMsgs hard delete in Database.
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (*msg.DestructMsgsResp, error) {
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
return nil, err
@@ -61,67 +51,6 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq)
return &msg.DestructMsgsResp{Count: int32(len(docs))}, nil
}
// soft delete for user self
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.ClearMsgResp, error) {
temp := convert.ConversationsPb2DB(req.Conversations)
batchNum := 100
errg, _ := errgroup.WithContext(ctx)
errg.SetLimit(100)
for i := 0; i < len(temp); i += batchNum {
batch := temp[i:min(i+batchNum, len(temp))]
errg.Go(func() error {
for _, conversation := range batch {
handleCtx := mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
log.ZDebug(handleCtx, "User MsgsDestruct",
"conversationID", conversation.ConversationID,
"ownerUserID", conversation.OwnerUserID,
"msgDestructTime", conversation.MsgDestructTime,
"lastMsgDestructTime", conversation.LatestMsgDestructTime)
seqs, err := m.MsgDatabase.ClearUserMsgs(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
if err != nil {
log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
if len(seqs) > 0 {
minseq := datautil.Max(seqs...)
// update
if err := m.Conversation.UpdateConversation(handleCtx,
&pbconversation.UpdateConversationReq{
UserIDs: []string{conversation.OwnerUserID},
ConversationID: conversation.ConversationID,
LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
MinSeq: wrapperspb.Int64(minseq),
}); err != nil {
log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
if err := m.Conversation.SetConversationMinSeq(handleCtx, []string{conversation.OwnerUserID}, conversation.ConversationID, minseq); err != nil {
return err
}
// if you need Notify SDK client userseq is update.
// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs)
}
}
return nil
})
}
if err := errg.Wait(); err != nil {
return nil, err
}
return nil, nil
}
func (m *msgServer) GetLastMessageSeqByTime(ctx context.Context, req *msg.GetLastMessageSeqByTimeReq) (*msg.GetLastMessageSeqByTimeResp, error) {
seq, err := m.MsgDatabase.GetLastMessageSeqByTime(ctx, req.ConversationID, req.Time)
if err != nil {
+5 -11
View File
@@ -74,19 +74,13 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms
if err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs); err != nil {
return nil, err
}
conversations, err := m.Conversation.GetConversationsByConversationID(ctx, []string{req.ConversationID})
conv, err := m.conversationClient.GetConversationsByConversationID(ctx, req.ConversationID)
if err != nil {
return nil, err
}
tips := &sdkws.DeleteMsgsTips{UserID: req.UserID, ConversationID: req.ConversationID, Seqs: req.Seqs}
m.notificationSender.NotificationWithSessionType(
ctx,
req.UserID,
m.conversationAndGetRecvID(conversations[0], req.UserID),
constant.DeleteMsgsNotification,
conversations[0].ConversationType,
tips,
)
m.notificationSender.NotificationWithSessionType(ctx, req.UserID, m.conversationAndGetRecvID(conv, req.UserID),
constant.DeleteMsgsNotification, conv.ConversationType, tips)
} else {
if err := m.MsgDatabase.DeleteUserMsgsBySeqs(ctx, req.UserID, req.ConversationID, req.Seqs); err != nil {
return nil, err
@@ -121,7 +115,7 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy
}
func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []string, userID string, deleteSyncOpt *msg.DeleteSyncOpt) error {
conversations, err := m.Conversation.GetConversationsByConversationID(ctx, conversationIDs)
conversations, err := m.conversationClient.GetConversationsByConversationIDs(ctx, conversationIDs)
if err != nil {
return err
}
@@ -144,7 +138,7 @@ func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []str
}
ownerUserIDs := []string{userID}
for conversationID, seq := range setSeqs {
if err := m.Conversation.SetConversationMinSeq(ctx, ownerUserIDs, conversationID, seq); err != nil {
if err := m.conversationClient.SetConversationMinSeq(ctx, conversationID, ownerUserIDs, seq); err != nil {
return err
}
}
+1 -1
View File
@@ -17,7 +17,7 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/notification"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws"
)
+3 -9
View File
@@ -113,19 +113,14 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll}
} else { // @Everyone and @other people
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAllAtMe}
err = m.Conversation.SetConversations(ctx, atUserID, conversation)
if err != nil {
if err := m.conversationClient.SetConversations(ctx, atUserID, conversation); err != nil {
log.ZWarn(ctx, "SetConversations", err, "userID", atUserID, "conversation", conversation)
}
memberUserIDList = datautil.Single(atUserID, memberUserIDList)
}
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll}
err = m.Conversation.SetConversations(ctx, memberUserIDList, conversation)
if err != nil {
if err := m.conversationClient.SetConversations(ctx, memberUserIDList, conversation); err != nil {
log.ZWarn(ctx, "SetConversations", err, "userID", memberUserIDList, "conversation", conversation)
}
@@ -133,8 +128,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa
}
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtMe}
err := m.Conversation.SetConversations(ctx, msg.AtUserIDList, conversation)
if err != nil {
if err := m.conversationClient.SetConversations(ctx, msg.AtUserIDList, conversation); err != nil {
log.ZWarn(ctx, "SetConversations", err, msg.AtUserIDList, conversation)
}
}
+55 -35
View File
@@ -16,6 +16,8 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
@@ -25,8 +27,8 @@ import (
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/notification"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
@@ -35,39 +37,39 @@ import (
)
type MessageInterceptorFunc func(ctx context.Context, globalConfig *Config, req *msg.SendMsgReq) (*sdkws.MsgData, error)
type (
// MessageInterceptorChain defines a chain of message interceptor functions.
MessageInterceptorChain []MessageInterceptorFunc
// MsgServer encapsulates dependencies required for message handling.
msgServer struct {
RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration.
MsgDatabase controller.CommonMsgDatabase // Interface for message database operations.
Conversation *rpcclient.ConversationRpcClient // RPC client for conversation service.
UserLocalCache *rpccache.UserLocalCache // Local cache for user data.
FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data.
GroupLocalCache *rpccache.GroupLocalCache // Local cache for group data.
ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data.
Handlers MessageInterceptorChain // Chain of handlers for processing messages.
notificationSender *rpcclient.NotificationSender // RPC client for sending notifications.
msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications.
config *Config // Global configuration settings.
webhookClient *webhook.Client
msg.UnimplementedMsgServer
}
// MessageInterceptorChain defines a chain of message interceptor functions.
type MessageInterceptorChain []MessageInterceptorFunc
Config struct {
RpcConfig config.Msg
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
Discovery config.Discovery
}
)
type Config struct {
RpcConfig config.Msg
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
Discovery config.Discovery
}
// MsgServer encapsulates dependencies required for message handling.
type msgServer struct {
msg.UnimplementedMsgServer
RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration.
MsgDatabase controller.CommonMsgDatabase // Interface for message database operations.
StreamMsgDatabase controller.StreamMsgDatabase
UserLocalCache *rpccache.UserLocalCache // Local cache for user data.
FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data.
GroupLocalCache *rpccache.GroupLocalCache // Local cache for group data.
ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data.
Handlers MessageInterceptorChain // Chain of handlers for processing messages.
notificationSender *rpcclient.NotificationSender // RPC client for sending notifications.
msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications.
config *Config // Global configuration settings.
webhookClient *webhook.Client
conversationClient *rpcli.ConversationClient
}
func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) {
m.Handlers = append(m.Handlers, interceptorFunc...)
@@ -106,16 +108,34 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User)
if err != nil {
return err
}
groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group)
if err != nil {
return err
}
friendConn, err := client.GetConn(ctx, config.Discovery.RpcService.Friend)
if err != nil {
return err
}
conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation)
if err != nil {
return err
}
conversationClient := rpcli.NewConversationClient(conversationConn)
s := &msgServer{
Conversation: &conversationClient,
MsgDatabase: msgDatabase,
RegisterCenter: client,
UserLocalCache: rpccache.NewUserLocalCache(userRpcClient, &config.LocalCacheConfig, rdb),
GroupLocalCache: rpccache.NewGroupLocalCache(groupRpcClient, &config.LocalCacheConfig, rdb),
UserLocalCache: rpccache.NewUserLocalCache(rpcli.NewUserClient(userConn), &config.LocalCacheConfig, rdb),
GroupLocalCache: rpccache.NewGroupLocalCache(rpcli.NewGroupClient(groupConn), &config.LocalCacheConfig, rdb),
ConversationLocalCache: rpccache.NewConversationLocalCache(conversationClient, &config.LocalCacheConfig, rdb),
FriendLocalCache: rpccache.NewFriendLocalCache(friendRpcClient, &config.LocalCacheConfig, rdb),
FriendLocalCache: rpccache.NewFriendLocalCache(rpcli.NewRelationClient(friendConn), &config.LocalCacheConfig, rdb),
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
conversationClient: conversationClient,
}
s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg))
+115
View File
@@ -0,0 +1,115 @@
package msg
import (
"context"
"fmt"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
)
const StreamDeadlineTime = time.Second * 60 * 10
func (m *msgServer) handlerStreamMsg(ctx context.Context, msgData *sdkws.MsgData) error {
now := time.Now()
val := &model.StreamMsg{
ClientMsgID: msgData.ClientMsgID,
ConversationID: msgprocessor.GetConversationIDByMsg(msgData),
UserID: msgData.SendID,
CreateTime: now,
DeadlineTime: now.Add(StreamDeadlineTime),
}
return m.StreamMsgDatabase.CreateStreamMsg(ctx, val)
}
func (m *msgServer) getStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) {
res, err := m.StreamMsgDatabase.GetStreamMsg(ctx, clientMsgID)
if err != nil {
return nil, err
}
now := time.Now()
if !res.End && res.DeadlineTime.Before(now) {
res.End = true
res.DeadlineTime = now
_ = m.StreamMsgDatabase.AppendStreamMsg(ctx, res.ClientMsgID, 0, nil, true, now)
}
return res, nil
}
func (m *msgServer) AppendStreamMsg(ctx context.Context, req *msg.AppendStreamMsgReq) (*msg.AppendStreamMsgResp, error) {
res, err := m.getStreamMsg(ctx, req.ClientMsgID)
if err != nil {
return nil, err
}
if res.End {
return nil, errs.ErrNoPermission.WrapMsg("stream msg is end")
}
if len(res.Packets) < int(req.StartIndex) {
return nil, errs.ErrNoPermission.WrapMsg("start index is invalid")
}
if val := len(res.Packets) - int(req.StartIndex); val > 0 {
exist := res.Packets[int(req.StartIndex):]
for i, s := range exist {
if len(req.Packets) == 0 {
break
}
if s != req.Packets[i] {
return nil, errs.ErrNoPermission.WrapMsg(fmt.Sprintf("packet %d has been written and is inconsistent", i))
}
req.StartIndex++
req.Packets = req.Packets[1:]
}
}
if len(req.Packets) == 0 && res.End == req.End {
return &msg.AppendStreamMsgResp{}, nil
}
deadlineTime := time.Now().Add(StreamDeadlineTime)
if err := m.StreamMsgDatabase.AppendStreamMsg(ctx, req.ClientMsgID, int(req.StartIndex), req.Packets, req.End, deadlineTime); err != nil {
return nil, err
}
conversation, err := m.conversationClient.GetConversation(ctx, res.ConversationID, res.UserID)
if err != nil {
return nil, err
}
tips := &sdkws.StreamMsgTips{
ConversationID: res.ConversationID,
ClientMsgID: res.ClientMsgID,
StartIndex: req.StartIndex,
Packets: req.Packets,
End: req.End,
}
var (
recvID string
sessionType int32
)
if conversation.GroupID == "" {
sessionType = constant.SingleChatType
recvID = conversation.UserID
} else {
sessionType = constant.ReadGroupChatType
recvID = conversation.GroupID
}
m.msgNotificationSender.StreamMsgNotification(ctx, res.UserID, recvID, sessionType, tips)
return &msg.AppendStreamMsgResp{}, nil
}
func (m *msgServer) GetStreamMsg(ctx context.Context, req *msg.GetStreamMsgReq) (*msg.GetStreamMsgResp, error) {
res, err := m.getStreamMsg(ctx, req.ClientMsgID)
if err != nil {
return nil, err
}
return &msg.GetStreamMsgResp{
ClientMsgID: res.ClientMsgID,
ConversationID: res.ConversationID,
UserID: res.UserID,
Packets: res.Packets,
End: res.End,
CreateTime: res.CreateTime.UnixMilli(),
DeadlineTime: res.DeadlineTime.UnixMilli(),
}, nil
}
+18 -6
View File
@@ -22,6 +22,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/relation"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
@@ -39,7 +40,7 @@ func (s *friendServer) GetPaginationBlacks(ctx context.Context, req *relation.Ge
return nil, err
}
resp = &relation.GetPaginationBlacksResp{}
resp.Blacks, err = convert.BlackDB2Pb(ctx, blacks, s.userRpcClient.GetUsersInfoMap)
resp.Blacks, err = convert.BlackDB2Pb(ctx, blacks, s.userClient.GetUsersInfoMap)
if err != nil {
return nil, err
}
@@ -81,9 +82,7 @@ func (s *friendServer) AddBlack(ctx context.Context, req *relation.AddBlackReq)
if err := s.webhookBeforeAddBlack(ctx, &s.config.WebhooksConfig.BeforeAddBlack, req); err != nil {
return nil, err
}
_, err := s.userRpcClient.GetUsersInfo(ctx, []string{req.OwnerUserID, req.BlackUserID})
if err != nil {
if err := s.userClient.CheckUser(ctx, []string{req.OwnerUserID, req.BlackUserID}); err != nil {
return nil, err
}
black := model.Black{
@@ -114,7 +113,7 @@ func (s *friendServer) GetSpecifiedBlacks(ctx context.Context, req *relation.Get
return nil, errs.ErrArgs.WrapMsg("userIDList repeated")
}
userMap, err := s.userRpcClient.GetPublicUserInfoMap(ctx, req.UserIDList)
userMap, err := s.userClient.GetUsersInfoMap(ctx, req.UserIDList)
if err != nil {
return nil, err
}
@@ -132,13 +131,26 @@ func (s *friendServer) GetSpecifiedBlacks(ctx context.Context, req *relation.Get
Blacks: make([]*sdkws.BlackInfo, 0, len(req.UserIDList)),
}
toPublcUser := func(userID string) *sdkws.PublicUserInfo {
v, ok := userMap[userID]
if !ok {
return nil
}
return &sdkws.PublicUserInfo{
UserID: v.UserID,
Nickname: v.Nickname,
FaceURL: v.FaceURL,
Ex: v.Ex,
}
}
for _, userID := range req.UserIDList {
if black := blackMap[userID]; black != nil {
resp.Blacks = append(resp.Blacks,
&sdkws.BlackInfo{
OwnerUserID: black.OwnerUserID,
CreateTime: black.CreateTime.UnixMilli(),
BlackUserInfo: userMap[userID],
BlackUserInfo: toPublcUser(userID),
AddSource: black.AddSource,
OperatorUserID: black.OperatorUserID,
Ex: black.Ex,
+35 -45
View File
@@ -16,6 +16,7 @@ package relation
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/tools/mq/memamq"
@@ -31,7 +32,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/relation"
"github.com/openimsdk/protocol/sdkws"
@@ -44,15 +44,14 @@ import (
type friendServer struct {
relation.UnimplementedFriendServer
db controller.FriendDatabase
blackDatabase controller.BlackDatabase
userRpcClient *rpcclient.UserRpcClient
notificationSender *FriendNotificationSender
conversationRpcClient rpcclient.ConversationRpcClient
RegisterCenter discovery.SvcDiscoveryRegistry
config *Config
webhookClient *webhook.Client
queue *memamq.MemoryQueue
db controller.FriendDatabase
blackDatabase controller.BlackDatabase
notificationSender *FriendNotificationSender
RegisterCenter discovery.SvcDiscoveryRegistry
config *Config
webhookClient *webhook.Client
queue *memamq.MemoryQueue
userClient *rpcli.UserClient
}
type Config struct {
@@ -92,15 +91,21 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
return err
}
// Initialize RPC clients
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User)
if err != nil {
return err
}
msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg)
if err != nil {
return err
}
userClient := rpcli.NewUserClient(userConn)
// Initialize notification sender
notificationSender := NewFriendNotificationSender(
&config.NotificationConfig,
&msgRpcClient,
WithRpcFunc(userRpcClient.GetUsersInfo),
rpcli.NewMsgClient(msgConn),
WithRpcFunc(userClient.GetUsersInfo),
)
localcache.InitLocalCache(&config.LocalCacheConfig)
@@ -116,13 +121,12 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
blackMongoDB,
redis.NewBlackCacheRedis(rdb, &config.LocalCacheConfig, blackMongoDB, redis.GetRocksCacheOptions()),
),
userRpcClient: &userRpcClient,
notificationSender: notificationSender,
RegisterCenter: client,
conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation),
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
queue: memamq.NewMemoryQueue(16, 1024*1024),
notificationSender: notificationSender,
RegisterCenter: client,
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
queue: memamq.NewMemoryQueue(16, 1024*1024),
userClient: userClient,
})
return nil
}
@@ -139,7 +143,7 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *relation.Apply
if err = s.webhookBeforeAddFriend(ctx, &s.config.WebhooksConfig.BeforeAddFriend, req); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
if _, err := s.userRpcClient.GetUsersInfoMap(ctx, []string{req.ToUserID, req.FromUserID}); err != nil {
if err := s.userClient.CheckUser(ctx, []string{req.ToUserID, req.FromUserID}); err != nil {
return nil, err
}
@@ -163,7 +167,8 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *relation.ImportFr
if err := authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil {
return nil, err
}
if _, err := s.userRpcClient.GetUsersInfo(ctx, append([]string{req.OwnerUserID}, req.FriendUserIDs...)); err != nil {
if err := s.userClient.CheckUser(ctx, append([]string{req.OwnerUserID}, req.FriendUserIDs...)); err != nil {
return nil, err
}
if datautil.Contain(req.OwnerUserID, req.FriendUserIDs...) {
@@ -304,7 +309,7 @@ func (s *friendServer) getFriend(ctx context.Context, ownerUserID string, friend
if err != nil {
return nil, err
}
return convert.FriendsDB2Pb(ctx, friends, s.userRpcClient.GetUsersInfoMap)
return convert.FriendsDB2Pb(ctx, friends, s.userClient.GetUsersInfoMap)
}
// Get the list of friend requests sent out proactively.
@@ -316,7 +321,7 @@ func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context,
return nil, err
}
resp = &relation.GetDesignatedFriendsApplyResp{}
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userRpcClient.GetUsersInfoMap)
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userClient.GetUsersInfoMap)
if err != nil {
return nil, err
}
@@ -335,7 +340,7 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *rel
}
resp = &relation.GetPaginationFriendsApplyToResp{}
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userRpcClient.GetUsersInfoMap)
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userClient.GetUsersInfoMap)
if err != nil {
return nil, err
}
@@ -357,7 +362,7 @@ func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *r
return nil, err
}
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userRpcClient.GetUsersInfoMap)
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userClient.GetUsersInfoMap)
if err != nil {
return nil, err
}
@@ -388,7 +393,7 @@ func (s *friendServer) GetPaginationFriends(ctx context.Context, req *relation.G
}
resp = &relation.GetPaginationFriendsResp{}
resp.FriendsInfo, err = convert.FriendsDB2Pb(ctx, friends, s.userRpcClient.GetUsersInfoMap)
resp.FriendsInfo, err = convert.FriendsDB2Pb(ctx, friends, s.userClient.GetUsersInfoMap)
if err != nil {
return nil, err
}
@@ -421,7 +426,7 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *relatio
return nil, errs.ErrArgs.WrapMsg("userIDList repeated")
}
userMap, err := s.userRpcClient.GetUsersInfoMap(ctx, req.UserIDList)
userMap, err := s.userClient.GetUsersInfoMap(ctx, req.UserIDList)
if err != nil {
return nil, err
}
@@ -525,18 +530,3 @@ func (s *friendServer) UpdateFriends(
s.notificationSender.FriendsInfoUpdateNotification(ctx, req.OwnerUserID, req.FriendUserIDs)
return resp, nil
}
func (s *friendServer) GetIncrementalFriendsApplyTo(ctx context.Context, req *relation.GetIncrementalFriendsApplyToReq) (*relation.GetIncrementalFriendsApplyToResp, error) {
// TODO implement me
return nil, nil
}
func (s *friendServer) GetIncrementalFriendsApplyFrom(ctx context.Context, req *relation.GetIncrementalFriendsApplyFromReq) (*relation.GetIncrementalFriendsApplyFromResp, error) {
// TODO implement me
return nil, nil
}
func (s *friendServer) GetIncrementalBlacks(ctx context.Context, req *relation.GetIncrementalBlacksReq) (*relation.GetIncrementalBlacksResp, error) {
// TODO implement me
return nil, nil
}
+12 -11
View File
@@ -16,6 +16,9 @@ package relation
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
@@ -24,8 +27,8 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/open-im-server/v3/pkg/notification"
"github.com/openimsdk/open-im-server/v3/pkg/notification/common_user"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/relation"
"github.com/openimsdk/protocol/sdkws"
@@ -35,7 +38,7 @@ import (
type FriendNotificationSender struct {
*rpcclient.NotificationSender
// Target not found err
getUsersInfo func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)
getUsersInfo func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error)
// db controller
db controller.FriendDatabase
}
@@ -52,7 +55,7 @@ func WithDBFunc(
fn func(ctx context.Context, userIDs []string) (users []*relationtb.User, err error),
) friendNotificationSenderOptions {
return func(s *FriendNotificationSender) {
f := func(ctx context.Context, userIDs []string) (result []notification.CommonUser, err error) {
f := func(ctx context.Context, userIDs []string) (result []common_user.CommonUser, err error) {
users, err := fn(ctx, userIDs)
if err != nil {
return nil, err
@@ -70,7 +73,7 @@ func WithRpcFunc(
fn func(ctx context.Context, userIDs []string) ([]*sdkws.UserInfo, error),
) friendNotificationSenderOptions {
return func(s *FriendNotificationSender) {
f := func(ctx context.Context, userIDs []string) (result []notification.CommonUser, err error) {
f := func(ctx context.Context, userIDs []string) (result []common_user.CommonUser, err error) {
users, err := fn(ctx, userIDs)
if err != nil {
return nil, err
@@ -84,13 +87,11 @@ func WithRpcFunc(
}
}
func NewFriendNotificationSender(
conf *config.Notification,
msgRpcClient *rpcclient.MessageRpcClient,
opts ...friendNotificationSenderOptions,
) *FriendNotificationSender {
func NewFriendNotificationSender(conf *config.Notification, msgClient *rpcli.MsgClient, opts ...friendNotificationSenderOptions) *FriendNotificationSender {
f := &FriendNotificationSender{
NotificationSender: rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient(msgRpcClient)),
NotificationSender: rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
return msgClient.SendMsg(ctx, req)
})),
}
for _, opt := range opts {
opt(f)
+2 -3
View File
@@ -19,10 +19,9 @@ import (
"crypto/rand"
"time"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/errs"
@@ -149,7 +148,7 @@ func (t *thirdServer) SearchLogs(ctx context.Context, req *third.SearchLogsReq)
for _, log := range logs {
userIDs = append(userIDs, log.UserID)
}
userMap, err := t.userRpcClient.GetUsersInfoMap(ctx, userIDs)
userMap, err := t.userClient.GetUsersInfoMap(ctx, userIDs)
if err != nil {
return nil, err
}
+9
View File
@@ -17,6 +17,9 @@ package third
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
@@ -45,6 +48,7 @@ type thirdServer struct {
defaultExpire time.Duration
config *Config
s3 s3.Interface
userClient *rpcli.UserClient
}
type Config struct {
@@ -96,6 +100,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User)
if err != nil {
return err
}
localcache.InitLocalCache(&config.LocalCacheConfig)
third.RegisterThirdServer(server, &thirdServer{
thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb),
@@ -104,6 +112,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
defaultExpire: time.Hour * 24 * 7,
config: config,
s3: o,
userClient: rpcli.NewUserClient(userConn),
})
return nil
}
+11 -6
View File
@@ -16,18 +16,21 @@ package user
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/msg"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/open-im-server/v3/pkg/notification/common_user"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/notification"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws"
)
type UserNotificationSender struct {
*rpcclient.NotificationSender
getUsersInfo func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)
getUsersInfo func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error)
// db controller
db controller.UserDatabase
}
@@ -44,7 +47,7 @@ func WithUserFunc(
fn func(ctx context.Context, userIDs []string) (users []*relationtb.User, err error),
) userNotificationSenderOptions {
return func(u *UserNotificationSender) {
f := func(ctx context.Context, userIDs []string) (result []notification.CommonUser, err error) {
f := func(ctx context.Context, userIDs []string) (result []common_user.CommonUser, err error) {
users, err := fn(ctx, userIDs)
if err != nil {
return nil, err
@@ -58,9 +61,11 @@ func WithUserFunc(
}
}
func NewUserNotificationSender(config *Config, msgRpcClient *rpcclient.MessageRpcClient, opts ...userNotificationSenderOptions) *UserNotificationSender {
func NewUserNotificationSender(config *Config, msgClient *rpcli.MsgClient, opts ...userNotificationSenderOptions) *UserNotificationSender {
f := &UserNotificationSender{
NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient)),
NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
return msgClient.SendMsg(ctx, req)
})),
}
for _, opt := range opts {
opt(f)
+23 -6
View File
@@ -17,6 +17,7 @@ package user
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"math/rand"
"strings"
"sync"
@@ -62,6 +63,8 @@ type userServer struct {
RegisterCenter registry.SvcDiscoveryRegistry
config *Config
webhookClient *webhook.Client
groupClient *rpcli.GroupClient
relationClient *rpcli.RelationClient
}
type Config struct {
@@ -94,6 +97,19 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
if err != nil {
return err
}
msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg)
if err != nil {
return err
}
groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group)
if err != nil {
return err
}
friendConn, err := client.GetConn(ctx, config.Discovery.RpcService.Friend)
if err != nil {
return err
}
msgClient := rpcli.NewMsgClient(msgConn)
userCache := redis.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, redis.GetRocksCacheOptions())
database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx())
friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend)
@@ -104,12 +120,13 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
online: redis.NewUserOnline(rdb),
db: database,
RegisterCenter: client,
friendRpcClient: &friendRpcClient,
groupRpcClient: &groupRpcClient,
friendNotificationSender: relation.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, relation.WithDBFunc(database.FindWithError)),
userNotificationSender: NewUserNotificationSender(config, &msgRpcClient, WithUserFunc(database.FindWithError)),
friendNotificationSender: relation.NewFriendNotificationSender(&config.NotificationConfig, msgClient, relation.WithDBFunc(database.FindWithError)),
userNotificationSender: NewUserNotificationSender(config, msgClient, WithUserFunc(database.FindWithError)),
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
groupClient: rpcli.NewGroupClient(groupConn),
relationClient: rpcli.NewRelationClient(friendConn),
}
pbuser.RegisterUserServer(server, u)
return u.db.InitOnce(context.Background(), users)
@@ -641,7 +658,7 @@ func (s *userServer) NotificationUserInfoUpdate(ctx context.Context, userID stri
wg.Add(len(es))
go func() {
defer wg.Done()
_, es[0] = s.groupRpcClient.Client.NotificationUserInfoUpdate(ctx, &group.NotificationUserInfoUpdateReq{
_, es[0] = s.groupClient.NotificationUserInfoUpdate(ctx, &group.NotificationUserInfoUpdateReq{
UserID: userID,
OldUserInfo: oldUserInfo,
NewUserInfo: newUserInfo,
@@ -650,7 +667,7 @@ func (s *userServer) NotificationUserInfoUpdate(ctx context.Context, userID stri
go func() {
defer wg.Done()
_, es[1] = s.friendRpcClient.Client.NotificationUserInfoUpdate(ctx, &friendpb.NotificationUserInfoUpdateReq{
_, es[1] = s.relationClient.NotificationUserInfoUpdate(ctx, &friendpb.NotificationUserInfoUpdateReq{
UserID: userID,
OldUserInfo: oldUserInfo,
NewUserInfo: newUserInfo,