fix: the local cache obtained can be modified (#2473)

* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* fix: minio config url recognition error

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* mage

* optimization version log

* optimization version log

* sync

* sync

* sync

* group sync

* sync option

* sync option

* refactor: replace `friend` package with `realtion`.

* refactor: update lastest commit to relation.

* sync option

* sync option

* sync option

* sync

* sync

* go.mod

* seq

* update: go mod

* refactor: change incremental to full

* feat: get full friend user ids

* feat: api and config

* seq

* group version

* merge

* seq

* seq

* seq

* fix: sort by id avoid unstable sort friends.

* group

* group

* group

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* user version

* seq

* seq

* seq user

* user online

* implement minio expire delete.

* user online

* config

* fix

* fix

* implement minio expire delete logic.

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* feat: implement scheduled delete outdated object in minio.

* update gomake version

* update gomake version

* implement FindExpires pagination.

* remove unnesseary incr.

* fix uncorrect args call.

* online push

* online push

* online push

* resolving conflicts

* resolving conflicts

* test

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* rpc prommetrics

* rpc prommetrics

* online status

* online status

* online status

* online status

* sub

* conversation version incremental

* merge seq

* merge online

* merge online

* merge online

* merge seq

* GetOwnerConversation

* fix: change incremental syncer router name.

* rockscache batch get

* rockscache seq batch get

* fix: GetMsgDocModelByIndex bug

* update go.mod

* update go.mod

* merge

* feat: prometheus

* feat: prometheus

* group member sort

* sub

* sub

* fix: seq conversion bug

* fix: redis pipe exec

* sort version

* sort version

* sort version

* remove old version online subscription

* remove old version online subscription

* version log index

* version log index

* batch push

* batch push

* seq void filling

* fix: batchGetMaxSeq

* fix: batchGetMaxSeq

* cache db error log

* 111

* fix bug

* fix: ImportFriends

* add online cache

* add some logs

* add some logs

* fix: onlineUserIDs

* add logs

* test

* test

* test

* test

* add log

* feat: solve the problem that modifying the cached data affects other

* feat: solve the problem that modifying the cached data affects other

* feat: search messages to filter out notifications

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
Co-authored-by: Monet Lee <monet_lee@163.com>
Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com>
Co-authored-by: icey-yu <1186114839@qq.com>
This commit is contained in:
chao
2024-08-02 16:58:04 +08:00
committed by GitHub
parent 51e170ade1
commit 84049a1f55
8 changed files with 183 additions and 83 deletions
+22 -17
View File
@@ -17,6 +17,8 @@ package rpccache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
@@ -32,7 +34,7 @@ func NewGroupLocalCache(client rpcclient.GroupRpcClient, localCache *config.Loca
log.ZDebug(context.Background(), "GroupLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &GroupLocalCache{
client: client,
local: localcache.New[any](
local: localcache.New[[]byte](
localcache.WithLocalSlotNum(lc.SlotNum),
localcache.WithLocalSlotSize(lc.SlotSize),
localcache.WithLinkSlotNum(lc.SlotNum),
@@ -48,21 +50,22 @@ func NewGroupLocalCache(client rpcclient.GroupRpcClient, localCache *config.Loca
type GroupLocalCache struct {
client rpcclient.GroupRpcClient
local localcache.Cache[any]
local localcache.Cache[[]byte]
}
func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (val *listMap[string], err error) {
func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (val *group.GetGroupMemberUserIDsResp, err error) {
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs req", "groupID", groupID)
defer func() {
if err == nil {
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs return", "value", val)
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs return", "groupID", groupID, "value", val)
} else {
log.ZError(ctx, "GroupLocalCache getGroupMemberIDs return", err)
log.ZError(ctx, "GroupLocalCache getGroupMemberIDs return", err, "groupID", groupID)
}
}()
return localcache.AnyValue[*listMap[string]](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) {
var cache cacheProto[group.GetGroupMemberUserIDsResp]
return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs rpc", "groupID", groupID)
return newListMap(g.client.GetGroupMemberIDs(ctx, groupID))
return cache.Marshal(g.client.Client.GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{GroupID: groupID}))
}))
}
@@ -70,14 +73,15 @@ func (g *GroupLocalCache) GetGroupMember(ctx context.Context, groupID, userID st
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo req", "groupID", groupID, "userID", userID)
defer func() {
if err == nil {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "value", val)
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "groupID", groupID, "userID", userID, "value", val)
} else {
log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err)
log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err, "groupID", groupID, "userID", userID)
}
}()
return localcache.AnyValue[*sdkws.GroupMemberFullInfo](g.local.Get(ctx, cachekey.GetGroupMemberInfoKey(groupID, userID), func(ctx context.Context) (any, error) {
var cache cacheProto[sdkws.GroupMemberFullInfo]
return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupMemberInfoKey(groupID, userID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID, "userID", userID)
return g.client.GetGroupMemberCache(ctx, groupID, userID)
return cache.Marshal(g.client.GetGroupMemberCache(ctx, groupID, userID))
}))
}
@@ -85,14 +89,15 @@ func (g *GroupLocalCache) GetGroupInfo(ctx context.Context, groupID string) (val
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo req", "groupID", groupID)
defer func() {
if err == nil {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "value", val)
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "groupID", groupID, "value", val)
} else {
log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err)
log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err, "groupID", groupID)
}
}()
return localcache.AnyValue[*sdkws.GroupInfo](g.local.Get(ctx, cachekey.GetGroupInfoKey(groupID), func(ctx context.Context) (any, error) {
var cache cacheProto[sdkws.GroupInfo]
return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupInfoKey(groupID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID)
return g.client.GetGroupInfoCache(ctx, groupID)
return cache.Marshal(g.client.GetGroupInfoCache(ctx, groupID))
}))
}
@@ -101,7 +106,7 @@ func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string)
if err != nil {
return nil, err
}
return res.List, nil
return res.UserIDs, nil
}
func (g *GroupLocalCache) GetGroupMemberIDMap(ctx context.Context, groupID string) (map[string]struct{}, error) {
@@ -109,7 +114,7 @@ func (g *GroupLocalCache) GetGroupMemberIDMap(ctx context.Context, groupID strin
if err != nil {
return nil, err
}
return res.Map, nil
return datautil.SliceSet(res.UserIDs), nil
}
func (g *GroupLocalCache) GetGroupInfos(ctx context.Context, groupIDs []string) ([]*sdkws.GroupInfo, error) {