mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-08 19:16:35 +08:00
mongodb superGroup to cache
This commit is contained in:
@@ -71,6 +71,7 @@ type config struct {
|
||||
SecretAccessKey string `yaml:"secretAccessKey"`
|
||||
EndpointInner string `yaml:"endpointInner"`
|
||||
EndpointInnerEnable bool `yaml:"endpointInnerEnable"`
|
||||
StorageTime int `yaml:"storageTime"`
|
||||
} `yaml:"minio"`
|
||||
}
|
||||
|
||||
@@ -253,15 +254,17 @@ type config struct {
|
||||
}
|
||||
|
||||
Callback struct {
|
||||
CallbackUrl string `yaml:"callbackUrl"`
|
||||
CallbackBeforeSendSingleMsg callBackConfig `yaml:"callbackBeforeSendSingleMsg"`
|
||||
CallbackAfterSendSingleMsg callBackConfig `yaml:"callbackAfterSendSingleMsg"`
|
||||
CallbackBeforeSendGroupMsg callBackConfig `yaml:"callbackBeforeSendGroupMsg"`
|
||||
CallbackAfterSendGroupMsg callBackConfig `yaml:"callbackAfterSendGroupMsg"`
|
||||
CallbackWordFilter callBackConfig `yaml:"callbackWordFilter"`
|
||||
CallbackUserOnline callBackConfig `yaml:"callbackUserOnline"`
|
||||
CallbackUserOffline callBackConfig `yaml:"callbackUserOffline"`
|
||||
CallbackOfflinePush callBackConfig `yaml:"callbackOfflinePush"`
|
||||
CallbackUrl string `yaml:"callbackUrl"`
|
||||
CallbackBeforeSendSingleMsg callBackConfig `yaml:"callbackBeforeSendSingleMsg"`
|
||||
CallbackAfterSendSingleMsg callBackConfig `yaml:"callbackAfterSendSingleMsg"`
|
||||
CallbackBeforeSendGroupMsg callBackConfig `yaml:"callbackBeforeSendGroupMsg"`
|
||||
CallbackAfterSendGroupMsg callBackConfig `yaml:"callbackAfterSendGroupMsg"`
|
||||
CallbackWordFilter callBackConfig `yaml:"callbackWordFilter"`
|
||||
CallbackUserOnline callBackConfig `yaml:"callbackUserOnline"`
|
||||
CallbackUserOffline callBackConfig `yaml:"callbackUserOffline"`
|
||||
CallbackOfflinePush callBackConfig `yaml:"callbackOfflinePush"`
|
||||
CallbackOnlinePush callBackConfig `yaml:"callbackOnlinePush"`
|
||||
CallbackBeforeSuperGroupOnlinePush callBackConfig `yaml:"callbackSuperGroupOnlinePush"`
|
||||
} `yaml:"callback"`
|
||||
Notification struct {
|
||||
///////////////////////group/////////////////////////////
|
||||
|
||||
@@ -192,14 +192,16 @@ const (
|
||||
VerificationCodeForResetSuffix = "_forReset"
|
||||
|
||||
//callbackCommand
|
||||
CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand"
|
||||
CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand"
|
||||
CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand"
|
||||
CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand"
|
||||
CallbackWordFilterCommand = "callbackWordFilterCommand"
|
||||
CallbackUserOnlineCommand = "callbackUserOnlineCommand"
|
||||
CallbackUserOfflineCommand = "callbackUserOfflineCommand"
|
||||
CallbackOfflinePushCommand = "callbackOfflinePushCommand"
|
||||
CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand"
|
||||
CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand"
|
||||
CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand"
|
||||
CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand"
|
||||
CallbackWordFilterCommand = "callbackWordFilterCommand"
|
||||
CallbackUserOnlineCommand = "callbackUserOnlineCommand"
|
||||
CallbackUserOfflineCommand = "callbackUserOfflineCommand"
|
||||
CallbackOfflinePushCommand = "callbackOfflinePushCommand"
|
||||
CallbackOnlinePushCommand = "callbackOnlinePushCommand"
|
||||
CallbackSuperGroupOnlinePushCommand = "callbackSuperGroupOnlinePushCommand"
|
||||
//callback actionCode
|
||||
ActionAllow = 0
|
||||
ActionForbidden = 1
|
||||
|
||||
+2
-12
@@ -147,21 +147,11 @@ func init() {
|
||||
}
|
||||
}
|
||||
// 强一致性缓存,当一个key被标记删除,其他请求线程会被锁住轮询直到新的key生成,适合各种同步的拉取, 如果弱一致可能导致拉取还是老数据,毫无意义
|
||||
DB.Rc = rockscache.NewClient(go_redis.NewClient(&go_redis.Options{
|
||||
Addr: config.Config.Redis.DBAddress[0],
|
||||
Password: config.Config.Redis.DBPassWord, // no password set
|
||||
DB: 0, // use default DB
|
||||
PoolSize: 100, // 连接池大小
|
||||
}), rockscache.NewDefaultOptions())
|
||||
DB.Rc = rockscache.NewClient(DB.RDB, rockscache.NewDefaultOptions())
|
||||
DB.Rc.Options.StrongConsistency = true
|
||||
|
||||
// 弱一致性缓存,当一个key被标记删除,其他请求线程直接返回该key的value,适合高频并且生成很缓存很慢的情况 如大群发消息缓存的缓存
|
||||
DB.WeakRc = rockscache.NewClient(go_redis.NewClient(&go_redis.Options{
|
||||
Addr: config.Config.Redis.DBAddress[0],
|
||||
Password: config.Config.Redis.DBPassWord, // no password set
|
||||
DB: 0, // use default DB
|
||||
PoolSize: 100, // 连接池大小
|
||||
}), rockscache.NewDefaultOptions())
|
||||
DB.WeakRc = rockscache.NewClient(DB.RDB, rockscache.NewDefaultOptions())
|
||||
DB.WeakRc.Options.StrongConsistency = false
|
||||
}
|
||||
|
||||
|
||||
+10
-10
@@ -908,8 +908,8 @@ func (d *DataBases) GetUserWorkMoments(opUserID, userID string, showNumber, page
|
||||
result, err := c.Find(ctx, bson.D{ // 等价条件: select * from
|
||||
{"user_id", userID},
|
||||
{"$or", bson.A{
|
||||
bson.D{{"permission", constant.WorkMomentPermissionCantSee}, {"permission_user_id_list", bson.D{{"$nin", bson.A{userID}}}}},
|
||||
bson.D{{"permission", constant.WorkMomentPermissionCanSee}, {"permission_user_id_list", bson.D{{"$in", bson.A{userID}}}}},
|
||||
bson.D{{"permission", constant.WorkMomentPermissionCantSee}, {"permission_user_id_list", bson.D{{"$nin", bson.A{opUserID}}}}},
|
||||
bson.D{{"permission", constant.WorkMomentPermissionCanSee}, {"permission_user_id_list", bson.D{{"$in", bson.A{opUserID}}}}},
|
||||
bson.D{{"permission", constant.WorkMomentPublic}},
|
||||
}},
|
||||
}, findOpts)
|
||||
@@ -970,7 +970,7 @@ func (d *DataBases) CreateSuperGroup(groupID string, initMemberIDList []string,
|
||||
}
|
||||
_, err = c.InsertOne(sCtx, superGroup)
|
||||
if err != nil {
|
||||
session.AbortTransaction(ctx)
|
||||
_ := session.AbortTransaction(ctx)
|
||||
return utils.Wrap(err, "transaction failed")
|
||||
}
|
||||
var users []UserToSuperGroup
|
||||
@@ -992,7 +992,7 @@ func (d *DataBases) CreateSuperGroup(groupID string, initMemberIDList []string,
|
||||
for _, userID := range initMemberIDList {
|
||||
_, err = c.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts)
|
||||
if err != nil {
|
||||
session.AbortTransaction(ctx)
|
||||
_ := session.AbortTransaction(ctx)
|
||||
return utils.Wrap(err, "transaction failed")
|
||||
}
|
||||
|
||||
@@ -1022,7 +1022,7 @@ func (d *DataBases) AddUserToSuperGroup(groupID string, userIDList []string) err
|
||||
}
|
||||
_, err = c.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDList}}})
|
||||
if err != nil {
|
||||
session.AbortTransaction(ctx)
|
||||
_ := session.AbortTransaction(ctx)
|
||||
return utils.Wrap(err, "transaction failed")
|
||||
}
|
||||
c = d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup)
|
||||
@@ -1039,11 +1039,11 @@ func (d *DataBases) AddUserToSuperGroup(groupID string, userIDList []string) err
|
||||
for _, userID := range userIDList {
|
||||
_, err = c.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts)
|
||||
if err != nil {
|
||||
session.AbortTransaction(ctx)
|
||||
_ := session.AbortTransaction(ctx)
|
||||
return utils.Wrap(err, "transaction failed")
|
||||
}
|
||||
}
|
||||
session.CommitTransaction(ctx)
|
||||
_ := session.CommitTransaction(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1058,15 +1058,15 @@ func (d *DataBases) RemoverUserFromSuperGroup(groupID string, userIDList []strin
|
||||
sCtx := mongo.NewSessionContext(ctx, session)
|
||||
_, err = c.UpdateOne(ctx, bson.M{"group_id": groupID}, bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDList}}})
|
||||
if err != nil {
|
||||
session.AbortTransaction(ctx)
|
||||
_ := session.AbortTransaction(ctx)
|
||||
return utils.Wrap(err, "transaction failed")
|
||||
}
|
||||
err = d.RemoveGroupFromUser(ctx, sCtx, groupID, userIDList)
|
||||
if err != nil {
|
||||
session.AbortTransaction(ctx)
|
||||
_ := session.AbortTransaction(ctx)
|
||||
return utils.Wrap(err, "transaction failed")
|
||||
}
|
||||
session.CommitTransaction(ctx)
|
||||
_ := session.CommitTransaction(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package rocksCache
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/db"
|
||||
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
|
||||
"Open_IM/pkg/common/log"
|
||||
@@ -12,18 +13,19 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
userInfoCache = "USER_INFO_CACHE:"
|
||||
friendRelationCache = "FRIEND_RELATION_CACHE:"
|
||||
blackListCache = "BLACK_LIST_CACHE:"
|
||||
groupCache = "GROUP_CACHE:"
|
||||
groupInfoCache = "GROUP_INFO_CACHE:"
|
||||
groupOwnerIDCache = "GROUP_OWNER_ID:"
|
||||
joinedGroupListCache = "JOINED_GROUP_LIST_CACHE:"
|
||||
groupMemberInfoCache = "GROUP_MEMBER_INFO_CACHE:"
|
||||
groupAllMemberInfoCache = "GROUP_ALL_MEMBER_INFO_CACHE:"
|
||||
allFriendInfoCache = "ALL_FRIEND_INFO_CACHE:"
|
||||
allDepartmentCache = "ALL_DEPARTMENT_CACHE:"
|
||||
allDepartmentMemberCache = "ALL_DEPARTMENT_MEMBER_CACHE:"
|
||||
userInfoCache = "USER_INFO_CACHE:"
|
||||
friendRelationCache = "FRIEND_RELATION_CACHE:"
|
||||
blackListCache = "BLACK_LIST_CACHE:"
|
||||
groupCache = "GROUP_CACHE:"
|
||||
groupInfoCache = "GROUP_INFO_CACHE:"
|
||||
groupOwnerIDCache = "GROUP_OWNER_ID:"
|
||||
joinedGroupListCache = "JOINED_GROUP_LIST_CACHE:"
|
||||
groupMemberInfoCache = "GROUP_MEMBER_INFO_CACHE:"
|
||||
groupAllMemberInfoCache = "GROUP_ALL_MEMBER_INFO_CACHE:"
|
||||
allFriendInfoCache = "ALL_FRIEND_INFO_CACHE:"
|
||||
allDepartmentCache = "ALL_DEPARTMENT_CACHE:"
|
||||
allDepartmentMemberCache = "ALL_DEPARTMENT_MEMBER_CACHE:"
|
||||
joinedSuperGroupListCache = "JOINED_SUPER_GROUP_LIST_CACHE:"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -41,13 +43,7 @@ func init() {
|
||||
panic(err.Error())
|
||||
}
|
||||
n += len(keys)
|
||||
//fmt.Printf("\n %s key found %d keys: %v, current cursor %d\n", key, n, keys, cursor)
|
||||
//if len(keys) > 0 {
|
||||
// err = db.DB.RDB.Del(context.Background(), keys...).Err()
|
||||
// if err != nil {
|
||||
// panic(err.Error())
|
||||
// }
|
||||
//}
|
||||
// for each for redis cluster
|
||||
for _, key := range keys {
|
||||
if err = db.DB.RDB.Del(context.Background(), key).Err(); err != nil {
|
||||
log.NewError("", fName, key, err.Error())
|
||||
@@ -132,15 +128,28 @@ func DelJoinedGroupIDListFromCache(userID string) error {
|
||||
}
|
||||
|
||||
func GetGroupMemberIDListFromCache(groupID string) ([]string, error) {
|
||||
getGroupMemberIDList := func() (string, error) {
|
||||
groupMemberIDList, err := imdb.GetGroupMemberIDListByGroupID(groupID)
|
||||
f := func() (string, error) {
|
||||
groupInfo, err := GetGroupInfoFromCache(groupID)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "")
|
||||
return "", utils.Wrap(err, "GetGroupInfoFromCache failed")
|
||||
}
|
||||
var groupMemberIDList []string
|
||||
if groupInfo.GroupType == constant.SuperGroup {
|
||||
superGroup, err := db.DB.GetSuperGroup(groupID)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "")
|
||||
}
|
||||
groupMemberIDList = superGroup.MemberIDList
|
||||
} else {
|
||||
groupMemberIDList, err = imdb.GetGroupMemberIDListByGroupID(groupID)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "")
|
||||
}
|
||||
}
|
||||
bytes, err := json.Marshal(groupMemberIDList)
|
||||
return string(bytes), utils.Wrap(err, "")
|
||||
}
|
||||
groupIDListStr, err := db.DB.Rc.Fetch(groupCache+groupID, time.Second*30*60, getGroupMemberIDList)
|
||||
groupIDListStr, err := db.DB.Rc.Fetch(groupCache+groupID, time.Second*30*60, f)
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "")
|
||||
}
|
||||
@@ -149,8 +158,8 @@ func GetGroupMemberIDListFromCache(groupID string) ([]string, error) {
|
||||
return groupMemberIDList, utils.Wrap(err, "")
|
||||
}
|
||||
|
||||
func DelGroupMemberIDListFromCache(userID string) error {
|
||||
err := db.DB.Rc.TagAsDeleted(groupCache + userID)
|
||||
func DelGroupMemberIDListFromCache(groupID string) error {
|
||||
err := db.DB.Rc.TagAsDeleted(groupCache + groupID)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -307,3 +316,31 @@ func GetAllDepartmentMembersFromCache() ([]db.DepartmentMember, error) {
|
||||
func DelAllDepartmentMembersFromCache() error {
|
||||
return db.DB.Rc.TagAsDeleted(allDepartmentMemberCache)
|
||||
}
|
||||
|
||||
func GetJoinedSuperGroupListFromCache(userID string) ([]string, error) {
|
||||
getJoinedSuperGroupIDList := func() (string, error) {
|
||||
userToSuperGroup, err := db.DB.GetSuperGroupByUserID(userID)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "")
|
||||
}
|
||||
bytes, err := json.Marshal(userToSuperGroup.GroupIDList)
|
||||
return string(bytes), utils.Wrap(err, "")
|
||||
}
|
||||
joinedSuperGroupListStr, err := db.DB.Rc.Fetch(joinedSuperGroupListCache+userID, time.Second, getJoinedSuperGroupIDList)
|
||||
var joinedSuperGroupList []string
|
||||
err = json.Unmarshal([]byte(joinedSuperGroupListStr), &joinedSuperGroupList)
|
||||
return joinedSuperGroupList, err
|
||||
}
|
||||
|
||||
func DelJoinedSuperGroupIDListFromCache(userID string) error {
|
||||
err := db.DB.Rc.TagAsDeleted(joinedSuperGroupListCache + userID)
|
||||
return err
|
||||
}
|
||||
|
||||
func GetSuperGroupMemberIDListFromCache(groupID string) ([]string, error) {
|
||||
return GetGroupMemberIDListFromCache(groupID)
|
||||
}
|
||||
|
||||
func DelSuperGroupMemberIDListFromCache(groupID string) error {
|
||||
return DelGroupMemberIDListFromCache(groupID)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user