refactor: db refactor and cache key add. (#2320)

* refactor: db refactor and cache key add.

* refactor: db refactor and cache key add.

* refactor: go version update.

* refactor: file name change.
This commit is contained in:
OpenIM-Gordon
2024-05-27 11:58:36 +08:00
committed by GitHub
parent 1eef4013e2
commit 76d9688a54
132 changed files with 2703 additions and 2255 deletions
+211
View File
@@ -0,0 +1,211 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"encoding/json"
"fmt"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw/specialerror"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
"time"
)
// BatchDeleterRedis is a concrete implementation of the BatchDeleter interface based on Redis and RocksCache.
type BatchDeleterRedis struct {
redisClient redis.UniversalClient
keys []string
rocksClient *rockscache.Client
redisPubTopics []string
}
// NewBatchDeleterRedis creates a new BatchDeleterRedis instance.
func NewBatchDeleterRedis(redisClient redis.UniversalClient, options *rockscache.Options, redisPubTopics []string) *BatchDeleterRedis {
return &BatchDeleterRedis{
redisClient: redisClient,
rocksClient: rockscache.NewClient(redisClient, *options),
redisPubTopics: redisPubTopics,
}
}
// ExecDelWithKeys directly takes keys for batch deletion and publishes deletion information.
func (c *BatchDeleterRedis) ExecDelWithKeys(ctx context.Context, keys []string) error {
distinctKeys := datautil.Distinct(keys)
return c.execDel(ctx, distinctKeys)
}
// ChainExecDel is used for chain calls for batch deletion. It must call Clone to prevent memory pollution.
func (c *BatchDeleterRedis) ChainExecDel(ctx context.Context) error {
distinctKeys := datautil.Distinct(c.keys)
return c.execDel(ctx, distinctKeys)
}
// execDel performs batch deletion and publishes the keys that have been deleted to update the local cache information of other nodes.
func (c *BatchDeleterRedis) execDel(ctx context.Context, keys []string) error {
if len(keys) > 0 {
log.ZDebug(ctx, "delete cache", "topic", c.redisPubTopics, "keys", keys)
slotMapKeys, err := groupKeysBySlot(ctx, c.redisClient, keys)
if err != nil {
return err
}
// Batch delete keys
for slot, singleSlotKeys := range slotMapKeys {
if err := c.rocksClient.TagAsDeletedBatch2(ctx, singleSlotKeys); err != nil {
log.ZWarn(ctx, "Batch delete cache failed", err, "slot", slot, "keys", singleSlotKeys)
continue
}
}
// Publish the keys that have been deleted to Redis to update the local cache information of other nodes
if len(c.redisPubTopics) > 0 && len(keys) > 0 {
keysByTopic := localcache.GetPublishKeysByTopic(c.redisPubTopics, keys)
for topic, keys := range keysByTopic {
if len(keys) > 0 {
data, err := json.Marshal(keys)
if err != nil {
log.ZWarn(ctx, "keys json marshal failed", err, "topic", topic, "keys", keys)
} else {
if err := c.redisClient.Publish(ctx, topic, string(data)).Err(); err != nil {
log.ZWarn(ctx, "redis publish cache delete error", err, "topic", topic, "keys", keys)
}
}
}
}
}
}
return nil
}
// Clone creates a copy of BatchDeleterRedis for chain calls to prevent memory pollution.
func (c *BatchDeleterRedis) Clone() cache.BatchDeleter {
return &BatchDeleterRedis{
redisClient: c.redisClient,
keys: c.keys,
rocksClient: c.rocksClient,
redisPubTopics: c.redisPubTopics,
}
}
// AddKeys adds keys to be deleted.
func (c *BatchDeleterRedis) AddKeys(keys ...string) {
c.keys = append(c.keys, keys...)
}
// GetRocksCacheOptions returns the default configuration options for RocksCache.
func GetRocksCacheOptions() *rockscache.Options {
opts := rockscache.NewDefaultOptions()
opts.StrongConsistency = true
opts.RandomExpireAdjustment = 0.2
return &opts
}
// groupKeysBySlot groups keys by their Redis cluster hash slots.
func groupKeysBySlot(ctx context.Context, redisClient redis.UniversalClient, keys []string) (map[int64][]string, error) {
slots := make(map[int64][]string)
clusterClient, isCluster := redisClient.(*redis.ClusterClient)
if isCluster {
pipe := clusterClient.Pipeline()
cmds := make([]*redis.IntCmd, len(keys))
for i, key := range keys {
cmds[i] = pipe.ClusterKeySlot(ctx, key)
}
_, err := pipe.Exec(ctx)
if err != nil {
return nil, errs.WrapMsg(err, "get slot err")
}
for i, cmd := range cmds {
slot, err := cmd.Result()
if err != nil {
log.ZWarn(ctx, "some key get slot err", err, "key", keys[i])
continue
}
slots[slot] = append(slots[slot], keys[i])
}
} else {
// If not a cluster client, put all keys in the same slot (0)
slots[0] = keys
}
return slots, nil
}
func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
var t T
var write bool
v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) {
t, err = fn(ctx)
if err != nil {
return "", err
}
bs, err := json.Marshal(t)
if err != nil {
return "", errs.WrapMsg(err, "marshal failed")
}
write = true
return string(bs), nil
})
if err != nil {
return t, errs.Wrap(err)
}
if write {
return t, nil
}
if v == "" {
return t, errs.ErrRecordNotFound.WrapMsg("cache is not found")
}
err = json.Unmarshal([]byte(v), &t)
if err != nil {
errInfo := fmt.Sprintf("cache json.Unmarshal failed, key:%s, value:%s, expire:%s", key, v, expire)
return t, errs.WrapMsg(err, errInfo)
}
return t, nil
}
func batchGetCache[T any, K comparable](
ctx context.Context,
rcClient *rockscache.Client,
expire time.Duration,
keys []K,
keyFn func(key K) string,
fns func(ctx context.Context, key K) (T, error),
) ([]T, error) {
if len(keys) == 0 {
return nil, nil
}
res := make([]T, 0, len(keys))
for _, key := range keys {
val, err := getCache(ctx, rcClient, keyFn(key), expire, func(ctx context.Context) (T, error) {
return fns(ctx, key)
})
if err != nil {
if errs.ErrRecordNotFound.Is(specialerror.ErrCode(errs.Unwrap(err))) {
continue
}
return nil, errs.Wrap(err)
}
res = append(res, val)
}
return res, nil
}
+82
View File
@@ -0,0 +1,82 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"time"
)
const (
blackExpireTime = time.Second * 60 * 60 * 12
)
type BlackCacheRedis struct {
cache.BatchDeleter
expireTime time.Duration
rcClient *rockscache.Client
blackDB database.Black
}
func NewBlackCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, blackDB database.Black, options *rockscache.Options) cache.BlackCache {
batchHandler := NewBatchDeleterRedis(rdb, options, []string{localCache.Friend.Topic})
b := localCache.Friend
log.ZDebug(context.Background(), "black local cache init", "Topic", b.Topic, "SlotNum", b.SlotNum, "SlotSize", b.SlotSize, "enable", b.Enable())
return &BlackCacheRedis{
BatchDeleter: batchHandler,
expireTime: blackExpireTime,
rcClient: rockscache.NewClient(rdb, *options),
blackDB: blackDB,
}
}
func (b *BlackCacheRedis) CloneBlackCache() cache.BlackCache {
return &BlackCacheRedis{
BatchDeleter: b.BatchDeleter.Clone(),
expireTime: b.expireTime,
rcClient: b.rcClient,
blackDB: b.blackDB,
}
}
func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string {
return cachekey.GetBlackIDsKey(ownerUserID)
}
func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) {
return getCache(
ctx,
b.rcClient,
b.getBlackIDsKey(userID),
b.expireTime,
func(ctx context.Context) ([]string, error) {
return b.blackDB.FindBlackUserIDs(ctx, userID)
},
)
}
func (b *BlackCacheRedis) DelBlackIDs(_ context.Context, userID string) cache.BlackCache {
cache := b.CloneBlackCache()
cache.AddKeys(b.getBlackIDsKey(userID))
return cache
}
+246
View File
@@ -0,0 +1,246 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/encrypt"
"github.com/redis/go-redis/v9"
"math/big"
"strings"
"time"
)
const (
conversationExpireTime = time.Second * 60 * 60 * 12
)
func NewConversationRedis(rdb redis.UniversalClient, localCache *config.LocalCache, opts *rockscache.Options, db database.Conversation) cache.ConversationCache {
batchHandler := NewBatchDeleterRedis(rdb, opts, []string{localCache.Conversation.Topic})
c := localCache.Conversation
log.ZDebug(context.Background(), "black local cache init", "Topic", c.Topic, "SlotNum", c.SlotNum, "SlotSize", c.SlotSize, "enable", c.Enable())
return &ConversationRedisCache{
BatchDeleter: batchHandler,
rcClient: rockscache.NewClient(rdb, *opts),
conversationDB: db,
expireTime: conversationExpireTime,
}
}
type ConversationRedisCache struct {
cache.BatchDeleter
rcClient *rockscache.Client
conversationDB database.Conversation
expireTime time.Duration
}
func (c *ConversationRedisCache) CloneConversationCache() cache.ConversationCache {
return &ConversationRedisCache{
BatchDeleter: c.BatchDeleter.Clone(),
rcClient: c.rcClient,
conversationDB: c.conversationDB,
expireTime: c.expireTime,
}
}
func (c *ConversationRedisCache) getConversationKey(ownerUserID, conversationID string) string {
return cachekey.GetConversationKey(ownerUserID, conversationID)
}
func (c *ConversationRedisCache) getConversationIDsKey(ownerUserID string) string {
return cachekey.GetConversationIDsKey(ownerUserID)
}
func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
return cachekey.GetSuperGroupRecvNotNotifyUserIDsKey(groupID)
}
func (c *ConversationRedisCache) getRecvMsgOptKey(ownerUserID, conversationID string) string {
return cachekey.GetRecvMsgOptKey(ownerUserID, conversationID)
}
func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string {
return cachekey.GetSuperGroupRecvNotNotifyUserIDsHashKey(groupID)
}
func (c *ConversationRedisCache) getConversationHasReadSeqKey(ownerUserID, conversationID string) string {
return cachekey.GetConversationHasReadSeqKey(ownerUserID, conversationID)
}
func (c *ConversationRedisCache) getConversationNotReceiveMessageUserIDsKey(conversationID string) string {
return cachekey.GetConversationNotReceiveMessageUserIDsKey(conversationID)
}
func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID string) string {
return cachekey.GetUserConversationIDsHashKey(ownerUserID)
}
func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) {
return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
})
}
func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) cache.ConversationCache {
keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
keys = append(keys, c.getConversationIDsKey(userID))
}
cache := c.CloneConversationCache()
cache.AddKeys(keys...)
return cache
}
func (c *ConversationRedisCache) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) {
return getCache(
ctx,
c.rcClient,
c.getUserConversationIDsHashKey(ownerUserID),
c.expireTime,
func(ctx context.Context) (uint64, error) {
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
if err != nil {
return 0, err
}
datautil.Sort(conversationIDs, true)
bi := big.NewInt(0)
bi.SetString(encrypt.Md5(strings.Join(conversationIDs, ";"))[0:8], 16)
return bi.Uint64(), nil
},
)
}
func (c *ConversationRedisCache) DelUserConversationIDsHash(ownerUserIDs ...string) cache.ConversationCache {
keys := make([]string, 0, len(ownerUserIDs))
for _, ownerUserID := range ownerUserIDs {
keys = append(keys, c.getUserConversationIDsHashKey(ownerUserID))
}
cache := c.CloneConversationCache()
cache.AddKeys(keys...)
return cache
}
func (c *ConversationRedisCache) GetConversation(ctx context.Context, ownerUserID, conversationID string) (*model.Conversation, error) {
return getCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*model.Conversation, error) {
return c.conversationDB.Take(ctx, ownerUserID, conversationID)
})
}
func (c *ConversationRedisCache) DelConversations(ownerUserID string, conversationIDs ...string) cache.ConversationCache {
keys := make([]string, 0, len(conversationIDs))
for _, conversationID := range conversationIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
}
cache := c.CloneConversationCache()
cache.AddKeys(keys...)
return cache
}
func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*model.Conversation, error) {
return batchGetCache(ctx, c.rcClient, c.expireTime, conversationIDs, func(conversationID string) string {
return c.getConversationKey(ownerUserID, conversationID)
}, func(ctx context.Context, conversationID string) (*model.Conversation, error) {
return c.conversationDB.Take(ctx, ownerUserID, conversationID)
})
}
func (c *ConversationRedisCache) GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*model.Conversation, error) {
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
if err != nil {
return nil, err
}
return c.GetConversations(ctx, ownerUserID, conversationIDs)
}
func (c *ConversationRedisCache) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {
return getCache(ctx, c.rcClient, c.getRecvMsgOptKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (opt int, err error) {
return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID)
})
}
func (c *ConversationRedisCache) DelUsersConversation(conversationID string, ownerUserIDs ...string) cache.ConversationCache {
keys := make([]string, 0, len(ownerUserIDs))
for _, ownerUserID := range ownerUserIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
}
cache := c.CloneConversationCache()
cache.AddKeys(keys...)
return cache
}
func (c *ConversationRedisCache) DelUserRecvMsgOpt(ownerUserID, conversationID string) cache.ConversationCache {
cache := c.CloneConversationCache()
cache.AddKeys(c.getRecvMsgOptKey(ownerUserID, conversationID))
return cache
}
func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDs(groupID string) cache.ConversationCache {
cache := c.CloneConversationCache()
cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsKey(groupID))
return cache
}
func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) cache.ConversationCache {
cache := c.CloneConversationCache()
cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID))
return cache
}
func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) cache.ConversationCache {
cache := c.CloneConversationCache()
for _, conversationID := range conversationIDs {
cache.AddKeys(c.getConversationHasReadSeqKey(ownerUserID, conversationID))
}
return cache
}
func (c *ConversationRedisCache) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error) {
panic("implement me")
}
func (c *ConversationRedisCache) DelConversationByConversationID(conversationIDs ...string) cache.ConversationCache {
panic("implement me")
}
func (c *ConversationRedisCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
return getCache(ctx, c.rcClient, c.getConversationNotReceiveMessageUserIDsKey(conversationID), c.expireTime, func(ctx context.Context) ([]string, error) {
return c.conversationDB.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)
})
}
func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) cache.ConversationCache {
cache := c.CloneConversationCache()
for _, conversationID := range conversationIDs {
cache.AddKeys(c.getConversationNotReceiveMessageUserIDsKey(conversationID))
}
return cache
}
+15
View File
@@ -0,0 +1,15 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
+153
View File
@@ -0,0 +1,153 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
"time"
)
const (
friendExpireTime = time.Second * 60 * 60 * 12
)
// FriendCacheRedis is an implementation of the FriendCache interface using Redis.
type FriendCacheRedis struct {
cache.BatchDeleter
friendDB database.Friend
expireTime time.Duration
rcClient *rockscache.Client
}
// NewFriendCacheRedis creates a new instance of FriendCacheRedis.
func NewFriendCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, friendDB database.Friend,
options *rockscache.Options) cache.FriendCache {
batchHandler := NewBatchDeleterRedis(rdb, options, []string{localCache.Friend.Topic})
f := localCache.Friend
log.ZDebug(context.Background(), "friend local cache init", "Topic", f.Topic, "SlotNum", f.SlotNum, "SlotSize", f.SlotSize, "enable", f.Enable())
return &FriendCacheRedis{
BatchDeleter: batchHandler,
friendDB: friendDB,
expireTime: friendExpireTime,
rcClient: rockscache.NewClient(rdb, *options),
}
}
func (f *FriendCacheRedis) CloneFriendCache() cache.FriendCache {
return &FriendCacheRedis{
BatchDeleter: f.BatchDeleter.Clone(),
friendDB: f.friendDB,
expireTime: f.expireTime,
rcClient: f.rcClient,
}
}
// getFriendIDsKey returns the key for storing friend IDs in the cache.
func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string {
return cachekey.GetFriendIDsKey(ownerUserID)
}
// getTwoWayFriendsIDsKey returns the key for storing two-way friend IDs in the cache.
func (f *FriendCacheRedis) getTwoWayFriendsIDsKey(ownerUserID string) string {
return cachekey.GetTwoWayFriendsIDsKey(ownerUserID)
}
// getFriendKey returns the key for storing friend info in the cache.
func (f *FriendCacheRedis) getFriendKey(ownerUserID, friendUserID string) string {
return cachekey.GetFriendKey(ownerUserID, friendUserID)
}
// GetFriendIDs retrieves friend IDs from the cache or the database if not found.
func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) {
return getCache(ctx, f.rcClient, f.getFriendIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) {
return f.friendDB.FindFriendUserIDs(ctx, ownerUserID)
})
}
// DelFriendIDs deletes friend IDs from the cache.
func (f *FriendCacheRedis) DelFriendIDs(ownerUserIDs ...string) cache.FriendCache {
newFriendCache := f.CloneFriendCache()
keys := make([]string, 0, len(ownerUserIDs))
for _, userID := range ownerUserIDs {
keys = append(keys, f.getFriendIDsKey(userID))
}
newFriendCache.AddKeys(keys...)
return newFriendCache
}
// GetTwoWayFriendIDs retrieves two-way friend IDs from the cache.
func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) {
friendIDs, err := f.GetFriendIDs(ctx, ownerUserID)
if err != nil {
return nil, err
}
for _, friendID := range friendIDs {
friendFriendID, err := f.GetFriendIDs(ctx, friendID)
if err != nil {
return nil, err
}
if datautil.Contain(ownerUserID, friendFriendID...) {
twoWayFriendIDs = append(twoWayFriendIDs, ownerUserID)
}
}
return twoWayFriendIDs, nil
}
// DelTwoWayFriendIDs deletes two-way friend IDs from the cache.
func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) cache.FriendCache {
newFriendCache := f.CloneFriendCache()
newFriendCache.AddKeys(f.getTwoWayFriendsIDsKey(ownerUserID))
return newFriendCache
}
// GetFriend retrieves friend info from the cache or the database if not found.
func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *model.Friend, err error) {
return getCache(ctx, f.rcClient, f.getFriendKey(ownerUserID,
friendUserID), f.expireTime, func(ctx context.Context) (*model.Friend, error) {
return f.friendDB.Take(ctx, ownerUserID, friendUserID)
})
}
// DelFriend deletes friend info from the cache.
func (f *FriendCacheRedis) DelFriend(ownerUserID, friendUserID string) cache.FriendCache {
newFriendCache := f.CloneFriendCache()
newFriendCache.AddKeys(f.getFriendKey(ownerUserID, friendUserID))
return newFriendCache
}
// DelFriends deletes multiple friend infos from the cache.
func (f *FriendCacheRedis) DelFriends(ownerUserID string, friendUserIDs []string) cache.FriendCache {
newFriendCache := f.CloneFriendCache()
for _, friendUserID := range friendUserIDs {
key := f.getFriendKey(ownerUserID, friendUserID)
newFriendCache.AddKeys(key) // Assuming AddKeys marks the keys for deletion
}
return newFriendCache
}
+408
View File
@@ -0,0 +1,408 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"fmt"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/common"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
"time"
)
const (
groupExpireTime = time.Second * 60 * 60 * 12
)
var errIndex = errs.New("err index")
type GroupCacheRedis struct {
cache.BatchDeleter
groupDB database.Group
groupMemberDB database.GroupMember
groupRequestDB database.GroupRequest
expireTime time.Duration
rcClient *rockscache.Client
groupHash cache.GroupHash
}
func NewGroupCacheRedis(
rdb redis.UniversalClient,
localCache *config.LocalCache,
groupDB database.Group,
groupMemberDB database.GroupMember,
groupRequestDB database.GroupRequest,
hashCode cache.GroupHash,
opts *rockscache.Options,
) cache.GroupCache {
batchHandler := NewBatchDeleterRedis(rdb, opts, []string{localCache.Group.Topic})
g := localCache.Group
log.ZDebug(context.Background(), "group local cache init", "Topic", g.Topic, "SlotNum", g.SlotNum, "SlotSize", g.SlotSize, "enable", g.Enable())
return &GroupCacheRedis{
BatchDeleter: batchHandler,
rcClient: rockscache.NewClient(rdb, *opts),
expireTime: groupExpireTime,
groupDB: groupDB,
groupMemberDB: groupMemberDB,
groupRequestDB: groupRequestDB,
groupHash: hashCode,
}
}
func (g *GroupCacheRedis) CloneGroupCache() cache.GroupCache {
return &GroupCacheRedis{
BatchDeleter: g.BatchDeleter.Clone(),
rcClient: g.rcClient,
expireTime: g.expireTime,
groupDB: g.groupDB,
groupMemberDB: g.groupMemberDB,
groupRequestDB: g.groupRequestDB,
}
}
func (g *GroupCacheRedis) getGroupInfoKey(groupID string) string {
return cachekey.GetGroupInfoKey(groupID)
}
func (g *GroupCacheRedis) getJoinedGroupsKey(userID string) string {
return cachekey.GetJoinedGroupsKey(userID)
}
func (g *GroupCacheRedis) getGroupMembersHashKey(groupID string) string {
return cachekey.GetGroupMembersHashKey(groupID)
}
func (g *GroupCacheRedis) getGroupMemberIDsKey(groupID string) string {
return cachekey.GetGroupMemberIDsKey(groupID)
}
func (g *GroupCacheRedis) getGroupMemberInfoKey(groupID, userID string) string {
return cachekey.GetGroupMemberInfoKey(groupID, userID)
}
func (g *GroupCacheRedis) getGroupMemberNumKey(groupID string) string {
return cachekey.GetGroupMemberNumKey(groupID)
}
func (g *GroupCacheRedis) getGroupRoleLevelMemberIDsKey(groupID string, roleLevel int32) string {
return cachekey.GetGroupRoleLevelMemberIDsKey(groupID, roleLevel)
}
func (g *GroupCacheRedis) GetGroupIndex(group *model.Group, keys []string) (int, error) {
key := g.getGroupInfoKey(group.GroupID)
for i, _key := range keys {
if _key == key {
return i, nil
}
}
return 0, errIndex
}
func (g *GroupCacheRedis) GetGroupMemberIndex(groupMember *model.GroupMember, keys []string) (int, error) {
key := g.getGroupMemberInfoKey(groupMember.GroupID, groupMember.UserID)
for i, _key := range keys {
if _key == key {
return i, nil
}
}
return 0, errIndex
}
func (g *GroupCacheRedis) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*model.Group, err error) {
return batchGetCache(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string {
return g.getGroupInfoKey(groupID)
}, func(ctx context.Context, groupID string) (*model.Group, error) {
return g.groupDB.Take(ctx, groupID)
})
}
func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (group *model.Group, err error) {
return getCache(ctx, g.rcClient, g.getGroupInfoKey(groupID), g.expireTime, func(ctx context.Context) (*model.Group, error) {
return g.groupDB.Take(ctx, groupID)
})
}
func (g *GroupCacheRedis) DelGroupsInfo(groupIDs ...string) cache.GroupCache {
newGroupCache := g.CloneGroupCache()
keys := make([]string, 0, len(groupIDs))
for _, groupID := range groupIDs {
keys = append(keys, g.getGroupInfoKey(groupID))
}
newGroupCache.AddKeys(keys...)
return newGroupCache
}
func (g *GroupCacheRedis) DelGroupsOwner(groupIDs ...string) cache.GroupCache {
newGroupCache := g.CloneGroupCache()
keys := make([]string, 0, len(groupIDs))
for _, groupID := range groupIDs {
keys = append(keys, g.getGroupRoleLevelMemberIDsKey(groupID, constant.GroupOwner))
}
newGroupCache.AddKeys(keys...)
return newGroupCache
}
func (g *GroupCacheRedis) DelGroupRoleLevel(groupID string, roleLevels []int32) cache.GroupCache {
newGroupCache := g.CloneGroupCache()
keys := make([]string, 0, len(roleLevels))
for _, roleLevel := range roleLevels {
keys = append(keys, g.getGroupRoleLevelMemberIDsKey(groupID, roleLevel))
}
newGroupCache.AddKeys(keys...)
return newGroupCache
}
func (g *GroupCacheRedis) DelGroupAllRoleLevel(groupID string) cache.GroupCache {
return g.DelGroupRoleLevel(groupID, []int32{constant.GroupOwner, constant.GroupAdmin, constant.GroupOrdinaryUsers})
}
func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error) {
if g.groupHash == nil {
return 0, errs.ErrInternalServer.WrapMsg("group hash is nil")
}
return getCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, func(ctx context.Context) (uint64, error) {
return g.groupHash.GetGroupHash(ctx, groupID)
})
}
func (g *GroupCacheRedis) GetGroupMemberHashMap(ctx context.Context, groupIDs []string) (map[string]*common.GroupSimpleUserID, error) {
if g.groupHash == nil {
return nil, errs.ErrInternalServer.WrapMsg("group hash is nil")
}
res := make(map[string]*common.GroupSimpleUserID)
for _, groupID := range groupIDs {
hash, err := g.GetGroupMembersHash(ctx, groupID)
if err != nil {
return nil, err
}
log.ZDebug(ctx, "GetGroupMemberHashMap", "groupID", groupID, "hash", hash)
num, err := g.GetGroupMemberNum(ctx, groupID)
if err != nil {
return nil, err
}
res[groupID] = &common.GroupSimpleUserID{Hash: hash, MemberNum: uint32(num)}
}
return res, nil
}
func (g *GroupCacheRedis) DelGroupMembersHash(groupID string) cache.GroupCache {
cache := g.CloneGroupCache()
cache.AddKeys(g.getGroupMembersHashKey(groupID))
return cache
}
func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
return getCache(ctx, g.rcClient, g.getGroupMemberIDsKey(groupID), g.expireTime, func(ctx context.Context) ([]string, error) {
return g.groupMemberDB.FindMemberUserID(ctx, groupID)
})
}
func (g *GroupCacheRedis) GetGroupsMemberIDs(ctx context.Context, groupIDs []string) (map[string][]string, error) {
m := make(map[string][]string)
for _, groupID := range groupIDs {
userIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
m[groupID] = userIDs
}
return m, nil
}
func (g *GroupCacheRedis) DelGroupMemberIDs(groupID string) cache.GroupCache {
cache := g.CloneGroupCache()
cache.AddKeys(g.getGroupMemberIDsKey(groupID))
return cache
}
func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
return getCache(ctx, g.rcClient, g.getJoinedGroupsKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) {
return g.groupMemberDB.FindUserJoinedGroupID(ctx, userID)
})
}
func (g *GroupCacheRedis) DelJoinedGroupID(userIDs ...string) cache.GroupCache {
keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
keys = append(keys, g.getJoinedGroupsKey(userID))
}
cache := g.CloneGroupCache()
cache.AddKeys(keys...)
return cache
}
func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *model.GroupMember, err error) {
return getCache(ctx, g.rcClient, g.getGroupMemberInfoKey(groupID, userID), g.expireTime, func(ctx context.Context) (*model.GroupMember, error) {
return g.groupMemberDB.Take(ctx, groupID, userID)
})
}
func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]*model.GroupMember, error) {
return batchGetCache(ctx, g.rcClient, g.expireTime, userIDs, func(userID string) string {
return g.getGroupMemberInfoKey(groupID, userID)
}, func(ctx context.Context, userID string) (*model.GroupMember, error) {
return g.groupMemberDB.Take(ctx, groupID, userID)
})
}
func (g *GroupCacheRedis) GetGroupMembersPage(
ctx context.Context,
groupID string,
userIDs []string,
showNumber, pageNumber int32,
) (total uint32, groupMembers []*model.GroupMember, err error) {
groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return 0, nil, err
}
if userIDs != nil {
userIDs = datautil.BothExist(userIDs, groupMemberIDs)
} else {
userIDs = groupMemberIDs
}
groupMembers, err = g.GetGroupMembersInfo(ctx, groupID, datautil.Paginate(userIDs, int(showNumber), int(showNumber)))
return uint32(len(userIDs)), groupMembers, err
}
func (g *GroupCacheRedis) GetAllGroupMembersInfo(ctx context.Context, groupID string) (groupMembers []*model.GroupMember, err error) {
groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
return g.GetGroupMembersInfo(ctx, groupID, groupMemberIDs)
}
func (g *GroupCacheRedis) GetAllGroupMemberInfo(ctx context.Context, groupID string) ([]*model.GroupMember, error) {
groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
return g.GetGroupMembersInfo(ctx, groupID, groupMemberIDs)
}
func (g *GroupCacheRedis) DelGroupMembersInfo(groupID string, userIDs ...string) cache.GroupCache {
keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
keys = append(keys, g.getGroupMemberInfoKey(groupID, userID))
}
cache := g.CloneGroupCache()
cache.AddKeys(keys...)
return cache
}
func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error) {
return getCache(ctx, g.rcClient, g.getGroupMemberNumKey(groupID), g.expireTime, func(ctx context.Context) (int64, error) {
return g.groupMemberDB.TakeGroupMemberNum(ctx, groupID)
})
}
func (g *GroupCacheRedis) DelGroupsMemberNum(groupID ...string) cache.GroupCache {
keys := make([]string, 0, len(groupID))
for _, groupID := range groupID {
keys = append(keys, g.getGroupMemberNumKey(groupID))
}
cache := g.CloneGroupCache()
cache.AddKeys(keys...)
return cache
}
func (g *GroupCacheRedis) GetGroupOwner(ctx context.Context, groupID string) (*model.GroupMember, error) {
members, err := g.GetGroupRoleLevelMemberInfo(ctx, groupID, constant.GroupOwner)
if err != nil {
return nil, err
}
if len(members) == 0 {
return nil, errs.ErrRecordNotFound.WrapMsg(fmt.Sprintf("group %s owner not found", groupID))
}
return members[0], nil
}
func (g *GroupCacheRedis) GetGroupsOwner(ctx context.Context, groupIDs []string) ([]*model.GroupMember, error) {
members := make([]*model.GroupMember, 0, len(groupIDs))
for _, groupID := range groupIDs {
items, err := g.GetGroupRoleLevelMemberInfo(ctx, groupID, constant.GroupOwner)
if err != nil {
return nil, err
}
if len(items) > 0 {
members = append(members, items[0])
}
}
return members, nil
}
func (g *GroupCacheRedis) GetGroupRoleLevelMemberIDs(ctx context.Context, groupID string, roleLevel int32) ([]string, error) {
return getCache(ctx, g.rcClient, g.getGroupRoleLevelMemberIDsKey(groupID, roleLevel), g.expireTime, func(ctx context.Context) ([]string, error) {
return g.groupMemberDB.FindRoleLevelUserIDs(ctx, groupID, roleLevel)
})
}
func (g *GroupCacheRedis) GetGroupRoleLevelMemberInfo(ctx context.Context, groupID string, roleLevel int32) ([]*model.GroupMember, error) {
userIDs, err := g.GetGroupRoleLevelMemberIDs(ctx, groupID, roleLevel)
if err != nil {
return nil, err
}
return g.GetGroupMembersInfo(ctx, groupID, userIDs)
}
func (g *GroupCacheRedis) GetGroupRolesLevelMemberInfo(ctx context.Context, groupID string, roleLevels []int32) ([]*model.GroupMember, error) {
var userIDs []string
for _, roleLevel := range roleLevels {
ids, err := g.GetGroupRoleLevelMemberIDs(ctx, groupID, roleLevel)
if err != nil {
return nil, err
}
userIDs = append(userIDs, ids...)
}
return g.GetGroupMembersInfo(ctx, groupID, userIDs)
}
func (g *GroupCacheRedis) FindGroupMemberUser(ctx context.Context, groupIDs []string, userID string) (_ []*model.GroupMember, err error) {
if len(groupIDs) == 0 {
groupIDs, err = g.GetJoinedGroupIDs(ctx, userID)
if err != nil {
return nil, err
}
}
return batchGetCache(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string {
return g.getGroupMemberInfoKey(groupID, userID)
}, func(ctx context.Context, groupID string) (*model.GroupMember, error) {
return g.groupMemberDB.Take(ctx, groupID, userID)
})
}
+15
View File
@@ -0,0 +1,15 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
+439
View File
@@ -0,0 +1,439 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"errors"
"github.com/gogo/protobuf/jsonpb"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
"time"
)
const msgCacheTimeout = 86400 * time.Second
var concurrentLimit = 3
func NewMsgCache(client redis.UniversalClient, redisEnablePipeline bool) cache.MsgCache {
return &msgCache{rdb: client, msgCacheTimeout: msgCacheTimeout, redisEnablePipeline: redisEnablePipeline}
}
type msgCache struct {
rdb redis.UniversalClient
msgCacheTimeout time.Duration
redisEnablePipeline bool
}
func (c *msgCache) getAllMessageCacheKey(conversationID string) string {
return cachekey.GetAllMessageCacheKey(conversationID)
}
func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string {
return cachekey.GetMessageCacheKey(conversationID, seq)
}
func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string {
return cachekey.GetMessageDelUserListKey(conversationID, seq)
}
func (c *msgCache) getUserDelList(conversationID, userID string) string {
return cachekey.GetUserDelListKey(conversationID, userID)
}
func (c *msgCache) getSendMsgKey(id string) string {
return cachekey.GetSendMsgKey(id)
}
func (c *msgCache) getLockMessageTypeKey(clientMsgID string, TypeKey string) string {
return cachekey.GetLockMessageTypeKey(clientMsgID, TypeKey)
}
func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
return cachekey.GetMessageReactionExKey(clientMsgID, sessionType)
}
func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
if c.redisEnablePipeline {
return c.PipeSetMessageToCache(ctx, conversationID, msgs)
}
return c.ParallelSetMessageToCache(ctx, conversationID, msgs)
}
func (c *msgCache) PipeSetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
pipe := c.rdb.Pipeline()
for _, msg := range msgs {
s, err := msgprocessor.Pb2String(msg)
if err != nil {
return 0, err
}
key := c.getMessageCacheKey(conversationID, msg.Seq)
_ = pipe.Set(ctx, key, s, c.msgCacheTimeout)
}
results, err := pipe.Exec(ctx)
if err != nil {
return 0, errs.Wrap(err)
}
for _, res := range results {
if res.Err() != nil {
return 0, errs.Wrap(err)
}
}
return len(msgs), nil
}
func (c *msgCache) ParallelSetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
wg := errgroup.Group{}
wg.SetLimit(concurrentLimit)
for _, msg := range msgs {
msg := msg // closure safe var
wg.Go(func() error {
s, err := msgprocessor.Pb2String(msg)
if err != nil {
return errs.Wrap(err)
}
key := c.getMessageCacheKey(conversationID, msg.Seq)
if err := c.rdb.Set(ctx, key, s, c.msgCacheTimeout).Err(); err != nil {
return errs.Wrap(err)
}
return nil
})
}
err := wg.Wait()
if err != nil {
return 0, errs.WrapMsg(err, "wg.Wait failed")
}
return len(msgs), nil
}
func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error {
for _, seq := range seqs {
delUserListKey := c.getMessageDelUserListKey(conversationID, seq)
userDelListKey := c.getUserDelList(conversationID, userID)
err := c.rdb.SAdd(ctx, delUserListKey, userID).Err()
if err != nil {
return errs.Wrap(err)
}
err = c.rdb.SAdd(ctx, userDelListKey, seq).Err()
if err != nil {
return errs.Wrap(err)
}
if err := c.rdb.Expire(ctx, delUserListKey, c.msgCacheTimeout).Err(); err != nil {
return errs.Wrap(err)
}
if err := c.rdb.Expire(ctx, userDelListKey, c.msgCacheTimeout).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error) {
result, err := c.rdb.SMembers(ctx, c.getUserDelList(conversationID, userID)).Result()
if err != nil {
return nil, errs.Wrap(err)
}
seqs = make([]int64, len(result))
for i, v := range result {
seqs[i] = stringutil.StringToInt64(v)
}
return seqs, nil
}
func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) {
for _, seq := range seqs {
delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result()
if err != nil {
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
continue
}
if len(delUsers) > 0 {
var failedFlag bool
for _, userID := range delUsers {
err = c.rdb.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err()
if err != nil {
failedFlag = true
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq, "userID", userID)
}
}
if !failedFlag {
if err := c.rdb.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil {
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
}
}
}
}
}
func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error {
if c.redisEnablePipeline {
return c.PipeDeleteMessages(ctx, conversationID, seqs)
}
return c.ParallelDeleteMessages(ctx, conversationID, seqs)
}
func (c *msgCache) ParallelDeleteMessages(ctx context.Context, conversationID string, seqs []int64) error {
wg := errgroup.Group{}
wg.SetLimit(concurrentLimit)
for _, seq := range seqs {
seq := seq
wg.Go(func() error {
err := c.rdb.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err()
if err != nil {
return errs.Wrap(err)
}
return nil
})
}
return wg.Wait()
}
func (c *msgCache) PipeDeleteMessages(ctx context.Context, conversationID string, seqs []int64) error {
pipe := c.rdb.Pipeline()
for _, seq := range seqs {
_ = pipe.Del(ctx, c.getMessageCacheKey(conversationID, seq))
}
results, err := pipe.Exec(ctx)
if err != nil {
return errs.WrapMsg(err, "pipe.del")
}
for _, res := range results {
if res.Err() != nil {
return errs.Wrap(err)
}
}
return nil
}
func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error {
vals, err := c.rdb.Keys(ctx, c.getAllMessageCacheKey(conversationID)).Result()
if errors.Is(err, redis.Nil) {
return nil
}
if err != nil {
return errs.Wrap(err)
}
for _, v := range vals {
if err := c.rdb.Del(ctx, v).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error {
for _, seq := range seqs {
key := c.getMessageCacheKey(userID, seq)
result, err := c.rdb.Get(ctx, key).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
continue
}
return errs.Wrap(err)
}
var msg sdkws.MsgData
err = jsonpb.UnmarshalString(result, &msg)
if err != nil {
return err
}
msg.Status = constant.MsgDeleted
s, err := msgprocessor.Pb2String(&msg)
if err != nil {
return errs.Wrap(err)
}
if err := c.rdb.Set(ctx, key, s, c.msgCacheTimeout).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return errs.Wrap(c.rdb.Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err())
}
func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
result, err := c.rdb.Get(ctx, c.getSendMsgKey(id)).Int()
return int32(result), errs.Wrap(err)
}
func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := c.getLockMessageTypeKey(clientMsgID, TypeKey)
return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err())
}
func (c *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := c.getLockMessageTypeKey(clientMsgID, TypeKey)
return errs.Wrap(c.rdb.Del(ctx, key).Err())
}
func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
if err != nil {
return false, errs.Wrap(err)
}
return n > 0, nil
}
func (c *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
}
func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
val, err := c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()
return val, errs.Wrap(err)
}
func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
val, err := c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()
return val, errs.Wrap(err)
}
func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
val, err := c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
return val, errs.Wrap(err)
}
func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
}
func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
if c.redisEnablePipeline {
return c.PipeGetMessagesBySeq(ctx, conversationID, seqs)
}
return c.ParallelGetMessagesBySeq(ctx, conversationID, seqs)
}
func (c *msgCache) PipeGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
pipe := c.rdb.Pipeline()
results := []*redis.StringCmd{}
for _, seq := range seqs {
results = append(results, pipe.Get(ctx, c.getMessageCacheKey(conversationID, seq)))
}
_, err = pipe.Exec(ctx)
if err != nil && err != redis.Nil {
return seqMsgs, failedSeqs, errs.WrapMsg(err, "pipe.get")
}
for idx, res := range results {
seq := seqs[idx]
if res.Err() != nil {
log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq, "err", res.Err())
failedSeqs = append(failedSeqs, seq)
continue
}
msg := sdkws.MsgData{}
if err = msgprocessor.String2Pb(res.Val(), &msg); err != nil {
log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq)
failedSeqs = append(failedSeqs, seq)
continue
}
if msg.Status == constant.MsgDeleted {
failedSeqs = append(failedSeqs, seq)
continue
}
seqMsgs = append(seqMsgs, &msg)
}
return
}
func (c *msgCache) ParallelGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
type entry struct {
err error
msg *sdkws.MsgData
}
wg := errgroup.Group{}
wg.SetLimit(concurrentLimit)
results := make([]entry, len(seqs)) // set slice len/cap to length of seqs.
for idx, seq := range seqs {
// closure safe var
idx := idx
seq := seq
wg.Go(func() error {
res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result()
if err != nil {
log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq)
results[idx] = entry{err: err}
return nil
}
msg := sdkws.MsgData{}
if err = msgprocessor.String2Pb(res, &msg); err != nil {
log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq)
results[idx] = entry{err: err}
return nil
}
if msg.Status == constant.MsgDeleted {
results[idx] = entry{err: err}
return nil
}
results[idx] = entry{msg: &msg}
return nil
})
}
_ = wg.Wait()
for idx, res := range results {
if res.err != nil {
failedSeqs = append(failedSeqs, seqs[idx])
continue
}
seqMsgs = append(seqMsgs, res.msg)
}
return
}
+400
View File
@@ -0,0 +1,400 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"fmt"
"github.com/openimsdk/protocol/sdkws"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"math/rand"
"testing"
)
func TestParallelSetMessageToCache(t *testing.T) {
var (
cid = fmt.Sprintf("cid-%v", rand.Int63())
seqFirst = rand.Int63()
msgs = []*sdkws.MsgData{}
)
for i := 0; i < 100; i++ {
msgs = append(msgs, &sdkws.MsgData{
Seq: seqFirst + int64(i),
})
}
testParallelSetMessageToCache(t, cid, msgs)
}
func testParallelSetMessageToCache(t *testing.T, cid string, msgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
ret, err := cacher.ParallelSetMessageToCache(context.Background(), cid, msgs)
assert.Nil(t, err)
assert.Equal(t, len(msgs), ret)
// validate
for _, msg := range msgs {
key := cacher.getMessageCacheKey(cid, msg.Seq)
val, err := rdb.Exists(context.Background(), key).Result()
assert.Nil(t, err)
assert.EqualValues(t, 1, val)
}
}
func TestPipeSetMessageToCache(t *testing.T) {
var (
cid = fmt.Sprintf("cid-%v", rand.Int63())
seqFirst = rand.Int63()
msgs = []*sdkws.MsgData{}
)
for i := 0; i < 100; i++ {
msgs = append(msgs, &sdkws.MsgData{
Seq: seqFirst + int64(i),
})
}
testPipeSetMessageToCache(t, cid, msgs)
}
func testPipeSetMessageToCache(t *testing.T, cid string, msgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
ret, err := cacher.PipeSetMessageToCache(context.Background(), cid, msgs)
assert.Nil(t, err)
assert.Equal(t, len(msgs), ret)
// validate
for _, msg := range msgs {
key := cacher.getMessageCacheKey(cid, msg.Seq)
val, err := rdb.Exists(context.Background(), key).Result()
assert.Nil(t, err)
assert.EqualValues(t, 1, val)
}
}
func TestGetMessagesBySeq(t *testing.T) {
var (
cid = fmt.Sprintf("cid-%v", rand.Int63())
seqFirst = rand.Int63()
msgs = []*sdkws.MsgData{}
)
seqs := []int64{}
for i := 0; i < 100; i++ {
msgs = append(msgs, &sdkws.MsgData{
Seq: seqFirst + int64(i),
SendID: fmt.Sprintf("fake-sendid-%v", i),
})
seqs = append(seqs, seqFirst+int64(i))
}
// set data to cache
testPipeSetMessageToCache(t, cid, msgs)
// get data from cache with parallet mode
testParallelGetMessagesBySeq(t, cid, seqs, msgs)
// get data from cache with pipeline mode
testPipeGetMessagesBySeq(t, cid, seqs, msgs)
}
func testParallelGetMessagesBySeq(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
respMsgs, failedSeqs, err := cacher.ParallelGetMessagesBySeq(context.Background(), cid, seqs)
assert.Nil(t, err)
assert.Equal(t, 0, len(failedSeqs))
assert.Equal(t, len(respMsgs), len(seqs))
// validate
for idx, msg := range respMsgs {
assert.Equal(t, msg.Seq, inputMsgs[idx].Seq)
assert.Equal(t, msg.SendID, inputMsgs[idx].SendID)
}
}
func testPipeGetMessagesBySeq(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
respMsgs, failedSeqs, err := cacher.PipeGetMessagesBySeq(context.Background(), cid, seqs)
assert.Nil(t, err)
assert.Equal(t, 0, len(failedSeqs))
assert.Equal(t, len(respMsgs), len(seqs))
// validate
for idx, msg := range respMsgs {
assert.Equal(t, msg.Seq, inputMsgs[idx].Seq)
assert.Equal(t, msg.SendID, inputMsgs[idx].SendID)
}
}
func TestGetMessagesBySeqWithEmptySeqs(t *testing.T) {
var (
cid = fmt.Sprintf("cid-%v", rand.Int63())
seqFirst int64 = 0
msgs = []*sdkws.MsgData{}
)
seqs := []int64{}
for i := 0; i < 100; i++ {
msgs = append(msgs, &sdkws.MsgData{
Seq: seqFirst + int64(i),
SendID: fmt.Sprintf("fake-sendid-%v", i),
})
seqs = append(seqs, seqFirst+int64(i))
}
// don't set cache, only get data from cache.
// get data from cache with parallet mode
testParallelGetMessagesBySeqWithEmptry(t, cid, seqs, msgs)
// get data from cache with pipeline mode
testPipeGetMessagesBySeqWithEmptry(t, cid, seqs, msgs)
}
func testParallelGetMessagesBySeqWithEmptry(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
respMsgs, failedSeqs, err := cacher.ParallelGetMessagesBySeq(context.Background(), cid, seqs)
assert.Nil(t, err)
assert.Equal(t, len(seqs), len(failedSeqs))
assert.Equal(t, 0, len(respMsgs))
}
func testPipeGetMessagesBySeqWithEmptry(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
respMsgs, failedSeqs, err := cacher.PipeGetMessagesBySeq(context.Background(), cid, seqs)
assert.Equal(t, err, redis.Nil)
assert.Equal(t, len(seqs), len(failedSeqs))
assert.Equal(t, 0, len(respMsgs))
}
func TestGetMessagesBySeqWithLostHalfSeqs(t *testing.T) {
var (
cid = fmt.Sprintf("cid-%v", rand.Int63())
seqFirst int64 = 0
msgs = []*sdkws.MsgData{}
)
seqs := []int64{}
for i := 0; i < 100; i++ {
msgs = append(msgs, &sdkws.MsgData{
Seq: seqFirst + int64(i),
SendID: fmt.Sprintf("fake-sendid-%v", i),
})
seqs = append(seqs, seqFirst+int64(i))
}
// Only set half the number of messages.
testParallelSetMessageToCache(t, cid, msgs[:50])
// get data from cache with parallet mode
testParallelGetMessagesBySeqWithLostHalfSeqs(t, cid, seqs, msgs)
// get data from cache with pipeline mode
testPipeGetMessagesBySeqWithLostHalfSeqs(t, cid, seqs, msgs)
}
func testParallelGetMessagesBySeqWithLostHalfSeqs(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
respMsgs, failedSeqs, err := cacher.ParallelGetMessagesBySeq(context.Background(), cid, seqs)
assert.Nil(t, err)
assert.Equal(t, len(seqs)/2, len(failedSeqs))
assert.Equal(t, len(seqs)/2, len(respMsgs))
for idx, msg := range respMsgs {
assert.Equal(t, msg.Seq, seqs[idx])
}
}
func testPipeGetMessagesBySeqWithLostHalfSeqs(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
respMsgs, failedSeqs, err := cacher.PipeGetMessagesBySeq(context.Background(), cid, seqs)
assert.Nil(t, err)
assert.Equal(t, len(seqs)/2, len(failedSeqs))
assert.Equal(t, len(seqs)/2, len(respMsgs))
for idx, msg := range respMsgs {
assert.Equal(t, msg.Seq, seqs[idx])
}
}
func TestPipeDeleteMessages(t *testing.T) {
var (
cid = fmt.Sprintf("cid-%v", rand.Int63())
seqFirst = rand.Int63()
msgs = []*sdkws.MsgData{}
)
var seqs []int64
for i := 0; i < 100; i++ {
msgs = append(msgs, &sdkws.MsgData{
Seq: seqFirst + int64(i),
})
seqs = append(seqs, msgs[i].Seq)
}
testPipeSetMessageToCache(t, cid, msgs)
testPipeDeleteMessagesOK(t, cid, seqs, msgs)
// set again
testPipeSetMessageToCache(t, cid, msgs)
testPipeDeleteMessagesMix(t, cid, seqs[:90], msgs)
}
func testPipeDeleteMessagesOK(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
err := cacher.PipeDeleteMessages(context.Background(), cid, seqs)
assert.Nil(t, err)
// validate
for _, msg := range inputMsgs {
key := cacher.getMessageCacheKey(cid, msg.Seq)
val := rdb.Exists(context.Background(), key).Val()
assert.EqualValues(t, 0, val)
}
}
func testPipeDeleteMessagesMix(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
err := cacher.PipeDeleteMessages(context.Background(), cid, seqs)
assert.Nil(t, err)
// validate
for idx, msg := range inputMsgs {
key := cacher.getMessageCacheKey(cid, msg.Seq)
val, err := rdb.Exists(context.Background(), key).Result()
assert.Nil(t, err)
if idx < 90 {
assert.EqualValues(t, 0, val) // not exists
continue
}
assert.EqualValues(t, 1, val) // exists
}
}
func TestParallelDeleteMessages(t *testing.T) {
var (
cid = fmt.Sprintf("cid-%v", rand.Int63())
seqFirst = rand.Int63()
msgs = []*sdkws.MsgData{}
)
var seqs []int64
for i := 0; i < 100; i++ {
msgs = append(msgs, &sdkws.MsgData{
Seq: seqFirst + int64(i),
})
seqs = append(seqs, msgs[i].Seq)
}
randSeqs := []int64{}
for i := seqFirst + 100; i < seqFirst+200; i++ {
randSeqs = append(randSeqs, i)
}
testParallelSetMessageToCache(t, cid, msgs)
testParallelDeleteMessagesOK(t, cid, seqs, msgs)
// set again
testParallelSetMessageToCache(t, cid, msgs)
testParallelDeleteMessagesMix(t, cid, seqs[:90], msgs, 90)
testParallelDeleteMessagesOK(t, cid, seqs[90:], msgs[:90])
// set again
testParallelSetMessageToCache(t, cid, msgs)
testParallelDeleteMessagesMix(t, cid, randSeqs, msgs, 0)
}
func testParallelDeleteMessagesOK(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
err := cacher.PipeDeleteMessages(context.Background(), cid, seqs)
assert.Nil(t, err)
// validate
for _, msg := range inputMsgs {
key := cacher.getMessageCacheKey(cid, msg.Seq)
val := rdb.Exists(context.Background(), key).Val()
assert.EqualValues(t, 0, val)
}
}
func testParallelDeleteMessagesMix(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData, lessValNonExists int) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
err := cacher.PipeDeleteMessages(context.Background(), cid, seqs)
assert.Nil(t, err)
// validate
for idx, msg := range inputMsgs {
key := cacher.getMessageCacheKey(cid, msg.Seq)
val, err := rdb.Exists(context.Background(), key).Result()
assert.Nil(t, err)
if idx < lessValNonExists {
assert.EqualValues(t, 0, val) // not exists
continue
}
assert.EqualValues(t, 1, val) // exists
}
}
+161
View File
@@ -0,0 +1,161 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/s3"
"github.com/openimsdk/tools/s3/cont"
"github.com/openimsdk/tools/s3/minio"
"github.com/redis/go-redis/v9"
"time"
)
func NewObjectCacheRedis(rdb redis.UniversalClient, objDB database.ObjectInfo) cache.ObjectCache {
opts := rockscache.NewDefaultOptions()
batchHandler := NewBatchDeleterRedis(rdb, &opts, nil)
return &objectCacheRedis{
BatchDeleter: batchHandler,
rcClient: rockscache.NewClient(rdb, opts),
expireTime: time.Hour * 12,
objDB: objDB,
}
}
type objectCacheRedis struct {
cache.BatchDeleter
objDB database.ObjectInfo
rcClient *rockscache.Client
expireTime time.Duration
}
func (g *objectCacheRedis) getObjectKey(engine string, name string) string {
return cachekey.GetObjectKey(engine, name)
}
func (g *objectCacheRedis) CloneObjectCache() cache.ObjectCache {
return &objectCacheRedis{
BatchDeleter: g.BatchDeleter.Clone(),
rcClient: g.rcClient,
expireTime: g.expireTime,
objDB: g.objDB,
}
}
func (g *objectCacheRedis) DelObjectName(engine string, names ...string) cache.ObjectCache {
objectCache := g.CloneObjectCache()
keys := make([]string, 0, len(names))
for _, name := range names {
keys = append(keys, g.getObjectKey(name, engine))
}
objectCache.AddKeys(keys...)
return objectCache
}
func (g *objectCacheRedis) GetName(ctx context.Context, engine string, name string) (*model.Object, error) {
return getCache(ctx, g.rcClient, g.getObjectKey(name, engine), g.expireTime, func(ctx context.Context) (*model.Object, error) {
return g.objDB.Take(ctx, engine, name)
})
}
func NewS3Cache(rdb redis.UniversalClient, s3 s3.Interface) cont.S3Cache {
opts := rockscache.NewDefaultOptions()
batchHandler := NewBatchDeleterRedis(rdb, &opts, nil)
return &s3CacheRedis{
BatchDeleter: batchHandler,
rcClient: rockscache.NewClient(rdb, opts),
expireTime: time.Hour * 12,
s3: s3,
}
}
type s3CacheRedis struct {
cache.BatchDeleter
s3 s3.Interface
rcClient *rockscache.Client
expireTime time.Duration
}
func (g *s3CacheRedis) getS3Key(engine string, name string) string {
return cachekey.GetS3Key(engine, name)
}
func (g *s3CacheRedis) DelS3Key(ctx context.Context, engine string, keys ...string) error {
ks := make([]string, 0, len(keys))
for _, key := range keys {
ks = append(ks, g.getS3Key(engine, key))
}
return g.BatchDeleter.ExecDelWithKeys(ctx, ks)
}
func (g *s3CacheRedis) GetKey(ctx context.Context, engine string, name string) (*s3.ObjectInfo, error) {
return getCache(ctx, g.rcClient, g.getS3Key(engine, name), g.expireTime, func(ctx context.Context) (*s3.ObjectInfo, error) {
return g.s3.StatObject(ctx, name)
})
}
func NewMinioCache(rdb redis.UniversalClient) minio.Cache {
opts := rockscache.NewDefaultOptions()
batchHandler := NewBatchDeleterRedis(rdb, &opts, nil)
return &minioCacheRedis{
BatchDeleter: batchHandler,
rcClient: rockscache.NewClient(rdb, opts),
expireTime: time.Hour * 24 * 7,
}
}
type minioCacheRedis struct {
cache.BatchDeleter
rcClient *rockscache.Client
expireTime time.Duration
}
func (g *minioCacheRedis) getObjectImageInfoKey(key string) string {
return cachekey.GetObjectImageInfoKey(key)
}
func (g *minioCacheRedis) getMinioImageThumbnailKey(key string, format string, width int, height int) string {
return cachekey.GetMinioImageThumbnailKey(key, format, width, height)
}
func (g *minioCacheRedis) DelObjectImageInfoKey(ctx context.Context, keys ...string) error {
ks := make([]string, 0, len(keys))
for _, key := range keys {
ks = append(ks, g.getObjectImageInfoKey(key))
}
return g.BatchDeleter.ExecDelWithKeys(ctx, ks)
}
func (g *minioCacheRedis) DelImageThumbnailKey(ctx context.Context, key string, format string, width int, height int) error {
return g.BatchDeleter.ExecDelWithKeys(ctx, []string{g.getMinioImageThumbnailKey(key, format, width, height)})
}
func (g *minioCacheRedis) GetImageObjectKeyInfo(ctx context.Context, key string, fn func(ctx context.Context) (*minio.ImageInfo, error)) (*minio.ImageInfo, error) {
info, err := getCache(ctx, g.rcClient, g.getObjectImageInfoKey(key), g.expireTime, fn)
if err != nil {
return nil, err
}
return info, nil
}
func (g *minioCacheRedis) GetThumbnailKey(ctx context.Context, key string, format string, width int, height int, minioCache func(ctx context.Context) (string, error)) (string, error) {
return getCache(ctx, g.rcClient, g.getMinioImageThumbnailKey(key, format, width, height), g.expireTime, minioCache)
}
+173
View File
@@ -0,0 +1,173 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/redis/go-redis/v9"
)
func NewSeqCache(rdb redis.UniversalClient) cache.SeqCache {
return &seqCache{rdb: rdb}
}
type seqCache struct {
rdb redis.UniversalClient
}
func (c *seqCache) getMaxSeqKey(conversationID string) string {
return cachekey.GetMaxSeqKey(conversationID)
}
func (c *seqCache) getMinSeqKey(conversationID string) string {
return cachekey.GetMinSeqKey(conversationID)
}
func (c *seqCache) getHasReadSeqKey(conversationID string, userID string) string {
return cachekey.GetHasReadSeqKey(conversationID, userID)
}
func (c *seqCache) getConversationUserMinSeqKey(conversationID, userID string) string {
return cachekey.GetConversationUserMinSeqKey(conversationID, userID)
}
func (c *seqCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error {
return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
}
func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) {
val, err := c.rdb.Get(ctx, getkey(conversationID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
}
func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
m = make(map[string]int64, len(items))
for i, v := range items {
res, err := c.rdb.Get(ctx, getkey(v)).Result()
if err != nil && err != redis.Nil {
return nil, errs.Wrap(err)
}
val := stringutil.StringToInt64(res)
if val != 0 {
m[items[i]] = val
}
}
return m, nil
}
func (c *seqCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
}
func (c *seqCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) {
return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey)
}
func (c *seqCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
return c.getSeq(ctx, conversationID, c.getMaxSeqKey)
}
func (c *seqCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey)
}
func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error {
for conversationID, seq := range seqs {
if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (c *seqCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
return c.setSeqs(ctx, seqs, c.getMinSeqKey)
}
func (c *seqCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey)
}
func (c *seqCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return c.getSeq(ctx, conversationID, c.getMinSeqKey)
}
func (c *seqCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
}
func (c *seqCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) {
return c.getSeqs(ctx, userIDs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
return errs.Wrap(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err())
}
func (c *seqCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
return c.setSeqs(ctx, seqs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) {
return c.setSeqs(ctx, seqs, func(conversationID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return errs.Wrap(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
}
func (c *seqCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(userID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
val, err := c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, err
}
return val, nil
}
+103
View File
@@ -0,0 +1,103 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs"
"github.com/redis/go-redis/v9"
"time"
)
func NewThirdCache(rdb redis.UniversalClient) cache.ThirdCache {
return &thirdCache{rdb: rdb}
}
type thirdCache struct {
rdb redis.UniversalClient
}
func (c *thirdCache) getGetuiTokenKey() string {
return cachekey.GetGetuiTokenKey()
}
func (c *thirdCache) getGetuiTaskIDKey() string {
return cachekey.GetGetuiTaskIDKey()
}
func (c *thirdCache) getUserBadgeUnreadCountSumKey(userID string) string {
return cachekey.GetUserBadgeUnreadCountSumKey(userID)
}
func (c *thirdCache) getFcmAccountTokenKey(account string, platformID int) string {
return cachekey.GetFcmAccountTokenKey(account, platformID)
}
func (c *thirdCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
return errs.Wrap(c.rdb.Set(ctx, c.getFcmAccountTokenKey(account, platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
}
func (c *thirdCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
val, err := c.rdb.Get(ctx, c.getFcmAccountTokenKey(account, platformID)).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
}
func (c *thirdCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
return errs.Wrap(c.rdb.Del(ctx, c.getFcmAccountTokenKey(account, platformID)).Err())
}
func (c *thirdCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
seq, err := c.rdb.Incr(ctx, c.getUserBadgeUnreadCountSumKey(userID)).Result()
return int(seq), errs.Wrap(err)
}
func (c *thirdCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error {
return errs.Wrap(c.rdb.Set(ctx, c.getUserBadgeUnreadCountSumKey(userID), value, 0).Err())
}
func (c *thirdCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
val, err := c.rdb.Get(ctx, c.getUserBadgeUnreadCountSumKey(userID)).Int()
return val, errs.Wrap(err)
}
func (c *thirdCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error {
return errs.Wrap(c.rdb.Set(ctx, c.getGetuiTokenKey(), token, time.Duration(expireTime)*time.Second).Err())
}
func (c *thirdCache) GetGetuiToken(ctx context.Context) (string, error) {
val, err := c.rdb.Get(ctx, c.getGetuiTokenKey()).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
}
func (c *thirdCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
return errs.Wrap(c.rdb.Set(ctx, c.getGetuiTaskIDKey(), taskID, time.Duration(expireTime)*time.Second).Err())
}
func (c *thirdCache) GetGetuiTaskID(ctx context.Context) (string, error) {
val, err := c.rdb.Get(ctx, c.getGetuiTaskIDKey()).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
}
+63
View File
@@ -0,0 +1,63 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/redis/go-redis/v9"
)
type tokenCache struct {
rdb redis.UniversalClient
}
func NewTokenCacheModel(rdb redis.UniversalClient) cache.TokenModel {
return &tokenCache{
rdb: rdb,
}
}
func (c *tokenCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err())
}
func (c *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) {
m, err := c.rdb.HGetAll(ctx, cachekey.GetTokenKey(userID, platformID)).Result()
if err != nil {
return nil, errs.Wrap(err)
}
mm := make(map[string]int)
for k, v := range m {
mm[k] = stringutil.StringToInt(v)
}
return mm, nil
}
func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error {
mm := make(map[string]any)
for k, v := range m {
mm[k] = v
}
return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), mm).Err())
}
func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err())
}
+303
View File
@@ -0,0 +1,303 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"encoding/json"
"errors"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"hash/crc32"
"strconv"
"time"
)
const (
userExpireTime = time.Second * 60 * 60 * 12
userOlineStatusExpireTime = time.Second * 60 * 60 * 24
statusMod = 501
)
type UserCacheRedis struct {
cache.BatchDeleter
rdb redis.UniversalClient
userDB database.User
expireTime time.Duration
rcClient *rockscache.Client
}
func NewUserCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, userDB database.User, options *rockscache.Options) cache.UserCache {
batchHandler := NewBatchDeleterRedis(rdb, options, []string{localCache.User.Topic})
u := localCache.User
log.ZDebug(context.Background(), "user local cache init", "Topic", u.Topic, "SlotNum", u.SlotNum, "SlotSize", u.SlotSize, "enable", u.Enable())
return &UserCacheRedis{
BatchDeleter: batchHandler,
rdb: rdb,
userDB: userDB,
expireTime: userExpireTime,
rcClient: rockscache.NewClient(rdb, *options),
}
}
func (u *UserCacheRedis) getOnlineStatusKey(modKey string) string {
return cachekey.GetOnlineStatusKey(modKey)
}
func (u *UserCacheRedis) CloneUserCache() cache.UserCache {
return &UserCacheRedis{
BatchDeleter: u.BatchDeleter.Clone(),
rdb: u.rdb,
userDB: u.userDB,
expireTime: u.expireTime,
rcClient: u.rcClient,
}
}
func (u *UserCacheRedis) getUserInfoKey(userID string) string {
return cachekey.GetUserInfoKey(userID)
}
func (u *UserCacheRedis) getUserGlobalRecvMsgOptKey(userID string) string {
return cachekey.GetUserGlobalRecvMsgOptKey(userID)
}
func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userInfo *model.User, err error) {
return getCache(ctx, u.rcClient, u.getUserInfoKey(userID), u.expireTime, func(ctx context.Context) (*model.User, error) {
return u.userDB.Take(ctx, userID)
})
}
func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*model.User, error) {
return batchGetCache(ctx, u.rcClient, u.expireTime, userIDs, func(userID string) string {
return u.getUserInfoKey(userID)
}, func(ctx context.Context, userID string) (*model.User, error) {
return u.userDB.Take(ctx, userID)
})
}
func (u *UserCacheRedis) DelUsersInfo(userIDs ...string) cache.UserCache {
keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
keys = append(keys, u.getUserInfoKey(userID))
}
cache := u.CloneUserCache()
cache.AddKeys(keys...)
return cache
}
func (u *UserCacheRedis) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) {
return getCache(
ctx,
u.rcClient,
u.getUserGlobalRecvMsgOptKey(userID),
u.expireTime,
func(ctx context.Context) (int, error) {
return u.userDB.GetUserGlobalRecvMsgOpt(ctx, userID)
},
)
}
func (u *UserCacheRedis) DelUsersGlobalRecvMsgOpt(userIDs ...string) cache.UserCache {
keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
keys = append(keys, u.getUserGlobalRecvMsgOptKey(userID))
}
cache := u.CloneUserCache()
cache.AddKeys(keys...)
return cache
}
// GetUserStatus get user status.
func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) {
userStatus := make([]*user.OnlineStatus, 0, len(userIDs))
for _, userID := range userIDs {
UserIDNum := crc32.ChecksumIEEE([]byte(userID))
modKey := strconv.Itoa(int(UserIDNum % statusMod))
var onlineStatus user.OnlineStatus
key := u.getOnlineStatusKey(modKey)
result, err := u.rdb.HGet(ctx, key, userID).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
// key or field does not exist
userStatus = append(userStatus, &user.OnlineStatus{
UserID: userID,
Status: constant.Offline,
PlatformIDs: nil,
})
continue
} else {
return nil, errs.Wrap(err)
}
}
err = json.Unmarshal([]byte(result), &onlineStatus)
if err != nil {
return nil, errs.Wrap(err)
}
onlineStatus.UserID = userID
onlineStatus.Status = constant.Online
userStatus = append(userStatus, &onlineStatus)
}
return userStatus, nil
}
// SetUserStatus Set the user status and save it in redis.
func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, status, platformID int32) error {
UserIDNum := crc32.ChecksumIEEE([]byte(userID))
modKey := strconv.Itoa(int(UserIDNum % statusMod))
key := u.getOnlineStatusKey(modKey)
log.ZDebug(ctx, "SetUserStatus args", "userID", userID, "status", status, "platformID", platformID, "modKey", modKey, "key", key)
isNewKey, err := u.rdb.Exists(ctx, key).Result()
if err != nil {
return errs.Wrap(err)
}
if isNewKey == 0 {
if status == constant.Online {
onlineStatus := user.OnlineStatus{
UserID: userID,
Status: constant.Online,
PlatformIDs: []int32{platformID},
}
jsonData, err := json.Marshal(&onlineStatus)
if err != nil {
return errs.Wrap(err)
}
_, err = u.rdb.HSet(ctx, key, userID, string(jsonData)).Result()
if err != nil {
return errs.Wrap(err)
}
u.rdb.Expire(ctx, key, userOlineStatusExpireTime)
return nil
}
}
isNil := false
result, err := u.rdb.HGet(ctx, key, userID).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
isNil = true
} else {
return errs.Wrap(err)
}
}
if status == constant.Offline {
err = u.refreshStatusOffline(ctx, userID, status, platformID, isNil, err, result, key)
if err != nil {
return err
}
} else {
err = u.refreshStatusOnline(ctx, userID, platformID, isNil, err, result, key)
if err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (u *UserCacheRedis) refreshStatusOffline(ctx context.Context, userID string, status, platformID int32, isNil bool, err error, result, key string) error {
if isNil {
log.ZWarn(ctx, "this user not online,maybe trigger order not right",
err, "userStatus", status)
return nil
}
var onlineStatus user.OnlineStatus
err = json.Unmarshal([]byte(result), &onlineStatus)
if err != nil {
return errs.Wrap(err)
}
var newPlatformIDs []int32
for _, val := range onlineStatus.PlatformIDs {
if val != platformID {
newPlatformIDs = append(newPlatformIDs, val)
}
}
if newPlatformIDs == nil {
_, err = u.rdb.HDel(ctx, key, userID).Result()
if err != nil {
return errs.Wrap(err)
}
} else {
onlineStatus.PlatformIDs = newPlatformIDs
newjsonData, err := json.Marshal(&onlineStatus)
if err != nil {
return errs.Wrap(err)
}
_, err = u.rdb.HSet(ctx, key, userID, string(newjsonData)).Result()
if err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (u *UserCacheRedis) refreshStatusOnline(ctx context.Context, userID string, platformID int32, isNil bool, err error, result, key string) error {
var onlineStatus user.OnlineStatus
if !isNil {
err := json.Unmarshal([]byte(result), &onlineStatus)
if err != nil {
return errs.Wrap(err)
}
onlineStatus.PlatformIDs = RemoveRepeatedElementsInList(append(onlineStatus.PlatformIDs, platformID))
} else {
onlineStatus.PlatformIDs = append(onlineStatus.PlatformIDs, platformID)
}
onlineStatus.Status = constant.Online
onlineStatus.UserID = userID
newjsonData, err := json.Marshal(&onlineStatus)
if err != nil {
return errs.WrapMsg(err, "json.Marshal failed")
}
_, err = u.rdb.HSet(ctx, key, userID, string(newjsonData)).Result()
if err != nil {
return errs.Wrap(err)
}
return nil
}
type Comparable interface {
~int | ~string | ~float64 | ~int32
}
func RemoveRepeatedElementsInList[T Comparable](slc []T) []T {
var result []T
tempMap := map[T]struct{}{}
for _, e := range slc {
if _, found := tempMap[e]; !found {
tempMap[e] = struct{}{}
result = append(result, e)
}
}
return result
}