mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-04 09:05:59 +08:00
feat: add scripts (#525)
This commit is contained in:
@@ -14,13 +14,22 @@ import (
|
||||
|
||||
type ConversationDatabase interface {
|
||||
//UpdateUserConversationFiled 更新用户该会话的属性信息
|
||||
UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error
|
||||
UpdateUsersConversationFiled(
|
||||
ctx context.Context,
|
||||
userIDs []string,
|
||||
conversationID string,
|
||||
args map[string]interface{},
|
||||
) error
|
||||
//CreateConversation 创建一批新的会话
|
||||
CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error
|
||||
//SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作
|
||||
SyncPeerUserPrivateConversationTx(ctx context.Context, conversation []*relationTb.ConversationModel) error
|
||||
//FindConversations 根据会话ID获取某个用户的多个会话
|
||||
FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error)
|
||||
FindConversations(
|
||||
ctx context.Context,
|
||||
ownerUserID string,
|
||||
conversationIDs []string,
|
||||
) ([]*relationTb.ConversationModel, error)
|
||||
//FindRecvMsgNotNotifyUserIDs 获取超级大群开启免打扰的用户ID
|
||||
FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
|
||||
//GetUserAllConversation 获取一个用户在服务器上所有的会话
|
||||
@@ -28,17 +37,29 @@ type ConversationDatabase interface {
|
||||
//SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性
|
||||
SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error
|
||||
//SetUsersConversationFiledTx 设置多个用户会话关于某个字段的更新操作,如果会话不存在则创建,否则更新,内部保证事务操作
|
||||
SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error
|
||||
SetUsersConversationFiledTx(
|
||||
ctx context.Context,
|
||||
userIDs []string,
|
||||
conversation *relationTb.ConversationModel,
|
||||
filedMap map[string]interface{},
|
||||
) error
|
||||
CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error
|
||||
GetConversationIDs(ctx context.Context, userID string) ([]string, error)
|
||||
GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
|
||||
GetAllConversationIDs(ctx context.Context) ([]string, error)
|
||||
GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
|
||||
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationTb.ConversationModel, error)
|
||||
GetConversationsByConversationID(
|
||||
ctx context.Context,
|
||||
conversationIDs []string,
|
||||
) ([]*relationTb.ConversationModel, error)
|
||||
GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationTb.ConversationModel, error)
|
||||
}
|
||||
|
||||
func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
|
||||
func NewConversationDatabase(
|
||||
conversation relationTb.ConversationModelInterface,
|
||||
cache cache.ConversationCache,
|
||||
tx tx.Tx,
|
||||
) ConversationDatabase {
|
||||
return &conversationDatabase{
|
||||
conversationDB: conversation,
|
||||
cache: cache,
|
||||
@@ -52,7 +73,12 @@ type conversationDatabase struct {
|
||||
tx tx.Tx
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) (err error) {
|
||||
func (c *conversationDatabase) SetUsersConversationFiledTx(
|
||||
ctx context.Context,
|
||||
userIDs []string,
|
||||
conversation *relationTb.ConversationModel,
|
||||
filedMap map[string]interface{},
|
||||
) (err error) {
|
||||
cache := c.cache.NewCache()
|
||||
if err := c.tx.Transaction(func(tx any) error {
|
||||
conversationTx := c.conversationDB.NewTx(tx)
|
||||
@@ -100,7 +126,12 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context,
|
||||
return cache.ExecDel(ctx)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error {
|
||||
func (c *conversationDatabase) UpdateUsersConversationFiled(
|
||||
ctx context.Context,
|
||||
userIDs []string,
|
||||
conversationID string,
|
||||
args map[string]interface{},
|
||||
) error {
|
||||
_, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -108,7 +139,10 @@ func (c *conversationDatabase) UpdateUsersConversationFiled(ctx context.Context,
|
||||
return c.cache.DelUsersConversation(conversationID, userIDs...).ExecDel(ctx)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error {
|
||||
func (c *conversationDatabase) CreateConversation(
|
||||
ctx context.Context,
|
||||
conversations []*relationTb.ConversationModel,
|
||||
) error {
|
||||
if err := c.conversationDB.Create(ctx, conversations); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -121,7 +155,10 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
|
||||
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ExecDel(ctx)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationTb.ConversationModel) error {
|
||||
func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(
|
||||
ctx context.Context,
|
||||
conversations []*relationTb.ConversationModel,
|
||||
) error {
|
||||
cache := c.cache.NewCache()
|
||||
if err := c.tx.Transaction(func(tx any) error {
|
||||
conversationTx := c.conversationDB.NewTx(tx)
|
||||
@@ -159,19 +196,34 @@ func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Con
|
||||
return cache.ExecDel(ctx)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
|
||||
func (c *conversationDatabase) FindConversations(
|
||||
ctx context.Context,
|
||||
ownerUserID string,
|
||||
conversationIDs []string,
|
||||
) ([]*relationTb.ConversationModel, error) {
|
||||
return c.cache.GetConversations(ctx, ownerUserID, conversationIDs)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error) {
|
||||
func (c *conversationDatabase) GetConversation(
|
||||
ctx context.Context,
|
||||
ownerUserID string,
|
||||
conversationID string,
|
||||
) (*relationTb.ConversationModel, error) {
|
||||
return c.cache.GetConversation(ctx, ownerUserID, conversationID)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) {
|
||||
func (c *conversationDatabase) GetUserAllConversation(
|
||||
ctx context.Context,
|
||||
ownerUserID string,
|
||||
) ([]*relationTb.ConversationModel, error) {
|
||||
return c.cache.GetUserAllConversations(ctx, ownerUserID)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error {
|
||||
func (c *conversationDatabase) SetUserConversations(
|
||||
ctx context.Context,
|
||||
ownerUserID string,
|
||||
conversations []*relationTb.ConversationModel,
|
||||
) error {
|
||||
cache := c.cache.NewCache()
|
||||
if err := c.tx.Transaction(func(tx any) error {
|
||||
var conversationIDs []string
|
||||
@@ -221,7 +273,11 @@ func (c *conversationDatabase) FindRecvMsgNotNotifyUserIDs(ctx context.Context,
|
||||
return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error {
|
||||
func (c *conversationDatabase) CreateGroupChatConversation(
|
||||
ctx context.Context,
|
||||
groupID string,
|
||||
userIDs []string,
|
||||
) error {
|
||||
cache := c.cache.NewCache()
|
||||
conversationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID)
|
||||
if err := c.tx.Transaction(func(tx any) error {
|
||||
@@ -261,7 +317,10 @@ func (c *conversationDatabase) GetConversationIDs(ctx context.Context, userID st
|
||||
return c.cache.GetUserConversationIDs(ctx, userID)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) {
|
||||
func (c *conversationDatabase) GetUserConversationIDsHash(
|
||||
ctx context.Context,
|
||||
ownerUserID string,
|
||||
) (hash uint64, err error) {
|
||||
return c.cache.GetUserConversationIDsHash(ctx, ownerUserID)
|
||||
}
|
||||
|
||||
@@ -269,14 +328,22 @@ func (c *conversationDatabase) GetAllConversationIDs(ctx context.Context) ([]str
|
||||
return c.conversationDB.GetAllConversationIDs(ctx)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) {
|
||||
func (c *conversationDatabase) GetUserAllHasReadSeqs(
|
||||
ctx context.Context,
|
||||
ownerUserID string,
|
||||
) (map[string]int64, error) {
|
||||
return c.cache.GetUserAllHasReadSeqs(ctx, ownerUserID)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
|
||||
func (c *conversationDatabase) GetConversationsByConversationID(
|
||||
ctx context.Context,
|
||||
conversationIDs []string,
|
||||
) ([]*relationTb.ConversationModel, error) {
|
||||
return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationTb.ConversationModel, error) {
|
||||
func (c *conversationDatabase) GetConversationIDsNeedDestruct(
|
||||
ctx context.Context,
|
||||
) ([]*relationTb.ConversationModel, error) {
|
||||
return c.conversationDB.GetConversationIDsNeedDestruct(ctx)
|
||||
}
|
||||
|
||||
@@ -573,6 +573,10 @@ func (g *groupDatabase) CountTotal(ctx context.Context, before *time.Time) (coun
|
||||
return g.groupDB.CountTotal(ctx, before)
|
||||
}
|
||||
|
||||
func (g *groupDatabase) CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) {
|
||||
func (g *groupDatabase) CountRangeEverydayTotal(
|
||||
ctx context.Context,
|
||||
start time.Time,
|
||||
end time.Time,
|
||||
) (map[string]int64, error) {
|
||||
return g.groupDB.CountRangeEverydayTotal(ctx, start, end)
|
||||
}
|
||||
|
||||
+345
-52
@@ -18,10 +18,11 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
|
||||
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -40,16 +41,36 @@ type CommonMsgDatabase interface {
|
||||
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
|
||||
DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64)
|
||||
// incrSeq然后批量插入缓存
|
||||
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
|
||||
BatchInsertChat2Cache(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
msgs []*sdkws.MsgData,
|
||||
) (seq int64, isNewConversation bool, err error)
|
||||
|
||||
// 通过seqList获取mongo中写扩散消息
|
||||
GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error)
|
||||
GetMsgBySeqsRange(
|
||||
ctx context.Context,
|
||||
userID string,
|
||||
conversationID string,
|
||||
begin, end, num, userMaxSeq int64,
|
||||
) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error)
|
||||
// 通过seqList获取大群在 mongo里面的消息
|
||||
GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error)
|
||||
GetMsgBySeqs(
|
||||
ctx context.Context,
|
||||
userID string,
|
||||
conversationID string,
|
||||
seqs []int64,
|
||||
) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error)
|
||||
// 删除会话消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
|
||||
DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error
|
||||
// 用户标记删除过期消息返回标记删除的seq列表
|
||||
UserMsgsDestruct(cte context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error)
|
||||
UserMsgsDestruct(
|
||||
cte context.Context,
|
||||
userID string,
|
||||
conversationID string,
|
||||
destructTime int64,
|
||||
lastMsgDestructTime time.Time,
|
||||
) (seqs []int64, err error)
|
||||
|
||||
// 用户根据seq删除消息
|
||||
DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error
|
||||
@@ -75,7 +96,10 @@ type CommonMsgDatabase interface {
|
||||
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
|
||||
|
||||
GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error)
|
||||
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
|
||||
GetConversationMinMaxSeqInMongoAndCache(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
|
||||
SetSendMsgStatus(ctx context.Context, id string, status int32) error
|
||||
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
|
||||
|
||||
@@ -85,8 +109,23 @@ type CommonMsgDatabase interface {
|
||||
MsgToPushMQ(ctx context.Context, key, conversarionID string, msg2mq *sdkws.MsgData) (int32, int64, error)
|
||||
MsgToMongoMQ(ctx context.Context, key, conversarionID string, msgs []*sdkws.MsgData, lastSeq int64) error
|
||||
|
||||
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*unRelationTb.UserCount, dateCount map[string]int64, err error)
|
||||
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error)
|
||||
RangeUserSendCount(
|
||||
ctx context.Context,
|
||||
start time.Time,
|
||||
end time.Time,
|
||||
group bool,
|
||||
ase bool,
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, users []*unRelationTb.UserCount, dateCount map[string]int64, err error)
|
||||
RangeGroupSendCount(
|
||||
ctx context.Context,
|
||||
start time.Time,
|
||||
end time.Time,
|
||||
ase bool,
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error)
|
||||
}
|
||||
|
||||
func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) CommonMsgDatabase {
|
||||
@@ -121,16 +160,32 @@ func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sd
|
||||
return err
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) MsgToModifyMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData) error {
|
||||
func (db *commonMsgDatabase) MsgToModifyMQ(
|
||||
ctx context.Context,
|
||||
key, conversationID string,
|
||||
messages []*sdkws.MsgData,
|
||||
) error {
|
||||
if len(messages) > 0 {
|
||||
_, _, err := db.producerToModify.SendMessage(ctx, key, &pbMsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages})
|
||||
_, _, err := db.producerToModify.SendMessage(
|
||||
ctx,
|
||||
key,
|
||||
&pbMsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages},
|
||||
)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
|
||||
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbMsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
|
||||
func (db *commonMsgDatabase) MsgToPushMQ(
|
||||
ctx context.Context,
|
||||
key, conversationID string,
|
||||
msg2mq *sdkws.MsgData,
|
||||
) (int32, int64, error) {
|
||||
partition, offset, err := db.producerToPush.SendMessage(
|
||||
ctx,
|
||||
key,
|
||||
&pbMsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID},
|
||||
)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq)
|
||||
return 0, 0, err
|
||||
@@ -138,15 +193,30 @@ func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationI
|
||||
return partition, offset, nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
|
||||
func (db *commonMsgDatabase) MsgToMongoMQ(
|
||||
ctx context.Context,
|
||||
key, conversationID string,
|
||||
messages []*sdkws.MsgData,
|
||||
lastSeq int64,
|
||||
) error {
|
||||
if len(messages) > 0 {
|
||||
_, _, err := db.producerToMongo.SendMessage(ctx, key, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
|
||||
_, _, err := db.producerToMongo.SendMessage(
|
||||
ctx,
|
||||
key,
|
||||
&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages},
|
||||
)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
|
||||
func (db *commonMsgDatabase) BatchInsertBlock(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
fields []any,
|
||||
key int8,
|
||||
firstSeq int64,
|
||||
) error {
|
||||
if len(fields) == 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -247,7 +317,12 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
|
||||
func (db *commonMsgDatabase) BatchInsertChat2DB(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
msgList []*sdkws.MsgData,
|
||||
currentMaxSeq int64,
|
||||
) error {
|
||||
if len(msgList) == 0 {
|
||||
return errs.ErrArgs.Wrap("msgList is empty")
|
||||
}
|
||||
@@ -293,11 +368,21 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio
|
||||
return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error {
|
||||
func (db *commonMsgDatabase) RevokeMsg(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
seq int64,
|
||||
revoke *unRelationTb.RevokeModel,
|
||||
) error {
|
||||
return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, totalSeqs []int64) error {
|
||||
func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(
|
||||
ctx context.Context,
|
||||
userID string,
|
||||
conversationID string,
|
||||
totalSeqs []int64,
|
||||
) error {
|
||||
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, totalSeqs) {
|
||||
var indexes []int64
|
||||
for _, seq := range seqs {
|
||||
@@ -320,7 +405,11 @@ func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversa
|
||||
db.cache.DelUserDeleteMsgsList(ctx, conversationID, seqs)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
|
||||
func (db *commonMsgDatabase) BatchInsertChat2Cache(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
msgs []*sdkws.MsgData,
|
||||
) (seq int64, isNew bool, err error) {
|
||||
currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID)
|
||||
if err != nil && errs.Unwrap(err) != redis.Nil {
|
||||
prome.Inc(prome.SeqGetFailedCounter)
|
||||
@@ -367,7 +456,11 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
|
||||
return lastMaxSeq, isNew, utils.Wrap(err, "")
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
|
||||
func (db *commonMsgDatabase) getMsgBySeqs(
|
||||
ctx context.Context,
|
||||
userID, conversationID string,
|
||||
seqs []int64,
|
||||
) (totalMsgs []*sdkws.MsgData, err error) {
|
||||
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) {
|
||||
//log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
|
||||
msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs)
|
||||
@@ -381,7 +474,11 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversat
|
||||
return totalMsgs, nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, err error) {
|
||||
func (db *commonMsgDatabase) findMsgInfoBySeq(
|
||||
ctx context.Context,
|
||||
userID, docID string,
|
||||
seqs []int64,
|
||||
) (totalMsgs []*unRelationTb.MsgInfoModel, err error) {
|
||||
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs)
|
||||
for _, msg := range msgs {
|
||||
if msg.IsRead {
|
||||
@@ -391,8 +488,25 @@ func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID
|
||||
return msgs, err
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID string, conversationID string, allSeqs []int64, begin, end int64) (seqMsgs []*sdkws.MsgData, err error) {
|
||||
log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end)
|
||||
func (db *commonMsgDatabase) getMsgBySeqsRange(
|
||||
ctx context.Context,
|
||||
userID string,
|
||||
conversationID string,
|
||||
allSeqs []int64,
|
||||
begin, end int64,
|
||||
) (seqMsgs []*sdkws.MsgData, err error) {
|
||||
log.ZDebug(
|
||||
ctx,
|
||||
"getMsgBySeqsRange",
|
||||
"conversationID",
|
||||
conversationID,
|
||||
"allSeqs",
|
||||
allSeqs,
|
||||
"begin",
|
||||
begin,
|
||||
"end",
|
||||
end,
|
||||
)
|
||||
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) {
|
||||
log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs)
|
||||
msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs)
|
||||
@@ -409,7 +523,12 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
|
||||
return seqMsgs, nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) {
|
||||
func (db *commonMsgDatabase) GetMsgBySeqsRange(
|
||||
ctx context.Context,
|
||||
userID string,
|
||||
conversationID string,
|
||||
begin, end, num, userMaxSeq int64,
|
||||
) (int64, int64, []*sdkws.MsgData, error) {
|
||||
userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
|
||||
if err != nil && errs.Unwrap(err) != redis.Nil {
|
||||
return 0, 0, nil, err
|
||||
@@ -429,7 +548,18 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
|
||||
if err != nil && errs.Unwrap(err) != redis.Nil {
|
||||
return 0, 0, nil, err
|
||||
}
|
||||
log.ZDebug(ctx, "GetMsgBySeqsRange", "userMinSeq", userMinSeq, "conMinSeq", minSeq, "conMaxSeq", maxSeq, "userMaxSeq", userMaxSeq)
|
||||
log.ZDebug(
|
||||
ctx,
|
||||
"GetMsgBySeqsRange",
|
||||
"userMinSeq",
|
||||
userMinSeq,
|
||||
"conMinSeq",
|
||||
minSeq,
|
||||
"conMaxSeq",
|
||||
maxSeq,
|
||||
"userMaxSeq",
|
||||
userMaxSeq,
|
||||
)
|
||||
if userMaxSeq != 0 {
|
||||
if userMaxSeq < maxSeq {
|
||||
maxSeq = userMaxSeq
|
||||
@@ -479,7 +609,18 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
|
||||
cacheDelNum += 1
|
||||
}
|
||||
}
|
||||
log.ZDebug(ctx, "get delSeqs from redis", "delSeqs", delSeqs, "userID", userID, "conversationID", conversationID, "cacheDelNum", cacheDelNum)
|
||||
log.ZDebug(
|
||||
ctx,
|
||||
"get delSeqs from redis",
|
||||
"delSeqs",
|
||||
delSeqs,
|
||||
"userID",
|
||||
userID,
|
||||
"conversationID",
|
||||
conversationID,
|
||||
"cacheDelNum",
|
||||
cacheDelNum,
|
||||
)
|
||||
var reGetSeqsCache []int64
|
||||
for i := 1; i <= cacheDelNum; {
|
||||
newSeq := newBegin - int64(i)
|
||||
@@ -499,7 +640,15 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
|
||||
if err != nil {
|
||||
if err != redis.Nil {
|
||||
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs2))
|
||||
log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", reGetSeqsCache)
|
||||
log.ZError(
|
||||
ctx,
|
||||
"get message from redis exception",
|
||||
err,
|
||||
"conversationID",
|
||||
conversationID,
|
||||
"seqs",
|
||||
reGetSeqsCache,
|
||||
)
|
||||
}
|
||||
}
|
||||
failedSeqs = append(failedSeqs, failedSeqs2...)
|
||||
@@ -525,7 +674,12 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
|
||||
return minSeq, maxSeq, successMsgs, nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) {
|
||||
func (db *commonMsgDatabase) GetMsgBySeqs(
|
||||
ctx context.Context,
|
||||
userID string,
|
||||
conversationID string,
|
||||
seqs []int64,
|
||||
) (int64, int64, []*sdkws.MsgData, error) {
|
||||
userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
|
||||
if err != nil && errs.Unwrap(err) != redis.Nil {
|
||||
return 0, 0, nil, err
|
||||
@@ -551,10 +705,33 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
|
||||
if err != nil {
|
||||
if err != redis.Nil {
|
||||
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
||||
log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID)
|
||||
log.ZError(
|
||||
ctx,
|
||||
"get message from redis exception",
|
||||
err,
|
||||
"failedSeqs",
|
||||
failedSeqs,
|
||||
"conversationID",
|
||||
conversationID,
|
||||
)
|
||||
}
|
||||
}
|
||||
log.ZInfo(ctx, "db.cache.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", seqs, "successMsgs", len(successMsgs), "failedSeqs", failedSeqs, "conversationID", conversationID)
|
||||
log.ZInfo(
|
||||
ctx,
|
||||
"db.cache.GetMessagesBySeq",
|
||||
"userID",
|
||||
userID,
|
||||
"conversationID",
|
||||
conversationID,
|
||||
"seqs",
|
||||
seqs,
|
||||
"successMsgs",
|
||||
len(successMsgs),
|
||||
"failedSeqs",
|
||||
failedSeqs,
|
||||
"conversationID",
|
||||
conversationID,
|
||||
)
|
||||
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
|
||||
if len(failedSeqs) > 0 {
|
||||
mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs)
|
||||
@@ -568,7 +745,11 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
|
||||
return minSeq, maxSeq, successMsgs, nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error {
|
||||
func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
remainTime int64,
|
||||
) error {
|
||||
var delStruct delMsgRecursionStruct
|
||||
var skip int64
|
||||
minSeq, err := db.deleteMsgRecursion(ctx, conversationID, skip, &delStruct, remainTime)
|
||||
@@ -588,7 +769,13 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont
|
||||
return db.cache.SetMinSeq(ctx, conversationID, minSeq)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) {
|
||||
func (db *commonMsgDatabase) UserMsgsDestruct(
|
||||
ctx context.Context,
|
||||
userID string,
|
||||
conversationID string,
|
||||
destructTime int64,
|
||||
lastMsgDestructTime time.Time,
|
||||
) (seqs []int64, err error) {
|
||||
var index int64
|
||||
for {
|
||||
// from oldest 2 newest
|
||||
@@ -596,7 +783,16 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
|
||||
if err != nil || msgDocModel.DocID == "" {
|
||||
if err != nil {
|
||||
if err == unrelation.ErrMsgListNotExist {
|
||||
log.ZDebug(ctx, "deleteMsgRecursion finished", "conversationID", conversationID, "userID", userID, "index", index)
|
||||
log.ZDebug(
|
||||
ctx,
|
||||
"deleteMsgRecursion finished",
|
||||
"conversationID",
|
||||
conversationID,
|
||||
"userID",
|
||||
userID,
|
||||
"index",
|
||||
index,
|
||||
)
|
||||
} else {
|
||||
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
|
||||
}
|
||||
@@ -645,13 +841,26 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
|
||||
// seq 70
|
||||
// set minSeq 21
|
||||
// recursion 删除list并且返回设置的最小seq
|
||||
func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
|
||||
func (db *commonMsgDatabase) deleteMsgRecursion(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
index int64,
|
||||
delStruct *delMsgRecursionStruct,
|
||||
remainTime int64,
|
||||
) (int64, error) {
|
||||
// find from oldest list
|
||||
msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1)
|
||||
if err != nil || msgDocModel.DocID == "" {
|
||||
if err != nil {
|
||||
if err == unrelation.ErrMsgListNotExist {
|
||||
log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index)
|
||||
log.ZDebug(
|
||||
ctx,
|
||||
"deleteMsgRecursion ErrMsgListNotExist",
|
||||
"conversationID",
|
||||
conversationID,
|
||||
"index:",
|
||||
index,
|
||||
)
|
||||
} else {
|
||||
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
|
||||
}
|
||||
@@ -663,11 +872,23 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
|
||||
}
|
||||
return delStruct.getSetMinSeq() + 1, nil
|
||||
}
|
||||
log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgDocModel.DocID, "len", len(msgDocModel.Msg))
|
||||
log.ZDebug(
|
||||
ctx,
|
||||
"doc info",
|
||||
"conversationID",
|
||||
conversationID,
|
||||
"index",
|
||||
index,
|
||||
"docID",
|
||||
msgDocModel.DocID,
|
||||
"len",
|
||||
len(msgDocModel.Msg),
|
||||
)
|
||||
if int64(len(msgDocModel.Msg)) > db.msg.GetSingleGocMsgNum() {
|
||||
log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgDocModel.Msg), "docID:", msgDocModel.DocID)
|
||||
}
|
||||
if msgDocModel.IsFull() && msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() {
|
||||
if msgDocModel.IsFull() &&
|
||||
msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() {
|
||||
log.ZDebug(ctx, "doc is full and all msg is expired", "docID", msgDocModel.DocID)
|
||||
delStruct.delDocIDs = append(delStruct.delDocIDs, msgDocModel.DocID)
|
||||
delStruct.minSeq = msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.Seq
|
||||
@@ -704,7 +925,11 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
|
||||
return seq, err
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error {
|
||||
func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
allSeqs []int64,
|
||||
) error {
|
||||
if err := db.cache.DeleteMessages(ctx, conversationID, allSeqs); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -720,7 +945,12 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error {
|
||||
func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(
|
||||
ctx context.Context,
|
||||
userID string,
|
||||
conversationID string,
|
||||
seqs []int64,
|
||||
) error {
|
||||
cachedMsgs, _, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
|
||||
if err != nil && errs.Unwrap(err) != redis.Nil {
|
||||
log.ZWarn(ctx, "DeleteUserMsgsBySeqs", err, "conversationID", conversationID, "seqs", seqs)
|
||||
@@ -789,31 +1019,70 @@ func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []s
|
||||
func (db *commonMsgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
|
||||
return db.cache.GetMinSeq(ctx, conversationID)
|
||||
}
|
||||
func (db *commonMsgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
|
||||
|
||||
func (db *commonMsgDatabase) GetConversationUserMinSeq(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
userID string,
|
||||
) (int64, error) {
|
||||
return db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
|
||||
}
|
||||
func (db *commonMsgDatabase) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) {
|
||||
|
||||
func (db *commonMsgDatabase) GetConversationUserMinSeqs(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
userIDs []string,
|
||||
) (map[string]int64, error) {
|
||||
return db.cache.GetConversationUserMinSeqs(ctx, conversationID, userIDs)
|
||||
}
|
||||
func (db *commonMsgDatabase) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
|
||||
|
||||
func (db *commonMsgDatabase) SetConversationUserMinSeq(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
userID string,
|
||||
minSeq int64,
|
||||
) error {
|
||||
return db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, minSeq)
|
||||
}
|
||||
func (db *commonMsgDatabase) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
|
||||
|
||||
func (db *commonMsgDatabase) SetConversationUserMinSeqs(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
seqs map[string]int64,
|
||||
) (err error) {
|
||||
return db.cache.SetConversationUserMinSeqs(ctx, conversationID, seqs)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
|
||||
func (db *commonMsgDatabase) SetUserConversationsMinSeqs(
|
||||
ctx context.Context,
|
||||
userID string,
|
||||
seqs map[string]int64,
|
||||
) error {
|
||||
return db.cache.SetUserConversationsMinSeqs(ctx, userID, seqs)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
|
||||
func (db *commonMsgDatabase) UserSetHasReadSeqs(
|
||||
ctx context.Context,
|
||||
userID string,
|
||||
hasReadSeqs map[string]int64,
|
||||
) error {
|
||||
return db.cache.UserSetHasReadSeqs(ctx, userID, hasReadSeqs)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
|
||||
func (db *commonMsgDatabase) SetHasReadSeq(
|
||||
ctx context.Context,
|
||||
userID string,
|
||||
conversationID string,
|
||||
hasReadSeq int64,
|
||||
) error {
|
||||
return db.cache.SetHasReadSeq(ctx, userID, conversationID, hasReadSeq)
|
||||
}
|
||||
func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
|
||||
|
||||
func (db *commonMsgDatabase) GetHasReadSeqs(
|
||||
ctx context.Context,
|
||||
userID string,
|
||||
conversationIDs []string,
|
||||
) (map[string]int64, error) {
|
||||
return db.cache.GetHasReadSeqs(ctx, userID, conversationIDs)
|
||||
}
|
||||
|
||||
@@ -829,7 +1098,10 @@ func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (i
|
||||
return db.cache.GetSendMsgStatus(ctx, id)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
|
||||
func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
|
||||
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, conversationID)
|
||||
if err != nil {
|
||||
return
|
||||
@@ -845,11 +1117,17 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context
|
||||
return
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) {
|
||||
func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
) (minSeqMongo, maxSeqMongo int64, err error) {
|
||||
return db.GetMinMaxSeqMongo(ctx, conversationID)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) {
|
||||
func (db *commonMsgDatabase) GetMinMaxSeqMongo(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
) (minSeqMongo, maxSeqMongo int64, err error) {
|
||||
oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID)
|
||||
if err != nil {
|
||||
return
|
||||
@@ -863,10 +1141,25 @@ func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversation
|
||||
return
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*unRelationTb.UserCount, dateCount map[string]int64, err error) {
|
||||
func (db *commonMsgDatabase) RangeUserSendCount(
|
||||
ctx context.Context,
|
||||
start time.Time,
|
||||
end time.Time,
|
||||
group bool,
|
||||
ase bool,
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, users []*unRelationTb.UserCount, dateCount map[string]int64, err error) {
|
||||
return db.msgDocDatabase.RangeUserSendCount(ctx, start, end, group, ase, pageNumber, showNumber)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error) {
|
||||
func (db *commonMsgDatabase) RangeGroupSendCount(
|
||||
ctx context.Context,
|
||||
start time.Time,
|
||||
end time.Time,
|
||||
ase bool,
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error) {
|
||||
return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber)
|
||||
}
|
||||
|
||||
@@ -2,18 +2,25 @@ package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3/cont"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"path/filepath"
|
||||
"time"
|
||||
)
|
||||
|
||||
type S3Database interface {
|
||||
PartLimit() *s3.PartLimit
|
||||
PartSize(ctx context.Context, size int64) (int64, error)
|
||||
AuthSign(ctx context.Context, uploadID string, partNumbers []int) (*s3.AuthSignResult, error)
|
||||
InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error)
|
||||
InitiateMultipartUpload(
|
||||
ctx context.Context,
|
||||
hash string,
|
||||
size int64,
|
||||
expire time.Duration,
|
||||
maxParts int,
|
||||
) (*cont.InitiateUploadResult, error)
|
||||
CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error)
|
||||
AccessURL(ctx context.Context, name string, expire time.Duration) (time.Time, string, error)
|
||||
SetObject(ctx context.Context, info *relation.ObjectModel) error
|
||||
@@ -43,11 +50,21 @@ func (s *s3Database) AuthSign(ctx context.Context, uploadID string, partNumbers
|
||||
return s.s3.AuthSign(ctx, uploadID, partNumbers)
|
||||
}
|
||||
|
||||
func (s *s3Database) InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error) {
|
||||
func (s *s3Database) InitiateMultipartUpload(
|
||||
ctx context.Context,
|
||||
hash string,
|
||||
size int64,
|
||||
expire time.Duration,
|
||||
maxParts int,
|
||||
) (*cont.InitiateUploadResult, error) {
|
||||
return s.s3.InitiateUpload(ctx, hash, size, expire, maxParts)
|
||||
}
|
||||
|
||||
func (s *s3Database) CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error) {
|
||||
func (s *s3Database) CompleteMultipartUpload(
|
||||
ctx context.Context,
|
||||
uploadID string,
|
||||
parts []string,
|
||||
) (*cont.UploadResult, error) {
|
||||
return s.s3.CompleteUpload(ctx, uploadID, parts)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user