merge
This commit is contained in:
Vendored
+4
-4
@@ -232,12 +232,12 @@ func (c *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platf
|
||||
return errs.Wrap(c.rdb.HDel(ctx, key, fields...).Err())
|
||||
}
|
||||
|
||||
func (c *msgCache) getMessageCacheKey(sourceID string, seq int64) string {
|
||||
return messageCache + sourceID + "_" + strconv.Itoa(int(seq))
|
||||
func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string {
|
||||
return messageCache + conversationID + "_" + strconv.Itoa(int(seq))
|
||||
}
|
||||
|
||||
func (c *msgCache) allMessageCacheKey(sourceID string) string {
|
||||
return messageCache + sourceID + "_*"
|
||||
func (c *msgCache) allMessageCacheKey(conversationID string) string {
|
||||
return messageCache + conversationID + "_*"
|
||||
}
|
||||
|
||||
func (c *msgCache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
|
||||
|
||||
Vendored
+6
-5
@@ -4,10 +4,11 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||
@@ -212,12 +213,12 @@ func (c *notificationCache) DeleteTokenByUidPid(ctx context.Context, userID stri
|
||||
return errs.Wrap(c.rdb.HDel(ctx, key, fields...).Err())
|
||||
}
|
||||
|
||||
func (c *notificationCache) getMessageCacheKey(sourceID string, seq int64) string {
|
||||
return NotificationMessageCache + sourceID + "_" + strconv.Itoa(int(seq))
|
||||
func (c *notificationCache) getMessageCacheKey(conversationID string, seq int64) string {
|
||||
return NotificationMessageCache + conversationID + "_" + strconv.Itoa(int(seq))
|
||||
}
|
||||
|
||||
func (c *notificationCache) allMessageCacheKey(sourceID string) string {
|
||||
return NotificationMessageCache + sourceID + "_*"
|
||||
func (c *notificationCache) allMessageCacheKey(conversationID string) string {
|
||||
return NotificationMessageCache + conversationID + "_*"
|
||||
}
|
||||
|
||||
func (c *notificationCache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
|
||||
|
||||
@@ -81,9 +81,9 @@ type MsgDatabase interface {
|
||||
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
|
||||
|
||||
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
|
||||
MsgToModifyMQ(ctx context.Context, aggregationID string, messages []*sdkws.MsgData) error
|
||||
MsgToPushMQ(ctx context.Context, sourceID string, msg2mq *sdkws.MsgData) (int32, int64, error)
|
||||
MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*sdkws.MsgData, lastSeq int64) error
|
||||
MsgToModifyMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData) error
|
||||
MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
|
||||
MsgToMongoMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error
|
||||
}
|
||||
|
||||
func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) MsgDatabase {
|
||||
@@ -192,7 +192,7 @@ func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.Ms
|
||||
|
||||
func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData) error {
|
||||
if len(messages) > 0 {
|
||||
_, _, err := db.producerToModify.SendMessage(ctx, conversationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: conversationID, Messages: messages})
|
||||
_, _, err := db.producerToModify.SendMessage(ctx, conversationID, &pbMsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages})
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -207,9 +207,9 @@ func (db *msgDatabase) MsgToPushMQ(ctx context.Context, conversationID string, m
|
||||
return partition, offset, err
|
||||
}
|
||||
|
||||
func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, sourceID string, messages []*sdkws.MsgData, lastSeq int64) error {
|
||||
func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
|
||||
if len(messages) > 0 {
|
||||
_, _, err := db.producerToModify.SendMessage(ctx, sourceID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: sourceID, MsgData: messages})
|
||||
_, _, err := db.producerToModify.SendMessage(ctx, conversationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -318,7 +318,7 @@ func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, userID string
|
||||
return db.cache.DeleteMessageFromCache(ctx, userID, msgs)
|
||||
}
|
||||
|
||||
func (db *msgDatabase) NotificationBatchInsertChat2Cache(ctx context.Context, sourceID string, msgs []*sdkws.MsgData) (int64, error) {
|
||||
func (db *msgDatabase) NotificationBatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
@@ -336,7 +336,7 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID
|
||||
for _, m := range msgList {
|
||||
currentMaxSeq++
|
||||
m.Seq = currentMaxSeq
|
||||
//log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", sourceID, "seq: ", currentMaxSeq)
|
||||
//log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", conversationID, "seq: ", currentMaxSeq)
|
||||
}
|
||||
//log.Debug(operationID, "SetMessageToCache ", conversationID, len(msgList))
|
||||
failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgList)
|
||||
@@ -346,7 +346,7 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID
|
||||
} else {
|
||||
prome.Inc(prome.MsgInsertRedisSuccessCounter)
|
||||
}
|
||||
//log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, sourceID, len(msgList))
|
||||
//log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, conversationID, len(msgList))
|
||||
if msgList[0].SessionType == constant.SuperGroupChatType {
|
||||
err = db.cache.SetGroupMaxSeq(ctx, conversationID, currentMaxSeq)
|
||||
} else {
|
||||
|
||||
@@ -30,13 +30,13 @@ import (
|
||||
|
||||
type NotificationDatabase interface {
|
||||
// 批量插入消息
|
||||
BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error
|
||||
BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error
|
||||
// 刪除redis中消息缓存
|
||||
DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*sdkws.MsgData) error
|
||||
DeleteMessageFromCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) error
|
||||
// incrSeq然后批量插入缓存
|
||||
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*sdkws.MsgData) (int64, error)
|
||||
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) (int64, error)
|
||||
// incrSeq通知seq然后批量插入缓存
|
||||
NotificationBatchInsertChat2Cache(ctx context.Context, sourceID string, msgs []*sdkws.MsgData) (int64, error)
|
||||
NotificationBatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int64, error)
|
||||
// 删除消息 返回不存在的seqList
|
||||
DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
|
||||
// 获取群ID或者UserID最新一条在mongo里面的消息
|
||||
@@ -66,12 +66,12 @@ type NotificationDatabase interface {
|
||||
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
|
||||
|
||||
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
|
||||
GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error)
|
||||
InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
|
||||
GetExtendMsg(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error)
|
||||
InsertOrUpdateReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
|
||||
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
|
||||
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
|
||||
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
|
||||
DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
|
||||
DeleteReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
|
||||
SetSendMsgStatus(ctx context.Context, id string, status int32) error
|
||||
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
|
||||
GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
|
||||
@@ -80,9 +80,9 @@ type NotificationDatabase interface {
|
||||
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
|
||||
|
||||
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
|
||||
MsgToModifyMQ(ctx context.Context, aggregationID string, messages []*sdkws.MsgData) error
|
||||
MsgToPushMQ(ctx context.Context, sourceID string, msg2mq *sdkws.MsgData) (int32, int64, error)
|
||||
MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*sdkws.MsgData, lastSeq int64) error
|
||||
MsgToModifyMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData) error
|
||||
MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
|
||||
MsgToMongoMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error
|
||||
}
|
||||
|
||||
func NewNotificationDatabase(msgDocModel unRelationTb.NotificationDocModelInterface, cacheModel cache.NotificationModel) NotificationDatabase {
|
||||
@@ -141,12 +141,12 @@ func (db *notificationDatabase) DeleteOneMessageKey(ctx context.Context, clientM
|
||||
return db.cache.DeleteOneMessageKey(ctx, clientMsgID, sessionType, subKey)
|
||||
}
|
||||
|
||||
func (db *notificationDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error {
|
||||
return db.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.extendMsgSetModel.Pb2Model(reactionExtensions))
|
||||
func (db *notificationDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error {
|
||||
return db.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, conversationID, sessionType, clientMsgID, msgFirstModifyTime, db.extendMsgSetModel.Pb2Model(reactionExtensions))
|
||||
}
|
||||
|
||||
func (db *notificationDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) {
|
||||
extendMsgSet, err := db.extendMsgDatabase.GetExtendMsgSet(ctx, sourceID, sessionType, maxMsgUpdateTime)
|
||||
func (db *notificationDatabase) GetExtendMsg(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) {
|
||||
extendMsgSet, err := db.extendMsgDatabase.GetExtendMsgSet(ctx, conversationID, sessionType, maxMsgUpdateTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -173,8 +173,8 @@ func (db *notificationDatabase) GetExtendMsg(ctx context.Context, sourceID strin
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (db *notificationDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error {
|
||||
return db.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.extendMsgSetModel.Pb2Model(reactionExtensions))
|
||||
func (db *notificationDatabase) DeleteReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error {
|
||||
return db.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, conversationID, sessionType, clientMsgID, msgFirstModifyTime, db.extendMsgSetModel.Pb2Model(reactionExtensions))
|
||||
}
|
||||
|
||||
func (db *notificationDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
|
||||
@@ -190,9 +190,9 @@ func (db *notificationDatabase) MsgToMQ(ctx context.Context, key string, msg2mq
|
||||
return err
|
||||
}
|
||||
|
||||
func (db *notificationDatabase) MsgToModifyMQ(ctx context.Context, aggregationID string, messages []*sdkws.MsgData) error {
|
||||
func (db *notificationDatabase) MsgToModifyMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData) error {
|
||||
if len(messages) > 0 {
|
||||
_, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages})
|
||||
_, _, err := db.producerToModify.SendMessage(ctx, conversationID, &pbMsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages})
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -207,9 +207,9 @@ func (db *notificationDatabase) MsgToPushMQ(ctx context.Context, conversationID
|
||||
return partition, offset, err
|
||||
}
|
||||
|
||||
func (db *notificationDatabase) MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*sdkws.MsgData, lastSeq int64) error {
|
||||
func (db *notificationDatabase) MsgToMongoMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
|
||||
if len(messages) > 0 {
|
||||
_, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MsgData: messages})
|
||||
_, _, err := db.producerToModify.SendMessage(ctx, conversationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -230,7 +230,7 @@ func (db *notificationDatabase) GetGroupMinSeq(ctx context.Context, groupID stri
|
||||
return db.cache.GetGroupMinSeq(ctx, groupID)
|
||||
}
|
||||
|
||||
func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
|
||||
func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
|
||||
//newTime := utils.GetCurrentTimestampByMill()
|
||||
if int64(len(msgList)) > db.msg.GetsingleGocNotificationNum() {
|
||||
return errors.New("too large")
|
||||
@@ -264,11 +264,11 @@ func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, sourceID
|
||||
if insertCounter < remain {
|
||||
msgsToMongo = append(msgsToMongo, sMsg)
|
||||
insertCounter++
|
||||
docID = db.msg.GetDocID(sourceID, currentMaxSeq)
|
||||
docID = db.msg.GetDocID(conversationID, currentMaxSeq)
|
||||
//log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
|
||||
} else {
|
||||
msgsToMongoNext = append(msgsToMongoNext, sMsg)
|
||||
docIDNext = db.msg.GetDocID(sourceID, currentMaxSeq)
|
||||
docIDNext = db.msg.GetDocID(conversationID, currentMaxSeq)
|
||||
//log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
|
||||
}
|
||||
}
|
||||
@@ -318,11 +318,11 @@ func (db *notificationDatabase) DeleteMessageFromCache(ctx context.Context, user
|
||||
return db.cache.DeleteMessageFromCache(ctx, userID, msgs)
|
||||
}
|
||||
|
||||
func (db *notificationDatabase) NotificationBatchInsertChat2Cache(ctx context.Context, sourceID string, msgs []*sdkws.MsgData) (int64, error) {
|
||||
func (db *notificationDatabase) NotificationBatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (db *notificationDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*sdkws.MsgData) (int64, error) {
|
||||
func (db *notificationDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) (int64, error) {
|
||||
//newTime := utils.GetCurrentTimestampByMill()
|
||||
lenList := len(msgList)
|
||||
if int64(lenList) > db.msg.GetsingleGocNotificationNum() {
|
||||
@@ -335,11 +335,11 @@ func (db *notificationDatabase) BatchInsertChat2Cache(ctx context.Context, sourc
|
||||
var currentMaxSeq int64
|
||||
var err error
|
||||
if msgList[0].SessionType == constant.SuperGroupChatType {
|
||||
currentMaxSeq, err = db.cache.GetGroupMaxSeq(ctx, sourceID)
|
||||
//log.Debug(operationID, "constant.SuperGroupChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err)
|
||||
currentMaxSeq, err = db.cache.GetGroupMaxSeq(ctx, conversationID)
|
||||
//log.Debug(operationID, "constant.SuperGroupChatType lastMaxSeq before add ", currentMaxSeq, "userID ", conversationID, err)
|
||||
} else {
|
||||
currentMaxSeq, err = db.cache.GetUserMaxSeq(ctx, sourceID)
|
||||
//log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err)
|
||||
currentMaxSeq, err = db.cache.GetUserMaxSeq(ctx, conversationID)
|
||||
//log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", conversationID, err)
|
||||
}
|
||||
if err != nil && err != redis.Nil {
|
||||
prome.Inc(prome.SeqGetFailedCounter)
|
||||
@@ -350,21 +350,21 @@ func (db *notificationDatabase) BatchInsertChat2Cache(ctx context.Context, sourc
|
||||
for _, m := range msgList {
|
||||
currentMaxSeq++
|
||||
m.Seq = currentMaxSeq
|
||||
//log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", sourceID, "seq: ", currentMaxSeq)
|
||||
//log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", conversationID, "seq: ", currentMaxSeq)
|
||||
}
|
||||
//log.Debug(operationID, "SetMessageToCache ", sourceID, len(msgList))
|
||||
failedNum, err := db.cache.SetMessageToCache(ctx, sourceID, msgList)
|
||||
//log.Debug(operationID, "SetMessageToCache ", conversationID, len(msgList))
|
||||
failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgList)
|
||||
if err != nil {
|
||||
prome.Add(prome.MsgInsertRedisFailedCounter, failedNum)
|
||||
//log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), sourceID)
|
||||
//log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), conversationID)
|
||||
} else {
|
||||
prome.Inc(prome.MsgInsertRedisSuccessCounter)
|
||||
}
|
||||
//log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, sourceID, len(msgList))
|
||||
//log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, conversationID, len(msgList))
|
||||
if msgList[0].SessionType == constant.SuperGroupChatType {
|
||||
err = db.cache.SetGroupMaxSeq(ctx, sourceID, currentMaxSeq)
|
||||
err = db.cache.SetGroupMaxSeq(ctx, conversationID, currentMaxSeq)
|
||||
} else {
|
||||
err = db.cache.SetUserMaxSeq(ctx, sourceID, currentMaxSeq)
|
||||
err = db.cache.SetUserMaxSeq(ctx, conversationID, currentMaxSeq)
|
||||
}
|
||||
if err != nil {
|
||||
prome.Inc(prome.SeqSetFailedCounter)
|
||||
@@ -439,16 +439,16 @@ func (db *notificationDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context
|
||||
return seqMsgs, indexes, unExistSeqs, nil
|
||||
}
|
||||
|
||||
func (db *notificationDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
|
||||
msgInfo, err := db.msgDocDatabase.GetNewestMsg(ctx, sourceID)
|
||||
func (db *notificationDatabase) GetNewestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) {
|
||||
msgInfo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return db.unmarshalMsg(msgInfo)
|
||||
}
|
||||
|
||||
func (db *notificationDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
|
||||
msgInfo, err := db.msgDocDatabase.GetOldestMsg(ctx, sourceID)
|
||||
func (db *notificationDatabase) GetOldestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) {
|
||||
msgInfo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -464,10 +464,10 @@ func (db *notificationDatabase) unmarshalMsg(msgInfo *unRelationTb.NotificationI
|
||||
return msgPb, nil
|
||||
}
|
||||
|
||||
func (db *notificationDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsgs []*sdkws.MsgData, err error) {
|
||||
func (db *notificationDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64, diffusionType int) (seqMsgs []*sdkws.MsgData, err error) {
|
||||
var hasSeqs []int64
|
||||
singleCount := 0
|
||||
m := db.msg.GetDocIDSeqsMap(sourceID, seqs)
|
||||
m := db.msg.GetDocIDSeqsMap(conversationID, seqs)
|
||||
for docID, value := range m {
|
||||
doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
|
||||
if err != nil {
|
||||
@@ -498,7 +498,7 @@ func (db *notificationDatabase) getMsgBySeqs(ctx context.Context, sourceID strin
|
||||
if diffusionType == constant.WriteDiffusion {
|
||||
exceptionMsg = db.msg.GenExceptionMessageBySeqs(diff)
|
||||
} else if diffusionType == constant.ReadDiffusion {
|
||||
exceptionMsg = db.msg.GenExceptionSuperGroupMessageBySeqs(diff, sourceID)
|
||||
exceptionMsg = db.msg.GenExceptionSuperGroupMessageBySeqs(diff, conversationID)
|
||||
}
|
||||
seqMsgs = append(seqMsgs, exceptionMsg...)
|
||||
}
|
||||
@@ -610,13 +610,13 @@ func (d *delNotificationRecursionStruct) getSetMinSeq() int64 {
|
||||
// seq 70
|
||||
// set minSeq 21
|
||||
// recursion 删除list并且返回设置的最小seq
|
||||
func (db *notificationDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delNotificationRecursionStruct, remainTime int64) (int64, error) {
|
||||
func (db *notificationDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delNotificationRecursionStruct, remainTime int64) (int64, error) {
|
||||
// find from oldest list
|
||||
msgs, err := db.msgDocDatabase.GetMsgsByIndex(ctx, sourceID, index)
|
||||
msgs, err := db.msgDocDatabase.GetMsgsByIndex(ctx, conversationID, index)
|
||||
if err != nil || msgs.DocID == "" {
|
||||
if err != nil {
|
||||
if err == unrelation.ErrMsgListNotExist {
|
||||
log.NewDebug(mcontext.GetOperationID(ctx), utils.GetSelfFuncName(), "ID:", sourceID, "index:", index, err.Error())
|
||||
log.NewDebug(mcontext.GetOperationID(ctx), utils.GetSelfFuncName(), "ID:", conversationID, "index:", index, err.Error())
|
||||
} else {
|
||||
//log.NewError(operationID, utils.GetSelfFuncName(), "GetUserMsgListByIndex failed", err.Error(), index, ID)
|
||||
}
|
||||
@@ -628,7 +628,7 @@ func (db *notificationDatabase) deleteMsgRecursion(ctx context.Context, sourceID
|
||||
}
|
||||
return delStruct.getSetMinSeq() + 1, nil
|
||||
}
|
||||
//log.NewDebug(operationID, "ID:", sourceID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg))
|
||||
//log.NewDebug(operationID, "ID:", conversationID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg))
|
||||
if int64(len(msgs.Msg)) > db.msg.GetsingleGocNotificationNum() {
|
||||
log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID)
|
||||
}
|
||||
@@ -671,7 +671,7 @@ func (db *notificationDatabase) deleteMsgRecursion(ctx context.Context, sourceID
|
||||
}
|
||||
}
|
||||
// 继续递归 index+1
|
||||
seq, err := db.deleteMsgRecursion(ctx, sourceID, index+1, delStruct, remainTime)
|
||||
seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime)
|
||||
return seq, utils.Wrap(err, "deleteMsg failed")
|
||||
}
|
||||
|
||||
@@ -704,8 +704,8 @@ func (db *notificationDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx contex
|
||||
return
|
||||
}
|
||||
|
||||
func (db *notificationDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) {
|
||||
oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, sourceID)
|
||||
func (db *notificationDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) {
|
||||
oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
@@ -714,7 +714,7 @@ func (db *notificationDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID
|
||||
return 0, 0, err
|
||||
}
|
||||
minSeqMongo = msgPb.Seq
|
||||
newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, sourceID)
|
||||
newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
@@ -2,10 +2,11 @@ package unrelation
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -30,10 +31,10 @@ type MsgDocModelInterface interface {
|
||||
Create(ctx context.Context, model *MsgDocModel) error
|
||||
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
|
||||
FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)
|
||||
GetNewestMsg(ctx context.Context, sourceID string) (*MsgInfoModel, error)
|
||||
GetOldestMsg(ctx context.Context, sourceID string) (*MsgInfoModel, error)
|
||||
GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
|
||||
GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
|
||||
Delete(ctx context.Context, docIDs []string) error
|
||||
GetMsgsByIndex(ctx context.Context, sourceID string, index int64) (*MsgDocModel, error)
|
||||
GetMsgsByIndex(ctx context.Context, conversationID string, index int64) (*MsgDocModel, error)
|
||||
UpdateOneDoc(ctx context.Context, msg *MsgDocModel) error
|
||||
}
|
||||
|
||||
@@ -59,9 +60,9 @@ func (m *MsgDocModel) IsFull() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (m MsgDocModel) GetDocID(sourceID string, seq int64) string {
|
||||
func (m MsgDocModel) GetDocID(conversationID string, seq int64) string {
|
||||
seqSuffix := seq / singleGocMsgNum
|
||||
return m.indexGen(sourceID, seqSuffix)
|
||||
return m.indexGen(conversationID, seqSuffix)
|
||||
}
|
||||
|
||||
func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string {
|
||||
@@ -83,10 +84,10 @@ func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string
|
||||
return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
|
||||
}
|
||||
|
||||
func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][]int64 {
|
||||
func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 {
|
||||
t := make(map[string][]int64)
|
||||
for i := 0; i < len(seqs); i++ {
|
||||
docID := m.GetDocID(sourceID, seqs[i])
|
||||
docID := m.GetDocID(conversationID, seqs[i])
|
||||
if value, ok := t[docID]; !ok {
|
||||
var temp []int64
|
||||
t[docID] = append(temp, seqs[i])
|
||||
@@ -108,8 +109,8 @@ func (m MsgDocModel) getMsgIndex(seq uint32) int {
|
||||
return int(index)
|
||||
}
|
||||
|
||||
func (m MsgDocModel) indexGen(sourceID string, seqSuffix int64) string {
|
||||
return sourceID + ":" + strconv.FormatInt(seqSuffix, 10)
|
||||
func (m MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
|
||||
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
|
||||
}
|
||||
|
||||
func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
|
||||
|
||||
@@ -31,10 +31,10 @@ type NotificationDocModelInterface interface {
|
||||
Create(ctx context.Context, model *NotificationDocModel) error
|
||||
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
|
||||
FindOneByDocID(ctx context.Context, docID string) (*NotificationDocModel, error)
|
||||
GetNewestMsg(ctx context.Context, sourceID string) (*NotificationInfoModel, error)
|
||||
GetOldestMsg(ctx context.Context, sourceID string) (*NotificationInfoModel, error)
|
||||
GetNewestMsg(ctx context.Context, conversationID string) (*NotificationInfoModel, error)
|
||||
GetOldestMsg(ctx context.Context, conversationID string) (*NotificationInfoModel, error)
|
||||
Delete(ctx context.Context, docIDs []string) error
|
||||
GetMsgsByIndex(ctx context.Context, sourceID string, index int64) (*NotificationDocModel, error)
|
||||
GetMsgsByIndex(ctx context.Context, conversationID string, index int64) (*NotificationDocModel, error)
|
||||
UpdateOneDoc(ctx context.Context, msg *NotificationDocModel) error
|
||||
}
|
||||
|
||||
@@ -60,9 +60,9 @@ func (m *NotificationDocModel) IsFull() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (m NotificationDocModel) GetDocID(sourceID string, seq int64) string {
|
||||
func (m NotificationDocModel) GetDocID(conversationID string, seq int64) string {
|
||||
seqSuffix := seq / singleGocNotificationNum
|
||||
return m.indexGen(sourceID, seqSuffix)
|
||||
return m.indexGen(conversationID, seqSuffix)
|
||||
}
|
||||
|
||||
func (m NotificationDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string {
|
||||
@@ -109,8 +109,8 @@ func (m NotificationDocModel) getMsgIndex(seq uint32) int {
|
||||
return int(index)
|
||||
}
|
||||
|
||||
func (m NotificationDocModel) indexGen(sourceID string, seqSuffix int64) string {
|
||||
return sourceID + ":" + strconv.FormatInt(seqSuffix, 10)
|
||||
func (m NotificationDocModel) indexGen(conversationID string, seqSuffix int64) string {
|
||||
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
|
||||
}
|
||||
|
||||
func (NotificationDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
|
||||
|
||||
@@ -56,9 +56,9 @@ func (m *NotificationMongoDriver) FindOneByDocID(ctx context.Context, docID stri
|
||||
return doc, err
|
||||
}
|
||||
|
||||
func (m *NotificationMongoDriver) GetMsgsByIndex(ctx context.Context, sourceID string, index int64) (*table.NotificationDocModel, error) {
|
||||
func (m *NotificationMongoDriver) GetMsgsByIndex(ctx context.Context, conversationID string, index int64) (*table.NotificationDocModel, error) {
|
||||
findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"uid": 1})
|
||||
cursor, err := m.MsgCollection.Find(ctx, bson.M{"uid": primitive.Regex{Pattern: fmt.Sprintf("^%s:", sourceID)}}, findOpts)
|
||||
cursor, err := m.MsgCollection.Find(ctx, bson.M{"uid": primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}}, findOpts)
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "")
|
||||
}
|
||||
@@ -73,9 +73,9 @@ func (m *NotificationMongoDriver) GetMsgsByIndex(ctx context.Context, sourceID s
|
||||
return nil, ErrMsgListNotExist
|
||||
}
|
||||
|
||||
func (m *NotificationMongoDriver) GetNewestMsg(ctx context.Context, sourceID string) (*table.NotificationInfoModel, error) {
|
||||
func (m *NotificationMongoDriver) GetNewestMsg(ctx context.Context, conversationID string) (*table.NotificationInfoModel, error) {
|
||||
var msgDocs []table.NotificationDocModel
|
||||
cursor, err := m.MsgCollection.Find(ctx, bson.M{"uid": bson.M{"$regex": fmt.Sprintf("^%s:", sourceID)}}, options.Find().SetLimit(1).SetSort(bson.M{"uid": -1}))
|
||||
cursor, err := m.MsgCollection.Find(ctx, bson.M{"uid": bson.M{"$regex": fmt.Sprintf("^%s:", conversationID)}}, options.Find().SetLimit(1).SetSort(bson.M{"uid": -1}))
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "")
|
||||
}
|
||||
@@ -92,9 +92,9 @@ func (m *NotificationMongoDriver) GetNewestMsg(ctx context.Context, sourceID str
|
||||
return nil, ErrMsgNotFound
|
||||
}
|
||||
|
||||
func (m *NotificationMongoDriver) GetOldestMsg(ctx context.Context, sourceID string) (*table.NotificationInfoModel, error) {
|
||||
func (m *NotificationMongoDriver) GetOldestMsg(ctx context.Context, conversationID string) (*table.NotificationInfoModel, error) {
|
||||
var msgDocs []table.NotificationDocModel
|
||||
cursor, err := m.MsgCollection.Find(ctx, bson.M{"uid": bson.M{"$regex": fmt.Sprintf("^%s:", sourceID)}}, options.Find().SetLimit(1).SetSort(bson.M{"uid": 1}))
|
||||
cursor, err := m.MsgCollection.Find(ctx, bson.M{"uid": bson.M{"$regex": fmt.Sprintf("^%s:", conversationID)}}, options.Find().SetLimit(1).SetSort(bson.M{"uid": 1}))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user