Files
open-im-server/pkg/common/storage/controller/msg.go
T

842 lines
31 KiB
Go
Raw Normal View History

2023-07-13 17:07:42 +08:00
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2023-06-30 09:45:02 +08:00
package controller
import (
"context"
"encoding/json"
2024-04-28 11:47:05 +08:00
"errors"
2024-12-26 17:49:05 +08:00
"github.com/openimsdk/tools/utils/jsonutil"
"strconv"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/mongo"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
2024-04-19 22:23:08 +08:00
"github.com/openimsdk/protocol/constant"
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/utils/datautil"
2023-06-30 09:45:02 +08:00
)
// Selectors passed to batchInsertBlock as its key argument; they choose which
// field of a stored message slot is written.
const (
	// updateKeyMsg writes the message payload (the "msg" field).
	updateKeyMsg = iota
	// updateKeyRevoke writes the revoke record (the "revoke" field).
	updateKeyRevoke
)
// CommonMsgDatabase defines the interface for message database operations.
2023-06-30 09:45:02 +08:00
type CommonMsgDatabase interface {
// RevokeMsg revokes a message in a conversation.
RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error
// MarkSingleChatMsgsAsRead marks messages as read for a single chat by sequence numbers.
2023-06-30 09:45:02 +08:00
MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error
// GetMsgBySeqsRange retrieves messages from MongoDB by a range of sequence numbers.
2023-07-13 16:51:52 +08:00
GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error)
// GetMsgBySeqs retrieves messages for large groups from MongoDB by sequence numbers.
2023-07-13 16:51:52 +08:00
GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error)
2024-12-26 17:49:05 +08:00
GetMessagesBySeqWithBounds(ctx context.Context, userID string, conversationID string, seqs []int64, pullOrder sdkws.PullOrder) (bool, int64, []*sdkws.MsgData, error)
// DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers.
2023-06-30 09:45:02 +08:00
DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error
// DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers.
2023-06-30 09:45:02 +08:00
DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, seqs []int64) error
2024-07-16 10:46:21 +08:00
//SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
2023-06-30 09:45:02 +08:00
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeqs(ctx context.Context, seqs map[string]int64) error
2024-12-20 15:45:37 +08:00
SetMinSeq(ctx context.Context, conversationID string, seq int64) error
2023-06-30 09:45:02 +08:00
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error)
GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error)
GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error)
2023-06-30 09:45:02 +08:00
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
2024-09-13 09:51:35 +08:00
SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*pbmsg.SearchedMsgData, err error)
FindOneByDocIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error)
2023-06-30 09:45:02 +08:00
// to mq
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error)
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error)
2024-12-20 15:45:37 +08:00
GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error)
SetUserConversationsMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
2024-12-20 15:45:37 +08:00
DeleteDoc(ctx context.Context, docID string) error
GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error)
2025-01-02 17:15:38 +08:00
GetLastMessage(ctx context.Context, conversationIDS []string, userID string) (map[string]*sdkws.MsgData, error)
2023-06-30 09:45:02 +08:00
}
2024-07-16 10:46:21 +08:00
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
2024-04-19 22:23:08 +08:00
conf, err := kafka.BuildProducerConfig(*kafkaConf.Build())
if err != nil {
return nil, err
}
2024-04-19 22:23:08 +08:00
producerToRedis, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToRedisTopic)
2024-02-02 10:11:13 +08:00
if err != nil {
return nil, err
}
2023-06-30 09:45:02 +08:00
return &commonMsgDatabase{
2023-07-12 15:31:24 +08:00
msgDocDatabase: msgDocModel,
2024-12-26 17:49:05 +08:00
msgCache: msg,
2024-07-16 10:46:21 +08:00
seqUser: seqUser,
seqConversation: seqConversation,
2024-02-02 10:11:13 +08:00
producer: producerToRedis,
}, nil
2023-06-30 09:45:02 +08:00
}
type commonMsgDatabase struct {
msgDocDatabase database.Msg
msgTable model.MsgDocModel
2024-12-26 17:49:05 +08:00
msgCache cache.MsgCache
2024-07-16 10:46:21 +08:00
seqConversation cache.SeqConversationCache
seqUser cache.SeqUser
producer *kafka.Producer
2023-06-30 09:45:02 +08:00
}
// MsgToMQ publishes msg2mq under key through the producer. The other values
// returned by SendMessage are discarded; only the error is reported.
func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error {
	if _, _, err := db.producer.SendMessage(ctx, key, msg2mq); err != nil {
		return err
	}
	return nil
}
2024-12-26 17:49:05 +08:00
func (db *commonMsgDatabase) batchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
2023-10-24 20:28:22 +08:00
if len(fields) == 0 {
return nil
}
2024-04-19 22:23:08 +08:00
num := db.msgTable.GetSingleGocMsgNum()
// num = 100
for i, field := range fields { // Check the type of the field
2023-06-30 09:45:02 +08:00
var ok bool
switch key {
case updateKeyMsg:
var msg *model.MsgDataModel
msg, ok = field.(*model.MsgDataModel)
2023-06-30 09:45:02 +08:00
if msg != nil && msg.Seq != firstSeq+int64(i) {
2024-04-19 22:23:08 +08:00
return errs.ErrInternalServer.WrapMsg("seq is invalid")
2023-06-30 09:45:02 +08:00
}
case updateKeyRevoke:
_, ok = field.(*model.RevokeModel)
2023-06-30 09:45:02 +08:00
default:
2024-04-19 22:23:08 +08:00
return errs.ErrInternalServer.WrapMsg("key is invalid")
2023-06-30 09:45:02 +08:00
}
if !ok {
2024-04-19 22:23:08 +08:00
return errs.ErrInternalServer.WrapMsg("field type is invalid")
2023-06-30 09:45:02 +08:00
}
}
// Returns true if the document exists in the database, false if the document does not exist in the database
2023-10-24 20:28:22 +08:00
updateMsgModel := func(seq int64, i int) (bool, error) {
var (
res *mongo.UpdateResult
err error
)
2024-04-19 22:23:08 +08:00
docID := db.msgTable.GetDocID(conversationID, seq)
index := db.msgTable.GetMsgIndex(seq)
2023-10-24 20:28:22 +08:00
field := fields[i]
2023-06-30 09:45:02 +08:00
switch key {
case updateKeyMsg:
2023-10-24 20:28:22 +08:00
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field)
2023-06-30 09:45:02 +08:00
case updateKeyRevoke:
2023-10-24 20:28:22 +08:00
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field)
2023-10-23 16:24:55 +08:00
}
2023-10-24 20:28:22 +08:00
if err != nil {
return false, err
2023-06-30 09:45:02 +08:00
}
2023-10-24 20:28:22 +08:00
return res.MatchedCount > 0, nil
2023-06-30 09:45:02 +08:00
}
tryUpdate := true
for i := 0; i < len(fields); i++ {
seq := firstSeq + int64(i) // Current sequence number
2023-06-30 09:45:02 +08:00
if tryUpdate {
2023-10-24 20:28:22 +08:00
matched, err := updateMsgModel(seq, i)
2023-06-30 09:45:02 +08:00
if err != nil {
return err
}
if matched {
continue // The current data has been updated, skip the current data
2023-10-24 20:28:22 +08:00
}
}
doc := model.MsgDocModel{
2024-04-19 22:23:08 +08:00
DocID: db.msgTable.GetDocID(conversationID, seq),
Msg: make([]*model.MsgInfoModel, num),
2023-10-24 20:28:22 +08:00
}
var insert int // Inserted data number
2023-10-24 20:28:22 +08:00
for j := i; j < len(fields); j++ {
seq = firstSeq + int64(j)
2024-04-19 22:23:08 +08:00
if db.msgTable.GetDocID(conversationID, seq) != doc.DocID {
2023-10-24 20:28:22 +08:00
break
}
insert++
switch key {
case updateKeyMsg:
doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{
Msg: fields[j].(*model.MsgDataModel),
2023-10-24 20:28:22 +08:00
}
case updateKeyRevoke:
doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{
Revoke: fields[j].(*model.RevokeModel),
2023-10-24 20:28:22 +08:00
}
}
}
for i, msgInfo := range doc.Msg {
if msgInfo == nil {
msgInfo = &model.MsgInfoModel{}
doc.Msg[i] = msgInfo
2023-10-24 20:28:22 +08:00
}
if msgInfo.DelList == nil {
2023-10-24 20:28:22 +08:00
doc.Msg[i].DelList = []string{}
2023-06-30 09:45:02 +08:00
}
}
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
if mongo.IsDuplicateKeyError(err) {
i-- // already inserted
tryUpdate = true // next block use update mode
2023-06-30 09:45:02 +08:00
continue
}
return err
}
tryUpdate = false // The current block is inserted successfully, and the next block is inserted preferentially
i += insert - 1 // Skip the inserted data
2023-06-30 09:45:02 +08:00
}
2024-12-26 17:49:05 +08:00
2023-06-30 09:45:02 +08:00
return nil
}
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error {
2024-12-26 17:49:05 +08:00
if err := db.batchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq); err != nil {
return err
}
return db.msgCache.DelMessageBySeqs(ctx, conversationID, []int64{seq})
2023-06-30 09:45:02 +08:00
}
2023-07-13 16:51:52 +08:00
func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, totalSeqs []int64) error {
2024-04-19 22:23:08 +08:00
for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, totalSeqs) {
2023-06-30 09:45:02 +08:00
var indexes []int64
for _, seq := range seqs {
2024-04-19 22:23:08 +08:00
indexes = append(indexes, db.msgTable.GetMsgIndex(seq))
2023-06-30 09:45:02 +08:00
}
log.ZDebug(ctx, "MarkSingleChatMsgsAsRead", "userID", userID, "docID", docID, "indexes", indexes)
if err := db.msgDocDatabase.MarkSingleChatMsgsAsRead(ctx, userID, docID, indexes); err != nil {
log.ZError(ctx, "MarkSingleChatMsgsAsRead", err, "userID", userID, "docID", docID, "indexes", indexes)
return err
}
}
2024-12-26 17:49:05 +08:00
return db.msgCache.DelMessageBySeqs(ctx, conversationID, totalSeqs)
2023-06-30 09:45:02 +08:00
}
2023-07-13 16:51:52 +08:00
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
2024-12-26 17:49:05 +08:00
return db.GetMessageBySeqs(ctx, conversationID, userID, seqs)
2023-06-30 09:45:02 +08:00
}
2024-12-26 17:49:05 +08:00
func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*model.MsgInfoModel, userID, conversationID string, msg *model.MsgInfoModel) {
2024-12-26 17:49:05 +08:00
if msg == nil || msg.Msg == nil {
return
}
if msg.IsRead {
msg.Msg.IsRead = true
}
if msg.Msg.ContentType != constant.Quote {
return
}
if msg.Msg.Content == "" {
return
}
type MsgData struct {
SendID string `json:"sendID"`
RecvID string `json:"recvID"`
GroupID string `json:"groupID"`
ClientMsgID string `json:"clientMsgID"`
ServerMsgID string `json:"serverMsgID"`
SenderPlatformID int32 `json:"senderPlatformID"`
SenderNickname string `json:"senderNickname"`
SenderFaceURL string `json:"senderFaceURL"`
SessionType int32 `json:"sessionType"`
MsgFrom int32 `json:"msgFrom"`
ContentType int32 `json:"contentType"`
Content string `json:"content"`
Seq int64 `json:"seq"`
SendTime int64 `json:"sendTime"`
CreateTime int64 `json:"createTime"`
Status int32 `json:"status"`
IsRead bool `json:"isRead"`
Options map[string]bool `json:"options,omitempty"`
OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"`
AtUserIDList []string `json:"atUserIDList"`
AttachedInfo string `json:"attachedInfo"`
Ex string `json:"ex"`
KeyVersion int32 `json:"keyVersion"`
DstUserIDs []string `json:"dstUserIDs"`
}
var quoteMsg struct {
Text string `json:"text,omitempty"`
QuoteMessage *MsgData `json:"quoteMessage,omitempty"`
MessageEntityList json.RawMessage `json:"messageEntityList,omitempty"`
}
if err := json.Unmarshal([]byte(msg.Msg.Content), &quoteMsg); err != nil {
log.ZError(ctx, "json.Unmarshal", err)
return
}
if quoteMsg.QuoteMessage == nil || quoteMsg.QuoteMessage.Content == "" {
return
}
if quoteMsg.QuoteMessage.Content == "e30=" {
quoteMsg.QuoteMessage.Content = "{}"
data, err := json.Marshal(&quoteMsg)
if err != nil {
return
}
msg.Msg.Content = string(data)
}
if quoteMsg.QuoteMessage.Seq <= 0 && quoteMsg.QuoteMessage.ContentType == constant.MsgRevokeNotification {
return
}
var msgs []*model.MsgInfoModel
if v, ok := cache[quoteMsg.QuoteMessage.Seq]; ok {
msgs = v
} else {
if quoteMsg.QuoteMessage.Seq > 0 {
2024-04-19 22:23:08 +08:00
ms, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, db.msgTable.GetDocID(conversationID, quoteMsg.QuoteMessage.Seq), []int64{quoteMsg.QuoteMessage.Seq})
if err != nil {
log.ZError(ctx, "GetMsgBySeqIndexIn1Doc", err, "conversationID", conversationID, "seq", quoteMsg.QuoteMessage.Seq)
return
}
msgs = ms
cache[quoteMsg.QuoteMessage.Seq] = ms
}
}
if len(msgs) != 0 && msgs[0].Msg.ContentType != constant.MsgRevokeNotification {
return
}
quoteMsg.QuoteMessage.ContentType = constant.MsgRevokeNotification
if len(msgs) > 0 {
quoteMsg.QuoteMessage.Content = msgs[0].Msg.Content
} else {
quoteMsg.QuoteMessage.Content = "{}"
}
data, err := json.Marshal(&quoteMsg)
if err != nil {
log.ZError(ctx, "json.Marshal", err)
return
}
msg.Msg.Content = string(data)
}
func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*model.MsgInfoModel, err error) {
2023-06-30 09:45:02 +08:00
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs)
if err != nil {
return nil, err
}
tempCache := make(map[int64][]*model.MsgInfoModel)
2023-06-30 09:45:02 +08:00
for _, msg := range msgs {
db.handlerDBMsg(ctx, tempCache, userID, conversationID, msg)
2023-06-30 09:45:02 +08:00
}
return msgs, err
}
// GetMsgBySeqsRange In the context of group chat, we have the following parameters:
//
// "maxSeq" of a conversation: It represents the maximum value of messages in the group conversation.
// "minSeq" of a conversation (default: 1): It represents the minimum value of messages in the group conversation.
//
// For a user's perspective regarding the group conversation, we have the following parameters:
//
// "userMaxSeq": It represents the user's upper limit for message retrieval in the group. If not set (default: 0),
// it means the upper limit is the same as the conversation's "maxSeq".
// "userMinSeq": It represents the user's starting point for message retrieval in the group. If not set (default: 0),
// it means the starting point is the same as the conversation's "minSeq".
//
// The scenarios for these parameters are as follows:
//
// For users who have been kicked out of the group, "userMaxSeq" can be set as the maximum value they had before
// being kicked out. This limits their ability to retrieve messages up to a certain point.
// For new users joining the group, if they don't need to receive old messages,
// "userMinSeq" can be set as the same value as the conversation's "maxSeq" at the moment they join the group.
// This ensures that their message retrieval starts from the point they joined.
2023-07-13 16:51:52 +08:00
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) {
2024-07-25 20:01:33 +08:00
userMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID)
2024-11-22 17:46:02 +08:00
if err != nil && !errors.Is(err, redis.Nil) {
2023-06-30 09:45:02 +08:00
return 0, 0, nil, err
}
2024-07-16 10:46:21 +08:00
minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID)
if err != nil {
2023-06-30 09:45:02 +08:00
return 0, 0, nil, err
}
if userMinSeq > minSeq {
minSeq = userMinSeq
}
2024-04-19 22:23:08 +08:00
// "minSeq" represents the startSeq value that the user can retrieve.
2023-06-30 09:45:02 +08:00
if minSeq > end {
2024-04-19 22:23:08 +08:00
log.ZWarn(ctx, "minSeq > end", errs.New("minSeq>end"), "minSeq", minSeq, "end", end)
2023-06-30 09:45:02 +08:00
return 0, 0, nil, nil
}
2024-07-16 10:46:21 +08:00
maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID)
if err != nil {
2023-06-30 09:45:02 +08:00
return 0, 0, nil, err
}
2023-07-13 16:51:52 +08:00
log.ZDebug(ctx, "GetMsgBySeqsRange", "userMinSeq", userMinSeq, "conMinSeq", minSeq, "conMaxSeq", maxSeq, "userMaxSeq", userMaxSeq)
2023-06-30 09:45:02 +08:00
if userMaxSeq != 0 {
if userMaxSeq < maxSeq {
maxSeq = userMaxSeq
}
}
2024-04-19 22:23:08 +08:00
// "maxSeq" represents the endSeq value that the user can retrieve.
2023-06-30 09:45:02 +08:00
if begin < minSeq {
begin = minSeq
}
if end > maxSeq {
end = maxSeq
}
2024-04-19 22:23:08 +08:00
// "begin" and "end" represent the actual startSeq and endSeq values that the user can retrieve.
2023-06-30 09:45:02 +08:00
if end < begin {
2024-04-19 22:23:08 +08:00
return 0, 0, nil, errs.ErrArgs.WrapMsg("seq end < begin")
2023-06-30 09:45:02 +08:00
}
var seqs []int64
if end-begin+1 <= num {
for i := begin; i <= end; i++ {
seqs = append(seqs, i)
}
} else {
for i := end - num + 1; i <= end; i++ {
seqs = append(seqs, i)
2023-06-30 09:45:02 +08:00
}
}
2024-12-26 17:49:05 +08:00
successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, seqs)
if err != nil {
return 0, 0, nil, err
2023-06-30 09:45:02 +08:00
}
return minSeq, maxSeq, successMsgs, nil
}
2023-07-13 16:51:52 +08:00
func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) {
2024-07-25 20:01:33 +08:00
userMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID)
2024-09-23 12:16:18 +08:00
if err != nil {
return 0, 0, nil, err
2023-06-30 09:45:02 +08:00
}
2024-07-16 10:46:21 +08:00
minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID)
if err != nil {
return 0, 0, nil, err
2023-06-30 09:45:02 +08:00
}
2024-07-16 10:46:21 +08:00
maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID)
if err != nil {
return 0, 0, nil, err
2023-06-30 09:45:02 +08:00
}
2024-09-23 12:16:18 +08:00
userMaxSeq, err := db.seqUser.GetUserMaxSeq(ctx, conversationID, userID)
if err != nil {
return 0, 0, nil, err
}
if userMinSeq > minSeq {
2023-06-30 09:45:02 +08:00
minSeq = userMinSeq
}
2024-09-23 12:16:18 +08:00
if userMaxSeq > 0 && userMaxSeq < maxSeq {
maxSeq = userMaxSeq
}
newSeqs := make([]int64, 0, len(seqs))
2023-06-30 09:45:02 +08:00
for _, seq := range seqs {
2024-09-23 12:16:18 +08:00
if seq <= 0 {
continue
}
2023-06-30 09:45:02 +08:00
if seq >= minSeq && seq <= maxSeq {
newSeqs = append(newSeqs, seq)
}
}
2024-12-26 17:49:05 +08:00
successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, newSeqs)
2023-06-30 09:45:02 +08:00
if err != nil {
2024-12-26 17:49:05 +08:00
return 0, 0, nil, err
2023-06-30 09:45:02 +08:00
}
return minSeq, maxSeq, successMsgs, nil
}
func (db *commonMsgDatabase) GetMessagesBySeqWithBounds(ctx context.Context, userID string, conversationID string, seqs []int64, pullOrder sdkws.PullOrder) (bool, int64, []*sdkws.MsgData, error) {
var endSeq int64
var isEnd bool
userMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID)
if err != nil {
return false, 0, nil, err
}
minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID)
if err != nil {
return false, 0, nil, err
}
maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID)
if err != nil {
return false, 0, nil, err
}
userMaxSeq, err := db.seqUser.GetUserMaxSeq(ctx, conversationID, userID)
if err != nil {
return false, 0, nil, err
}
if userMinSeq > minSeq {
minSeq = userMinSeq
}
if userMaxSeq > 0 && userMaxSeq < maxSeq {
maxSeq = userMaxSeq
}
newSeqs := make([]int64, 0, len(seqs))
for _, seq := range seqs {
if seq <= 0 {
continue
}
// The normal range and can fetch messages
if seq >= minSeq && seq <= maxSeq {
newSeqs = append(newSeqs, seq)
continue
}
// If the requested seq is smaller than the minimum seq and the pull order is descending (pulling older messages)
if seq < minSeq && pullOrder == sdkws.PullOrder_PullOrderDesc {
isEnd = true
endSeq = minSeq
}
// If the requested seq is larger than the maximum seq and the pull order is ascending (pulling newer messages)
if seq > maxSeq && pullOrder == sdkws.PullOrder_PullOrderAsc {
isEnd = true
endSeq = maxSeq
}
}
if len(newSeqs) == 0 {
return isEnd, endSeq, nil, nil
}
2024-12-26 17:49:05 +08:00
successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, newSeqs)
if err != nil {
2024-12-26 17:49:05 +08:00
return false, 0, nil, err
}
return isEnd, endSeq, successMsgs, nil
}
2023-07-13 16:51:52 +08:00
func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error {
2024-04-19 22:23:08 +08:00
for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) {
2023-06-30 09:45:02 +08:00
var indexes []int
for _, seq := range seqs {
2024-04-19 22:23:08 +08:00
indexes = append(indexes, int(db.msgTable.GetMsgIndex(seq)))
2023-06-30 09:45:02 +08:00
}
if err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, docID, indexes); err != nil {
return err
}
}
2024-12-26 17:49:05 +08:00
return db.msgCache.DelMessageBySeqs(ctx, conversationID, allSeqs)
2023-06-30 09:45:02 +08:00
}
2023-07-13 16:51:52 +08:00
func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error {
2024-04-19 22:23:08 +08:00
for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) {
2023-06-30 09:45:02 +08:00
for _, seq := range seqs {
2024-04-19 22:23:08 +08:00
if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msgTable.GetMsgIndex(seq), "del_list", []string{userID}); err != nil {
2023-06-30 09:45:02 +08:00
return err
}
}
}
2024-12-26 17:49:05 +08:00
return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs)
2023-06-30 09:45:02 +08:00
}
func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
2024-07-16 10:46:21 +08:00
return db.seqConversation.GetMaxSeqs(ctx, conversationIDs)
2023-06-30 09:45:02 +08:00
}
2023-06-30 09:45:02 +08:00
func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
2024-07-16 10:46:21 +08:00
return db.seqConversation.GetMaxSeq(ctx, conversationID)
2023-06-30 09:45:02 +08:00
}
2023-06-30 09:45:02 +08:00
func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
2024-07-16 10:46:21 +08:00
return db.seqConversation.SetMinSeqs(ctx, seqs)
2023-06-30 09:45:02 +08:00
}
2023-07-13 16:51:52 +08:00
func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
2024-07-25 20:01:33 +08:00
return db.seqUser.SetUserMinSeqs(ctx, userID, seqs)
2023-06-30 09:45:02 +08:00
}
// SetUserConversationsMaxSeq stores the user's maximum readable sequence
// number for one conversation by delegating to the user sequence cache.
func (db *commonMsgDatabase) SetUserConversationsMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
	return db.seqUser.SetUserMaxSeq(ctx, conversationID, userID, seq)
}
// SetUserConversationsMinSeq stores the user's minimum readable sequence
// number for one conversation by delegating to the user sequence cache.
func (db *commonMsgDatabase) SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
	return db.seqUser.SetUserMinSeq(ctx, conversationID, userID, seq)
}
2023-07-13 16:51:52 +08:00
func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
2024-07-25 20:01:33 +08:00
return db.seqUser.SetUserReadSeqs(ctx, userID, hasReadSeqs)
2023-06-30 09:45:02 +08:00
}
2023-07-13 16:51:52 +08:00
func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
2024-07-25 20:01:33 +08:00
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq)
2023-06-30 09:45:02 +08:00
}
2023-07-13 16:51:52 +08:00
func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
2024-07-25 20:01:33 +08:00
return db.seqUser.GetUserReadSeqs(ctx, userID, conversationIDs)
2023-06-30 09:45:02 +08:00
}
func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
2024-07-25 20:01:33 +08:00
return db.seqUser.GetUserReadSeq(ctx, conversationID, userID)
2023-06-30 09:45:02 +08:00
}
func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
2024-12-26 17:49:05 +08:00
return db.msgCache.SetSendMsgStatus(ctx, id, status)
2023-06-30 09:45:02 +08:00
}
func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
2024-12-26 17:49:05 +08:00
return db.msgCache.GetSendMsgStatus(ctx, id)
2023-06-30 09:45:02 +08:00
}
2023-07-13 16:51:52 +08:00
func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
2023-06-30 09:45:02 +08:00
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, conversationID)
if err != nil {
return
}
2024-07-16 10:46:21 +08:00
minSeqCache, err = db.seqConversation.GetMinSeq(ctx, conversationID)
2023-06-30 09:45:02 +08:00
if err != nil {
return
}
2024-07-16 10:46:21 +08:00
maxSeqCache, err = db.seqConversation.GetMaxSeq(ctx, conversationID)
2023-06-30 09:45:02 +08:00
if err != nil {
return
}
return
}
2023-07-13 16:51:52 +08:00
func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) {
2023-06-30 09:45:02 +08:00
return db.GetMinMaxSeqMongo(ctx, conversationID)
}
2023-07-13 16:51:52 +08:00
func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) {
2023-06-30 09:45:02 +08:00
oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID)
if err != nil {
return
}
minSeqMongo = oldestMsgMongo.Msg.Seq
newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID)
if err != nil {
return
}
maxSeqMongo = newestMsgMongo.Msg.Seq
return
}
2024-12-26 17:49:05 +08:00
func (db *commonMsgDatabase) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) {
2023-07-12 15:31:24 +08:00
return db.msgDocDatabase.RangeUserSendCount(ctx, start, end, group, ase, pageNumber, showNumber)
2023-06-30 09:45:02 +08:00
}
2024-12-26 17:49:05 +08:00
func (db *commonMsgDatabase) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) {
2023-07-12 15:31:24 +08:00
return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber)
2023-06-30 09:45:02 +08:00
}
2023-07-13 18:20:03 +08:00
2024-09-13 09:51:35 +08:00
func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*pbmsg.SearchedMsgData, err error) {
var totalMsgs []*pbmsg.SearchedMsgData
2023-07-14 20:40:41 +08:00
total, msgs, err := db.msgDocDatabase.SearchMessage(ctx, req)
2023-07-13 18:20:03 +08:00
if err != nil {
2023-07-14 20:40:41 +08:00
return 0, nil, err
2023-07-13 18:20:03 +08:00
}
for _, msg := range msgs {
2023-08-09 10:53:48 +08:00
if msg.IsRead {
msg.Msg.IsRead = true
}
2024-09-13 09:51:35 +08:00
searchedMsgData := &pbmsg.SearchedMsgData{MsgData: convert.MsgDB2Pb(msg.Msg)}
if msg.Revoke != nil {
searchedMsgData.IsRevoked = true
}
totalMsgs = append(totalMsgs, searchedMsgData)
2023-07-13 18:20:03 +08:00
}
2023-07-14 20:40:41 +08:00
return total, totalMsgs, nil
2023-07-13 18:20:03 +08:00
}
2023-08-09 10:37:56 +08:00
func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) {
totalMsgs := make(map[string]*sdkws.MsgData)
for _, conversationID := range conversationIDs {
seq := seqs[conversationID]
2024-04-19 22:23:08 +08:00
docID := db.msgTable.GetDocID(conversationID, seq)
msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
if err != nil {
return nil, err
}
2024-04-19 22:23:08 +08:00
index := db.msgTable.GetMsgIndex(seq)
totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg)
}
return totalMsgs, nil
}
2024-12-20 15:45:37 +08:00
func (db *commonMsgDatabase) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) {
return db.msgDocDatabase.GetRandBeforeMsg(ctx, ts, limit)
2024-04-28 11:47:05 +08:00
}
2024-12-20 15:45:37 +08:00
func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
2024-07-16 10:46:21 +08:00
dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID)
2024-04-28 11:47:05 +08:00
if err != nil {
if errors.Is(errs.Unwrap(err), redis.Nil) {
return nil
}
return err
}
if dbSeq >= seq {
return nil
}
2024-07-16 10:46:21 +08:00
return db.seqConversation.SetMinSeq(ctx, conversationID, seq)
2024-04-28 11:47:05 +08:00
}
// GetCacheMaxSeqWithTime forwards to the conversation sequence cache's
// GetCacheMaxSeqWithTime for the given conversations.
func (db *commonMsgDatabase) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
	return db.seqConversation.GetCacheMaxSeqWithTime(ctx, conversationIDs)
}
// GetMaxSeqWithTime forwards conversationID to the sequence store's
// GetMaxSeqWithTime and returns its result and error unchanged.
func (db *commonMsgDatabase) GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error) {
	seqTime, err := db.seqConversation.GetMaxSeqWithTime(ctx, conversationID)
	return seqTime, err
}
// GetMaxSeqsWithTime forwards conversationIDs to the sequence store's
// GetMaxSeqsWithTime and returns its per-conversation result and error
// unchanged.
func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
	// todo: only the time in the redis cache will be taken, not the message time
	seqTimes, err := db.seqConversation.GetMaxSeqsWithTime(ctx, conversationIDs)
	return seqTimes, err
}
2024-12-20 15:45:37 +08:00
func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error {
2024-12-26 17:49:05 +08:00
index := strings.LastIndex(docID, ":")
if index <= 0 {
return errs.ErrInternalServer.WrapMsg("docID is invalid", "docID", docID)
}
index, err := strconv.Atoi(docID[index+1:])
if err != nil {
return errs.WrapMsg(err, "strconv.Atoi", "docID", docID)
}
conversationID := docID[:index]
seqs := make([]int64, db.msgTable.GetSingleGocMsgNum())
minSeq := db.msgTable.GetMinSeq(index)
for i := range seqs {
seqs[i] = minSeq + int64(i)
}
if err := db.msgDocDatabase.DeleteDoc(ctx, docID); err != nil {
return err
}
return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs)
2024-12-20 15:45:37 +08:00
}
// GetLastMessageSeqByTime forwards conversationID and time to the message
// document store's GetLastMessageSeqByTime and returns its sequence number and
// error unchanged.
func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) {
	seq, err := db.msgDocDatabase.GetLastMessageSeqByTime(ctx, conversationID, time)
	return seq, err
}
2024-12-26 17:49:05 +08:00
// handlerDeleteAndRevoked rewrites msgs in place so that they reflect the
// per-user delete state and any revocation.
//
// For every entry that is non-nil and has a non-nil Msg:
//   - Msg.IsRead is copied from the entry's IsRead flag;
//   - if userID appears in DelList, the content is blanked and the status is
//     set to constant.MsgDeleted;
//   - if the entry carries a Revoke record, the content type becomes
//     constant.MsgRevokeNotification and the content is replaced by a
//     JSON-encoded NotificationElem whose Detail is the JSON-encoded
//     MessageRevokedContent.
//
// A JSON encoding failure is logged as a warning and the entry is left with
// whatever was already applied (the content type has been switched by then).
func (db *commonMsgDatabase) handlerDeleteAndRevoked(ctx context.Context, userID string, msgs []*model.MsgInfoModel) {
	for _, info := range msgs {
		if info == nil || info.Msg == nil {
			continue
		}
		body := info.Msg
		body.IsRead = info.IsRead

		// Hide the payload from users who deleted this message for themselves.
		if datautil.Contain(userID, info.DelList...) {
			body.Content = ""
			body.Status = constant.MsgDeleted
		}

		revoke := info.Revoke
		if revoke == nil {
			continue
		}

		body.ContentType = constant.MsgRevokeNotification
		detail, err := jsonutil.JsonMarshal(&sdkws.MessageRevokedContent{
			RevokerID:                   revoke.UserID,
			RevokerRole:                 revoke.Role,
			ClientMsgID:                 body.ClientMsgID,
			RevokerNickname:             revoke.Nickname,
			RevokeTime:                  revoke.Time,
			SourceMessageSendTime:       body.SendTime,
			SourceMessageSendID:         body.SendID,
			SourceMessageSenderNickname: body.SenderNickname,
			SessionType:                 body.SessionType,
			Seq:                         body.Seq,
			Ex:                          body.Ex,
		})
		if err != nil {
			log.ZWarn(ctx, "handlerDeleteAndRevoked JsonMarshal MessageRevokedContent", err, "msg", info)
			continue
		}

		// The revoke payload is wrapped once more as a notification element.
		wrapped, err := jsonutil.JsonMarshal(&sdkws.NotificationElem{Detail: string(detail)})
		if err != nil {
			log.ZWarn(ctx, "handlerDeleteAndRevoked JsonMarshal NotificationElem", err, "msg", info)
			continue
		}
		body.Content = string(wrapped)
	}
}
// handlerQuote passes every entry of msgs, in order, to handlerDBMsg together
// with userID and conversationID. A single map keyed by int64 is created up
// front and shared across all of those calls; it is discarded when this
// method returns.
func (db *commonMsgDatabase) handlerQuote(ctx context.Context, userID, conversationID string, msgs []*model.MsgInfoModel) {
	shared := make(map[int64][]*model.MsgInfoModel)
	for _, m := range msgs {
		db.handlerDBMsg(ctx, shared, userID, conversationID, m)
	}
}
func (db *commonMsgDatabase) GetMessageBySeqs(ctx context.Context, conversationID string, userID string, seqs []int64) ([]*sdkws.MsgData, error) {
msgs, err := db.msgCache.GetMessageBySeqs(ctx, conversationID, seqs)
if err != nil {
return nil, err
}
db.handlerDeleteAndRevoked(ctx, userID, msgs)
db.handlerQuote(ctx, userID, conversationID, msgs)
seqMsgs := make(map[int64]*model.MsgInfoModel)
for i, msg := range msgs {
if msg.Msg == nil {
continue
}
seqMsgs[msg.Msg.Seq] = msgs[i]
}
res := make([]*sdkws.MsgData, 0, len(seqs))
for _, seq := range seqs {
if v, ok := seqMsgs[seq]; ok {
res = append(res, convert.MsgDB2Pb(v.Msg))
} else {
2025-01-02 17:15:38 +08:00
res = append(res, &sdkws.MsgData{Seq: seq, Status: constant.MsgStatusHasDeleted})
}
}
return res, nil
}
func (db *commonMsgDatabase) GetLastMessage(ctx context.Context, conversationIDs []string, userID string) (map[string]*sdkws.MsgData, error) {
res := make(map[string]*sdkws.MsgData)
for _, conversationID := range conversationIDs {
if _, ok := res[conversationID]; ok {
continue
}
msg, err := db.msgDocDatabase.GetLastMessage(ctx, conversationID)
if err != nil {
if errs.Unwrap(err) == mongo.ErrNoDocuments {
continue
}
return nil, err
2024-12-26 17:49:05 +08:00
}
2025-01-02 17:15:38 +08:00
tmp := []*model.MsgInfoModel{msg}
db.handlerDeleteAndRevoked(ctx, userID, tmp)
db.handlerQuote(ctx, userID, conversationID, tmp)
res[conversationID] = convert.MsgDB2Pb(msg.Msg)
2024-12-26 17:49:05 +08:00
}
return res, nil
}