diff --git a/cmd/open_im_demo/main.go b/cmd/open_im_demo/main.go index 0e2dbaea6..9d8152218 100644 --- a/cmd/open_im_demo/main.go +++ b/cmd/open_im_demo/main.go @@ -65,7 +65,6 @@ func main() { defaultPorts := config.Config.Demo.Port ginPort := flag.Int("port", defaultPorts[0], "get ginServerPort from cmd,default 10004 as port") flag.Parse() - fmt.Println("start demo api server, port: ", *ginPort) address := "0.0.0.0:" + strconv.Itoa(*ginPort) if config.Config.Api.ListenIP != "" { address = config.Config.Api.ListenIP + ":" + strconv.Itoa(*ginPort) diff --git a/internal/api/group/group.go b/internal/api/group/group.go index 5c4427875..1cddd7ede 100644 --- a/internal/api/group/group.go +++ b/internal/api/group/group.go @@ -42,10 +42,14 @@ func KickGroupMember(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) return } - + if len(params.KickedUserIDList) > constant.MaxNotificationNum { + errMsg := params.OperationID + " too many members " + utils.Int32ToString(int32(len(params.KickedUserIDList))) + log.Error(params.OperationID, errMsg) + c.JSON(http.StatusOK, gin.H{"errCode": 400, "errMsg": errMsg}) + return + } req := &rpc.KickGroupMemberReq{} utils.CopyStructFields(req, ¶ms) - var ok bool var errInfo string ok, req.OpUserID, errInfo = token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID) @@ -318,6 +322,12 @@ func InviteUserToGroup(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) return } + if len(params.InvitedUserIDList) > constant.MaxNotificationNum { + errMsg := params.OperationID + " too many members " + utils.Int32ToString(int32(len(params.InvitedUserIDList))) + log.Error(params.OperationID, errMsg) + c.JSON(http.StatusOK, gin.H{"errCode": 400, "errMsg": errMsg}) + return + } req := &rpc.InviteUserToGroupReq{} utils.CopyStructFields(req, ¶ms) diff --git a/internal/msg_gateway/gate/init.go b/internal/msg_gateway/gate/init.go index 8269a03b0..88ef3d12d 100644 --- a/internal/msg_gateway/gate/init.go +++ b/internal/msg_gateway/gate/init.go @@ -27,14 +27,13 @@ var ( ) func Init(rpcPort, wsPort int) { - //log initialization - rwLock = new(sync.RWMutex) validate = validator.New() statistics.NewStatistics(&sendMsgAllCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) statistics.NewStatistics(&userCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second add user conn", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) ws.onInit(wsPort) rpcSvr.onInit(rpcPort) + initPrometheus() } func Run(promethuesPort int) { diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index 2ee9be712..7b77840a7 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -5,6 +5,7 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbChat "Open_IM/pkg/proto/msg" pbRtc "Open_IM/pkg/proto/rtc" @@ -43,15 +44,18 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) { case constant.WSGetNewestSeq: log.NewInfo(m.OperationID, "getSeqReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.getSeqReq(conn, &m) + promePkg.PromeInc(promePkg.GetNewestSeqTotalCounter) case constant.WSSendMsg: log.NewInfo(m.OperationID, "sendMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.sendMsgReq(conn, &m) + promePkg.PromeInc(promePkg.MsgRecvTotalCounter) case constant.WSSendSignalMsg: log.NewInfo(m.OperationID, "sendSignalMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.sendSignalMsgReq(conn, &m) case constant.WSPullMsgBySeqList: log.NewInfo(m.OperationID, "pullMsgBySeqListReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.pullMsgBySeqListReq(conn, &m) + promePkg.PromeInc(promePkg.PullMsgBySeqListTotalCounter) case constant.WsLogoutMsg: log.NewInfo(m.OperationID, "conn.Close()", m.SendID, m.MsgIncr, m.ReqIdentifier) // conn.Close() diff --git a/internal/msg_gateway/gate/relay_rpc_server.go b/internal/msg_gateway/gate/relay_rpc_server.go index f0ba76a3a..0dc41864f 100644 --- a/internal/msg_gateway/gate/relay_rpc_server.go +++ b/internal/msg_gateway/gate/relay_rpc_server.go @@ -34,6 +34,16 @@ type RPCServer struct { target string } +func initPrometheus() { + promePkg.NewMsgRecvTotalCounter() + promePkg.NewGetNewestSeqTotalCounter() + promePkg.NewPullMsgBySeqListTotalCounter() + promePkg.NewMsgOnlinePushSuccessCounter() + //promePkg.NewSingleChatMsgRecvSuccessCounter() + //promePkg.NewGroupChatMsgRecvSuccessCounter() + //promePkg.NewWorkSuperGroupChatMsgRecvSuccessCounter() +} + func (r *RPCServer) onInit(rpcPort int) { r.rpcPort = rpcPort r.rpcRegisterName = config.Config.RpcRegisterName.OpenImRelayName @@ -188,6 +198,7 @@ func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *pbRe resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v) if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { tempT.OnlinePush = true + promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter) log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v) temp := &pbRelay.SingleMsgToUserPlatform{ ResultCode: resultCode, diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index ce996d3c5..81ed48087 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -47,6 +47,11 @@ func init() { } } +func initPrometheus() { + promePkg.NewMsgOfflinePushSuccessCounter() + promePkg.NewMsgOfflinePushFailedCounter() +} + func Run(promethuesPort int) { go rpcServer.run() go pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&pushCh) diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index f129ad385..634d2e008 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -21,6 +21,7 @@ import ( "context" "strings" + promePkg "Open_IM/pkg/common/prometheus" "github.com/golang/protobuf/proto" ) @@ -140,8 +141,10 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { } pushResult, err := offlinePusher.Push(UIDList, title, detailContent, pushMsg.OperationID, opts) if err != nil { + promePkg.PromeInc(promePkg.MsgOfflinePushFailedCounter) log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error()) } else { + promePkg.PromeInc(promePkg.MsgOfflinePushSuccessCounter) log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData) } } @@ -261,12 +264,13 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) { } pushResult, err := offlinePusher.Push(needOfflinePushUserIDList, title, detailContent, pushMsg.OperationID, opts) if err != nil { + promePkg.PromeInc(promePkg.MsgOfflinePushFailedCounter) log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error()) } else { + promePkg.PromeInc(promePkg.MsgOfflinePushSuccessCounter) log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData) } } - } } diff --git a/internal/rpc/msg/pull_message.go b/internal/rpc/msg/pull_message.go index 4fcf9f9f7..1a99a14e6 100644 --- a/internal/rpc/msg/pull_message.go +++ b/internal/rpc/msg/pull_message.go @@ -8,6 +8,8 @@ import ( commonDB "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" open_im_sdk "Open_IM/pkg/proto/sdk_ws" + + promePkg "Open_IM/pkg/common/prometheus" ) func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *open_im_sdk.GetMaxAndMinSeqReq) (*open_im_sdk.GetMaxAndMinSeqResp, error) { @@ -48,57 +50,62 @@ func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *open_im_sdk.Pull log.NewInfo(in.OperationID, "rpc PullMessageBySeqList is arriving", in.String()) resp := new(open_im_sdk.PullMessageBySeqListResp) m := make(map[string]*open_im_sdk.MsgDataList) - //msgList, err := commonDB.DB.GetMsgBySeqList(in.UserID, in.SeqList, in.OperationID) redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(in.UserID, in.SeqList, in.OperationID) if err != nil { if err != go_redis.Nil { + promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList)) log.Error(in.OperationID, "get message from redis exception", err.Error(), failedSeqList) } else { log.Debug(in.OperationID, "get message from redis is nil", failedSeqList) } msgList, err1 := commonDB.DB.GetMsgBySeqListMongo2(in.UserID, failedSeqList, in.OperationID) if err1 != nil { + promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err.Error()) resp.ErrCode = 201 resp.ErrMsg = err.Error() return resp, nil } else { + promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList)) redisMsgList = append(redisMsgList, msgList...) resp.List = redisMsgList } } else { + promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList)) resp.List = redisMsgList } + for k, v := range in.GroupSeqList { x := new(open_im_sdk.MsgDataList) redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(k, v.SeqList, in.OperationID) if err != nil { if err != go_redis.Nil { + promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList)) log.Error(in.OperationID, "get message from redis exception", err.Error(), failedSeqList) } else { log.Debug(in.OperationID, "get message from redis is nil", failedSeqList) } msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(k, failedSeqList, in.OperationID) if err1 != nil { + promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err.Error()) resp.ErrCode = 201 resp.ErrMsg = err.Error() return resp, nil } else { + promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList)) redisMsgList = append(redisMsgList, msgList...) x.MsgDataList = redisMsgList m[k] = x } } else { + promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList)) x.MsgDataList = redisMsgList m[k] = x } } resp.GroupMsgDataList = m - //respSingleMsgFormat = singleMsgHandleByUser(SingleMsgFormat, in.UserID) - //respGroupMsgFormat = groupMsgHandleByUser(GroupMsgFormat) return resp, nil - } type MsgFormats []*open_im_sdk.MsgData diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index cf50b3f9a..e5cd0ed97 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -19,6 +19,11 @@ import ( "google.golang.org/grpc" ) +//var ( +// sendMsgSuccessCounter prometheus.Counter +// sendMsgFailedCounter prometheus.Counter +//) + type rpcChat struct { rpcPort int rpcRegisterName string @@ -51,7 +56,29 @@ func NewRpcChatServer(port int) *rpcChat { } func (rpc *rpcChat) initPrometheus() { - promePkg.NewSendMsgCount() + //sendMsgSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + // Name: "send_msg_success", + // Help: "The number of send msg success", + //}) + //sendMsgFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + // Name: "send_msg_failed", + // Help: "The number of send msg failed", + //}) + promePkg.NewMsgPullFromRedisSuccessCounter() + promePkg.NewMsgPullFromRedisFailedCounter() + promePkg.NewMsgPullFromMongoSuccessCounter() + promePkg.NewMsgPullFromMongoFailedCounter() + + promePkg.NewSingleChatMsgRecvSuccessCounter() + promePkg.NewGroupChatMsgRecvSuccessCounter() + promePkg.NewWorkSuperGroupChatMsgRecvSuccessCounter() + + promePkg.NewSingleChatMsgProcessSuccessCounter() + promePkg.NewSingleChatMsgProcessFailedCounter() + promePkg.NewGroupChatMsgProcessSuccessCounter() + promePkg.NewGroupChatMsgProcessFailedCounter() + promePkg.NewWorkSuperGroupChatMsgProcessSuccessCounter() + promePkg.NewWorkSuperGroupChatMsgProcessFailedCounter() } func (rpc *rpcChat) Run() { @@ -97,9 +124,7 @@ func (rpc *rpcChat) Run() { panic(utils.Wrap(err, "register chat module rpc to etcd err")) } go rpc.runCh() - if config.Config.Prometheus.Enable { - rpc.initPrometheus() - } + rpc.initPrometheus() err = srv.Serve(listener) if err != nil { log.Error("", "rpc rpcChat failed ", err.Error()) diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 876f2cda7..4be38cee7 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -23,6 +23,7 @@ import ( "sync" "time" + promePkg "Open_IM/pkg/common/prometheus" go_redis "github.com/go-redis/redis/v8" "github.com/golang/protobuf/proto" ) @@ -244,11 +245,11 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if !flag { return returnMsg(&replay, pb, errCode, errMsg, "", 0) } - flag, errCode, errMsg, _ = messageVerification(pb) - log.Info(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1)) - if !flag { - return returnMsg(&replay, pb, errCode, errMsg, "", 0) - } + //flag, errCode, errMsg, _ = messageVerification(pb) + //log.Info(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1)) + //if !flag { + // return returnMsg(&replay, pb, errCode, errMsg, "", 0) + //} t1 = time.Now() rpc.encapsulateMsgData(pb.MsgData) log.Info(pb.OperationID, "encapsulateMsgData ", " cost time: ", time.Since(t1)) @@ -270,6 +271,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } switch pb.MsgData.SessionType { case constant.SingleChatType: + promePkg.PromeInc(promePkg.SingleChatMsgRecvSuccessCounter) // callback t1 = time.Now() callbackResp := callbackBeforeSendSingleMsg(pb) @@ -282,8 +284,14 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S callbackResp.ErrCode = 201 } log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp) + promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter) return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) } + flag, errCode, errMsg, _ = messageVerification(pb) + log.Info(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1)) + if !flag { + return returnMsg(&replay, pb, errCode, errMsg, "", 0) + } t1 = time.Now() isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb) log.Info(pb.OperationID, "modifyMessageByUserMessageReceiveOpt ", " cost time: ", time.Since(t1)) @@ -295,6 +303,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.Info(pb.OperationID, "sendMsgToKafka ", " cost time: ", time.Since(t1)) if err1 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error()) + promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } @@ -304,6 +313,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.Info(pb.OperationID, "sendMsgToKafka ", " cost time: ", time.Since(t1)) if err2 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) + promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } @@ -315,9 +325,11 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg resp: ", callbackResp) } log.Debug(pb.OperationID, "send msg cost time all: ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) + promePkg.PromeInc(promePkg.SingleChatMsgProcessSuccessCounter) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) case constant.GroupChatType: // callback + promePkg.PromeInc(promePkg.GroupChatMsgRecvSuccessCounter) callbackResp := callbackBeforeSendGroupMsg(pb) if callbackResp.ErrCode != 0 { log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg resp:", callbackResp) @@ -327,10 +339,12 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S callbackResp.ErrCode = 201 } log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp) + promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter) return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) } var memberUserIDList []string if flag, errCode, errMsg, memberUserIDList = messageVerification(pb); !flag { + promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter) return returnMsg(&replay, pb, errCode, errMsg, "", 0) } log.Debug(pb.OperationID, "GetGroupAllMember userID list", memberUserIDList, "len: ", len(memberUserIDList)) @@ -395,6 +409,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } if !sendTag { log.NewWarn(pb.OperationID, "send tag is ", sendTag) + promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } else { if pb.MsgData.ContentType == constant.AtText { @@ -459,6 +474,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S }() } log.Debug(pb.OperationID, "send msg cost time3 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) + promePkg.PromeInc(promePkg.GroupChatMsgProcessSuccessCounter) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) } case constant.NotificationChatType: @@ -481,6 +497,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.Debug(pb.OperationID, "send msg cost time ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) case constant.SuperGroupChatType: + promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter) // callback callbackResp := callbackBeforeSendGroupMsg(pb) if callbackResp.ErrCode != 0 { @@ -490,10 +507,12 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if callbackResp.ErrCode == 0 { callbackResp.ErrCode = 201 } + promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg result", "end rpc and return", callbackResp) return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) } if flag, errCode, errMsg, _ = messageVerification(pb); !flag { + promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) return returnMsg(&replay, pb, errCode, errMsg, "", 0) } msgToMQSingle.MsgData = pb.MsgData @@ -501,6 +520,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.GroupID, constant.OnlineStatus) if err1 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) + promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } // callback @@ -508,6 +528,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if callbackResp.ErrCode != 0 { log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSuperGroupMsg resp: ", callbackResp) } + promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) default: diff --git a/pkg/base_info/group_api_struct.go b/pkg/base_info/group_api_struct.go index f2703bba2..8f2eaccd4 100644 --- a/pkg/base_info/group_api_struct.go +++ b/pkg/base_info/group_api_struct.go @@ -16,7 +16,7 @@ type CommDataResp struct { type KickGroupMemberReq struct { GroupID string `json:"groupID" binding:"required"` - KickedUserIDList []string `json:"kickedUserIDList" binding:"required, min=1, max=100"` + KickedUserIDList []string `json:"kickedUserIDList" binding:"required"` Reason string `json:"reason"` OperationID string `json:"operationID" binding:"required"` } @@ -38,7 +38,7 @@ type GetGroupMembersInfoResp struct { type InviteUserToGroupReq struct { GroupID string `json:"groupID" binding:"required"` - InvitedUserIDList []string `json:"invitedUserIDList" binding:"required, min=1, max=100"` + InvitedUserIDList []string `json:"invitedUserIDList" binding:"required"` Reason string `json:"reason"` OperationID string `json:"operationID" binding:"required"` } diff --git a/pkg/common/db/RedisModel.go b/pkg/common/db/RedisModel.go index 63be03533..d36202367 100644 --- a/pkg/common/db/RedisModel.go +++ b/pkg/common/db/RedisModel.go @@ -182,7 +182,6 @@ func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operati for _, v := range seqList { //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 key := messageCache + userID + "_" + strconv.Itoa(int(v)) - result, err := d.RDB.Get(context.Background(), key).Result() if err != nil { errResult = err diff --git a/pkg/common/prometheus/gather.go b/pkg/common/prometheus/gather.go index 9f6991e81..1d6b71ebb 100644 --- a/pkg/common/prometheus/gather.go +++ b/pkg/common/prometheus/gather.go @@ -10,11 +10,43 @@ var ( UserLoginCounter prometheus.Counter UserRegisterCounter prometheus.Counter + //seg SeqGetSuccessCounter prometheus.Counter SeqGetFailedCounter prometheus.Counter SeqSetSuccessCounter prometheus.Counter SeqSetFailedCounter prometheus.Counter + //msg-db + MsgInsertRedisSuccessCounter prometheus.Counter + MsgInsertRedisFailedCounter prometheus.Counter + MsgInsertMongoSuccessCounter prometheus.Counter + MsgInsertMongoFailedCounter prometheus.Counter + MsgPullFromRedisSuccessCounter prometheus.Counter + MsgPullFromRedisFailedCounter prometheus.Counter + MsgPullFromMongoSuccessCounter prometheus.Counter + MsgPullFromMongoFailedCounter prometheus.Counter + + //msg-ws + MsgRecvTotalCounter prometheus.Counter + GetNewestSeqTotalCounter prometheus.Counter + PullMsgBySeqListTotalCounter prometheus.Counter + + SingleChatMsgRecvSuccessCounter prometheus.Counter + GroupChatMsgRecvSuccessCounter prometheus.Counter + WorkSuperGroupChatMsgRecvSuccessCounter prometheus.Counter + + //msg-msg + SingleChatMsgProcessSuccessCounter prometheus.Counter + SingleChatMsgProcessFailedCounter prometheus.Counter + GroupChatMsgProcessSuccessCounter prometheus.Counter + GroupChatMsgProcessFailedCounter prometheus.Counter + WorkSuperGroupChatMsgProcessSuccessCounter prometheus.Counter + WorkSuperGroupChatMsgProcessFailedCounter prometheus.Counter + + //msg-push + MsgOnlinePushSuccessCounter prometheus.Counter + MsgOfflinePushSuccessCounter prometheus.Counter + MsgOfflinePushFailedCounter prometheus.Counter // api ApiRequestCounter prometheus.Counter ApiRequestSuccessCounter prometheus.Counter @@ -25,21 +57,22 @@ var ( GrpcRequestSuccessCounter prometheus.Counter GrpcRequestFailedCounter prometheus.Counter - SendMsgCounter prometheus.Counter - MsgInsertRedisSuccessCounter prometheus.Counter - MsgInsertRedisFailedCounter prometheus.Counter - - MsgInsertMongoSuccessCounter prometheus.Counter - MsgInsertMongoFailedCounter prometheus.Counter + SendMsgCounter prometheus.Counter ) func NewUserLoginCounter() { + if UserLoginCounter != nil { + return + } UserLoginCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "user_login", Help: "The number of user login", }) } func NewUserRegisterCounter() { + if UserRegisterCounter != nil { + return + } UserRegisterCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "user_register", Help: "The number of user register", @@ -47,12 +80,18 @@ func NewUserRegisterCounter() { } func NewSeqGetSuccessCounter() { + if SeqGetSuccessCounter != nil { + return + } SeqGetSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "seq_get_success", Help: "The number of successful get seq", }) } func NewSeqGetFailedCounter() { + if SeqGetFailedCounter != nil { + return + } SeqGetFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "seq_get_failed", Help: "The number of failed get seq", @@ -60,12 +99,18 @@ func NewSeqGetFailedCounter() { } func NewSeqSetSuccessCounter() { + if SeqSetSuccessCounter != nil { + return + } SeqSetSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "seq_set_success", Help: "The number of successful set seq", }) } func NewSeqSetFailedCounter() { + if SeqSetFailedCounter != nil { + return + } SeqSetFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "seq_set_failed", Help: "The number of failed set seq", @@ -73,6 +118,9 @@ func NewSeqSetFailedCounter() { } func NewApiRequestCounter() { + if ApiRequestCounter != nil { + return + } ApiRequestCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "api_request", Help: "The number of api request", @@ -80,6 +128,9 @@ func NewApiRequestCounter() { } func NewApiRequestSuccessCounter() { + if ApiRequestSuccessCounter != nil { + return + } ApiRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "api_request_success", Help: "The number of api request success", @@ -87,6 +138,9 @@ func NewApiRequestSuccessCounter() { } func NewApiRequestFailedCounter() { + if ApiRequestFailedCounter != nil { + return + } ApiRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "api_request_failed", Help: "The number of api request failed", @@ -94,6 +148,9 @@ func NewApiRequestFailedCounter() { } func NewGrpcRequestCounter() { + if GrpcRequestCounter != nil { + return + } GrpcRequestCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "grpc_request", Help: "The number of api request", @@ -101,6 +158,9 @@ func NewGrpcRequestCounter() { } func NewGrpcRequestSuccessCounter() { + if GrpcRequestSuccessCounter != nil { + return + } GrpcRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "grpc_request_success", Help: "The number of grpc request success", @@ -108,6 +168,9 @@ func NewGrpcRequestSuccessCounter() { } func NewGrpcRequestFailedCounter() { + if GrpcRequestFailedCounter != nil { + return + } GrpcRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "grpc_request_failed", Help: "The number of grpc request failed", @@ -115,6 +178,9 @@ func NewGrpcRequestFailedCounter() { } func NewSendMsgCount() { + if SendMsgCounter != nil { + return + } SendMsgCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "send_msg", Help: "The number of send msg", @@ -122,6 +188,9 @@ func NewSendMsgCount() { } func NewMsgInsertRedisSuccessCounter() { + if MsgInsertRedisSuccessCounter != nil { + return + } MsgInsertRedisSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "msg_insert_redis_success", Help: "The number of successful insert msg to redis", @@ -129,6 +198,9 @@ func NewMsgInsertRedisSuccessCounter() { } func NewMsgInsertRedisFailedCounter() { + if MsgInsertRedisFailedCounter != nil { + return + } MsgInsertRedisFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "msg_insert_redis_failed", Help: "The number of failed insert msg to redis", @@ -136,6 +208,9 @@ func NewMsgInsertRedisFailedCounter() { } func NewMsgInsertMongoSuccessCounter() { + if MsgInsertMongoSuccessCounter != nil { + return + } MsgInsertMongoSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "msg_insert_mongo_success", Help: "The number of successful insert msg to mongo", @@ -143,8 +218,198 @@ func NewMsgInsertMongoSuccessCounter() { } func NewMsgInsertMongoFailedCounter() { + if MsgInsertMongoFailedCounter != nil { + return + } MsgInsertMongoFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "msg_insert_mongo_failed", Help: "The number of failed insert msg to mongo", }) } + +func NewMsgPullFromRedisSuccessCounter() { + if MsgPullFromRedisSuccessCounter != nil { + return + } + MsgPullFromRedisSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_pull_from_redis_success", + Help: "The number of successful pull msg from redis", + }) +} + +func NewMsgPullFromRedisFailedCounter() { + if MsgPullFromRedisFailedCounter != nil { + return + } + MsgPullFromRedisFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_pull_from_redis_failed", + Help: "The number of failed pull msg from redis", + }) +} + +func NewMsgPullFromMongoSuccessCounter() { + if MsgPullFromMongoSuccessCounter != nil { + return + } + MsgPullFromMongoSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_pull_from_mongo_success", + Help: "The number of successful pull msg from mongo", + }) +} + +func NewMsgPullFromMongoFailedCounter() { + if MsgPullFromMongoFailedCounter != nil { + return + } + MsgPullFromMongoFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_pull_from_mongo_failed", + Help: "The number of failed pull msg from mongo", + }) +} + +func NewMsgRecvTotalCounter() { + if MsgRecvTotalCounter != nil { + return + } + MsgRecvTotalCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_recv_total", + Help: "The number of msg received", + }) +} + +func NewGetNewestSeqTotalCounter() { + if GetNewestSeqTotalCounter != nil { + return + } + GetNewestSeqTotalCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "get_newest_seq_total", + Help: "the number of get newest seq", + }) +} +func NewPullMsgBySeqListTotalCounter() { + if PullMsgBySeqListTotalCounter != nil { + return + } + PullMsgBySeqListTotalCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pull_msg_by_seq_list_total", + Help: "The number of pull msg by seq list", + }) +} + +func NewSingleChatMsgRecvSuccessCounter() { + if SingleChatMsgRecvSuccessCounter != nil { + return + } + SingleChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "single_chat_msg_recv_success", + Help: "The number of single chat msg successful received ", + }) +} + +func NewGroupChatMsgRecvSuccessCounter() { + if GroupChatMsgRecvSuccessCounter != nil { + return + } + GroupChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "group_chat_msg_recv_success", + Help: "The number of group chat msg successful received", + }) +} + +func NewWorkSuperGroupChatMsgRecvSuccessCounter() { + if WorkSuperGroupChatMsgRecvSuccessCounter != nil { + return + } + WorkSuperGroupChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "work_super_group_chat_msg_recv_success", + Help: "The number of work/super group chat msg successful received", + }) +} + +func NewSingleChatMsgProcessSuccessCounter() { + if SingleChatMsgProcessSuccessCounter != nil { + return + } + SingleChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "single_chat_msg_process_success", + Help: "The number of single chat msg successful processed", + }) +} + +func NewSingleChatMsgProcessFailedCounter() { + if SingleChatMsgProcessFailedCounter != nil { + return + } + SingleChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "single_chat_msg_process_failed", + Help: "The number of single chat msg failed processed", + }) +} + +func NewGroupChatMsgProcessSuccessCounter() { + if GroupChatMsgProcessSuccessCounter != nil { + return + } + GroupChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "group_chat_msg_process_success", + Help: "The number of group chat msg successful processed", + }) +} + +func NewGroupChatMsgProcessFailedCounter() { + if GroupChatMsgProcessFailedCounter != nil { + return + } + GroupChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "group_chat_msg_process_failed", + Help: "The number of group chat msg failed processed", + }) +} + +func NewWorkSuperGroupChatMsgProcessSuccessCounter() { + if WorkSuperGroupChatMsgProcessSuccessCounter != nil { + return + } + WorkSuperGroupChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "work_super_group_chat_msg_process_success", + Help: "The number of work/super group chat msg successful processed", + }) +} +func NewWorkSuperGroupChatMsgProcessFailedCounter() { + if WorkSuperGroupChatMsgProcessFailedCounter != nil { + return + } + WorkSuperGroupChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "work_super_group_chat_msg_process_failed", + Help: "The number of work/super group chat msg failed processed", + }) +} + +func NewMsgOnlinePushSuccessCounter() { + if MsgOnlinePushSuccessCounter != nil { + return + } + MsgOnlinePushSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_online_push_success", + Help: "The number of msg successful online pushed", + }) +} + +func NewMsgOfflinePushSuccessCounter() { + if MsgOfflinePushSuccessCounter != nil { + return + } + MsgOfflinePushSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_offline_push_success", + Help: "The number of msg successful offline pushed", + }) +} +func NewMsgOfflinePushFailedCounter() { + if MsgOfflinePushFailedCounter != nil { + return + } + MsgOfflinePushFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_offline_push_failed", + Help: "The number of msg failed offline pushed", + }) +} diff --git a/pkg/common/prometheus/prometheus.go b/pkg/common/prometheus/prometheus.go index dd333414e..e8bc87b47 100644 --- a/pkg/common/prometheus/prometheus.go +++ b/pkg/common/prometheus/prometheus.go @@ -54,7 +54,6 @@ func PromeInc(counter prometheus.Counter) { if counter != nil { counter.Inc() } - } }