msg database

This commit is contained in:
withchao
2023-02-23 18:17:17 +08:00
parent 2359d7ab37
commit 2af2716c2c
17 changed files with 588 additions and 241 deletions
+74 -140
View File
@@ -8,6 +8,7 @@ import (
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/prome"
"Open_IM/pkg/common/tracelog"
"fmt"
"github.com/gogo/protobuf/sortkeys"
"sync"
"time"
@@ -23,97 +24,6 @@ import (
"github.com/golang/protobuf/proto"
)
//type MsgInterface interface {
// // 批量插入消息到db
// BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
// // 刪除redis中消息缓存
// DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error
// // incrSeq然后批量插入缓存
// BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error)
// // 删除消息 返回不存在的seqList
// DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
// // 通过seqList获取db中写扩散消息
// GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// // 通过seqList获取大群在db里面的消息 没找到返回错误
// GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// // 删除用户所有消息/cache/db然后重置seq
// CleanUpUserMsg(ctx context.Context, userID string) error
// // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
// DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error
// // 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
// DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
// // 获取用户 seq mongo和redis
// GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
// // 获取群 seq mongo和redis
// GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
// // 设置群用户最小seq 直接调用cache
// SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
// // 设置用户最小seq 直接调用cache
// SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
//
// MsgToMQ(ctx context.Context, key string, data *pbMsg.MsgDataToMQ) (err error)
//}
//
//func NewMsgController(mgo *mongo.Client, rdb redis.UniversalClient) MsgInterface {
// return &MsgController{}
//}
//
//type MsgController struct {
// database MsgDatabase
//}
//
//func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
// return m.database.BatchInsertChat2DB(ctx, ID, msgList, currentMaxSeq)
//}
//
//func (m *MsgController) DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error {
// return m.database.DeleteMessageFromCache(ctx, sourceID, msgList)
//}
//
//func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
// return m.database.BatchInsertChat2Cache(ctx, sourceID, msgList)
//}
//
//func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
// return m.database.DelMsgBySeqs(ctx, userID, seqs)
//}
//
//func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
// return m.database.GetMsgBySeqs(ctx, userID, seqs)
//}
//
//func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
// return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs)
//}
//
//func (m *MsgController) CleanUpUserMsg(ctx context.Context, userID string) error {
// return m.database.CleanUpUserMsg(ctx, userID)
//}
//
//func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
// return m.database.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, remainTime)
//}
//
//func (m *MsgController) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
// return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime)
//}
//
//func (m *MsgController) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
// return m.database.GetUserMinMaxSeqInMongoAndCache(ctx, userID)
//}
//
//func (m *MsgController) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
// return m.database.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID)
//}
//
//func (m *MsgController) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
// return m.database.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
//}
//
//func (m *MsgController) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
// return m.database.SetUserMinSeq(ctx, userID, minSeq)
//}
type MsgDatabaseInterface interface {
// 批量插入消息
BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
@@ -154,8 +64,8 @@ type MsgDatabaseInterface interface {
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
SetSendMsgStatus(ctx context.Context, userID string, status int32) error
GetSendMsgStatus(ctx context.Context, userID string) (int32, error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error
GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
GetUserMinSeq(ctx context.Context, userID string) (int64, error)
@@ -163,65 +73,97 @@ type MsgDatabaseInterface interface {
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
GetMessageListBySeq(ctx context.Context, userID string, seqs []int64) ([]*sdkws.MsgData, error)
}
func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface {
return &MsgDatabase{}
}
type MsgDatabase struct {
mgo unRelationTb.MsgDocModelInterface
cache cache.Cache
msg unRelationTb.MsgDocModel
mgo unRelationTb.MsgDocModelInterface
cache cache.MsgCache
msg unRelationTb.MsgDocModel
ExtendMsg unRelationTb.ExtendMsgSetModelInterface
rdb redis.Client
}
func (db *MsgDatabase) reactionExtensionList(reactionExtensionList map[string]*sdkws.KeyValue) map[string]unRelationTb.KeyValueModel {
r := make(map[string]unRelationTb.KeyValueModel)
for key, value := range reactionExtensionList {
r[key] = unRelationTb.KeyValueModel{
TypeKey: value.TypeKey,
Value: value.Value,
LatestUpdateTime: value.LatestUpdateTime,
}
}
return r
}
func (db *MsgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
//TODO implement me
panic("implement me")
return db.cache.JudgeMessageReactionEXISTS(ctx, clientMsgID, sessionType)
}
func (db *MsgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
//TODO implement me
panic("implement me")
return db.cache.SetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey, value)
}
func (db *MsgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
//TODO implement me
panic("implement me")
}
func (db *MsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) {
//TODO implement me
panic("implement me")
}
func (db *MsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
//TODO implement me
panic("implement me")
return db.cache.SetMessageReactionExpire(ctx, clientMsgID, sessionType, expiration)
}
func (db *MsgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
//TODO implement me
panic("implement me")
return db.cache.GetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey)
}
func (db *MsgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
//TODO implement me
panic("implement me")
return db.cache.GetOneMessageAllReactionList(ctx, clientMsgID, sessionType)
}
func (db *MsgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
//TODO implement me
panic("implement me")
return db.cache.DeleteOneMessageKey(ctx, clientMsgID, sessionType, subKey)
}
func (db *MsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
return db.ExtendMsg.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensionList))
}
func (db *MsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) {
extendMsgSet, err := db.ExtendMsg.GetExtendMsgSet(ctx, sourceID, sessionType, maxMsgUpdateTime)
if err != nil {
return nil, err
}
extendMsg, ok := extendMsgSet.ExtendMsgs[clientMsgID]
if !ok {
return nil, errors.New(fmt.Sprintf("cant find client msg id: %s", clientMsgID))
}
reactionExtensionList := make(map[string]*pbMsg.KeyValueResp)
for key, model := range extendMsg.ReactionExtensionList {
reactionExtensionList[key] = &pbMsg.KeyValueResp{
KeyValue: &sdkws.KeyValue{
TypeKey: model.TypeKey,
Value: model.Value,
LatestUpdateTime: model.LatestUpdateTime,
},
}
}
return &pbMsg.ExtendMsg{
ReactionExtensionList: reactionExtensionList,
ClientMsgID: extendMsg.ClientMsgID,
MsgFirstModifyTime: extendMsg.MsgFirstModifyTime,
AttachedInfo: extendMsg.AttachedInfo,
Ex: extendMsg.Ex,
}, nil
}
func (db *MsgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
//TODO implement me
panic("implement me")
return db.ExtendMsg.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensionList))
}
func (db *MsgDatabase) SetSendMsgStatus(ctx context.Context, userID string, status int32) error {
//TODO implement me
panic("implement me")
func (db *MsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return db.cache.SetSendMsgStatus(ctx, id, status)
}
func (db *MsgDatabase) GetSendMsgStatus(ctx context.Context, userID string) (int32, error) {
//TODO implement me
panic("implement me")
func (db *MsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
return db.cache.GetSendMsgStatus(ctx, id)
}
func (db *MsgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error {
@@ -230,32 +172,24 @@ func (db *MsgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDat
}
func (db *MsgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
//TODO implement me
panic("implement me")
return db.cache.GetUserMaxSeq(ctx, userID)
}
func (db *MsgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
//TODO implement me
panic("implement me")
return db.cache.GetUserMinSeq(ctx, userID)
}
func (db *MsgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
//TODO implement me
panic("implement me")
return db.cache.GetGroupMaxSeq(ctx, groupID)
}
func (db *MsgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
//TODO implement me
panic("implement me")
return db.cache.GetGroupMinSeq(ctx, groupID)
}
func (db *MsgDatabase) GetMessageListBySeq(ctx context.Context, userID string, seqs []int64) ([]*sdkws.MsgData, error) {
//TODO implement me
panic("implement me")
}
func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface {
return &MsgDatabase{}
seqMsg, _, err := db.cache.GetMessageListBySeq(ctx, userID, seqs)
return seqMsg, err
}
func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {