mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-03 16:45:59 +08:00
pull msg
This commit is contained in:
@@ -41,10 +41,9 @@ type MsgDatabase interface {
|
||||
DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
|
||||
// 获取群ID或者UserID最新一条在mongo里面的消息
|
||||
// 通过seqList获取mongo中写扩散消息
|
||||
GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
|
||||
GetMsgBySeqsRange(ctx context.Context, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error)
|
||||
// 通过seqList获取大群在 mongo里面的消息
|
||||
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
|
||||
GetMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
|
||||
// 删除用户所有消息/redis/mongo然后重置seq
|
||||
CleanUpUserMsg(ctx context.Context, userID string) error
|
||||
// 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
|
||||
@@ -450,7 +449,7 @@ func (db *msgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *
|
||||
return msgPb, nil
|
||||
}
|
||||
|
||||
func (db *msgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64, diffusionType int) (seqMsgs []*sdkws.MsgData, err error) {
|
||||
func (db *msgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, err error) {
|
||||
var hasSeqs []int64
|
||||
singleCount := 0
|
||||
m := db.msg.GetDocIDSeqsMap(conversationID, seqs)
|
||||
@@ -481,27 +480,45 @@ func (db *msgDatabase) getMsgBySeqs(ctx context.Context, conversationID string,
|
||||
var diff []int64
|
||||
var exceptionMsg []*sdkws.MsgData
|
||||
diff = utils.Difference(hasSeqs, seqs)
|
||||
if diffusionType == constant.WriteDiffusion {
|
||||
exceptionMsg = db.msg.GenExceptionMessageBySeqs(diff)
|
||||
} else if diffusionType == constant.ReadDiffusion {
|
||||
exceptionMsg = db.msg.GenExceptionSuperGroupMessageBySeqs(diff, conversationID)
|
||||
}
|
||||
exceptionMsg = db.msg.GenExceptionSuperGroupMessageBySeqs(diff, conversationID)
|
||||
seqMsgs = append(seqMsgs, exceptionMsg...)
|
||||
}
|
||||
return seqMsgs, nil
|
||||
}
|
||||
|
||||
func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) {
|
||||
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs)
|
||||
func (db *msgDatabase) getMsgBySeqsRange(ctx context.Context, conversationID string, seqs []int64, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) {
|
||||
m := db.msg.GetDocIDSeqsMap(conversationID, seqs)
|
||||
for {
|
||||
|
||||
if int64(len(seqMsg)) != num {
|
||||
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
return seqMsg, nil
|
||||
}
|
||||
|
||||
func (db *msgDatabase) GetMsgBySeqsRange(ctx context.Context, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) {
|
||||
var seqs []int64
|
||||
for i := end; i > end-num; i-- {
|
||||
if i >= begin {
|
||||
seqs = append(seqs, i)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
|
||||
if err != nil {
|
||||
if err != redis.Nil {
|
||||
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
||||
log.Error(mcontext.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs)
|
||||
log.ZError(ctx, "get message from redis exception", err, conversationID, seqs)
|
||||
}
|
||||
}
|
||||
// get from cache or db
|
||||
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
|
||||
if len(failedSeqs) > 0 {
|
||||
mongoMsgs, err := db.getMsgBySeqs(ctx, userID, seqs, constant.WriteDiffusion)
|
||||
mongoMsgs, err := db.getMsgBySeqsRange(ctx, conversationID, failedSeqs, begin, end, num-int64(len(successMsgs)))
|
||||
if err != nil {
|
||||
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
|
||||
return nil, err
|
||||
@@ -512,11 +529,7 @@ func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []i
|
||||
return successMsgs, nil
|
||||
}
|
||||
|
||||
func (db *msgDatabase) GetMsgBySeqsRange(ctx context.Context, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) {
|
||||
var seqs []int64
|
||||
for i := begin; i <= end; i++ {
|
||||
seqs = append(seqs, i)
|
||||
}
|
||||
func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) {
|
||||
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
|
||||
if err != nil {
|
||||
if err != redis.Nil {
|
||||
@@ -526,28 +539,7 @@ func (db *msgDatabase) GetMsgBySeqsRange(ctx context.Context, conversationID str
|
||||
}
|
||||
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
|
||||
if len(failedSeqs) > 0 {
|
||||
mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, seqs, constant.ReadDiffusion)
|
||||
if err != nil {
|
||||
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
|
||||
return nil, err
|
||||
}
|
||||
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
|
||||
successMsgs = append(successMsgs, mongoMsgs...)
|
||||
}
|
||||
return successMsgs, nil
|
||||
}
|
||||
|
||||
func (db *msgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) {
|
||||
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs)
|
||||
if err != nil {
|
||||
if err != redis.Nil {
|
||||
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
||||
log.Error(mcontext.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs)
|
||||
}
|
||||
}
|
||||
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
|
||||
if len(failedSeqs) > 0 {
|
||||
mongoMsgs, err := db.getMsgBySeqs(ctx, groupID, seqs, constant.ReadDiffusion)
|
||||
mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, seqs)
|
||||
if err != nil {
|
||||
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
|
||||
return nil, err
|
||||
@@ -571,7 +563,7 @@ func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context,
|
||||
var delStruct delMsgRecursionStruct
|
||||
minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime)
|
||||
if err != nil {
|
||||
//log.NewError(operationID, utils.GetSelfFuncName(), groupID, "deleteMsg failed")
|
||||
log.ZError(ctx, "deleteMsgRecursion failed", err)
|
||||
}
|
||||
if minSeq == 0 {
|
||||
return nil
|
||||
|
||||
@@ -17,8 +17,8 @@ const (
|
||||
)
|
||||
|
||||
type MsgDocModel struct {
|
||||
DocID string `bson:"uid"`
|
||||
Msg []MsgInfoModel `bson:"msg"`
|
||||
DocID string `bson:"doc_id"`
|
||||
Msg []MsgInfoModel `bson:"msgs"`
|
||||
}
|
||||
|
||||
type MsgInfoModel struct {
|
||||
@@ -31,6 +31,7 @@ type MsgDocModelInterface interface {
|
||||
Create(ctx context.Context, model *MsgDocModel) error
|
||||
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
|
||||
FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)
|
||||
GetMsgBySeqIndexIn1Doc(ctx context.Context, conversationID string, begin, end int64) ([]*sdkws.MsgData, error)
|
||||
GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
|
||||
GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
|
||||
Delete(ctx context.Context, docIDs []string) error
|
||||
@@ -98,15 +99,15 @@ func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[st
|
||||
return t
|
||||
}
|
||||
|
||||
func (m MsgDocModel) getMsgIndex(seq uint32) int {
|
||||
func (m MsgDocModel) getMsgIndex(seq int64) int64 {
|
||||
seqSuffix := seq / singleGocMsgNum
|
||||
var index uint32
|
||||
var index int64
|
||||
if seqSuffix == 0 {
|
||||
index = (seq - seqSuffix*singleGocMsgNum) - 1
|
||||
} else {
|
||||
index = seq - seqSuffix*singleGocMsgNum
|
||||
}
|
||||
return int(index)
|
||||
return index
|
||||
}
|
||||
|
||||
func (m MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
@@ -132,3 +133,25 @@ func (m *MsgMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.MsgDocMode
|
||||
_, err := m.MsgCollection.UpdateOne(ctx, bson.M{"uid": msg.DocID}, bson.M{"$set": bson.M{"msg": msg.Msg}})
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, begin, end int64) ([]*sdkws.MsgData, error) {
|
||||
// uid = getSeqUid(uid, seq)
|
||||
// seqIndex := getMsgIndex(seq)
|
||||
// m.msg.GetSeqDocIDList()
|
||||
result, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": docID, "msg": bson.M{"$slice": []int{seqIndex, 1}}})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var msgInfos []table.MsgInfoModel
|
||||
if err := result.Decode(&msgInfos); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(msgInfos) < 1 {
|
||||
return nil, errs.ErrRecordNotFound.Wrap("mongo GetMsgBySeqIndex failed, len is 0")
|
||||
}
|
||||
var msg sdkws.MsgData
|
||||
if err := proto.Unmarshal(msgInfos[0].Msg, &msg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return msg, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user