feat: Optimizing RPC call (#2993)

* pb

* fix: Modifying other fields while setting IsPrivateChat does not take effect

* fix: quote message error revoke

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* rpc client

* rpc client

* rpc client

* rpc client

* rpc client

* rpc client

* rpc client

* rpc client
This commit is contained in:
chao
2024-12-24 10:51:38 +08:00
committed by GitHub
parent e9895062cb
commit e3f609b3ae
67 changed files with 1287 additions and 1092 deletions
+18 -20
View File
@@ -16,12 +16,11 @@ package rpccache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
pbconv "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/rpccall"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
@@ -33,10 +32,11 @@ const (
conversationWorkerCount = 20
)
func NewConversationLocalCache(localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache {
func NewConversationLocalCache(client *rpcli.ConversationClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache {
lc := localCache.Conversation
log.ZDebug(context.Background(), "ConversationLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &ConversationLocalCache{
client: client,
local: localcache.New[[]byte](
localcache.WithLocalSlotNum(lc.SlotNum),
localcache.WithLocalSlotSize(lc.SlotSize),
@@ -52,7 +52,8 @@ func NewConversationLocalCache(localCache *config.LocalCache, cli redis.Universa
}
type ConversationLocalCache struct {
local localcache.Cache[[]byte]
client *rpcli.ConversationClient
local localcache.Cache[[]byte]
}
func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) (val []string, err error) {
@@ -63,7 +64,7 @@ func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUs
return resp.ConversationIDs, nil
}
func (c *ConversationLocalCache) getConversationIDs(ctx context.Context, ownerUserID string) (val *pbconv.GetConversationIDsResp, err error) {
func (c *ConversationLocalCache) getConversationIDs(ctx context.Context, ownerUserID string) (val *pbconversation.GetConversationIDsResp, err error) {
log.ZDebug(ctx, "ConversationLocalCache getConversationIDs req", "ownerUserID", ownerUserID)
defer func() {
if err == nil {
@@ -72,14 +73,14 @@ func (c *ConversationLocalCache) getConversationIDs(ctx context.Context, ownerUs
log.ZError(ctx, "ConversationLocalCache getConversationIDs return", err, "ownerUserID", ownerUserID)
}
}()
var cache cacheProto[pbconv.GetConversationIDsResp]
var cache cacheProto[pbconversation.GetConversationIDsResp]
return cache.Unmarshal(c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "ConversationLocalCache getConversationIDs rpc", "ownerUserID", ownerUserID)
return cache.Marshal(pbconv.GetConversationIDsCaller.Invoke(ctx, &pbconv.GetConversationIDsReq{UserID: ownerUserID}))
return cache.Marshal(c.client.ConversationClient.GetConversationIDs(ctx, &pbconversation.GetConversationIDsReq{UserID: ownerUserID}))
}))
}
func (c *ConversationLocalCache) GetConversation(ctx context.Context, userID, conversationID string) (val *pbconv.Conversation, err error) {
func (c *ConversationLocalCache) GetConversation(ctx context.Context, userID, conversationID string) (val *pbconversation.Conversation, err error) {
log.ZDebug(ctx, "ConversationLocalCache GetConversation req", "userID", userID, "conversationID", conversationID)
defer func() {
if err == nil {
@@ -88,13 +89,10 @@ func (c *ConversationLocalCache) GetConversation(ctx context.Context, userID, co
log.ZWarn(ctx, "ConversationLocalCache GetConversation return", err, "userID", userID, "conversationID", conversationID)
}
}()
var cache cacheProto[pbconv.Conversation]
var cache cacheProto[pbconversation.Conversation]
return cache.Unmarshal(c.local.Get(ctx, cachekey.GetConversationKey(userID, conversationID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "ConversationLocalCache GetConversation rpc", "userID", userID, "conversationID", conversationID)
return cache.Marshal(rpccall.ExtractField(ctx, pbconv.GetConversationCaller.Invoke, &pbconv.GetConversationReq{
ConversationID: conversationID,
OwnerUserID: userID,
}, (*pbconv.GetConversationResp).GetConversation))
return cache.Marshal(c.client.GetConversation(ctx, conversationID, userID))
}))
}
@@ -106,10 +104,10 @@ func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Con
return conv.RecvMsgOpt, nil
}
func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconv.Conversation, error) {
func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) {
var (
conversations = make([]*pbconv.Conversation, 0, len(conversationIDs))
conversationsChan = make(chan *pbconv.Conversation, len(conversationIDs))
conversations = make([]*pbconversation.Conversation, 0, len(conversationIDs))
conversationsChan = make(chan *pbconversation.Conversation, len(conversationIDs))
)
g, ctx := errgroup.WithContext(ctx)
@@ -139,7 +137,7 @@ func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUser
return conversations, nil
}
func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) (val *pbconv.GetConversationNotReceiveMessageUserIDsResp, err error) {
func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) (val *pbconversation.GetConversationNotReceiveMessageUserIDsResp, err error) {
log.ZDebug(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs req", "conversationID", conversationID)
defer func() {
if err == nil {
@@ -148,10 +146,10 @@ func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx con
log.ZError(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs return", err, "conversationID", conversationID)
}
}()
var cache cacheProto[pbconv.GetConversationNotReceiveMessageUserIDsResp]
var cache cacheProto[pbconversation.GetConversationNotReceiveMessageUserIDsResp]
return cache.Unmarshal(c.local.Get(ctx, cachekey.GetConversationNotReceiveMessageUserIDsKey(conversationID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs rpc", "conversationID", conversationID)
return cache.Marshal(pbconv.GetConversationNotReceiveMessageUserIDsCaller.Invoke(ctx, &pbconv.GetConversationNotReceiveMessageUserIDsReq{ConversationID: conversationID}))
return cache.Marshal(c.client.ConversationClient.GetConversationNotReceiveMessageUserIDs(ctx, &pbconversation.GetConversationNotReceiveMessageUserIDsReq{ConversationID: conversationID}))
}))
}
+7 -5
View File
@@ -16,8 +16,8 @@ package rpccache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@@ -26,10 +26,11 @@ import (
"github.com/redis/go-redis/v9"
)
func NewFriendLocalCache(localCache *config.LocalCache, cli redis.UniversalClient) *FriendLocalCache {
func NewFriendLocalCache(client *rpcli.RelationClient, localCache *config.LocalCache, cli redis.UniversalClient) *FriendLocalCache {
lc := localCache.Friend
log.ZDebug(context.Background(), "FriendLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &FriendLocalCache{
client: client,
local: localcache.New[[]byte](
localcache.WithLocalSlotNum(lc.SlotNum),
localcache.WithLocalSlotSize(lc.SlotSize),
@@ -45,7 +46,8 @@ func NewFriendLocalCache(localCache *config.LocalCache, cli redis.UniversalClien
}
type FriendLocalCache struct {
local localcache.Cache[[]byte]
client *rpcli.RelationClient
local localcache.Cache[[]byte]
}
func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (val bool, err error) {
@@ -68,7 +70,7 @@ func (f *FriendLocalCache) isFriend(ctx context.Context, possibleFriendUserID, u
var cache cacheProto[relation.IsFriendResp]
return cache.Unmarshal(f.local.GetLink(ctx, cachekey.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "FriendLocalCache isFriend rpc", "possibleFriendUserID", possibleFriendUserID, "userID", userID)
return cache.Marshal(relation.IsFriendCaller.Invoke(ctx, &relation.IsFriendReq{UserID1: userID, UserID2: possibleFriendUserID}))
return cache.Marshal(f.client.FriendClient.IsFriend(ctx, &relation.IsFriendReq{UserID1: userID, UserID2: possibleFriendUserID}))
}, cachekey.GetFriendIDsKey(possibleFriendUserID)))
}
@@ -94,6 +96,6 @@ func (f *FriendLocalCache) isBlack(ctx context.Context, possibleBlackUserID, use
var cache cacheProto[relation.IsBlackResp]
return cache.Unmarshal(f.local.GetLink(ctx, cachekey.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "FriendLocalCache IsBlack rpc", "possibleBlackUserID", possibleBlackUserID, "userID", userID)
return cache.Marshal(relation.IsBlackCaller.Invoke(ctx, &relation.IsBlackReq{UserID1: possibleBlackUserID, UserID2: userID}))
return cache.Marshal(f.client.FriendClient.IsBlack(ctx, &relation.IsBlackReq{UserID1: possibleBlackUserID, UserID2: userID}))
}, cachekey.GetBlackIDsKey(userID)))
}
+8 -16
View File
@@ -16,10 +16,9 @@ package rpccache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/rpccall"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@@ -30,10 +29,11 @@ import (
"github.com/redis/go-redis/v9"
)
func NewGroupLocalCache(localCache *config.LocalCache, cli redis.UniversalClient) *GroupLocalCache {
func NewGroupLocalCache(client *rpcli.GroupClient, localCache *config.LocalCache, cli redis.UniversalClient) *GroupLocalCache {
lc := localCache.Group
log.ZDebug(context.Background(), "GroupLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &GroupLocalCache{
client: client,
local: localcache.New[[]byte](
localcache.WithLocalSlotNum(lc.SlotNum),
localcache.WithLocalSlotSize(lc.SlotSize),
@@ -49,7 +49,8 @@ func NewGroupLocalCache(localCache *config.LocalCache, cli redis.UniversalClient
}
type GroupLocalCache struct {
local localcache.Cache[[]byte]
client *rpcli.GroupClient
local localcache.Cache[[]byte]
}
func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (val *group.GetGroupMemberUserIDsResp, err error) {
@@ -64,7 +65,7 @@ func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string)
var cache cacheProto[group.GetGroupMemberUserIDsResp]
return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs rpc", "groupID", groupID)
return cache.Marshal(group.GetGroupMemberUserIDsCaller.Invoke(ctx, &group.GetGroupMemberUserIDsReq{GroupID: groupID}))
return cache.Marshal(g.client.GroupClient.GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{GroupID: groupID}))
}))
}
@@ -80,13 +81,7 @@ func (g *GroupLocalCache) GetGroupMember(ctx context.Context, groupID, userID st
var cache cacheProto[sdkws.GroupMemberFullInfo]
return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupMemberInfoKey(groupID, userID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID, "userID", userID)
return cache.Marshal(rpccall.ExtractField(ctx, group.GetGroupMemberCacheCaller.Invoke,
&group.GetGroupMemberCacheReq{
GroupID: groupID,
GroupMemberID: userID,
},
(*group.GetGroupMemberCacheResp).GetMember,
))
return cache.Marshal(g.client.GetGroupMemberCache(ctx, groupID, userID))
}))
}
@@ -102,10 +97,7 @@ func (g *GroupLocalCache) GetGroupInfo(ctx context.Context, groupID string) (val
var cache cacheProto[sdkws.GroupInfo]
return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupInfoKey(groupID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID)
return cache.Marshal(rpccall.ExtractField(ctx, group.GetGroupInfoCacheCaller.Invoke,
&group.GetGroupInfoCacheReq{
GroupID: groupID,
}, (*group.GetGroupInfoCacheResp).GetGroupInfo))
return cache.Marshal(g.client.GetGroupInfoCache(ctx, groupID))
}))
}
+10 -25
View File
@@ -3,16 +3,15 @@ package rpccache
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/user"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/rpccall"
"github.com/openimsdk/protocol/user"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
@@ -23,10 +22,10 @@ import (
"github.com/redis/go-redis/v9"
)
func NewOnlineCache(adminUserID []string, group *GroupLocalCache, rdb redis.UniversalClient, fullUserCache bool, fn func(ctx context.Context, userID string, platformIDs []int32)) (*OnlineCache, error) {
func NewOnlineCache(client *rpcli.UserClient, group *GroupLocalCache, rdb redis.UniversalClient, fullUserCache bool, fn func(ctx context.Context, userID string, platformIDs []int32)) (*OnlineCache, error) {
l := &sync.Mutex{}
x := &OnlineCache{
adminUserID: adminUserID,
client: client,
group: group,
fullUserCache: fullUserCache,
Lock: l,
@@ -66,8 +65,8 @@ const (
)
type OnlineCache struct {
adminUserID []string
group *GroupLocalCache
client *rpcli.UserClient
group *GroupLocalCache
// fullUserCache if enabled, caches the online status of all users using mapCache;
// otherwise, only a portion of users' online statuses (regardless of whether they are online) will be cached using lruCache.
@@ -113,7 +112,7 @@ func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) (err error) {
cursor := uint64(0)
for resp == nil || resp.NextCursor != 0 {
if err = retryOperation(func() error {
resp, err = user.GetAllOnlineUsersCaller.Invoke(ctx, &user.GetAllOnlineUsersReq{Cursor: cursor})
resp, err = o.client.GetAllOnlineUsers(ctx, cursor)
if err != nil {
return err
}
@@ -187,17 +186,7 @@ func (o *OnlineCache) doSubscribe(ctx context.Context, rdb redis.UniversalClient
func (o *OnlineCache) getUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) {
platformIDs, err := o.lruCache.Get(userID, func() ([]int32, error) {
resp, err := rpccall.ExtractField(ctx, user.GetUserStatusCaller.Invoke, &user.GetUserStatusReq{
UserID: o.adminUserID[0],
UserIDs: []string{userID},
}, (*user.GetUserStatusResp).GetStatusList)
if err != nil {
return nil, err
}
if len(resp) == 0 {
return nil, nil
}
return resp[0].PlatformIDs, nil
return o.client.GetUserOnlinePlatform(ctx, userID)
})
if err != nil {
log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userID)
@@ -238,11 +227,7 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e
func (o *OnlineCache) getUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string][]int32, error) {
platformIDsMap, err := o.lruCache.GetBatch(userIDs, func(missingUsers []string) (map[string][]int32, error) {
platformIDsMap := make(map[string][]int32)
usersStatus, err := rpccall.ExtractField(ctx, user.GetUserStatusCaller.Invoke, &user.GetUserStatusReq{
UserID: o.adminUserID[0],
UserIDs: missingUsers,
}, (*user.GetUserStatusResp).GetStatusList)
usersStatus, err := o.client.GetUsersOnlinePlatform(ctx, missingUsers)
if err != nil {
return nil, err
}
-1
View File
@@ -17,7 +17,6 @@ package rpccache
import (
"context"
"encoding/json"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
)
+7 -5
View File
@@ -16,11 +16,11 @@ package rpccache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/errs"
@@ -28,10 +28,11 @@ import (
"github.com/redis/go-redis/v9"
)
func NewUserLocalCache(localCache *config.LocalCache, cli redis.UniversalClient) *UserLocalCache {
func NewUserLocalCache(client *rpcli.UserClient, localCache *config.LocalCache, cli redis.UniversalClient) *UserLocalCache {
lc := localCache.User
log.ZDebug(context.Background(), "UserLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &UserLocalCache{
client: client,
local: localcache.New[[]byte](
localcache.WithLocalSlotNum(lc.SlotNum),
localcache.WithLocalSlotSize(lc.SlotSize),
@@ -47,7 +48,8 @@ func NewUserLocalCache(localCache *config.LocalCache, cli redis.UniversalClient)
}
type UserLocalCache struct {
local localcache.Cache[[]byte]
client *rpcli.UserClient
local localcache.Cache[[]byte]
}
func (u *UserLocalCache) GetUserInfo(ctx context.Context, userID string) (val *sdkws.UserInfo, err error) {
@@ -62,7 +64,7 @@ func (u *UserLocalCache) GetUserInfo(ctx context.Context, userID string) (val *s
var cache cacheProto[sdkws.UserInfo]
return cache.Unmarshal(u.local.Get(ctx, cachekey.GetUserInfoKey(userID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "UserLocalCache GetUserInfo rpc", "userID", userID)
return cache.Marshal(rpcclient.GetUserInfo(ctx, userID))
return cache.Marshal(u.client.GetUserInfo(ctx, userID))
}))
}
@@ -86,7 +88,7 @@ func (u *UserLocalCache) getUserGlobalMsgRecvOpt(ctx context.Context, userID str
var cache cacheProto[user.GetGlobalRecvMessageOptResp]
return cache.Unmarshal(u.local.Get(ctx, cachekey.GetUserGlobalRecvMsgOptKey(userID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt rpc", "userID", userID)
return cache.Marshal(user.GetGlobalRecvMessageOptCaller.Invoke(ctx, &user.GetGlobalRecvMessageOptReq{UserID: userID}))
return cache.Marshal(u.client.UserClient.GetGlobalRecvMessageOpt(ctx, &user.GetGlobalRecvMessageOptReq{UserID: userID}))
}))
}