Merge remote-tracking branch 'upstream/main' into merge-main

This commit is contained in:
icey-yu
2025-11-25 16:31:40 +08:00
71 changed files with 1206 additions and 230 deletions
+2 -1
View File
@@ -16,6 +16,7 @@ package cache
import (
"context"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)
@@ -57,7 +58,7 @@ type ConversationCache interface {
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache
DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache
DelUserPinnedConversations(userIDs ...string) ConversationCache
DelConversationVersionUserIDs(userIDs ...string) ConversationCache
FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)
+1 -1
View File
@@ -24,7 +24,7 @@ var (
func NewMsgCache(cache database.Cache, msgDocDatabase database.Msg) cache.MsgCache {
initMemMsgCache.Do(func() {
memMsgCache = lru.NewLayLRU[string, *model.MsgInfoModel](1024*8, time.Hour, time.Second*10, localcache.EmptyTarget{}, nil)
memMsgCache = lru.NewLazyLRU[string, *model.MsgInfoModel](1024*8, time.Hour, time.Second*10, localcache.EmptyTarget{}, nil)
})
return &msgCache{
cache: cache,
+1 -1
View File
@@ -253,7 +253,7 @@ func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs
return cache
}
func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache {
func (c *ConversationRedisCache) DelUserPinnedConversations(userIDs ...string) cache.ConversationCache {
cache := c.CloneConversationCache()
for _, userID := range userIDs {
cache.AddKeys(c.getPinnedConversationIDsKey(userID))
+73 -6
View File
@@ -2,30 +2,42 @@ package redis
import (
"context"
"encoding/json"
"strconv"
"sync"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
)
type tokenCache struct {
rdb redis.UniversalClient
accessExpire time.Duration
localCache *config.LocalCache
}
func NewTokenCacheModel(rdb redis.UniversalClient, accessExpire int64) cache.TokenModel {
c := &tokenCache{rdb: rdb}
func NewTokenCacheModel(rdb redis.UniversalClient, localCache *config.LocalCache, accessExpire int64) cache.TokenModel {
c := &tokenCache{rdb: rdb, localCache: localCache}
c.accessExpire = c.getExpireTime(accessExpire)
return c
}
func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err())
key := cachekey.GetTokenKey(userID, platformID)
if err := c.rdb.HSet(ctx, key, token, flag).Err(); err != nil {
return errs.Wrap(err)
}
if c.localCache != nil {
c.removeLocalTokenCache(ctx, key)
}
return nil
}
// SetTokenFlagEx set token and flag with expire time
@@ -37,6 +49,11 @@ func (c *tokenCache) SetTokenFlagEx(ctx context.Context, userID string, platform
if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil {
return errs.Wrap(err)
}
if c.localCache != nil {
c.removeLocalTokenCache(ctx, key)
}
return nil
}
@@ -106,7 +123,17 @@ func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, pla
for k, v := range m {
mm[k] = v
}
return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), mm).Err())
err := c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), mm).Err()
if err != nil {
return errs.Wrap(err)
}
if c.localCache != nil {
c.removeLocalTokenCache(ctx, cachekey.GetTokenKey(userID, platformID))
}
return nil
}
func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]any) error {
@@ -124,11 +151,23 @@ func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[st
}); err != nil {
return err
}
if c.localCache != nil {
c.removeLocalTokenCache(ctx, keys...)
}
return nil
}
func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err())
key := cachekey.GetTokenKey(userID, platformID)
if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil {
return errs.Wrap(err)
}
if c.localCache != nil {
c.removeLocalTokenCache(ctx, key)
}
return nil
}
func (c *tokenCache) getExpireTime(t int64) time.Duration {
@@ -161,6 +200,11 @@ func (c *tokenCache) DeleteTokenByTokenMap(ctx context.Context, userID string, t
return err
}
// Remove local cache for the token
if c.localCache != nil {
c.removeLocalTokenCache(ctx, keys...)
}
return nil
}
@@ -175,5 +219,28 @@ func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, p
if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil {
return errs.Wrap(err)
}
if c.localCache != nil {
c.removeLocalTokenCache(ctx, key)
}
return nil
}
func (c *tokenCache) removeLocalTokenCache(ctx context.Context, keys ...string) {
if len(keys) == 0 {
return
}
topic := c.localCache.Auth.Topic
if topic == "" {
return
}
data, err := json.Marshal(keys)
if err != nil {
log.ZWarn(ctx, "keys json marshal failed", err, "topic", topic, "keys", keys)
} else {
if err := c.rdb.Publish(ctx, topic, string(data)).Err(); err != nil {
log.ZWarn(ctx, "redis publish cache delete error", err, "topic", topic, "keys", keys)
}
}
}
+24 -4
View File
@@ -78,6 +78,8 @@ type ConversationDatabase interface {
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
// FindRandConversation finds random conversations based on the specified timestamp and limit.
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error)
DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error)
}
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
@@ -120,7 +122,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
}
if _, ok := fieldMap["is_pinned"]; ok {
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
cache = cache.DelUserPinnedConversations(userIDs...)
}
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
}
@@ -172,7 +174,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context,
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
}
if _, ok := args["is_pinned"]; ok {
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
cache = cache.DelUserPinnedConversations(userIDs...)
}
return cache.ChainExecDel(ctx)
}
@@ -203,7 +205,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
DelUserConversationIDsHash(userIDs...).
DelConversationVersionUserIDs(userIDs...).
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).
DelConversationPinnedMessageUserIDs(pinnedUserIDs...).
DelUserPinnedConversations(pinnedUserIDs...).
ChainExecDel(ctx)
}
@@ -259,7 +261,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
cache := c.cache.CloneConversationCache()
cache = cache.DelConversationVersionUserIDs(ownerUserID).
DelConversationNotNotifyMessageUserIDs(ownerUserID).
DelConversationPinnedMessageUserIDs(ownerUserID)
DelUserPinnedConversations(ownerUserID)
groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
return e.GroupID, e.GroupID != ""
@@ -429,3 +431,21 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use
func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) {
return c.conversationDB.FindRandConversation(ctx, ts, limit)
}
func (c *conversationDatabase) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) {
return c.tx.Transaction(ctx, func(ctx context.Context) error {
err = c.conversationDB.DeleteUsersConversations(ctx, userID, conversationIDs)
if err != nil {
return err
}
cache := c.cache.CloneConversationCache()
cache = cache.DelConversations(userID, conversationIDs...).
DelConversationVersionUserIDs(userID).
DelConversationIDs(userID).
DelUserConversationIDsHash(userID).
DelConversationNotNotifyMessageUserIDs(userID).
DelUserPinnedConversations(userID)
return cache.ChainExecDel(ctx)
})
}
-1
View File
@@ -194,7 +194,6 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*model.Group,
}
for _, group := range groups {
c = c.DelGroupsInfo(group.GroupID).
DelGroupMembersHash(group.GroupID).
DelGroupMembersHash(group.GroupID).
DelGroupsMemberNum(group.GroupID).
DelGroupMemberIDs(group.GroupID).
@@ -44,4 +44,5 @@ type Conversation interface {
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error)
DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error)
}
@@ -308,3 +308,20 @@ func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, li
}
return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline)
}
func (c *ConversationMgo) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) {
if len(conversationIDs) == 0 {
return nil
}
return mongoutil.IncrVersion(func() error {
err := mongoutil.DeleteMany(ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": bson.M{"$in": conversationIDs}})
return err
}, func() error {
for _, conversationID := range conversationIDs {
if err := c.version.IncrVersion(ctx, userID, []string{conversationID}, model.VersionStateDelete); err != nil {
return err
}
}
return nil
})
}
+7 -6
View File
@@ -5,6 +5,11 @@ import (
"fmt"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/constant"
@@ -14,10 +19,6 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func NewMsgMongo(db *mongo.Database) (database.Msg, error) {
@@ -1154,7 +1155,7 @@ func (m *MsgMgo) findBeforeDocSendTime(ctx context.Context, docID string, limit
if err != nil {
return 0, 0, err
}
for i := len(res) - 1; i > 0; i-- {
for i := len(res) - 1; i >= 0; i-- {
v := res[i]
if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 {
return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil
@@ -1169,7 +1170,7 @@ func (m *MsgMgo) findBeforeSendTime(ctx context.Context, conversationID string,
limit := int64(-1)
if first {
first = false
limit = m.model.GetMsgIndex(seq)
limit = m.model.GetLimitForSingleDoc(seq)
}
docID := m.model.BuildDocIDByIndex(conversationID, i)
msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit)
+1 -1
View File
@@ -73,7 +73,7 @@ func (u *UserMgo) TakeNotification(ctx context.Context, level int64) (user []*mo
}
func (u *UserMgo) TakeGTEAppManagerLevel(ctx context.Context, level int64) (user []*model.User, err error) {
return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manager_level": bson.M{"$gte": level}})
return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manger_level": bson.M{"$gte": level}})
}
func (u *UserMgo) TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error) {
+4
View File
@@ -132,6 +132,10 @@ func (*MsgDocModel) GetMsgIndex(seq int64) int64 {
return (seq - 1) % singleGocMsgNum
}
func (*MsgDocModel) GetLimitForSingleDoc(seq int64) int64 {
return seq % singleGocMsgNum
}
func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
}