mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-07 02:26:00 +08:00
Merge remote-tracking branch 'origin/errcode' into errcode
# Conflicts: # internal/utils/convert.go
This commit is contained in:
Vendored
+30
-13
@@ -15,23 +15,25 @@ const GroupExpireTime = time.Second * 60 * 60 * 12
|
||||
const groupInfoCacheKey = "GROUP_INFO_CACHE:"
|
||||
|
||||
type GroupCache struct {
|
||||
db *relation.Group
|
||||
expireTime time.Duration
|
||||
redisClient *RedisClient
|
||||
rcClient *rockscache.Client
|
||||
group *relation.Group
|
||||
groupMember *relation.GroupMember
|
||||
groupRequest *relation.GroupRequest
|
||||
expireTime time.Duration
|
||||
redisClient *RedisClient
|
||||
rcClient *rockscache.Client
|
||||
}
|
||||
|
||||
func NewGroupCache(rdb redis.UniversalClient, db *relation.Group, opts rockscache.Options) *GroupCache {
|
||||
return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: GroupExpireTime, db: db, redisClient: NewRedisClient(rdb)}
|
||||
func NewGroupCache(rdb redis.UniversalClient, groupDB *relation.Group, groupMemberDB *relation.GroupMember, groupRequestDB *relation.GroupRequest, opts rockscache.Options) *GroupCache {
|
||||
return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: GroupExpireTime, group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB, redisClient: NewRedisClient(rdb)}
|
||||
}
|
||||
|
||||
func (g *GroupCache) getRedisClient() *RedisClient {
|
||||
return g.redisClient
|
||||
}
|
||||
|
||||
func (g *GroupCache) GetGroupsInfoFromCache(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error) {
|
||||
func (g *GroupCache) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error) {
|
||||
for _, groupID := range groupIDs {
|
||||
group, err := g.GetGroupInfoFromCache(ctx, groupID)
|
||||
group, err := g.GetGroupInfo(ctx, groupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -40,9 +42,9 @@ func (g *GroupCache) GetGroupsInfoFromCache(ctx context.Context, groupIDs []stri
|
||||
return groups, nil
|
||||
}
|
||||
|
||||
func (g *GroupCache) GetGroupInfoFromCache(ctx context.Context, groupID string) (group *relation.Group, err error) {
|
||||
func (g *GroupCache) GetGroupInfo(ctx context.Context, groupID string) (group *relation.Group, err error) {
|
||||
getGroup := func() (string, error) {
|
||||
groupInfo, err := g.db.Take(ctx, groupID)
|
||||
groupInfo, err := g.group.Take(ctx, groupID)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "")
|
||||
}
|
||||
@@ -64,16 +66,16 @@ func (g *GroupCache) GetGroupInfoFromCache(ctx context.Context, groupID string)
|
||||
return group, utils.Wrap(err, "")
|
||||
}
|
||||
|
||||
func (g *GroupCache) DelGroupInfoFromCache(ctx context.Context, groupID string) (err error) {
|
||||
func (g *GroupCache) DelGroupInfo(ctx context.Context, groupID string) (err error) {
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
|
||||
}()
|
||||
return g.rcClient.TagAsDeleted(g.getGroupInfoCacheKey(groupID))
|
||||
}
|
||||
|
||||
func (g *GroupCache) DelGroupsInfoFromCache(ctx context.Context, groupIDs []string) error {
|
||||
func (g *GroupCache) DelGroupsInfo(ctx context.Context, groupIDs []string) error {
|
||||
for _, groupID := range groupIDs {
|
||||
if err := g.DelGroupInfoFromCache(ctx, groupID); err != nil {
|
||||
if err := g.DelGroupInfo(ctx, groupID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -83,3 +85,18 @@ func (g *GroupCache) DelGroupsInfoFromCache(ctx context.Context, groupIDs []stri
|
||||
func (g *GroupCache) getGroupInfoCacheKey(groupID string) string {
|
||||
return groupInfoCacheKey + groupID
|
||||
}
|
||||
|
||||
func (g *GroupCache) DelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) {
|
||||
for _, userID := range userIDs {
|
||||
if err := g.rcClient.TagAsDeleted(joinedSuperGroupListCache + userID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (g *GroupCache) DelJoinedSuperGroupID(ctx context.Context, userID string) (err error) {
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
|
||||
}()
|
||||
return g.rcClient.TagAsDeleted(joinedSuperGroupListCache + userID)
|
||||
}
|
||||
|
||||
Vendored
+16
-12
@@ -14,7 +14,7 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
go_redis "github.com/go-redis/redis/v8"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/golang/protobuf/jsonpb"
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
@@ -42,12 +42,16 @@ const (
|
||||
exTypeKeyLocker = "EX_LOCK:"
|
||||
)
|
||||
|
||||
func InitRedis() go_redis.UniversalClient {
|
||||
var rdb go_redis.UniversalClient
|
||||
type RedisClient struct {
|
||||
rdb redis.UniversalClient
|
||||
}
|
||||
|
||||
func (r *RedisClient) InitRedis() {
|
||||
var rdb redis.UniversalClient
|
||||
var err error
|
||||
ctx := context.Background()
|
||||
if config.Config.Redis.EnableCluster {
|
||||
rdb = go_redis.NewClusterClient(&go_redis.ClusterOptions{
|
||||
rdb = redis.NewClusterClient(&redis.ClusterOptions{
|
||||
Addrs: config.Config.Redis.DBAddress,
|
||||
Username: config.Config.Redis.DBUserName,
|
||||
Password: config.Config.Redis.DBPassWord, // no password set
|
||||
@@ -59,7 +63,7 @@ func InitRedis() go_redis.UniversalClient {
|
||||
panic(err.Error() + " redis cluster " + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
|
||||
}
|
||||
} else {
|
||||
rdb = go_redis.NewClient(&go_redis.Options{
|
||||
rdb = redis.NewClient(&redis.Options{
|
||||
Addr: config.Config.Redis.DBAddress[0],
|
||||
Username: config.Config.Redis.DBUserName,
|
||||
Password: config.Config.Redis.DBPassWord, // no password set
|
||||
@@ -71,14 +75,14 @@ func InitRedis() go_redis.UniversalClient {
|
||||
panic(err.Error() + " redis " + config.Config.Redis.DBAddress[0] + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
|
||||
}
|
||||
}
|
||||
return rdb
|
||||
r.rdb = rdb
|
||||
}
|
||||
|
||||
type RedisClient struct {
|
||||
rdb go_redis.UniversalClient
|
||||
func (r *RedisClient) GetClient() redis.UniversalClient {
|
||||
return r.rdb
|
||||
}
|
||||
|
||||
func NewRedisClient(rdb go_redis.UniversalClient) *RedisClient {
|
||||
func NewRedisClient(rdb redis.UniversalClient) *RedisClient {
|
||||
return &RedisClient{rdb: rdb}
|
||||
}
|
||||
|
||||
@@ -213,7 +217,7 @@ func (r *RedisClient) GetUserGlobalMsgRecvOpt(userID string) (int, error) {
|
||||
key := conversationReceiveMessageOpt + userID
|
||||
result, err := r.rdb.HGet(context.Background(), key, GlobalMsgRecvOpt).Result()
|
||||
if err != nil {
|
||||
if err == go_redis.Nil {
|
||||
if err == redis.Nil {
|
||||
return 0, nil
|
||||
} else {
|
||||
return 0, err
|
||||
@@ -289,7 +293,7 @@ func (r *RedisClient) CleanUpOneUserAllMsgFromRedis(userID string, operationID s
|
||||
key := messageCache + userID + "_" + "*"
|
||||
vals, err := r.rdb.Keys(ctx, key).Result()
|
||||
log2.Debug(operationID, "vals: ", vals)
|
||||
if err == go_redis.Nil {
|
||||
if err == redis.Nil {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
@@ -407,7 +411,7 @@ func (r *RedisClient) DelMsgFromCache(uid string, seqList []uint32, operationID
|
||||
key := messageCache + uid + "_" + strconv.Itoa(int(seq))
|
||||
result, err := r.rdb.Get(context.Background(), key).Result()
|
||||
if err != nil {
|
||||
if err == go_redis.Nil {
|
||||
if err == redis.Nil {
|
||||
log2.NewDebug(operationID, utils.GetSelfFuncName(), err.Error(), "redis nil")
|
||||
} else {
|
||||
log2.NewError(operationID, utils.GetSelfFuncName(), err.Error(), key)
|
||||
|
||||
@@ -19,6 +19,7 @@ type GroupInterface interface {
|
||||
TakeGroupByID(ctx context.Context, groupID string) (group *relation.Group, err error)
|
||||
|
||||
//mongo
|
||||
CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string, memberNumCount int) error
|
||||
GetSuperGroupByID(ctx context.Context, groupID string) (superGroup *unrelation.SuperGroup, err error)
|
||||
}
|
||||
|
||||
@@ -26,7 +27,7 @@ type GroupController struct {
|
||||
database DataBase
|
||||
}
|
||||
|
||||
func NewGroupController(db *gorm.DB, rdb redis.UniversalClient, mgoDB *mongo.Database) GroupInterface {
|
||||
func NewGroupController(db *gorm.DB, rdb redis.UniversalClient, mgoDB *mongo.Client) GroupInterface {
|
||||
groupController := &GroupController{database: newGroupDatabase(db, rdb, mgoDB)}
|
||||
return groupController
|
||||
}
|
||||
@@ -51,25 +52,40 @@ func (g *GroupController) GetSuperGroupByID(ctx context.Context, groupID string)
|
||||
return g.database.GetSuperGroupByID(ctx, groupID)
|
||||
}
|
||||
|
||||
func (g *GroupController) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string, memberNumCount int) error {
|
||||
return g.database.CreateSuperGroup(ctx, groupID, initMemberIDList, memberNumCount)
|
||||
}
|
||||
|
||||
type DataBase interface {
|
||||
FindGroupsByID(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error)
|
||||
CreateGroup(ctx context.Context, groups []*relation.Group) error
|
||||
DeleteGroupByIDs(ctx context.Context, groupIDs []string) error
|
||||
TakeGroupByID(ctx context.Context, groupID string) (group *relation.Group, err error)
|
||||
GetSuperGroupByID(ctx context.Context, groupID string) (superGroup *unrelation.SuperGroup, err error)
|
||||
CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string, memberNumCount int) error
|
||||
}
|
||||
|
||||
type GroupDataBase struct {
|
||||
sqlDB *relation.Group
|
||||
groupDB *relation.Group
|
||||
groupMemberDB *relation.GroupMember
|
||||
groupRequestDB *relation.GroupRequest
|
||||
db *gorm.DB
|
||||
|
||||
cache *cache.GroupCache
|
||||
mongoDB *unrelation.SuperGroupMgoDB
|
||||
}
|
||||
|
||||
func newGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, mgoDB *mongo.Database) DataBase {
|
||||
sqlDB := relation.NewGroupDB(db)
|
||||
func newGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, mgoDB *mongo.Client) DataBase {
|
||||
groupDB := relation.NewGroupDB(db)
|
||||
groupMemberDB := relation.NewGroupMemberDB(db)
|
||||
groupRequestDB := relation.NewGroupRequest(db)
|
||||
newDB := db
|
||||
database := &GroupDataBase{
|
||||
sqlDB: sqlDB,
|
||||
cache: cache.NewGroupCache(rdb, sqlDB, rockscache.Options{
|
||||
groupDB: groupDB,
|
||||
groupMemberDB: groupMemberDB,
|
||||
groupRequestDB: groupRequestDB,
|
||||
db: newDB,
|
||||
cache: cache.NewGroupCache(rdb, groupDB, groupMemberDB, groupRequestDB, rockscache.Options{
|
||||
RandomExpireAdjustment: 0.2,
|
||||
DisableCacheRead: false,
|
||||
DisableCacheDelete: false,
|
||||
@@ -81,19 +97,19 @@ func newGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, mgoDB *mongo.Datab
|
||||
}
|
||||
|
||||
func (g *GroupDataBase) FindGroupsByID(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error) {
|
||||
return g.cache.GetGroupsInfoFromCache(ctx, groupIDs)
|
||||
return g.cache.GetGroupsInfo(ctx, groupIDs)
|
||||
}
|
||||
|
||||
func (g *GroupDataBase) CreateGroup(ctx context.Context, groups []*relation.Group) error {
|
||||
return g.sqlDB.Create(ctx, groups)
|
||||
return g.groupDB.Create(ctx, groups)
|
||||
}
|
||||
|
||||
func (g *GroupDataBase) DeleteGroupByIDs(ctx context.Context, groupIDs []string) error {
|
||||
return g.sqlDB.DB.Transaction(func(tx *gorm.DB) error {
|
||||
if err := g.sqlDB.Delete(ctx, groupIDs, tx); err != nil {
|
||||
return g.groupDB.DB.Transaction(func(tx *gorm.DB) error {
|
||||
if err := g.groupDB.Delete(ctx, groupIDs, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := g.cache.DelGroupsInfoFromCache(ctx, groupIDs); err != nil {
|
||||
if err := g.cache.DelGroupsInfo(ctx, groupIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -101,25 +117,56 @@ func (g *GroupDataBase) DeleteGroupByIDs(ctx context.Context, groupIDs []string)
|
||||
}
|
||||
|
||||
func (g *GroupDataBase) TakeGroupByID(ctx context.Context, groupID string) (group *relation.Group, err error) {
|
||||
return g.cache.GetGroupInfoFromCache(ctx, groupID)
|
||||
return g.cache.GetGroupInfo(ctx, groupID)
|
||||
}
|
||||
|
||||
func (g *GroupDataBase) Update(ctx context.Context, groups []*relation.Group) error {
|
||||
return g.sqlDB.DB.Transaction(func(tx *gorm.DB) error {
|
||||
if err := g.sqlDB.Update(ctx, groups, tx); err != nil {
|
||||
return g.db.Transaction(func(tx *gorm.DB) error {
|
||||
if err := g.groupDB.Update(ctx, groups, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
var groupIDs []string
|
||||
for _, group := range groups {
|
||||
groupIDs = append(groupIDs, group.GroupID)
|
||||
}
|
||||
if err := g.cache.DelGroupsInfoFromCache(ctx, groupIDs); err != nil {
|
||||
if err := g.cache.DelGroupsInfo(ctx, groupIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (g *GroupDataBase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string, memberNumCount int) error {
|
||||
sess, err := g.mongoDB.MgoClient.StartSession()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer sess.EndSession(ctx)
|
||||
sCtx := mongo.NewSessionContext(ctx, sess)
|
||||
if err = g.mongoDB.CreateSuperGroup(sCtx, groupID, initMemberIDList, memberNumCount); err != nil {
|
||||
_ = sess.AbortTransaction(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
if err = g.cache.DelJoinedSuperGroupIDs(ctx, initMemberIDList); err != nil {
|
||||
_ = sess.AbortTransaction(ctx)
|
||||
return err
|
||||
}
|
||||
return sess.CommitTransaction(ctx)
|
||||
}
|
||||
|
||||
func (g *GroupDataBase) GetSuperGroupByID(ctx context.Context, groupID string) (superGroup *unrelation.SuperGroup, err error) {
|
||||
return g.mongoDB.GetSuperGroup(ctx, groupID)
|
||||
}
|
||||
|
||||
func (g *GroupDataBase) CreateGroupAndMember(ctx context.Context, groups []*relation.Group, groupMember []*relation.GroupMember) error {
|
||||
return g.db.Transaction(func(tx *gorm.DB) error {
|
||||
if err := g.groupDB.Create(ctx, groups, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := g.groupMemberDB.Create(ctx, groupMember, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -14,8 +14,6 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var ChatLogDB *gorm.DB
|
||||
|
||||
type ChatLog struct {
|
||||
ServerMsgID string `gorm:"column:server_msg_id;primary_key;type:char(64)" json:"serverMsgID"`
|
||||
ClientMsgID string `gorm:"column:client_msg_id;type:char(64)" json:"clientMsgID"`
|
||||
@@ -32,6 +30,7 @@ type ChatLog struct {
|
||||
SendTime time.Time `gorm:"column:send_time;index:sendTime;index:content_type,priority:1;index:session_type,priority:1;index:recv_id,priority:1;index:send_id,priority:1" json:"sendTime"`
|
||||
CreateTime time.Time `gorm:"column:create_time" json:"createTime"`
|
||||
Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"`
|
||||
DB *gorm.DB
|
||||
}
|
||||
|
||||
func (ChatLog) TableName() string {
|
||||
|
||||
@@ -24,36 +24,40 @@ type GroupMember struct {
|
||||
OperatorUserID string `gorm:"column:operator_user_id;size:64"`
|
||||
MuteEndTime time.Time `gorm:"column:mute_end_time"`
|
||||
Ex string `gorm:"column:ex;size:1024"`
|
||||
DB *gorm.DB
|
||||
DB *gorm.DB `gorm:"-" json:"-"`
|
||||
}
|
||||
|
||||
func (g *GroupMember) Create(ctx context.Context, groupMemberList []*GroupMember) (err error) {
|
||||
func NewGroupMemberDB(db *gorm.DB) *GroupMember {
|
||||
return &GroupMember{DB: db}
|
||||
}
|
||||
|
||||
func (g *GroupMember) Create(ctx context.Context, groupMemberList []*GroupMember, tx ...*gorm.DB) (err error) {
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupMemberList", groupMemberList)
|
||||
}()
|
||||
return utils.Wrap(GroupMemberDB.Create(&groupMemberList).Error, "")
|
||||
return utils.Wrap(getDBConn(g.DB, tx).Create(&groupMemberList).Error, "")
|
||||
}
|
||||
|
||||
func (g *GroupMember) Delete(ctx context.Context, groupMembers []*GroupMember) (err error) {
|
||||
func (g *GroupMember) Delete(ctx context.Context, groupMembers []*GroupMember, tx ...*gorm.DB) (err error) {
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupMembers", groupMembers)
|
||||
}()
|
||||
return utils.Wrap(GroupMemberDB.Delete(groupMembers).Error, "")
|
||||
return utils.Wrap(getDBConn(g.DB, tx).Delete(groupMembers).Error, "")
|
||||
}
|
||||
|
||||
func (g *GroupMember) UpdateByMap(ctx context.Context, groupID string, userID string, args map[string]interface{}) (err error) {
|
||||
func (g *GroupMember) UpdateByMap(ctx context.Context, groupID string, userID string, args map[string]interface{}, tx ...*gorm.DB) (err error) {
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID, "args", args)
|
||||
}()
|
||||
return utils.Wrap(GroupMemberDB.Model(&GroupMember{}).Where("group_id = ? and user_id = ?", groupID, userID).Updates(args).Error, "")
|
||||
return utils.Wrap(getDBConn(g.DB, tx).Model(&GroupMember{}).Where("group_id = ? and user_id = ?", groupID, userID).Updates(args).Error, "")
|
||||
}
|
||||
|
||||
func (g *GroupMember) Update(ctx context.Context, groupMembers []*GroupMember) (err error) {
|
||||
func (g *GroupMember) Update(ctx context.Context, groupMembers []*GroupMember, tx ...*gorm.DB) (err error) {
|
||||
defer func() { trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupMembers", groupMembers) }()
|
||||
return utils.Wrap(GroupMemberDB.Updates(&groupMembers).Error, "")
|
||||
return utils.Wrap(getDBConn(g.DB, tx).Updates(&groupMembers).Error, "")
|
||||
}
|
||||
|
||||
func (g *GroupMember) Find(ctx context.Context, groupMembers []*GroupMember) (groupList []*GroupMember, err error) {
|
||||
func (g *GroupMember) Find(ctx context.Context, groupMembers []*GroupMember, tx ...*gorm.DB) (groupList []*GroupMember, err error) {
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupMembers", groupMembers, "groupList", groupList)
|
||||
}()
|
||||
@@ -61,25 +65,23 @@ func (g *GroupMember) Find(ctx context.Context, groupMembers []*GroupMember) (gr
|
||||
for _, groupMember := range groupMembers {
|
||||
where = append(where, []interface{}{groupMember.GroupID, groupMember.UserID})
|
||||
}
|
||||
err = utils.Wrap(GroupMemberDB.Where("(group_id, user_id) in ?", where).Find(&groupList).Error, "")
|
||||
return groupList, err
|
||||
return groupList, utils.Wrap(getDBConn(g.DB, tx).Where("(group_id, user_id) in ?", where).Find(&groupList).Error, "")
|
||||
}
|
||||
|
||||
func (g *GroupMember) Take(ctx context.Context, groupID string, userID string) (groupMember *GroupMember, err error) {
|
||||
func (g *GroupMember) Take(ctx context.Context, groupID string, userID string, tx ...*gorm.DB) (groupMember *GroupMember, err error) {
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID, "groupMember", *groupMember)
|
||||
}()
|
||||
groupMember = &GroupMember{}
|
||||
return groupMember, utils.Wrap(GroupMemberDB.Where("group_id = ? and user_id = ?", groupID, userID).Take(groupMember).Error, "")
|
||||
return groupMember, utils.Wrap(getDBConn(g.DB, tx).Where("group_id = ? and user_id = ?", groupID, userID).Take(groupMember).Error, "")
|
||||
}
|
||||
|
||||
func (g *GroupMember) TakeOwnerInfo(ctx context.Context, groupID string) (groupMember *GroupMember, err error) {
|
||||
func (g *GroupMember) TakeOwnerInfo(ctx context.Context, groupID string, tx ...*gorm.DB) (groupMember *GroupMember, err error) {
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupMember", *groupMember)
|
||||
}()
|
||||
groupMember = &GroupMember{}
|
||||
err = GroupMemberDB.Where("group_id = ? and role_level = ?", groupID, constant.GroupOwner).Take(groupMember).Error
|
||||
return groupMember, utils.Wrap(err, "")
|
||||
return groupMember, utils.Wrap(getDBConn(g.DB, tx).Where("group_id = ? and role_level = ?", groupID, constant.GroupOwner).Take(groupMember).Error, "")
|
||||
}
|
||||
|
||||
func InsertIntoGroupMember(toInsertInfo GroupMember) error {
|
||||
|
||||
@@ -37,8 +37,7 @@ func (g *Group) Create(ctx context.Context, groups []*Group, tx ...*gorm.DB) (er
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groups", groups)
|
||||
}()
|
||||
err = utils.Wrap(getDBConn(g.DB, tx).Create(&groups).Error, "")
|
||||
return err
|
||||
return utils.Wrap(getDBConn(g.DB, tx).Create(&groups).Error, "")
|
||||
}
|
||||
|
||||
func (g *Group) Delete(ctx context.Context, groupIDs []string, tx ...*gorm.DB) (err error) {
|
||||
@@ -66,8 +65,7 @@ func (g *Group) Find(ctx context.Context, groupIDs []string, tx ...*gorm.DB) (gr
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs, "groups", groups)
|
||||
}()
|
||||
err = utils.Wrap(getDBConn(g.DB, tx).Where("group_id in (?)", groupIDs).Find(&groups).Error, "")
|
||||
return groups, err
|
||||
return groups, utils.Wrap(getDBConn(g.DB, tx).Where("group_id in (?)", groupIDs).Find(&groups).Error, "")
|
||||
}
|
||||
|
||||
func (g *Group) Take(ctx context.Context, groupID string, tx ...*gorm.DB) (group *Group, err error) {
|
||||
@@ -75,8 +73,7 @@ func (g *Group) Take(ctx context.Context, groupID string, tx ...*gorm.DB) (group
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "group", *group)
|
||||
}()
|
||||
err = utils.Wrap(getDBConn(g.DB, tx).Where("group_id = ?", groupID).Take(group).Error, "")
|
||||
return group, err
|
||||
return group, utils.Wrap(getDBConn(g.DB, tx).Where("group_id = ?", groupID).Take(group).Error, "")
|
||||
}
|
||||
|
||||
//func (g *Group) DeleteTx(ctx context.Context, groupIDs []string) error {
|
||||
|
||||
@@ -22,6 +22,13 @@ type GroupRequest struct {
|
||||
JoinSource int32 `gorm:"column:join_source"`
|
||||
InviterUserID string `gorm:"column:inviter_user_id;size:64"`
|
||||
Ex string `gorm:"column:ex;size:1024"`
|
||||
DB *gorm.DB
|
||||
}
|
||||
|
||||
func NewGroupRequest(db *gorm.DB) *GroupRequest {
|
||||
return &GroupRequest{
|
||||
DB: db,
|
||||
}
|
||||
}
|
||||
|
||||
func (GroupRequest) TableName() string {
|
||||
|
||||
@@ -2,61 +2,73 @@ package relation
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/constant"
|
||||
"gorm.io/gorm"
|
||||
"time"
|
||||
)
|
||||
|
||||
func GetActiveUserNum(from, to time.Time) (int32, error) {
|
||||
var num int64
|
||||
err := ChatLogDB.Table("chat_logs").Select("count(distinct(send_id))").Where("send_time >= ? and send_time <= ?", from, to).Count(&num).Error
|
||||
return int32(num), err
|
||||
type Statistics struct {
|
||||
DB *gorm.DB
|
||||
}
|
||||
|
||||
func GetIncreaseUserNum(from, to time.Time) (int32, error) {
|
||||
var num int64
|
||||
err := UserDB.Where("create_time >= ? and create_time <= ?", from, to).Count(&num).Error
|
||||
return int32(num), err
|
||||
func NewStatistics(db *gorm.DB) *Statistics {
|
||||
return &Statistics{DB: db}
|
||||
}
|
||||
|
||||
func GetTotalUserNum() (int32, error) {
|
||||
var num int64
|
||||
err := UserDB.Count(&num).Error
|
||||
return int32(num), err
|
||||
func (s *Statistics) getUserModel() *gorm.DB {
|
||||
return s.DB.Model(&User{})
|
||||
}
|
||||
|
||||
func GetTotalUserNumByDate(to time.Time) (int32, error) {
|
||||
var num int64
|
||||
err := UserDB.Where("create_time <= ?", to).Count(&num).Error
|
||||
return int32(num), err
|
||||
func (s *Statistics) getChatLogModel() *gorm.DB {
|
||||
return s.DB.Model(&ChatLog{})
|
||||
}
|
||||
|
||||
func GetPrivateMessageNum(from, to time.Time) (int32, error) {
|
||||
var num int64
|
||||
err := ChatLogDB.Where("send_time >= ? and send_time <= ? and session_type = ?", from, to, 1).Count(&num).Error
|
||||
return int32(num), err
|
||||
func (s *Statistics) getGroupModel() *gorm.DB {
|
||||
return s.DB.Model(&Group{})
|
||||
}
|
||||
|
||||
func GetGroupMessageNum(from, to time.Time) (int32, error) {
|
||||
var num int64
|
||||
err := ChatLogDB.Where("send_time >= ? and send_time <= ? and session_type = ?", from, to, 2).Count(&num).Error
|
||||
return int32(num), err
|
||||
func (s *Statistics) GetActiveUserNum(from, to time.Time) (num int64, err error) {
|
||||
err = s.getChatLogModel().Select("count(distinct(send_id))").Where("send_time >= ? and send_time <= ?", from, to).Count(&num).Error
|
||||
return num, err
|
||||
}
|
||||
|
||||
func GetIncreaseGroupNum(from, to time.Time) (int32, error) {
|
||||
var num int64
|
||||
err := GroupDB.Where("create_time >= ? and create_time <= ?", from, to).Count(&num).Error
|
||||
return int32(num), err
|
||||
func (s *Statistics) GetIncreaseUserNum(from, to time.Time) (num int64, err error) {
|
||||
err = s.getUserModel().Where("create_time >= ? and create_time <= ?", from, to).Count(&num).Error
|
||||
return num, err
|
||||
}
|
||||
|
||||
func GetTotalGroupNum() (int32, error) {
|
||||
var num int64
|
||||
err := GroupDB.Count(&num).Error
|
||||
return int32(num), err
|
||||
func (s *Statistics) GetTotalUserNum() (num int64, err error) {
|
||||
err = s.getUserModel().Count(&num).Error
|
||||
return num, err
|
||||
}
|
||||
|
||||
func GetGroupNum(to time.Time) (int32, error) {
|
||||
var num int64
|
||||
err := GroupDB.Where("create_time <= ?", to).Count(&num).Error
|
||||
return int32(num), err
|
||||
func (s *Statistics) GetTotalUserNumByDate(to time.Time) (num int64, err error) {
|
||||
err = s.getUserModel().Where("create_time <= ?", to).Count(&num).Error
|
||||
return num, err
|
||||
}
|
||||
|
||||
func (s *Statistics) GetPrivateMessageNum(from, to time.Time) (num int64, err error) {
|
||||
err = s.getChatLogModel().Where("send_time >= ? and send_time <= ? and session_type = ?", from, to, constant.SingleChatType).Count(&num).Error
|
||||
return num, err
|
||||
}
|
||||
|
||||
func (s *Statistics) GetGroupMessageNum(from, to time.Time) (num int64, err error) {
|
||||
err = s.getChatLogModel().Where("send_time >= ? and send_time <= ? and session_type in (?)", from, to, []int{constant.GroupChatType, constant.SuperGroupChatType}).Count(&num).Error
|
||||
return num, err
|
||||
}
|
||||
|
||||
func (s *Statistics) GetIncreaseGroupNum(from, to time.Time) (num int64, err error) {
|
||||
err = s.getGroupModel().Where("create_time >= ? and create_time <= ?", from, to).Count(&num).Error
|
||||
return num, err
|
||||
}
|
||||
|
||||
func (s *Statistics) GetTotalGroupNum() (num int64, err error) {
|
||||
err = s.getGroupModel().Count(&num).Error
|
||||
return num, err
|
||||
}
|
||||
|
||||
func (s *Statistics) GetGroupNum(to time.Time) (num int64, err error) {
|
||||
err = s.getGroupModel().Where("create_time <= ?", to).Count(&num).Error
|
||||
return num, err
|
||||
}
|
||||
|
||||
type activeGroup struct {
|
||||
@@ -65,14 +77,14 @@ type activeGroup struct {
|
||||
MessageNum int `gorm:"column:message_num"`
|
||||
}
|
||||
|
||||
func GetActiveGroups(from, to time.Time, limit int) ([]*activeGroup, error) {
|
||||
func (s *Statistics) GetActiveGroups(from, to time.Time, limit int) ([]*activeGroup, error) {
|
||||
var activeGroups []*activeGroup
|
||||
err := ChatLogDB.Select("recv_id, count(*) as message_num").Where("send_time >= ? and send_time <= ? and session_type in (?)", from, to, []int{constant.GroupChatType, constant.SuperGroupChatType}).Group("recv_id").Limit(limit).Order("message_num DESC").Find(&activeGroups).Error
|
||||
err := s.getChatLogModel().Select("recv_id, count(*) as message_num").Where("send_time >= ? and send_time <= ? and session_type in (?)", from, to, []int{constant.GroupChatType, constant.SuperGroupChatType}).Group("recv_id").Limit(limit).Order("message_num DESC").Find(&activeGroups).Error
|
||||
for _, activeGroup := range activeGroups {
|
||||
group := Group{
|
||||
GroupID: activeGroup.Id,
|
||||
}
|
||||
GroupDB.Where("group_id= ? ", group.GroupID).Find(&group)
|
||||
s.getGroupModel().Where("group_id= ? ", group.GroupID).Find(&group)
|
||||
activeGroup.Name = group.GroupName
|
||||
}
|
||||
return activeGroups, err
|
||||
@@ -84,16 +96,15 @@ type activeUser struct {
|
||||
MessageNum int `gorm:"column:message_num"`
|
||||
}
|
||||
|
||||
func GetActiveUsers(from, to time.Time, limit int) ([]*activeUser, error) {
|
||||
var activeUsers []*activeUser
|
||||
err := ChatLogDB.Select("send_id, count(*) as message_num").Where("send_time >= ? and send_time <= ? and session_type = ?", from, to, constant.SingleChatType).Group("send_id").Limit(limit).Order("message_num DESC").Find(&activeUsers).Error
|
||||
func (s *Statistics) GetActiveUsers(from, to time.Time, limit int) (activeUsers []*activeUser, err error) {
|
||||
err = s.getChatLogModel().Select("send_id, count(*) as message_num").Where("send_time >= ? and send_time <= ? and session_type in (?)", from, to, []int{constant.SingleChatType, constant.GroupChatType, constant.SuperGroupChatType}).Group("send_id").Limit(limit).Order("message_num DESC").Find(&activeUsers).Error
|
||||
for _, activeUser := range activeUsers {
|
||||
user := User{
|
||||
UserID: activeUser.ID,
|
||||
}
|
||||
err = UserDB.Select("user_id, name").Find(&user).Error
|
||||
err = s.getUserModel().Select("user_id, name").Find(&user).Error
|
||||
if err != nil {
|
||||
continue
|
||||
return nil, err
|
||||
}
|
||||
activeUser.Name = user.Nickname
|
||||
activeUser.ID = user.UserID
|
||||
|
||||
@@ -9,11 +9,6 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
BlackListDB *gorm.DB
|
||||
UserDB *gorm.DB
|
||||
)
|
||||
|
||||
func InitManager() {
|
||||
for k, v := range config.Config.Manager.AppManagerUid {
|
||||
_, err := GetUserByUserID(v)
|
||||
|
||||
@@ -8,13 +8,12 @@ import (
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"go.mongodb.org/mongo-driver/x/bsonx"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Mongo struct {
|
||||
DB *mongo.Database
|
||||
DB *mongo.Client
|
||||
}
|
||||
|
||||
func (m *Mongo) InitMongo() {
|
||||
@@ -45,7 +44,7 @@ func (m *Mongo) InitMongo() {
|
||||
config.Config.Mongo.DBMaxPoolSize)
|
||||
}
|
||||
}
|
||||
log.Println(utils.GetFuncName(1), "start to init mongoDB:", uri)
|
||||
fmt.Println(utils.GetFuncName(1), "start to init mongoDB:", uri)
|
||||
mongoClient, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
|
||||
if err != nil {
|
||||
time.Sleep(time.Duration(30) * time.Second)
|
||||
@@ -54,7 +53,11 @@ func (m *Mongo) InitMongo() {
|
||||
panic(err.Error() + " mongo.Connect failed " + uri)
|
||||
}
|
||||
}
|
||||
m.DB = mongoClient.Database(config.Config.Mongo.DBDatabase)
|
||||
m.DB = mongoClient
|
||||
}
|
||||
|
||||
func (m *Mongo) GetClient() *mongo.Client {
|
||||
return m.DB
|
||||
}
|
||||
|
||||
func (m *Mongo) CreateTagIndex() {
|
||||
@@ -97,7 +100,7 @@ func (m *Mongo) CreateWorkMomentIndex() {
|
||||
}
|
||||
|
||||
func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error {
|
||||
db := m.DB.Collection(collection)
|
||||
db := m.DB.Database(config.Config.Mongo.DBDatabase).Collection(collection)
|
||||
opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
|
||||
indexView := db.Indexes()
|
||||
keysDoc := bsonx.Doc{}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package unrelation
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
@@ -15,7 +16,8 @@ const (
|
||||
)
|
||||
|
||||
type SuperGroupMgoDB struct {
|
||||
mgoDB *mongo.Database
|
||||
MgoClient *mongo.Client
|
||||
MgoDB *mongo.Database
|
||||
superGroupCollection *mongo.Collection
|
||||
userToSuperGroupCollection *mongo.Collection
|
||||
}
|
||||
@@ -30,41 +32,32 @@ type UserToSuperGroup struct {
|
||||
GroupIDList []string `bson:"group_id_list" json:"groupIDList"`
|
||||
}
|
||||
|
||||
func NewSuperGroupMgoDB(mgoDB *mongo.Database) *SuperGroupMgoDB {
|
||||
return &SuperGroupMgoDB{mgoDB: mgoDB, superGroupCollection: mgoDB.Collection(cSuperGroup), userToSuperGroupCollection: mgoDB.Collection(cUserToSuperGroup)}
|
||||
func NewSuperGroupMgoDB(mgoClient *mongo.Client) *SuperGroupMgoDB {
|
||||
mgoDB := mgoClient.Database(config.Config.Mongo.DBDatabase)
|
||||
return &SuperGroupMgoDB{MgoDB: mgoDB, MgoClient: mgoClient, superGroupCollection: mgoDB.Collection(cSuperGroup), userToSuperGroupCollection: mgoDB.Collection(cUserToSuperGroup)}
|
||||
}
|
||||
|
||||
func (db *SuperGroupMgoDB) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string, memberNumCount int) error {
|
||||
//ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
||||
//c := db.mgoDB.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
|
||||
opts := options.Session().SetDefaultReadConcern(readconcern.Majority())
|
||||
return db.mgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error {
|
||||
err := sCtx.StartTransaction()
|
||||
func (db *SuperGroupMgoDB) CreateSuperGroup(sCtx mongo.SessionContext, groupID string, initMemberIDList []string, memberNumCount int) error {
|
||||
superGroup := SuperGroup{
|
||||
GroupID: groupID,
|
||||
MemberIDList: initMemberIDList,
|
||||
}
|
||||
_, err := db.superGroupCollection.InsertOne(sCtx, superGroup)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
upsert := true
|
||||
opts := &options.UpdateOptions{
|
||||
Upsert: &upsert,
|
||||
}
|
||||
for _, userID := range initMemberIDList {
|
||||
_, err = db.userToSuperGroupCollection.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
superGroup := SuperGroup{
|
||||
GroupID: groupID,
|
||||
MemberIDList: initMemberIDList,
|
||||
}
|
||||
_, err = db.superGroupCollection.InsertOne(sCtx, superGroup)
|
||||
if err != nil {
|
||||
_ = sCtx.AbortTransaction(ctx)
|
||||
return err
|
||||
}
|
||||
upsert := true
|
||||
opts := &options.UpdateOptions{
|
||||
Upsert: &upsert,
|
||||
}
|
||||
for _, userID := range initMemberIDList {
|
||||
_, err = db.userToSuperGroupCollection.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts)
|
||||
if err != nil {
|
||||
_ = sCtx.AbortTransaction(ctx)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return sCtx.CommitTransaction(ctx)
|
||||
})
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (db *SuperGroupMgoDB) GetSuperGroup(ctx context.Context, groupID string) (*SuperGroup, error) {
|
||||
@@ -75,7 +68,7 @@ func (db *SuperGroupMgoDB) GetSuperGroup(ctx context.Context, groupID string) (*
|
||||
|
||||
func (db *SuperGroupMgoDB) AddUserToSuperGroup(ctx context.Context, groupID string, userIDList []string) error {
|
||||
opts := options.Session().SetDefaultReadConcern(readconcern.Majority())
|
||||
return db.mgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error {
|
||||
return db.MgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error {
|
||||
_, err := db.superGroupCollection.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDList}}})
|
||||
if err != nil {
|
||||
_ = sCtx.AbortTransaction(ctx)
|
||||
@@ -98,7 +91,7 @@ func (db *SuperGroupMgoDB) AddUserToSuperGroup(ctx context.Context, groupID stri
|
||||
|
||||
func (db *SuperGroupMgoDB) RemoverUserFromSuperGroup(ctx context.Context, groupID string, userIDList []string) error {
|
||||
opts := options.Session().SetDefaultReadConcern(readconcern.Majority())
|
||||
return db.mgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error {
|
||||
return db.MgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error {
|
||||
_, err := db.superGroupCollection.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDList}}})
|
||||
if err != nil {
|
||||
_ = sCtx.AbortTransaction(ctx)
|
||||
@@ -121,7 +114,7 @@ func (db *SuperGroupMgoDB) GetSuperGroupByUserID(ctx context.Context, userID str
|
||||
|
||||
func (db *SuperGroupMgoDB) DeleteSuperGroup(ctx context.Context, groupID string) error {
|
||||
opts := options.Session().SetDefaultReadConcern(readconcern.Majority())
|
||||
return db.mgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error {
|
||||
return db.MgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error {
|
||||
superGroup := &SuperGroup{}
|
||||
_, err := db.superGroupCollection.DeleteOne(sCtx, bson.M{"group_id": groupID})
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user