This commit is contained in:
wangchuxiao
2023-05-26 16:22:03 +08:00
parent 501e03e57e
commit 60670c5012
10 changed files with 124 additions and 180 deletions
+73 -91
View File
@@ -3,11 +3,9 @@ package controller
import (
"fmt"
"sort"
"strconv"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/convert"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
@@ -30,8 +28,6 @@ import (
const (
updateKeyMsg = iota
updateKeyRevoke
updateKeyDel
updateKeyRead
)
type CommonMsgDatabase interface {
@@ -48,9 +44,12 @@ type CommonMsgDatabase interface {
GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error)
// 通过seqList获取大群在 mongo里面的消息
GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 删除会话消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error
// 用户根据seq删除消息
DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error
// 物理删除消息置空
DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, seqs []int64) error
SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
@@ -63,7 +62,6 @@ type CommonMsgDatabase interface {
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error)
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error
@@ -164,10 +162,6 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
}
case updateKeyRevoke:
_, ok = field.(*unRelationTb.RevokeModel)
case updateKeyDel:
_, ok = field.([]string)
case updateKeyRead:
_, ok = field.([]string)
default:
return errs.ErrInternalServer.Wrap("key is invalid")
}
@@ -175,30 +169,20 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
return errs.ErrInternalServer.Wrap("field type is invalid")
}
}
getDocID := func(seq int64) string {
return conversationID + ":" + strconv.FormatInt(seq/num, 10)
}
getIndex := func(seq int64) int64 {
return seq % num
}
// 返回值为true表示数据库存在该文档,false表示数据库不存在该文档
updateMsgModel := func(seq int64, i int) (bool, error) {
var (
res *mongo.UpdateResult
err error
)
docID := getDocID(seq)
index := getIndex(seq)
docID := db.msg.GetDocID(conversationID, seq)
index := db.msg.GetMsgIndex(seq)
field := fields[i]
switch key {
case updateKeyMsg:
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field)
case updateKeyRevoke:
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field)
case updateKeyDel:
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", field)
case updateKeyRead:
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", field)
}
if err != nil {
return false, err
@@ -218,33 +202,25 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
}
}
doc := unRelationTb.MsgDocModel{
DocID: getDocID(seq),
DocID: db.msg.GetDocID(conversationID, seq),
Msg: make([]*unRelationTb.MsgInfoModel, num),
}
var insert int // 插入的数量
for j := i; j < len(fields); j++ {
seq = firstSeq + int64(j)
if getDocID(seq) != doc.DocID {
if db.msg.GetDocID(conversationID, seq) != doc.DocID {
break
}
insert++
switch key {
case updateKeyMsg:
doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{
doc.Msg[db.msg.GetMsgIndex(seq)] = &unRelationTb.MsgInfoModel{
Msg: fields[j].(*unRelationTb.MsgDataModel),
}
case updateKeyRevoke:
doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{
doc.Msg[db.msg.GetMsgIndex(seq)] = &unRelationTb.MsgInfoModel{
Revoke: fields[j].(*unRelationTb.RevokeModel),
}
case updateKeyDel:
doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{
DelList: fields[j].([]string),
}
case updateKeyRead:
doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{
ReadList: fields[j].([]string),
}
}
}
for i, model := range doc.Msg {
@@ -255,9 +231,6 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
if model.DelList == nil {
doc.Msg[i].DelList = []string{}
}
// if model.ReadList == nil {
// doc.Msg[i].ReadList = []string{}
// }
}
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
if mongo.IsDuplicateKeyError(err) {
@@ -381,43 +354,43 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID st
return totalMsgs, nil
}
func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversationID string, delNums, rangeBegin, begin int64) (seqMsgs []*unRelationTb.MsgDataModel, err error) {
var reFetchSeqs []int64
if delNums > 0 {
newBeginSeq := rangeBegin - delNums
if newBeginSeq >= begin {
newEndSeq := rangeBegin - 1
for i := newBeginSeq; i <= newEndSeq; i++ {
reFetchSeqs = append(reFetchSeqs, i)
}
}
}
if len(reFetchSeqs) == 0 {
return
}
if len(reFetchSeqs) > 0 {
// m := db.msg.GetDocIDSeqsMap(conversationID, reFetchSeqs)
// for docID, seqs := range m {
// msgs, _, err := db.findMsgInfoBySeq(ctx, docID, seqs)
// if err != nil {
// return nil, err
// }
// for _, msg := range msgs {
// if msg.Status != constant.MsgDeleted {
// seqMsgs = append(seqMsgs, msg)
// }
// }
// }
}
if len(seqMsgs) < int(delNums) {
seqMsgs2, err := db.refetchDelSeqsMsgs(ctx, conversationID, delNums-int64(len(seqMsgs)), rangeBegin-1, begin)
if err != nil {
return seqMsgs, err
}
seqMsgs = append(seqMsgs, seqMsgs2...)
}
return seqMsgs, nil
}
// func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversationID string, delNums, rangeBegin, begin int64) (seqMsgs []*unRelationTb.MsgDataModel, err error) {
// var reFetchSeqs []int64
// if delNums > 0 {
// newBeginSeq := rangeBegin - delNums
// if newBeginSeq >= begin {
// newEndSeq := rangeBegin - 1
// for i := newBeginSeq; i <= newEndSeq; i++ {
// reFetchSeqs = append(reFetchSeqs, i)
// }
// }
// }
// if len(reFetchSeqs) == 0 {
// return
// }
// if len(reFetchSeqs) > 0 {
// m := db.msg.GetDocIDSeqsMap(conversationID, reFetchSeqs)
// for docID, seqs := range m {
// msgs, _, err := db.findMsgInfoBySeq(ctx, docID, seqs)
// if err != nil {
// return nil, err
// }
// for _, msg := range msgs {
// if msg.Status != constant.MsgDeleted {
// seqMsgs = append(seqMsgs, msg)
// }
// }
// }
// }
// if len(seqMsgs) < int(delNums) {
// seqMsgs2, err := db.refetchDelSeqsMsgs(ctx, conversationID, delNums-int64(len(seqMsgs)), rangeBegin-1, begin)
// if err != nil {
// return seqMsgs, err
// }
// seqMsgs = append(seqMsgs, seqMsgs2...)
// }
// return seqMsgs, nil
// }
func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) {
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs)
@@ -463,23 +436,9 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
}
totalNotExistSeqs = append(totalNotExistSeqs, notExistSeqs...)
}
log.ZDebug(ctx, "getMsgBySeqsRange", "totalNotExistSeqs", totalNotExistSeqs)
log.ZDebug(ctx, "getMsgBySeqsRange", "totalNotExistSeqs", totalNotExistSeqs, "del seqs", delSeqs)
// 补未找到的消息
seqMsgs = append(seqMsgs, db.msg.GenExceptionMessageBySeqs(totalNotExistSeqs)...)
for _, msg := range seqMsgs {
if msg.Status == constant.MsgDeleted {
delSeqs = append(delSeqs, msg.Seq)
}
}
if len(delSeqs) > 0 {
// msgs, err := db.refetchDelSeqsMsgs(ctx, conversationID, int64(len(delSeqs)), allSeqs[0], begin)
// if err != nil {
// log.ZWarn(ctx, "refetchDelSeqsMsgs", err, "delSeqs", delSeqs, "begin", begin)
// }
// for _, msg := range msgs {
// seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg))
// }
}
// sort by seq
if len(totalNotExistSeqs) > 0 || len(delSeqs) > 0 {
sort.Sort(utils.MsgBySeq(seqMsgs))
}
@@ -644,6 +603,30 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
return seq, err
}
func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error {
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) {
var indexes []int
for _, seq := range seqs {
indexes = append(indexes, int(db.msg.GetMsgIndex(seq)))
}
if err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, docID, indexes); err != nil {
return err
}
}
return nil
}
func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error {
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) {
for _, seq := range seqs {
if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msg.GetMsgIndex(seq), "del_list", []string{userID}); err != nil {
return err
}
}
}
return nil
}
func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) {
for _, conversationID := range conversationIDs {
maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID)
@@ -709,7 +692,6 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context
if err != nil {
return
}
// from cache
minSeqCache, err = db.cache.GetMinSeq(ctx, conversationID)
if err != nil {
return