mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-07 02:26:00 +08:00
check login
This commit is contained in:
+20
-14
@@ -1,7 +1,6 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -100,19 +99,21 @@ type config struct {
|
||||
DBMaxOpenConns int `yaml:"dbMaxOpenConns"`
|
||||
DBMaxIdleConns int `yaml:"dbMaxIdleConns"`
|
||||
DBMaxLifeTime int `yaml:"dbMaxLifeTime"`
|
||||
LogLevel int `yaml:"logLevel"`
|
||||
SlowThreshold int `yaml:"slowThreshold"`
|
||||
}
|
||||
Mongo struct {
|
||||
DBUri string `yaml:"dbUri"`
|
||||
DBAddress string `yaml:"dbAddress"`
|
||||
DBDirect bool `yaml:"dbDirect"`
|
||||
DBTimeout int `yaml:"dbTimeout"`
|
||||
DBDatabase string `yaml:"dbDatabase"`
|
||||
DBSource string `yaml:"dbSource"`
|
||||
DBUserName string `yaml:"dbUserName"`
|
||||
DBPassword string `yaml:"dbPassword"`
|
||||
DBMaxPoolSize int `yaml:"dbMaxPoolSize"`
|
||||
DBRetainChatRecords int `yaml:"dbRetainChatRecords"`
|
||||
ChatRecordsClearTime string `yaml:"chatRecordsClearTime"`
|
||||
DBUri string `yaml:"dbUri"`
|
||||
DBAddress []string `yaml:"dbAddress"`
|
||||
DBDirect bool `yaml:"dbDirect"`
|
||||
DBTimeout int `yaml:"dbTimeout"`
|
||||
DBDatabase string `yaml:"dbDatabase"`
|
||||
DBSource string `yaml:"dbSource"`
|
||||
DBUserName string `yaml:"dbUserName"`
|
||||
DBPassword string `yaml:"dbPassword"`
|
||||
DBMaxPoolSize int `yaml:"dbMaxPoolSize"`
|
||||
DBRetainChatRecords int `yaml:"dbRetainChatRecords"`
|
||||
ChatRecordsClearTime string `yaml:"chatRecordsClearTime"`
|
||||
}
|
||||
Redis struct {
|
||||
DBAddress []string `yaml:"dbAddress"`
|
||||
@@ -157,6 +158,8 @@ type config struct {
|
||||
Etcd struct {
|
||||
EtcdSchema string `yaml:"etcdSchema"`
|
||||
EtcdAddr []string `yaml:"etcdAddr"`
|
||||
UserName string `yaml:"userName"`
|
||||
Password string `yaml:"password"`
|
||||
}
|
||||
Log struct {
|
||||
StorageLocation string `yaml:"storageLocation"`
|
||||
@@ -205,6 +208,8 @@ type config struct {
|
||||
Enable bool `yaml:"enable"`
|
||||
Intent string `yaml:"intent"`
|
||||
MasterSecret string `yaml:"masterSecret"`
|
||||
ChannelID string `yaml:"channelID"`
|
||||
ChannelName string `yaml:"channelName"`
|
||||
}
|
||||
Fcm struct {
|
||||
ServiceAccount string `yaml:"serviceAccount"`
|
||||
@@ -218,7 +223,9 @@ type config struct {
|
||||
}
|
||||
|
||||
Kafka struct {
|
||||
Ws2mschat struct {
|
||||
SASLUserName string `yaml:"SASLUserName"`
|
||||
SASLPassword string `yaml:"SASLPassword"`
|
||||
Ws2mschat struct {
|
||||
Addr []string `yaml:"addr"`
|
||||
Topic string `yaml:"topic"`
|
||||
}
|
||||
@@ -545,7 +552,6 @@ type PDefaultTips struct {
|
||||
|
||||
func init() {
|
||||
cfgName := os.Getenv("CONFIG_NAME")
|
||||
fmt.Println("GET IM DEFAULT CONFIG PATH :", Root, "ENV PATH:", cfgName)
|
||||
if len(cfgName) != 0 {
|
||||
bytes, err := ioutil.ReadFile(filepath.Join(cfgName, "config", "config.yaml"))
|
||||
if err != nil {
|
||||
|
||||
@@ -336,3 +336,5 @@ const LogFileName = "OpenIM.log"
|
||||
const StatisticsTimeInterval = 60
|
||||
|
||||
const MaxNotificationNum = 100
|
||||
|
||||
const CurrentVersion = "v2.3.3-rc0"
|
||||
|
||||
@@ -182,7 +182,6 @@ func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operati
|
||||
for _, v := range seqList {
|
||||
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
|
||||
key := messageCache + userID + "_" + strconv.Itoa(int(v))
|
||||
|
||||
result, err := d.RDB.Get(context.Background(), key).Result()
|
||||
if err != nil {
|
||||
errResult = err
|
||||
@@ -205,7 +204,7 @@ func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operati
|
||||
return seqMsg, failedSeqList, errResult
|
||||
}
|
||||
|
||||
func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error {
|
||||
func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) (error, int) {
|
||||
ctx := context.Background()
|
||||
pipe := d.RDB.Pipeline()
|
||||
var failedList []pbChat.MsgDataToMQ
|
||||
@@ -225,10 +224,10 @@ func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string,
|
||||
}
|
||||
}
|
||||
if len(failedList) != 0 {
|
||||
return errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, operationID))
|
||||
return errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, operationID)), len(failedList)
|
||||
}
|
||||
_, err := pipe.Exec(ctx)
|
||||
return err
|
||||
return err, 0
|
||||
}
|
||||
func (d *DataBases) DeleteMessageFromCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error {
|
||||
ctx := context.Background()
|
||||
|
||||
+110
-104
@@ -4,6 +4,7 @@ import (
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/log"
|
||||
promePkg "Open_IM/pkg/common/prometheus"
|
||||
pbMsg "Open_IM/pkg/proto/msg"
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
@@ -14,10 +15,6 @@ import (
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
)
|
||||
|
||||
func (d *DataBases) BatchDeleteChat2DB(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) {
|
||||
|
||||
}
|
||||
|
||||
func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string, currentMaxSeq uint64) error {
|
||||
newTime := getCurrentTimestampByMill()
|
||||
if len(msgList) > GetSingleGocMsgNum() {
|
||||
@@ -85,10 +82,13 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
|
||||
sChat.Msg = msgListToMongo
|
||||
log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo)
|
||||
if _, err = c.InsertOne(ctx, &sChat); err != nil {
|
||||
promePkg.PromeInc(promePkg.MsgInsertMongoFailedCounter)
|
||||
log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
promePkg.PromeInc(promePkg.MsgInsertMongoSuccessCounter)
|
||||
} else {
|
||||
promePkg.PromeInc(promePkg.MsgInsertMongoFailedCounter)
|
||||
log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
@@ -101,9 +101,11 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
|
||||
sChat.Msg = msgListToMongoNext
|
||||
log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
|
||||
if _, err = c.InsertOne(ctx, &sChat); err != nil {
|
||||
promePkg.PromeInc(promePkg.MsgInsertMongoFailedCounter)
|
||||
log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
promePkg.PromeInc(promePkg.MsgInsertMongoSuccessCounter)
|
||||
}
|
||||
log.Debug(operationID, "batch mgo cost time ", getCurrentTimestampByMill()-newTime, userID, len(msgList))
|
||||
return nil
|
||||
@@ -129,8 +131,10 @@ func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgD
|
||||
log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", insertID, err)
|
||||
}
|
||||
if err != nil && err != go_redis.Nil {
|
||||
promePkg.PromeInc(promePkg.SeqGetFailedCounter)
|
||||
return utils.Wrap(err, ""), 0
|
||||
}
|
||||
promePkg.PromeInc(promePkg.SeqGetSuccessCounter)
|
||||
|
||||
lastMaxSeq := currentMaxSeq
|
||||
for _, m := range msgList {
|
||||
@@ -142,9 +146,12 @@ func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgD
|
||||
log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", insertID, "seq: ", currentMaxSeq)
|
||||
}
|
||||
log.Debug(operationID, "SetMessageToCache ", insertID, len(msgList))
|
||||
err = d.SetMessageToCache(msgList, insertID, operationID)
|
||||
err, failedNum := d.SetMessageToCache(msgList, insertID, operationID)
|
||||
if err != nil {
|
||||
promePkg.PromeAdd(promePkg.MsgInsertRedisFailedCounter, failedNum)
|
||||
log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), insertID)
|
||||
} else {
|
||||
promePkg.PromeInc(promePkg.MsgInsertRedisSuccessCounter)
|
||||
}
|
||||
log.Debug(operationID, "batch to redis cost time ", getCurrentTimestampByMill()-newTime, insertID, len(msgList))
|
||||
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
|
||||
@@ -152,6 +159,11 @@ func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgD
|
||||
} else {
|
||||
err = d.SetUserMaxSeq(insertID, currentMaxSeq)
|
||||
}
|
||||
if err != nil {
|
||||
promePkg.PromeInc(promePkg.SeqSetFailedCounter)
|
||||
} else {
|
||||
promePkg.PromeInc(promePkg.SeqSetSuccessCounter)
|
||||
}
|
||||
return utils.Wrap(err, ""), lastMaxSeq
|
||||
}
|
||||
|
||||
@@ -171,106 +183,100 @@ func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgD
|
||||
// }
|
||||
// return nil, lastMaxSeq
|
||||
//}
|
||||
|
||||
func (d *DataBases) BatchInsertChat(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) error {
|
||||
newTime := getCurrentTimestampByMill()
|
||||
if len(msgList) > GetSingleGocMsgNum() {
|
||||
return errors.New("too large")
|
||||
}
|
||||
isInit := false
|
||||
currentMaxSeq, err := d.GetUserMaxSeq(userID)
|
||||
if err == nil {
|
||||
|
||||
} else if err == go_redis.Nil {
|
||||
isInit = true
|
||||
currentMaxSeq = 0
|
||||
} else {
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
var remain uint64
|
||||
//if currentMaxSeq < uint64(GetSingleGocMsgNum()) {
|
||||
// remain = uint64(GetSingleGocMsgNum()-1) - (currentMaxSeq % uint64(GetSingleGocMsgNum()))
|
||||
//} else {
|
||||
// remain = uint64(GetSingleGocMsgNum()) - ((currentMaxSeq - (uint64(GetSingleGocMsgNum()) - 1)) % uint64(GetSingleGocMsgNum()))
|
||||
//}
|
||||
|
||||
blk0 := uint64(GetSingleGocMsgNum() - 1)
|
||||
if currentMaxSeq < uint64(GetSingleGocMsgNum()) {
|
||||
remain = blk0 - currentMaxSeq
|
||||
} else {
|
||||
excludeBlk0 := currentMaxSeq - blk0
|
||||
remain = (uint64(GetSingleGocMsgNum()) - (excludeBlk0 % uint64(GetSingleGocMsgNum()))) % uint64(GetSingleGocMsgNum())
|
||||
}
|
||||
|
||||
insertCounter := uint64(0)
|
||||
msgListToMongo := make([]MsgInfo, 0)
|
||||
msgListToMongoNext := make([]MsgInfo, 0)
|
||||
seqUid := ""
|
||||
seqUidNext := ""
|
||||
log.Debug(operationID, "remain ", remain, "insertCounter ", insertCounter, "currentMaxSeq ", currentMaxSeq, userID, len(msgList))
|
||||
//4998 remain ==1
|
||||
//4999
|
||||
for _, m := range msgList {
|
||||
log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID)
|
||||
currentMaxSeq++
|
||||
sMsg := MsgInfo{}
|
||||
sMsg.SendTime = m.MsgData.SendTime
|
||||
m.MsgData.Seq = uint32(currentMaxSeq)
|
||||
if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
if isInit {
|
||||
msgListToMongoNext = append(msgListToMongoNext, sMsg)
|
||||
seqUidNext = getSeqUid(userID, uint32(currentMaxSeq))
|
||||
log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
|
||||
continue
|
||||
}
|
||||
if insertCounter < remain {
|
||||
msgListToMongo = append(msgListToMongo, sMsg)
|
||||
insertCounter++
|
||||
seqUid = getSeqUid(userID, uint32(currentMaxSeq))
|
||||
log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
|
||||
} else {
|
||||
msgListToMongoNext = append(msgListToMongoNext, sMsg)
|
||||
seqUidNext = getSeqUid(userID, uint32(currentMaxSeq))
|
||||
log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
|
||||
}
|
||||
}
|
||||
// ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
||||
|
||||
ctx := context.Background()
|
||||
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
|
||||
|
||||
if seqUid != "" {
|
||||
filter := bson.M{"uid": seqUid}
|
||||
log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo)
|
||||
err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err()
|
||||
if err != nil {
|
||||
log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
}
|
||||
if seqUidNext != "" {
|
||||
filter := bson.M{"uid": seqUidNext}
|
||||
sChat := UserChat{}
|
||||
sChat.UID = seqUidNext
|
||||
sChat.Msg = msgListToMongoNext
|
||||
log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext)
|
||||
if _, err = c.InsertOne(ctx, &sChat); err != nil {
|
||||
log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
}
|
||||
log.NewWarn(operationID, "batch mgo cost time ", getCurrentTimestampByMill()-newTime, userID, len(msgList))
|
||||
return utils.Wrap(d.SetUserMaxSeq(userID, uint64(currentMaxSeq)), "")
|
||||
}
|
||||
//
|
||||
//func (d *DataBases) BatchInsertChat(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) error {
|
||||
// newTime := getCurrentTimestampByMill()
|
||||
// if len(msgList) > GetSingleGocMsgNum() {
|
||||
// return errors.New("too large")
|
||||
// }
|
||||
// isInit := false
|
||||
// currentMaxSeq, err := d.GetUserMaxSeq(userID)
|
||||
// if err == nil {
|
||||
//
|
||||
// } else if err == go_redis.Nil {
|
||||
// isInit = true
|
||||
// currentMaxSeq = 0
|
||||
// } else {
|
||||
// return utils.Wrap(err, "")
|
||||
// }
|
||||
// var remain uint64
|
||||
// //if currentMaxSeq < uint64(GetSingleGocMsgNum()) {
|
||||
// // remain = uint64(GetSingleGocMsgNum()-1) - (currentMaxSeq % uint64(GetSingleGocMsgNum()))
|
||||
// //} else {
|
||||
// // remain = uint64(GetSingleGocMsgNum()) - ((currentMaxSeq - (uint64(GetSingleGocMsgNum()) - 1)) % uint64(GetSingleGocMsgNum()))
|
||||
// //}
|
||||
//
|
||||
// blk0 := uint64(GetSingleGocMsgNum() - 1)
|
||||
// if currentMaxSeq < uint64(GetSingleGocMsgNum()) {
|
||||
// remain = blk0 - currentMaxSeq
|
||||
// } else {
|
||||
// excludeBlk0 := currentMaxSeq - blk0
|
||||
// remain = (uint64(GetSingleGocMsgNum()) - (excludeBlk0 % uint64(GetSingleGocMsgNum()))) % uint64(GetSingleGocMsgNum())
|
||||
// }
|
||||
//
|
||||
// insertCounter := uint64(0)
|
||||
// msgListToMongo := make([]MsgInfo, 0)
|
||||
// msgListToMongoNext := make([]MsgInfo, 0)
|
||||
// seqUid := ""
|
||||
// seqUidNext := ""
|
||||
// log.Debug(operationID, "remain ", remain, "insertCounter ", insertCounter, "currentMaxSeq ", currentMaxSeq, userID, len(msgList))
|
||||
// //4998 remain ==1
|
||||
// //4999
|
||||
// for _, m := range msgList {
|
||||
// log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID)
|
||||
// currentMaxSeq++
|
||||
// sMsg := MsgInfo{}
|
||||
// sMsg.SendTime = m.MsgData.SendTime
|
||||
// m.MsgData.Seq = uint32(currentMaxSeq)
|
||||
// if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
|
||||
// return utils.Wrap(err, "")
|
||||
// }
|
||||
// if isInit {
|
||||
// msgListToMongoNext = append(msgListToMongoNext, sMsg)
|
||||
// seqUidNext = getSeqUid(userID, uint32(currentMaxSeq))
|
||||
// log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
|
||||
// continue
|
||||
// }
|
||||
// if insertCounter < remain {
|
||||
// msgListToMongo = append(msgListToMongo, sMsg)
|
||||
// insertCounter++
|
||||
// seqUid = getSeqUid(userID, uint32(currentMaxSeq))
|
||||
// log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
|
||||
// } else {
|
||||
// msgListToMongoNext = append(msgListToMongoNext, sMsg)
|
||||
// seqUidNext = getSeqUid(userID, uint32(currentMaxSeq))
|
||||
// log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
|
||||
// }
|
||||
// }
|
||||
// // ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
||||
//
|
||||
// ctx := context.Background()
|
||||
// c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
|
||||
//
|
||||
// if seqUid != "" {
|
||||
// filter := bson.M{"uid": seqUid}
|
||||
// log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo)
|
||||
// err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err()
|
||||
// if err != nil {
|
||||
// log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
|
||||
// return utils.Wrap(err, "")
|
||||
// }
|
||||
// }
|
||||
// if seqUidNext != "" {
|
||||
// filter := bson.M{"uid": seqUidNext}
|
||||
// sChat := UserChat{}
|
||||
// sChat.UID = seqUidNext
|
||||
// sChat.Msg = msgListToMongoNext
|
||||
// log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext)
|
||||
// if _, err = c.InsertOne(ctx, &sChat); err != nil {
|
||||
// log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
||||
// return utils.Wrap(err, "")
|
||||
// }
|
||||
// }
|
||||
// log.NewWarn(operationID, "batch mgo cost time ", getCurrentTimestampByMill()-newTime, userID, len(msgList))
|
||||
// return utils.Wrap(d.SetUserMaxSeq(userID, uint64(currentMaxSeq)), "")
|
||||
//}
|
||||
|
||||
//func (d *DataBases)setMessageToCache(msgList []*pbMsg.MsgDataToMQ, uid string) (err error) {
|
||||
//
|
||||
//}
|
||||
|
||||
func (d *DataBases) GetFromCacheAndInsertDB(msgUserIDPrefix string) {
|
||||
//get value from redis
|
||||
|
||||
//batch insert to db
|
||||
}
|
||||
|
||||
+19
-4
@@ -58,15 +58,30 @@ func init() {
|
||||
// example: mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize
|
||||
uri = config.Config.Mongo.DBUri
|
||||
} else {
|
||||
//mongodb://mongodb1.example.com:27317,mongodb2.example.com:27017/?replicaSet=mySet&authSource=authDB
|
||||
mongodbHosts := ""
|
||||
for i, v := range config.Config.Mongo.DBAddress {
|
||||
if i == len(config.Config.Mongo.DBAddress)-1 {
|
||||
mongodbHosts += v
|
||||
} else {
|
||||
mongodbHosts += v + ","
|
||||
}
|
||||
}
|
||||
|
||||
if config.Config.Mongo.DBPassword != "" && config.Config.Mongo.DBUserName != "" {
|
||||
uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin", config.Config.Mongo.DBUserName, config.Config.Mongo.DBPassword, config.Config.Mongo.DBAddress,
|
||||
// clientOpts := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018/?replicaSet=replset")
|
||||
//mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
|
||||
//uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin&replicaSet=replset",
|
||||
uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
|
||||
config.Config.Mongo.DBUserName, config.Config.Mongo.DBPassword, mongodbHosts,
|
||||
config.Config.Mongo.DBDatabase, config.Config.Mongo.DBMaxPoolSize)
|
||||
} else {
|
||||
uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
|
||||
config.Config.Mongo.DBAddress, config.Config.Mongo.DBDatabase,
|
||||
mongodbHosts, config.Config.Mongo.DBDatabase,
|
||||
config.Config.Mongo.DBMaxPoolSize)
|
||||
}
|
||||
}
|
||||
|
||||
mongoClient, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
|
||||
if err != nil {
|
||||
fmt.Println(" mongo.Connect failed, try ", utils.GetSelfFuncName(), err.Error(), uri)
|
||||
@@ -77,7 +92,7 @@ func init() {
|
||||
panic(err1.Error())
|
||||
}
|
||||
}
|
||||
fmt.Println("0", utils.GetSelfFuncName(), "mongo driver client init success: ", uri)
|
||||
fmt.Println("mongo driver client init success: ", uri)
|
||||
// mongodb create index
|
||||
if err := createMongoIndex(mongoClient, cSendLog, false, "send_id", "-send_time"); err != nil {
|
||||
fmt.Println("send_id", "-send_time", "index create failed", err.Error())
|
||||
@@ -100,7 +115,7 @@ func init() {
|
||||
if err := createMongoIndex(mongoClient, cTag, true, "tag_id"); err != nil {
|
||||
fmt.Println("tag_id", "index create failed", err.Error())
|
||||
}
|
||||
fmt.Println("create index success")
|
||||
fmt.Println("createMongoIndex success")
|
||||
DB.mongoClient = mongoClient
|
||||
|
||||
// redis pool init
|
||||
|
||||
+18
-11
@@ -23,6 +23,7 @@ func (w Writer) Printf(format string, args ...interface{}) {
|
||||
}
|
||||
|
||||
func initMysqlDB() {
|
||||
fmt.Println("init mysqlDB start")
|
||||
//When there is no open IM database, connect to the mysql built-in database to create openIM database
|
||||
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
|
||||
config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], "mysql")
|
||||
@@ -30,42 +31,43 @@ func initMysqlDB() {
|
||||
var err1 error
|
||||
db, err := gorm.Open(mysql.Open(dsn), nil)
|
||||
if err != nil {
|
||||
fmt.Println("0", "Open failed ", err.Error(), dsn)
|
||||
fmt.Println("Open failed ", err.Error(), dsn)
|
||||
}
|
||||
if err != nil {
|
||||
time.Sleep(time.Duration(30) * time.Second)
|
||||
db, err1 = gorm.Open(mysql.Open(dsn), nil)
|
||||
if err1 != nil {
|
||||
fmt.Println("0", "Open failed ", err1.Error(), dsn)
|
||||
fmt.Println("Open failed ", err1.Error(), dsn)
|
||||
panic(err1.Error())
|
||||
}
|
||||
}
|
||||
|
||||
//Check the database and table during initialization
|
||||
sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s default charset utf8 COLLATE utf8_general_ci;", config.Config.Mysql.DBDatabaseName)
|
||||
fmt.Println("exec sql: ", sql, " begin")
|
||||
err = db.Exec(sql).Error
|
||||
if err != nil {
|
||||
fmt.Println("0", "Exec failed ", err.Error(), sql)
|
||||
fmt.Println("Exec failed ", err.Error(), sql)
|
||||
panic(err.Error())
|
||||
}
|
||||
|
||||
fmt.Println("exec sql: ", sql, " end")
|
||||
dsn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
|
||||
config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], config.Config.Mysql.DBDatabaseName)
|
||||
|
||||
newLogger := logger.New(
|
||||
Writer{},
|
||||
logger.Config{
|
||||
SlowThreshold: 200 * time.Millisecond, // Slow SQL threshold
|
||||
LogLevel: logger.Warn, // Log level
|
||||
IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger
|
||||
Colorful: true, // Disable color
|
||||
SlowThreshold: time.Duration(config.Config.Mysql.SlowThreshold) * time.Millisecond, // Slow SQL threshold
|
||||
LogLevel: logger.LogLevel(config.Config.Mysql.LogLevel), // Log level
|
||||
IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger
|
||||
Colorful: true, // Disable color
|
||||
},
|
||||
)
|
||||
db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
|
||||
Logger: newLogger,
|
||||
})
|
||||
if err != nil {
|
||||
fmt.Println("0", "Open failed ", err.Error(), dsn)
|
||||
fmt.Println("Open failed ", err.Error(), dsn)
|
||||
panic(err.Error())
|
||||
}
|
||||
|
||||
@@ -78,7 +80,7 @@ func initMysqlDB() {
|
||||
sqlDB.SetMaxOpenConns(config.Config.Mysql.DBMaxOpenConns)
|
||||
sqlDB.SetMaxIdleConns(config.Config.Mysql.DBMaxIdleConns)
|
||||
|
||||
fmt.Println("open db ok ", dsn)
|
||||
fmt.Println("open mysql ok ", dsn)
|
||||
db.AutoMigrate(
|
||||
&Register{},
|
||||
&Friend{},
|
||||
@@ -88,7 +90,7 @@ func initMysqlDB() {
|
||||
&GroupRequest{},
|
||||
&User{},
|
||||
&Black{}, &ChatLog{}, &Register{}, &Conversation{}, &AppVersion{}, &Department{}, &BlackList{}, &IpLimit{}, &UserIpLimit{}, &Invitation{}, &RegisterAddFriend{},
|
||||
&ClientInitConfig{})
|
||||
&ClientInitConfig{}, &UserIpRecord{})
|
||||
db.Set("gorm:table_options", "CHARSET=utf8")
|
||||
db.Set("gorm:table_options", "collation=utf8_unicode_ci")
|
||||
|
||||
@@ -179,6 +181,11 @@ func initMysqlDB() {
|
||||
db.Migrator().CreateTable(&ClientInitConfig{})
|
||||
}
|
||||
|
||||
if !db.Migrator().HasTable(&UserIpRecord{}) {
|
||||
fmt.Println("CreateTable Friend")
|
||||
db.Migrator().CreateTable(&UserIpRecord{})
|
||||
}
|
||||
|
||||
DB.MysqlDB.db = db
|
||||
return
|
||||
}
|
||||
|
||||
@@ -2,7 +2,10 @@ package im_mysql_model
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/db"
|
||||
"Open_IM/pkg/utils"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func IsLimitRegisterIp(RegisterIp string) (bool, error) {
|
||||
@@ -23,13 +26,22 @@ func IsLimitLoginIp(LoginIp string) (bool, error) {
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
func IsLimitUserLoginIp(userID string, LoginIp string) (bool, error) {
|
||||
func IsLimitUserLoginIp(userID string, loginIp string) (limit bool, err error) {
|
||||
//如果已经存在则放行
|
||||
var count int64
|
||||
if err := db.DB.MysqlDB.DefaultGormDB().Table("user_ip_limits").Where("ip=? and user_id=?", LoginIp, userID).Count(&count).Error; err != nil {
|
||||
return false, err
|
||||
result := db.DB.MysqlDB.DefaultGormDB().Table("user_ip_limits").Where("user_id=?", userID).Count(&count)
|
||||
if err := result.Error; err != nil {
|
||||
return true, err
|
||||
}
|
||||
return count == 0, nil
|
||||
if count < 1 {
|
||||
return false, nil
|
||||
}
|
||||
result = db.DB.MysqlDB.DefaultGormDB().Table("user_ip_limits").Where("user_id=? and ip = ?", userID, loginIp).Count(&count)
|
||||
if err := result.Error; err != nil {
|
||||
return true, err
|
||||
}
|
||||
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
func QueryIPLimits(ip string) (*db.IpLimit, error) {
|
||||
@@ -83,12 +95,12 @@ func InsertIpRecord(userID, createIp string) error {
|
||||
|
||||
func UpdateIpReocord(userID, ip string) (err error) {
|
||||
record := &db.UserIpRecord{UserID: userID, LastLoginIp: ip, LastLoginTime: time.Now()}
|
||||
result := db.DB.MysqlDB.DefaultGormDB().Model(&db.UserIpRecord{}).Where("user_id=?", userID).Updates(record).Updates("login_times = login_times + 1")
|
||||
result := db.DB.MysqlDB.DefaultGormDB().Model(&db.UserIpRecord{}).Where("user_id=?", userID).Updates(record).Update("login_times", gorm.Expr("login_times+?", 1))
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
return utils.Wrap(result.Error, "")
|
||||
}
|
||||
if result.RowsAffected == 0 {
|
||||
err = InsertIpRecord(userID, ip)
|
||||
}
|
||||
return err
|
||||
return utils.Wrap(err, "")
|
||||
}
|
||||
|
||||
@@ -12,11 +12,9 @@ import (
|
||||
)
|
||||
|
||||
func init() {
|
||||
//init managers
|
||||
for k, v := range config.Config.Manager.AppManagerUid {
|
||||
user, err := GetUserByUserID(v)
|
||||
_, err := GetUserByUserID(v)
|
||||
if err != nil {
|
||||
fmt.Println("GetUserByUserID failed ", err.Error(), v, user)
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
@@ -30,9 +28,10 @@ func init() {
|
||||
appMgr.AppMangerLevel = constant.AppAdmin
|
||||
err = UserRegister(appMgr)
|
||||
if err != nil {
|
||||
fmt.Println("AppManager insert error", err.Error(), appMgr, "time: ", appMgr.Birth.Unix())
|
||||
fmt.Println("AppManager insert error ", err.Error(), appMgr)
|
||||
} else {
|
||||
fmt.Println("AppManager insert ", appMgr)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -71,7 +71,7 @@ func Test_NewSetMessageToCache(t *testing.T) {
|
||||
data.AtUserIDList = []string{"1212", "23232"}
|
||||
msg.MsgData = &data
|
||||
messageList := []*pbChat.MsgDataToMQ{&msg}
|
||||
err := DB.SetMessageToCache(messageList, uid, "cacheTest")
|
||||
err, _ := DB.SetMessageToCache(messageList, uid, "cacheTest")
|
||||
assert.Nil(t, err)
|
||||
|
||||
}
|
||||
|
||||
@@ -433,6 +433,13 @@ func DelJoinedSuperGroupIDListFromCache(userID string) error {
|
||||
|
||||
func GetGroupMemberListHashFromCache(groupID string) (uint64, error) {
|
||||
generateHash := func() (string, error) {
|
||||
groupInfo, err := GetGroupInfoFromCache(groupID)
|
||||
if err != nil {
|
||||
return "0", utils.Wrap(err, "GetGroupInfoFromCache failed")
|
||||
}
|
||||
if groupInfo.Status == constant.GroupStatusDismissed {
|
||||
return "0", nil
|
||||
}
|
||||
groupMemberIDList, err := GetGroupMemberIDListFromCache(groupID)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "GetGroupMemberIDListFromCache failed")
|
||||
@@ -447,6 +454,9 @@ func GetGroupMemberListHashFromCache(groupID string) (uint64, error) {
|
||||
return strconv.Itoa(int(bi.Uint64())), nil
|
||||
}
|
||||
hashCode, err := db.DB.Rc.Fetch(groupMemberListHashCache+groupID, time.Second*30*60, generateHash)
|
||||
if err != nil {
|
||||
return 0, utils.Wrap(err, "fetch failed")
|
||||
}
|
||||
hashCodeUint64, err := strconv.Atoi(hashCode)
|
||||
return uint64(hashCodeUint64), err
|
||||
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"github.com/Shopify/sarama"
|
||||
"Open_IM/pkg/common/config"
|
||||
"sync"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
type Consumer struct {
|
||||
@@ -17,8 +19,13 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer {
|
||||
p := Consumer{}
|
||||
p.Topic = topic
|
||||
p.addr = addr
|
||||
|
||||
consumer, err := sarama.NewConsumer(p.addr, nil)
|
||||
consumerConfig := sarama.NewConfig()
|
||||
if config.Config.Kafka.SASLUserName != "" && config.Config.Kafka.SASLPassword != "" {
|
||||
consumerConfig.Net.SASL.Enable = true
|
||||
consumerConfig.Net.SASL.User = config.Config.Kafka.SASLUserName
|
||||
consumerConfig.Net.SASL.Password = config.Config.Kafka.SASLPassword
|
||||
}
|
||||
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
return nil
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/config"
|
||||
log "Open_IM/pkg/common/log"
|
||||
"Open_IM/pkg/utils"
|
||||
"errors"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
promePkg "Open_IM/pkg/common/prometheus"
|
||||
)
|
||||
|
||||
type Producer struct {
|
||||
@@ -22,7 +26,11 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
|
||||
p.config.Producer.Return.Errors = true
|
||||
p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all
|
||||
p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
|
||||
|
||||
if config.Config.Kafka.SASLUserName != "" && config.Config.Kafka.SASLPassword != "" {
|
||||
p.config.Net.SASL.Enable = true
|
||||
p.config.Net.SASL.User = config.Config.Kafka.SASLUserName
|
||||
p.config.Net.SASL.Password = config.Config.Kafka.SASLPassword
|
||||
}
|
||||
p.addr = addr
|
||||
p.topic = topic
|
||||
|
||||
@@ -57,5 +65,8 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string)
|
||||
}
|
||||
a, b, c := p.producer.SendMessage(kMsg)
|
||||
log.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer)
|
||||
if c == nil {
|
||||
promePkg.PromeInc(promePkg.SendMsgCounter)
|
||||
}
|
||||
return a, b, utils.Wrap(c, "")
|
||||
}
|
||||
|
||||
@@ -0,0 +1,426 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
var (
|
||||
//auth rpc
|
||||
UserLoginCounter prometheus.Counter
|
||||
UserRegisterCounter prometheus.Counter
|
||||
|
||||
//seg
|
||||
SeqGetSuccessCounter prometheus.Counter
|
||||
SeqGetFailedCounter prometheus.Counter
|
||||
SeqSetSuccessCounter prometheus.Counter
|
||||
SeqSetFailedCounter prometheus.Counter
|
||||
|
||||
//msg-db
|
||||
MsgInsertRedisSuccessCounter prometheus.Counter
|
||||
MsgInsertRedisFailedCounter prometheus.Counter
|
||||
MsgInsertMongoSuccessCounter prometheus.Counter
|
||||
MsgInsertMongoFailedCounter prometheus.Counter
|
||||
MsgPullFromRedisSuccessCounter prometheus.Counter
|
||||
MsgPullFromRedisFailedCounter prometheus.Counter
|
||||
MsgPullFromMongoSuccessCounter prometheus.Counter
|
||||
MsgPullFromMongoFailedCounter prometheus.Counter
|
||||
|
||||
//msg-ws
|
||||
MsgRecvTotalCounter prometheus.Counter
|
||||
GetNewestSeqTotalCounter prometheus.Counter
|
||||
PullMsgBySeqListTotalCounter prometheus.Counter
|
||||
|
||||
SingleChatMsgRecvSuccessCounter prometheus.Counter
|
||||
GroupChatMsgRecvSuccessCounter prometheus.Counter
|
||||
WorkSuperGroupChatMsgRecvSuccessCounter prometheus.Counter
|
||||
OnlineUserGauge prometheus.Gauge
|
||||
|
||||
//msg-msg
|
||||
SingleChatMsgProcessSuccessCounter prometheus.Counter
|
||||
SingleChatMsgProcessFailedCounter prometheus.Counter
|
||||
GroupChatMsgProcessSuccessCounter prometheus.Counter
|
||||
GroupChatMsgProcessFailedCounter prometheus.Counter
|
||||
WorkSuperGroupChatMsgProcessSuccessCounter prometheus.Counter
|
||||
WorkSuperGroupChatMsgProcessFailedCounter prometheus.Counter
|
||||
|
||||
//msg-push
|
||||
MsgOnlinePushSuccessCounter prometheus.Counter
|
||||
MsgOfflinePushSuccessCounter prometheus.Counter
|
||||
MsgOfflinePushFailedCounter prometheus.Counter
|
||||
// api
|
||||
ApiRequestCounter prometheus.Counter
|
||||
ApiRequestSuccessCounter prometheus.Counter
|
||||
ApiRequestFailedCounter prometheus.Counter
|
||||
|
||||
// grpc
|
||||
GrpcRequestCounter prometheus.Counter
|
||||
GrpcRequestSuccessCounter prometheus.Counter
|
||||
GrpcRequestFailedCounter prometheus.Counter
|
||||
|
||||
SendMsgCounter prometheus.Counter
|
||||
)
|
||||
|
||||
func NewUserLoginCounter() {
|
||||
if UserLoginCounter != nil {
|
||||
return
|
||||
}
|
||||
UserLoginCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "user_login",
|
||||
Help: "The number of user login",
|
||||
})
|
||||
}
|
||||
func NewUserRegisterCounter() {
|
||||
if UserRegisterCounter != nil {
|
||||
return
|
||||
}
|
||||
UserRegisterCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "user_register",
|
||||
Help: "The number of user register",
|
||||
})
|
||||
}
|
||||
|
||||
func NewSeqGetSuccessCounter() {
|
||||
if SeqGetSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
SeqGetSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "seq_get_success",
|
||||
Help: "The number of successful get seq",
|
||||
})
|
||||
}
|
||||
func NewSeqGetFailedCounter() {
|
||||
if SeqGetFailedCounter != nil {
|
||||
return
|
||||
}
|
||||
SeqGetFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "seq_get_failed",
|
||||
Help: "The number of failed get seq",
|
||||
})
|
||||
}
|
||||
|
||||
func NewSeqSetSuccessCounter() {
|
||||
if SeqSetSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
SeqSetSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "seq_set_success",
|
||||
Help: "The number of successful set seq",
|
||||
})
|
||||
}
|
||||
func NewSeqSetFailedCounter() {
|
||||
if SeqSetFailedCounter != nil {
|
||||
return
|
||||
}
|
||||
SeqSetFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "seq_set_failed",
|
||||
Help: "The number of failed set seq",
|
||||
})
|
||||
}
|
||||
|
||||
func NewApiRequestCounter() {
|
||||
if ApiRequestCounter != nil {
|
||||
return
|
||||
}
|
||||
ApiRequestCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "api_request",
|
||||
Help: "The number of api request",
|
||||
})
|
||||
}
|
||||
|
||||
func NewApiRequestSuccessCounter() {
|
||||
if ApiRequestSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
ApiRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "api_request_success",
|
||||
Help: "The number of api request success",
|
||||
})
|
||||
}
|
||||
|
||||
func NewApiRequestFailedCounter() {
|
||||
if ApiRequestFailedCounter != nil {
|
||||
return
|
||||
}
|
||||
ApiRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "api_request_failed",
|
||||
Help: "The number of api request failed",
|
||||
})
|
||||
}
|
||||
|
||||
func NewGrpcRequestCounter() {
|
||||
if GrpcRequestCounter != nil {
|
||||
return
|
||||
}
|
||||
GrpcRequestCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "grpc_request",
|
||||
Help: "The number of api request",
|
||||
})
|
||||
}
|
||||
|
||||
func NewGrpcRequestSuccessCounter() {
|
||||
if GrpcRequestSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
GrpcRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "grpc_request_success",
|
||||
Help: "The number of grpc request success",
|
||||
})
|
||||
}
|
||||
|
||||
func NewGrpcRequestFailedCounter() {
|
||||
if GrpcRequestFailedCounter != nil {
|
||||
return
|
||||
}
|
||||
GrpcRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "grpc_request_failed",
|
||||
Help: "The number of grpc request failed",
|
||||
})
|
||||
}
|
||||
|
||||
func NewSendMsgCount() {
|
||||
if SendMsgCounter != nil {
|
||||
return
|
||||
}
|
||||
SendMsgCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "send_msg",
|
||||
Help: "The number of send msg",
|
||||
})
|
||||
}
|
||||
|
||||
func NewMsgInsertRedisSuccessCounter() {
|
||||
if MsgInsertRedisSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
MsgInsertRedisSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "msg_insert_redis_success",
|
||||
Help: "The number of successful insert msg to redis",
|
||||
})
|
||||
}
|
||||
|
||||
func NewMsgInsertRedisFailedCounter() {
|
||||
if MsgInsertRedisFailedCounter != nil {
|
||||
return
|
||||
}
|
||||
MsgInsertRedisFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "msg_insert_redis_failed",
|
||||
Help: "The number of failed insert msg to redis",
|
||||
})
|
||||
}
|
||||
|
||||
func NewMsgInsertMongoSuccessCounter() {
|
||||
if MsgInsertMongoSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
MsgInsertMongoSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "msg_insert_mongo_success",
|
||||
Help: "The number of successful insert msg to mongo",
|
||||
})
|
||||
}
|
||||
|
||||
func NewMsgInsertMongoFailedCounter() {
|
||||
if MsgInsertMongoFailedCounter != nil {
|
||||
return
|
||||
}
|
||||
MsgInsertMongoFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "msg_insert_mongo_failed",
|
||||
Help: "The number of failed insert msg to mongo",
|
||||
})
|
||||
}
|
||||
|
||||
func NewMsgPullFromRedisSuccessCounter() {
|
||||
if MsgPullFromRedisSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
MsgPullFromRedisSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "msg_pull_from_redis_success",
|
||||
Help: "The number of successful pull msg from redis",
|
||||
})
|
||||
}
|
||||
|
||||
func NewMsgPullFromRedisFailedCounter() {
|
||||
if MsgPullFromRedisFailedCounter != nil {
|
||||
return
|
||||
}
|
||||
MsgPullFromRedisFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "msg_pull_from_redis_failed",
|
||||
Help: "The number of failed pull msg from redis",
|
||||
})
|
||||
}
|
||||
|
||||
func NewMsgPullFromMongoSuccessCounter() {
|
||||
if MsgPullFromMongoSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
MsgPullFromMongoSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "msg_pull_from_mongo_success",
|
||||
Help: "The number of successful pull msg from mongo",
|
||||
})
|
||||
}
|
||||
|
||||
func NewMsgPullFromMongoFailedCounter() {
|
||||
if MsgPullFromMongoFailedCounter != nil {
|
||||
return
|
||||
}
|
||||
MsgPullFromMongoFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "msg_pull_from_mongo_failed",
|
||||
Help: "The number of failed pull msg from mongo",
|
||||
})
|
||||
}
|
||||
|
||||
func NewMsgRecvTotalCounter() {
|
||||
if MsgRecvTotalCounter != nil {
|
||||
return
|
||||
}
|
||||
MsgRecvTotalCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "msg_recv_total",
|
||||
Help: "The number of msg received",
|
||||
})
|
||||
}
|
||||
|
||||
func NewGetNewestSeqTotalCounter() {
|
||||
if GetNewestSeqTotalCounter != nil {
|
||||
return
|
||||
}
|
||||
GetNewestSeqTotalCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "get_newest_seq_total",
|
||||
Help: "the number of get newest seq",
|
||||
})
|
||||
}
|
||||
func NewPullMsgBySeqListTotalCounter() {
|
||||
if PullMsgBySeqListTotalCounter != nil {
|
||||
return
|
||||
}
|
||||
PullMsgBySeqListTotalCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "pull_msg_by_seq_list_total",
|
||||
Help: "The number of pull msg by seq list",
|
||||
})
|
||||
}
|
||||
|
||||
func NewSingleChatMsgRecvSuccessCounter() {
|
||||
if SingleChatMsgRecvSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
SingleChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "single_chat_msg_recv_success",
|
||||
Help: "The number of single chat msg successful received ",
|
||||
})
|
||||
}
|
||||
|
||||
func NewGroupChatMsgRecvSuccessCounter() {
|
||||
if GroupChatMsgRecvSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
GroupChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "group_chat_msg_recv_success",
|
||||
Help: "The number of group chat msg successful received",
|
||||
})
|
||||
}
|
||||
|
||||
func NewWorkSuperGroupChatMsgRecvSuccessCounter() {
|
||||
if WorkSuperGroupChatMsgRecvSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
WorkSuperGroupChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "work_super_group_chat_msg_recv_success",
|
||||
Help: "The number of work/super group chat msg successful received",
|
||||
})
|
||||
}
|
||||
|
||||
func NewOnlineUserGauges() {
|
||||
if OnlineUserGauge != nil {
|
||||
return
|
||||
}
|
||||
OnlineUserGauge = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "online_user_num",
|
||||
Help: "The number of online user num",
|
||||
})
|
||||
}
|
||||
|
||||
func NewSingleChatMsgProcessSuccessCounter() {
|
||||
if SingleChatMsgProcessSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
SingleChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "single_chat_msg_process_success",
|
||||
Help: "The number of single chat msg successful processed",
|
||||
})
|
||||
}
|
||||
|
||||
func NewSingleChatMsgProcessFailedCounter() {
|
||||
if SingleChatMsgProcessFailedCounter != nil {
|
||||
return
|
||||
}
|
||||
SingleChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "single_chat_msg_process_failed",
|
||||
Help: "The number of single chat msg failed processed",
|
||||
})
|
||||
}
|
||||
|
||||
func NewGroupChatMsgProcessSuccessCounter() {
|
||||
if GroupChatMsgProcessSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
GroupChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "group_chat_msg_process_success",
|
||||
Help: "The number of group chat msg successful processed",
|
||||
})
|
||||
}
|
||||
|
||||
func NewGroupChatMsgProcessFailedCounter() {
|
||||
if GroupChatMsgProcessFailedCounter != nil {
|
||||
return
|
||||
}
|
||||
GroupChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "group_chat_msg_process_failed",
|
||||
Help: "The number of group chat msg failed processed",
|
||||
})
|
||||
}
|
||||
|
||||
func NewWorkSuperGroupChatMsgProcessSuccessCounter() {
|
||||
if WorkSuperGroupChatMsgProcessSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
WorkSuperGroupChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "work_super_group_chat_msg_process_success",
|
||||
Help: "The number of work/super group chat msg successful processed",
|
||||
})
|
||||
}
|
||||
func NewWorkSuperGroupChatMsgProcessFailedCounter() {
|
||||
if WorkSuperGroupChatMsgProcessFailedCounter != nil {
|
||||
return
|
||||
}
|
||||
WorkSuperGroupChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "work_super_group_chat_msg_process_failed",
|
||||
Help: "The number of work/super group chat msg failed processed",
|
||||
})
|
||||
}
|
||||
|
||||
func NewMsgOnlinePushSuccessCounter() {
|
||||
if MsgOnlinePushSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
MsgOnlinePushSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "msg_online_push_success",
|
||||
Help: "The number of msg successful online pushed",
|
||||
})
|
||||
}
|
||||
|
||||
func NewMsgOfflinePushSuccessCounter() {
|
||||
if MsgOfflinePushSuccessCounter != nil {
|
||||
return
|
||||
}
|
||||
MsgOfflinePushSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "msg_offline_push_success",
|
||||
Help: "The number of msg successful offline pushed",
|
||||
})
|
||||
}
|
||||
func NewMsgOfflinePushFailedCounter() {
|
||||
if MsgOfflinePushFailedCounter != nil {
|
||||
return
|
||||
}
|
||||
MsgOfflinePushFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "msg_offline_push_failed",
|
||||
Help: "The number of msg failed offline pushed",
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"Open_IM/pkg/common/log"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/peer"
|
||||
)
|
||||
|
||||
func UnaryServerInterceptorProme(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
remote, _ := peer.FromContext(ctx)
|
||||
remoteAddr := remote.Addr.String()
|
||||
|
||||
in, _ := json.Marshal(req)
|
||||
inStr := string(in)
|
||||
log.NewInfo("ip", remoteAddr, "access_start", info.FullMethod, "in", inStr)
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
out, _ := json.Marshal(resp)
|
||||
outStr := string(out)
|
||||
duration := int64(time.Since(start) / time.Millisecond)
|
||||
if duration >= 500 {
|
||||
log.NewInfo("ip", remoteAddr, "access_end", info.FullMethod, "in", inStr, "out", outStr, "err", err, "duration/ms", duration)
|
||||
} else {
|
||||
log.NewInfo("ip", remoteAddr, "access_end", info.FullMethod, "in", inStr, "out", outStr, "err", err, "duration/ms", duration)
|
||||
}
|
||||
}()
|
||||
resp, err = handler(ctx, req)
|
||||
return
|
||||
}
|
||||
@@ -2,10 +2,12 @@ package prometheus
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/config"
|
||||
"bytes"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
@@ -24,3 +26,57 @@ func PrometheusHandler() gin.HandlerFunc {
|
||||
h.ServeHTTP(c.Writer, c.Request)
|
||||
}
|
||||
}
|
||||
|
||||
type responseBodyWriter struct {
|
||||
gin.ResponseWriter
|
||||
body *bytes.Buffer
|
||||
}
|
||||
|
||||
func (r responseBodyWriter) Write(b []byte) (int, error) {
|
||||
r.body.Write(b)
|
||||
return r.ResponseWriter.Write(b)
|
||||
}
|
||||
|
||||
func PromeTheusMiddleware(c *gin.Context) {
|
||||
PromeInc(ApiRequestCounter)
|
||||
w := &responseBodyWriter{body: &bytes.Buffer{}, ResponseWriter: c.Writer}
|
||||
c.Writer = w
|
||||
c.Next()
|
||||
if c.Writer.Status() == http.StatusOK {
|
||||
PromeInc(ApiRequestSuccessCounter)
|
||||
} else {
|
||||
PromeInc(ApiRequestFailedCounter)
|
||||
}
|
||||
}
|
||||
|
||||
func PromeInc(counter prometheus.Counter) {
|
||||
if config.Config.Prometheus.Enable {
|
||||
if counter != nil {
|
||||
counter.Inc()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func PromeAdd(counter prometheus.Counter, add int) {
|
||||
if config.Config.Prometheus.Enable {
|
||||
if counter != nil {
|
||||
counter.Add(float64(add))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func PromeGaugeInc(gauges prometheus.Gauge) {
|
||||
if config.Config.Prometheus.Enable {
|
||||
if gauges != nil {
|
||||
gauges.Inc()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func PromeGaugeDec(gauges prometheus.Gauge) {
|
||||
if config.Config.Prometheus.Enable {
|
||||
if gauges != nil {
|
||||
gauges.Dec()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user