This commit is contained in:
wangchuxiao
2023-03-23 19:02:20 +08:00
parent 3e15014e0a
commit 132e1b987c
33 changed files with 818 additions and 785 deletions
+28 -11
View File
@@ -2,10 +2,11 @@ package cache
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
"time"
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
"time"
)
const (
@@ -16,21 +17,35 @@ const (
// args fn will exec when no data in cache
type BlackCache interface {
//get blackIDs from cache
GetBlackIDs(ctx context.Context, userID string, fn func(ctx context.Context, userID string) ([]string, error)) (blackIDs []string, err error)
metaCache
NewCache() BlackCache
GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error)
//del user's blackIDs cache, exec when a user's black list changed
DelBlackIDs(ctx context.Context, userID string) (err error)
DelBlackIDs(ctx context.Context, userID string) BlackCache
}
type BlackCacheRedis struct {
metaCache
expireTime time.Duration
rcClient *rockscache.Client
black *relation.BlackGorm
blackDB relationTb.BlackModelInterface
}
func NewBlackCacheRedis(rdb redis.UniversalClient, blackDB BlackCache, options rockscache.Options) *BlackCacheRedis {
func NewBlackCacheRedis(rdb redis.UniversalClient, blackDB relationTb.BlackModelInterface, options rockscache.Options) BlackCache {
rcClient := rockscache.NewClient(rdb, options)
return &BlackCacheRedis{
expireTime: blackExpireTime,
rcClient: rockscache.NewClient(rdb, options),
rcClient: rcClient,
metaCache: NewMetaCacheRedis(rcClient),
blackDB: blackDB,
}
}
func (b *BlackCacheRedis) NewCache() BlackCache {
return &BlackCacheRedis{
expireTime: b.expireTime,
rcClient: b.rcClient,
blackDB: b.blackDB,
}
}
@@ -39,11 +54,13 @@ func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string {
}
func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) {
return GetCache(ctx, b.rcClient, b.getBlackIDsKey(userID), b.expireTime, func(ctx context.Context) ([]string, error) {
return b.black.FindBlackUserIDs(ctx, userID)
return getCache(ctx, b.rcClient, b.getBlackIDsKey(userID), b.expireTime, func(ctx context.Context) ([]string, error) {
return b.blackDB.FindBlackUserIDs(ctx, userID)
})
}
func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) (err error) {
return b.rcClient.TagAsDeleted(b.getBlackIDsKey(userID))
func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) BlackCache {
cache := b.NewCache()
cache.AddKeys(userID)
return cache
}
+163 -229
View File
@@ -2,282 +2,216 @@ package cache
import (
"context"
"errors"
"math/big"
"strings"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
"time"
)
const (
conversationKey = "CONVERSATION:"
conversationIDsKey = "CONVERSATION_IDS:"
recvMsgOptKey = "RECV_MSG_OPT:"
superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
conversationExpireTime = time.Second * 60 * 60 * 12
)
conversationKey = "CONVERSATION:"
conversationIDsKey = "CONVERSATION_IDS:"
recvMsgOptKey = "RECV_MSG_OPT:"
superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
type FuncDB func() (string, error)
conversationExpireTime = time.Second * 60 * 60 * 12
)
// arg fn will exec when no data in cache
type ConversationCache interface {
metaCache
NewCache() ConversationCache
// get user's conversationIDs from cache
GetUserConversationIDs(ctx context.Context, userID string, fn FuncDB) ([]string, error)
// del user's conversationIDs from cache, call when a user add or reduce a conversation
DelUserConversationIDs(ctx context.Context, userID string) error
DelUsersConversationIDs(ctx context.Context, userIDList []string) error
GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error)
DelConversationIDs(userIDs []string) ConversationCache
// get one conversation from cache
GetConversation(ctx context.Context, ownerUserID, conversationID string, fn FuncDB) (*relationTb.ConversationModel, error)
GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error)
DelConvsersations(ownerUserID string, conversationIDs []string) ConversationCache
DelUsersConversation(ownerUserIDs []string, conversationID string) ConversationCache
// get one conversation from cache
GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn FuncDB) ([]*relationTb.ConversationModel, error)
GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error)
// get one user's all conversations from cache
GetUserAllConversations(ctx context.Context, ownerUserID string, fn FuncDB) ([]*relationTb.ConversationModel, error)
// del one conversation from cache, call when one user's conversation Info changed
DelConversation(ctx context.Context, ownerUserID, conversationID string) error
DelUserConversations(ctx context.Context, ownerUserID string, conversationIDList []string) error
DelUsersConversation(ctx context.Context, ownerUserIDList []string, conversationID string) error
GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error)
// get user conversation recv msg from cache
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)) (opt int, err error)
// del user recv msg opt from cache, call when user's conversation recv msg opt changed
DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)
DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache
// get one super group recv msg but do not notification userID list
GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (userIDs []string, err error)) (userIDs []string, err error)
// del one super group recv msg but do not notification userID list, call it when this list changed
DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) error
//GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint32, err error)
//DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string)
GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error)
// get one super group recv msg but do not notification userID list hash
GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error)
}
func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options) ConversationCache {
return &ConversationRedis{rcClient: rockscache.NewClient(rdb, opts)}
func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationTb.ConversationModelInterface) ConversationCache {
rcClient := rockscache.NewClient(rdb, opts)
return &ConversationRedisCache{rcClient: rcClient, metaCache: NewMetaCacheRedis(rcClient), conversationDB: db, expireTime: conversationExpireTime}
}
type ConversationRedis struct {
rcClient *rockscache.Client
expireTime time.Duration
type ConversationRedisCache struct {
metaCache
rcClient *rockscache.Client
conversationDB relationTb.ConversationModelInterface
expireTime time.Duration
}
func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, userID string, fn FuncDB) ([]string, error) {
return nil, nil
func NewNewConversationRedis(rdb redis.UniversalClient, conversationDB *relation.ConversationGorm, options rockscache.Options) ConversationCache {
rcClient := rockscache.NewClient(rdb, options)
return &ConversationRedisCache{rcClient: rcClient, metaCache: NewMetaCacheRedis(rcClient), conversationDB: conversationDB, expireTime: conversationExpireTime}
}
func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string, fn FuncDB) (*relationTb.ConversationModel, error) {
//TODO implement me
panic("implement me")
func (c *ConversationRedisCache) NewCache() ConversationCache {
return &ConversationRedisCache{rcClient: c.rcClient, metaCache: c.metaCache, conversationDB: c.conversationDB, expireTime: c.expireTime}
}
func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn FuncDB) ([]*relationTb.ConversationModel, error) {
//TODO implement me
panic("implement me")
}
func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string, fn FuncDB) ([]*relationTb.ConversationModel, error) {
//TODO implement me
panic("implement me")
}
func (c *ConversationRedis) DelUserConversations(ctx context.Context, ownerUserID string, conversationIDList []string) error {
//TODO implement me
panic("implement me")
}
func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID string, conversationID string) (opt int, err error)) (opt int, err error) {
//TODO implement me
panic("implement me")
}
func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (userIDs []string, err error)) (userIDs []string, err error) {
//TODO implement me
panic("implement me")
}
func (c *ConversationRedis) DelUsersConversationIDs(ctx context.Context, userIDList []string) error {
panic("implement me")
}
func (c *ConversationRedis) DelUsersConversation(ctx context.Context, ownerUserIDList []string, conversationID string) error {
panic("implement me")
}
func NewNewConversationRedis(rdb redis.UniversalClient, conversationDB *relation.ConversationGorm, options rockscache.Options) *ConversationRedis {
return &ConversationRedis{rcClient: rockscache.NewClient(rdb, options)}
}
func (c *ConversationRedis) getConversationKey(ownerUserID, conversationID string) string {
func (c *ConversationRedisCache) getConversationKey(ownerUserID, conversationID string) string {
return conversationKey + ownerUserID + ":" + conversationID
}
func (c *ConversationRedis) getConversationIDsKey(ownerUserID string) string {
func (c *ConversationRedisCache) getConversationIDsKey(ownerUserID string) string {
return conversationIDsKey + ownerUserID
}
func (c *ConversationRedis) getRecvMsgOptKey(ownerUserID, conversationID string) string {
return recvMsgOptKey + ownerUserID + ":" + conversationID
}
func (c *ConversationRedis) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
return superGroupRecvMsgNotNotifyUserIDsKey + groupID
}
//func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) {
// //getConversationIDs := func() (string, error) {
// // conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID)
// // if err != nil {
// // return "", err
// // }
// // bytes, err := json.Marshal(conversationIDs)
// // if err != nil {
// // return "", utils.Wrap(err, "")
// // }
// // return string(bytes), nil
// //}
// //defer func() {
// // tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs)
// //}()
// //conversationIDsStr, err := c.rcClient.Fetch(c.getConversationIDsKey(ownerUserID), time.Second*30*60, getConversationIDs)
// //err = json.Unmarshal([]byte(conversationIDsStr), &conversationIDs)
// //if err != nil {
// // return nil, utils.Wrap(err, "")
// //}
// //return conversationIDs, nil
// return GetCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, func(ctx context.Context) ([]string, error) {
// panic("implement me")
// })
//}
func (c *ConversationRedisCache) getRecvMsgOptKey(ownerUserID, conversationID string) string {
return recvMsgOptKey + ownerUserID + ":" + conversationID
}
func (c *ConversationRedis) GetUserConversationIDs1(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) {
//getConversationIDs := func() (string, error) {
// conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID)
// if err != nil {
// return "", err
// }
// bytes, err := json.Marshal(conversationIDs)
// if err != nil {
// return "", utils.Wrap(err, "")
// }
// return string(bytes), nil
//}
//defer func() {
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs)
//}()
//conversationIDsStr, err := c.rcClient.Fetch(c.getConversationIDsKey(ownerUserID), time.Second*30*60, getConversationIDs)
//err = json.Unmarshal([]byte(conversationIDsStr), &conversationIDs)
//if err != nil {
// return nil, utils.Wrap(err, "")
//}
//return conversationIDs, nil
//return GetCache1[[]string](c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, fn)
func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string {
return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID
}
return GetCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, func(ctx context.Context) ([]string, error) {
panic("")
func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) {
return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
})
}
//func GetCache1[T any](rcClient *rockscache.Client, key string, expire time.Duration, fn func() (any, error)) (T, error) {
// v, err := rcClient.Fetch(key, expire, func() (string, error) {
// v, err := fn()
// if err != nil {
// return "", err
// }
// bs, err := json.Marshal(v)
// if err != nil {
// return "", utils.Wrap(err, "")
// }
// return string(bs), nil
// })
// var t T
// if err != nil {
// return t, err
// }
// err = json.Unmarshal([]byte(v), &t)
// if err != nil {
// return t, utils.Wrap(err, "")
// }
// return t, nil
//}
func (c *ConversationRedis) DelUserConversationIDs(ctx context.Context, ownerUserID string) (err error) {
return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationIDsKey(ownerUserID)), "DelUserConversationIDs err")
func (c *ConversationRedisCache) DelConversationIDs(userIDs []string) ConversationCache {
var keys []string
for _, userID := range userIDs {
keys = append(keys, c.getConversationIDsKey(userID))
}
cache := c.NewCache()
cache.AddKeys(keys...)
return cache
}
//func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *relationTb.ConversationModel, err error) {
// return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationTb.ConversationModel, error) {
// panic("implement me")
// })
//}
func (c *ConversationRedis) DelConversation(ctx context.Context, ownerUserID, conversationID string) (err error) {
return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelConversation err")
func (c *ConversationRedisCache) GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error) {
return getCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationTb.ConversationModel, error) {
return c.conversationDB.Take(ctx, ownerUserID, conversationID)
})
}
//func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []relationTb.ConversationModel, err error) {
// defer func() {
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs, "conversations", conversations)
// }()
// for _, conversationID := range conversationIDs {
// conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
// if err != nil {
// return nil, err
// }
// conversations = append(conversations, *conversation)
// }
// return conversations, nil
//}
//func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string) (conversations []relationTb.ConversationModel, err error) {
// defer func() {
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversations", conversations)
// }()
// IDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
// if err != nil {
// return nil, err
// }
// var conversationIDs []relationTb.ConversationModel
// for _, conversationID := range IDs {
// conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
// if err != nil {
// return nil, err
// }
// conversationIDs = append(conversationIDs, *conversation)
// }
// return conversationIDs, nil
//}
//func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {
// //getConversation := func() (string, error) {
// // conversation, err := relation.GetConversation(ownerUserID, conversationID)
// // if err != nil {
// // return "", err
// // }
// // return strconv.Itoa(int(conversation.RecvMsgOpt)), nil
// //}
// //defer func() {
// // tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "opt", opt)
// //}()
// //optStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation)
// //if err != nil {
// // return 0, err
// //}
// //return strconv.Atoi(optStr)
// // panic("implement me")
// return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (int, error) {
// panic("implement me")
// })
//}
func (c *ConversationRedis) DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error {
return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelUserRecvMsgOpt failed")
func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersationIDs []string) ConversationCache {
var keys []string
for _, conversationID := range convsersationIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
}
cache := c.NewCache()
cache.AddKeys(keys...)
return cache
}
func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (err error) {
panic("implement me")
}
func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint32, err error) {
panic("implement me")
}
func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) {
func (c *ConversationRedisCache) GetConversationIndex(convsation *relationTb.ConversationModel, keys []string) (int, error) {
key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID)
for _i, _key := range keys {
if _key == key {
return _i, nil
}
}
return 0, errors.New("not found key:" + key + " in keys")
}
func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
var keys []string
for _, conversarionID := range conversationIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
}
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.GetConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) {
return c.conversationDB.Find(ctx, ownerUserID, conversationIDs)
})
}
func (c *ConversationRedisCache) GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) {
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
if err != nil {
return nil, err
}
var keys []string
for _, conversarionID := range conversationIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
}
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.GetConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) {
return c.conversationDB.FindUserIDAllConversations(ctx, ownerUserID)
})
}
func (c *ConversationRedisCache) DelUserConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ConversationCache {
var keys []string
for _, conversationID := range conversationIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
}
cache := c.NewCache()
cache.AddKeys(keys...)
return cache
}
func (c *ConversationRedisCache) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {
return getCache(ctx, c.rcClient, c.getRecvMsgOptKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (opt int, err error) {
return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID)
})
}
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) {
return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsKey(groupID), c.expireTime, func(ctx context.Context) (userIDs []string, err error) {
return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
})
}
func (c *ConversationRedisCache) DelUsersConversation(ownerUserIDs []string, conversationID string) ConversationCache {
var keys []string
for _, ownerUserID := range ownerUserIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
}
cache := c.NewCache()
cache.AddKeys(keys...)
return cache
}
func (c *ConversationRedisCache) DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache {
cache := c.NewCache()
cache.AddKeys(c.getRecvMsgOptKey(ownerUserID, conversationID))
return cache
}
func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ConversationCache {
cache := c.NewCache()
cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsKey(groupID))
return cache
}
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) {
return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID), c.expireTime, func(ctx context.Context) (hash uint64, err error) {
userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
if err != nil {
return 0, err
}
utils.Sort(userIDs, true)
bi := big.NewInt(0)
bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
return bi.Uint64(), nil
})
}
func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) {
panic("implement me")
}
+43 -35
View File
@@ -2,10 +2,11 @@ package cache
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/dtm-labs/rockscache"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
)
const (
@@ -13,44 +14,51 @@ const (
extendMsgCache = "EXTEND_MSG_CACHE:"
)
type ExtendMsgSetCache struct {
expireTime time.Duration
rcClient *rockscache.Client
type ExtendMsgSetCache interface {
metaCache
NewCache() ExtendMsgSetCache
GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *unrelation.ExtendMsgModel, err error)
DelExtendMsg(clientMsgID string) ExtendMsgSetCache
}
func (e *ExtendMsgSetCache) getKey(clientMsgID string) string {
type ExtendMsgSetCacheRedis struct {
metaCache
expireTime time.Duration
rcClient *rockscache.Client
extendMsgSetDB unrelation.ExtendMsgSetModelInterface
}
func NewExtendMsgSetCacheRedis(rdb redis.UniversalClient, extendMsgSetDB unrelation.ExtendMsgSetModelInterface, options rockscache.Options) ExtendMsgSetCache {
rcClient := rockscache.NewClient(rdb, options)
return &ExtendMsgSetCacheRedis{
metaCache: NewMetaCacheRedis(rcClient),
expireTime: time.Second * 30 * 60,
extendMsgSetDB: extendMsgSetDB,
rcClient: rcClient,
}
}
func (e *ExtendMsgSetCacheRedis) NewCache() ExtendMsgSetCache {
return &ExtendMsgSetCacheRedis{
metaCache: e.metaCache,
expireTime: e.expireTime,
extendMsgSetDB: e.extendMsgSetDB,
rcClient: e.rcClient,
}
}
func (e *ExtendMsgSetCacheRedis) getKey(clientMsgID string) string {
return extendMsgCache + clientMsgID
}
func (e *ExtendMsgSetCache) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *unrelation.ExtendMsgModel, err error) {
//getExtendMsg := func() (string, error) {
// extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, firstModifyTime)
// if err != nil {
// return "", utils.Wrap(err, "GetExtendMsgList failed")
// }
// bytes, err := json.Marshal(extendMsg)
// if err != nil {
// return "", utils.Wrap(err, "Marshal failed")
// }
// return string(bytes), nil
//}
//defer func() {
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "sourceID", sourceID, "sessionType",
// sessionType, "clientMsgID", clientMsgID, "firstModifyTime", firstModifyTime, "extendMsg", extendMsg)
//}()
//extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+clientMsgID, time.Second*30*60, getExtendMsg)
//if err != nil {
// return nil, utils.Wrap(err, "Fetch failed")
//}
//extendMsg = &mongoDB.ExtendMsg{}
//err = json.Unmarshal([]byte(extendMsgStr), extendMsg)
//return extendMsg, utils.Wrap(err, "Unmarshal failed")
return GetCache(ctx, e.rcClient, e.getKey(clientMsgID), e.expireTime, func(ctx context.Context) (*unrelation.ExtendMsgModel, error) {
panic("")
func (e *ExtendMsgSetCacheRedis) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *unrelation.ExtendMsgModel, err error) {
return getCache(ctx, e.rcClient, e.getKey(clientMsgID), e.expireTime, func(ctx context.Context) (*unrelation.ExtendMsgModel, error) {
return e.extendMsgSetDB.TakeExtendMsg(ctx, sourceID, sessionType, clientMsgID, firstModifyTime)
})
}
func (e *ExtendMsgSetCache) DelExtendMsg(ctx context.Context, clientMsgID string) (err error) {
return utils.Wrap(e.rcClient.TagAsDeleted(e.getKey(clientMsgID)), "DelExtendMsg err")
func (e *ExtendMsgSetCacheRedis) DelExtendMsg(clientMsgID string) ExtendMsgSetCache {
new := e.NewCache()
new.AddKeys(e.getKey(clientMsgID))
return new
}
+40 -36
View File
@@ -2,13 +2,12 @@ package cache
import (
"context"
"encoding/json"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
"time"
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
"time"
)
const (
@@ -20,29 +19,38 @@ const (
// args fn will exec when no data in cache
type FriendCache interface {
GetFriendIDs(ctx context.Context, ownerUserID string, fn func(ctx context.Context, ownerUserID string) (friendIDs []string, err error)) (friendIDs []string, err error)
metaCache
NewCache() FriendCache
GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error)
// call when friendID List changed
DelFriendIDs(ctx context.Context, ownerUserID string) (err error)
DelFriendIDs(ownerUserID ...string) FriendCache
// get single friendInfo from cache
GetFriend(ctx context.Context, ownerUserID, friendUserID string, fn func(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error)) (friend *relationTb.FriendModel, err error)
GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error)
// del friend when friend info changed
DelFriend(ctx context.Context, ownerUserID, friendUserID string) (err error)
DelFriend(ownerUserID, friendUserID string) FriendCache
}
type FriendCacheRedis struct {
friendDB *relation.FriendGorm
metaCache
friendDB relationTb.FriendModelInterface
expireTime time.Duration
rcClient *rockscache.Client
}
func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB *relation.FriendGorm, options rockscache.Options) *FriendCacheRedis {
func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationTb.FriendModelInterface, options rockscache.Options) FriendCache {
rcClient := rockscache.NewClient(rdb, options)
return &FriendCacheRedis{
metaCache: NewMetaCacheRedis(rcClient),
friendDB: friendDB,
expireTime: friendExpireTime,
rcClient: rockscache.NewClient(rdb, options),
rcClient: rcClient,
}
}
func (c *FriendCacheRedis) NewCache() FriendCache {
return &FriendCacheRedis{rcClient: c.rcClient, metaCache: c.metaCache, friendDB: c.friendDB, expireTime: c.expireTime}
}
func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string {
return friendIDsKey + ownerUserID
}
@@ -56,15 +64,22 @@ func (f *FriendCacheRedis) getFriendKey(ownerUserID, friendUserID string) string
}
func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) {
return GetCache(ctx, f.rcClient, f.getFriendIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) {
return getCache(ctx, f.rcClient, f.getFriendIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) {
return f.friendDB.FindFriendUserIDs(ctx, ownerUserID)
})
}
func (f *FriendCacheRedis) DelFriendIDs(ctx context.Context, ownerUserID string) (err error) {
return f.rcClient.TagAsDeleted(f.getFriendIDsKey(ownerUserID))
func (f *FriendCacheRedis) DelFriendIDs(ownerUserID ...string) FriendCache {
new := f.NewCache()
var keys []string
for _, userID := range ownerUserID {
keys = append(keys, f.getFriendIDsKey(userID))
}
new.AddKeys(keys...)
return new
}
// todo
func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) {
friendIDs, err := f.GetFriendIDs(ctx, ownerUserID)
if err != nil {
@@ -82,31 +97,20 @@ func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID s
return twoWayFriendIDs, nil
}
func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) (err error) {
return f.rcClient.TagAsDeleted(f.getTwoWayFriendsIDsKey(ownerUserID))
func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) FriendCache {
new := f.NewCache()
new.AddKeys(f.getTwoWayFriendsIDsKey(ownerUserID))
return new
}
func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID, friendUserID string, fn func(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error)) (friend *relationTb.FriendModel, err error) {
getFriend := func() (string, error) {
friend, err = f.friendDB.Take(ctx, ownerUserID, friendUserID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(friend)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
friendStr, err := f.rcClient.Fetch(f.getFriendKey(ownerUserID, friendUserID), f.expireTime, getFriend)
if err != nil {
return nil, err
}
friend = &relationTb.FriendModel{}
err = json.Unmarshal([]byte(friendStr), friend)
return friend, utils.Wrap(err, "")
func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error) {
return getCache(ctx, f.rcClient, f.getFriendKey(ownerUserID, friendUserID), f.expireTime, func(ctx context.Context) (*relationTb.FriendModel, error) {
return f.friendDB.Take(ctx, ownerUserID, friendUserID)
})
}
func (f *FriendCacheRedis) DelFriend(ctx context.Context, ownerUserID, friendUserID string) (err error) {
return f.rcClient.TagAsDeleted(f.getFriendKey(ownerUserID, friendUserID))
func (f *FriendCacheRedis) DelFriend(ownerUserID, friendUserID string) FriendCache {
new := f.NewCache()
new.AddKeys(f.getFriendKey(ownerUserID, friendUserID))
return new
}
+178 -181
View File
@@ -2,14 +2,15 @@ package cache
import (
"context"
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
unrelation2 "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
"math/big"
"strings"
"time"
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
unrelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
)
const (
@@ -24,40 +25,54 @@ const (
)
type GroupCache interface {
metaCache
NewCache() GroupCache
GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error)
GetGroupInfo(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error)
BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error)
DelJoinedSuperGroupIDs(ctx context.Context, userID string) (err error)
GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error)
GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error)
GetGroupMemberHash1(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error)
DelGroupMembersHash(ctx context.Context, groupID string) (err error)
DelJoinedSuperGroupIDs(userIDs ...string) GroupCache
GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error)
GetGroupMemberHashMap(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error)
DelGroupMembersHash(groupID string) GroupCache
GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error)
DelGroupMemberIDs(ctx context.Context, groupID string) (err error)
DelJoinedGroupID(ctx context.Context, userID string) (err error)
GetGroupsMemberIDs(ctx context.Context, groupIDs []string) (groupMemberIDs map[string][]string, err error)
DelGroupMemberIDs(groupID string) GroupCache
DelJoinedGroupID(userID ...string) GroupCache
GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relationTb.GroupMemberModel, err error)
DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error)
DelGroupMemberNum(ctx context.Context, groupID string) (err error)
DelGroupInfo(ctx context.Context, groupID string) (err error)
DelGroupsInfo(ctx context.Context, groupIDs []string) error
GetGroupMembersInfo(ctx context.Context, groupID string, userID []string, roleLevel []int32) (groupMembers []*relationTb.GroupMemberModel, err error)
DelGroupMembersInfo(groupID string, userID ...string) GroupCache
GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error)
DelGroupsMemberNum(groupID ...string) GroupCache
DelGroupsInfo(groupIDs ...string) GroupCache
}
type GroupCacheRedis struct {
group relationTb.GroupModelInterface
groupMember relationTb.GroupMemberModelInterface
groupRequest relationTb.GroupRequestModelInterface
mongoDB unrelation2.SuperGroupModelInterface
expireTime time.Duration
rcClient *rockscache.Client
metaCache
groupDB relationTb.GroupModelInterface
groupMemberDB relationTb.GroupMemberModelInterface
groupRequestDB relationTb.GroupRequestModelInterface
mongoDB unrelationTb.SuperGroupModelInterface
expireTime time.Duration
rcClient *rockscache.Client
}
func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB relationTb.GroupModelInterface, groupMemberDB relationTb.GroupMemberModelInterface, groupRequestDB relationTb.GroupRequestModelInterface, mongoClient unrelation2.SuperGroupModelInterface, opts rockscache.Options) GroupCache {
return &GroupCacheRedis{rcClient: rockscache.NewClient(rdb, opts), expireTime: groupExpireTime,
group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB,
mongoDB: mongoClient,
func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB relationTb.GroupModelInterface, groupMemberDB relationTb.GroupMemberModelInterface, groupRequestDB relationTb.GroupRequestModelInterface, mongoClient unrelationTb.SuperGroupModelInterface, opts rockscache.Options) GroupCache {
rcClient := rockscache.NewClient(rdb, opts)
return &GroupCacheRedis{rcClient: rcClient, expireTime: groupExpireTime,
groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB,
mongoDB: mongoClient, metaCache: NewMetaCacheRedis(rcClient),
}
}
func (g *GroupCacheRedis) NewCache() GroupCache {
return &GroupCacheRedis{rcClient: g.rcClient, expireTime: g.expireTime, groupDB: g.groupDB, groupMemberDB: g.groupMemberDB, groupRequestDB: g.groupRequestDB, mongoDB: g.mongoDB, metaCache: g.metaCache}
}
func (g *GroupCacheRedis) getGroupInfoKey(groupID string) string {
return groupInfoKey + groupID
}
@@ -86,35 +101,66 @@ func (g *GroupCacheRedis) getGroupMemberNumKey(groupID string) string {
return groupMemberNumKey + groupID
}
func (g *GroupCacheRedis) GetGroupIndex(group *relationTb.GroupModel, keys []string) (int, error) {
key := g.getGroupInfoKey(group.GroupID)
for i, _key := range keys {
if _key == key {
return i, nil
}
}
return 0, errIndex
}
func (g *GroupCacheRedis) GetGroupMemberIndex(groupMember *relationTb.GroupMemberModel, keys []string) (int, error) {
key := g.getGroupMemberInfoKey(groupMember.GroupID, groupMember.UserID)
for i, _key := range keys {
if _key == key {
return i, nil
}
}
return 0, errIndex
}
// / groupInfo
func (g *GroupCacheRedis) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) {
return GetCacheFor(ctx, groupIDs, func(ctx context.Context, groupID string) (*relationTb.GroupModel, error) {
return g.GetGroupInfo(ctx, groupID)
var keys []string
for _, group := range groupIDs {
keys = append(keys, g.getGroupInfoKey(group))
}
return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupIndex, func(ctx context.Context) ([]*relationTb.GroupModel, error) {
return g.groupDB.Find(ctx, groupIDs)
})
}
func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) {
return GetCache(ctx, g.rcClient, g.getGroupInfoKey(groupID), g.expireTime, func(ctx context.Context) (*relationTb.GroupModel, error) {
return g.group.Take(ctx, groupID)
return getCache(ctx, g.rcClient, g.getGroupInfoKey(groupID), g.expireTime, func(ctx context.Context) (*relationTb.GroupModel, error) {
return g.groupDB.Take(ctx, groupID)
})
}
// userJoinSuperGroup
func (g *GroupCacheRedis) BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) {
for _, userID := range userIDs {
if err := g.DelJoinedSuperGroupIDs(ctx, userID); err != nil {
return err
}
func (g *GroupCacheRedis) DelGroupsInfo(groupIDs ...string) GroupCache {
new := g.NewCache()
var keys []string
for _, groupID := range groupIDs {
keys = append(keys, g.getGroupInfoKey(groupID))
}
return nil
new.AddKeys(keys...)
return new
}
func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(ctx context.Context, userID string) (err error) {
return g.rcClient.TagAsDeleted(g.getJoinedSuperGroupsIDKey(userID))
// userJoinSuperGroup
func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(userIDs ...string) GroupCache {
new := g.NewCache()
var keys []string
for _, userID := range userIDs {
keys = append(keys, g.getJoinedSuperGroupsIDKey(userID))
}
new.AddKeys(keys...)
return new
}
func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) {
return GetCache(ctx, g.rcClient, g.getJoinedSuperGroupsIDKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) {
return getCache(ctx, g.rcClient, g.getJoinedSuperGroupsIDKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) {
userGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID)
if err != nil {
return nil, err
@@ -124,8 +170,8 @@ func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID str
}
// groupMembersHash
func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error) {
return GetCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, func(ctx context.Context) (uint64, error) {
func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error) {
return getCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, func(ctx context.Context) (uint64, error) {
userIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return 0, err
@@ -137,174 +183,125 @@ func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID strin
})
}
func (g *GroupCacheRedis) GetGroupMemberHash1(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) {
// todo
mapGroupUserIDs, err := g.groupMember.FindJoinUserID(ctx, groupIDs)
if err != nil {
return nil, err
}
func (g *GroupCacheRedis) GetGroupMemberHashMap(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) {
res := make(map[string]*relationTb.GroupSimpleUserID)
for _, groupID := range groupIDs {
userIDs := mapGroupUserIDs[groupID]
userIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
users := &relationTb.GroupSimpleUserID{}
if len(userIDs) > 0 {
utils.Sort(userIDs, true)
bi := big.NewInt(0)
bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
users.Hash = bi.Uint64()
users.MemberNum = uint32(len(userIDs))
}
res[groupID] = users
}
return res, nil
}
func (g *GroupCacheRedis) DelGroupMembersHash(ctx context.Context, groupID string) (err error) {
return g.rcClient.TagAsDeleted(g.getGroupMembersHashKey(groupID))
func (g *GroupCacheRedis) DelGroupMembersHash(groupID string) GroupCache {
cache := g.NewCache()
cache.AddKeys(g.getGroupMembersHashKey(groupID))
return cache
}
// groupMemberIDs
func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
return GetCache(ctx, g.rcClient, g.getGroupMemberIDsKey(groupID), g.expireTime, func(ctx context.Context) ([]string, error) {
return g.groupMember.FindMemberUserID(ctx, groupID)
return getCache(ctx, g.rcClient, g.getGroupMemberIDsKey(groupID), g.expireTime, func(ctx context.Context) ([]string, error) {
return g.groupMemberDB.FindMemberUserID(ctx, groupID)
})
}
func (g *GroupCacheRedis) DelGroupMemberIDs(ctx context.Context, groupID string) (err error) {
return g.rcClient.TagAsDeleted(g.getGroupMemberIDsKey(groupID))
func (g *GroupCacheRedis) GetGroupsMemberIDs(ctx context.Context, groupIDs []string) (map[string][]string, error) {
m := make(map[string][]string)
for _, groupID := range groupIDs {
userIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
m[groupID] = userIDs
}
return m, nil
}
//// JoinedGroups
//func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
// getJoinedGroupIDList := func() (string, error) {
// joinedGroupList, err := relation.GetJoinedGroupIDListByUserID(userID)
// if err != nil {
// return "", err
// }
// bytes, err := json.Marshal(joinedGroupList)
// if err != nil {
// return "", utils.Wrap(err, "")
// }
// return string(bytes), nil
// }
// defer func() {
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedGroupIDs", joinedGroupIDs)
// }()
// joinedGroupIDListStr, err := g.rcClient.Fetch(g.getJoinedGroupsKey(userID), time.Second*30*60, getJoinedGroupIDList)
// if err != nil {
// return nil, err
// }
// err = json.Unmarshal([]byte(joinedGroupIDListStr), &joinedGroupIDs)
// return joinedGroupIDs, utils.Wrap(err, "")
//}
func (g *GroupCacheRedis) DelJoinedGroupID(ctx context.Context, userID string) (err error) {
return g.rcClient.TagAsDeleted(g.getJoinedGroupsKey(userID))
func (g *GroupCacheRedis) DelGroupMemberIDs(groupID string) GroupCache {
cache := g.NewCache()
cache.AddKeys(g.getGroupMemberIDsKey(groupID))
return cache
}
//func (g *GroupCacheRedis) DelJoinedGroupIDs(ctx context.Context, userIDs []string) (err error) {
// defer func() {
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
// }()
// for _, userID := range userIDs {
// if err := g.DelJoinedGroupID(ctx, userID); err != nil {
// return err
// }
// }
// return nil
//}
func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
return getCache(ctx, g.rcClient, g.getJoinedGroupsKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) {
return g.groupMemberDB.FindUserJoinedGroupID(ctx, userID)
})
}
func (g *GroupCacheRedis) DelJoinedGroupID(userIDs ...string) GroupCache {
var keys []string
for _, userID := range userIDs {
keys = append(keys, g.getJoinedGroupsKey(userID))
}
cache := g.NewCache()
cache.AddKeys(keys...)
return cache
}
func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relationTb.GroupMemberModel, err error) {
return GetCache(ctx, g.rcClient, g.getGroupMemberInfoKey(groupID, userID), g.expireTime, func(ctx context.Context) (*relationTb.GroupMemberModel, error) {
return g.groupMember.Take(ctx, groupID, userID)
return getCache(ctx, g.rcClient, g.getGroupMemberInfoKey(groupID, userID), g.expireTime, func(ctx context.Context) (*relationTb.GroupMemberModel, error) {
return g.groupMemberDB.Take(ctx, groupID, userID)
})
}
//func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID, userIDs []string) (groupMember *relationTb.GroupMemberModel, err error) {
//
// return nil, err
//}
//func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, count, offset int32, groupID string) (groupMembers []*relation.GroupMember, err error) {
// defer func() {
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "count", count, "offset", offset, "groupID", groupID, "groupMember", groupMembers)
// }()
// groupMemberIDList, err := g.GetGroupMemberIDs(ctx, groupID)
// if err != nil {
// return nil, err
// }
// if count < 0 || offset < 0 {
// return nil, nil
// }
// var groupMemberList []*relation.GroupMember
// var start, stop int32
// start = offset
// stop = offset + count
// l := int32(len(groupMemberIDList))
// if start > stop {
// return nil, nil
// }
// if start >= l {
// return nil, nil
// }
// if count != 0 {
// if stop >= l {
// stop = l
// }
// groupMemberIDList = groupMemberIDList[start:stop]
// } else {
// if l < 1000 {
// stop = l
// } else {
// stop = 1000
// }
// groupMemberIDList = groupMemberIDList[start:stop]
// }
// for _, userID := range groupMemberIDList {
// groupMember, err := g.GetGroupMemberInfo(ctx, groupID, userID)
// if err != nil {
// return
// }
// groupMembers = append(groupMembers, groupMember)
// }
// return groupMemberList, nil
//}
func (g *GroupCacheRedis) DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error) {
return g.rcClient.TagAsDeleted(g.getGroupMemberInfoKey(groupID, userID))
}
// groupMemberNum
//func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (num int, err error) {
// getGroupMemberNum := func() (string, error) {
// num, err := relation.GetGroupMemberNumByGroupID(groupID)
// if err != nil {
// return "", err
// }
// return strconv.Itoa(int(num)), nil
// }
// defer func() {
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "num", num)
// }()
// groupMember, err := g.rcClient.Fetch(g.getGroupMemberNumKey(groupID), time.Second*30*60, getGroupMemberNum)
// if err != nil {
// return 0, err
// }
// return strconv.Atoi(groupMember)
//}
func (g *GroupCacheRedis) DelGroupMemberNum(ctx context.Context, groupID string) (err error) {
return g.rcClient.TagAsDeleted(g.getGroupMemberNumKey(groupID))
}
func (g *GroupCacheRedis) DelGroupInfo(ctx context.Context, groupID string) (err error) {
return g.rcClient.TagAsDeleted(g.getGroupInfoKey(groupID))
}
func (g *GroupCacheRedis) DelGroupsInfo(ctx context.Context, groupIDs []string) error {
for _, groupID := range groupIDs {
if err := g.DelGroupInfo(ctx, groupID); err != nil {
return err
}
func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string, roleLevel []int32) ([]*relationTb.GroupMemberModel, error) {
var keys []string
for _, userID := range userIDs {
keys = append(keys, g.getGroupMemberInfoKey(groupID, userID))
}
return nil
return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupMemberIndex, func(ctx context.Context) ([]*relationTb.GroupMemberModel, error) {
return g.groupMemberDB.Find(ctx, []string{groupID}, userIDs, roleLevel)
})
}
func (g *GroupCacheRedis) GetAllGroupMemberInfo(ctx context.Context, groupID string) ([]*relationTb.GroupMemberModel, error) {
groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
var keys []string
for _, groupMemberID := range groupMemberIDs {
keys = append(keys, g.getGroupMemberInfoKey(groupID, groupMemberID))
}
return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupMemberIndex, func(ctx context.Context) ([]*relationTb.GroupMemberModel, error) {
return g.groupMemberDB.Find(ctx, []string{groupID}, groupMemberIDs, nil)
})
}
func (g *GroupCacheRedis) DelGroupMembersInfo(groupID string, userIDs ...string) GroupCache {
var keys []string
for _, userID := range userIDs {
keys = append(keys, g.getGroupMemberInfoKey(groupID, userID))
}
cache := g.NewCache()
cache.AddKeys(keys...)
return cache
}
func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error) {
return getCache(ctx, g.rcClient, g.getGroupMemberNumKey(groupID), g.expireTime, func(ctx context.Context) (int64, error) {
return g.groupMemberDB.TakeGroupMemberNum(ctx, groupID)
})
}
func (g *GroupCacheRedis) DelGroupsMemberNum(groupID ...string) GroupCache {
var keys []string
for _, groupID := range groupID {
keys = append(keys, g.getGroupMemberNumKey(groupID))
}
cache := g.NewCache()
cache.AddKeys(keys...)
return cache
}
+2 -1
View File
@@ -3,11 +3,12 @@ package cache
import (
"context"
"fmt"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw/specialerror"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/go-redis/redis/v8"
"time"
)
func NewRedis() (redis.UniversalClient, error) {
+4 -3
View File
@@ -4,6 +4,9 @@ import (
"context"
"errors"
"fmt"
"strconv"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
@@ -13,8 +16,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"strconv"
"time"
"github.com/go-redis/redis/v8"
)
@@ -251,7 +252,7 @@ func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList []
}
}
if len(failedMsgs) != 0 {
return len(failedMsgs), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedMsgs, tracelog.GetOperationID(ctx)))
return len(failedMsgs), fmt.Errorf("set msg to cache failed, failed lists: %v, %s", failedMsgs, tracelog.GetOperationID(ctx))
}
_, err := pipe.Exec(ctx)
return 0, err
+72 -9
View File
@@ -3,13 +3,53 @@ package cache
import (
"context"
"encoding/json"
"errors"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/dtm-labs/rockscache"
"time"
)
const scanCount = 3000
var errIndex = errors.New("err index")
type metaCache interface {
ExecDel(ctx context.Context) error
// delete key rapid
DeleteKey(ctx context.Context, key string) error
AddKeys(keys ...string)
GetPreDeleteKeys() []string
}
func NewMetaCacheRedis(rcClient *rockscache.Client) metaCache {
return &metaCacheRedis{rcClient: rcClient}
}
type metaCacheRedis struct {
rcClient *rockscache.Client
keys []string
}
func (m *metaCacheRedis) ExecDel(ctx context.Context) error {
if len(m.keys) > 0 {
return m.rcClient.TagAsDeletedBatch2(ctx, m.keys)
}
return nil
}
func (m *metaCacheRedis) DeleteKey(ctx context.Context, key string) error {
return m.rcClient.TagAsDeleted2(ctx, key)
}
func (m *metaCacheRedis) AddKeys(keys ...string) {
m.keys = append(m.keys, keys...)
}
func (m *metaCacheRedis) GetPreDeleteKeys() []string {
return m.keys
}
func GetDefaultOpt() rockscache.Options {
opts := rockscache.NewDefaultOptions()
opts.StrongConsistency = true
@@ -17,10 +57,10 @@ func GetDefaultOpt() rockscache.Options {
return opts
}
func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
var t T
var write bool
v, err := rcClient.Fetch(key, expire, func() (s string, err error) {
v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) {
t, err = fn(ctx)
if err != nil {
return "", err
@@ -45,14 +85,37 @@ func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
return t, nil
}
func GetCacheFor[E any, T any](ctx context.Context, list []E, fn func(ctx context.Context, item E) (T, error)) ([]T, error) {
rs := make([]T, 0, len(list))
for _, e := range list {
r, err := fn(ctx, e)
func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) ([]T, error)) ([]T, error) {
var tArrays []T
batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) {
values := make(map[int]string)
tArrays, err = fn(ctx)
if err != nil {
return nil, err
}
rs = append(rs, r)
for _, v := range tArrays {
index, err := keyIndexFn(v, keys)
if err != nil {
continue
}
bs, err := json.Marshal(v)
if err != nil {
return nil, utils.Wrap(err, "marshal failed")
}
values[index] = string(bs)
}
return values, nil
})
if err != nil {
return nil, err
}
return rs, nil
for _, v := range batchMap {
var t T
err = json.Unmarshal([]byte(v), &t)
if err != nil {
return nil, utils.Wrap(err, "unmarshal failed")
}
tArrays = append(tArrays, t)
}
return tArrays, nil
}
+61 -63
View File
@@ -2,14 +2,11 @@ package cache
import (
"context"
"encoding/json"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
"time"
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
"strconv"
"time"
)
const (
@@ -19,21 +16,38 @@ const (
)
type UserCache interface {
metaCache
NewCache() UserCache
GetUserInfo(ctx context.Context, userID string) (userInfo *relationTb.UserModel, err error)
GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationTb.UserModel, error)
DelUsersInfo(userIDs []string) UserCache
GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error)
DelUsersGlobalRecvMsgOpt(userIDs []string) UserCache
}
type UserCacheRedis struct {
userDB *relation.UserGorm
metaCache
userDB relationTb.UserModelInterface
expireTime time.Duration
rcClient *rockscache.Client
rcClient *rockscache.Client
}
func NewUserCacheRedis(rdb redis.UniversalClient, userDB *relation.UserGorm, options rockscache.Options) *UserCacheRedis {
func NewUserCacheRedis(rdb redis.UniversalClient, userDB relationTb.UserModelInterface, options rockscache.Options) UserCache {
rcClient := rockscache.NewClient(rdb, options)
return &UserCacheRedis{
metaCache: NewMetaCacheRedis(rcClient),
userDB: userDB,
expireTime: userExpireTime,
rcClient: rockscache.NewClient(rdb, options),
rcClient: rcClient,
}
}
func (u *UserCacheRedis) NewCache() UserCache {
return &UserCacheRedis{
metaCache: u.metaCache,
userDB: u.userDB,
expireTime: u.expireTime,
rcClient: u.rcClient,
}
}
@@ -46,66 +60,50 @@ func (u *UserCacheRedis) getUserGlobalRecvMsgOptKey(userID string) string {
}
func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userInfo *relationTb.UserModel, err error) {
getUserInfo := func() (string, error) {
userInfo, err := u.userDB.Take(ctx, userID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(userInfo)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
userInfoStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), u.expireTime, getUserInfo)
if err != nil {
return nil, err
}
userInfo = &relationTb.UserModel{}
err = json.Unmarshal([]byte(userInfoStr), userInfo)
return userInfo, utils.Wrap(err, "")
return getCache(ctx, u.rcClient, u.getUserInfoKey(userID), u.expireTime, func(ctx context.Context) (*relationTb.UserModel, error) {
return u.userDB.Take(ctx, userID)
})
}
func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationTb.UserModel, error) {
var users []*relationTb.UserModel
//for _, userID := range userIDs {
// user, err := GetUserInfoFromCache(ctx, userID)
// if err != nil {
// return nil, err
// }
// users = append(users, user)
//}
return users, nil
}
func (u *UserCacheRedis) DelUserInfo(ctx context.Context, userID string) (err error) {
return u.rcClient.TagAsDeleted(u.getUserInfoKey(userID))
}
func (u *UserCacheRedis) DelUsersInfo(ctx context.Context, userIDs []string) (err error) {
var keys []string
for _, userID := range userIDs {
if err := u.DelUserInfo(ctx, userID); err != nil {
return err
}
keys = append(keys, u.getUserInfoKey(userID))
}
return nil
return batchGetCache(ctx, u.rcClient, keys, u.expireTime, func(user *relationTb.UserModel, keys []string) (int, error) {
for i, key := range keys {
if key == u.getUserInfoKey(user.UserID) {
return i, nil
}
}
return 0, errIndex
}, func(ctx context.Context) ([]*relationTb.UserModel, error) {
return u.userDB.Find(ctx, userIDs)
})
}
func (u *UserCacheRedis) DelUsersInfo(userIDs []string) UserCache {
var keys []string
for _, userID := range userIDs {
keys = append(keys, u.getUserInfoKey(userID))
}
cache := u.NewCache()
cache.AddKeys(keys...)
return cache
}
func (u *UserCacheRedis) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) {
getUserGlobalRecvMsgOpt := func() (string, error) {
userInfo, err := u.userDB.Take(ctx, userID)
if err != nil {
return "", err
}
return strconv.Itoa(int(userInfo.GlobalRecvMsgOpt)), nil
}
optStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), u.expireTime, getUserGlobalRecvMsgOpt)
if err != nil {
return 0, err
}
return strconv.Atoi(optStr)
return getCache(ctx, u.rcClient, u.getUserGlobalRecvMsgOptKey(userID), u.expireTime, func(ctx context.Context) (int, error) {
return u.userDB.GetUserGlobalRecvMsgOpt(ctx, userID)
})
}
func (u *UserCacheRedis) DelUserGlobalRecvMsgOpt(ctx context.Context, userID string) (err error) {
return u.rcClient.TagAsDeleted(u.getUserGlobalRecvMsgOptKey(userID))
func (u *UserCacheRedis) DelUsersGlobalRecvMsgOpt(userIDs []string) UserCache {
var keys []string
for _, userID := range userIDs {
keys = append(keys, u.getUserGlobalRecvMsgOptKey(userID))
}
cache := u.NewCache()
cache.AddKeys(keys...)
return cache
}