This commit is contained in:
wangchuxiao
2023-01-28 13:19:36 +08:00
parent 8d029a7ab3
commit d9becad8e9
37 changed files with 417 additions and 418 deletions
@@ -0,0 +1,285 @@
package unrelation
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
pbMsg "Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils"
"context"
"errors"
go_redis "github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
func (d *db.DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string, currentMaxSeq uint64) error {
newTime := getCurrentTimestampByMill()
if len(msgList) > GetSingleGocMsgNum() {
return errors.New("too large")
}
isInit := false
var remain uint64
blk0 := uint64(GetSingleGocMsgNum() - 1)
//currentMaxSeq 4998
if currentMaxSeq < uint64(mongo2.GetSingleGocMsgNum()) {
remain = blk0 - currentMaxSeq //1
} else {
excludeBlk0 := currentMaxSeq - blk0 //=1
//(5000-1)%5000 == 4999
remain = (uint64(mongo2.GetSingleGocMsgNum()) - (excludeBlk0 % uint64(mongo2.GetSingleGocMsgNum()))) % uint64(mongo2.GetSingleGocMsgNum())
}
//remain=1
insertCounter := uint64(0)
msgListToMongo := make([]mongo2.MsgInfo, 0)
msgListToMongoNext := make([]mongo2.MsgInfo, 0)
seqUid := ""
seqUidNext := ""
log.Debug(operationID, "remain ", remain, "insertCounter ", insertCounter, "currentMaxSeq ", currentMaxSeq, userID, len(msgList))
var err error
for _, m := range msgList {
log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID)
currentMaxSeq++
sMsg := mongo2.MsgInfo{}
sMsg.SendTime = m.MsgData.SendTime
m.MsgData.Seq = uint32(currentMaxSeq)
log.Debug(operationID, "mongo msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", userID, "seq: ", currentMaxSeq)
if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
return utils.Wrap(err, "")
}
if isInit {
msgListToMongoNext = append(msgListToMongoNext, sMsg)
seqUidNext = mongo2.getSeqUid(userID, uint32(currentMaxSeq))
log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
continue
}
if insertCounter < remain {
msgListToMongo = append(msgListToMongo, sMsg)
insertCounter++
seqUid = mongo2.getSeqUid(userID, uint32(currentMaxSeq))
log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
} else {
msgListToMongoNext = append(msgListToMongoNext, sMsg)
seqUidNext = mongo2.getSeqUid(userID, uint32(currentMaxSeq))
log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
}
}
ctx := context.Background()
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(mongo2.cChat)
if seqUid != "" {
filter := bson.M{"uid": seqUid}
log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo, "userID: ", userID)
err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err()
if err != nil {
if err == mongo.ErrNoDocuments {
filter := bson.M{"uid": seqUid}
sChat := mongo2.UserChat{}
sChat.UID = seqUid
sChat.Msg = msgListToMongo
log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo)
if _, err = c.InsertOne(ctx, &sChat); err != nil {
promePkg.PromeInc(promePkg.MsgInsertMongoFailedCounter)
log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
return utils.Wrap(err, "")
}
promePkg.PromeInc(promePkg.MsgInsertMongoSuccessCounter)
} else {
promePkg.PromeInc(promePkg.MsgInsertMongoFailedCounter)
log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
return utils.Wrap(err, "")
}
} else {
promePkg.PromeInc(promePkg.MsgInsertMongoSuccessCounter)
}
}
if seqUidNext != "" {
filter := bson.M{"uid": seqUidNext}
sChat := mongo2.UserChat{}
sChat.UID = seqUidNext
sChat.Msg = msgListToMongoNext
log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
if _, err = c.InsertOne(ctx, &sChat); err != nil {
promePkg.PromeInc(promePkg.MsgInsertMongoFailedCounter)
log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
return utils.Wrap(err, "")
}
promePkg.PromeInc(promePkg.MsgInsertMongoSuccessCounter)
}
log.Debug(operationID, "batch mgo cost time ", mongo2.getCurrentTimestampByMill()-newTime, userID, len(msgList))
return nil
}
func (d *db.DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgDataToMQ, operationID string) (error, uint64) {
newTime := mongo2.getCurrentTimestampByMill()
lenList := len(msgList)
if lenList > mongo2.GetSingleGocMsgNum() {
return errors.New("too large"), 0
}
if lenList < 1 {
return errors.New("too short as 0"), 0
}
// judge sessionType to get seq
var currentMaxSeq uint64
var err error
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
currentMaxSeq, err = d.GetGroupMaxSeq(insertID)
log.Debug(operationID, "constant.SuperGroupChatType lastMaxSeq before add ", currentMaxSeq, "userID ", insertID, err)
} else {
currentMaxSeq, err = d.GetUserMaxSeq(insertID)
log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", insertID, err)
}
if err != nil && err != go_redis.Nil {
promePkg.PromeInc(promePkg.SeqGetFailedCounter)
return utils.Wrap(err, ""), 0
}
promePkg.PromeInc(promePkg.SeqGetSuccessCounter)
lastMaxSeq := currentMaxSeq
for _, m := range msgList {
currentMaxSeq++
sMsg := mongo2.MsgInfo{}
sMsg.SendTime = m.MsgData.SendTime
m.MsgData.Seq = uint32(currentMaxSeq)
log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", insertID, "seq: ", currentMaxSeq)
}
log.Debug(operationID, "SetMessageToCache ", insertID, len(msgList))
err, failedNum := d.SetMessageToCache(msgList, insertID, operationID)
if err != nil {
promePkg.PromeAdd(promePkg.MsgInsertRedisFailedCounter, failedNum)
log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), insertID)
} else {
promePkg.PromeInc(promePkg.MsgInsertRedisSuccessCounter)
}
log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, insertID, len(msgList))
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
err = d.SetGroupMaxSeq(insertID, currentMaxSeq)
} else {
err = d.SetUserMaxSeq(insertID, currentMaxSeq)
}
if err != nil {
promePkg.PromeInc(promePkg.SeqSetFailedCounter)
} else {
promePkg.PromeInc(promePkg.SeqSetSuccessCounter)
}
return utils.Wrap(err, ""), lastMaxSeq
}
//func (d *DataBases) BatchInsertChatBoth(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) (error, uint64) {
// err, lastMaxSeq := d.BatchInsertChat2Cache(userID, msgList, operationID)
// if err != nil {
// log.Error(operationID, "BatchInsertChat2Cache failed ", err.Error(), userID, len(msgList))
// return err, 0
// }
// for {
// if runtime.NumGoroutine() > 50000 {
// log.NewWarn(operationID, "too many NumGoroutine ", runtime.NumGoroutine())
// time.Sleep(10 * time.Millisecond)
// } else {
// break
// }
// }
// return nil, lastMaxSeq
//}
//
//func (d *DataBases) BatchInsertChat(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) error {
// newTime := getCurrentTimestampByMill()
// if len(msgList) > GetSingleGocMsgNum() {
// return errors.New("too large")
// }
// isInit := false
// currentMaxSeq, err := d.GetUserMaxSeq(userID)
// if err == nil {
//
// } else if err == go_redis.Nil {
// isInit = true
// currentMaxSeq = 0
// } else {
// return utils.Wrap(err, "")
// }
// var remain uint64
// //if currentMaxSeq < uint64(GetSingleGocMsgNum()) {
// // remain = uint64(GetSingleGocMsgNum()-1) - (currentMaxSeq % uint64(GetSingleGocMsgNum()))
// //} else {
// // remain = uint64(GetSingleGocMsgNum()) - ((currentMaxSeq - (uint64(GetSingleGocMsgNum()) - 1)) % uint64(GetSingleGocMsgNum()))
// //}
//
// blk0 := uint64(GetSingleGocMsgNum() - 1)
// if currentMaxSeq < uint64(GetSingleGocMsgNum()) {
// remain = blk0 - currentMaxSeq
// } else {
// excludeBlk0 := currentMaxSeq - blk0
// remain = (uint64(GetSingleGocMsgNum()) - (excludeBlk0 % uint64(GetSingleGocMsgNum()))) % uint64(GetSingleGocMsgNum())
// }
//
// insertCounter := uint64(0)
// msgListToMongo := make([]MsgInfo, 0)
// msgListToMongoNext := make([]MsgInfo, 0)
// seqUid := ""
// seqUidNext := ""
// log.Debug(operationID, "remain ", remain, "insertCounter ", insertCounter, "currentMaxSeq ", currentMaxSeq, userID, len(msgList))
// //4998 remain ==1
// //4999
// for _, m := range msgList {
// log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID)
// currentMaxSeq++
// sMsg := MsgInfo{}
// sMsg.SendTime = m.MsgData.SendTime
// m.MsgData.Seq = uint32(currentMaxSeq)
// if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
// return utils.Wrap(err, "")
// }
// if isInit {
// msgListToMongoNext = append(msgListToMongoNext, sMsg)
// seqUidNext = getSeqUid(userID, uint32(currentMaxSeq))
// log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
// continue
// }
// if insertCounter < remain {
// msgListToMongo = append(msgListToMongo, sMsg)
// insertCounter++
// seqUid = getSeqUid(userID, uint32(currentMaxSeq))
// log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
// } else {
// msgListToMongoNext = append(msgListToMongoNext, sMsg)
// seqUidNext = getSeqUid(userID, uint32(currentMaxSeq))
// log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
// }
// }
// // ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
//
// ctx := context.Background()
// c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
//
// if seqUid != "" {
// filter := bson.M{"uid": seqUid}
// log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo)
// err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err()
// if err != nil {
// log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
// return utils.Wrap(err, "")
// }
// }
// if seqUidNext != "" {
// filter := bson.M{"uid": seqUidNext}
// sChat := UserChat{}
// sChat.UID = seqUidNext
// sChat.Msg = msgListToMongoNext
// log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext)
// if _, err = c.InsertOne(ctx, &sChat); err != nil {
// log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
// return utils.Wrap(err, "")
// }
// }
// log.NewWarn(operationID, "batch mgo cost time ", getCurrentTimestampByMill()-newTime, userID, len(msgList))
// return utils.Wrap(d.SetUserMaxSeq(userID, uint64(currentMaxSeq)), "")
//}
//func (d *DataBases)setMessageToCache(msgList []*pbMsg.MsgDataToMQ, uid string) (err error) {
//
//}
@@ -0,0 +1,206 @@
package unrelation
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/db"
server_api_params "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
"errors"
"fmt"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"strconv"
"strings"
"time"
"go.mongodb.org/mongo-driver/bson"
)
const cExtendMsgSet = "extend_msgs"
const MaxNum = 100
type ExtendMsgSet struct {
SourceID string `bson:"source_id" json:"sourceID"`
SessionType int32 `bson:"session_type" json:"sessionType"`
ExtendMsgs map[string]ExtendMsg `bson:"extend_msgs" json:"extendMsgs"`
ExtendMsgNum int32 `bson:"extend_msg_num" json:"extendMsgNum"`
CreateTime int64 `bson:"create_time" json:"createTime"` // this block's create time
MaxMsgUpdateTime int64 `bson:"max_msg_update_time" json:"maxMsgUpdateTime"` // index find msg
}
type KeyValue struct {
TypeKey string `bson:"type_key" json:"typeKey"`
Value string `bson:"value" json:"value"`
LatestUpdateTime int64 `bson:"latest_update_time" json:"latestUpdateTime"`
}
type ExtendMsg struct {
ReactionExtensionList map[string]KeyValue `bson:"reaction_extension_list" json:"reactionExtensionList"`
ClientMsgID string `bson:"client_msg_id" json:"clientMsgID"`
MsgFirstModifyTime int64 `bson:"msg_first_modify_time" json:"msgFirstModifyTime"` // this extendMsg create time
AttachedInfo string `bson:"attached_info" json:"attachedInfo"`
Ex string `bson:"ex" json:"ex"`
}
func GetExtendMsgMaxNum() int32 {
return MaxNum
}
func GetExtendMsgSourceID(ID string, index int32) string {
return ID + ":" + strconv.Itoa(int(index))
}
func SplitSourceIDAndGetIndex(sourceID string) int32 {
l := strings.Split(sourceID, ":")
index, _ := strconv.Atoi(l[len(l)-1])
return int32(index)
}
func (d *db.DataBases) CreateExtendMsgSet(set *ExtendMsgSet) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
_, err := c.InsertOne(ctx, set)
return err
}
type GetAllExtendMsgSetOpts struct {
ExcludeExtendMsgs bool
}
func (d *db.DataBases) GetAllExtendMsgSet(ID string, opts *GetAllExtendMsgSetOpts) (sets []*ExtendMsgSet, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
regex := fmt.Sprintf("^%s", ID)
var findOpts *options.FindOptions
if opts != nil {
if opts.ExcludeExtendMsgs {
findOpts = &options.FindOptions{}
findOpts.SetProjection(bson.M{"extend_msgs": 0})
}
}
cursor, err := c.Find(ctx, bson.M{"uid": primitive.Regex{Pattern: regex}}, findOpts)
if err != nil {
return nil, utils.Wrap(err, "")
}
err = cursor.All(context.Background(), &sets)
if err != nil {
return nil, utils.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String()))
}
return sets, nil
}
func (d *db.DataBases) GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64, c *mongo.Collection) (*ExtendMsgSet, error) {
regex := fmt.Sprintf("^%s", sourceID)
var err error
findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{"extend_msgs": 0})
// update newest
find := bson.M{"source_id": primitive.Regex{Pattern: regex}, "session_type": sessionType}
if maxMsgUpdateTime > 0 {
find["max_msg_update_time"] = maxMsgUpdateTime
}
result, err := c.Find(ctx, find, findOpts)
if err != nil {
return nil, utils.Wrap(err, "")
}
var setList []ExtendMsgSet
if err := result.All(ctx, &setList); err != nil {
return nil, utils.Wrap(err, "")
}
if len(setList) == 0 {
return nil, nil
}
return &setList[0], nil
}
// first modify msg
func (d *db.DataBases) InsertExtendMsg(sourceID string, sessionType int32, msg *ExtendMsg) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
set, err := d.GetExtendMsgSet(ctx, sourceID, sessionType, 0, c)
if err != nil {
return utils.Wrap(err, "")
}
if set == nil || set.ExtendMsgNum >= GetExtendMsgMaxNum() {
var index int32
if set != nil {
index = SplitSourceIDAndGetIndex(set.SourceID)
}
err = d.CreateExtendMsgSet(&ExtendMsgSet{
SourceID: GetExtendMsgSourceID(sourceID, index),
SessionType: sessionType,
ExtendMsgs: map[string]ExtendMsg{msg.ClientMsgID: *msg},
ExtendMsgNum: 1,
CreateTime: msg.MsgFirstModifyTime,
MaxMsgUpdateTime: msg.MsgFirstModifyTime,
})
} else {
_, err = c.UpdateOne(ctx, bson.M{"source_id": set.SourceID, "session_type": sessionType}, bson.M{"$set": bson.M{"max_msg_update_time": msg.MsgFirstModifyTime, "$inc": bson.M{"extend_msg_num": 1}, fmt.Sprintf("extend_msgs.%s", msg.ClientMsgID): msg}})
}
return utils.Wrap(err, "")
}
// insert or update
func (d *db.DataBases) InsertOrUpdateReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
var updateBson = bson.M{}
for _, v := range reactionExtensionList {
updateBson[fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, v.TypeKey)] = v
}
upsert := true
opt := &options.UpdateOptions{
Upsert: &upsert,
}
set, err := d.GetExtendMsgSet(ctx, sourceID, sessionType, msgFirstModifyTime, c)
if err != nil {
return utils.Wrap(err, "")
}
if set == nil {
return errors.New(fmt.Sprintf("sourceID %s has no set", sourceID))
}
_, err = c.UpdateOne(ctx, bson.M{"source_id": set.SourceID, "session_type": sessionType}, bson.M{"$set": updateBson}, opt)
return utils.Wrap(err, "")
}
// delete TypeKey
func (d *db.DataBases) DeleteReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
var updateBson = bson.M{}
for _, v := range reactionExtensionList {
updateBson[fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, v.TypeKey)] = ""
}
set, err := d.GetExtendMsgSet(ctx, sourceID, sessionType, msgFirstModifyTime, c)
if err != nil {
return utils.Wrap(err, "")
}
if set == nil {
return errors.New(fmt.Sprintf("sourceID %s has no set", sourceID))
}
_, err = c.UpdateOne(ctx, bson.M{"source_id": set.SourceID, "session_type": sessionType}, bson.M{"$unset": updateBson})
return err
}
func (d *db.DataBases) GetExtendMsg(sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *ExtendMsg, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{fmt.Sprintf("extend_msgs.%s", clientMsgID): 1})
regex := fmt.Sprintf("^%s", sourceID)
result, err := c.Find(ctx, bson.M{"source_id": primitive.Regex{Pattern: regex}, "session_type": sessionType, "max_msg_update_time": bson.M{"$lte": maxMsgUpdateTime}}, findOpts)
if err != nil {
return nil, utils.Wrap(err, "")
}
var setList []ExtendMsgSet
if err := result.All(ctx, &setList); err != nil {
return nil, utils.Wrap(err, "")
}
if len(setList) == 0 {
return nil, utils.Wrap(errors.New("GetExtendMsg failed, len(setList) == 0"), "")
}
if v, ok := setList[0].ExtendMsgs[clientMsgID]; ok {
return &v, nil
}
return nil, errors.New(fmt.Sprintf("cant find client msg id: %s", clientMsgID))
}
+126
View File
@@ -0,0 +1,126 @@
package unrelation
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/utils"
"context"
"fmt"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"log"
"strings"
"time"
)
type Client struct {
mongo *mongo.Client
}
func NewMongoClient(mdb *mongo.Client) *Client {
return &Client{mongo: mdb}
}
func initMongo() *mongo.Database {
uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
if config.Config.Mongo.DBUri != "" {
// example: mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize
uri = config.Config.Mongo.DBUri
} else {
//mongodb://mongodb1.example.com:27317,mongodb2.example.com:27017/?replicaSet=mySet&authSource=authDB
mongodbHosts := ""
for i, v := range config.Config.Mongo.DBAddress {
if i == len(config.Config.Mongo.DBAddress)-1 {
mongodbHosts += v
} else {
mongodbHosts += v + ","
}
}
if config.Config.Mongo.DBPassword != "" && config.Config.Mongo.DBUserName != "" {
// clientOpts := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018/?replicaSet=replset")
//mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
//uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin&replicaSet=replset",
uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
config.Config.Mongo.DBUserName, config.Config.Mongo.DBPassword, mongodbHosts,
config.Config.Mongo.DBDatabase, config.Config.Mongo.DBMaxPoolSize)
} else {
uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
mongodbHosts, config.Config.Mongo.DBDatabase,
config.Config.Mongo.DBMaxPoolSize)
}
}
log.Println("start to init mongoDB:", uri)
mongoClient, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
if err != nil {
time.Sleep(time.Duration(30) * time.Second)
mongoClient, err = mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
if err != nil {
panic(err.Error() + " mongo.Connect failed " + uri)
}
}
return mongoClient.Database(config.Config.Mongo.DBDatabase)
}
func GetCollection(mongoClient *mongo.Client) {
}
func CreateAllIndex(mongoClient *mongo.Client) {
// mongodb create index
if err := createMongoIndex(mongoClient, cSendLog, false, "send_id", "-send_time"); err != nil {
panic(err.Error() + " index create failed " + cSendLog + " send_id, -send_time")
}
if err := createMongoIndex(mongoClient, cChat, false, "uid"); err != nil {
fmt.Println(err.Error() + " index create failed " + cChat + " uid, please create index by yourself in field uid")
}
if err := createMongoIndex(mongoClient, cWorkMoment, true, "-create_time", "work_moment_id"); err != nil {
panic(err.Error() + "index create failed " + cWorkMoment + " -create_time, work_moment_id")
}
if err := createMongoIndex(mongoClient, cWorkMoment, true, "work_moment_id"); err != nil {
panic(err.Error() + "index create failed " + cWorkMoment + " work_moment_id ")
}
if err := createMongoIndex(mongoClient, cWorkMoment, false, "user_id", "-create_time"); err != nil {
panic(err.Error() + "index create failed " + cWorkMoment + "user_id, -create_time")
}
if err := createMongoIndex(mongoClient, cTag, false, "user_id", "-create_time"); err != nil {
panic(err.Error() + "index create failed " + cTag + " user_id, -create_time")
}
if err := createMongoIndex(mongoClient, cTag, true, "tag_id"); err != nil {
panic(err.Error() + "index create failed " + cTag + " tag_id")
}
}
func createMongoIndex(client *mongo.Client, collection string, isUnique bool, keys ...string) error {
db := client.Database(config.Config.Mongo.DBDatabase).Collection(collection)
opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
indexView := db.Indexes()
keysDoc := bsonx.Doc{}
// create composite indexes
for _, key := range keys {
if strings.HasPrefix(key, "-") {
keysDoc = keysDoc.Append(strings.TrimLeft(key, "-"), bsonx.Int32(-1))
} else {
keysDoc = keysDoc.Append(key, bsonx.Int32(1))
}
}
// create index
index := mongo.IndexModel{
Keys: keysDoc,
}
if isUnique == true {
index.Options = options.Index().SetUnique(true)
}
result, err := indexView.CreateOne(
context.Background(),
index,
opts,
)
if err != nil {
return utils.Wrap(err, result)
}
return nil
}
File diff suppressed because it is too large Load Diff
+1
View File
@@ -0,0 +1 @@
package unrelation
+164
View File
@@ -0,0 +1,164 @@
package unrelation
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/utils"
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"time"
)
const (
cSuperGroup = "super_group"
cUserToSuperGroup = "user_to_super_group"
)
type SuperGroupMgo struct {
mgoDB *mongo.Database
superGroupCollection *mongo.Collection
userToSuperGroupCollection *mongo.Collection
}
type SuperGroup struct {
GroupID string `bson:"group_id" json:"groupID"`
MemberIDList []string `bson:"member_id_list" json:"memberIDList"`
}
type UserToSuperGroup struct {
UserID string `bson:"user_id" json:"userID"`
GroupIDList []string `bson:"group_id_list" json:"groupIDList"`
}
func NewSuperGroupMgoDB(mgoDB *mongo.Database) *SuperGroupMgo {
return &SuperGroupMgo{mgoDB: mgoDB, superGroupCollection: mgoDB.Collection(cSuperGroup), userToSuperGroupCollection: mgoDB.Collection(cUserToSuperGroup)}
}
func (db *SuperGroupMgo) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string, memberNumCount int) error {
//ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
//c := db.mgoDB.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
opts := options.Session().SetDefaultReadConcern(readconcern.Majority())
return db.mgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error {
err := sCtx.StartTransaction()
if err != nil {
return err
}
superGroup := SuperGroup{
GroupID: groupID,
MemberIDList: initMemberIDList,
}
_, err = db.superGroupCollection.InsertOne(sCtx, superGroup)
if err != nil {
_ = sCtx.AbortTransaction(ctx)
return err
}
upsert := true
opts := &options.UpdateOptions{
Upsert: &upsert,
}
for _, userID := range initMemberIDList {
_, err = db.userToSuperGroupCollection.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts)
if err != nil {
_ = sCtx.AbortTransaction(ctx)
return err
}
}
return sCtx.CommitTransaction(context.Background())
})
}
func (db *SuperGroupMgo) GetSuperGroup(ctx context.Context, groupID string) (*SuperGroup, error) {
superGroup := SuperGroup{}
err := db.superGroupCollection.FindOne(ctx, bson.M{"group_id": groupID}).Decode(&superGroup)
return &superGroup, err
}
func (db *SuperGroupMgo) AddUserToSuperGroup(ctx context.Context, groupID string, userIDList []string) error {
opts := options.Session().SetDefaultReadConcern(readconcern.Majority())
return db.mgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error {
_, err := db.superGroupCollection.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDList}}})
if err != nil {
_ = sCtx.AbortTransaction(ctx)
return err
}
upsert := true
opts := &options.UpdateOptions{
Upsert: &upsert,
}
for _, userID := range userIDList {
_, err = db.userToSuperGroupCollection.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts)
if err != nil {
_ = sCtx.AbortTransaction(ctx)
return utils.Wrap(err, "transaction failed")
}
}
return sCtx.CommitTransaction(context.Background())
})
}
func (d *SuperGroupMgo) RemoverUserFromSuperGroup(groupID string, userIDList []string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
session, err := d.mongoClient.StartSession()
if err != nil {
return utils.Wrap(err, "start session failed")
}
defer session.EndSession(ctx)
sCtx := mongo.NewSessionContext(ctx, session)
_, err = c.UpdateOne(ctx, bson.M{"group_id": groupID}, bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDList}}})
if err != nil {
_ = session.AbortTransaction(ctx)
return utils.Wrap(err, "transaction failed")
}
err = d.RemoveGroupFromUser(ctx, sCtx, groupID, userIDList)
if err != nil {
_ = session.AbortTransaction(ctx)
return utils.Wrap(err, "transaction failed")
}
_ = session.CommitTransaction(ctx)
return err
}
func (d *SuperGroupMgo) GetSuperGroupByUserID(userID string) (UserToSuperGroup, error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup)
var user UserToSuperGroup
_ = c.FindOne(ctx, bson.M{"user_id": userID}).Decode(&user)
return user, nil
}
func (d *SuperGroupMgo) DeleteSuperGroup(groupID string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
session, err := d.mongoClient.StartSession()
if err != nil {
return utils.Wrap(err, "start session failed")
}
defer session.EndSession(ctx)
sCtx := mongo.NewSessionContext(ctx, session)
superGroup := &SuperGroup{}
result := c.FindOneAndDelete(sCtx, bson.M{"group_id": groupID})
err = result.Decode(superGroup)
if err != nil {
session.AbortTransaction(ctx)
return utils.Wrap(err, "transaction failed")
}
if err = d.RemoveGroupFromUser(ctx, sCtx, groupID, superGroup.MemberIDList); err != nil {
session.AbortTransaction(ctx)
return utils.Wrap(err, "transaction failed")
}
session.CommitTransaction(ctx)
return nil
}
func (d *SuperGroupMgo) RemoveGroupFromUser(ctx, sCtx context.Context, groupID string, userIDList []string) error {
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup)
_, err := c.UpdateOne(sCtx, bson.M{"user_id": bson.M{"$in": userIDList}}, bson.M{"$pull": bson.M{"group_id_list": groupID}})
if err != nil {
return utils.Wrap(err, "UpdateOne transaction failed")
}
return err
}