This commit is contained in:
wangchuxiao
2022-09-12 19:32:24 +08:00
parent 692cebe659
commit ef66b10501
30 changed files with 275 additions and 44 deletions
+1 -1
View File
@@ -107,7 +107,7 @@ func UserToken(c *gin.Context) {
params := api.UserTokenReq{}
if err := c.BindJSON(&params); err != nil {
errMsg := " BindJSON failed " + err.Error()
log.NewError("0", errMsg)
log.NewError(params.OperationID, errMsg)
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": errMsg})
return
}
+3 -1
View File
@@ -11,6 +11,8 @@ import (
"Open_IM/internal/demo/register"
"Open_IM/pkg/common/config"
promePkg "Open_IM/pkg/common/prometheus"
"github.com/gin-gonic/gin"
)
@@ -18,7 +20,7 @@ func NewGinRouter() *gin.Engine {
gin.SetMode(gin.ReleaseMode)
baseRouter := gin.Default()
if config.Config.Prometheus.Enable {
baseRouter.GET("/metrics", prometheusHandler())
baseRouter.GET("/metrics", promePkg.PrometheusHandler())
}
router := baseRouter.Group("/cms")
router.Use(middleware.CorsHandler())
+11 -2
View File
@@ -6,8 +6,11 @@ import (
"Open_IM/pkg/statistics"
"fmt"
"github.com/go-playground/validator/v10"
"sync"
promePkg "Open_IM/pkg/common/prometheus"
"github.com/go-playground/validator/v10"
)
var (
@@ -34,7 +37,13 @@ func Init(rpcPort, wsPort int) {
rpcSvr.onInit(rpcPort)
}
func Run() {
func Run(promethuesPort int) {
go ws.run()
go rpcSvr.run()
go func() {
err := promePkg.StartPromeSrv(promethuesPort)
if err != nil {
panic(err)
}
}()
}
+7 -8
View File
@@ -4,13 +4,10 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/statistics"
"fmt"
"net/http"
"strconv"
"sync"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
const OnlineTopicBusy = 1
@@ -60,10 +57,12 @@ func Run(promethuesPort int) {
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH)
//go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
if config.Config.Prometheus.Enable {
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(":"+strconv.Itoa(promethuesPort), nil)
}
go func() {
err := promePkg.StartPromeSrv(promethuesPort)
if err != nil {
panic(err)
}
}()
}
func SetOnlineTopicStatus(status int) {
w.Lock()
@@ -23,7 +23,8 @@ import (
)
var (
msgInsertMysqlProcessed prometheus.Counter
msgInsertMysqlCounter prometheus.Counter
msgInsertFailedMysqlCounter prometheus.Counter
)
type PersistentConsumerHandler struct {
@@ -38,13 +39,21 @@ func (pc *PersistentConsumerHandler) Init() {
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql)
if config.Config.Prometheus.Enable {
msgInsertMysqlProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "insert_mysql_msg_total",
Help: "The total number of msg insert mysql events",
})
pc.initPrometheus()
}
}
func (pc *PersistentConsumerHandler) initPrometheus() {
msgInsertMysqlCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "insert_mysql_msg_total",
Help: "The total number of msg insert mysql events",
})
msgInsertFailedMysqlCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "insert_mysql_failed_msg_total",
Help: "The total number of msg insert mysql events",
})
}
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
msg := cMsg.Value
log.NewInfo("msg come here mysql!!!", "", "msg", string(msg), msgKey)
@@ -76,13 +85,11 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMes
log.NewInfo(msgFromMQ.OperationID, "msg_transfer msg persisting", string(msg))
if err = im_mysql_msg_model.InsertMessageToChatLog(msgFromMQ); err != nil {
log.NewError(msgFromMQ.OperationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String())
msgInsertFailedMysqlCounter.Inc()
return
}
msgInsertMysqlProcessed.Inc()
msgInsertMysqlProcessed.Add(1)
if config.Config.Prometheus.Enable {
log.NewDebug(msgFromMQ.OperationID, utils.GetSelfFuncName(), "inc msgInsertMysqlProcessed", msgInsertMysqlProcessed.Desc())
msgInsertMysqlProcessed.Inc()
msgInsertMysqlCounter.Inc()
}
}
+8 -1
View File
@@ -14,6 +14,7 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/statistics"
"fmt"
)
@@ -46,7 +47,13 @@ func init() {
}
}
func Run() {
func Run(promethuesPort int) {
go rpcServer.run()
go pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&pushCh)
go func() {
err := promePkg.StartPromeSrv(promethuesPort)
if err != nil {
panic(err)
}
}()
}
+3 -2
View File
@@ -5,13 +5,14 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
"Open_IM/pkg/proto/push"
pbPush "Open_IM/pkg/proto/push"
"Open_IM/pkg/utils"
"context"
"google.golang.org/grpc"
"net"
"strconv"
"strings"
"google.golang.org/grpc"
)
type RPCServer struct {
+2 -2
View File
@@ -108,9 +108,9 @@ func (s *adminCMSServer) AdminLogin(_ context.Context, req *pbAdminCMS.AdminLogi
}
admin, err := imdb.GetUserByUserID(req.AdminID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "failed", req.AdminID)
log.NewError(req.OperationID, utils.GetSelfFuncName(), "failed", req.AdminID, err.Error())
resp.CommonResp.ErrCode = constant.ErrTokenUnknown.ErrCode
resp.CommonResp.ErrMsg = constant.ErrTokenMalformed.ErrMsg
resp.CommonResp.ErrMsg = err.Error()
return resp, nil
}
resp.UserName = admin.Nickname