Merge remote-tracking branch 'origin/main' into add-new-feature-#484

# Conflicts:
#	pkg/common/db/unrelation/extend_msg.go
This commit is contained in:
plutoyty
2023-07-12 19:16:48 +08:00
154 changed files with 7800 additions and 10503 deletions
-230
View File
@@ -1,230 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package unrelation
import (
"context"
"errors"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
)
type ExtendMsgSetMongoDriver struct {
mgoDB *mongo.Database
ExtendMsgSetCollection *mongo.Collection
}
func NewExtendMsgSetMongoDriver(mgoDB *mongo.Database) unRelationTb.ExtendMsgSetModelInterface {
return &ExtendMsgSetMongoDriver{mgoDB: mgoDB, ExtendMsgSetCollection: mgoDB.Collection(unRelationTb.CExtendMsgSet)}
}
func (e *ExtendMsgSetMongoDriver) CreateExtendMsgSet(ctx context.Context, set *unRelationTb.ExtendMsgSetModel) error {
_, err := e.ExtendMsgSetCollection.InsertOne(ctx, set)
return err
}
func (e *ExtendMsgSetMongoDriver) GetAllExtendMsgSet(
ctx context.Context,
ID string,
opts *unRelationTb.GetAllExtendMsgSetOpts,
) (sets []*unRelationTb.ExtendMsgSetModel, err error) {
regex := fmt.Sprintf("^%s", ID)
var findOpts *options.FindOptions
if opts != nil {
if opts.ExcludeExtendMsgs {
findOpts = &options.FindOptions{}
findOpts.SetProjection(bson.M{"extend_msgs": 0})
}
}
cursor, err := e.ExtendMsgSetCollection.Find(ctx, bson.M{"doc_id": primitive.Regex{Pattern: regex}}, findOpts)
if err != nil {
return nil, utils.Wrap(err, "")
}
err = cursor.All(ctx, &sets)
if err != nil {
return nil, utils.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String()))
}
return sets, nil
}
func (e *ExtendMsgSetMongoDriver) GetExtendMsgSet(
ctx context.Context,
conversationID string,
sessionType int32,
maxMsgUpdateTime int64,
) (*unRelationTb.ExtendMsgSetModel, error) {
var err error
findOpts := options.Find().
SetLimit(1).
SetSkip(0).
SetSort(bson.M{"source_id": -1}).
SetProjection(bson.M{"extend_msgs": 0})
// update newest
find := bson.M{
"source_id": primitive.Regex{Pattern: fmt.Sprintf("^%s", conversationID)},
"session_type": sessionType,
}
if maxMsgUpdateTime > 0 {
find["max_msg_update_time"] = maxMsgUpdateTime
}
result, err := e.ExtendMsgSetCollection.Find(ctx, find, findOpts)
if err != nil {
return nil, utils.Wrap(err, "")
}
var setList []unRelationTb.ExtendMsgSetModel
if err := result.All(ctx, &setList); err != nil {
return nil, utils.Wrap(err, "")
}
if len(setList) == 0 {
return nil, nil
}
return &setList[0], nil
}
// InsertExtendMsg first modify msg.
func (e *ExtendMsgSetMongoDriver) InsertExtendMsg(
ctx context.Context,
conversationID string,
sessionType int32,
msg *unRelationTb.ExtendMsgModel,
) error {
set, err := e.GetExtendMsgSet(ctx, conversationID, sessionType, 0)
if err != nil {
return utils.Wrap(err, "")
}
if set == nil || set.ExtendMsgNum >= set.GetExtendMsgMaxNum() {
var index int32
if set != nil {
index = set.SplitConversationIDAndGetIndex()
}
err = e.CreateExtendMsgSet(ctx, &unRelationTb.ExtendMsgSetModel{
ConversationID: set.GetConversationID(conversationID, index),
SessionType: sessionType,
ExtendMsgs: map[string]unRelationTb.ExtendMsgModel{msg.ClientMsgID: *msg},
ExtendMsgNum: 1,
CreateTime: msg.MsgFirstModifyTime,
MaxMsgUpdateTime: msg.MsgFirstModifyTime,
})
} else {
_, err = e.ExtendMsgSetCollection.UpdateOne(ctx, bson.M{"conversation_id": set.ConversationID, "session_type": sessionType}, bson.M{"$set": bson.M{"max_msg_update_time": msg.MsgFirstModifyTime, "$inc": bson.M{"extend_msg_num": 1}, fmt.Sprintf("extend_msgs.%s", msg.ClientMsgID): msg}})
}
return utils.Wrap(err, "")
}
// InsertOrUpdateReactionExtendMsgSet insert or update.
func (e *ExtendMsgSetMongoDriver) InsertOrUpdateReactionExtendMsgSet(
ctx context.Context,
conversationID string,
sessionType int32,
clientMsgID string,
msgFirstModifyTime int64,
reactionExtensionList map[string]*unRelationTb.KeyValueModel,
) error {
var updateBson = bson.M{}
for _, v := range reactionExtensionList {
updateBson[fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, v.TypeKey)] = v
}
upsert := true
opt := &options.UpdateOptions{
Upsert: &upsert,
}
set, err := e.GetExtendMsgSet(ctx, conversationID, sessionType, msgFirstModifyTime)
if err != nil {
return utils.Wrap(err, "")
}
if set == nil {
return errors.New(fmt.Sprintf("conversationID %s has no set", conversationID))
}
_, err = e.ExtendMsgSetCollection.UpdateOne(
ctx,
bson.M{"source_id": set.ConversationID, "session_type": sessionType},
bson.M{"$set": updateBson},
opt,
)
return utils.Wrap(err, "")
}
// DeleteReactionExtendMsgSet delete TypeKey.
func (e *ExtendMsgSetMongoDriver) DeleteReactionExtendMsgSet(
ctx context.Context,
conversationID string,
sessionType int32,
clientMsgID string,
msgFirstModifyTime int64,
reactionExtensionList map[string]*unRelationTb.KeyValueModel,
) error {
var updateBson = bson.M{}
for _, v := range reactionExtensionList {
updateBson[fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, v.TypeKey)] = ""
}
set, err := e.GetExtendMsgSet(ctx, conversationID, sessionType, msgFirstModifyTime)
if err != nil {
return utils.Wrap(err, "")
}
if set == nil {
return errors.New(fmt.Sprintf("conversationID %s has no set", conversationID))
}
_, err = e.ExtendMsgSetCollection.UpdateOne(
ctx,
bson.M{"source_id": set.ConversationID, "session_type": sessionType},
bson.M{"$unset": updateBson},
)
return err
}
func (e *ExtendMsgSetMongoDriver) TakeExtendMsg(
ctx context.Context,
conversationID string,
sessionType int32,
clientMsgID string,
maxMsgUpdateTime int64,
) (extendMsg *unRelationTb.ExtendMsgModel, err error) {
findOpts := options.Find().
SetLimit(1).
SetSkip(0).
SetSort(bson.M{"source_id": -1}).
SetProjection(bson.M{fmt.Sprintf("extend_msgs.%s", clientMsgID): 1})
regex := fmt.Sprintf("^%s", conversationID)
result, err := e.ExtendMsgSetCollection.Find(
ctx,
bson.M{
"source_id": primitive.Regex{Pattern: regex},
"session_type": sessionType,
"max_msg_update_time": bson.M{"$lte": maxMsgUpdateTime},
},
findOpts,
)
if err != nil {
return nil, utils.Wrap(err, "")
}
var setList []unRelationTb.ExtendMsgSetModel
if err := result.All(ctx, &setList); err != nil {
return nil, utils.Wrap(err, "")
}
if len(setList) == 0 {
return nil, utils.Wrap(errors.New("GetExtendMsg failed, len(setList) == 0"), "")
}
if v, ok := setList[0].ExtendMsgs[clientMsgID]; ok {
return &v, nil
}
return nil, errors.New(fmt.Sprintf("cant find client msg id: %s", clientMsgID))
}
-4
View File
@@ -107,10 +107,6 @@ func (m *Mongo) CreateSuperGroupIndex() error {
return nil
}
func (m *Mongo) CreateExtendMsgSetIndex() error {
return m.createMongoIndex(unrelation.CExtendMsgSet, true, "-create_time", "work_moment_id")
}
func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error {
db := m.db.Database(config.Config.Mongo.Database).Collection(collection)
opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
+682 -1
View File
@@ -19,6 +19,7 @@ import (
"encoding/json"
"errors"
"fmt"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
@@ -302,7 +303,7 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(
}
if msg.Revoke != nil {
revokeContent := sdkws.MessageRevokedContent{
RevokerID: msg.Revoke.ID,
RevokerID: msg.Revoke.UserID,
RevokerRole: msg.Revoke.Role,
ClientMsgID: msg.Msg.ClientMsgID,
RevokerNickname: msg.Revoke.Nickname,
@@ -368,3 +369,683 @@ func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead(
_, err := m.MsgCollection.BulkWrite(ctx, updates)
return err
}
// RangeUserSendCount
// db.msg.aggregate([
//
// {
// $match: {
// "msgs.msg.send_time": {
// "$gte": 0,
// "$lt": 1788122092317
// }
// }
// },
// {
// "$addFields": {
// "msgs": {
// "$filter": {
// "input": "$msgs",
// "as": "item",
// "cond": {
// "$and": [
// {
// $gte: ["$$item.msg.send_time", 0]
// },
// {
// $lt: ["$$item.msg.send_time", 1788122092317]
// }
// ]
// }
// }
// }
// }
// },
// {
// "$project": {
// "_id": 0,
//
// },
//
// },
// {
// "$project": {
// "result": {
// "$map": {
// "input": "$msgs",
// "as": "item",
// "in": {
// user_id: "$$item.msg.send_id",
// send_date: {
// $dateToString: {
// format: "%Y-%m-%d",
// date: {
// $toDate: "$$item.msg.send_time"
// }
// }
// }
// }
// }
// }
// },
//
// },
// {
// "$unwind": "$result"
// },
// {
// "$group": {
// _id: "$result.send_date",
// count: {
// $sum: 1
// },
// original: {
// $push: "$$ROOT"
// }
// }
// },
// {
// "$addFields": {
// "dates": "$$ROOT"
// }
// },
// {
// "$project": {
// "_id": 0,
// "count": 0,
// "dates.original": 0,
//
// },
//
// },
// {
// "$group": {
// _id: null,
// count: {
// $sum: 1
// },
// dates: {
// $push: "$dates"
// },
// original: {
// $push: "$original"
// },
//
// }
// },
// {
// "$unwind": "$original"
// },
// {
// "$unwind": "$original"
// },
// {
// "$group": {
// _id: "$original.result.user_id",
// count: {
// $sum: 1
// },
// original: {
// $push: "$dates"
// },
//
// }
// },
// {
// "$addFields": {
// "dates": {
// $arrayElemAt: ["$original", 0]
// }
// }
// },
// {
// "$project": {
// original: 0
// }
// },
// {
// $sort: {
// count: - 1
// }
// },
// {
// "$group": {
// _id: null,
// user_count: {
// $sum: 1
// },
// users: {
// $push: "$$ROOT"
// },
//
// }
// },
// {
// "$addFields": {
// "dates": {
// $arrayElemAt: ["$users", 0]
// }
// }
// },
// {
// "$addFields": {
// "dates": "$dates.dates"
// }
// },
// {
// "$project": {
// _id: 0,
// "users.dates": 0,
//
// }
// },
// {
// "$addFields": {
// "msg_count": {
// $sum: "$users.count"
// }
// }
// },
// {
// "$addFields": {
// users: {
// $slice: ["$users", 0, 10]
// }
// }
// }
//
// ])
func (m *MsgMongoDriver) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*table.UserCount, dateCount map[string]int64, err error) {
var sort int
if ase {
sort = 1
} else {
sort = -1
}
type Result struct {
MsgCount int64 `bson:"msg_count"`
UserCount int64 `bson:"user_count"`
Users []struct {
UserID string `bson:"_id"`
Count int64 `bson:"count"`
} `bson:"users"`
Dates []struct {
Date string `bson:"_id"`
Count int64 `bson:"count"`
} `bson:"dates"`
}
or := bson.A{
bson.M{
"doc_id": bson.M{
"$regex": "^si_",
"$options": "i",
},
},
}
if group {
or = append(or,
bson.M{
"doc_id": bson.M{
"$regex": "^g_",
"$options": "i",
},
},
bson.M{
"doc_id": bson.M{
"$regex": "^sg_",
"$options": "i",
},
},
)
}
pipeline := bson.A{
bson.M{
"$match": bson.M{
"$and": bson.A{
bson.M{
"msgs.msg.send_time": bson.M{
"$gte": start.UnixMilli(),
"$lt": end.UnixMilli(),
},
},
bson.M{
"$or": or,
},
},
},
},
bson.M{
"$addFields": bson.M{
"msgs": bson.M{
"$filter": bson.M{
"input": "$msgs",
"as": "item",
"cond": bson.M{
"$and": bson.A{
bson.M{
"$gte": bson.A{
"$$item.msg.send_time", start.UnixMilli(),
},
},
bson.M{
"$lt": bson.A{
"$$item.msg.send_time", end.UnixMilli(),
},
},
},
},
},
},
},
},
bson.M{
"$project": bson.M{
"_id": 0,
},
},
bson.M{
"$project": bson.M{
"result": bson.M{
"$map": bson.M{
"input": "$msgs",
"as": "item",
"in": bson.M{
"user_id": "$$item.msg.send_id",
"send_date": bson.M{
"$dateToString": bson.M{
"format": "%Y-%m-%d",
"date": bson.M{
"$toDate": "$$item.msg.send_time", // 毫秒时间戳
},
},
},
},
},
},
},
},
bson.M{
"$unwind": "$result",
},
bson.M{
"$group": bson.M{
"_id": "$result.send_date",
"count": bson.M{
"$sum": 1,
},
"original": bson.M{
"$push": "$$ROOT",
},
},
},
bson.M{
"$addFields": bson.M{
"dates": "$$ROOT",
},
},
bson.M{
"$project": bson.M{
"_id": 0,
"count": 0,
"dates.original": 0,
},
},
bson.M{
"$group": bson.M{
"_id": nil,
"count": bson.M{
"$sum": 1,
},
"dates": bson.M{
"$push": "$dates",
},
"original": bson.M{
"$push": "$original",
},
},
},
bson.M{
"$unwind": "$original",
},
bson.M{
"$unwind": "$original",
},
bson.M{
"$group": bson.M{
"_id": "$original.result.user_id",
"count": bson.M{
"$sum": 1,
},
"original": bson.M{
"$push": "$dates",
},
},
},
bson.M{
"$addFields": bson.M{
"dates": bson.M{
"$arrayElemAt": bson.A{"$original", 0},
},
},
},
bson.M{
"$project": bson.M{
"original": 0,
},
},
bson.M{
"$sort": bson.M{
"count": sort,
},
},
bson.M{
"$group": bson.M{
"_id": nil,
"user_count": bson.M{
"$sum": 1,
},
"users": bson.M{
"$push": "$$ROOT",
},
},
},
bson.M{
"$addFields": bson.M{
"dates": bson.M{
"$arrayElemAt": bson.A{"$users", 0},
},
},
},
bson.M{
"$addFields": bson.M{
"dates": "$dates.dates",
},
},
bson.M{
"$project": bson.M{
"_id": 0,
"users.dates": 0,
},
},
bson.M{
"$addFields": bson.M{
"msg_count": bson.M{
"$sum": "$users.count",
},
},
},
bson.M{
"$addFields": bson.M{
"users": bson.M{
"$slice": bson.A{"$users", pageNumber - 1, showNumber},
},
},
},
}
cur, err := m.MsgCollection.Aggregate(ctx, pipeline, options.Aggregate().SetAllowDiskUse(true))
if err != nil {
return 0, 0, nil, nil, errs.Wrap(err)
}
defer cur.Close(ctx)
var result []Result
if err := cur.All(ctx, &result); err != nil {
return 0, 0, nil, nil, errs.Wrap(err)
}
if len(result) == 0 {
return 0, 0, nil, nil, errs.Wrap(err)
}
users = make([]*table.UserCount, len(result[0].Users))
for i, r := range result[0].Users {
users[i] = &table.UserCount{
UserID: r.UserID,
Count: r.Count,
}
}
dateCount = make(map[string]int64)
for _, r := range result[0].Dates {
dateCount[r.Date] = r.Count
}
return result[0].MsgCount, result[0].UserCount, users, dateCount, nil
}
func (m *MsgMongoDriver) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*table.GroupCount, dateCount map[string]int64, err error) {
var sort int
if ase {
sort = 1
} else {
sort = -1
}
type Result struct {
MsgCount int64 `bson:"msg_count"`
UserCount int64 `bson:"user_count"`
Groups []struct {
GroupID string `bson:"_id"`
Count int64 `bson:"count"`
} `bson:"groups"`
Dates []struct {
Date string `bson:"_id"`
Count int64 `bson:"count"`
} `bson:"dates"`
}
pipeline := bson.A{
bson.M{
"$match": bson.M{
"$and": bson.A{
bson.M{
"msgs.msg.send_time": bson.M{
"$gte": start.UnixMilli(),
"$lt": end.UnixMilli(),
},
},
bson.M{
"$or": bson.A{
bson.M{
"doc_id": bson.M{
"$regex": "^g_",
"$options": "i",
},
},
bson.M{
"doc_id": bson.M{
"$regex": "^sg_",
"$options": "i",
},
},
},
},
},
},
},
bson.M{
"$addFields": bson.M{
"msgs": bson.M{
"$filter": bson.M{
"input": "$msgs",
"as": "item",
"cond": bson.M{
"$and": bson.A{
bson.M{
"$gte": bson.A{
"$$item.msg.send_time", start.UnixMilli(),
},
},
bson.M{
"$lt": bson.A{
"$$item.msg.send_time", end.UnixMilli(),
},
},
},
},
},
},
},
},
bson.M{
"$project": bson.M{
"_id": 0,
},
},
bson.M{
"$project": bson.M{
"result": bson.M{
"$map": bson.M{
"input": "$msgs",
"as": "item",
"in": bson.M{
"group_id": "$$item.msg.group_id",
"send_date": bson.M{
"$dateToString": bson.M{
"format": "%Y-%m-%d",
"date": bson.M{
"$toDate": "$$item.msg.send_time", // 毫秒时间戳
},
},
},
},
},
},
},
},
bson.M{
"$unwind": "$result",
},
bson.M{
"$group": bson.M{
"_id": "$result.send_date",
"count": bson.M{
"$sum": 1,
},
"original": bson.M{
"$push": "$$ROOT",
},
},
},
bson.M{
"$addFields": bson.M{
"dates": "$$ROOT",
},
},
bson.M{
"$project": bson.M{
"_id": 0,
"count": 0,
"dates.original": 0,
},
},
bson.M{
"$group": bson.M{
"_id": nil,
"count": bson.M{
"$sum": 1,
},
"dates": bson.M{
"$push": "$dates",
},
"original": bson.M{
"$push": "$original",
},
},
},
bson.M{
"$unwind": "$original",
},
bson.M{
"$unwind": "$original",
},
bson.M{
"$group": bson.M{
"_id": "$original.result.group_id",
"count": bson.M{
"$sum": 1,
},
"original": bson.M{
"$push": "$dates",
},
},
},
bson.M{
"$addFields": bson.M{
"dates": bson.M{
"$arrayElemAt": bson.A{"$original", 0},
},
},
},
bson.M{
"$project": bson.M{
"original": 0,
},
},
bson.M{
"$sort": bson.M{
"count": sort,
},
},
bson.M{
"$group": bson.M{
"_id": nil,
"user_count": bson.M{
"$sum": 1,
},
"groups": bson.M{
"$push": "$$ROOT",
},
},
},
bson.M{
"$addFields": bson.M{
"dates": bson.M{
"$arrayElemAt": bson.A{"$groups", 0},
},
},
},
bson.M{
"$addFields": bson.M{
"dates": "$dates.dates",
},
},
bson.M{
"$project": bson.M{
"_id": 0,
"groups.dates": 0,
},
},
bson.M{
"$addFields": bson.M{
"msg_count": bson.M{
"$sum": "$groups.count",
},
},
},
bson.M{
"$addFields": bson.M{
"groups": bson.M{
"$slice": bson.A{"$groups", pageNumber - 1, showNumber},
},
},
},
}
cur, err := m.MsgCollection.Aggregate(ctx, pipeline, options.Aggregate().SetAllowDiskUse(true))
if err != nil {
return 0, 0, nil, nil, errs.Wrap(err)
}
defer cur.Close(ctx)
var result []Result
if err := cur.All(ctx, &result); err != nil {
return 0, 0, nil, nil, errs.Wrap(err)
}
if len(result) == 0 {
return 0, 0, nil, nil, errs.Wrap(err)
}
groups = make([]*table.GroupCount, len(result[0].Groups))
for i, r := range result[0].Groups {
groups[i] = &table.GroupCount{
GroupID: r.GroupID,
Count: r.Count,
}
}
dateCount = make(map[string]int64)
for _, r := range result[0].Dates {
dateCount[r.Date] = r.Count
}
return result[0].MsgCount, result[0].UserCount, groups, dateCount, nil
}