This commit is contained in:
wangchuxiao
2023-05-05 21:30:32 +08:00
parent b00bd8bc35
commit 72aa6d5eab
18 changed files with 666 additions and 437 deletions
+108 -122
View File
@@ -38,33 +38,26 @@ type MsgDatabase interface {
// incrSeq通知seq然后批量插入缓存
NotificationBatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int64, error)
// 删除消息 返回不存在的seqList
DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
// 获取群ID或者UserID最新一条在mongo里面的消息
DelMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalUnExistSeqs []int64, err error)
// 通过seqList获取mongo中写扩散消息
GetMsgBySeqsRange(ctx context.Context, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error)
// 通过seqList获取大群在 mongo里面的消息
GetMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
GetMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 删除用户所有消息/redis/mongo然后重置seq
CleanUpUserMsg(ctx context.Context, userID string) error
CleanUpConversationMsgs(ctx context.Context, conversationID string) error
// 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error
// 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
// 获取用户 seq mongo和redis
GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
// 获取群 seq mongo和redis
GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
// 设置群用户最小seq 直接调用cache
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
// 设置用户最小seq 直接调用cache
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
// DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error
// 删除会话消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteConversationMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
// 获取会话 seq mongo和redis
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
// msg modify
JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
GetExtendMsg(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error)
InsertOrUpdateReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
@@ -72,13 +65,20 @@ type MsgDatabase interface {
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
DeleteReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
// msg send status
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
GetUserMinSeq(ctx context.Context, userID string) (int64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error
GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
GetUserMinSeq(ctx context.Context, conversationIDs []string) (map[string]int64, error)
SetUserMinSeq(ctx context.Context, seqs map[string]int64) (err error)
// to mq
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
MsgToModifyMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData) error
MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
@@ -213,24 +213,8 @@ func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, conversationID string,
}
return nil
}
func (db *msgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
return db.cache.GetUserMaxSeq(ctx, userID)
}
func (db *msgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
return db.cache.GetUserMinSeq(ctx, userID)
}
func (db *msgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
return db.cache.GetGroupMaxSeq(ctx, groupID)
}
func (db *msgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
return db.cache.GetGroupMinSeq(ctx, groupID)
}
func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
//newTime := utils.GetCurrentTimestampByMill()
if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() {
return errors.New("too large")
}
@@ -309,21 +293,19 @@ func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID st
}
prome.Inc(prome.MsgInsertMongoSuccessCounter)
}
//log.Debug(operationID, "batch mgo cost time ", mongo2.getCurrentTimestampByMill()-newTime, userID, len(msgList))
return nil
}
func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*sdkws.MsgData) error {
return db.cache.DeleteMessageFromCache(ctx, userID, msgs)
func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error {
return db.cache.DeleteMessageFromCache(ctx, conversationID, msgs)
}
func (db *msgDatabase) NotificationBatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int64, error) {
return 0, nil
}
func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) (int64, error) {
//newTime := utils.GetCurrentTimestampByMill()
lenList := len(msgList)
func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) (int64, error) {
lenList := len(msgs)
if int64(lenList) > db.msg.GetSingleGocMsgNum() {
return 0, errors.New("too large")
}
@@ -332,25 +314,18 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID
}
// judge sessionType to get seq
lastMaxSeq := currentMaxSeq
for _, m := range msgList {
for _, m := range msgs {
currentMaxSeq++
m.Seq = currentMaxSeq
//log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", conversationID, "seq: ", currentMaxSeq)
}
//log.Debug(operationID, "SetMessageToCache ", conversationID, len(msgList))
failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgList)
failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs)
if err != nil {
prome.Add(prome.MsgInsertRedisFailedCounter, failedNum)
//log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), conversationID)
log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
} else {
prome.Inc(prome.MsgInsertRedisSuccessCounter)
}
//log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, conversationID, len(msgList))
if msgList[0].SessionType == constant.SuperGroupChatType {
err = db.cache.SetGroupMaxSeq(ctx, conversationID, currentMaxSeq)
} else {
err = db.cache.SetUserMaxSeq(ctx, conversationID, currentMaxSeq)
}
err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq)
if err != nil {
prome.Inc(prome.SeqSetFailedCounter)
} else {
@@ -359,9 +334,9 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID
return lastMaxSeq, utils.Wrap(err, "")
}
func (db *msgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
func (db *msgDatabase) DelMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
sortkeys.Int64s(seqs)
docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs)
docIDSeqsMap := db.msg.GetDocIDSeqsMap(conversationID, seqs)
lock := sync.Mutex{}
var wg sync.WaitGroup
wg.Add(len(docIDSeqsMap))
@@ -456,14 +431,14 @@ func (db *msgDatabase) getMsgBySeqs(ctx context.Context, conversationID string,
for docID, value := range m {
doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
if err != nil {
//log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error())
log.ZError(ctx, "get message from mongo exception", err, "docID", docID)
continue
}
singleCount = 0
for i := 0; i < len(doc.Msg); i++ {
msgPb, err := db.unmarshalMsg(&doc.Msg[i])
if err != nil {
//log.NewError(operationID, "Unmarshal err", seqUid, value, uid, seqList, err.Error())
log.ZError(ctx, "unmarshal message exception", err, "docID", docID, "msg", &doc.Msg[i])
return nil, err
}
if utils.Contain(msgPb.Seq, value...) {
@@ -488,12 +463,23 @@ func (db *msgDatabase) getMsgBySeqs(ctx context.Context, conversationID string,
func (db *msgDatabase) getMsgBySeqsRange(ctx context.Context, conversationID string, seqs []int64, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) {
m := db.msg.GetDocIDSeqsMap(conversationID, seqs)
for {
for int64(len(seqMsg)) != num {
for docID, value := range m {
beginSeq, endSeq := db.msg.GetSeqsBeginEnd(value)
msgs, seqs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, beginSeq, endSeq)
if err != nil {
log.ZError(ctx, "GetMsgBySeqIndexIn1Doc error", err, "docID", docID, "beginSeq", beginSeq, "endSeq", endSeq)
continue
}
var newMsgs []*sdkws.MsgData
for _, msg := range msgs {
if msg.Status != constant.MsgDeleted {
newMsgs = append(newMsgs, msg)
}
}
if int64(len(newMsgs)) != num {
if int64(len(seqMsg)) != num {
} else {
break
}
}
}
return seqMsg, nil
@@ -550,53 +536,52 @@ func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, conversationID string,
return successMsgs, nil
}
func (db *msgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error {
err := db.DeleteUserMsgsAndSetMinSeq(ctx, userID, 0)
func (db *msgDatabase) CleanUpConversationMsgs(ctx context.Context, conversationID string) error {
err := db.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, 0)
if err != nil {
return err
}
err = db.cache.CleanUpOneUserAllMsg(ctx, userID)
return utils.Wrap(err, "")
return db.cache.CleanUpOneUserAllMsg(ctx, conversationID)
}
func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
var delStruct delMsgRecursionStruct
minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime)
if err != nil {
log.ZError(ctx, "deleteMsgRecursion failed", err)
}
if minSeq == 0 {
return nil
}
//log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList:", delStruct, "minSeq", minSeq)
for _, userID := range userIDs {
userMinSeq, err := db.cache.GetGroupUserMinSeq(ctx, groupID, userID)
if err != nil && err != redis.Nil {
//log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error())
continue
}
if userMinSeq > minSeq {
err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, userMinSeq)
} else {
err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
}
if err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq)
}
}
return nil
}
// func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
// var delStruct delMsgRecursionStruct
// minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime)
// if err != nil {
// log.ZError(ctx, "deleteMsgRecursion failed", err)
// }
// if minSeq == 0 {
// return nil
// }
// //log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList:", delStruct, "minSeq", minSeq)
// for _, userID := range userIDs {
// userMinSeq, err := db.cache.GetGroupUserMinSeq(ctx, groupID, userID)
// if err != nil && err != redis.Nil {
// //log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error())
// continue
// }
// if userMinSeq > minSeq {
// err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, userMinSeq)
// } else {
// err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
// }
// if err != nil {
// //log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq)
// }
// }
// return nil
// }
func (db *msgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
func (db *msgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error {
var delStruct delMsgRecursionStruct
minSeq, err := db.deleteMsgRecursion(ctx, userID, unRelationTb.OldestList, &delStruct, remainTime)
minSeq, err := db.deleteMsgRecursion(ctx, conversationID, unRelationTb.OldestList, &delStruct, remainTime)
if err != nil {
return utils.Wrap(err, "")
}
if minSeq == 0 {
return nil
}
return db.cache.SetUserMinSeq(ctx, userID, minSeq)
return db.cache.SetUserMinSeq(ctx, map[string]int64{conversationID: minSeq})
}
// this is struct for recursion
@@ -678,35 +663,23 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, conversationID st
return seq, utils.Wrap(err, "deleteMsg failed")
}
func (db *msgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, userID)
func (db *msgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, conversationID)
if err != nil {
return 0, 0, 0, 0, err
}
// from cache
minSeqCache, err = db.cache.GetUserMinSeq(ctx, userID)
minSeqCache, err = db.cache.GetUserMinSeq(ctx, conversationID)
if err != nil {
return 0, 0, 0, 0, err
}
maxSeqCache, err = db.cache.GetUserMaxSeq(ctx, userID)
maxSeqCache, err = db.cache.GetUserMaxSeq(ctx, conversationID)
if err != nil {
return 0, 0, 0, 0, err
}
return
}
func (db *msgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID)
if err != nil {
return 0, 0, 0, err
}
maxSeqCache, err = db.cache.GetGroupMaxSeq(ctx, groupID)
if err != nil {
return 0, 0, 0, err
}
return
}
func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) {
oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID)
if err != nil {
@@ -729,14 +702,27 @@ func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID str
return
}
func (db *msgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
func (db *msgDatabase) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
return db.cache.SetMaxSeq(ctx, conversationID, maxSeq)
}
func (db *msgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return db.cache.SetUserMinSeq(ctx, userID, minSeq)
func (db *msgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return db.cache.GetMaxSeqs(ctx, conversationIDs)
}
func (db *msgDatabase) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
return db.cache.GetGroupUserMinSeq(ctx, groupID, userID)
func (db *msgDatabase) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
return db.cache.GetMaxSeq(ctx, conversationID)
}
func (db *msgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
return db.cache.SetMinSeq(ctx, conversationID, minSeq)
}
func (db *msgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return db.cache.GetMinSeqs(ctx, conversationIDs)
}
func (db *msgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return db.cache.GetMinSeq(ctx, conversationID)
}
func (db *msgDatabase) GetUserMinSeq(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return db.cache.GetUserMinSeq(ctx, conversationIDs)
}
func (db *msgDatabase) SetUserMinSeq(ctx context.Context, seqs map[string]int64) (err error) {
return db.cache.SetUserMinSeq(ctx, seqs)
}