Merge remote-tracking branch 'origin/errcode' into errcode

# Conflicts:
#	pkg/common/db/cache/conversation.go
#	pkg/common/db/relation/conversation_model.go
This commit is contained in:
Gordon
2023-02-03 10:26:26 +08:00
15 changed files with 404 additions and 274 deletions
+8 -6
View File
@@ -2,11 +2,13 @@ package cache
import (
"Open_IM/pkg/common/db/relation"
"Open_IM/pkg/common/db/table"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
"time"
)
@@ -16,16 +18,16 @@ const (
)
type BlackCache struct {
blackDB *relation.Black
blackDB *table.BlackModel
expireTime time.Duration
rcClient *rockscache.Client
}
func NewBlackCache(blackDB *relation.Black) *BlackCache {
func NewBlackCache(rdb redis.UniversalClient, blackDB *relation.BlackGorm, options rockscache.Options) *BlackCache {
return &BlackCache{
blackDB: nil,
expireTime: 0,
rcClient: nil,
blackDB: blackDB,
expireTime: blackExpireTime,
rcClient: rockscache.NewClient(rdb, options),
}
}
@@ -48,7 +50,7 @@ func (b *BlackCache) GetBlackIDs(ctx context.Context, userID string) (blackIDs [
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "blackIDList", blackIDs)
}()
blackIDListStr, err := b.rcClient.Fetch(blackListCache+userID, time.Second*30*60, getBlackIDList)
blackIDListStr, err := b.rcClient.Fetch(blackListCache+userID, b.expireTime, getBlackIDList)
if err != nil {
return nil, utils.Wrap(err, "")
}
+75 -11
View File
@@ -2,8 +2,10 @@ package cache
import (
"Open_IM/pkg/common/db/relation"
"Open_IM/pkg/common/db/table"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"Open_IM/pkg/utilsv2"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
@@ -12,17 +14,19 @@ import (
)
const (
friendExpireTime = time.Second * 60 * 60 * 12
friendIDsKey = "FRIEND_IDS:"
friendExpireTime = time.Second * 60 * 60 * 12
friendIDsKey = "FRIEND_IDS:"
TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:"
friendKey = "FRIEND_INFO:"
)
type FriendCache struct {
friendDB *relation.Friend
friendDB *relation.FriendGorm
expireTime time.Duration
rcClient *rockscache.Client
}
func NewFriendCache(rdb redis.UniversalClient, friendDB *relation.Friend, options rockscache.Options) *FriendCache {
func NewFriendCache(rdb redis.UniversalClient, friendDB *relation.FriendGorm, options rockscache.Options) *FriendCache {
return &FriendCache{
friendDB: friendDB,
expireTime: friendExpireTime,
@@ -30,17 +34,25 @@ func NewFriendCache(rdb redis.UniversalClient, friendDB *relation.Friend, option
}
}
func (f *FriendCache) getFriendRelationKey(ownerUserID string) string {
func (f *FriendCache) getFriendIDsKey(ownerUserID string) string {
return friendIDsKey + ownerUserID
}
func (f *FriendCache) getTwoWayFriendsIDsKey(ownerUserID string) string {
return TwoWayFriendsIDsKey + ownerUserID
}
func (f *FriendCache) getFriendKey(ownerUserID, friendUserID string) string {
return friendKey + ownerUserID + "-" + friendUserID
}
func (f *FriendCache) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) {
getFriendIDList := func() (string, error) {
friendIDList, err := f.friendDB.GetFriendIDs(ctx, ownerUserID)
getFriendIDs := func() (string, error) {
friendIDs, err := f.friendDB.GetFriendIDs(ctx, ownerUserID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(friendIDList)
bytes, err := json.Marshal(friendIDs)
if err != nil {
return "", utils.Wrap(err, "")
}
@@ -49,11 +61,11 @@ func (f *FriendCache) GetFriendIDs(ctx context.Context, ownerUserID string) (fri
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendIDs", friendIDs)
}()
friendIDListStr, err := f.rcClient.Fetch(f.getFriendRelationKey(ownerUserID), f.expireTime, getFriendIDList)
friendIDsStr, err := f.rcClient.Fetch(f.getFriendIDsKey(ownerUserID), f.expireTime, getFriendIDs)
if err != nil {
return nil, err
}
err = json.Unmarshal([]byte(friendIDListStr), &friendIDs)
err = json.Unmarshal([]byte(friendIDsStr), &friendIDs)
return friendIDs, utils.Wrap(err, "")
}
@@ -61,5 +73,57 @@ func (f *FriendCache) DelFriendIDs(ctx context.Context, ownerUserID string) (err
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID)
}()
return f.rcClient.TagAsDeleted(f.getFriendRelationKey(ownerUserID))
return f.rcClient.TagAsDeleted(f.getFriendIDsKey(ownerUserID))
}
func (f *FriendCache) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) {
friendIDs, err := f.GetFriendIDs(ctx, ownerUserID)
if err != nil {
return nil, err
}
for _, friendID := range friendIDs {
friendFriendID, err := f.GetFriendIDs(ctx, friendID)
if err != nil {
return nil, err
}
if utils.IsContain(ownerUserID, friendFriendID) {
twoWayFriendIDs = append(twoWayFriendIDs, ownerUserID)
}
}
return twoWayFriendIDs, nil
}
func (f *FriendCache) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID)
}()
return f.rcClient.TagAsDeleted(f.getTwoWayFriendsIDsKey(ownerUserID))
}
func (f *FriendCache) GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *table.FriendModel, err error) {
getFriend := func() (string, error) {
friend, err = f.friendDB.Take(ctx, ownerUserID, friendUserID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(friend)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
friendStr, err := f.rcClient.Fetch(f.getFriendKey(ownerUserID, friendUserID), f.expireTime, getFriend)
if err != nil {
return nil, err
}
friend = &table.FriendModel{}
err = json.Unmarshal([]byte(friendStr), friend)
return friend, utils.Wrap(err, "")
}
func (f *FriendCache) DelFriend(ctx context.Context, ownerUserID, friendUserID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendUserID", friendUserID)
}()
return f.rcClient.TagAsDeleted(f.getFriendKey(ownerUserID, friendUserID))
}
+3 -35
View File
@@ -33,20 +33,18 @@ type GroupCache struct {
group *relation.GroupGorm
groupMember *relation.GroupMemberGorm
groupRequest *relation.GroupRequestGorm
mongoDB *unrelation.SuperGroupMgoDB
mongoDB *unrelation.SuperGroupMongoDriver
expireTime time.Duration
redisClient *RedisClient
rcClient *rockscache.Client
//local cache
cacheGroupMtx sync.RWMutex
cacheGroupMemberUserIDs map[string]*localcache.GroupMemberIDsHash
}
func NewGroupCache(rdb redis.UniversalClient, groupDB *relation.GroupGorm, groupMemberDB *relation.GroupMemberGorm, groupRequestDB *relation.GroupRequestGorm, mongoClient *unrelation.SuperGroupMgoDB, opts rockscache.Options) *GroupCache {
func NewGroupCache(rdb redis.UniversalClient, groupDB *relation.GroupGorm, groupMemberDB *relation.GroupMemberGorm, groupRequestDB *relation.GroupRequestGorm, mongoClient *unrelation.SuperGroupMongoDriver, opts rockscache.Options) *GroupCache {
return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: groupExpireTime,
group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB, redisClient: NewRedisClient(rdb),
mongoDB: mongoClient, cacheGroupMemberUserIDs: make(map[string]*localcache.GroupMemberIDsHash, 0),
mongoDB: mongoClient,
}
}
@@ -260,36 +258,6 @@ func (g *GroupCache) DelGroupMemberIDs(ctx context.Context, groupID string) (err
return g.rcClient.TagAsDeleted(g.getGroupMemberIDsKey(groupID))
}
// from local map
func (g *GroupCache) LocalGetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
remoteHash, err := g.GetGroupMembersHash(ctx, groupID)
if err != nil {
g.cacheGroupMtx.Lock()
defer g.cacheGroupMtx.Unlock()
delete(g.cacheGroupMemberUserIDs, groupID)
return nil, err
}
g.cacheGroupMtx.Lock()
defer g.cacheGroupMtx.Unlock()
if remoteHash == 0 {
delete(g.cacheGroupMemberUserIDs, groupID)
return []string{}, nil
}
localCache, ok := g.cacheGroupMemberUserIDs[groupID]
if ok && localCache.MemberListHash == remoteHash {
return localCache.UserIDs, nil
}
groupMemberIDsRemote, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
g.cacheGroupMemberUserIDs[groupID] = &localcache.GroupMemberIDsHash{
MemberListHash: remoteHash,
UserIDs: groupMemberIDsRemote,
}
return groupMemberIDsRemote, nil
}
// JoinedGroups
func (g *GroupCache) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
getJoinedGroupIDList := func() (string, error) {
+1 -2
View File
@@ -324,9 +324,8 @@ func (r *RedisClient) HandleSignalInfo(operationID string, msg *pbCommon.MsgData
return false, nil
}
case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept:
return false, errors.New("signalInfo do not need offlinePush")
return false, nil
default:
log2.NewDebug(operationID, utils.GetSelfFuncName(), "req invalid type", string(msg.Content))
return false, nil
}
if isInviteSignal {
+43 -11
View File
@@ -2,32 +2,35 @@ package cache
import (
"Open_IM/pkg/common/db/relation"
"Open_IM/pkg/common/db/table"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
"strconv"
"time"
)
const (
UserExpireTime = time.Second * 60 * 60 * 12
userInfoKey = "USER_INFO:"
userExpireTime = time.Second * 60 * 60 * 12
userInfoKey = "USER_INFO:"
userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:"
)
type UserCache struct {
userDB *relation.User
userDB *relation.UserGorm
expireTime time.Duration
redisClient *RedisClient
rcClient *rockscache.Client
}
func NewUserCache(rdb redis.UniversalClient, userDB *relation.User, options rockscache.Options) *UserCache {
func NewUserCache(rdb redis.UniversalClient, userDB *relation.UserGorm, options rockscache.Options) *UserCache {
return &UserCache{
userDB: userDB,
expireTime: UserExpireTime,
expireTime: userExpireTime,
redisClient: NewRedisClient(rdb),
rcClient: rockscache.NewClient(rdb, options),
}
@@ -37,7 +40,11 @@ func (u *UserCache) getUserInfoKey(userID string) string {
return userInfoKey + userID
}
func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *relation.User, err error) {
func (u *UserCache) getUserGlobalRecvMsgOptKey(userID string) string {
return userGlobalRecvMsgOptKey + userID
}
func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *table.UserModel, err error) {
getUserInfo := func() (string, error) {
userInfo, err := u.userDB.Take(ctx, userID)
if err != nil {
@@ -52,17 +59,17 @@ func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *r
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "userInfo", *userInfo)
}()
userInfoStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), time.Second*30*60, getUserInfo)
userInfoStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), u.expireTime, getUserInfo)
if err != nil {
return nil, err
}
userInfo = &relation.User{}
userInfo = &table.UserModel{}
err = json.Unmarshal([]byte(userInfoStr), userInfo)
return userInfo, utils.Wrap(err, "")
}
func (u *UserCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relation.User, error) {
var users []*relation.User
func (u *UserCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*table.UserModel, error) {
var users []*table.UserModel
for _, userID := range userIDs {
user, err := GetUserInfoFromCache(ctx, userID)
if err != nil {
@@ -77,7 +84,7 @@ func (u *UserCache) DelUserInfo(ctx context.Context, userID string) (err error)
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
}()
return u.rcClient.TagAsDeleted(u.getUserInfoKey(userID) + userID)
return u.rcClient.TagAsDeleted(u.getUserInfoKey(userID))
}
func (u *UserCache) DelUsersInfo(ctx context.Context, userIDs []string) (err error) {
@@ -88,3 +95,28 @@ func (u *UserCache) DelUsersInfo(ctx context.Context, userIDs []string) (err err
}
return nil
}
func (u *UserCache) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) {
getUserGlobalRecvMsgOpt := func() (string, error) {
userInfo, err := u.userDB.Take(ctx, userID)
if err != nil {
return "", err
}
return strconv.Itoa(int(userInfo.GlobalRecvMsgOpt)), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "opt", opt)
}()
optStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), u.expireTime, getUserGlobalRecvMsgOpt)
if err != nil {
return 0, err
}
return strconv.Atoi(optStr)
}
func (u *UserCache) DelUserGlobalRecvMsgOpt(ctx context.Context, userID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
}()
return u.rcClient.TagAsDeleted(u.getUserGlobalRecvMsgOptKey(userID))
}