mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-12 04:55:59 +08:00
Merge remote-tracking branch 'origin/v2.3.0release' into v2.3.0release
This commit is contained in:
@@ -3,6 +3,7 @@ package admin
|
||||
import (
|
||||
apiStruct "Open_IM/pkg/cms_api_struct"
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/log"
|
||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||
pbAdmin "Open_IM/pkg/proto/admin_cms"
|
||||
@@ -25,6 +26,7 @@ var (
|
||||
)
|
||||
|
||||
func init() {
|
||||
log.NewPrivateLog(constant.LogFileName)
|
||||
operationID := utils.OperationIDGenerator()
|
||||
log.NewInfo(operationID, utils.GetSelfFuncName(), "minio config: ", config.Config.Credential.Minio)
|
||||
var initUrl string
|
||||
|
||||
@@ -18,7 +18,8 @@ import (
|
||||
|
||||
func NewGinRouter() *gin.Engine {
|
||||
gin.SetMode(gin.ReleaseMode)
|
||||
baseRouter := gin.Default()
|
||||
baseRouter := gin.New()
|
||||
baseRouter.Use()
|
||||
if config.Config.Prometheus.Enable {
|
||||
baseRouter.GET("/metrics", promePkg.PrometheusHandler())
|
||||
}
|
||||
|
||||
@@ -44,6 +44,7 @@ func (pc *PersistentConsumerHandler) Init() {
|
||||
}
|
||||
|
||||
func (pc *PersistentConsumerHandler) initPrometheus() {
|
||||
// counter
|
||||
msgInsertMysqlCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "insert_mysql_msg_total",
|
||||
Help: "The total number of msg insert mysql events",
|
||||
@@ -52,6 +53,17 @@ func (pc *PersistentConsumerHandler) initPrometheus() {
|
||||
Name: "insert_mysql_failed_msg_total",
|
||||
Help: "The total number of msg insert mysql events",
|
||||
})
|
||||
|
||||
// 启动计时器
|
||||
// requestDurations := prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
// Name: "http_request_duration_seconds",
|
||||
// Help: "A histogram of the HTTP request durations in seconds.",
|
||||
// Buckets: []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10},
|
||||
// })
|
||||
// 开始
|
||||
// timer := prometheus.NewTimer(requestDurations)
|
||||
// 停止
|
||||
// timer.ObserveDuration()
|
||||
}
|
||||
|
||||
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
@@ -85,7 +97,9 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMes
|
||||
log.NewInfo(msgFromMQ.OperationID, "msg_transfer msg persisting", string(msg))
|
||||
if err = im_mysql_msg_model.InsertMessageToChatLog(msgFromMQ); err != nil {
|
||||
log.NewError(msgFromMQ.OperationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String())
|
||||
msgInsertFailedMysqlCounter.Inc()
|
||||
if config.Config.Prometheus.Enable {
|
||||
msgInsertFailedMysqlCounter.Inc()
|
||||
}
|
||||
return
|
||||
}
|
||||
if config.Config.Prometheus.Enable {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"Open_IM/pkg/common/db"
|
||||
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
|
||||
"Open_IM/pkg/common/log"
|
||||
promePkg "Open_IM/pkg/common/prometheus"
|
||||
"Open_IM/pkg/common/token_verify"
|
||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||
pbAuth "Open_IM/pkg/proto/auth"
|
||||
@@ -21,6 +22,11 @@ import (
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func (rpc *rpcAuth) initPrometheus() {
|
||||
promePkg.NewUserLoginCounter()
|
||||
promePkg.NewUserRegisterCounter()
|
||||
}
|
||||
|
||||
func (rpc *rpcAuth) UserRegister(_ context.Context, req *pbAuth.UserRegisterReq) (*pbAuth.UserRegisterResp, error) {
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " rpc args ", req.String())
|
||||
var user db.User
|
||||
@@ -35,6 +41,7 @@ func (rpc *rpcAuth) UserRegister(_ context.Context, req *pbAuth.UserRegisterReq)
|
||||
log.NewError(req.OperationID, errMsg, user)
|
||||
return &pbAuth.UserRegisterResp{CommonResp: &pbAuth.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: errMsg}}, nil
|
||||
}
|
||||
promePkg.PromeInc(promePkg.UserRegisterCounter)
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " rpc return ", pbAuth.UserRegisterResp{CommonResp: &pbAuth.CommonResp{}})
|
||||
return &pbAuth.UserRegisterResp{CommonResp: &pbAuth.CommonResp{}}, nil
|
||||
}
|
||||
@@ -47,6 +54,7 @@ func (rpc *rpcAuth) UserToken(_ context.Context, req *pbAuth.UserTokenReq) (*pbA
|
||||
log.NewError(req.OperationID, errMsg)
|
||||
return &pbAuth.UserTokenResp{CommonResp: &pbAuth.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: errMsg}}, nil
|
||||
}
|
||||
promePkg.PromeInc(promePkg.UserLoginCounter)
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " rpc return ", pbAuth.UserTokenResp{CommonResp: &pbAuth.CommonResp{}, Token: tokens, ExpiredTime: expTime})
|
||||
return &pbAuth.UserTokenResp{CommonResp: &pbAuth.CommonResp{}, Token: tokens, ExpiredTime: expTime}, nil
|
||||
}
|
||||
@@ -141,6 +149,9 @@ func (rpc *rpcAuth) Run() {
|
||||
|
||||
}
|
||||
log.NewInfo(operationID, "RegisterAuthServer ok ", rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName)
|
||||
if config.Config.Prometheus.Enable {
|
||||
rpc.initPrometheus()
|
||||
}
|
||||
err = srv.Serve(listener)
|
||||
if err != nil {
|
||||
log.NewError(operationID, "Serve failed ", err.Error())
|
||||
|
||||
@@ -1552,6 +1552,9 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbGroup.DismissGrou
|
||||
if err := rocksCache.DelGroupInfoFromCache(req.GroupID); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
|
||||
}
|
||||
if err := rocksCache.DelGroupMemberListHashFromCache(req.GroupID); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
|
||||
}
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "rpc return ", pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""})
|
||||
return &pbGroup.DismissGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
|
||||
}
|
||||
|
||||
@@ -9,10 +9,18 @@ import (
|
||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||
"Open_IM/pkg/proto/msg"
|
||||
"Open_IM/pkg/utils"
|
||||
"google.golang.org/grpc"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
var (
|
||||
sendMsgSuccessCounter prometheus.Counter
|
||||
sendMsgFailedCounter prometheus.Counter
|
||||
)
|
||||
|
||||
type rpcChat struct {
|
||||
@@ -46,6 +54,17 @@ func NewRpcChatServer(port int) *rpcChat {
|
||||
return &rc
|
||||
}
|
||||
|
||||
func (rpc *rpcChat) initPrometheus() {
|
||||
sendMsgSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "send_msg_success",
|
||||
Help: "The number of send msg success",
|
||||
})
|
||||
sendMsgFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "send_msg_failed",
|
||||
Help: "The number of send msg failed",
|
||||
})
|
||||
}
|
||||
|
||||
func (rpc *rpcChat) Run() {
|
||||
log.Info("", "rpcChat init...")
|
||||
listenIP := ""
|
||||
@@ -78,6 +97,9 @@ func (rpc *rpcChat) Run() {
|
||||
panic(utils.Wrap(err, "register chat module rpc to etcd err"))
|
||||
}
|
||||
go rpc.runCh()
|
||||
if config.Config.Prometheus.Enable {
|
||||
rpc.initPrometheus()
|
||||
}
|
||||
err = srv.Serve(listener)
|
||||
if err != nil {
|
||||
log.Error("", "rpc rpcChat failed ", err.Error())
|
||||
|
||||
@@ -30,7 +30,9 @@ import (
|
||||
//When the number of group members is greater than this value,Online users will be sent first,Guaranteed service availability
|
||||
const GroupMemberNum = 500
|
||||
|
||||
var ExcludeContentType = []int{constant.HasReadReceipt, constant.GroupHasReadReceipt}
|
||||
var (
|
||||
ExcludeContentType = []int{constant.HasReadReceipt, constant.GroupHasReadReceipt}
|
||||
)
|
||||
|
||||
type MsgCallBackReq struct {
|
||||
SendID string `json:"sendID"`
|
||||
|
||||
@@ -33,6 +33,13 @@ func GetGroupMemberUserIDList(groupID string, operationID string) ([]string, err
|
||||
|
||||
CacheGroupMtx.Lock()
|
||||
defer CacheGroupMtx.Unlock()
|
||||
|
||||
if groupHashRemote == 0 {
|
||||
log.Info(operationID, "groupHashRemote == 0 ", groupID)
|
||||
delete(CacheGroupMemberUserIDList, groupID)
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
groupInLocalCache, ok := CacheGroupMemberUserIDList[groupID]
|
||||
if ok && groupInLocalCache.MemberListHash == groupHashRemote {
|
||||
log.Debug(operationID, "in local cache ", groupID)
|
||||
|
||||
Reference in New Issue
Block a user