feat: Remove MySQL and implement it all with Mongo (#1508)

* fix: GetUserReqApplicationList error when there is a disbanded group chat

* fix: error when querying some information about disbanded group

* fix: GetUserReqApplicationList dismissed group error

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* merge

* cicd: robot automated Change

* sdkws.MsgData

* user

* interface{} -> any

* user

* third

* group

* group

* group

* group

* group

* group

* conversation

* standalone mysql db model

* tx

* s3

* group

* mongo

* group

* group

* group

* group

* group

* group

* refactor: add openim mysql to mongo refactor

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* refactor: add openim mysql to mongo refactor

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* remove mysql

* remove mysql

* friend

* friend

* friend

* friend

* friend

* friend

* group

* convert

* index

* index

* all

* all

* mysql2mongo

* data conversion

* up35

* up35

* feat: add format set

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* fix: fix scripts

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* merge main

* merge main

* Update init-config.sh

* fix: user args check

---------

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>
Co-authored-by: withchao <withchao@users.noreply.github.com>
Co-authored-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>
Co-authored-by: Xinwei Xiong <3293172751@qq.com>
This commit is contained in:
chao
2023-12-05 20:53:02 +08:00
committed by GitHub
parent 1c1322e3d2
commit c0194f6ef4
141 changed files with 4273 additions and 4858 deletions
+34 -35
View File
@@ -26,7 +26,6 @@ import (
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/relation"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)
@@ -67,10 +66,10 @@ type ConversationCache interface {
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)
DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache
// get one super group recv msg but do not notification userID list
GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error)
//GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error)
DelSuperGroupRecvMsgNotNotifyUserIDs(groupID string) ConversationCache
// get one super group recv msg but do not notification userID list hash
GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error)
//GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error)
DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache
//GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
@@ -101,20 +100,20 @@ type ConversationRedisCache struct {
expireTime time.Duration
}
func NewNewConversationRedis(
rdb redis.UniversalClient,
conversationDB *relation.ConversationGorm,
options rockscache.Options,
) ConversationCache {
rcClient := rockscache.NewClient(rdb, options)
return &ConversationRedisCache{
rcClient: rcClient,
metaCache: NewMetaCacheRedis(rcClient),
conversationDB: conversationDB,
expireTime: conversationExpireTime,
}
}
//func NewNewConversationRedis(
// rdb redis.UniversalClient,
// conversationDB *relation.ConversationGorm,
// options rockscache.Options,
//) ConversationCache {
// rcClient := rockscache.NewClient(rdb, options)
//
// return &ConversationRedisCache{
// rcClient: rcClient,
// metaCache: NewMetaCacheRedis(rcClient),
// conversationDB: conversationDB,
// expireTime: conversationExpireTime,
// }
//}
func (c *ConversationRedisCache) NewCache() ConversationCache {
return &ConversationRedisCache{
@@ -282,11 +281,11 @@ func (c *ConversationRedisCache) GetUserRecvMsgOpt(ctx context.Context, ownerUse
})
}
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) {
return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsKey(groupID), c.expireTime, func(ctx context.Context) (userIDs []string, err error) {
return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
})
}
//func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) {
// return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsKey(groupID), c.expireTime, func(ctx context.Context) (userIDs []string, err error) {
// return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
// })
//}
func (c *ConversationRedisCache) DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache {
keys := make([]string, 0, len(ownerUserIDs))
@@ -313,19 +312,19 @@ func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDs(groupID st
return cache
}
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) {
return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID), c.expireTime, func(ctx context.Context) (hash uint64, err error) {
userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
if err != nil {
return 0, err
}
utils.Sort(userIDs, true)
bi := big.NewInt(0)
bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
return bi.Uint64(), nil
},
)
}
//func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) {
// return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID), c.expireTime, func(ctx context.Context) (hash uint64, err error) {
// userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
// if err != nil {
// return 0, err
// }
// utils.Sort(userIDs, true)
// bi := big.NewInt(0)
// bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
// return bi.Uint64(), nil
// },
// )
//}
func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache {
cache := c.NewCache()
+17 -7
View File
@@ -33,19 +33,20 @@ const (
friendKey = "FRIEND_INFO:"
)
// args fn will exec when no data in msgCache.
// FriendCache is an interface for caching friend-related data.
type FriendCache interface {
metaCache
NewCache() FriendCache
GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error)
// call when friendID List changed
// Called when friendID list changed
DelFriendIDs(ownerUserID ...string) FriendCache
// get single friendInfo from msgCache
// Get single friendInfo from the cache
GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationtb.FriendModel, err error)
// del friend when friend info changed
// Delete friend when friend info changed
DelFriend(ownerUserID, friendUserID string) FriendCache
}
// FriendCacheRedis is an implementation of the FriendCache interface using Redis.
type FriendCacheRedis struct {
metaCache
friendDB relationtb.FriendModelInterface
@@ -53,6 +54,7 @@ type FriendCacheRedis struct {
rcClient *rockscache.Client
}
// NewFriendCacheRedis creates a new instance of FriendCacheRedis.
func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendModelInterface,
options rockscache.Options) FriendCache {
rcClient := rockscache.NewClient(rdb, options)
@@ -64,6 +66,7 @@ func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendMo
}
}
// NewCache creates a new instance of FriendCacheRedis with the same configuration.
func (f *FriendCacheRedis) NewCache() FriendCache {
return &FriendCacheRedis{
rcClient: f.rcClient,
@@ -73,24 +76,29 @@ func (f *FriendCacheRedis) NewCache() FriendCache {
}
}
// getFriendIDsKey returns the key for storing friend IDs in the cache.
func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string {
return friendIDsKey + ownerUserID
}
// getTwoWayFriendsIDsKey returns the key for storing two-way friend IDs in the cache.
func (f *FriendCacheRedis) getTwoWayFriendsIDsKey(ownerUserID string) string {
return TwoWayFriendsIDsKey + ownerUserID
}
// getFriendKey returns the key for storing friend info in the cache.
func (f *FriendCacheRedis) getFriendKey(ownerUserID, friendUserID string) string {
return friendKey + ownerUserID + "-" + friendUserID
}
// GetFriendIDs retrieves friend IDs from the cache or the database if not found.
func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) {
return getCache(ctx, f.rcClient, f.getFriendIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) {
return f.friendDB.FindFriendUserIDs(ctx, ownerUserID)
})
}
// DelFriendIDs deletes friend IDs from the cache.
func (f *FriendCacheRedis) DelFriendIDs(ownerUserIDs ...string) FriendCache {
newGroupCache := f.NewCache()
keys := make([]string, 0, len(ownerUserIDs))
@@ -102,7 +110,7 @@ func (f *FriendCacheRedis) DelFriendIDs(ownerUserIDs ...string) FriendCache {
return newGroupCache
}
// todo.
// GetTwoWayFriendIDs retrieves two-way friend IDs from the cache.
func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) {
friendIDs, err := f.GetFriendIDs(ctx, ownerUserID)
if err != nil {
@@ -121,6 +129,7 @@ func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID s
return twoWayFriendIDs, nil
}
// DelTwoWayFriendIDs deletes two-way friend IDs from the cache.
func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) FriendCache {
newFriendCache := f.NewCache()
newFriendCache.AddKeys(f.getTwoWayFriendsIDsKey(ownerUserID))
@@ -128,14 +137,15 @@ func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID s
return newFriendCache
}
func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID,
friendUserID string) (friend *relationtb.FriendModel, err error) {
// GetFriend retrieves friend info from the cache or the database if not found.
func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationtb.FriendModel, err error) {
return getCache(ctx, f.rcClient, f.getFriendKey(ownerUserID,
friendUserID), f.expireTime, func(ctx context.Context) (*relationtb.FriendModel, error) {
return f.friendDB.Take(ctx, ownerUserID, friendUserID)
})
}
// DelFriend deletes friend info from the cache.
func (f *FriendCacheRedis) DelFriend(ownerUserID, friendUserID string) FriendCache {
newFriendCache := f.NewCache()
newFriendCache.AddKeys(f.getFriendKey(ownerUserID, friendUserID))
+123 -158
View File
@@ -16,8 +16,13 @@ package cache
import (
"context"
"fmt"
"strconv"
"time"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/dtm-labs/rockscache"
@@ -26,21 +31,24 @@ import (
"github.com/OpenIMSDK/tools/utils"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
)
const (
groupExpireTime = time.Second * 60 * 60 * 12
groupInfoKey = "GROUP_INFO:"
groupMemberIDsKey = "GROUP_MEMBER_IDS:"
groupMembersHashKey = "GROUP_MEMBERS_HASH2:"
groupMemberInfoKey = "GROUP_MEMBER_INFO:"
joinedSuperGroupsKey = "JOIN_SUPER_GROUPS:"
SuperGroupMemberIDsKey = "SUPER_GROUP_MEMBER_IDS:"
joinedGroupsKey = "JOIN_GROUPS_KEY:"
groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:"
groupExpireTime = time.Second * 60 * 60 * 12
groupInfoKey = "GROUP_INFO:"
groupMemberIDsKey = "GROUP_MEMBER_IDS:"
groupMembersHashKey = "GROUP_MEMBERS_HASH2:"
groupMemberInfoKey = "GROUP_MEMBER_INFO:"
//groupOwnerInfoKey = "GROUP_OWNER_INFO:"
joinedGroupsKey = "JOIN_GROUPS_KEY:"
groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:"
groupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:"
)
type GroupHash interface {
GetGroupHash(ctx context.Context, groupID string) (uint64, error)
}
type GroupCache interface {
metaCache
NewCache() GroupCache
@@ -48,11 +56,6 @@ type GroupCache interface {
GetGroupInfo(ctx context.Context, groupID string) (group *relationtb.GroupModel, err error)
DelGroupsInfo(groupIDs ...string) GroupCache
GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error)
DelJoinedSuperGroupIDs(userIDs ...string) GroupCache
GetSuperGroupMemberIDs(ctx context.Context, groupIDs ...string) (models []*unrelationtb.SuperGroupModel, err error)
DelSuperGroupMemberIDs(groupIDs ...string) GroupCache
GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error)
GetGroupMemberHashMap(ctx context.Context, groupIDs []string) (map[string]*relationtb.GroupSimpleUserID, error)
DelGroupMembersHash(groupID string) GroupCache
@@ -69,9 +72,16 @@ type GroupCache interface {
GetGroupMembersInfo(ctx context.Context, groupID string, userID []string) (groupMembers []*relationtb.GroupMemberModel, err error)
GetAllGroupMembersInfo(ctx context.Context, groupID string) (groupMembers []*relationtb.GroupMemberModel, err error)
GetGroupMembersPage(ctx context.Context, groupID string, userID []string, showNumber, pageNumber int32) (total uint32, groupMembers []*relationtb.GroupMemberModel, err error)
FindGroupMemberUser(ctx context.Context, groupIDs []string, userID string) ([]*relationtb.GroupMemberModel, error)
GetGroupRoleLevelMemberIDs(ctx context.Context, groupID string, roleLevel int32) ([]string, error)
GetGroupOwner(ctx context.Context, groupID string) (*relationtb.GroupMemberModel, error)
GetGroupsOwner(ctx context.Context, groupIDs []string) ([]*relationtb.GroupMemberModel, error)
DelGroupRoleLevel(groupID string, roleLevel []int32) GroupCache
DelGroupAllRoleLevel(groupID string) GroupCache
DelGroupMembersInfo(groupID string, userID ...string) GroupCache
GetGroupRoleLevelMemberInfo(ctx context.Context, groupID string, roleLevel int32) ([]*relationtb.GroupMemberModel, error)
GetGroupRolesLevelMemberInfo(ctx context.Context, groupID string, roleLevels []int32) ([]*relationtb.GroupMemberModel, error)
GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error)
DelGroupsMemberNum(groupID ...string) GroupCache
}
@@ -81,10 +91,9 @@ type GroupCacheRedis struct {
groupDB relationtb.GroupModelInterface
groupMemberDB relationtb.GroupMemberModelInterface
groupRequestDB relationtb.GroupRequestModelInterface
mongoDB unrelationtb.SuperGroupModelInterface
expireTime time.Duration
rcClient *rockscache.Client
hashCode func(ctx context.Context, groupID string) (uint64, error)
groupHash GroupHash
}
func NewGroupCacheRedis(
@@ -92,8 +101,7 @@ func NewGroupCacheRedis(
groupDB relationtb.GroupModelInterface,
groupMemberDB relationtb.GroupMemberModelInterface,
groupRequestDB relationtb.GroupRequestModelInterface,
mongoClient unrelationtb.SuperGroupModelInterface,
hashCode func(ctx context.Context, groupID string) (uint64, error),
hashCode GroupHash,
opts rockscache.Options,
) GroupCache {
rcClient := rockscache.NewClient(rdb, opts)
@@ -101,8 +109,7 @@ func NewGroupCacheRedis(
return &GroupCacheRedis{
rcClient: rcClient, expireTime: groupExpireTime,
groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB,
mongoDB: mongoClient,
hashCode: hashCode,
groupHash: hashCode,
metaCache: NewMetaCacheRedis(rcClient),
}
}
@@ -114,7 +121,6 @@ func (g *GroupCacheRedis) NewCache() GroupCache {
groupDB: g.groupDB,
groupMemberDB: g.groupMemberDB,
groupRequestDB: g.groupRequestDB,
mongoDB: g.mongoDB,
metaCache: NewMetaCacheRedis(g.rcClient, g.metaCache.GetPreDelKeys()...),
}
}
@@ -123,18 +129,10 @@ func (g *GroupCacheRedis) getGroupInfoKey(groupID string) string {
return groupInfoKey + groupID
}
func (g *GroupCacheRedis) getJoinedSuperGroupsIDKey(userID string) string {
return joinedSuperGroupsKey + userID
}
func (g *GroupCacheRedis) getJoinedGroupsKey(userID string) string {
return joinedGroupsKey + userID
}
func (g *GroupCacheRedis) getSuperGroupMemberIDsKey(groupID string) string {
return SuperGroupMemberIDsKey + groupID
}
func (g *GroupCacheRedis) getGroupMembersHashKey(groupID string) string {
return groupMembersHashKey + groupID
}
@@ -151,6 +149,10 @@ func (g *GroupCacheRedis) getGroupMemberNumKey(groupID string) string {
return groupMemberNumKey + groupID
}
func (g *GroupCacheRedis) getGroupRoleLevelMemberIDsKey(groupID string, roleLevel int32) string {
return groupRoleLevelMemberIDsKey + groupID + "-" + strconv.Itoa(int(roleLevel))
}
func (g *GroupCacheRedis) GetGroupIndex(group *relationtb.GroupModel, keys []string) (int, error) {
key := g.getGroupInfoKey(group.GroupID)
for i, _key := range keys {
@@ -173,15 +175,7 @@ func (g *GroupCacheRedis) GetGroupMemberIndex(groupMember *relationtb.GroupMembe
return 0, errIndex
}
// / groupInfo.
func (g *GroupCacheRedis) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relationtb.GroupModel, err error) {
//var keys []string
//for _, group := range groupIDs {
// keys = append(keys, g.getGroupInfoKey(group))
//}
//return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupIndex, func(ctx context.Context) ([]*relationtb.GroupModel, error) {
// return g.groupDB.Find(ctx, groupIDs)
//})
return batchGetCache2(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string {
return g.getGroupInfoKey(groupID)
}, func(ctx context.Context, groupID string) (*relationtb.GroupModel, error) {
@@ -206,123 +200,44 @@ func (g *GroupCacheRedis) DelGroupsInfo(groupIDs ...string) GroupCache {
return newGroupCache
}
func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) {
return getCache(ctx, g.rcClient, g.getJoinedSuperGroupsIDKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) {
userGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID)
if err != nil {
return nil, err
}
return userGroup.GroupIDs, nil
},
)
}
func (g *GroupCacheRedis) GetSuperGroupMemberIDs(ctx context.Context, groupIDs ...string) (models []*unrelationtb.SuperGroupModel, err error) {
//var keys []string
//for _, group := range groupIDs {
// keys = append(keys, g.getSuperGroupMemberIDsKey(group))
//}
//return batchGetCache(ctx, g.rcClient, keys, g.expireTime, func(model *unrelationtb.SuperGroupModel, keys []string) (int, error) {
// for i, key := range keys {
// if g.getSuperGroupMemberIDsKey(model.GroupID) == key {
// return i, nil
// }
// }
// return 0, errIndex
//},
// func(ctx context.Context) ([]*unrelationtb.SuperGroupModel, error) {
// return g.mongoDB.FindSuperGroup(ctx, groupIDs)
// })
return batchGetCache2(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string {
return g.getSuperGroupMemberIDsKey(groupID)
}, func(ctx context.Context, groupID string) (*unrelationtb.SuperGroupModel, error) {
return g.mongoDB.TakeSuperGroup(ctx, groupID)
})
}
// userJoinSuperGroup.
func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(userIDs ...string) GroupCache {
newGroupCache := g.NewCache()
keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
keys = append(keys, g.getJoinedSuperGroupsIDKey(userID))
}
newGroupCache.AddKeys(keys...)
return newGroupCache
}
func (g *GroupCacheRedis) DelSuperGroupMemberIDs(groupIDs ...string) GroupCache {
func (g *GroupCacheRedis) DelGroupsOwner(groupIDs ...string) GroupCache {
newGroupCache := g.NewCache()
keys := make([]string, 0, len(groupIDs))
for _, groupID := range groupIDs {
keys = append(keys, g.getSuperGroupMemberIDsKey(groupID))
keys = append(keys, g.getGroupRoleLevelMemberIDsKey(groupID, constant.GroupOwner))
}
newGroupCache.AddKeys(keys...)
return newGroupCache
}
// groupMembersHash.
func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error) {
return getCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, func(ctx context.Context) (uint64, error) {
return g.hashCode(ctx, groupID)
})
func (g *GroupCacheRedis) DelGroupRoleLevel(groupID string, roleLevels []int32) GroupCache {
newGroupCache := g.NewCache()
keys := make([]string, 0, len(roleLevels))
for _, roleLevel := range roleLevels {
keys = append(keys, g.getGroupRoleLevelMemberIDsKey(groupID, roleLevel))
}
newGroupCache.AddKeys(keys...)
return newGroupCache
}
//return getCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime,
// func(ctx context.Context) (uint64, error) {
// userIDs, err := g.GetGroupMemberIDs(ctx, groupID)
// if err != nil {
// return 0, err
// }
// log.ZInfo(ctx, "GetGroupMembersHash", "groupID", groupID, "userIDs", userIDs)
// var members []*relationtb.GroupMemberModel
// if len(userIDs) > 0 {
// members, err = g.GetGroupMembersInfo(ctx, groupID, userIDs)
// if err != nil {
// return 0, err
// }
// utils.Sort(userIDs, true)
// }
// memberMap := make(map[string]*relationtb.GroupMemberModel)
// for i, member := range members {
// memberMap[member.UserID] = members[i]
// }
// data := make([]string, 0, len(members)*11)
// for _, userID := range userIDs {
// member, ok := memberMap[userID]
// if !ok {
// continue
// }
// data = append(data,
// member.GroupID,
// member.UserID,
// member.Nickname,
// member.FaceURL,
// strconv.Itoa(int(member.RoleLevel)),
// strconv.FormatInt(member.JoinTime.UnixMilli(), 10),
// strconv.Itoa(int(member.JoinSource)),
// member.InviterUserID,
// member.OperatorUserID,
// strconv.FormatInt(member.MuteEndTime.UnixMilli(), 10),
// member.Ex,
// )
// }
// log.ZInfo(ctx, "hash data info", "userIDs.len", len(userIDs), "hash.data.len", len(data))
// log.ZInfo(ctx, "json hash data", "groupID", groupID, "data", data)
// val, err := json.Marshal(data)
// if err != nil {
// return 0, err
// }
// sum := md5.Sum(val)
// code := binary.BigEndian.Uint64(sum[:])
// log.ZInfo(ctx, "GetGroupMembersHash", "groupID", groupID, "hashCode", code, "num", len(members))
// return code, nil
// },
//)
func (g *GroupCacheRedis) DelGroupAllRoleLevel(groupID string) GroupCache {
return g.DelGroupRoleLevel(groupID, []int32{constant.GroupOwner, constant.GroupAdmin, constant.GroupOrdinaryUsers})
}
func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error) {
if g.groupHash == nil {
return 0, errs.ErrInternalServer.Wrap("group hash is nil")
}
return getCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, func(ctx context.Context) (uint64, error) {
return g.groupHash.GetGroupHash(ctx, groupID)
})
}
func (g *GroupCacheRedis) GetGroupMemberHashMap(ctx context.Context, groupIDs []string) (map[string]*relationtb.GroupSimpleUserID, error) {
if g.groupHash == nil {
return nil, errs.ErrInternalServer.Wrap("group hash is nil")
}
res := make(map[string]*relationtb.GroupSimpleUserID)
for _, groupID := range groupIDs {
hash, err := g.GetGroupMembersHash(ctx, groupID)
@@ -347,7 +262,6 @@ func (g *GroupCacheRedis) DelGroupMembersHash(groupID string) GroupCache {
return cache
}
// groupMemberIDs.
func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
return getCache(ctx, g.rcClient, g.getGroupMemberIDsKey(groupID), g.expireTime, func(ctx context.Context) ([]string, error) {
return g.groupMemberDB.FindMemberUserID(ctx, groupID)
@@ -398,13 +312,6 @@ func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userI
}
func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]*relationtb.GroupMemberModel, error) {
//var keys []string
//for _, userID := range userIDs {
// keys = append(keys, g.getGroupMemberInfoKey(groupID, userID))
//}
//return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupMemberIndex, func(ctx context.Context) ([]*relationtb.GroupMemberModel, error) {
// return g.groupMemberDB.Find(ctx, []string{groupID}, userIDs, nil)
//})
return batchGetCache2(ctx, g.rcClient, g.expireTime, userIDs, func(userID string) string {
return g.getGroupMemberInfoKey(groupID, userID)
}, func(ctx context.Context, userID string) (*relationtb.GroupMemberModel, error) {
@@ -446,13 +353,6 @@ func (g *GroupCacheRedis) GetAllGroupMemberInfo(ctx context.Context, groupID str
if err != nil {
return nil, err
}
//var keys []string
//for _, groupMemberID := range groupMemberIDs {
// keys = append(keys, g.getGroupMemberInfoKey(groupID, groupMemberID))
//}
//return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupMemberIndex, func(ctx context.Context) ([]*relationtb.GroupMemberModel, error) {
// return g.groupMemberDB.Find(ctx, []string{groupID}, groupMemberIDs, nil)
//})
return g.GetGroupMembersInfo(ctx, groupID, groupMemberIDs)
}
@@ -483,3 +383,68 @@ func (g *GroupCacheRedis) DelGroupsMemberNum(groupID ...string) GroupCache {
return cache
}
func (g *GroupCacheRedis) GetGroupOwner(ctx context.Context, groupID string) (*relationtb.GroupMemberModel, error) {
members, err := g.GetGroupRoleLevelMemberInfo(ctx, groupID, constant.GroupOwner)
if err != nil {
return nil, err
}
if len(members) == 0 {
return nil, errs.ErrRecordNotFound.Wrap(fmt.Sprintf("group %s owner not found", groupID))
}
return members[0], nil
}
func (g *GroupCacheRedis) GetGroupsOwner(ctx context.Context, groupIDs []string) ([]*relationtb.GroupMemberModel, error) {
members := make([]*relationtb.GroupMemberModel, 0, len(groupIDs))
for _, groupID := range groupIDs {
items, err := g.GetGroupRoleLevelMemberInfo(ctx, groupID, constant.GroupOwner)
if err != nil {
return nil, err
}
if len(items) > 0 {
members = append(members, items[0])
}
}
return members, nil
}
func (g *GroupCacheRedis) GetGroupRoleLevelMemberIDs(ctx context.Context, groupID string, roleLevel int32) ([]string, error) {
return getCache(ctx, g.rcClient, g.getGroupRoleLevelMemberIDsKey(groupID, roleLevel), g.expireTime, func(ctx context.Context) ([]string, error) {
return g.groupMemberDB.FindRoleLevelUserIDs(ctx, groupID, roleLevel)
})
}
func (g *GroupCacheRedis) GetGroupRoleLevelMemberInfo(ctx context.Context, groupID string, roleLevel int32) ([]*relationtb.GroupMemberModel, error) {
userIDs, err := g.GetGroupRoleLevelMemberIDs(ctx, groupID, roleLevel)
if err != nil {
return nil, err
}
return g.GetGroupMembersInfo(ctx, groupID, userIDs)
}
func (g *GroupCacheRedis) GetGroupRolesLevelMemberInfo(ctx context.Context, groupID string, roleLevels []int32) ([]*relationtb.GroupMemberModel, error) {
var userIDs []string
for _, roleLevel := range roleLevels {
ids, err := g.GetGroupRoleLevelMemberIDs(ctx, groupID, roleLevel)
if err != nil {
return nil, err
}
userIDs = append(userIDs, ids...)
}
return g.GetGroupMembersInfo(ctx, groupID, userIDs)
}
func (g *GroupCacheRedis) FindGroupMemberUser(ctx context.Context, groupIDs []string, userID string) (_ []*relationtb.GroupMemberModel, err error) {
if len(groupIDs) == 0 {
groupIDs, err = g.GetJoinedGroupIDs(ctx, userID)
if err != nil {
return nil, err
}
}
return batchGetCache2(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string {
return g.getGroupMemberInfoKey(groupID, userID)
}, func(ctx context.Context, groupID string) (*relationtb.GroupMemberModel, error) {
return g.groupMemberDB.Take(ctx, groupID, userID)
})
}
+5 -2
View File
@@ -38,7 +38,7 @@ const (
var errIndex = errors.New("err index")
type metaCache interface {
ExecDel(ctx context.Context) error
ExecDel(ctx context.Context, distinct ...bool) error
// delete key rapid
DelKey(ctx context.Context, key string) error
AddKeys(keys ...string)
@@ -57,7 +57,10 @@ type metaCacheRedis struct {
retryInterval time.Duration
}
func (m *metaCacheRedis) ExecDel(ctx context.Context) error {
func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error {
if len(distinct) > 0 && distinct[0] {
m.keys = utils.Distinct(m.keys)
}
if len(m.keys) > 0 {
log.ZDebug(ctx, "delete cache", "keys", m.keys)
for _, key := range m.keys {
+13 -56
View File
@@ -173,20 +173,7 @@ func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s st
}
func (c *msgCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
var retErr error
for {
select {
case <-ctx.Done():
return errs.Wrap(retErr, "SetMaxSeq redis retry too many amount")
default:
retErr = c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
if retErr != nil {
time.Sleep(time.Second * 2)
continue
}
return nil
}
}
return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
}
func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) {
@@ -194,21 +181,7 @@ func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m
}
func (c *msgCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
var retErr error
var retData int64
for {
select {
case <-ctx.Done():
return -1, errs.Wrap(retErr, "GetMaxSeq redis retry too many amount")
default:
retData, retErr = c.getSeq(ctx, conversationID, c.getMaxSeqKey)
if retErr != nil && errs.Unwrap(retErr) != redis.Nil {
time.Sleep(time.Second * 2)
continue
}
return retData, retErr
}
}
return c.getSeq(ctx, conversationID, c.getMaxSeqKey)
}
func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
@@ -314,7 +287,7 @@ func (c *msgCache) GetTokensWithoutError(ctx context.Context, userID string, pla
func (c *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platform int, m map[string]int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platform)
mm := make(map[string]interface{})
mm := make(map[string]any)
for k, v := range m {
mm[k] = v
}
@@ -672,35 +645,19 @@ func (c *msgCache) PipeDeleteMessages(ctx context.Context, conversationID string
}
func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error {
var (
cursor uint64
keys []string
err error
key = c.allMessageCacheKey(conversationID)
)
for {
// scan up to 10000 at a time, the count (10000) param refers to the number of scans on redis server.
// if the count is too small, needs to be run scan on redis frequently.
var limit int64 = 10000
keys, cursor, err = c.rdb.Scan(ctx, cursor, key, limit).Result()
if err != nil {
vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(conversationID)).Result()
if errors.Is(err, redis.Nil) {
return nil
}
if err != nil {
return errs.Wrap(err)
}
for _, v := range vals {
if err := c.rdb.Del(ctx, v).Err(); err != nil {
return errs.Wrap(err)
}
for _, key := range keys {
err := c.rdb.Del(ctx, key).Err()
if err != nil {
return errs.Wrap(err)
}
}
// scan end
if cursor == 0 {
return nil
}
}
return nil
}
func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error {
-47
View File
@@ -385,50 +385,3 @@ func testParallelDeleteMessagesMix(t *testing.T, cid string, seqs []int64, input
assert.EqualValues(t, 1, val) // exists
}
}
func TestCleanUpOneConversationAllMsg(t *testing.T) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
count := 1000
prefix := fmt.Sprintf("%v", rand.Int63())
ids := []string{}
for i := 0; i < count; i++ {
id := fmt.Sprintf("%v-cid-%v", prefix, rand.Int63())
ids = append(ids, id)
key := cacher.allMessageCacheKey(id)
rdb.Set(context.Background(), key, "openim", 0)
}
// delete 100 keys with scan.
for i := 0; i < 100; i++ {
pickedKey := ids[i]
err := cacher.CleanUpOneConversationAllMsg(context.Background(), pickedKey)
assert.Nil(t, err)
ls, err := rdb.Keys(context.Background(), pickedKey).Result()
assert.Nil(t, err)
assert.Equal(t, 0, len(ls))
rcode, err := rdb.Exists(context.Background(), pickedKey).Result()
assert.Nil(t, err)
assert.EqualValues(t, 0, rcode) // non-exists
}
sid := fmt.Sprintf("%v-cid-*", prefix)
ls, err := rdb.Keys(context.Background(), cacher.allMessageCacheKey(sid)).Result()
assert.Nil(t, err)
assert.Equal(t, count-100, len(ls))
// delete fuzzy matching keys.
err = cacher.CleanUpOneConversationAllMsg(context.Background(), sid)
assert.Nil(t, err)
// don't contains keys matched `{prefix}-cid-{random}` on redis
ls, err = rdb.Keys(context.Background(), cacher.allMessageCacheKey(sid)).Result()
assert.Nil(t, err)
assert.Equal(t, 0, len(ls))
}
+9 -9
View File
@@ -14,8 +14,8 @@ import (
type ObjectCache interface {
metaCache
GetName(ctx context.Context, name string) (*relationtb.ObjectModel, error)
DelObjectName(names ...string) ObjectCache
GetName(ctx context.Context, engine string, name string) (*relationtb.ObjectModel, error)
DelObjectName(engine string, names ...string) ObjectCache
}
func NewObjectCacheRedis(rdb redis.UniversalClient, objDB relationtb.ObjectInfoModelInterface) ObjectCache {
@@ -44,23 +44,23 @@ func (g *objectCacheRedis) NewCache() ObjectCache {
}
}
func (g *objectCacheRedis) DelObjectName(names ...string) ObjectCache {
func (g *objectCacheRedis) DelObjectName(engine string, names ...string) ObjectCache {
objectCache := g.NewCache()
keys := make([]string, 0, len(names))
for _, name := range names {
keys = append(keys, g.getObjectKey(name))
keys = append(keys, g.getObjectKey(name, engine))
}
objectCache.AddKeys(keys...)
return objectCache
}
func (g *objectCacheRedis) getObjectKey(name string) string {
return "OBJECT:" + name
func (g *objectCacheRedis) getObjectKey(engine string, name string) string {
return "OBJECT:" + engine + ":" + name
}
func (g *objectCacheRedis) GetName(ctx context.Context, name string) (*relationtb.ObjectModel, error) {
return getCache(ctx, g.rcClient, g.getObjectKey(name), g.expireTime, func(ctx context.Context) (*relationtb.ObjectModel, error) {
return g.objDB.Take(ctx, name)
func (g *objectCacheRedis) GetName(ctx context.Context, engine string, name string) (*relationtb.ObjectModel, error) {
return getCache(ctx, g.rcClient, g.getObjectKey(name, engine), g.expireTime, func(ctx context.Context) (*relationtb.ObjectModel, error) {
return g.objDB.Take(ctx, engine, name)
})
}
+8 -34
View File
@@ -22,6 +22,8 @@ import (
"strconv"
"time"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/protocol/constant"
@@ -31,8 +33,6 @@ import (
"github.com/dtm-labs/rockscache"
"github.com/redis/go-redis/v9"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)
const (
@@ -59,7 +59,8 @@ type UserCache interface {
type UserCacheRedis struct {
metaCache
rdb redis.UniversalClient
rdb redis.UniversalClient
//userDB relationtb.UserModelInterface
userDB relationtb.UserModelInterface
expireTime time.Duration
rcClient *rockscache.Client
@@ -100,39 +101,13 @@ func (u *UserCacheRedis) getUserGlobalRecvMsgOptKey(userID string) string {
}
func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userInfo *relationtb.UserModel, err error) {
return getCache(
ctx,
u.rcClient,
u.getUserInfoKey(userID),
u.expireTime,
func(ctx context.Context) (*relationtb.UserModel, error) {
return u.userDB.Take(ctx, userID)
},
return getCache(ctx, u.rcClient, u.getUserInfoKey(userID), u.expireTime, func(ctx context.Context) (*relationtb.UserModel, error) {
return u.userDB.Take(ctx, userID)
},
)
}
func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationtb.UserModel, error) {
//var keys []string
//for _, userID := range userIDs {
// keys = append(keys, u.getUserInfoKey(userID))
//}
//return batchGetCache(
// ctx,
// u.rcClient,
// keys,
// u.expireTime,
// func(user *relationtb.UserModel, keys []string) (int, error) {
// for i, key := range keys {
// if key == u.getUserInfoKey(user.UserID) {
// return i, nil
// }
// }
// return 0, errIndex
// },
// func(ctx context.Context) ([]*relationtb.UserModel, error) {
// return u.userDB.Find(ctx, userIDs)
// },
//)
return batchGetCache2(ctx, u.rcClient, u.expireTime, userIDs, func(userID string) string {
return u.getUserInfoKey(userID)
}, func(ctx context.Context, userID string) (*relationtb.UserModel, error) {
@@ -214,8 +189,7 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, statu
UserIDNum := crc32.ChecksumIEEE([]byte(userID))
modKey := strconv.Itoa(int(UserIDNum % statusMod))
key := olineStatusKey + modKey
log.ZDebug(ctx, "SetUserStatus args", "userID", userID, "status", status,
"platformID", platformID, "modKey", modKey, "key", key)
log.ZDebug(ctx, "SetUserStatus args", "userID", userID, "status", status, "platformID", platformID, "modKey", modKey, "key", key)
isNewKey, err := u.rdb.Exists(ctx, key).Result()
if err != nil {
return errs.Wrap(err)