BatchInsertBlock

This commit is contained in:
withchao
2023-05-25 14:57:15 +08:00
parent b8c3400145
commit 30d8276986
4 changed files with 344 additions and 774 deletions
+168 -158
View File
@@ -26,8 +26,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/mongo"
"google.golang.org/protobuf/proto"
)
type CommonMsgDatabase interface {
@@ -148,6 +146,7 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
return nil
}
num := db.msg.GetSingleGocMsgNum()
num = 100
if msgList[0].Msg != nil {
firstSeq = msgList[0].Msg.Seq
}
@@ -164,13 +163,13 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
err error
)
if msg.Msg != nil {
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", msgList[0].Msg)
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", msg.Msg)
} else if msg.Revoke != nil {
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", msgList[0].Revoke)
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", msg.Revoke)
} else if msg.DelList != nil {
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", msgList[0].DelList)
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", msg.DelList)
} else if msg.ReadList != nil {
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", msgList[0].ReadList)
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", msg.ReadList)
} else {
return false, errs.ErrArgs.Wrap("msg all field is nil")
}
@@ -199,12 +198,21 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
}
var insert int
for j := i; j < len(msgList); j++ {
seq = firstSeq + int64(j)
if getDocID(seq) != docID {
break
}
insert++
doc.Msg[getIndex(seq)] = *msgList[j]
}
for i, model := range doc.Msg {
if model.DelList == nil {
doc.Msg[i].DelList = []string{}
}
if model.ReadList == nil {
doc.Msg[i].ReadList = []string{}
}
}
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
if mongo.IsDuplicateKeyError(err) {
i--
@@ -220,91 +228,91 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
}
func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
num := db.msg.GetSingleGocMsgNum()
currentIndex := currentMaxSeq / num
var blockMsgs []*[]*sdkws.MsgData
for i, data := range msgList {
data.Seq = currentMaxSeq + int64(i+1)
index := data.Seq/num - currentIndex
if i == 0 && index == 1 {
index--
currentIndex++
}
var block *[]*sdkws.MsgData
if len(blockMsgs) == int(index) {
var size int64
if i == 0 {
size = num - data.Seq%num
} else {
temp := int64(len(msgList)-len(*blockMsgs[0])) - int64(len(blockMsgs)-1)*num
if temp >= num {
size = num
} else {
size = temp % num
}
}
temp := make([]*sdkws.MsgData, 0, size)
block = &temp
blockMsgs = append(blockMsgs, block)
} else {
block = blockMsgs[index]
}
*block = append(*block, msgList[i])
}
create := currentMaxSeq == 0 || ((*blockMsgs[0])[0].Seq%num == 0)
if !create {
exist, err := db.msgDocDatabase.IsExistDocID(ctx, db.msg.IndexDocID(conversationID, currentIndex))
if err != nil {
return err
}
create = !exist
}
for i, msgs := range blockMsgs {
docID := db.msg.IndexDocID(conversationID, currentIndex+int64(i))
if create || i != 0 { // 插入
doc := unRelationTb.MsgDocModel{
DocID: docID,
Msg: make([]unRelationTb.MsgInfoModel, num),
}
for i := 0; i < len(doc.Msg); i++ {
doc.Msg[i].ReadList = []string{}
doc.Msg[i].DelList = []string{}
}
for _, msg := range *msgs {
data, err := proto.Marshal(msg)
if err != nil {
return err
}
doc.Msg[msg.Seq%num] = unRelationTb.MsgInfoModel{
SendTime: msg.SendTime,
Msg: data,
ReadList: []string{},
DelList: []string{},
}
}
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
prome.Inc(prome.MsgInsertMongoFailedCounter)
return utils.Wrap(err, "")
}
prome.Inc(prome.MsgInsertMongoSuccessCounter)
} else { // 修改
for _, msg := range *msgs {
data, err := proto.Marshal(msg)
if err != nil {
return err
}
info := unRelationTb.MsgInfoModel{
SendTime: msg.SendTime,
Msg: data,
}
if err := db.msgDocDatabase.UpdateMsg(ctx, docID, msg.Seq%num, &info); err != nil {
prome.Inc(prome.MsgInsertMongoFailedCounter)
return err
}
prome.Inc(prome.MsgInsertMongoSuccessCounter)
}
}
}
//num := db.msg.GetSingleGocMsgNum()
//currentIndex := currentMaxSeq / num
//var blockMsgs []*[]*sdkws.MsgData
//for i, data := range msgList {
// data.Seq = currentMaxSeq + int64(i+1)
// index := data.Seq/num - currentIndex
// if i == 0 && index == 1 {
// index--
// currentIndex++
// }
// var block *[]*sdkws.MsgData
// if len(blockMsgs) == int(index) {
// var size int64
// if i == 0 {
// size = num - data.Seq%num
// } else {
// temp := int64(len(msgList)-len(*blockMsgs[0])) - int64(len(blockMsgs)-1)*num
// if temp >= num {
// size = num
// } else {
// size = temp % num
// }
// }
// temp := make([]*sdkws.MsgData, 0, size)
// block = &temp
// blockMsgs = append(blockMsgs, block)
// } else {
// block = blockMsgs[index]
// }
// *block = append(*block, msgList[i])
//}
//create := currentMaxSeq == 0 || ((*blockMsgs[0])[0].Seq%num == 0)
//if !create {
// exist, err := db.msgDocDatabase.IsExistDocID(ctx, db.msg.IndexDocID(conversationID, currentIndex))
// if err != nil {
// return err
// }
// create = !exist
//}
//for i, msgs := range blockMsgs {
// docID := db.msg.IndexDocID(conversationID, currentIndex+int64(i))
// if create || i != 0 { // 插入
// doc := unRelationTb.MsgDocModel{
// DocID: docID,
// Msg: make([]unRelationTb.MsgInfoModel, num),
// }
// for i := 0; i < len(doc.Msg); i++ {
// doc.Msg[i].ReadList = []string{}
// doc.Msg[i].DelList = []string{}
// }
// for _, msg := range *msgs {
// data, err := proto.Marshal(msg)
// if err != nil {
// return err
// }
// doc.Msg[msg.Seq%num] = unRelationTb.MsgInfoModel{
// SendTime: msg.SendTime,
// Msg: data,
// ReadList: []string{},
// DelList: []string{},
// }
// }
// if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
// prome.Inc(prome.MsgInsertMongoFailedCounter)
// return utils.Wrap(err, "")
// }
// prome.Inc(prome.MsgInsertMongoSuccessCounter)
// } else { // 修改
// for _, msg := range *msgs {
// data, err := proto.Marshal(msg)
// if err != nil {
// return err
// }
// info := unRelationTb.MsgInfoModel{
// SendTime: msg.SendTime,
// Msg: data,
// }
// if err := db.msgDocDatabase.UpdateMsg(ctx, docID, msg.Seq%num, &info); err != nil {
// prome.Inc(prome.MsgInsertMongoFailedCounter)
// return err
// }
// prome.Inc(prome.MsgInsertMongoSuccessCounter)
// }
// }
//}
return nil
}
@@ -408,10 +416,11 @@ func (db *commonMsgDatabase) GetOldestMsg(ctx context.Context, conversationID st
func (db *commonMsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) {
msgPb = &sdkws.MsgData{}
err = proto.Unmarshal(msgInfo.Msg, msgPb)
if err != nil {
return nil, utils.Wrap(err, "")
}
// todo: unmarshal
//err = proto.Unmarshal(msgInfo.Msg, msgPb)
//if err != nil {
// return nil, utils.Wrap(err, "")
//}
return msgPb, nil
}
@@ -644,69 +653,70 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
// recursion 删除list并且返回设置的最小seq
func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
// find from oldest list
msgs, err := db.msgDocDatabase.GetMsgsByIndex(ctx, conversationID, index)
if err != nil || msgs.DocID == "" {
if err != nil {
if err == unrelation.ErrMsgListNotExist {
log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index)
} else {
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
}
}
// 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归
err = db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs)
if err != nil {
return 0, err
}
return delStruct.getSetMinSeq() + 1, nil
}
log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgs.DocID, "len", len(msgs.Msg))
if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() {
log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID)
}
if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() {
delStruct.delDocIDs = append(delStruct.delDocIDs, msgs.DocID)
lastMsgPb := &sdkws.MsgData{}
err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb)
if err != nil {
log.ZError(ctx, "proto.Unmarshal failed", err, "index", len(msgs.Msg)-1, "docID", msgs.DocID)
return 0, utils.Wrap(err, "proto.Unmarshal failed")
}
delStruct.minSeq = lastMsgPb.Seq
} else {
var hasMarkDelFlag bool
for i, msg := range msgs.Msg {
if msg.SendTime != 0 {
msgPb := &sdkws.MsgData{}
err = proto.Unmarshal(msg.Msg, msgPb)
if err != nil {
log.ZError(ctx, "proto.Unmarshal failed", err, "index", i, "docID", msgs.DocID)
return 0, utils.Wrap(err, "proto.Unmarshal failed")
}
if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) {
msgPb.Status = constant.MsgDeleted
bytes, _ := proto.Marshal(msgPb)
msg.Msg = bytes
msg.SendTime = 0
hasMarkDelFlag = true
} else {
// 到本条消息不需要删除, minSeq置为这条消息的seq
if err := db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs); err != nil {
return 0, err
}
if hasMarkDelFlag {
if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgs); err != nil {
return delStruct.getSetMinSeq(), err
}
}
return msgPb.Seq, nil
}
}
}
}
// 继续递归 index+1
seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime)
return seq, err
//msgs, err := db.msgDocDatabase.GetMsgsByIndex(ctx, conversationID, index)
//if err != nil || msgs.DocID == "" {
// if err != nil {
// if err == unrelation.ErrMsgListNotExist {
// log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index)
// } else {
// log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
// }
// }
// // 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归
// err = db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs)
// if err != nil {
// return 0, err
// }
// return delStruct.getSetMinSeq() + 1, nil
//}
//log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgs.DocID, "len", len(msgs.Msg))
//if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() {
// log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID)
//}
//if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() {
// delStruct.delDocIDs = append(delStruct.delDocIDs, msgs.DocID)
// lastMsgPb := &sdkws.MsgData{}
// err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb)
// if err != nil {
// log.ZError(ctx, "proto.Unmarshal failed", err, "index", len(msgs.Msg)-1, "docID", msgs.DocID)
// return 0, utils.Wrap(err, "proto.Unmarshal failed")
// }
// delStruct.minSeq = lastMsgPb.Seq
//} else {
// var hasMarkDelFlag bool
// for i, msg := range msgs.Msg {
// if msg.SendTime != 0 {
// msgPb := &sdkws.MsgData{}
// err = proto.Unmarshal(msg.Msg, msgPb)
// if err != nil {
// log.ZError(ctx, "proto.Unmarshal failed", err, "index", i, "docID", msgs.DocID)
// return 0, utils.Wrap(err, "proto.Unmarshal failed")
// }
// if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) {
// msgPb.Status = constant.MsgDeleted
// bytes, _ := proto.Marshal(msgPb)
// msg.Msg = bytes
// msg.SendTime = 0
// hasMarkDelFlag = true
// } else {
// // 到本条消息不需要删除, minSeq置为这条消息的seq
// if err := db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs); err != nil {
// return 0, err
// }
// if hasMarkDelFlag {
// if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgs); err != nil {
// return delStruct.getSetMinSeq(), err
// }
// }
// return msgPb.Seq, nil
// }
// }
// }
//}
//// 继续递归 index+1
//seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime)
//return seq, err
return 0, nil
}
func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) {