This commit is contained in:
wangchuxiao
2023-02-09 14:40:49 +08:00
parent 8c6c7ee21f
commit 414915eb3b
66 changed files with 424 additions and 1296 deletions
+1
View File
@@ -15,6 +15,7 @@ const (
blackExpireTime = time.Second * 60 * 60 * 12
)
// args fn will exec when no data in cache
type BlackCache interface {
//get blackIDs from cache
GetBlackIDs(ctx context.Context, userID string, fn func(ctx context.Context, userID string) ([]string, error)) (blackIDs []string, err error)
+1
View File
@@ -21,6 +21,7 @@ const (
conversationExpireTime = time.Second * 60 * 60 * 12
)
// args fn will exec when no data in cache
type ConversationCache interface {
// get user's conversationIDs from cache
GetUserConversationIDs(ctx context.Context, userID string, fn func(ctx context.Context, userID string) ([]string, error)) ([]string, error)
+36 -2
View File
@@ -1,13 +1,47 @@
package cache
import (
"Open_IM/pkg/common/db/relation"
"Open_IM/pkg/common/db/table/unrelation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"context"
"github.com/dtm-labs/rockscache"
"time"
)
type ExtendMsgSetCache struct {
friendDB *relation.FriendGorm
expireTime time.Duration
rcClient *rockscache.Client
}
func (e *ExtendMsgSetCache) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *unrelation.ExtendMsg, err error) {
getExtendMsg := func() (string, error) {
extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, firstModifyTime)
if err != nil {
return "", utils.Wrap(err, "GetExtendMsgList failed")
}
bytes, err := json.Marshal(extendMsg)
if err != nil {
return "", utils.Wrap(err, "Marshal failed")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "sourceID", sourceID, "sessionType",
sessionType, "clientMsgID", clientMsgID, "firstModifyTime", firstModifyTime, "extendMsg", extendMsg)
}()
extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+clientMsgID, time.Second*30*60, getExtendMsg)
if err != nil {
return nil, utils.Wrap(err, "Fetch failed")
}
extendMsg = &mongoDB.ExtendMsg{}
err = json.Unmarshal([]byte(extendMsgStr), extendMsg)
return extendMsg, utils.Wrap(err, "Unmarshal failed")
}
func (e *ExtendMsgSetCache) DelExtendMsg(ctx context.Context, clientMsgID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "clientMsgID", clientMsgID)
}()
return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgCache+clientMsgID), "DelExtendMsg err")
}
+3 -1
View File
@@ -19,12 +19,14 @@ const (
friendKey = "FRIEND_INFO:"
)
// args fn will exec when no data in cache
type FriendCache interface {
GetFriendIDs(ctx context.Context, ownerUserID string, fn func(ctx context.Context, ownerUserID string) (friendIDs []string, err error)) (friendIDs []string, err error)
// call when friendID List changed
DelFriendIDs(ctx context.Context, ownerUserID string) (err error)
// get single friendInfo from cache
GetFriend(ctx context.Context, ownerUserID, friendUserID string, fn func(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error)) (friend *relationTb.FriendModel, err error)
// del friend when friend info changed or remove it
// del friend when friend info changed
DelFriend(ctx context.Context, ownerUserID, friendUserID string) (err error)
}
+26 -26
View File
@@ -52,40 +52,40 @@ func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB *relation.GroupGorm,
}
}
func (g *GroupCache) getRedisClient() *RedisClient {
func (g *GroupCacheRedis) getRedisClient() *RedisClient {
return g.redisClient
}
func (g *GroupCache) getGroupInfoKey(groupID string) string {
func (g *GroupCacheRedis) getGroupInfoKey(groupID string) string {
return groupInfoKey + groupID
}
func (g *GroupCache) getJoinedSuperGroupsIDKey(userID string) string {
func (g *GroupCacheRedis) getJoinedSuperGroupsIDKey(userID string) string {
return joinedSuperGroupsKey + userID
}
func (g *GroupCache) getJoinedGroupsKey(userID string) string {
func (g *GroupCacheRedis) getJoinedGroupsKey(userID string) string {
return joinedGroupsKey + userID
}
func (g *GroupCache) getGroupMembersHashKey(groupID string) string {
func (g *GroupCacheRedis) getGroupMembersHashKey(groupID string) string {
return groupMembersHashKey + groupID
}
func (g *GroupCache) getGroupMemberIDsKey(groupID string) string {
func (g *GroupCacheRedis) getGroupMemberIDsKey(groupID string) string {
return groupMemberIDsKey + groupID
}
func (g *GroupCache) getGroupMemberInfoKey(groupID, userID string) string {
func (g *GroupCacheRedis) getGroupMemberInfoKey(groupID, userID string) string {
return groupMemberInfoKey + groupID + "-" + userID
}
func (g *GroupCache) getGroupMemberNumKey(groupID string) string {
func (g *GroupCacheRedis) getGroupMemberNumKey(groupID string) string {
return groupMemberNumKey + groupID
}
// / groupInfo
func (g *GroupCache) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error) {
func (g *GroupCacheRedis) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error) {
for _, groupID := range groupIDs {
group, err := g.GetGroupInfo(ctx, groupID)
if err != nil {
@@ -96,7 +96,7 @@ func (g *GroupCache) GetGroupsInfo(ctx context.Context, groupIDs []string) (grou
return groups, nil
}
func (g *GroupCache) GetGroupInfo(ctx context.Context, groupID string) (group *relation.GroupGorm, err error) {
func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (group *relation.GroupGorm, err error) {
getGroup := func() (string, error) {
groupInfo, err := g.group.Take(ctx, groupID)
if err != nil {
@@ -120,14 +120,14 @@ func (g *GroupCache) GetGroupInfo(ctx context.Context, groupID string) (group *r
return group, utils.Wrap(err, "")
}
func (g *GroupCache) DelGroupInfo(ctx context.Context, groupID string) (err error) {
func (g *GroupCacheRedis) DelGroupInfo(ctx context.Context, groupID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
}()
return g.rcClient.TagAsDeleted(g.getGroupInfoKey(groupID))
}
func (g *GroupCache) DelGroupsInfo(ctx context.Context, groupIDs []string) error {
func (g *GroupCacheRedis) DelGroupsInfo(ctx context.Context, groupIDs []string) error {
for _, groupID := range groupIDs {
if err := g.DelGroupInfo(ctx, groupID); err != nil {
return err
@@ -137,7 +137,7 @@ func (g *GroupCache) DelGroupsInfo(ctx context.Context, groupIDs []string) error
}
// userJoinSuperGroup
func (g *GroupCache) BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) {
func (g *GroupCacheRedis) BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) {
for _, userID := range userIDs {
if err := g.DelJoinedSuperGroupIDs(ctx, userID); err != nil {
return err
@@ -146,14 +146,14 @@ func (g *GroupCache) BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []
return nil
}
func (g *GroupCache) DelJoinedSuperGroupIDs(ctx context.Context, userID string) (err error) {
func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(ctx context.Context, userID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
}()
return g.rcClient.TagAsDeleted(g.getJoinedSuperGroupsIDKey(userID))
}
func (g *GroupCache) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) {
func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) {
getJoinedSuperGroupIDList := func() (string, error) {
userToSuperGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID)
if err != nil {
@@ -177,7 +177,7 @@ func (g *GroupCache) GetJoinedSuperGroupIDs(ctx context.Context, userID string)
}
// groupMembersHash
func (g *GroupCache) GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error) {
func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error) {
generateHash := func() (string, error) {
groupInfo, err := g.GetGroupInfo(ctx, groupID)
if err != nil {
@@ -210,7 +210,7 @@ func (g *GroupCache) GetGroupMembersHash(ctx context.Context, groupID string) (h
return uint64(hashCode), err
}
func (g *GroupCache) DelGroupMembersHash(ctx context.Context, groupID string) (err error) {
func (g *GroupCacheRedis) DelGroupMembersHash(ctx context.Context, groupID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
}()
@@ -219,7 +219,7 @@ func (g *GroupCache) DelGroupMembersHash(ctx context.Context, groupID string) (e
// groupMemberIDs
// from redis
func (g *GroupCache) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
f := func() (string, error) {
groupInfo, err := g.GetGroupInfo(ctx, groupID)
if err != nil {
@@ -255,7 +255,7 @@ func (g *GroupCache) GetGroupMemberIDs(ctx context.Context, groupID string) (gro
return groupMemberIDs, nil
}
func (g *GroupCache) DelGroupMemberIDs(ctx context.Context, groupID string) (err error) {
func (g *GroupCacheRedis) DelGroupMemberIDs(ctx context.Context, groupID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
}()
@@ -263,7 +263,7 @@ func (g *GroupCache) DelGroupMemberIDs(ctx context.Context, groupID string) (err
}
// JoinedGroups
func (g *GroupCache) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
getJoinedGroupIDList := func() (string, error) {
joinedGroupList, err := relation.GetJoinedGroupIDListByUserID(userID)
if err != nil {
@@ -286,7 +286,7 @@ func (g *GroupCache) GetJoinedGroupIDs(ctx context.Context, userID string) (join
return joinedGroupIDs, utils.Wrap(err, "")
}
func (g *GroupCache) DelJoinedGroupIDs(ctx context.Context, userID string) (err error) {
func (g *GroupCacheRedis) DelJoinedGroupIDs(ctx context.Context, userID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
}()
@@ -294,7 +294,7 @@ func (g *GroupCache) DelJoinedGroupIDs(ctx context.Context, userID string) (err
}
// GetGroupMemberInfo
func (g *GroupCache) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relation.GroupMember, err error) {
func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relation.GroupMember, err error) {
getGroupMemberInfo := func() (string, error) {
groupMemberInfo, err := relation.GetGroupMemberInfoByGroupIDAndUserID(groupID, userID)
if err != nil {
@@ -318,7 +318,7 @@ func (g *GroupCache) GetGroupMemberInfo(ctx context.Context, groupID, userID str
return groupMember, utils.Wrap(err, "")
}
func (g *GroupCache) GetGroupMembersInfo(ctx context.Context, count, offset int32, groupID string) (groupMembers []*relation.GroupMember, err error) {
func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, count, offset int32, groupID string) (groupMembers []*relation.GroupMember, err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "count", count, "offset", offset, "groupID", groupID, "groupMember", groupMembers)
}()
@@ -363,7 +363,7 @@ func (g *GroupCache) GetGroupMembersInfo(ctx context.Context, count, offset int3
return groupMemberList, nil
}
func (g *GroupCache) DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error) {
func (g *GroupCacheRedis) DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID)
}()
@@ -371,7 +371,7 @@ func (g *GroupCache) DelGroupMemberInfo(ctx context.Context, groupID, userID str
}
// groupMemberNum
func (g *GroupCache) GetGroupMemberNum(ctx context.Context, groupID string) (num int, err error) {
func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (num int, err error) {
getGroupMemberNum := func() (string, error) {
num, err := relation.GetGroupMemberNumByGroupID(groupID)
if err != nil {
@@ -389,7 +389,7 @@ func (g *GroupCache) GetGroupMemberNum(ctx context.Context, groupID string) (num
return strconv.Atoi(groupMember)
}
func (g *GroupCache) DelGroupMemberNum(ctx context.Context, groupID string) (err error) {
func (g *GroupCacheRedis) DelGroupMemberNum(ctx context.Context, groupID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
}()
+38 -71
View File
@@ -20,28 +20,36 @@ import (
)
const (
accountTempCode = "ACCOUNT_TEMP_CODE"
resetPwdTempCode = "RESET_PWD_TEMP_CODE"
userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq
appleDeviceToken = "DEVICE_TOKEN"
userMinSeq = "REDIS_USER_MIN_SEQ:"
uidPidToken = "UID_PID_TOKEN_STATUS:"
conversationReceiveMessageOpt = "CON_RECV_MSG_OPT:"
getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID"
messageCache = "MESSAGE_CACHE:"
SignalCache = "SIGNAL_CACHE:"
SignalListCache = "SIGNAL_LIST_CACHE:"
GlobalMsgRecvOpt = "GLOBAL_MSG_RECV_OPT"
FcmToken = "FCM_TOKEN:"
groupUserMinSeq = "GROUP_USER_MIN_SEQ:"
groupMaxSeq = "GROUP_MAX_SEQ:"
groupMinSeq = "GROUP_MIN_SEQ:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
exTypeKeyLocker = "EX_LOCK:"
userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq
appleDeviceToken = "DEVICE_TOKEN"
userMinSeq = "REDIS_USER_MIN_SEQ:"
uidPidToken = "UID_PID_TOKEN_STATUS:"
getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID"
messageCache = "MESSAGE_CACHE:"
signalCache = "SIGNAL_CACHE:"
signalListCache = "SIGNAL_LIST_CACHE:"
FcmToken = "FCM_TOKEN:"
groupUserMinSeq = "GROUP_USER_MIN_SEQ:"
groupMaxSeq = "GROUP_MAX_SEQ:"
groupMinSeq = "GROUP_MIN_SEQ:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
exTypeKeyLocker = "EX_LOCK:"
)
type Cache interface {
IncrUserSeq(uid string) (uint64, error)
GetUserMaxSeq(uid string) (uint64, error)
SetUserMaxSeq(uid string, maxSeq uint64) error
SetUserMinSeq(uid string, minSeq uint32) (err error)
GetUserMinSeq(uid string) (uint64, error)
SetGroupUserMinSeq(groupID, userID string, minSeq uint64) (err error)
GetGroupUserMinSeq(groupID, userID string) (uint64, error)
}
// native redis operate
type RedisClient struct {
rdb redis.UniversalClient
}
@@ -86,25 +94,6 @@ func NewRedisClient(rdb redis.UniversalClient) *RedisClient {
return &RedisClient{rdb: rdb}
}
func (r *RedisClient) JudgeAccountEXISTS(account string) (bool, error) {
key := accountTempCode + account
n, err := r.rdb.Exists(context.Background(), key).Result()
if n > 0 {
return true, err
} else {
return false, err
}
}
func (r *RedisClient) SetAccountCode(account string, code, ttl int) (err error) {
key := accountTempCode + account
return r.rdb.Set(context.Background(), key, code, time.Duration(ttl)*time.Second).Err()
}
func (r *RedisClient) GetAccountCode(account string) (string, error) {
key := accountTempCode + account
return r.rdb.Get(context.Background(), key).Result()
}
//Perform seq auto-increment operation of user messages
func (r *RedisClient) IncrUserSeq(uid string) (uint64, error) {
key := userIncrSeq + uid
@@ -195,36 +184,12 @@ func (r *RedisClient) SetTokenMapByUidPid(userID string, platformID int, m map[s
}
return r.rdb.HSet(context.Background(), key, mm).Err()
}
func (r *RedisClient) DeleteTokenByUidPid(userID string, platformID int, fields []string) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return r.rdb.HDel(context.Background(), key, fields...).Err()
}
func (r *RedisClient) SetSingleConversationRecvMsgOpt(userID, conversationID string, opt int32) error {
key := conversationReceiveMessageOpt + userID
return r.rdb.HSet(context.Background(), key, conversationID, opt).Err()
}
func (r *RedisClient) GetSingleConversationRecvMsgOpt(userID, conversationID string) (int, error) {
key := conversationReceiveMessageOpt + userID
result, err := r.rdb.HGet(context.Background(), key, conversationID).Result()
return utils.StringToInt(result), err
}
func (r *RedisClient) SetUserGlobalMsgRecvOpt(userID string, opt int32) error {
key := conversationReceiveMessageOpt + userID
return r.rdb.HSet(context.Background(), key, GlobalMsgRecvOpt, opt).Err()
}
func (r *RedisClient) GetUserGlobalMsgRecvOpt(userID string) (int, error) {
key := conversationReceiveMessageOpt + userID
result, err := r.rdb.HGet(context.Background(), key, GlobalMsgRecvOpt).Result()
if err != nil {
if err == redis.Nil {
return 0, nil
} else {
return 0, err
}
}
return utils.StringToInt(result), err
}
func (r *RedisClient) GetMessageListBySeq(userID string, seqList []uint32, operationID string) (seqMsg []*pbCommon.MsgData, failedSeqList []uint32, errResult error) {
for _, v := range seqList {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
@@ -336,7 +301,7 @@ func (r *RedisClient) HandleSignalInfo(operationID string, msg *pbCommon.MsgData
if err != nil {
return false, err
}
keyList := SignalListCache + userID
keyList := signalListCache + userID
err = r.rdb.LPush(context.Background(), keyList, msg.ClientMsgID).Err()
if err != nil {
return false, err
@@ -345,7 +310,7 @@ func (r *RedisClient) HandleSignalInfo(operationID string, msg *pbCommon.MsgData
if err != nil {
return false, err
}
key := SignalCache + msg.ClientMsgID
key := signalCache + msg.ClientMsgID
err = r.rdb.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, err
@@ -356,7 +321,7 @@ func (r *RedisClient) HandleSignalInfo(operationID string, msg *pbCommon.MsgData
}
func (r *RedisClient) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
key := SignalCache + clientMsgID
key := signalCache + clientMsgID
invitationInfo = &pbRtc.SignalInviteReq{}
bytes, err := r.rdb.Get(context.Background(), key).Bytes()
if err != nil {
@@ -378,7 +343,7 @@ func (r *RedisClient) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (i
}
func (r *RedisClient) GetAvailableSignalInvitationInfo(userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
keyList := SignalListCache + userID
keyList := signalListCache + userID
result := r.rdb.LPop(context.Background(), keyList)
if err = result.Err(); err != nil {
return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed")
@@ -400,7 +365,7 @@ func (r *RedisClient) GetAvailableSignalInvitationInfo(userID string) (invitatio
}
func (r *RedisClient) DelUserSignalList(userID string) error {
keyList := SignalListCache + userID
keyList := signalListCache + userID
err := r.rdb.Del(context.Background(), keyList).Err()
return err
}
@@ -516,17 +481,19 @@ func (r *RedisClient) SetMessageReactionExpire(clientMsgID string, sessionType i
key := getMessageReactionExPrefix(clientMsgID, sessionType)
return r.rdb.Expire(context.Background(), key, expiration).Result()
}
func (r *RedisClient) GetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey string) (string, error) {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
result, err := r.rdb.HGet(context.Background(), key, typeKey).Result()
return result, err
}
func (r *RedisClient) SetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey, value string) error {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
return r.rdb.HSet(context.Background(), key, typeKey, value).Err()
}
func (r *RedisClient) LockMessageTypeKey(clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return r.rdb.SetNX(context.Background(), key, 1, time.Minute).Err()
@@ -537,7 +504,7 @@ func (r *RedisClient) UnLockMessageTypeKey(clientMsgID string, TypeKey string) e
}
func getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
func (r *RedisClient) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
switch sessionType {
case constant.SingleChatType:
return "EX_SINGLE_" + clientMsgID
+9 -44
View File
@@ -3,7 +3,7 @@ package cache
import (
"Open_IM/pkg/common/constant"
pbChat "Open_IM/pkg/proto/msg"
server_api_params "Open_IM/pkg/proto/sdk_ws"
common "Open_IM/pkg/proto/sdk_ws"
"context"
"flag"
"fmt"
@@ -11,6 +11,8 @@ import (
"testing"
)
var DB RedisClient
func Test_SetTokenMapByUidPid(t *testing.T) {
m := make(map[string]int, 0)
m["test1"] = 1
@@ -35,7 +37,7 @@ func Test_GetKeyTTL(t *testing.T) {
ctx := context.Background()
key := flag.String("key", "key", "key value")
flag.Parse()
ttl, err := DB.RDB.TTL(ctx, *key).Result()
ttl, err := DB.GetClient().TTL(ctx, *key).Result()
assert.Nil(t, err)
fmt.Println(ttl)
}
@@ -43,7 +45,7 @@ func Test_HGetAll(t *testing.T) {
ctx := context.Background()
key := flag.String("key", "key", "key value")
flag.Parse()
ttl, err := DB.RDB.TTL(ctx, *key).Result()
ttl, err := DB.GetClient().TTL(ctx, *key).Result()
assert.Nil(t, err)
fmt.Println(ttl)
}
@@ -51,14 +53,14 @@ func Test_HGetAll(t *testing.T) {
func Test_NewSetMessageToCache(t *testing.T) {
var msg pbChat.MsgDataToMQ
m := make(map[string]bool)
var offlinePush server_api_params.OfflinePushInfo
var offlinePush common.OfflinePushInfo
offlinePush.Title = "3"
offlinePush.Ex = "34"
offlinePush.IOSPushSound = "+1"
offlinePush.IOSBadgeCount = true
m[constant.IsPersistent] = true
m[constant.IsHistory] = true
var data server_api_params.MsgData
var data common.MsgData
uid := "test_uid"
data.Seq = 11
data.ClientMsgID = "23jwhjsdf"
@@ -77,7 +79,7 @@ func Test_NewSetMessageToCache(t *testing.T) {
}
func Test_NewGetMessageListBySeq(t *testing.T) {
var msg pbChat.MsgDataToMQ
var data server_api_params.MsgData
var data common.MsgData
uid := "test_uid"
data.Seq = 11
data.ClientMsgID = "23jwhjsdf"
@@ -88,37 +90,7 @@ func Test_NewGetMessageListBySeq(t *testing.T) {
fmt.Println(seqMsg, failedSeqList)
}
func Test_SetUserGlobalMsgRecvOpt(t *testing.T) {
var opt int32
uid := "test_uid"
opt = 1
err := DB.SetUserGlobalMsgRecvOpt(uid, opt)
assert.Nil(t, err)
}
func Test_GetUserGlobalMsgRecvOpt(t *testing.T) {
uid := "test_uid"
opt, err := DB.GetUserGlobalMsgRecvOpt(uid)
assert.Nil(t, err)
fmt.Println("get opt", opt)
}
func Test_JudgeAccountEXISTS(t *testing.T) {
uid := "test_uid"
b, err := DB.JudgeAccountEXISTS(uid)
assert.Nil(t, err)
fmt.Println(b)
}
func Test_SetAccountCode(t *testing.T) {
uid := "test_uid"
code := 666666
err := DB.SetAccountCode(uid, code, 100)
assert.Nil(t, err)
}
func Test_GetAccountCode(t *testing.T) {
uid := "test_uid"
code, err := DB.GetAccountCode(uid)
assert.Nil(t, err)
fmt.Println(code)
}
func Test_SetFcmToken(t *testing.T) {
uid := "test_uid"
token := "dfnWBtOjSj-XIZnUvDlegv:APA91bG09XTtiXfpE6U7gUVMOhnKcUkNCv4WHn0UZr2clUi-tS1jEH-HiCEW8GIAhjLIGcfUJ6NIKteC023ZxDH7J0PJ5sTxoup3fHDUPLU7KgQoZS4tPyFqCbZ6bRB7esDPEnD1n_s0"
@@ -133,10 +105,3 @@ func Test_GetFcmToken(t *testing.T) {
assert.Nil(t, err)
fmt.Println("token is :", token)
}
//func Test_GetGroupMemberList(t *testing.T) {
// groupID := "3791742301"
// list, err := DB.GetGroupMemberIDListFromCache(groupID)
// assert.Nil(t, err)
// fmt.Println(list)
//}
-32
View File
@@ -604,35 +604,3 @@ func DelConversationFromCache(ctx context.Context, ownerUserID, conversationID s
}()
return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationCache+ownerUserID+":"+conversationID), "DelConversationFromCache err")
}
func GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *mongoDB.ExtendMsg, err error) {
getExtendMsg := func() (string, error) {
extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, firstModifyTime)
if err != nil {
return "", utils.Wrap(err, "GetExtendMsgList failed")
}
bytes, err := json.Marshal(extendMsg)
if err != nil {
return "", utils.Wrap(err, "Marshal failed")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "sourceID", sourceID, "sessionType",
sessionType, "clientMsgID", clientMsgID, "firstModifyTime", firstModifyTime, "extendMsg", extendMsg)
}()
extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+clientMsgID, time.Second*30*60, getExtendMsg)
if err != nil {
return nil, utils.Wrap(err, "Fetch failed")
}
extendMsg = &mongoDB.ExtendMsg{}
err = json.Unmarshal([]byte(extendMsgStr), extendMsg)
return extendMsg, utils.Wrap(err, "Unmarshal failed")
}
func DelExtendMsg(ctx context.Context, clientMsgID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "clientMsgID", clientMsgID)
}()
return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgCache+clientMsgID), "DelExtendMsg err")
}
+73 -73
View File
@@ -4,8 +4,8 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/relation"
relation2 "Open_IM/pkg/common/db/table/relation"
unrelation2 "Open_IM/pkg/common/db/table/unrelation"
relationTb "Open_IM/pkg/common/db/table/relation"
unrelationTb "Open_IM/pkg/common/db/table/unrelation"
"Open_IM/pkg/common/db/unrelation"
"Open_IM/pkg/utils"
"context"
@@ -20,31 +20,31 @@ import (
//type GroupInterface GroupDataBaseInterface
type GroupInterface interface {
CreateGroup(ctx context.Context, groups []*relation2.GroupModel, groupMembers []*relation2.GroupMemberModel) error
TakeGroup(ctx context.Context, groupID string) (group *relation2.GroupModel, err error)
FindGroup(ctx context.Context, groupIDs []string) (groups []*relation2.GroupModel, err error)
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relation2.GroupModel, error)
CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error
TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error)
FindGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error)
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error)
UpdateGroup(ctx context.Context, groupID string, data map[string]any) error
DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员
// GroupMember
TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relation2.GroupMemberModel, err error)
TakeGroupOwner(ctx context.Context, groupID string) (*relation2.GroupMemberModel, error)
FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relation2.GroupMemberModel, error)
PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relation2.GroupMemberModel, error)
SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relation2.GroupMemberModel, error)
HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relation2.GroupMemberModel) error
TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error)
TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error)
FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relationTb.GroupMemberModel, error)
PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error)
SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error)
HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error
DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error
MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string][]string, error)
MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error)
TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群
UpdateGroupMember(ctx context.Context, groupID, userID string, data map[string]any) error
// GroupRequest
CreateGroupRequest(ctx context.Context, requests []*relation2.GroupRequestModel) error
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relation2.GroupRequestModel, error)
PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relation2.GroupRequestModel, error)
CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error)
PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupRequestModel, error)
// SuperGroup
FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelation2.SuperGroupModel, error)
FindJoinSuperGroup(ctx context.Context, userID string) (superGroup *unrelation2.UserToSuperGroupModel, err error)
FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelationTb.SuperGroupModel, error)
FindJoinSuperGroup(ctx context.Context, userID string) (superGroup *unrelationTb.UserToSuperGroupModel, err error)
CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string) error
DeleteSuperGroup(ctx context.Context, groupID string) error
DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error
@@ -53,27 +53,27 @@ type GroupInterface interface {
var _ GroupInterface = (*GroupController)(nil)
func NewGroupInterface(database GroupDataBaseInterface) GroupInterface {
return &GroupController{database: database}
func NewGroupInterface(db *gorm.DB, rdb redis.UniversalClient, mgoClient *mongo.Client) GroupInterface {
return &GroupController{database: NewGroupDatabase(db, rdb, mgoClient)}
}
type GroupController struct {
database GroupDataBaseInterface
}
func (g *GroupController) CreateGroup(ctx context.Context, groups []*relation2.GroupModel, groupMembers []*relation2.GroupMemberModel) error {
func (g *GroupController) CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error {
return g.database.CreateGroup(ctx, groups, groupMembers)
}
func (g *GroupController) TakeGroup(ctx context.Context, groupID string) (group *relation2.GroupModel, err error) {
func (g *GroupController) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) {
return g.TakeGroup(ctx, groupID)
}
func (g *GroupController) FindGroup(ctx context.Context, groupIDs []string) (groups []*relation2.GroupModel, err error) {
func (g *GroupController) FindGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) {
return g.database.FindGroup(ctx, groupIDs)
}
func (g *GroupController) SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relation2.GroupModel, error) {
func (g *GroupController) SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error) {
return g.database.SearchGroup(ctx, keyword, pageNumber, showNumber)
}
@@ -85,27 +85,27 @@ func (g *GroupController) DismissGroup(ctx context.Context, groupID string) erro
return g.database.DismissGroup(ctx, groupID)
}
func (g *GroupController) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relation2.GroupMemberModel, err error) {
func (g *GroupController) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) {
return g.database.TakeGroupMember(ctx, groupID, userID)
}
func (g *GroupController) TakeGroupOwner(ctx context.Context, groupID string) (*relation2.GroupMemberModel, error) {
func (g *GroupController) TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error) {
return g.database.TakeGroupOwner(ctx, groupID)
}
func (g *GroupController) FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relation2.GroupMemberModel, error) {
func (g *GroupController) FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relationTb.GroupMemberModel, error) {
return g.database.FindGroupMember(ctx, groupIDs, userIDs, roleLevels)
}
func (g *GroupController) PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relation2.GroupMemberModel, error) {
func (g *GroupController) PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error) {
return g.database.PageGroupMember(ctx, groupIDs, userIDs, roleLevels, pageNumber, showNumber)
}
func (g *GroupController) SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relation2.GroupMemberModel, error) {
func (g *GroupController) SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error) {
return g.database.SearchGroupMember(ctx, keyword, groupIDs, userIDs, roleLevels, pageNumber, showNumber)
}
func (g *GroupController) HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relation2.GroupMemberModel) error {
func (g *GroupController) HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error {
return g.database.HandlerGroupRequest(ctx, groupID, userID, handledMsg, handleResult, member)
}
@@ -129,26 +129,26 @@ func (g *GroupController) UpdateGroupMember(ctx context.Context, groupID, userID
return g.database.UpdateGroupMember(ctx, groupID, userID, data)
}
func (g *GroupController) CreateGroupRequest(ctx context.Context, requests []*relation2.GroupRequestModel) error {
func (g *GroupController) CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error {
return g.database.CreateGroupRequest(ctx, requests)
}
func (g *GroupController) TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relation2.GroupRequestModel, error) {
func (g *GroupController) TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error) {
return g.database.TakeGroupRequest(ctx, groupID, userID)
}
func (g *GroupController) PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relation2.GroupRequestModel, error) {
func (g *GroupController) PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupRequestModel, error) {
return g.database.PageGroupRequestUser(ctx, userID, pageNumber, showNumber)
}
// func (g *GroupController) TakeSuperGroup(ctx context.Context, groupID string) (superGroup *unrelation2.SuperGroupModel, err error) {
// func (g *GroupController) TakeSuperGroup(ctx context.Context, groupID string) (superGroup *unrelationTb.SuperGroupModel, err error) {
// return g.database.TakeSuperGroup(ctx, groupID)
// }
func (g *GroupController) FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelation2.SuperGroupModel, error) {
func (g *GroupController) FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelationTb.SuperGroupModel, error) {
return g.database.FindSuperGroup(ctx, groupIDs)
}
func (g *GroupController) FindJoinSuperGroup(ctx context.Context, userID string) (*unrelation2.UserToSuperGroupModel, error) {
func (g *GroupController) FindJoinSuperGroup(ctx context.Context, userID string) (*unrelationTb.UserToSuperGroupModel, error) {
return g.database.FindJoinSuperGroup(ctx, userID)
}
@@ -169,31 +169,31 @@ func (g *GroupController) CreateSuperGroupMember(ctx context.Context, groupID st
}
type GroupDataBaseInterface interface {
CreateGroup(ctx context.Context, groups []*relation2.GroupModel, groupMembers []*relation2.GroupMemberModel) error
TakeGroup(ctx context.Context, groupID string) (group *relation2.GroupModel, err error)
FindGroup(ctx context.Context, groupIDs []string) (groups []*relation2.GroupModel, err error)
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relation2.GroupModel, error)
CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error
TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error)
FindGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error)
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error)
UpdateGroup(ctx context.Context, groupID string, data map[string]any) error
DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员
// GroupMember
TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relation2.GroupMemberModel, err error)
TakeGroupOwner(ctx context.Context, groupID string) (*relation2.GroupMemberModel, error)
FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relation2.GroupMemberModel, error)
PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relation2.GroupMemberModel, error)
SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relation2.GroupMemberModel, error)
HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relation2.GroupMemberModel) error
TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error)
TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error)
FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relationTb.GroupMemberModel, error)
PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error)
SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error)
HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error
DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error
MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string][]string, error)
MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error)
TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群
UpdateGroupMember(ctx context.Context, groupID, userID string, data map[string]any) error
// GroupRequest
CreateGroupRequest(ctx context.Context, requests []*relation2.GroupRequestModel) error
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relation2.GroupRequestModel, error)
PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relation2.GroupRequestModel, error)
CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error)
PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupRequestModel, error)
// SuperGroup
FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelation2.SuperGroupModel, error)
FindJoinSuperGroup(ctx context.Context, userID string) (*unrelation2.UserToSuperGroupModel, error)
FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelationTb.SuperGroupModel, error)
FindJoinSuperGroup(ctx context.Context, userID string) (*unrelationTb.UserToSuperGroupModel, error)
CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string) error
DeleteSuperGroup(ctx context.Context, groupID string) error
DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error
@@ -225,16 +225,16 @@ func NewGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, mgoClient *mongo.C
var _ GroupDataBaseInterface = (*GroupDataBase)(nil)
type GroupDataBase struct {
groupDB relation2.GroupModelInterface
groupMemberDB relation2.GroupMemberModelInterface
groupRequestDB relation2.GroupRequestModelInterface
groupDB relationTb.GroupModelInterface
groupMemberDB relationTb.GroupMemberModelInterface
groupRequestDB relationTb.GroupRequestModelInterface
db *gorm.DB
cache *cache.GroupCache
mongoDB *unrelation.SuperGroupMongoDriver
}
func (g *GroupDataBase) CreateGroup(ctx context.Context, groups []*relation2.GroupModel, groupMembers []*relation2.GroupMemberModel) error {
func (g *GroupDataBase) CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error {
if len(groups) > 0 && len(groupMembers) > 0 {
return g.db.Transaction(func(tx *gorm.DB) error {
if err := g.groupDB.Create(ctx, groups, tx); err != nil {
@@ -252,15 +252,15 @@ func (g *GroupDataBase) CreateGroup(ctx context.Context, groups []*relation2.Gro
return nil
}
func (g *GroupDataBase) TakeGroup(ctx context.Context, groupID string) (group *relation2.GroupModel, err error) {
func (g *GroupDataBase) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) {
return g.groupDB.Take(ctx, groupID)
}
func (g *GroupDataBase) FindGroup(ctx context.Context, groupIDs []string) (groups []*relation2.GroupModel, err error) {
func (g *GroupDataBase) FindGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) {
return g.groupDB.Find(ctx, groupIDs)
}
func (g *GroupDataBase) SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relation2.GroupModel, error) {
func (g *GroupDataBase) SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error) {
return g.groupDB.Search(ctx, keyword, pageNumber, showNumber)
}
@@ -277,27 +277,27 @@ func (g *GroupDataBase) DismissGroup(ctx context.Context, groupID string) error
})
}
func (g *GroupDataBase) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relation2.GroupMemberModel, err error) {
func (g *GroupDataBase) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) {
return g.groupMemberDB.Take(ctx, groupID, userID)
}
func (g *GroupDataBase) TakeGroupOwner(ctx context.Context, groupID string) (*relation2.GroupMemberModel, error) {
func (g *GroupDataBase) TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error) {
return g.groupMemberDB.TakeOwner(ctx, groupID)
}
func (g *GroupDataBase) FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relation2.GroupMemberModel, error) {
func (g *GroupDataBase) FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relationTb.GroupMemberModel, error) {
return g.groupMemberDB.Find(ctx, groupIDs, userIDs, roleLevels)
}
func (g *GroupDataBase) PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relation2.GroupMemberModel, error) {
func (g *GroupDataBase) PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error) {
return g.groupMemberDB.SearchMember(ctx, "", groupIDs, userIDs, roleLevels, pageNumber, showNumber)
}
func (g *GroupDataBase) SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relation2.GroupMemberModel, error) {
func (g *GroupDataBase) SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error) {
return g.groupMemberDB.SearchMember(ctx, keyword, groupIDs, userIDs, roleLevels, pageNumber, showNumber)
}
func (g *GroupDataBase) HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relation2.GroupMemberModel) error {
func (g *GroupDataBase) HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error {
if member == nil {
return g.groupRequestDB.UpdateHandler(ctx, groupID, userID, handledMsg, handleResult)
}
@@ -305,7 +305,7 @@ func (g *GroupDataBase) HandlerGroupRequest(ctx context.Context, groupID string,
if err := g.groupRequestDB.UpdateHandler(ctx, groupID, userID, handledMsg, handleResult, tx); err != nil {
return err
}
return g.groupMemberDB.Create(ctx, []*relation2.GroupMemberModel{member}, tx)
return g.groupMemberDB.Create(ctx, []*relationTb.GroupMemberModel{member}, tx)
})
}
@@ -345,46 +345,46 @@ func (g *GroupDataBase) UpdateGroupMember(ctx context.Context, groupID, userID s
return g.groupMemberDB.Update(ctx, groupID, userID, data)
}
func (g *GroupDataBase) CreateGroupRequest(ctx context.Context, requests []*relation2.GroupRequestModel) error {
func (g *GroupDataBase) CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error {
return g.groupRequestDB.Create(ctx, requests)
}
func (g *GroupDataBase) TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relation2.GroupRequestModel, error) {
func (g *GroupDataBase) TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error) {
return g.groupRequestDB.Take(ctx, groupID, userID)
}
func (g *GroupDataBase) PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relation2.GroupRequestModel, error) {
func (g *GroupDataBase) PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupRequestModel, error) {
return g.groupRequestDB.Page(ctx, userID, pageNumber, showNumber)
}
func (g *GroupDataBase) FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelation2.SuperGroupModel, error) {
func (g *GroupDataBase) FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelationTb.SuperGroupModel, error) {
return g.mongoDB.FindSuperGroup(ctx, groupIDs)
}
func (g *GroupDataBase) FindJoinSuperGroup(ctx context.Context, userID string) (*unrelation2.UserToSuperGroupModel, error) {
func (g *GroupDataBase) FindJoinSuperGroup(ctx context.Context, userID string) (*unrelationTb.UserToSuperGroupModel, error) {
return g.mongoDB.GetSuperGroupByUserID(ctx, userID)
}
func (g *GroupDataBase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string) error {
return g.mongoDB.Transaction(ctx, func(s unrelation2.SuperGroupModelInterface, tx any) error {
return g.mongoDB.Transaction(ctx, func(s unrelationTb.SuperGroupModelInterface, tx any) error {
return s.CreateSuperGroup(ctx, groupID, initMemberIDList, tx)
})
}
func (g *GroupDataBase) DeleteSuperGroup(ctx context.Context, groupID string) error {
return g.mongoDB.Transaction(ctx, func(s unrelation2.SuperGroupModelInterface, tx any) error {
return g.mongoDB.Transaction(ctx, func(s unrelationTb.SuperGroupModelInterface, tx any) error {
return s.DeleteSuperGroup(ctx, groupID, tx)
})
}
func (g *GroupDataBase) DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error {
return g.mongoDB.Transaction(ctx, func(s unrelation2.SuperGroupModelInterface, tx any) error {
return g.mongoDB.Transaction(ctx, func(s unrelationTb.SuperGroupModelInterface, tx any) error {
return s.RemoverUserFromSuperGroup(ctx, groupID, userIDs, tx)
})
}
func (g *GroupDataBase) CreateSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error {
return g.mongoDB.Transaction(ctx, func(s unrelation2.SuperGroupModelInterface, tx any) error {
return g.mongoDB.Transaction(ctx, func(s unrelationTb.SuperGroupModelInterface, tx any) error {
return s.AddUserToSuperGroup(ctx, groupID, userIDs, tx)
})
}
+18 -18
View File
@@ -3,24 +3,24 @@ package controller
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/relation"
relation2 "Open_IM/pkg/common/db/table/relation"
relationTb "Open_IM/pkg/common/db/table/relation"
"context"
"gorm.io/gorm"
)
type UserInterface interface {
//获取指定用户的信息 如有userID未找到 也返回错误
FindWithError(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error)
FindWithError(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error)
//获取指定用户的信息 如有userID未找到 不返回错误
Find(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error)
Find(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error)
//插入多条 外部保证userID 不重复 且在db中不存在
Create(ctx context.Context, users []*relation2.UserModel) (err error)
Create(ctx context.Context, users []*relationTb.UserModel) (err error)
//更新(非零值) 外部保证userID存在
Update(ctx context.Context, users []*relation2.UserModel) (err error)
Update(ctx context.Context, users []*relationTb.UserModel) (err error)
//更新(零值) 外部保证userID存在
UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error)
//如果没找到,不返回错误
Page(ctx context.Context, pageNumber, showNumber int32) (users []*relation2.UserModel, count int64, err error)
Page(ctx context.Context, pageNumber, showNumber int32) (users []*relationTb.UserModel, count int64, err error)
//只要有一个存在就为true
IsExist(ctx context.Context, userIDs []string) (exist bool, err error)
}
@@ -29,21 +29,21 @@ type UserController struct {
database UserDatabaseInterface
}
func (u *UserController) Find(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) {
func (u *UserController) Find(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error) {
return u.database.Find(ctx, userIDs)
}
func (u *UserController) Create(ctx context.Context, users []*relation2.UserModel) error {
func (u *UserController) Create(ctx context.Context, users []*relationTb.UserModel) error {
return u.database.Create(ctx, users)
}
func (u *UserController) Update(ctx context.Context, users []*relation2.UserModel) (err error) {
func (u *UserController) Update(ctx context.Context, users []*relationTb.UserModel) (err error) {
return u.database.Update(ctx, users)
}
func (u *UserController) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) {
return u.database.UpdateByMap(ctx, userID, args)
}
func (u *UserController) Get(ctx context.Context, pageNumber, showNumber int32) (users []*relation2.UserModel, count int64, err error) {
func (u *UserController) Get(ctx context.Context, pageNumber, showNumber int32) (users []*relationTb.UserModel, count int64, err error) {
return u.database.Get(ctx, pageNumber, showNumber)
}
@@ -56,11 +56,11 @@ func NewUserController(db *gorm.DB) *UserController {
}
type UserDatabaseInterface interface {
Find(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error)
Create(ctx context.Context, users []*relation2.UserModel) error
Update(ctx context.Context, users []*relation2.UserModel) (err error)
Find(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error)
Create(ctx context.Context, users []*relationTb.UserModel) error
Update(ctx context.Context, users []*relationTb.UserModel) (err error)
UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error)
Get(ctx context.Context, pageNumber, showNumber int32) (users []*relation2.UserModel, count int64, err error)
Get(ctx context.Context, pageNumber, showNumber int32) (users []*relationTb.UserModel, count int64, err error)
IsExist(ctx context.Context, userIDs []string) (exist bool, err error)
}
@@ -77,7 +77,7 @@ func newUserDatabase(db *gorm.DB) *UserDatabase {
}
// 获取指定用户的信息 如有userID未找到 也返回错误
func (u *UserDatabase) Find(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) {
func (u *UserDatabase) Find(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error) {
users, err = u.user.Find(ctx, userIDs)
if err != nil {
return
@@ -89,12 +89,12 @@ func (u *UserDatabase) Find(ctx context.Context, userIDs []string) (users []*rel
}
// 插入多条 外部保证userID 不重复 且在db中不存在
func (u *UserDatabase) Create(ctx context.Context, users []*relation2.UserModel) (err error) {
func (u *UserDatabase) Create(ctx context.Context, users []*relationTb.UserModel) (err error) {
return u.user.Create(ctx, users)
}
// 更新(非零值) 外部保证userID存在
func (u *UserDatabase) Update(ctx context.Context, users []*relation2.UserModel) (err error) {
func (u *UserDatabase) Update(ctx context.Context, users []*relationTb.UserModel) (err error) {
return u.user.Update(ctx, users)
}
@@ -104,7 +104,7 @@ func (u *UserDatabase) UpdateByMap(ctx context.Context, userID string, args map[
}
// 获取,如果没找到,不返回错误
func (u *UserDatabase) Get(ctx context.Context, showNumber, pageNumber int32) (users []*relation2.UserModel, count int64, err error) {
func (u *UserDatabase) Get(ctx context.Context, showNumber, pageNumber int32) (users []*relationTb.UserModel, count int64, err error) {
return u.user.Get(ctx, showNumber, pageNumber)
}
+2 -2
View File
@@ -7,7 +7,7 @@ import (
)
type ConversationLocalCacheInterface interface {
GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) []string
GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
}
type ConversationLocalCache struct {
@@ -23,7 +23,7 @@ func NewConversationLocalCache(client discoveryRegistry.SvcDiscoveryRegistry) Co
}
}
func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) []string {
func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
g.client.GetConn()
return []string{}
}
@@ -1,7 +1,7 @@
package unrelation
import (
commonPb "Open_IM/pkg/proto/sdk_ws"
common "Open_IM/pkg/proto/sdk_ws"
"context"
"strconv"
"strings"
@@ -13,7 +13,7 @@ const (
ExtendMsgMaxNum = 100
)
type ExtendMsgSet struct {
type ExtendMsgSetModel struct {
SourceID string `bson:"source_id" json:"sourceID"`
SessionType int32 `bson:"session_type" json:"sessionType"`
ExtendMsgs map[string]ExtendMsg `bson:"extend_msgs" json:"extendMsgs"`
@@ -22,33 +22,33 @@ type ExtendMsgSet struct {
MaxMsgUpdateTime int64 `bson:"max_msg_update_time" json:"maxMsgUpdateTime"` // index find msg
}
type KeyValue struct {
type KeyValueModel struct {
TypeKey string `bson:"type_key" json:"typeKey"`
Value string `bson:"value" json:"value"`
LatestUpdateTime int64 `bson:"latest_update_time" json:"latestUpdateTime"`
}
type ExtendMsg struct {
ReactionExtensionList map[string]KeyValue `bson:"reaction_extension_list" json:"reactionExtensionList"`
ClientMsgID string `bson:"client_msg_id" json:"clientMsgID"`
MsgFirstModifyTime int64 `bson:"msg_first_modify_time" json:"msgFirstModifyTime"` // this extendMsg create time
AttachedInfo string `bson:"attached_info" json:"attachedInfo"`
Ex string `bson:"ex" json:"ex"`
ReactionExtensionList map[string]KeyValueModel `bson:"reaction_extension_list" json:"reactionExtensionList"`
ClientMsgID string `bson:"client_msg_id" json:"clientMsgID"`
MsgFirstModifyTime int64 `bson:"msg_first_modify_time" json:"msgFirstModifyTime"` // this extendMsg create time
AttachedInfo string `bson:"attached_info" json:"attachedInfo"`
Ex string `bson:"ex" json:"ex"`
}
func (ExtendMsgSet) TableName() string {
func (ExtendMsgSetModel) TableName() string {
return CExtendMsgSet
}
func (ExtendMsgSet) GetExtendMsgMaxNum() int32 {
func (ExtendMsgSetModel) GetExtendMsgMaxNum() int32 {
return ExtendMsgMaxNum
}
func (ExtendMsgSet) GetSourceID(ID string, index int32) string {
func (ExtendMsgSetModel) GetSourceID(ID string, index int32) string {
return ID + ":" + strconv.Itoa(int(index))
}
func (e *ExtendMsgSet) SplitSourceIDAndGetIndex() int32 {
func (e *ExtendMsgSetModel) SplitSourceIDAndGetIndex() int32 {
l := strings.Split(e.SourceID, ":")
index, _ := strconv.Atoi(l[len(l)-1])
return int32(index)
@@ -59,11 +59,11 @@ type GetAllExtendMsgSetOpts struct {
}
type ExtendMsgSetInterface interface {
CreateExtendMsgSet(ctx context.Context, set *ExtendMsgSet) error
GetAllExtendMsgSet(ctx context.Context, ID string, opts *GetAllExtendMsgSetOpts) (sets []*ExtendMsgSet, err error)
GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64) (*ExtendMsgSet, error)
CreateExtendMsgSet(ctx context.Context, set *ExtendMsgSetModel) error
GetAllExtendMsgSet(ctx context.Context, ID string, opts *GetAllExtendMsgSetOpts) (sets []*ExtendMsgSetModel, err error)
GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64) (*ExtendMsgSetModel, error)
InsertExtendMsg(ctx context.Context, sourceID string, sessionType int32, msg *ExtendMsg) error
InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*commonPb.KeyValue) error
DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*commonPb.KeyValue) error
InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*common.KeyValue) error
DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*common.KeyValue) error
GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *ExtendMsg, err error)
}