Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
Gordon
2023-02-10 20:58:54 +08:00
109 changed files with 992 additions and 36145 deletions
+126 -115
View File
@@ -1,17 +1,14 @@
package cache
import (
"Open_IM/pkg/common/db/relation"
relationTb "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/db/unrelation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
"math/big"
"strconv"
"strings"
"time"
)
@@ -157,6 +154,27 @@ func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID strin
})
}
func (g *GroupCacheRedis) GetGroupMemberHash1(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) {
// todo
mapGroupUserIDs, err := g.groupMember.FindJoinUserID(ctx, groupIDs)
if err != nil {
return nil, err
}
res := make(map[string]*relationTb.GroupSimpleUserID)
for _, groupID := range groupIDs {
userIDs := mapGroupUserIDs[groupID]
users := &relationTb.GroupSimpleUserID{}
if len(userIDs) > 0 {
utils.Sort(userIDs, true)
bi := big.NewInt(0)
bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
users.Hash = bi.Uint64()
}
res[groupID] = users
}
return res, nil
}
func (g *GroupCacheRedis) DelGroupMembersHash(ctx context.Context, groupID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
@@ -178,111 +196,104 @@ func (g *GroupCacheRedis) DelGroupMemberIDs(ctx context.Context, groupID string)
return g.rcClient.TagAsDeleted(g.getGroupMemberIDsKey(groupID))
}
// JoinedGroups
func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
getJoinedGroupIDList := func() (string, error) {
joinedGroupList, err := relation.GetJoinedGroupIDListByUserID(userID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(joinedGroupList)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedGroupIDs", joinedGroupIDs)
}()
joinedGroupIDListStr, err := g.rcClient.Fetch(g.getJoinedGroupsKey(userID), time.Second*30*60, getJoinedGroupIDList)
if err != nil {
return nil, err
}
err = json.Unmarshal([]byte(joinedGroupIDListStr), &joinedGroupIDs)
return joinedGroupIDs, utils.Wrap(err, "")
}
//// JoinedGroups
//func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
// getJoinedGroupIDList := func() (string, error) {
// joinedGroupList, err := relation.GetJoinedGroupIDListByUserID(userID)
// if err != nil {
// return "", err
// }
// bytes, err := json.Marshal(joinedGroupList)
// if err != nil {
// return "", utils.Wrap(err, "")
// }
// return string(bytes), nil
// }
// defer func() {
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedGroupIDs", joinedGroupIDs)
// }()
// joinedGroupIDListStr, err := g.rcClient.Fetch(g.getJoinedGroupsKey(userID), time.Second*30*60, getJoinedGroupIDList)
// if err != nil {
// return nil, err
// }
// err = json.Unmarshal([]byte(joinedGroupIDListStr), &joinedGroupIDs)
// return joinedGroupIDs, utils.Wrap(err, "")
//}
func (g *GroupCacheRedis) DelJoinedGroupIDs(ctx context.Context, userID string) (err error) {
func (g *GroupCacheRedis) DelJoinedGroupID(ctx context.Context, userID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
}()
return g.rcClient.TagAsDeleted(g.getJoinedGroupsKey(userID))
}
// GetGroupMemberInfo
func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relation.GroupMember, err error) {
getGroupMemberInfo := func() (string, error) {
groupMemberInfo, err := relation.GetGroupMemberInfoByGroupIDAndUserID(groupID, userID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(groupMemberInfo)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID, "groupMember", *groupMember)
}()
groupMemberInfoStr, err := g.rcClient.Fetch(g.getGroupMemberInfoKey(groupID, userID), time.Second*30*60, getGroupMemberInfo)
if err != nil {
return nil, err
}
groupMember = &relation.GroupMember{}
err = json.Unmarshal([]byte(groupMemberInfoStr), groupMember)
return groupMember, utils.Wrap(err, "")
//func (g *GroupCacheRedis) DelJoinedGroupIDs(ctx context.Context, userIDs []string) (err error) {
// defer func() {
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
// }()
// for _, userID := range userIDs {
// if err := g.DelJoinedGroupID(ctx, userID); err != nil {
// return err
// }
// }
// return nil
//}
func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relationTb.GroupMemberModel, err error) {
return GetCache(ctx, g.rcClient, g.getGroupMemberInfoKey(groupID, userID), g.expireTime, func(ctx context.Context) (*relationTb.GroupMemberModel, error) {
return g.groupMember.Take(ctx, groupID, userID)
})
}
func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID, userIDs []string) (groupMember *relationTb.GroupMemberModel, err error) {
//func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID, userIDs []string) (groupMember *relationTb.GroupMemberModel, err error) {
//
// return nil, err
//}
return nil, err
}
func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, count, offset int32, groupID string) (groupMembers []*relation.GroupMember, err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "count", count, "offset", offset, "groupID", groupID, "groupMember", groupMembers)
}()
groupMemberIDList, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
if count < 0 || offset < 0 {
return nil, nil
}
var groupMemberList []*relation.GroupMember
var start, stop int32
start = offset
stop = offset + count
l := int32(len(groupMemberIDList))
if start > stop {
return nil, nil
}
if start >= l {
return nil, nil
}
if count != 0 {
if stop >= l {
stop = l
}
groupMemberIDList = groupMemberIDList[start:stop]
} else {
if l < 1000 {
stop = l
} else {
stop = 1000
}
groupMemberIDList = groupMemberIDList[start:stop]
}
for _, userID := range groupMemberIDList {
groupMember, err := g.GetGroupMemberInfo(ctx, groupID, userID)
if err != nil {
return
}
groupMembers = append(groupMembers, groupMember)
}
return groupMemberList, nil
}
//func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, count, offset int32, groupID string) (groupMembers []*relation.GroupMember, err error) {
// defer func() {
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "count", count, "offset", offset, "groupID", groupID, "groupMember", groupMembers)
// }()
// groupMemberIDList, err := g.GetGroupMemberIDs(ctx, groupID)
// if err != nil {
// return nil, err
// }
// if count < 0 || offset < 0 {
// return nil, nil
// }
// var groupMemberList []*relation.GroupMember
// var start, stop int32
// start = offset
// stop = offset + count
// l := int32(len(groupMemberIDList))
// if start > stop {
// return nil, nil
// }
// if start >= l {
// return nil, nil
// }
// if count != 0 {
// if stop >= l {
// stop = l
// }
// groupMemberIDList = groupMemberIDList[start:stop]
// } else {
// if l < 1000 {
// stop = l
// } else {
// stop = 1000
// }
// groupMemberIDList = groupMemberIDList[start:stop]
// }
// for _, userID := range groupMemberIDList {
// groupMember, err := g.GetGroupMemberInfo(ctx, groupID, userID)
// if err != nil {
// return
// }
// groupMembers = append(groupMembers, groupMember)
// }
// return groupMemberList, nil
//}
func (g *GroupCacheRedis) DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error) {
defer func() {
@@ -292,23 +303,23 @@ func (g *GroupCacheRedis) DelGroupMemberInfo(ctx context.Context, groupID, userI
}
// groupMemberNum
func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (num int, err error) {
getGroupMemberNum := func() (string, error) {
num, err := relation.GetGroupMemberNumByGroupID(groupID)
if err != nil {
return "", err
}
return strconv.Itoa(int(num)), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "num", num)
}()
groupMember, err := g.rcClient.Fetch(g.getGroupMemberNumKey(groupID), time.Second*30*60, getGroupMemberNum)
if err != nil {
return 0, err
}
return strconv.Atoi(groupMember)
}
//func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (num int, err error) {
// getGroupMemberNum := func() (string, error) {
// num, err := relation.GetGroupMemberNumByGroupID(groupID)
// if err != nil {
// return "", err
// }
// return strconv.Itoa(int(num)), nil
// }
// defer func() {
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "num", num)
// }()
// groupMember, err := g.rcClient.Fetch(g.getGroupMemberNumKey(groupID), time.Second*30*60, getGroupMemberNum)
// if err != nil {
// return 0, err
// }
// return strconv.Atoi(groupMember)
//}
func (g *GroupCacheRedis) DelGroupMemberNum(ctx context.Context, groupID string) (err error) {
defer func() {
-142
View File
@@ -1,142 +0,0 @@
package controller
import (
"Open_IM/pkg/common/db/relation"
"gorm.io/gorm"
"time"
)
type AdminCMSInterface interface {
GetActiveUserNum(from, to time.Time) (num int64, err error)
GetIncreaseUserNum(from, to time.Time) (num int64, err error)
GetTotalUserNum() (num int64, err error)
GetTotalUserNumByDate(to time.Time) (num int64, err error)
GetSingleChatMessageNum(from, to time.Time) (num int64, err error)
GetGroupMessageNum(from, to time.Time) (num int64, err error)
GetIncreaseGroupNum(from, to time.Time) (num int64, err error)
GetTotalGroupNum() (num int64, err error)
GetGroupNum(to time.Time) (num int64, err error)
GetActiveGroups(from, to time.Time, limit int) (activeGroups []*relation.ActiveGroup, err error)
GetActiveUsers(from, to time.Time, limit int) (activeUsers []*relation.ActiveUser, err error)
}
type AdminCMSController struct {
database AdminCMSDatabaseInterface
}
func NewAdminCMSController(db *gorm.DB) AdminCMSInterface {
adminCMSController := &AdminCMSController{
database: newAdminCMSDatabase(db),
}
return adminCMSController
}
func newAdminCMSDatabase(db *gorm.DB) AdminCMSDatabaseInterface {
return &AdminCMSDatabase{Statistics: relation.NewStatistics(db)}
}
func (admin *AdminCMSController) GetActiveUserNum(from, to time.Time) (num int64, err error) {
return admin.database.GetActiveUserNum(from, to)
}
func (admin *AdminCMSController) GetIncreaseUserNum(from, to time.Time) (num int64, err error) {
return admin.database.GetIncreaseUserNum(from, to)
}
func (admin *AdminCMSController) GetTotalUserNum() (num int64, err error) {
return admin.database.GetTotalUserNum()
}
func (admin *AdminCMSController) GetTotalUserNumByDate(to time.Time) (num int64, err error) {
return admin.database.GetTotalUserNumByDate(to)
}
func (admin *AdminCMSController) GetSingleChatMessageNum(from, to time.Time) (num int64, err error) {
return admin.GetSingleChatMessageNum(from, to)
}
func (admin *AdminCMSController) GetGroupMessageNum(from, to time.Time) (num int64, err error) {
return admin.database.GetGroupMessageNum(from, to)
}
func (admin *AdminCMSController) GetIncreaseGroupNum(from, to time.Time) (num int64, err error) {
return admin.database.GetIncreaseGroupNum(from, to)
}
func (admin *AdminCMSController) GetTotalGroupNum() (num int64, err error) {
return admin.database.GetTotalGroupNum()
}
func (admin *AdminCMSController) GetGroupNum(to time.Time) (num int64, err error) {
return admin.database.GetGroupNum(to)
}
func (admin *AdminCMSController) GetActiveGroups(from, to time.Time, limit int) ([]*relation.ActiveGroup, error) {
return admin.database.GetActiveGroups(from, to, limit)
}
func (admin *AdminCMSController) GetActiveUsers(from, to time.Time, limit int) (activeUsers []*relation.ActiveUser, err error) {
return admin.database.GetActiveUsers(from, to, limit)
}
type AdminCMSDatabaseInterface interface {
GetActiveUserNum(from, to time.Time) (num int64, err error)
GetIncreaseUserNum(from, to time.Time) (num int64, err error)
GetTotalUserNum() (num int64, err error)
GetTotalUserNumByDate(to time.Time) (num int64, err error)
GetSingleChatMessageNum(from, to time.Time) (num int64, err error)
GetGroupMessageNum(from, to time.Time) (num int64, err error)
GetIncreaseGroupNum(from, to time.Time) (num int64, err error)
GetTotalGroupNum() (num int64, err error)
GetGroupNum(to time.Time) (num int64, err error)
GetActiveGroups(from, to time.Time, limit int) ([]*relation.ActiveGroup, error)
GetActiveUsers(from, to time.Time, limit int) (activeUsers []*relation.ActiveUser, err error)
}
type AdminCMSDatabase struct {
Statistics *relation.Statistics
}
func (admin *AdminCMSDatabase) GetActiveUserNum(from, to time.Time) (num int64, err error) {
return admin.Statistics.GetActiveUserNum(from, to)
}
func (admin *AdminCMSDatabase) GetIncreaseUserNum(from, to time.Time) (num int64, err error) {
return admin.Statistics.GetIncreaseUserNum(from, to)
}
func (admin *AdminCMSDatabase) GetTotalUserNum() (num int64, err error) {
return admin.Statistics.GetTotalUserNum()
}
func (admin *AdminCMSDatabase) GetTotalUserNumByDate(to time.Time) (num int64, err error) {
return admin.Statistics.GetTotalUserNumByDate(to)
}
func (admin *AdminCMSDatabase) GetSingleChatMessageNum(from, to time.Time) (num int64, err error) {
return admin.Statistics.GetSingleChatMessageNum(from, to)
}
func (admin *AdminCMSDatabase) GetGroupMessageNum(from, to time.Time) (num int64, err error) {
return admin.Statistics.GetGroupMessageNum(from, to)
}
func (admin *AdminCMSDatabase) GetIncreaseGroupNum(from, to time.Time) (num int64, err error) {
return admin.Statistics.GetIncreaseGroupNum(from, to)
}
func (admin *AdminCMSDatabase) GetTotalGroupNum() (num int64, err error) {
return admin.Statistics.GetTotalGroupNum()
}
func (admin *AdminCMSDatabase) GetGroupNum(to time.Time) (num int64, err error) {
return admin.Statistics.GetGroupNum(to)
}
func (admin *AdminCMSDatabase) GetActiveGroups(from, to time.Time, limit int) ([]*relation.ActiveGroup, error) {
return admin.Statistics.GetActiveGroups(from, to, limit)
}
func (admin *AdminCMSDatabase) GetActiveUsers(from, to time.Time, limit int) (activeUsers []*relation.ActiveUser, err error) {
return admin.Statistics.GetActiveUsers(from, to, limit)
}
+19 -35
View File
@@ -15,8 +15,6 @@ import (
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/mongo"
"gorm.io/gorm"
"math/big"
"strings"
)
//type GroupInterface GroupDataBaseInterface
@@ -248,7 +246,7 @@ type GroupDataBase struct {
func (g *GroupDataBase) delGroupMemberCache(ctx context.Context, groupID string, userIDs []string) error {
for _, userID := range userIDs {
if err := g.cache.DelJoinedGroupIDs(ctx, userID); err != nil {
if err := g.cache.DelJoinedGroupID(ctx, userID); err != nil {
return err
}
if err := g.cache.DelJoinedSuperGroupIDs(ctx, userID); err != nil {
@@ -272,21 +270,24 @@ func (g *GroupDataBase) FindGroupMemberUserID(ctx context.Context, groupID strin
}
func (g *GroupDataBase) CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error {
if len(groups) > 0 && len(groupMembers) > 0 {
return g.db.Transaction(func(tx *gorm.DB) error {
return g.db.Transaction(func(tx *gorm.DB) error {
if len(groups) > 0 {
if err := g.groupDB.Create(ctx, groups, tx); err != nil {
return err
}
return g.groupMemberDB.Create(ctx, groupMembers, tx)
})
}
if len(groups) > 0 {
return g.groupDB.Create(ctx, groups)
}
if len(groupMembers) > 0 {
return g.groupMemberDB.Create(ctx, groupMembers)
}
return nil
}
if len(groupMembers) > 0 {
if err := g.groupMemberDB.Create(ctx, groupMembers, tx); err != nil {
return err
}
//if err := g.cache.DelJoinedGroupIDs(ctx, utils.Slice(groupMembers, func(e *relationTb.GroupMemberModel) string {
// return e.UserID
//})); err != nil {
// return err
//}
}
return nil
})
}
func (g *GroupDataBase) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) {
@@ -337,12 +338,11 @@ func (g *GroupDataBase) TakeGroupMember(ctx context.Context, groupID string, use
}
func (g *GroupDataBase) TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error) {
return g.groupMemberDB.TakeOwner(ctx, groupID)
return g.groupMemberDB.TakeOwner(ctx, groupID) // todo cache group owner
}
func (g *GroupDataBase) FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relationTb.GroupMemberModel, error) {
//g.cache.GetGroupMembersInfo()
return g.groupMemberDB.Find(ctx, groupIDs, userIDs, roleLevels)
return g.groupMemberDB.Find(ctx, groupIDs, userIDs, roleLevels) // todo cache group find
}
func (g *GroupDataBase) PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error) {
@@ -383,23 +383,7 @@ func (g *GroupDataBase) DeleteGroupMember(ctx context.Context, groupID string, u
}
func (g *GroupDataBase) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) {
mapGroupUserIDs, err := g.groupMemberDB.FindJoinUserID(ctx, groupIDs)
if err != nil {
return nil, err
}
res := make(map[string]*relationTb.GroupSimpleUserID)
for _, groupID := range groupIDs {
userIDs := mapGroupUserIDs[groupID]
users := &relationTb.GroupSimpleUserID{}
if len(userIDs) > 0 {
utils.Sort(userIDs, true)
bi := big.NewInt(0)
bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
users.Hash = bi.Uint64()
}
res[groupID] = users
}
return res, nil
return g.cache.GetGroupMemberHash1(ctx, groupIDs)
}
func (g *GroupDataBase) MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error) {
+4 -2
View File
@@ -8,10 +8,12 @@ import (
type MsgInterface interface {
BatchInsertChat2DB(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error
BatchInsertChat2Cache(ctx context.Context, insertID string, msgList []*pbMsg.MsgDataToMQ) (error, uint64)
DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error)
DelMsgLogic(ctx context.Context, uid string, seqList []uint32) error
// logic delete
DelMsgLogic(ctx context.Context, userID string, seqList []uint32) error
DelMsgBySeqListInOneDoc(ctx context.Context, docID string, seqList []uint32) (unExistSeqList []uint32, err error)
ReplaceMsgToBlankByIndex(suffixID string, index int) (replaceMaxSeq uint32, err error)
ReplaceMsgToBlankByIndex(docID string, index int) (replaceMaxSeq uint32, err error)
}
type MsgDatabaseInterface interface {
+5 -5
View File
@@ -30,11 +30,11 @@ type UserController struct {
}
// 获取指定用户的信息 如有userID未找到 也返回错误
func (u *UserController) FindWithError(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) {
func (u *UserController) FindWithError(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error) {
return u.database.FindWithError(ctx, userIDs)
}
func (u *UserController) Find(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) {
func (u *UserController) Find(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error) {
return u.database.Find(ctx, userIDs)
}
func (u *UserController) Create(ctx context.Context, users []*relationTb.UserModel) error {
@@ -90,7 +90,7 @@ func newUserDatabase(db *gorm.DB) *UserDatabase {
}
// 获取指定用户的信息 如有userID未找到 也返回错误
func (u *UserDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) {
func (u *UserDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error) {
users, err = u.user.Find(ctx, userIDs)
if err != nil {
return
@@ -102,7 +102,7 @@ func (u *UserDatabase) FindWithError(ctx context.Context, userIDs []string) (use
}
// 获取指定用户的信息 如有userID未找到 不返回错误
func (u *UserDatabase) Find(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) {
func (u *UserDatabase) Find(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error) {
users, err = u.user.Find(ctx, userIDs)
return
}
@@ -123,7 +123,7 @@ func (u *UserDatabase) UpdateByMap(ctx context.Context, userID string, args map[
}
// 获取,如果没找到,不返回错误
func (u *UserDatabase) Page(ctx context.Context, showNumber, pageNumber int32) (users []*relation2.UserModel, count int64, err error) {
func (u *UserDatabase) Page(ctx context.Context, showNumber, pageNumber int32) (users []*relationTb.UserModel, count int64, err error) {
return u.user.Page(ctx, showNumber, pageNumber)
}
-100
View File
@@ -1,100 +0,0 @@
package relation
import (
"Open_IM/pkg/common/config"
"fmt"
"time"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
type Mysql struct {
gormConn *gorm.DB
}
func (m *Mysql) GormConn() *gorm.DB {
return m.gormConn
}
func (m *Mysql) SetGormConn(gormConn *gorm.DB) {
m.gormConn = gormConn
}
func (m *Mysql) InitConn() *Mysql {
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], "mysql")
var db *gorm.DB
db, err := gorm.Open(mysql.Open(dsn), nil)
if err != nil {
time.Sleep(time.Duration(30) * time.Second)
db, err = gorm.Open(mysql.Open(dsn), nil)
if err != nil {
panic(err.Error() + " open failed " + dsn)
}
}
sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s default charset utf8 COLLATE utf8_general_ci;", config.Config.Mysql.DBDatabaseName)
err = db.Exec(sql).Error
if err != nil {
panic(err.Error() + " Exec failed:" + sql)
}
dsn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], config.Config.Mysql.DBDatabaseName)
newLogger := logger.New(
Writer{},
logger.Config{
SlowThreshold: time.Duration(config.Config.Mysql.SlowThreshold) * time.Millisecond, // Slow SQL threshold
LogLevel: logger.LogLevel(config.Config.Mysql.LogLevel), // Log level
IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger
Colorful: true, // Disable color
},
)
db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: newLogger,
})
if err != nil {
panic(err.Error() + " Open failed " + dsn)
}
sqlDB, err := db.DB()
if err != nil {
panic(err.Error() + " DB.DB() failed ")
}
sqlDB.SetConnMaxLifetime(time.Second * time.Duration(config.Config.Mysql.DBMaxLifeTime))
sqlDB.SetMaxOpenConns(config.Config.Mysql.DBMaxOpenConns)
sqlDB.SetMaxIdleConns(config.Config.Mysql.DBMaxIdleConns)
if db == nil {
panic("db is nil")
}
m.SetGormConn(db)
return m
}
//models := []interface{}{&Friend{}, &FriendRequest{}, &Group{}, &GroupMember{}, &GroupRequest{},
// &User{}, &Black{}, &ChatLog{}, &Conversation{}, &AppVersion{}}
func (m *Mysql) AutoMigrateModel(model interface{}) error {
err := m.gormConn.AutoMigrate(model)
if err != nil {
return err
}
m.gormConn.Set("gorm:table_options", "CHARSET=utf8")
m.gormConn.Set("gorm:table_options", "collation=utf8_unicode_ci")
_ = m.gormConn.Migrator().CreateTable(model)
return nil
}
type Writer struct{}
func (w Writer) Printf(format string, args ...interface{}) {
fmt.Printf(format, args...)
}
func getDBConn(db *gorm.DB, tx []any) *gorm.DB {
if len(tx) > 0 {
if txDB, ok := tx[0].(*gorm.DB); ok {
return txDB
}
}
return db
}
-101
View File
@@ -1,101 +0,0 @@
package relation
import (
"Open_IM/pkg/common/constant"
"gorm.io/gorm"
"time"
)
type Statistics struct {
DB *gorm.DB
}
func NewStatistics(db *gorm.DB) *Statistics {
return &Statistics{DB: db}
}
func (s *Statistics) getUserModel() *gorm.DB {
return s.DB.Model(&User{})
}
func (s *Statistics) getChatLogModel() *gorm.DB {
return s.DB.Model(&ChatLog{})
}
func (s *Statistics) getGroupModel() *gorm.DB {
return s.DB.Model(&Group{})
}
func (s *Statistics) GetActiveUserNum(from, to time.Time) (num int64, err error) {
err = s.getChatLogModel().Select("count(distinct(send_id))").Where("send_time >= ? and send_time <= ?", from, to).Count(&num).Error
return num, err
}
func (s *Statistics) GetIncreaseUserNum(from, to time.Time) (num int64, err error) {
err = s.getUserModel().Where("create_time >= ? and create_time <= ?", from, to).Count(&num).Error
return num, err
}
func (s *Statistics) GetTotalUserNum() (num int64, err error) {
err = s.getUserModel().Count(&num).Error
return num, err
}
func (s *Statistics) GetTotalUserNumByDate(to time.Time) (num int64, err error) {
err = s.getUserModel().Where("create_time <= ?", to).Count(&num).Error
return num, err
}
func (s *Statistics) GetSingleChatMessageNum(from, to time.Time) (num int64, err error) {
err = s.getChatLogModel().Where("send_time >= ? and send_time <= ? and session_type = ?", from, to, constant.SingleChatType).Count(&num).Error
return num, err
}
func (s *Statistics) GetGroupMessageNum(from, to time.Time) (num int64, err error) {
err = s.getChatLogModel().Where("send_time >= ? and send_time <= ? and session_type in (?)", from, to, []int{constant.GroupChatType, constant.SuperGroupChatType}).Count(&num).Error
return num, err
}
func (s *Statistics) GetIncreaseGroupNum(from, to time.Time) (num int64, err error) {
err = s.getGroupModel().Where("create_time >= ? and create_time <= ?", from, to).Count(&num).Error
return num, err
}
func (s *Statistics) GetTotalGroupNum() (num int64, err error) {
err = s.getGroupModel().Count(&num).Error
return num, err
}
func (s *Statistics) GetGroupNum(to time.Time) (num int64, err error) {
err = s.getGroupModel().Where("create_time <= ?", to).Count(&num).Error
return num, err
}
func (s *Statistics) GetActiveGroups(from, to time.Time, limit int) ([]*ActiveGroup, error) {
var activeGroups []*ActiveGroup
err := s.getChatLogModel().Select("recv_id, count(*) as message_num").Where("send_time >= ? and send_time <= ? and session_type in (?)", from, to, []int{constant.GroupChatType, constant.SuperGroupChatType}).Group("recv_id").Limit(limit).Order("message_num DESC").Find(&activeGroups).Error
for _, activeGroup := range activeGroups {
group := Group{
GroupID: activeGroup.ID,
}
s.getGroupModel().Where("group_id= ? ", group.GroupID).Find(&group)
activeGroup.Name = group.GroupName
}
return activeGroups, err
}
func (s *Statistics) GetActiveUsers(from, to time.Time, limit int) (activeUsers []*ActiveUser, err error) {
err = s.getChatLogModel().Select("send_id, count(*) as message_num").Where("send_time >= ? and send_time <= ? and session_type in (?)", from, to, []int{constant.SingleChatType, constant.GroupChatType, constant.SuperGroupChatType}).Group("send_id").Limit(limit).Order("message_num DESC").Find(&activeUsers).Error
for _, activeUser := range activeUsers {
user := User{
UserID: activeUser.ID,
}
err = s.getUserModel().Select("user_id, name").Find(&user).Error
if err != nil {
return nil, err
}
activeUser.Name = user.Nickname
activeUser.ID = user.UserID
}
return activeUsers, err
}
+14
View File
@@ -49,6 +49,20 @@ func (u UserMsgDocModel) GetSeqUid(uid string, seq uint32) string {
return u.getSeqUid(uid, seq)
}
func (u UserMsgDocModel) GetDocIDSeqsMap(uid string, seqs []uint32) map[string][]uint32 {
t := make(map[string][]uint32)
for i := 0; i < len(seqs); i++ {
seqUid := u.getSeqUid(uid, seqs[i])
if value, ok := t[seqUid]; !ok {
var temp []uint32
t[seqUid] = append(temp, seqs[i])
} else {
t[seqUid] = append(value, seqs[i])
}
}
return t
}
func (UserMsgDocModel) getMsgIndex(seq uint32) int {
seqSuffix := seq / singleGocMsgNum
var index uint32
-42
View File
@@ -1,42 +0,0 @@
package unrelation
import "context"
const (
CTag = "tag"
CSendLog = "send_log"
)
type TagModel struct {
UserID string `bson:"user_id"`
TagID string `bson:"tag_id"`
TagName string `bson:"tag_name"`
UserList []string `bson:"user_list"`
}
func (TagModel) TableName() string {
return CTag
}
type TagSendLogModel struct {
UserList []CommonUserModel `bson:"tag_list"`
SendID string `bson:"send_id"`
SenderPlatformID int32 `bson:"sender_platform_id"`
Content string `bson:"content"`
SendTime int64 `bson:"send_time"`
}
func (TagSendLogModel) TableName() string {
return CSendLog
}
type TagModelInterface interface {
GetUserTags(ctx context.Context, userID string) ([]TagModel, error)
CreateTag(ctx context.Context, userID, tagName string, userList []string) error
GetTagByID(ctx context.Context, userID, tagID string) (TagModel, error)
DeleteTag(ctx context.Context, userID, tagID string) error
SetTag(ctx context.Context, userID, tagID, newName string, increaseUserIDList []string, reduceUserIDList []string) error
GetUserIDListByTagID(ctx context.Context, userID, tagID string) ([]string, error)
SaveTagSendLog(ctx context.Context, tagSendLog *TagSendLogModel) error
GetTagSendLogs(ctx context.Context, userID string, showNumber, pageNumber int32) ([]TagSendLogModel, error)
}
@@ -1,48 +0,0 @@
package unrelation
import "context"
const (
CWorkMoment = "work_moment"
)
type WorkMoment struct {
WorkMomentID string `bson:"work_moment_id"`
UserID string `bson:"user_id"`
UserName string `bson:"user_name"`
FaceURL string `bson:"face_url"`
Content string `bson:"content"`
LikeUserList []*CommonUserModel `bson:"like_user_list"`
AtUserList []*CommonUserModel `bson:"at_user_list"`
PermissionUserList []*CommonUserModel `bson:"permission_user_list"`
Comments []*CommonUserModel `bson:"comments"`
PermissionUserIDList []string `bson:"permission_user_id_list"`
Permission int32 `bson:"permission"`
CreateTime int32 `bson:"create_time"`
}
type Comment struct {
UserID string `bson:"user_id" json:"user_id"`
UserName string `bson:"user_name" json:"user_name"`
ReplyUserID string `bson:"reply_user_id" json:"reply_user_id"`
ReplyUserName string `bson:"reply_user_name" json:"reply_user_name"`
ContentID string `bson:"content_id" json:"content_id"`
Content string `bson:"content" json:"content"`
CreateTime int32 `bson:"create_time" json:"create_time"`
}
func (WorkMoment) TableName() string {
return CWorkMoment
}
type WorkMomentModelInterface interface {
CreateOneWorkMoment(ctx context.Context, workMoment *WorkMoment) error
DeleteOneWorkMoment(ctx context.Context, workMomentID string) error
DeleteComment(ctx context.Context, workMomentID, contentID, opUserID string) error
GetWorkMomentByID(ctx context.Context, workMomentID string) (*WorkMoment, error)
LikeOneWorkMoment(ctx context.Context, likeUserID, userName, workMomentID string) (*WorkMoment, bool, error)
CommentOneWorkMoment(ctx context.Context, comment *Comment, workMomentID string) (*WorkMoment, error)
GetUserSelfWorkMoments(ctx context.Context, userID string, showNumber, pageNumber int32) ([]*WorkMoment, error)
GetUserWorkMoments(ctx context.Context, opUserID, userID string, showNumber, pageNumber int32, friendIDList []string) ([]*WorkMoment, error)
GetUserFriendWorkMoments(ctx context.Context, showNumber, pageNumber int32, userID string, friendIDList []string) ([]*WorkMoment, error)
}
+2 -26
View File
@@ -61,20 +61,8 @@ func (m *Mongo) GetClient() *mongo.Client {
return m.db
}
func (m *Mongo) CreateTagIndex() {
if err := m.createMongoIndex(unrelation.CSendLog, false, "send_id", "-send_time"); err != nil {
panic(err.Error() + " index create failed " + unrelation.CSendLog + " send_id, -send_time")
}
if err := m.createMongoIndex(unrelation.CTag, false, "user_id", "-create_time"); err != nil {
panic(err.Error() + "index create failed " + unrelation.CTag + " user_id, -create_time")
}
if err := m.createMongoIndex(unrelation.CTag, true, "tag_id"); err != nil {
panic(err.Error() + "index create failed " + unrelation.CTag + " tag_id")
}
}
func (m *Mongo) CreateMsgIndex() {
if err := m.createMongoIndex(unrelation.CChat, false, "uid"); err != nil {
if err := m.createMongoIndex(unrelation, false, "uid"); err != nil {
fmt.Println(err.Error() + " index create failed " + unrelation.CChat + " uid, please create index by yourself in field uid")
}
}
@@ -88,21 +76,9 @@ func (m *Mongo) CreateSuperGroupIndex() {
}
}
func (m *Mongo) CreateWorkMomentIndex() {
if err := m.createMongoIndex(unrelation.CWorkMoment, true, "-create_time", "work_moment_id"); err != nil {
panic(err.Error() + "index create failed " + unrelation.CWorkMoment + " -create_time, work_moment_id")
}
if err := m.createMongoIndex(unrelation.CWorkMoment, true, "work_moment_id"); err != nil {
panic(err.Error() + "index create failed " + unrelation.CWorkMoment + " work_moment_id ")
}
if err := m.createMongoIndex(unrelation.CWorkMoment, false, "user_id", "-create_time"); err != nil {
panic(err.Error() + "index create failed " + unrelation.CWorkMoment + "user_id, -create_time")
}
}
func (m *Mongo) CreateExtendMsgSetIndex() {
if err := m.createMongoIndex(unrelation.CExtendMsgSet, true, "-create_time", "work_moment_id"); err != nil {
panic(err.Error() + "index create failed " + unrelation.CWorkMoment + " -create_time, work_moment_id")
panic(err.Error() + "index create failed " + unrelation.CExtendMsgSet + " -create_time, work_moment_id")
}
}
+20 -26
View File
@@ -69,7 +69,7 @@ package unrelation
// return nil
//}
//func (d *db.DataBases) ReplaceMsgByIndex(suffixUserID string, msg *open_im_sdk.MsgData, operationID string, seqIndex int) error {
//func (d *db.DataBases) ReplaceMsgByIndex(suffixUserID string, msg *sdkws.MsgData, operationID string, seqIndex int) error {
// log.NewInfo(operationID, utils.GetSelfFuncName(), suffixUserID, *msg)
// ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
// c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
@@ -90,7 +90,7 @@ package unrelation
// return nil
//}
//func (d *db.DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operationID string) error {
//func (d *db.DataBases) ReplaceMsgBySeq(uid string, msg *sdkws.MsgData, operationID string) error {
// log.NewInfo(operationID, utils.GetSelfFuncName(), uid, *msg)
// ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
// c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
@@ -122,7 +122,7 @@ package unrelation
// return err
//}
//
//func (d *db.DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
//func (d *db.DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, err error) {
// log.NewInfo(operationID, utils.GetSelfFuncName(), uid, seqList)
// var hasSeqList []uint32
// singleCount := 0
@@ -153,7 +153,7 @@ package unrelation
// }
// singleCount = 0
// for i := 0; i < len(sChat.Msg); i++ {
// msg := new(open_im_sdk.MsgData)
// msg := new(sdkws.MsgData)
// if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil {
// log.NewError(operationID, "Unmarshal err", seqUid, value, uid, seqList, err.Error())
// return nil, err
@@ -217,11 +217,11 @@ package unrelation
// }
// for i, msg := range userChat.Msg {
// if i <= index {
// msgPb := &open_im_sdk.MsgData{}
// msgPb := &sdkws.MsgData{}
// if err = proto.Unmarshal(msg.Msg, msgPb); err != nil {
// continue
// }
// newMsgPb := &open_im_sdk.MsgData{Seq: msgPb.Seq}
// newMsgPb := &sdkws.MsgData{Seq: msgPb.Seq}
// bytes, err := proto.Marshal(newMsgPb)
// if err != nil {
// continue
@@ -235,7 +235,7 @@ package unrelation
// return replaceMaxSeq, err
//}
//
//func (d *db.DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err error) {
//func (d *db.DataBases) GetNewestMsg(ID string) (msg *sdkws.MsgData, err error) {
// ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
// c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
// regex := fmt.Sprintf("^%s", ID)
@@ -251,7 +251,7 @@ package unrelation
// }
// if len(userChats) > 0 {
// if len(userChats[0].Msg) > 0 {
// msgPb := &open_im_sdk.MsgData{}
// msgPb := &sdkws.MsgData{}
// err = proto.Unmarshal(userChats[0].Msg[len(userChats[0].Msg)-1].Msg, msgPb)
// if err != nil {
// return nil, utils.Wrap(err, "")
@@ -263,7 +263,7 @@ package unrelation
// return nil, nil
//}
//
//func (d *db.DataBases) GetOldestMsg(ID string) (msg *open_im_sdk.MsgData, err error) {
//func (d *db.DataBases) GetOldestMsg(ID string) (msg *sdkws.MsgData, err error) {
// ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
// c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
// regex := fmt.Sprintf("^%s", ID)
@@ -288,7 +288,7 @@ package unrelation
// if len(oldestMsg) == 0 {
// oldestMsg = userChats[0].Msg[len(userChats[0].Msg)-1].Msg
// }
// msgPb := &open_im_sdk.MsgData{}
// msgPb := &sdkws.MsgData{}
// err = proto.Unmarshal(oldestMsg, msgPb)
// if err != nil {
// return nil, utils.Wrap(err, "")
@@ -298,7 +298,7 @@ package unrelation
// return nil, nil
//}
//
//func (d *db.DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
//func (d *db.DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, err error) {
// var hasSeqList []uint32
// singleCount := 0
// ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
@@ -325,7 +325,7 @@ package unrelation
// }
// singleCount = 0
// for i := 0; i < len(sChat.Msg); i++ {
// msg := new(open_im_sdk.MsgData)
// msg := new(sdkws.MsgData)
// if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil {
// log.NewError(operationID, "Unmarshal err", seqUid, value, uid, seqList, err.Error())
// return nil, err
@@ -349,7 +349,7 @@ package unrelation
// }
// return seqMsg, nil
//}
//func (d *db.DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
//func (d *db.DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, err error) {
// var hasSeqList []uint32
// singleCount := 0
// ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
@@ -376,7 +376,7 @@ package unrelation
// }
// singleCount = 0
// for i := 0; i < len(sChat.Msg); i++ {
// msg := new(open_im_sdk.MsgData)
// msg := new(sdkws.MsgData)
// if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil {
// log.NewError(operationID, "Unmarshal err", seqUid, value, groupID, seqList, err.Error())
// return nil, err
@@ -401,7 +401,7 @@ package unrelation
// return seqMsg, nil
//}
//
//func (d *db.DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, unexistSeqList []uint32, err error) {
//func (d *db.DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, indexList []int, unexistSeqList []uint32, err error) {
// ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
// c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
// sChat := UserChat{}
@@ -412,7 +412,7 @@ package unrelation
// singleCount := 0
// var hasSeqList []uint32
// for i := 0; i < len(sChat.Msg); i++ {
// msg := new(open_im_sdk.MsgData)
// msg := new(sdkws.MsgData)
// if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil {
// log.NewError(operationID, "Unmarshal err", msg.String(), err.Error())
// return nil, nil, nil, err
@@ -436,18 +436,18 @@ package unrelation
// return seqMsg, indexList, unexistSeqList, nil
//}
//
//func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk.MsgData) {
//func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*sdkws.MsgData) {
// for _, v := range seqList {
// msg := new(open_im_sdk.MsgData)
// msg := new(sdkws.MsgData)
// msg.Seq = v
// exceptionMsg = append(exceptionMsg, msg)
// }
// return exceptionMsg
//}
//
//func genExceptionSuperGroupMessageBySeqList(seqList []uint32, groupID string) (exceptionMsg []*open_im_sdk.MsgData) {
//func genExceptionSuperGroupMessageBySeqList(seqList []uint32, groupID string) (exceptionMsg []*sdkws.MsgData) {
// for _, v := range seqList {
// msg := new(open_im_sdk.MsgData)
// msg := new(sdkws.MsgData)
// msg.Seq = v
// msg.GroupID = groupID
// msg.SessionType = constant.SuperGroupChatType
@@ -598,13 +598,7 @@ package unrelation
// return utils.Md5(tagName + userID + strconv.Itoa(rand.Int()) + time.Now().String())
//}
//func generateWorkMomentID(userID string) string {
// return utils.Md5(userID + strconv.Itoa(rand.Int()) + time.Now().String())
//}
//func generateWorkMomentCommentID(workMomentID string) string {
// return utils.Md5(workMomentID + strconv.Itoa(rand.Int()) + time.Now().String())
//}
//func getCurrentTimestampByMill() int64 {
// return time.Now().UnixNano() / 1e6
+24 -21
View File
@@ -121,7 +121,7 @@ func (m *MsgMongoDriver) ReplaceMsgByIndex(ctx context.Context, suffixUserID str
return nil
}
func (d *db.DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operationID string) error {
func (d *db.DataBases) ReplaceMsgBySeq(uid string, msg *sdkws.MsgData, operationID string) error {
log.NewInfo(operationID, utils.GetSelfFuncName(), uid, *msg)
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
@@ -153,7 +153,7 @@ func (d *db.DataBases) UpdateOneMsgList(msg *UserChat) error {
return err
}
func (d *db.DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
func (d *db.DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, err error) {
log.NewInfo(operationID, utils.GetSelfFuncName(), uid, seqList)
var hasSeqList []uint32
singleCount := 0
@@ -184,7 +184,7 @@ func (d *db.DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID
}
singleCount = 0
for i := 0; i < len(sChat.Msg); i++ {
msg := new(open_im_sdk.MsgData)
msg := new(sdkws.MsgData)
if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil {
log.NewError(operationID, "Unmarshal err", seqUid, value, uid, seqList, err.Error())
return nil, err
@@ -209,10 +209,11 @@ func (d *db.DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID
return seqMsg, nil
}
func (d *db.DataBases) GetUserMsgListByIndex(ID string, index int64) (*UserChat, error) {
// model
func (d *db.DataBases) GetUserMsgListByIndex(docID string, index int64) (*UserChat, error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
regex := fmt.Sprintf("^%s", ID)
regex := fmt.Sprintf("^%s", docID)
findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"uid": 1})
var msgs []UserChat
//primitive.Regex{Pattern: regex}
@@ -231,6 +232,7 @@ func (d *db.DataBases) GetUserMsgListByIndex(ID string, index int64) (*UserChat,
}
}
// model
func (d *db.DataBases) DelMongoMsgs(IDList []string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
@@ -238,6 +240,7 @@ func (d *db.DataBases) DelMongoMsgs(IDList []string) error {
return err
}
// model
func (d *db.DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) (replaceMaxSeq uint32, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
@@ -248,11 +251,11 @@ func (d *db.DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) (rep
}
for i, msg := range userChat.Msg {
if i <= index {
msgPb := &open_im_sdk.MsgData{}
msgPb := &sdkws.MsgData{}
if err = proto.Unmarshal(msg.Msg, msgPb); err != nil {
continue
}
newMsgPb := &open_im_sdk.MsgData{Seq: msgPb.Seq}
newMsgPb := &sdkws.MsgData{Seq: msgPb.Seq}
bytes, err := proto.Marshal(newMsgPb)
if err != nil {
continue
@@ -266,7 +269,7 @@ func (d *db.DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) (rep
return replaceMaxSeq, err
}
func (d *db.DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err error) {
func (d *db.DataBases) GetNewestMsg(ID string) (msg *sdkws.MsgData, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
regex := fmt.Sprintf("^%s", ID)
@@ -282,7 +285,7 @@ func (d *db.DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err er
}
if len(userChats) > 0 {
if len(userChats[0].Msg) > 0 {
msgPb := &open_im_sdk.MsgData{}
msgPb := &sdkws.MsgData{}
err = proto.Unmarshal(userChats[0].Msg[len(userChats[0].Msg)-1].Msg, msgPb)
if err != nil {
return nil, utils.Wrap(err, "")
@@ -294,7 +297,7 @@ func (d *db.DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err er
return nil, nil
}
func (d *db.DataBases) GetOldestMsg(ID string) (msg *open_im_sdk.MsgData, err error) {
func (d *db.DataBases) GetOldestMsg(ID string) (msg *sdkws.MsgData, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
regex := fmt.Sprintf("^%s", ID)
@@ -319,7 +322,7 @@ func (d *db.DataBases) GetOldestMsg(ID string) (msg *open_im_sdk.MsgData, err er
if len(oldestMsg) == 0 {
oldestMsg = userChats[0].Msg[len(userChats[0].Msg)-1].Msg
}
msgPb := &open_im_sdk.MsgData{}
msgPb := &sdkws.MsgData{}
err = proto.Unmarshal(oldestMsg, msgPb)
if err != nil {
return nil, utils.Wrap(err, "")
@@ -329,7 +332,7 @@ func (d *db.DataBases) GetOldestMsg(ID string) (msg *open_im_sdk.MsgData, err er
return nil, nil
}
func (d *db.DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
func (d *db.DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, err error) {
var hasSeqList []uint32
singleCount := 0
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
@@ -356,7 +359,7 @@ func (d *db.DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, opera
}
singleCount = 0
for i := 0; i < len(sChat.Msg); i++ {
msg := new(open_im_sdk.MsgData)
msg := new(sdkws.MsgData)
if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil {
log.NewError(operationID, "Unmarshal err", seqUid, value, uid, seqList, err.Error())
return nil, err
@@ -380,7 +383,7 @@ func (d *db.DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, opera
}
return seqMsg, nil
}
func (d *db.DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
func (d *db.DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, err error) {
var hasSeqList []uint32
singleCount := 0
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
@@ -407,7 +410,7 @@ func (d *db.DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []
}
singleCount = 0
for i := 0; i < len(sChat.Msg); i++ {
msg := new(open_im_sdk.MsgData)
msg := new(sdkws.MsgData)
if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil {
log.NewError(operationID, "Unmarshal err", seqUid, value, groupID, seqList, err.Error())
return nil, err
@@ -432,7 +435,7 @@ func (d *db.DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []
return seqMsg, nil
}
func (d *db.DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, unexistSeqList []uint32, err error) {
func (d *db.DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, indexList []int, unexistSeqList []uint32, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
sChat := UserChat{}
@@ -443,7 +446,7 @@ func (d *db.DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, s
singleCount := 0
var hasSeqList []uint32
for i := 0; i < len(sChat.Msg); i++ {
msg := new(open_im_sdk.MsgData)
msg := new(sdkws.MsgData)
if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil {
log.NewError(operationID, "Unmarshal err", msg.String(), err.Error())
return nil, nil, nil, err
@@ -467,18 +470,18 @@ func (d *db.DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, s
return seqMsg, indexList, unexistSeqList, nil
}
func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk.MsgData) {
func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqList {
msg := new(open_im_sdk.MsgData)
msg := new(sdkws.MsgData)
msg.Seq = v
exceptionMsg = append(exceptionMsg, msg)
}
return exceptionMsg
}
func genExceptionSuperGroupMessageBySeqList(seqList []uint32, groupID string) (exceptionMsg []*open_im_sdk.MsgData) {
func genExceptionSuperGroupMessageBySeqList(seqList []uint32, groupID string) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqList {
msg := new(open_im_sdk.MsgData)
msg := new(sdkws.MsgData)
msg.Seq = v
msg.GroupID = groupID
msg.SessionType = constant.SuperGroupChatType
-117
View File
@@ -1,117 +0,0 @@
package unrelation
import (
"Open_IM/pkg/common/db/table/unrelation"
"Open_IM/pkg/utils"
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"math/rand"
"strconv"
"time"
)
type TagMongoDriver struct {
mgoDB *mongo.Database
TagCollection *mongo.Collection
TagSendLogCollection *mongo.Collection
}
func NewTagMongoDriver(mgoDB *mongo.Database) *TagMongoDriver {
return &TagMongoDriver{mgoDB: mgoDB, TagCollection: mgoDB.Collection(unrelation.CTag), TagSendLogCollection: mgoDB.Collection(unrelation.CSendLog)}
}
func (db *TagMongoDriver) generateTagID(tagName, userID string) string {
return utils.Md5(tagName + userID + strconv.Itoa(rand.Int()) + time.Now().String())
}
func (db *TagMongoDriver) GetUserTags(ctx context.Context, userID string) ([]unrelation.TagModel, error) {
var tags []unrelation.TagModel
cursor, err := db.TagCollection.Find(ctx, bson.M{"user_id": userID})
if err != nil {
return tags, err
}
if err = cursor.All(ctx, &tags); err != nil {
return tags, err
}
return tags, nil
}
func (db *TagMongoDriver) CreateTag(ctx context.Context, userID, tagName string, userList []string) error {
tagID := generateTagID(tagName, userID)
tag := unrelation.TagModel{
UserID: userID,
TagID: tagID,
TagName: tagName,
UserList: userList,
}
_, err := db.TagCollection.InsertOne(ctx, tag)
return err
}
func (db *TagMongoDriver) GetTagByID(ctx context.Context, userID, tagID string) (unrelation.TagModel, error) {
var tag unrelation.TagModel
err := db.TagCollection.FindOne(ctx, bson.M{"user_id": userID, "tag_id": tagID}).Decode(&tag)
return tag, err
}
func (db *TagMongoDriver) DeleteTag(ctx context.Context, userID, tagID string) error {
_, err := db.TagCollection.DeleteOne(ctx, bson.M{"user_id": userID, "tag_id": tagID})
return err
}
func (db *TagMongoDriver) SetTag(ctx context.Context, userID, tagID, newName string, increaseUserIDList []string, reduceUserIDList []string) error {
var tag unrelation.TagModel
if err := db.TagCollection.FindOne(ctx, bson.M{"tag_id": tagID, "user_id": userID}).Decode(&tag); err != nil {
return err
}
if newName != "" {
_, err := db.TagCollection.UpdateOne(ctx, bson.M{"user_id": userID, "tag_id": tagID}, bson.M{"$set": bson.M{"tag_name": newName}})
if err != nil {
return err
}
}
tag.UserList = append(tag.UserList, increaseUserIDList...)
tag.UserList = utils.RemoveRepeatedStringInList(tag.UserList)
for _, v := range reduceUserIDList {
for i2, v2 := range tag.UserList {
if v == v2 {
tag.UserList[i2] = ""
}
}
}
var newUserList []string
for _, v := range tag.UserList {
if v != "" {
newUserList = append(newUserList, v)
}
}
_, err := db.TagCollection.UpdateOne(ctx, bson.M{"user_id": userID, "tag_id": tagID}, bson.M{"$set": bson.M{"user_list": newUserList}})
if err != nil {
return err
}
return nil
}
func (db *TagMongoDriver) GetUserIDListByTagID(ctx context.Context, userID, tagID string) ([]string, error) {
var tag unrelation.TagModel
err := db.TagCollection.FindOne(ctx, bson.M{"user_id": userID, "tag_id": tagID}).Decode(&tag)
return tag.UserList, err
}
func (db *TagMongoDriver) SaveTagSendLog(ctx context.Context, tagSendLog *unrelation.TagSendLogModel) error {
_, err := db.TagSendLogCollection.InsertOne(ctx, tagSendLog)
return err
}
func (db *TagMongoDriver) GetTagSendLogs(ctx context.Context, userID string, showNumber, pageNumber int32) ([]unrelation.TagSendLogModel, error) {
var tagSendLogs []unrelation.TagSendLogModel
findOpts := options.Find().SetLimit(int64(showNumber)).SetSkip(int64(showNumber) * (int64(pageNumber) - 1)).SetSort(bson.M{"send_time": -1})
cursor, err := db.TagSendLogCollection.Find(ctx, bson.M{"send_id": userID}, findOpts)
if err != nil {
return tagSendLogs, err
}
err = cursor.All(ctx, &tagSendLogs)
return tagSendLogs, err
}
-151
View File
@@ -1,151 +0,0 @@
package unrelation
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/table/unrelation"
"Open_IM/pkg/utils"
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"math/rand"
"strconv"
"time"
)
type WorkMomentMongoDriver struct {
mgoDB *mongo.Database
WorkMomentCollection *mongo.Collection
}
func NewWorkMomentMongoDriver(mgoDB *mongo.Database) *WorkMomentMongoDriver {
return &WorkMomentMongoDriver{mgoDB: mgoDB, WorkMomentCollection: mgoDB.Collection(unrelation.CWorkMoment)}
}
func (db *WorkMomentMongoDriver) generateWorkMomentID(userID string) string {
return utils.Md5(userID + strconv.Itoa(rand.Int()) + time.Now().String())
}
func (db *WorkMomentMongoDriver) generateWorkMomentCommentID(workMomentID string) string {
return utils.Md5(workMomentID + strconv.Itoa(rand.Int()) + time.Now().String())
}
func (db *WorkMomentMongoDriver) CreateOneWorkMoment(ctx context.Context, workMoment *unrelation.WorkMoment) error {
workMomentID := db.generateWorkMomentID(workMoment.UserID)
workMoment.WorkMomentID = workMomentID
workMoment.CreateTime = int32(time.Now().Unix())
_, err := db.WorkMomentCollection.InsertOne(ctx, workMoment)
return err
}
func (db *WorkMomentMongoDriver) DeleteOneWorkMoment(ctx context.Context, workMomentID string) error {
_, err := db.WorkMomentCollection.DeleteOne(ctx, bson.M{"work_moment_id": workMomentID})
return err
}
func (db *WorkMomentMongoDriver) DeleteComment(ctx context.Context, workMomentID, contentID, opUserID string) error {
_, err := db.WorkMomentCollection.UpdateOne(ctx, bson.D{{"work_moment_id", workMomentID},
{"$or", bson.A{
bson.D{{"user_id", opUserID}},
bson.D{{"comments", bson.M{"$elemMatch": bson.M{"user_id": opUserID}}}},
},
}}, bson.M{"$pull": bson.M{"comments": bson.M{"content_id": contentID}}})
return err
}
func (db *WorkMomentMongoDriver) GetWorkMomentByID(ctx context.Context, workMomentID string) (*unrelation.WorkMoment, error) {
workMoment := &unrelation.WorkMoment{}
err := db.WorkMomentCollection.FindOne(ctx, bson.M{"work_moment_id": workMomentID}).Decode(workMoment)
return workMoment, err
}
func (db *WorkMomentMongoDriver) LikeOneWorkMoment(ctx context.Context, likeUserID, userName, workMomentID string) (*unrelation.WorkMoment, bool, error) {
workMoment, err := db.GetWorkMomentByID(ctx, workMomentID)
if err != nil {
return nil, false, err
}
var isAlreadyLike bool
for i, user := range workMoment.LikeUserList {
if likeUserID == user.UserID {
isAlreadyLike = true
workMoment.LikeUserList = append(workMoment.LikeUserList[0:i], workMoment.LikeUserList[i+1:]...)
}
}
if !isAlreadyLike {
workMoment.LikeUserList = append(workMoment.LikeUserList, &unrelation.CommonUserModel{UserID: likeUserID, UserName: userName})
}
_, err = db.WorkMomentCollection.UpdateOne(ctx, bson.M{"work_moment_id": workMomentID}, bson.M{"$set": bson.M{"like_user_list": workMoment.LikeUserList}})
return workMoment, !isAlreadyLike, err
}
func (db *WorkMomentMongoDriver) CommentOneWorkMoment(ctx context.Context, comment *unrelation.Comment, workMomentID string) (unrelation.WorkMoment, error) {
comment.ContentID = generateWorkMomentCommentID(workMomentID)
var workMoment unrelation.WorkMoment
err := db.WorkMomentCollection.FindOneAndUpdate(ctx, bson.M{"work_moment_id": workMomentID}, bson.M{"$push": bson.M{"comments": comment}}).Decode(&workMoment)
return workMoment, err
}
func (db *WorkMomentMongoDriver) GetUserSelfWorkMoments(ctx context.Context, userID string, showNumber, pageNumber int32) ([]unrelation.WorkMoment, error) {
var workMomentList []unrelation.WorkMoment
findOpts := options.Find().SetLimit(int64(showNumber)).SetSkip(int64(showNumber) * (int64(pageNumber) - 1)).SetSort(bson.M{"create_time": -1})
result, err := db.WorkMomentCollection.Find(ctx, bson.M{"user_id": userID}, findOpts)
if err != nil {
return workMomentList, nil
}
err = result.All(ctx, &workMomentList)
return workMomentList, err
}
func (db *WorkMomentMongoDriver) GetUserWorkMoments(ctx context.Context, opUserID, userID string, showNumber, pageNumber int32, friendIDList []string) ([]unrelation.WorkMoment, error) {
var workMomentList []unrelation.WorkMoment
findOpts := options.Find().SetLimit(int64(showNumber)).SetSkip(int64(showNumber) * (int64(pageNumber) - 1)).SetSort(bson.M{"create_time": -1})
result, err := db.WorkMomentCollection.Find(ctx, bson.D{ // 等价条件: select * from
{"user_id", userID},
{"$or", bson.A{
bson.D{{"permission", constant.WorkMomentPermissionCantSee}, {"permission_user_id_list", bson.D{{"$nin", bson.A{opUserID}}}}},
bson.D{{"permission", constant.WorkMomentPermissionCanSee}, {"permission_user_id_list", bson.D{{"$in", bson.A{opUserID}}}}},
bson.D{{"permission", constant.WorkMomentPublic}},
}},
}, findOpts)
if err != nil {
return workMomentList, nil
}
err = result.All(ctx, &workMomentList)
return workMomentList, err
}
func (db *WorkMomentMongoDriver) GetUserFriendWorkMoments(ctx context.Context, showNumber, pageNumber int32, userID string, friendIDList []string) ([]unrelation.WorkMoment, error) {
var workMomentList []unrelation.WorkMoment
findOpts := options.Find().SetLimit(int64(showNumber)).SetSkip(int64(showNumber) * (int64(pageNumber) - 1)).SetSort(bson.M{"create_time": -1})
var filter bson.D
permissionFilter := bson.D{
{"$or", bson.A{
bson.D{{"permission", constant.WorkMomentPermissionCantSee}, {"permission_user_id_list", bson.D{{"$nin", bson.A{userID}}}}},
bson.D{{"permission", constant.WorkMomentPermissionCanSee}, {"permission_user_id_list", bson.D{{"$in", bson.A{userID}}}}},
bson.D{{"permission", constant.WorkMomentPublic}},
}}}
if config.Config.WorkMoment.OnlyFriendCanSee {
filter = bson.D{
{"$or", bson.A{
bson.D{{"user_id", userID}}, //self
bson.D{{"$and", bson.A{permissionFilter, bson.D{{"user_id", bson.D{{"$in", friendIDList}}}}}}},
},
},
}
} else {
filter = bson.D{
{"$or", bson.A{
bson.D{{"user_id", userID}}, //self
permissionFilter,
},
},
}
}
result, err := db.WorkMomentCollection.Find(ctx, filter, findOpts)
if err != nil {
return workMomentList, err
}
err = result.All(ctx, &workMomentList)
return workMomentList, err
}