mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-05 17:45:59 +08:00
errcode
This commit is contained in:
@@ -0,0 +1,99 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
unRelationTb "Open_IM/pkg/common/db/table/unrelation"
|
||||
"Open_IM/pkg/proto/sdkws"
|
||||
"context"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
)
|
||||
|
||||
type ExtendMsgInterface interface {
|
||||
CreateExtendMsgSet(ctx context.Context, set *unRelationTb.ExtendMsgSetModel) error
|
||||
GetAllExtendMsgSet(ctx context.Context, ID string, opts *unRelationTb.GetAllExtendMsgSetOpts) (sets []*unRelationTb.ExtendMsgSetModel, err error)
|
||||
GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64) (*unRelationTb.ExtendMsgSetModel, error)
|
||||
InsertExtendMsg(ctx context.Context, sourceID string, sessionType int32, msg *unRelationTb.ExtendMsgModel) error
|
||||
InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
|
||||
DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
|
||||
GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *unRelationTb.ExtendMsgModel, err error)
|
||||
}
|
||||
|
||||
type ExtendMsgController struct {
|
||||
database ExtendMsgDatabase
|
||||
}
|
||||
|
||||
func NewExtendMsgController(mgo *mongo.Client, rdb redis.UniversalClient) *ExtendMsgController {
|
||||
return &ExtendMsgController{}
|
||||
}
|
||||
|
||||
func (e *ExtendMsgController) CreateExtendMsgSet(ctx context.Context, set *unRelationTb.ExtendMsgSetModel) error {
|
||||
return e.database.CreateExtendMsgSet(ctx, set)
|
||||
}
|
||||
|
||||
func (e *ExtendMsgController) GetAllExtendMsgSet(ctx context.Context, ID string, opts *unRelationTb.GetAllExtendMsgSetOpts) (sets []*unRelationTb.ExtendMsgSetModel, err error) {
|
||||
return e.GetAllExtendMsgSet(ctx, ID, opts)
|
||||
}
|
||||
|
||||
func (e *ExtendMsgController) GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64) (*unRelationTb.ExtendMsgSetModel, error) {
|
||||
return e.GetExtendMsgSet(ctx, sourceID, sessionType, maxMsgUpdateTime)
|
||||
}
|
||||
|
||||
func (e *ExtendMsgController) InsertExtendMsg(ctx context.Context, sourceID string, sessionType int32, msg *unRelationTb.ExtendMsgModel) error {
|
||||
return e.InsertExtendMsg(ctx, sourceID, sessionType, msg)
|
||||
}
|
||||
|
||||
func (e *ExtendMsgController) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
|
||||
return e.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, reactionExtensionList)
|
||||
}
|
||||
func (e *ExtendMsgController) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
|
||||
return e.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, reactionExtensionList)
|
||||
}
|
||||
|
||||
func (e *ExtendMsgController) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *unRelationTb.ExtendMsgModel, err error) {
|
||||
return e.GetExtendMsg(ctx, sourceID, sessionType, clientMsgID, maxMsgUpdateTime)
|
||||
}
|
||||
|
||||
type ExtendMsgDatabaseInterface interface {
|
||||
CreateExtendMsgSet(ctx context.Context, set *unRelationTb.ExtendMsgSetModel) error
|
||||
GetAllExtendMsgSet(ctx context.Context, ID string, opts *unRelationTb.GetAllExtendMsgSetOpts) (sets []*unRelationTb.ExtendMsgSetModel, err error)
|
||||
GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64) (*unRelationTb.ExtendMsgSetModel, error)
|
||||
InsertExtendMsg(ctx context.Context, sourceID string, sessionType int32, msg *unRelationTb.ExtendMsgModel) error
|
||||
InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
|
||||
DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
|
||||
GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *unRelationTb.ExtendMsgModel, err error)
|
||||
}
|
||||
|
||||
type ExtendMsgDatabase struct {
|
||||
model unRelationTb.ExtendMsgSetModelInterface
|
||||
}
|
||||
|
||||
func NewExtendMsgDatabase() ExtendMsgDatabaseInterface {
|
||||
return &ExtendMsgDatabase{}
|
||||
}
|
||||
|
||||
func (e *ExtendMsgDatabase) CreateExtendMsgSet(ctx context.Context, set *unRelationTb.ExtendMsgSetModel) error {
|
||||
return e.model.CreateExtendMsgSet(ctx, set)
|
||||
}
|
||||
|
||||
func (e *ExtendMsgDatabase) GetAllExtendMsgSet(ctx context.Context, sourceID string, opts *unRelationTb.GetAllExtendMsgSetOpts) (sets []*unRelationTb.ExtendMsgSetModel, err error) {
|
||||
return e.model.GetAllExtendMsgSet(ctx, sourceID, opts)
|
||||
}
|
||||
|
||||
func (e *ExtendMsgDatabase) GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64) (*unRelationTb.ExtendMsgSetModel, error) {
|
||||
return e.model.GetExtendMsgSet(ctx, sourceID, sessionType, maxMsgUpdateTime)
|
||||
}
|
||||
|
||||
func (e *ExtendMsgDatabase) InsertExtendMsg(ctx context.Context, sourceID string, sessionType int32, msg *unRelationTb.ExtendMsgModel) error {
|
||||
return e.model.InsertExtendMsg(ctx, sourceID, sessionType, msg)
|
||||
}
|
||||
|
||||
func (e *ExtendMsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
|
||||
return e.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, reactionExtensionList)
|
||||
}
|
||||
func (e *ExtendMsgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
|
||||
return e.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, reactionExtensionList)
|
||||
}
|
||||
|
||||
func (e *ExtendMsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *unRelationTb.ExtendMsgModel, err error) {
|
||||
return e.GetExtendMsg(ctx, sourceID, sessionType, clientMsgID, maxMsgUpdateTime)
|
||||
}
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"Open_IM/pkg/common/db/cache"
|
||||
"Open_IM/pkg/common/db/relation"
|
||||
relationTb "Open_IM/pkg/common/db/table/relation"
|
||||
unrelationTb "Open_IM/pkg/common/db/table/unrelation"
|
||||
unRelationTb "Open_IM/pkg/common/db/table/unrelation"
|
||||
"Open_IM/pkg/common/db/unrelation"
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
@@ -47,8 +47,8 @@ type GroupInterface interface {
|
||||
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error)
|
||||
PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupRequestModel, error)
|
||||
// SuperGroup
|
||||
FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelationTb.SuperGroupModel, error)
|
||||
FindJoinSuperGroup(ctx context.Context, userID string) (superGroup *unrelationTb.UserToSuperGroupModel, err error)
|
||||
FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unRelationTb.SuperGroupModel, error)
|
||||
FindJoinSuperGroup(ctx context.Context, userID string) (superGroup *unRelationTb.UserToSuperGroupModel, err error)
|
||||
CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string) error
|
||||
DeleteSuperGroup(ctx context.Context, groupID string) error
|
||||
DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error
|
||||
@@ -153,11 +153,11 @@ func (g *GroupController) PageGroupRequestUser(ctx context.Context, userID strin
|
||||
return g.database.PageGroupRequestUser(ctx, userID, pageNumber, showNumber)
|
||||
}
|
||||
|
||||
func (g *GroupController) FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelationTb.SuperGroupModel, error) {
|
||||
func (g *GroupController) FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unRelationTb.SuperGroupModel, error) {
|
||||
return g.database.FindSuperGroup(ctx, groupIDs)
|
||||
}
|
||||
|
||||
func (g *GroupController) FindJoinSuperGroup(ctx context.Context, userID string) (*unrelationTb.UserToSuperGroupModel, error) {
|
||||
func (g *GroupController) FindJoinSuperGroup(ctx context.Context, userID string) (*unRelationTb.UserToSuperGroupModel, error) {
|
||||
return g.database.FindJoinSuperGroup(ctx, userID)
|
||||
}
|
||||
|
||||
@@ -203,8 +203,8 @@ type GroupDataBaseInterface interface {
|
||||
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error)
|
||||
PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupRequestModel, error)
|
||||
// SuperGroup
|
||||
FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelationTb.SuperGroupModel, error)
|
||||
FindJoinSuperGroup(ctx context.Context, userID string) (*unrelationTb.UserToSuperGroupModel, error)
|
||||
FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unRelationTb.SuperGroupModel, error)
|
||||
FindJoinSuperGroup(ctx context.Context, userID string) (*unRelationTb.UserToSuperGroupModel, error)
|
||||
CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string) error
|
||||
DeleteSuperGroup(ctx context.Context, groupID string) error
|
||||
DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error
|
||||
@@ -467,11 +467,11 @@ func (g *GroupDataBase) PageGroupRequestUser(ctx context.Context, userID string,
|
||||
return g.groupRequestDB.Page(ctx, userID, pageNumber, showNumber)
|
||||
}
|
||||
|
||||
func (g *GroupDataBase) FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelationTb.SuperGroupModel, error) {
|
||||
func (g *GroupDataBase) FindSuperGroup(ctx context.Context, groupIDs []string) ([]*table.SuperGroupModel, error) {
|
||||
return g.mongoDB.FindSuperGroup(ctx, groupIDs)
|
||||
}
|
||||
|
||||
func (g *GroupDataBase) FindJoinSuperGroup(ctx context.Context, userID string) (*unrelationTb.UserToSuperGroupModel, error) {
|
||||
func (g *GroupDataBase) FindJoinSuperGroup(ctx context.Context, userID string) (*table.UserToSuperGroupModel, error) {
|
||||
return g.mongoDB.GetSuperGroupByUserID(ctx, userID)
|
||||
}
|
||||
|
||||
|
||||
+525
-47
@@ -1,76 +1,554 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/db/cache"
|
||||
unRelationTb "Open_IM/pkg/common/db/table/unrelation"
|
||||
"Open_IM/pkg/common/db/unrelation"
|
||||
"Open_IM/pkg/common/log"
|
||||
"Open_IM/pkg/common/prome"
|
||||
"Open_IM/pkg/common/tracelog"
|
||||
"github.com/gogo/protobuf/sortkeys"
|
||||
"sync"
|
||||
|
||||
//"Open_IM/pkg/common/log"
|
||||
pbMsg "Open_IM/pkg/proto/msg"
|
||||
"Open_IM/pkg/proto/sdkws"
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
type MsgInterface interface {
|
||||
BatchInsertChat2DB(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error
|
||||
BatchInsertChat2Cache(ctx context.Context, insertID string, msgList []*pbMsg.MsgDataToMQ) (error, uint64)
|
||||
|
||||
DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error)
|
||||
// logic delete
|
||||
DelMsgLogic(ctx context.Context, userID string, seqList []uint32) error
|
||||
DelMsgBySeqListInOneDoc(ctx context.Context, docID string, seqList []uint32) (unExistSeqList []uint32, err error)
|
||||
ReplaceMsgToBlankByIndex(docID string, index int) (replaceMaxSeq uint32, err error)
|
||||
ReplaceMsgByIndex(ctx context.Context, suffixUserID string, msg *sdkws.MsgData, seqIndex int) error
|
||||
// 获取群ID或者UserID最新一条在mongo里面的消息
|
||||
GetNewestMsg(ID string) (msg *sdkws.MsgData, err error)
|
||||
// 获取群ID或者UserID最老一条在mongo里面的消息
|
||||
GetOldestMsg(ID string) (msg *sdkws.MsgData, err error)
|
||||
|
||||
GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, err error)
|
||||
GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, err error)
|
||||
GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, indexList []int, unExistSeqList []uint32, err error)
|
||||
SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error
|
||||
|
||||
CleanUpUserMsgFromMongo(userID string, operationID string) error
|
||||
// 批量插入消息到db
|
||||
BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error
|
||||
// 刪除redis中消息缓存
|
||||
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error
|
||||
// incrSeq然后批量插入缓存
|
||||
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error)
|
||||
// 删除消息 返回不存在的seqList
|
||||
DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error)
|
||||
// 获取群ID或者UserID最新一条在db里面的消息
|
||||
GetNewestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
|
||||
// 获取群ID或者UserID最老一条在db里面的消息
|
||||
GetOldestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
|
||||
// 通过seqList获取db中写扩散消息
|
||||
GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error)
|
||||
// 通过seqList获取大群在db里面的消息
|
||||
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error)
|
||||
// 删除用户所有消息/cache/db然后重置seq
|
||||
CleanUpUserMsgFromMongo(ctx context.Context, userID string) error
|
||||
// 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
|
||||
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error
|
||||
// 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
|
||||
DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
|
||||
}
|
||||
|
||||
func NewMsgController() MsgDatabaseInterface {
|
||||
return MsgController
|
||||
func NewMsgController(mgo *mongo.Client, rdb redis.UniversalClient) MsgInterface {
|
||||
return &MsgController{}
|
||||
}
|
||||
|
||||
type MsgController struct {
|
||||
database MsgDatabase
|
||||
}
|
||||
|
||||
func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error {
|
||||
return m.database.BatchInsertChat2DB(ctx, ID, msgList, currentMaxSeq)
|
||||
}
|
||||
|
||||
func (m *MsgController) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error {
|
||||
return m.database.DeleteMessageFromCache(ctx, userID, msgList)
|
||||
}
|
||||
|
||||
func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error) {
|
||||
return m.database.BatchInsertChat2Cache(ctx, sourceID, msgList)
|
||||
}
|
||||
|
||||
func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error) {
|
||||
return m.database.DelMsgBySeqs(ctx, userID, seqs)
|
||||
}
|
||||
|
||||
func (m *MsgController) GetNewestMsg(ctx context.Context, ID string) (msg *sdkws.MsgData, err error) {
|
||||
return m.database.GetNewestMsg(ctx, ID)
|
||||
}
|
||||
|
||||
func (m *MsgController) GetOldestMsg(ctx context.Context, ID string) (msg *sdkws.MsgData, err error) {
|
||||
return m.database.GetOldestMsg(ctx, ID)
|
||||
}
|
||||
|
||||
func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
|
||||
return m.database.GetMsgBySeqs(ctx, userID, seqs)
|
||||
}
|
||||
|
||||
func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
|
||||
return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs)
|
||||
}
|
||||
|
||||
func (m *MsgController) CleanUpUserMsgFromMongo(ctx context.Context, userID string) error {
|
||||
return m.database.CleanUpUserMsgFromMongo(ctx, userID)
|
||||
}
|
||||
|
||||
func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error {
|
||||
return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime)
|
||||
}
|
||||
|
||||
func (m *MsgController) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
|
||||
return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime)
|
||||
}
|
||||
|
||||
type MsgDatabaseInterface interface {
|
||||
BatchInsertChat2DB(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error
|
||||
BatchInsertChat2Cache(ctx context.Context, insertID string, msgList []*pbMsg.MsgDataToMQ) (error, uint64)
|
||||
|
||||
DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error)
|
||||
// logic delete
|
||||
DelMsgLogic(ctx context.Context, userID string, seqList []uint32) error
|
||||
DelMsgBySeqListInOneDoc(ctx context.Context, docID string, seqList []uint32) (unExistSeqList []uint32, err error)
|
||||
ReplaceMsgToBlankByIndex(docID string, index int) (replaceMaxSeq uint32, err error)
|
||||
ReplaceMsgByIndex(ctx context.Context, suffixUserID string, msg *sdkws.MsgData, seqIndex int) error
|
||||
// 批量插入消息
|
||||
BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error
|
||||
// 刪除redis中消息缓存
|
||||
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error
|
||||
// incrSeq然后批量插入缓存
|
||||
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error)
|
||||
// 删除消息 返回不存在的seqList
|
||||
DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error)
|
||||
// 获取群ID或者UserID最新一条在mongo里面的消息
|
||||
GetNewestMsg(ID string) (msg *sdkws.MsgData, err error)
|
||||
GetNewestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
|
||||
// 获取群ID或者UserID最老一条在mongo里面的消息
|
||||
GetOldestMsg(ID string) (msg *sdkws.MsgData, err error)
|
||||
|
||||
GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, err error)
|
||||
GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, err error)
|
||||
GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, indexList []int, unExistSeqList []uint32, err error)
|
||||
SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error
|
||||
GetOldestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
|
||||
// 通过seqList获取mongo中写扩散消息
|
||||
GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error)
|
||||
// 通过seqList获取大群在 mongo里面的消息
|
||||
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error)
|
||||
// 删除用户所有消息/redis/mongo然后重置seq
|
||||
CleanUpUserMsgFromMongo(userID string, operationID string) error
|
||||
}
|
||||
|
||||
func NewMsgDatabase() MsgDatabaseInterface {
|
||||
return MsgDatabase
|
||||
CleanUpUserMsgFromMongo(ctx context.Context, userID string) error
|
||||
// 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
|
||||
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error
|
||||
// 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
|
||||
DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
|
||||
}
|
||||
|
||||
type MsgDatabase struct {
|
||||
msgModel unRelationTb.MsgDocModelInterface
|
||||
msgCache cache.Cache
|
||||
msg unRelationTb.MsgDocModel
|
||||
}
|
||||
|
||||
func (m *MsgDatabase) BatchInsertChat2DB(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error {
|
||||
|
||||
func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface {
|
||||
return &MsgDatabase{}
|
||||
}
|
||||
|
||||
func (m *MsgDatabase) CleanUpUserMsgFromMongo(userID string, operationID string) error {
|
||||
func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error {
|
||||
//newTime := utils.GetCurrentTimestampByMill()
|
||||
if len(msgList) > db.msg.GetSingleGocMsgNum() {
|
||||
return errors.New("too large")
|
||||
}
|
||||
var remain uint64
|
||||
blk0 := uint64(db.msg.GetSingleGocMsgNum() - 1)
|
||||
//currentMaxSeq 4998
|
||||
if currentMaxSeq < uint64(db.msg.GetSingleGocMsgNum()) {
|
||||
remain = blk0 - currentMaxSeq //1
|
||||
} else {
|
||||
excludeBlk0 := currentMaxSeq - blk0 //=1
|
||||
//(5000-1)%5000 == 4999
|
||||
remain = (uint64(db.msg.GetSingleGocMsgNum()) - (excludeBlk0 % uint64(db.msg.GetSingleGocMsgNum()))) % uint64(db.msg.GetSingleGocMsgNum())
|
||||
}
|
||||
//remain=1
|
||||
insertCounter := uint64(0)
|
||||
msgsToMongo := make([]unRelationTb.MsgInfoModel, 0)
|
||||
msgsToMongoNext := make([]unRelationTb.MsgInfoModel, 0)
|
||||
docID := ""
|
||||
docIDNext := ""
|
||||
var err error
|
||||
for _, m := range msgList {
|
||||
//log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID)
|
||||
currentMaxSeq++
|
||||
sMsg := unRelationTb.MsgInfoModel{}
|
||||
sMsg.SendTime = m.MsgData.SendTime
|
||||
m.MsgData.Seq = uint32(currentMaxSeq)
|
||||
if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
if insertCounter < remain {
|
||||
msgsToMongo = append(msgsToMongo, sMsg)
|
||||
insertCounter++
|
||||
docID = db.msg.GetDocID(sourceID, uint32(currentMaxSeq))
|
||||
//log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
|
||||
} else {
|
||||
msgsToMongoNext = append(msgsToMongoNext, sMsg)
|
||||
docIDNext = db.msg.GetDocID(sourceID, uint32(currentMaxSeq))
|
||||
//log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
|
||||
}
|
||||
}
|
||||
|
||||
if docID != "" {
|
||||
//filter := bson.M{"uid": seqUid}
|
||||
//log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo, "userID: ", userID)
|
||||
//err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgsToMongo}}}).Err()
|
||||
err = db.msgModel.PushMsgsToDoc(ctx, docID, msgsToMongo)
|
||||
if err != nil {
|
||||
if err == mongo.ErrNoDocuments {
|
||||
doc := &unRelationTb.MsgDocModel{}
|
||||
doc.DocID = docID
|
||||
doc.Msg = msgsToMongo
|
||||
if err = db.msgModel.Create(ctx, doc); err != nil {
|
||||
prome.PromeInc(prome.MsgInsertMongoFailedCounter)
|
||||
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
prome.PromeInc(prome.MsgInsertMongoSuccessCounter)
|
||||
} else {
|
||||
prome.PromeInc(prome.MsgInsertMongoFailedCounter)
|
||||
//log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
} else {
|
||||
prome.PromeInc(prome.MsgInsertMongoSuccessCounter)
|
||||
}
|
||||
}
|
||||
if docIDNext != "" {
|
||||
nextDoc := &unRelationTb.MsgDocModel{}
|
||||
nextDoc.DocID = docIDNext
|
||||
nextDoc.Msg = msgsToMongoNext
|
||||
//log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
|
||||
if err = db.msgModel.Create(ctx, nextDoc); err != nil {
|
||||
prome.PromeInc(prome.MsgInsertMongoFailedCounter)
|
||||
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
prome.PromeInc(prome.MsgInsertMongoSuccessCounter)
|
||||
}
|
||||
//log.Debug(operationID, "batch mgo cost time ", mongo2.getCurrentTimestampByMill()-newTime, userID, len(msgList))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *MsgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error {
|
||||
return db.msgCache.DeleteMessageFromCache(ctx, userID, msgs)
|
||||
}
|
||||
|
||||
func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error) {
|
||||
//newTime := utils.GetCurrentTimestampByMill()
|
||||
lenList := len(msgList)
|
||||
if lenList > db.msg.GetSingleGocMsgNum() {
|
||||
return 0, errors.New("too large")
|
||||
}
|
||||
if lenList < 1 {
|
||||
return 0, errors.New("too short as 0")
|
||||
}
|
||||
// judge sessionType to get seq
|
||||
var currentMaxSeq uint64
|
||||
var err error
|
||||
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
|
||||
currentMaxSeq, err = db.msgCache.GetGroupMaxSeq(ctx, sourceID)
|
||||
//log.Debug(operationID, "constant.SuperGroupChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err)
|
||||
} else {
|
||||
currentMaxSeq, err = db.msgCache.GetUserMaxSeq(ctx, sourceID)
|
||||
//log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err)
|
||||
}
|
||||
if err != nil && err != redis.Nil {
|
||||
prome.PromeInc(prome.SeqGetFailedCounter)
|
||||
return 0, utils.Wrap(err, "")
|
||||
}
|
||||
prome.PromeInc(prome.SeqGetSuccessCounter)
|
||||
lastMaxSeq := currentMaxSeq
|
||||
for _, m := range msgList {
|
||||
currentMaxSeq++
|
||||
m.MsgData.Seq = uint32(currentMaxSeq)
|
||||
//log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", sourceID, "seq: ", currentMaxSeq)
|
||||
}
|
||||
//log.Debug(operationID, "SetMessageToCache ", sourceID, len(msgList))
|
||||
failedNum, err := db.msgCache.SetMessageToCache(ctx, sourceID, msgList)
|
||||
if err != nil {
|
||||
prome.PromeAdd(prome.MsgInsertRedisFailedCounter, failedNum)
|
||||
//log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), sourceID)
|
||||
} else {
|
||||
prome.PromeInc(prome.MsgInsertRedisSuccessCounter)
|
||||
}
|
||||
//log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, sourceID, len(msgList))
|
||||
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
|
||||
err = db.msgCache.SetGroupMaxSeq(ctx, sourceID, currentMaxSeq)
|
||||
} else {
|
||||
err = db.msgCache.SetUserMaxSeq(ctx, sourceID, currentMaxSeq)
|
||||
}
|
||||
if err != nil {
|
||||
prome.PromeInc(prome.SeqSetFailedCounter)
|
||||
} else {
|
||||
prome.PromeInc(prome.SeqSetSuccessCounter)
|
||||
}
|
||||
return lastMaxSeq, utils.Wrap(err, "")
|
||||
}
|
||||
|
||||
func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error) {
|
||||
sortkeys.Uint32s(seqs)
|
||||
docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs)
|
||||
lock := sync.Mutex{}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(docIDSeqsMap))
|
||||
for k, v := range docIDSeqsMap {
|
||||
go func(docID string, seqs []uint32) {
|
||||
defer wg.Done()
|
||||
unExistSeqList, err := db.DelMsgBySeqsInOneDoc(ctx, docID, seqs)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
lock.Lock()
|
||||
totalUnExistSeqs = append(totalUnExistSeqs, unExistSeqList...)
|
||||
lock.Unlock()
|
||||
}(k, v)
|
||||
}
|
||||
return totalUnExistSeqs, nil
|
||||
}
|
||||
|
||||
func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []uint32) (unExistSeqs []uint32, err error) {
|
||||
seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i, v := range seqMsgs {
|
||||
if err = db.msgModel.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return unExistSeqs, nil
|
||||
}
|
||||
|
||||
func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []uint32) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []uint32, err error) {
|
||||
doc, err := db.msgModel.FindOneByDocID(ctx, docID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
singleCount := 0
|
||||
var hasSeqList []uint32
|
||||
for i := 0; i < len(doc.Msg); i++ {
|
||||
msgPb, err := db.unmarshalMsg(&doc.Msg[i])
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
if utils.Contain(msgPb.Seq, seqs) {
|
||||
indexes = append(indexes, i)
|
||||
seqMsgs = append(seqMsgs, msgPb)
|
||||
hasSeqList = append(hasSeqList, msgPb.Seq)
|
||||
singleCount++
|
||||
if singleCount == len(seqs) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, i := range seqs {
|
||||
if utils.Contain(i, hasSeqList) {
|
||||
continue
|
||||
}
|
||||
unExistSeqs = append(unExistSeqs, i)
|
||||
}
|
||||
return seqMsgs, indexes, unExistSeqs, nil
|
||||
}
|
||||
|
||||
func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
|
||||
msgInfo, err := db.msgModel.GetNewestMsg(ctx, sourceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return db.unmarshalMsg(msgInfo)
|
||||
}
|
||||
|
||||
func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
|
||||
msgInfo, err := db.msgModel.GetOldestMsg(ctx, sourceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return db.unmarshalMsg(msgInfo)
|
||||
}
|
||||
|
||||
func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) {
|
||||
msgPb = &sdkws.MsgData{}
|
||||
err = proto.Unmarshal(msgInfo.Msg, msgPb)
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "")
|
||||
}
|
||||
return msgPb, nil
|
||||
}
|
||||
|
||||
func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []uint32, diffusionType int) (seqMsg []*sdkws.MsgData, err error) {
|
||||
var hasSeqs []uint32
|
||||
singleCount := 0
|
||||
m := db.msg.GetDocIDSeqsMap(sourceID, seqs)
|
||||
for docID, value := range m {
|
||||
doc, err := db.msgModel.FindOneByDocID(ctx, docID)
|
||||
if err != nil {
|
||||
//log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error())
|
||||
continue
|
||||
}
|
||||
singleCount = 0
|
||||
for i := 0; i < len(doc.Msg); i++ {
|
||||
msgPb, err := db.unmarshalMsg(&doc.Msg[i])
|
||||
if err != nil {
|
||||
//log.NewError(operationID, "Unmarshal err", seqUid, value, uid, seqList, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
if utils.Contain(msgPb.Seq, value) {
|
||||
seqMsg = append(seqMsg, msgPb)
|
||||
hasSeqs = append(hasSeqs, msgPb.Seq)
|
||||
singleCount++
|
||||
if singleCount == len(value) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(hasSeqs) != len(seqs) {
|
||||
var diff []uint32
|
||||
var exceptionMsg []*sdkws.MsgData
|
||||
diff = utils.Difference(hasSeqs, seqs)
|
||||
if diffusionType == constant.WriteDiffusion {
|
||||
exceptionMsg = db.msg.GenExceptionMessageBySeqs(diff)
|
||||
} else if diffusionType == constant.ReadDiffusion {
|
||||
exceptionMsg = db.msg.GenExceptionSuperGroupMessageBySeqs(diff, sourceID)
|
||||
}
|
||||
seqMsg = append(seqMsg, exceptionMsg...)
|
||||
}
|
||||
return seqMsg, nil
|
||||
}
|
||||
|
||||
func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
|
||||
return db.getMsgBySeqs(ctx, userID, seqs, constant.WriteDiffusion)
|
||||
}
|
||||
|
||||
func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
|
||||
return db.getMsgBySeqs(ctx, groupID, seqs, constant.ReadDiffusion)
|
||||
}
|
||||
|
||||
func (db *MsgDatabase) CleanUpUserMsgFromMongo(ctx context.Context, userID string) error {
|
||||
maxSeq, err := db.msgCache.GetUserMaxSeq(ctx, userID)
|
||||
if err == redis.Nil {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
docIDs := db.msg.GetSeqDocIDList(userID, uint32(maxSeq))
|
||||
err = db.msgModel.Delete(ctx, docIDs)
|
||||
if err == mongo.ErrNoDocuments {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = db.msgCache.SetUserMinSeq(ctx, userID, maxSeq)
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
|
||||
func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
|
||||
var delStruct delMsgRecursionStruct
|
||||
minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime)
|
||||
if err != nil {
|
||||
//log.NewError(operationID, utils.GetSelfFuncName(), groupID, "deleteMsg failed")
|
||||
}
|
||||
if minSeq == 0 {
|
||||
return nil
|
||||
}
|
||||
//log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList:", delStruct, "minSeq", minSeq)
|
||||
for _, userID := range userIDs {
|
||||
userMinSeq, err := db.msgCache.GetGroupUserMinSeq(ctx, groupID, userID)
|
||||
if err != nil && err != redis.Nil {
|
||||
//log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error())
|
||||
continue
|
||||
}
|
||||
if userMinSeq > uint64(minSeq) {
|
||||
err = db.msgCache.SetGroupUserMinSeq(ctx, groupID, userID, userMinSeq)
|
||||
} else {
|
||||
err = db.msgCache.SetGroupUserMinSeq(ctx, groupID, userID, uint64(minSeq))
|
||||
}
|
||||
if err != nil {
|
||||
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *MsgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
|
||||
var delStruct delMsgRecursionStruct
|
||||
minSeq, err := db.deleteMsgRecursion(ctx, userID, unRelationTb.OldestList, &delStruct, remainTime)
|
||||
if err != nil {
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
if minSeq == 0 {
|
||||
return nil
|
||||
}
|
||||
return db.msgCache.SetUserMinSeq(ctx, userID, uint64(minSeq))
|
||||
}
|
||||
|
||||
// this is struct for recursion
|
||||
type delMsgRecursionStruct struct {
|
||||
minSeq uint32
|
||||
delDocIDList []string
|
||||
}
|
||||
|
||||
func (d *delMsgRecursionStruct) getSetMinSeq() uint32 {
|
||||
return d.minSeq
|
||||
}
|
||||
|
||||
// index 0....19(del) 20...69
|
||||
// seq 70
|
||||
// set minSeq 21
|
||||
// recursion 删除list并且返回设置的最小seq
|
||||
func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (uint32, error) {
|
||||
// find from oldest list
|
||||
msgs, err := db.msgModel.GetMsgsByIndex(ctx, sourceID, index)
|
||||
if err != nil || msgs.DocID == "" {
|
||||
if err != nil {
|
||||
if err == unrelation.ErrMsgListNotExist {
|
||||
//log.NewInfo(operationID, utils.GetSelfFuncName(), "ID:", sourceID, "index:", index, err.Error())
|
||||
} else {
|
||||
//log.NewError(operationID, utils.GetSelfFuncName(), "GetUserMsgListByIndex failed", err.Error(), index, ID)
|
||||
}
|
||||
}
|
||||
// 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList)
|
||||
err = db.msgModel.Delete(ctx, delStruct.delDocIDList)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return delStruct.getSetMinSeq() + 1, nil
|
||||
}
|
||||
//log.NewDebug(operationID, "ID:", sourceID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg))
|
||||
if len(msgs.Msg) > db.msg.GetSingleGocMsgNum() {
|
||||
log.NewWarn(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), "msgs too large:", len(msgs.Msg), "docID:", msgs.DocID)
|
||||
}
|
||||
if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() {
|
||||
delStruct.delDocIDList = append(delStruct.delDocIDList, msgs.DocID)
|
||||
lastMsgPb := &sdkws.MsgData{}
|
||||
err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb)
|
||||
if err != nil {
|
||||
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID)
|
||||
return 0, utils.Wrap(err, "proto.Unmarshal failed")
|
||||
}
|
||||
delStruct.minSeq = lastMsgPb.Seq
|
||||
} else {
|
||||
var hasMarkDelFlag bool
|
||||
for _, msg := range msgs.Msg {
|
||||
msgPb := &sdkws.MsgData{}
|
||||
err = proto.Unmarshal(msg.Msg, msgPb)
|
||||
if err != nil {
|
||||
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID)
|
||||
return 0, utils.Wrap(err, "proto.Unmarshal failed")
|
||||
}
|
||||
if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) {
|
||||
msgPb.Status = constant.MsgDeleted
|
||||
bytes, _ := proto.Marshal(msgPb)
|
||||
msg.Msg = bytes
|
||||
msg.SendTime = 0
|
||||
hasMarkDelFlag = true
|
||||
} else {
|
||||
if err := db.msgModel.Delete(ctx, delStruct.delDocIDList); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if hasMarkDelFlag {
|
||||
if err := db.msgModel.UpdateOneDoc(ctx, msgs); err != nil {
|
||||
return delStruct.getSetMinSeq(), utils.Wrap(err, "")
|
||||
}
|
||||
}
|
||||
return msgPb.Seq + 1, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
//log.NewDebug(operationID, sourceID, "continue to", delStruct)
|
||||
// 继续递归 index+1
|
||||
seq, err := db.deleteMsgRecursion(ctx, sourceID, index+1, delStruct, remainTime)
|
||||
return seq, utils.Wrap(err, "deleteMsg failed")
|
||||
}
|
||||
|
||||
@@ -23,6 +23,8 @@ type UserInterface interface {
|
||||
Page(ctx context.Context, pageNumber, showNumber int32) (users []*relationTb.UserModel, count int64, err error)
|
||||
//只要有一个存在就为true
|
||||
IsExist(ctx context.Context, userIDs []string) (exist bool, err error)
|
||||
//获取所有用户ID
|
||||
GetAllUserID(ctx context.Context) ([]string, error)
|
||||
}
|
||||
|
||||
type UserController struct {
|
||||
@@ -55,6 +57,11 @@ func (u *UserController) Page(ctx context.Context, pageNumber, showNumber int32)
|
||||
func (u *UserController) IsExist(ctx context.Context, userIDs []string) (exist bool, err error) {
|
||||
return u.database.IsExist(ctx, userIDs)
|
||||
}
|
||||
|
||||
func (u *UserController) GetAllUserID(ctx context.Context) ([]string, error) {
|
||||
return u.database.GetAllUserID(ctx)
|
||||
}
|
||||
|
||||
func NewUserController(db *gorm.DB) *UserController {
|
||||
controller := &UserController{database: newUserDatabase(db)}
|
||||
return controller
|
||||
@@ -75,6 +82,8 @@ type UserDatabaseInterface interface {
|
||||
Page(ctx context.Context, pageNumber, showNumber int32) (users []*relationTb.UserModel, count int64, err error)
|
||||
//只要有一个存在就为true
|
||||
IsExist(ctx context.Context, userIDs []string) (exist bool, err error)
|
||||
//获取所有用户ID
|
||||
GetAllUserID(ctx context.Context) ([]string, error)
|
||||
}
|
||||
|
||||
type UserDatabase struct {
|
||||
@@ -138,3 +147,7 @@ func (u *UserDatabase) IsExist(ctx context.Context, userIDs []string) (exist boo
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (u *UserDatabase) GetAllUserID(ctx context.Context) ([]string, error) {
|
||||
return u.user.GetAllUserID(ctx)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user