This commit is contained in:
wangchuxiao
2023-02-03 12:16:48 +08:00
parent ff88f5d54a
commit b637a29348
45 changed files with 785 additions and 680 deletions
@@ -1,7 +1,7 @@
package unrelation
import (
"Open_IM/pkg/common/db/table"
"Open_IM/pkg/common/db/table/unrelation"
server_api_params "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
@@ -19,10 +19,10 @@ type ExtendMsgSetMongoDriver struct {
}
func NewExtendMsgSetMongoDriver(mgoDB *mongo.Database) *ExtendMsgSetMongoDriver {
return &ExtendMsgSetMongoDriver{mgoDB: mgoDB, ExtendMsgSetCollection: mgoDB.Collection(table.CExtendMsgSet)}
return &ExtendMsgSetMongoDriver{mgoDB: mgoDB, ExtendMsgSetCollection: mgoDB.Collection(unrelation.CExtendMsgSet)}
}
func (e *ExtendMsgSetMongoDriver) CreateExtendMsgSet(ctx context.Context, set *table.ExtendMsgSet) error {
func (e *ExtendMsgSetMongoDriver) CreateExtendMsgSet(ctx context.Context, set *unrelation.ExtendMsgSet) error {
_, err := e.ExtendMsgSetCollection.InsertOne(ctx, set)
return err
}
@@ -31,7 +31,7 @@ type GetAllExtendMsgSetOpts struct {
ExcludeExtendMsgs bool
}
func (e *ExtendMsgSetMongoDriver) GetAllExtendMsgSet(ctx context.Context, ID string, opts *GetAllExtendMsgSetOpts) (sets []*table.ExtendMsgSet, err error) {
func (e *ExtendMsgSetMongoDriver) GetAllExtendMsgSet(ctx context.Context, ID string, opts *GetAllExtendMsgSetOpts) (sets []*unrelation.ExtendMsgSet, err error) {
regex := fmt.Sprintf("^%s", ID)
var findOpts *options.FindOptions
if opts != nil {
@@ -51,7 +51,7 @@ func (e *ExtendMsgSetMongoDriver) GetAllExtendMsgSet(ctx context.Context, ID str
return sets, nil
}
func (e *ExtendMsgSetMongoDriver) GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64) (*table.ExtendMsgSet, error) {
func (e *ExtendMsgSetMongoDriver) GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64) (*unrelation.ExtendMsgSet, error) {
var err error
findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{"extend_msgs": 0})
// update newest
@@ -63,7 +63,7 @@ func (e *ExtendMsgSetMongoDriver) GetExtendMsgSet(ctx context.Context, sourceID
if err != nil {
return nil, utils.Wrap(err, "")
}
var setList []table.ExtendMsgSet
var setList []unrelation.ExtendMsgSet
if err := result.All(ctx, &setList); err != nil {
return nil, utils.Wrap(err, "")
}
@@ -74,7 +74,7 @@ func (e *ExtendMsgSetMongoDriver) GetExtendMsgSet(ctx context.Context, sourceID
}
// first modify msg
func (e *ExtendMsgSetMongoDriver) InsertExtendMsg(ctx context.Context, sourceID string, sessionType int32, msg *table.ExtendMsg) error {
func (e *ExtendMsgSetMongoDriver) InsertExtendMsg(ctx context.Context, sourceID string, sessionType int32, msg *unrelation.ExtendMsg) error {
set, err := e.GetExtendMsgSet(ctx, sourceID, sessionType, 0)
if err != nil {
return utils.Wrap(err, "")
@@ -84,10 +84,10 @@ func (e *ExtendMsgSetMongoDriver) InsertExtendMsg(ctx context.Context, sourceID
if set != nil {
index = set.SplitSourceIDAndGetIndex()
}
err = e.CreateExtendMsgSet(ctx, &table.ExtendMsgSet{
err = e.CreateExtendMsgSet(ctx, &unrelation.ExtendMsgSet{
SourceID: set.GetSourceID(sourceID, index),
SessionType: sessionType,
ExtendMsgs: map[string]table.ExtendMsg{msg.ClientMsgID: *msg},
ExtendMsgs: map[string]unrelation.ExtendMsg{msg.ClientMsgID: *msg},
ExtendMsgNum: 1,
CreateTime: msg.MsgFirstModifyTime,
MaxMsgUpdateTime: msg.MsgFirstModifyTime,
@@ -136,14 +136,14 @@ func (e *ExtendMsgSetMongoDriver) DeleteReactionExtendMsgSet(ctx context.Context
return err
}
func (e *ExtendMsgSetMongoDriver) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *table.ExtendMsg, err error) {
func (e *ExtendMsgSetMongoDriver) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *unrelation.ExtendMsg, err error) {
findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{fmt.Sprintf("extend_msgs.%s", clientMsgID): 1})
regex := fmt.Sprintf("^%s", sourceID)
result, err := e.ExtendMsgSetCollection.Find(ctx, bson.M{"source_id": primitive.Regex{Pattern: regex}, "session_type": sessionType, "max_msg_update_time": bson.M{"$lte": maxMsgUpdateTime}}, findOpts)
if err != nil {
return nil, utils.Wrap(err, "")
}
var setList []table.ExtendMsgSet
var setList []unrelation.ExtendMsgSet
if err := result.All(ctx, &setList); err != nil {
return nil, utils.Wrap(err, "")
}