mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-01 07:35:58 +08:00
BatchInsertChat2DB
This commit is contained in:
@@ -140,76 +140,85 @@ func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversation
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
|
||||
if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() {
|
||||
return errors.New("too large")
|
||||
}
|
||||
var remain int64
|
||||
blk0 := db.msg.GetSingleGocMsgNum() - 1
|
||||
//currentMaxSeq 4998
|
||||
if currentMaxSeq < db.msg.GetSingleGocMsgNum() {
|
||||
remain = blk0 - currentMaxSeq //1
|
||||
} else {
|
||||
excludeBlk0 := currentMaxSeq - blk0 //=1
|
||||
//(5000-1)%5000 == 4999
|
||||
remain = (db.msg.GetSingleGocMsgNum() - (excludeBlk0 % db.msg.GetSingleGocMsgNum())) % db.msg.GetSingleGocMsgNum()
|
||||
}
|
||||
//remain=1
|
||||
var insertCounter int64
|
||||
msgsToMongo := make([]unRelationTb.MsgInfoModel, 0)
|
||||
msgsToMongoNext := make([]unRelationTb.MsgInfoModel, 0)
|
||||
docID := ""
|
||||
docIDNext := ""
|
||||
var err error
|
||||
for _, m := range msgList {
|
||||
currentMaxSeq++
|
||||
sMsg := unRelationTb.MsgInfoModel{}
|
||||
sMsg.SendTime = m.SendTime
|
||||
m.Seq = currentMaxSeq
|
||||
if sMsg.Msg, err = proto.Marshal(m); err != nil {
|
||||
return utils.Wrap(err, "")
|
||||
num := db.msg.GetSingleGocMsgNum()
|
||||
currentIndex := currentMaxSeq / num
|
||||
var blockMsgs []*[]*sdkws.MsgData
|
||||
for i, data := range msgList {
|
||||
data.Seq = currentMaxSeq + int64(i+1)
|
||||
index := data.Seq/num - currentIndex
|
||||
if i == 0 && index == 1 {
|
||||
index--
|
||||
currentIndex++
|
||||
}
|
||||
if insertCounter < remain {
|
||||
msgsToMongo = append(msgsToMongo, sMsg)
|
||||
insertCounter++
|
||||
docID = db.msg.GetDocID(conversationID, currentMaxSeq)
|
||||
var block *[]*sdkws.MsgData
|
||||
if len(blockMsgs) == int(index) {
|
||||
var size int64
|
||||
if i == 0 {
|
||||
size = num - data.Seq%num
|
||||
} else {
|
||||
temp := int64(len(msgList)-len(*blockMsgs[0])) - int64(len(blockMsgs)-1)*num
|
||||
if temp >= num {
|
||||
size = num
|
||||
} else {
|
||||
size = temp % num
|
||||
}
|
||||
}
|
||||
temp := make([]*sdkws.MsgData, 0, size)
|
||||
block = &temp
|
||||
blockMsgs = append(blockMsgs, block)
|
||||
} else {
|
||||
msgsToMongoNext = append(msgsToMongoNext, sMsg)
|
||||
docIDNext = db.msg.GetDocID(conversationID, currentMaxSeq)
|
||||
block = blockMsgs[index]
|
||||
}
|
||||
*block = append(*block, msgList[i])
|
||||
}
|
||||
|
||||
if docID != "" {
|
||||
err = db.msgDocDatabase.PushMsgsToDoc(ctx, docID, msgsToMongo)
|
||||
create := currentMaxSeq == 0 || ((*blockMsgs[0])[0].Seq%num == 0)
|
||||
if !create {
|
||||
exist, err := db.msgDocDatabase.IsExistDocID(ctx, db.msg.IndexDocID(conversationID, currentIndex))
|
||||
if err != nil {
|
||||
if err == mongo.ErrNoDocuments {
|
||||
doc := &unRelationTb.MsgDocModel{}
|
||||
doc.DocID = docID
|
||||
doc.Msg = msgsToMongo
|
||||
if err = db.msgDocDatabase.Create(ctx, doc); err != nil {
|
||||
return err
|
||||
}
|
||||
create = !exist
|
||||
}
|
||||
for i, msgs := range blockMsgs {
|
||||
docID := db.msg.IndexDocID(conversationID, currentIndex+int64(i))
|
||||
if create || i != 0 { // 插入
|
||||
doc := unRelationTb.MsgDocModel{
|
||||
DocID: docID,
|
||||
Msg: make([]unRelationTb.MsgInfoModel, num),
|
||||
}
|
||||
for _, msg := range *msgs {
|
||||
data, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
doc.Msg[msg.Seq%num] = unRelationTb.MsgInfoModel{
|
||||
SendTime: msg.SendTime,
|
||||
Msg: data,
|
||||
}
|
||||
}
|
||||
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
|
||||
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
||||
} else { // 修改
|
||||
for _, msg := range *msgs {
|
||||
data, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
info := unRelationTb.MsgInfoModel{
|
||||
SendTime: msg.SendTime,
|
||||
Msg: data,
|
||||
}
|
||||
if err := db.msgDocDatabase.UpdateMsg(ctx, docID, msg.Seq%num, &info); err != nil {
|
||||
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
||||
return err
|
||||
}
|
||||
prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
||||
} else {
|
||||
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
log.ZDebug(ctx, "PushMsgsToDoc success", "docID", docID, "len", len(msgsToMongo))
|
||||
prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
||||
}
|
||||
}
|
||||
if docIDNext != "" {
|
||||
nextDoc := &unRelationTb.MsgDocModel{}
|
||||
nextDoc.DocID = docIDNext
|
||||
nextDoc.Msg = msgsToMongoNext
|
||||
log.ZDebug(ctx, "create next doc", "docIDNext", docIDNext, "len", len(nextDoc.Msg))
|
||||
if err = db.msgDocDatabase.Create(ctx, nextDoc); err != nil {
|
||||
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user