This commit is contained in:
wangchuxiao
2023-01-31 18:55:22 +08:00
parent 467d44adc6
commit 4c931fba5c
11 changed files with 491 additions and 181 deletions
+297 -15
View File
@@ -1,36 +1,92 @@
package cache
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/relation"
"Open_IM/pkg/common/db/unrelation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
"math/big"
"sort"
"strconv"
"sync"
"time"
)
const GroupExpireTime = time.Second * 60 * 60 * 12
const groupInfoCacheKey = "GROUP_INFO_CACHE:"
const (
groupExpireTime = time.Second * 60 * 60 * 12
groupInfoKey = "GROUP_INFO:"
groupMemberIDsKey = "GROUP_MEMBER_IDS:"
groupMembersHashKey = "GROUP_MEMBERS_HASH:"
groupMemberInfoKey = "GROUP_MEMBER_INFO:"
joinedSuperGroupsKey = "JOIN_SUPER_GROUPS:"
joinedGroupsKey = "JOIN_GROUPS_KEY:"
groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:"
)
type GroupCache struct {
group *relation.Group
groupMember *relation.GroupMember
groupRequest *relation.GroupRequest
mongoDB *unrelation.SuperGroupMgoDB
expireTime time.Duration
redisClient *RedisClient
rcClient *rockscache.Client
//local cache
cacheGroupMtx sync.RWMutex
cacheGroupMemberUserIDs map[string]*GroupMemberIDsHash
}
func NewGroupCache(rdb redis.UniversalClient, groupDB *relation.Group, groupMemberDB *relation.GroupMember, groupRequestDB *relation.GroupRequest, opts rockscache.Options) *GroupCache {
return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: GroupExpireTime, group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB, redisClient: NewRedisClient(rdb)}
type GroupMemberIDsHash struct {
MemberListHash uint64
UserIDs []string
}
func NewGroupCache(rdb redis.UniversalClient, groupDB *relation.Group, groupMemberDB *relation.GroupMember, groupRequestDB *relation.GroupRequest, mongoClient *unrelation.SuperGroupMgoDB, opts rockscache.Options) *GroupCache {
return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: groupExpireTime,
group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB, redisClient: NewRedisClient(rdb),
mongoDB: mongoClient, cacheGroupMemberUserIDs: make(map[string]*GroupMemberIDsHash, 0),
}
}
func (g *GroupCache) getRedisClient() *RedisClient {
return g.redisClient
}
func (g *GroupCache) getGroupInfoKey(groupID string) string {
return groupInfoKey + groupID
}
func (g *GroupCache) getJoinedSuperGroupsIDKey(userID string) string {
return joinedSuperGroupsKey + userID
}
func (g *GroupCache) getJoinedGroupsKey(userID string) string {
return joinedGroupsKey + userID
}
func (g *GroupCache) getGroupMembersHashKey(groupID string) string {
return groupMembersHashKey + groupID
}
func (g *GroupCache) getGroupMemberIDsKey(groupID string) string {
return groupMemberIDsKey + groupID
}
func (g *GroupCache) getGroupMemberInfoKey(groupID, userID string) string {
return groupMemberInfoKey + groupID + "-" + userID
}
func (g *GroupCache) getGroupMemberNumKey(groupID string) string {
return groupMemberNumKey + groupID
}
/// groupInfo
func (g *GroupCache) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error) {
for _, groupID := range groupIDs {
group, err := g.GetGroupInfo(ctx, groupID)
@@ -58,9 +114,9 @@ func (g *GroupCache) GetGroupInfo(ctx context.Context, groupID string) (group *r
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "group", *group)
}()
groupStr, err := g.rcClient.Fetch(g.getGroupInfoCacheKey(groupID), g.expireTime, getGroup)
groupStr, err := g.rcClient.Fetch(g.getGroupInfoKey(groupID), g.expireTime, getGroup)
if err != nil {
return nil, utils.Wrap(err, "")
return nil, err
}
err = json.Unmarshal([]byte(groupStr), group)
return group, utils.Wrap(err, "")
@@ -70,7 +126,7 @@ func (g *GroupCache) DelGroupInfo(ctx context.Context, groupID string) (err erro
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
}()
return g.rcClient.TagAsDeleted(g.getGroupInfoCacheKey(groupID))
return g.rcClient.TagAsDeleted(g.getGroupInfoKey(groupID))
}
func (g *GroupCache) DelGroupsInfo(ctx context.Context, groupIDs []string) error {
@@ -82,21 +138,247 @@ func (g *GroupCache) DelGroupsInfo(ctx context.Context, groupIDs []string) error
return nil
}
func (g *GroupCache) getGroupInfoCacheKey(groupID string) string {
return groupInfoCacheKey + groupID
}
func (g *GroupCache) DelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) {
// userJoinSuperGroup
func (g *GroupCache) BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) {
for _, userID := range userIDs {
if err := g.rcClient.TagAsDeleted(joinedSuperGroupListCache + userID); err != nil {
if err := g.DelJoinedSuperGroupIDs(ctx, userID); err != nil {
return err
}
}
return nil
}
func (g *GroupCache) DelJoinedSuperGroupID(ctx context.Context, userID string) (err error) {
func (g *GroupCache) DelJoinedSuperGroupIDs(ctx context.Context, userID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
}()
return g.rcClient.TagAsDeleted(joinedSuperGroupListCache + userID)
return g.rcClient.TagAsDeleted(g.getJoinedSuperGroupsIDKey(userID))
}
func (g *GroupCache) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) {
getJoinedSuperGroupIDList := func() (string, error) {
userToSuperGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(userToSuperGroup.GroupIDList)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedSuperGroupIDs", joinedSuperGroupIDs)
}()
joinedSuperGroupListStr, err := g.rcClient.Fetch(g.getJoinedSuperGroupsIDKey(userID), time.Second*30*60, getJoinedSuperGroupIDList)
if err != nil {
return nil, err
}
err = json.Unmarshal([]byte(joinedSuperGroupListStr), &joinedSuperGroupIDs)
return joinedSuperGroupIDs, utils.Wrap(err, "")
}
// groupMembersHash
func (g *GroupCache) GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error) {
generateHash := func() (string, error) {
groupInfo, err := g.GetGroupInfo(ctx, groupID)
if err != nil {
return "", err
}
if groupInfo.Status == constant.GroupStatusDismissed {
return "0", nil
}
groupMemberIDList, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return "", err
}
sort.Strings(groupMemberIDList)
var all string
for _, v := range groupMemberIDList {
all += v
}
bi := big.NewInt(0)
bi.SetString(utils.Md5(all)[0:8], 16)
return strconv.Itoa(int(bi.Uint64())), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "hashCodeUint64", hashCodeUint64)
}()
hashCodeStr, err := g.rcClient.Fetch(g.getGroupMembersHashKey(groupID), time.Second*30*60, generateHash)
if err != nil {
return 0, utils.Wrap(err, "fetch failed")
}
hashCode, err := strconv.Atoi(hashCodeStr)
return uint64(hashCode), err
}
func (g *GroupCache) DelGroupMembersHash(ctx context.Context, groupID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
}()
return g.rcClient.TagAsDeleted(g.getGroupMembersHashKey(groupID))
}
// groupMemberIDs
// from redis
func (g *GroupCache) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
f := func() (string, error) {
groupInfo, err := g.GetGroupInfo(ctx, groupID)
if err != nil {
return "", err
}
var groupMemberIDList []string
if groupInfo.GroupType == constant.SuperGroup {
superGroup, err := g.mongoDB.GetSuperGroup(ctx, groupID)
if err != nil {
return "", err
}
groupMemberIDList = superGroup.MemberIDList
} else {
groupMemberIDList, err = relation.GetGroupMemberIDListByGroupID(groupID)
if err != nil {
return "", err
}
}
bytes, err := json.Marshal(groupMemberIDList)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupMemberIDList", groupMemberIDs)
}()
groupIDListStr, err := g.rcClient.Fetch(g.getGroupMemberIDsKey(groupID), time.Second*30*60, f)
if err != nil {
return nil, err
}
err = json.Unmarshal([]byte(groupIDListStr), &groupMemberIDs)
return groupMemberIDs, nil
}
func (g *GroupCache) DelGroupMemberIDs(ctx context.Context, groupID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
}()
return g.rcClient.TagAsDeleted(g.getGroupMemberIDsKey(groupID))
}
// from local map
func (g *GroupCache) LocalGetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
remoteHash, err := g.GetGroupMembersHash(ctx, groupID)
if err != nil {
g.cacheGroupMtx.Lock()
defer g.cacheGroupMtx.Unlock()
delete(g.cacheGroupMemberUserIDs, groupID)
return nil, err
}
g.cacheGroupMtx.Lock()
defer g.cacheGroupMtx.Unlock()
if remoteHash == 0 {
delete(g.cacheGroupMemberUserIDs, groupID)
return []string{}, nil
}
localCache, ok := g.cacheGroupMemberUserIDs[groupID]
if ok && localCache.MemberListHash == remoteHash {
return localCache.UserIDs, nil
}
groupMemberIDsRemote, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
g.cacheGroupMemberUserIDs[groupID] = &GroupMemberIDsHash{
MemberListHash: remoteHash,
UserIDs: groupMemberIDsRemote,
}
return groupMemberIDsRemote, nil
}
// JoinedGroups
func (g *GroupCache) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
getJoinedGroupIDList := func() (string, error) {
joinedGroupList, err := relation.GetJoinedGroupIDListByUserID(userID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(joinedGroupList)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedGroupIDs", joinedGroupIDs)
}()
joinedGroupIDListStr, err := g.rcClient.Fetch(g.getJoinedGroupsKey(userID), time.Second*30*60, getJoinedGroupIDList)
if err != nil {
return nil, err
}
err = json.Unmarshal([]byte(joinedGroupIDListStr), &joinedGroupIDs)
return joinedGroupIDs, utils.Wrap(err, "")
}
func (g *GroupCache) DelJoinedGroupIDs(ctx context.Context, userID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
}()
return g.rcClient.TagAsDeleted(g.getJoinedGroupsKey(userID))
}
// GetGroupMemberInfo
func (g *GroupCache) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relation.GroupMember, err error) {
getGroupMemberInfo := func() (string, error) {
groupMemberInfo, err := relation.GetGroupMemberInfoByGroupIDAndUserID(groupID, userID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(groupMemberInfo)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID, "groupMember", *groupMember)
}()
groupMemberInfoStr, err := g.rcClient.Fetch(g.getGroupMemberInfoKey(groupID, userID), time.Second*30*60, getGroupMemberInfo)
if err != nil {
return nil, err
}
groupMember = &relation.GroupMember{}
err = json.Unmarshal([]byte(groupMemberInfoStr), groupMember)
return groupMember, utils.Wrap(err, "")
}
func (g *GroupCache) DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID)
}()
return g.rcClient.TagAsDeleted(g.getGroupMemberInfoKey(groupID, userID))
}
// groupMemberNum
func (g *GroupCache) GetGroupMemberNum(ctx context.Context, groupID string) (num int, err error) {
getGroupMemberNum := func() (string, error) {
num, err := relation.GetGroupMemberNumByGroupID(groupID)
if err != nil {
return "", err
}
return strconv.Itoa(int(num)), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "num", num)
}()
groupMember, err := g.rcClient.Fetch(g.getGroupMemberNumKey(groupID), time.Second*30*60, getGroupMemberNum)
if err != nil {
return 0, err
}
return strconv.Atoi(groupMember)
}
func (g *GroupCache) DelGroupMemberNum(ctx context.Context, groupID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
}()
return g.rcClient.TagAsDeleted(g.getGroupMemberNumKey(groupID))
}
+18 -16
View File
@@ -4,6 +4,7 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/mongo"
"Open_IM/pkg/common/db/mysql"
"Open_IM/pkg/common/db/relation"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
@@ -19,31 +20,32 @@ import (
)
const (
userInfoCache = "USER_INFO_CACHE:"
//userInfoCache = "USER_INFO_CACHE:"
friendRelationCache = "FRIEND_RELATION_CACHE:"
blackListCache = "BLACK_LIST_CACHE:"
groupCache = "GROUP_CACHE:"
//groupCache = "GROUP_CACHE:"
//groupInfoCache = "GROUP_INFO_CACHE:"
groupOwnerIDCache = "GROUP_OWNER_ID:"
joinedGroupListCache = "JOINED_GROUP_LIST_CACHE:"
groupMemberInfoCache = "GROUP_MEMBER_INFO_CACHE:"
groupAllMemberInfoCache = "GROUP_ALL_MEMBER_INFO_CACHE:"
allFriendInfoCache = "ALL_FRIEND_INFO_CACHE:"
joinedSuperGroupListCache = "JOINED_SUPER_GROUP_LIST_CACHE:"
groupMemberListHashCache = "GROUP_MEMBER_LIST_HASH_CACHE:"
groupMemberNumCache = "GROUP_MEMBER_NUM_CACHE:"
conversationCache = "CONVERSATION_CACHE:"
conversationIDListCache = "CONVERSATION_ID_LIST_CACHE:"
extendMsgSetCache = "EXTEND_MSG_SET_CACHE:"
extendMsgCache = "EXTEND_MSG_CACHE:"
//groupOwnerIDCache = "GROUP_OWNER_ID:"
//joinedGroupListCache = "JOINED_GROUP_LIST_CACHE:"
//groupMemberInfoCache = "GROUP_MEMBER_INFO_CACHE:"
//groupAllMemberInfoCache = "GROUP_ALL_MEMBER_INFO_CACHE:"
allFriendInfoCache = "ALL_FRIEND_INFO_CACHE:"
//joinedSuperGroupListCache = "JOINED_SUPER_GROUP_LIST_CACHE:"
//groupMemberListHashCache = "GROUP_MEMBER_LIST_HASH_CACHE:"
//groupMemberNumCache = "GROUP_MEMBER_NUM_CACHE:"
conversationCache = "CONVERSATION_CACHE:"
conversationIDListCache = "CONVERSATION_ID_LIST_CACHE:"
extendMsgSetCache = "EXTEND_MSG_SET_CACHE:"
extendMsgCache = "EXTEND_MSG_CACHE:"
)
const scanCount = 3000
const RandomExpireAdjustment = 0.2
func (rc *RcClient) DelKeys() {
for _, key := range []string{groupCache, friendRelationCache, blackListCache, userInfoCache, groupInfoCacheKey, groupOwnerIDCache, joinedGroupListCache,
groupMemberInfoCache, groupAllMemberInfoCache, allFriendInfoCache} {
for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE", groupOwnerIDCache, joinedGroupListCache,
groupMemberInfoCache, groupAllMemberInfoCache, "ALL_FRIEND_INFO_CACHE:"} {
fName := utils.GetSelfFuncName()
var cursor uint64
var n int
+90
View File
@@ -0,0 +1,90 @@
package cache
import (
"Open_IM/pkg/common/db/relation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
"time"
)
const (
UserExpireTime = time.Second * 60 * 60 * 12
userInfoKey = "USER_INFO:"
)
type UserCache struct {
userDB *relation.User
expireTime time.Duration
redisClient *RedisClient
rcClient *rockscache.Client
}
func NewUserCache(rdb redis.UniversalClient, userDB *relation.User, options rockscache.Options) *UserCache {
return &UserCache{
userDB: userDB,
expireTime: UserExpireTime,
redisClient: NewRedisClient(rdb),
rcClient: rockscache.NewClient(rdb, options),
}
}
func (u *UserCache) getUserInfoKey(userID string) string {
return userInfoKey + userID
}
func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *relation.User, err error) {
getUserInfo := func() (string, error) {
userInfo, err := u.userDB.Take(ctx, userID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(userInfo)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "userInfo", *userInfo)
}()
userInfoStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), time.Second*30*60, getUserInfo)
if err != nil {
return nil, err
}
userInfo = &relation.User{}
err = json.Unmarshal([]byte(userInfoStr), userInfo)
return userInfo, utils.Wrap(err, "")
}
func (u *UserCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relation.User, error) {
var users []*relation.User
for _, userID := range userIDs {
user, err := GetUserInfoFromCache(ctx, userID)
if err != nil {
return nil, err
}
users = append(users, user)
}
return users, nil
}
func (u *UserCache) DelUserInfo(ctx context.Context, userID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
}()
return u.rcClient.TagAsDeleted(u.getUserInfoKey(userID) + userID)
}
func (u *UserCache) DelUsersInfo(ctx context.Context, userIDs []string) (err error) {
for _, userID := range userIDs {
if err := u.DelUserInfo(ctx, userID); err != nil {
return err
}
}
return nil
}