mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-21 01:09:01 +08:00
Merge remote-tracking branch 'origin/v2.3.0release' into v2.3.0release
This commit is contained in:
@@ -3,6 +3,7 @@ package apiAuth
|
||||
import (
|
||||
api "Open_IM/pkg/base_info"
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/log"
|
||||
"Open_IM/pkg/common/token_verify"
|
||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||
@@ -66,7 +67,13 @@ func UserRegister(c *gin.Context) {
|
||||
if reply.CommonResp.ErrCode != 0 {
|
||||
errMsg := req.OperationID + " " + " UserRegister failed " + reply.CommonResp.ErrMsg + req.String()
|
||||
log.NewError(req.OperationID, errMsg)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": errMsg})
|
||||
if reply.CommonResp.ErrCode == constant.RegisterLimit {
|
||||
c.JSON(http.StatusOK, gin.H{"errCode": constant.RegisterLimit, "errMsg": "用户注册被限制"})
|
||||
} else if reply.CommonResp.ErrCode == constant.InvitationError {
|
||||
c.JSON(http.StatusOK, gin.H{"errCode": constant.InvitationError, "errMsg": "邀请码错误"})
|
||||
} else {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": errMsg})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
package cms_api
|
||||
|
||||
import (
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
func prometheusHandler() gin.HandlerFunc {
|
||||
h := promhttp.Handler()
|
||||
|
||||
return func(c *gin.Context) {
|
||||
h.ServeHTTP(c.Writer, c.Request)
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"Open_IM/internal/cms_api/statistics"
|
||||
"Open_IM/internal/cms_api/user"
|
||||
"Open_IM/internal/demo/register"
|
||||
"Open_IM/pkg/common/config"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
@@ -16,6 +17,9 @@ import (
|
||||
func NewGinRouter() *gin.Engine {
|
||||
gin.SetMode(gin.ReleaseMode)
|
||||
baseRouter := gin.Default()
|
||||
if config.Config.Prometheus.Enable {
|
||||
baseRouter.GET("/metrics", prometheusHandler())
|
||||
}
|
||||
router := baseRouter.Group("/cms")
|
||||
router.Use(middleware.CorsHandler())
|
||||
adminRouterGroup := router.Group("/admin")
|
||||
@@ -64,9 +68,10 @@ func NewGinRouter() *gin.Engine {
|
||||
{
|
||||
messageCMSRouterGroup.POST("/get_chat_logs", messageCMS.GetChatLogs)
|
||||
}
|
||||
friendCMSRouterGroup := r2.Group("friend")
|
||||
friendCMSRouterGroup := r2.Group("/friend")
|
||||
{
|
||||
friendCMSRouterGroup.POST("/get_friends", friend.GetUserFriends)
|
||||
}
|
||||
|
||||
return baseRouter
|
||||
}
|
||||
|
||||
@@ -6,7 +6,11 @@ import (
|
||||
"Open_IM/pkg/common/kafka"
|
||||
"Open_IM/pkg/statistics"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
const OnlineTopicBusy = 1
|
||||
@@ -46,7 +50,7 @@ func Init() {
|
||||
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
|
||||
producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic)
|
||||
}
|
||||
func Run() {
|
||||
func Run(promethuesPort int) {
|
||||
//register mysqlConsumerHandler to
|
||||
if config.Config.ChatPersistenceMysql {
|
||||
go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
|
||||
@@ -56,6 +60,10 @@ func Run() {
|
||||
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
|
||||
go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH)
|
||||
//go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
|
||||
if config.Config.Prometheus.Enable {
|
||||
http.Handle("/metrics", promhttp.Handler())
|
||||
http.ListenAndServe(":"+strconv.Itoa(promethuesPort), nil)
|
||||
}
|
||||
}
|
||||
func SetOnlineTopicStatus(status int) {
|
||||
w.Lock()
|
||||
|
||||
@@ -14,8 +14,20 @@ import (
|
||||
"Open_IM/pkg/common/log"
|
||||
pbMsg "Open_IM/pkg/proto/msg"
|
||||
"Open_IM/pkg/utils"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
var (
|
||||
// msgInsertMysqlProcessed perometheus.Countr
|
||||
msgInsertMysqlProcessed = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "insert_mysql_msg_total",
|
||||
Help: "The total number of msg insert mysql events",
|
||||
})
|
||||
)
|
||||
|
||||
type PersistentConsumerHandler struct {
|
||||
@@ -29,7 +41,12 @@ func (pc *PersistentConsumerHandler) Init() {
|
||||
pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql)
|
||||
|
||||
// if config.Config.Prometheus.Enable {
|
||||
// msgInsertMysqlProcessed = promauto.NewCounter(prometheus.CounterOpts{
|
||||
// Name: "insert_mysql_msg_total",
|
||||
// Help: "The total number of msg insert mysql events",
|
||||
// })
|
||||
// }
|
||||
}
|
||||
|
||||
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
@@ -65,6 +82,10 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMes
|
||||
log.NewError(msgFromMQ.OperationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String())
|
||||
return
|
||||
}
|
||||
if config.Config.Prometheus.Enable {
|
||||
log.NewDebug(msgFromMQ.OperationID, utils.GetSelfFuncName(), "inc msgInsertMysqlProcessed")
|
||||
msgInsertMysqlProcessed.Inc()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"Open_IM/pkg/common/db"
|
||||
"Open_IM/pkg/common/log"
|
||||
"context"
|
||||
go_redis "github.com/go-redis/redis/v8"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
@@ -85,6 +86,20 @@ func (f *Fcm) Push(accounts []string, title, detailContent, operationID string,
|
||||
if err == nil {
|
||||
apns.Payload.Aps.Badge = &unreadCountSum
|
||||
} else {
|
||||
log.Error(operationID, "IncrUserBadgeUnreadCountSum redis err", err.Error(), uid)
|
||||
Fail++
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
unreadCountSum, err := db.DB.GetUserBadgeUnreadCountSum(uid)
|
||||
if err == nil && unreadCountSum != 0 {
|
||||
apns.Payload.Aps.Badge = &unreadCountSum
|
||||
} else if err == go_redis.Nil || unreadCountSum == 0 {
|
||||
zero := 1
|
||||
apns.Payload.Aps.Badge = &zero
|
||||
} else {
|
||||
log.Error(operationID, "GetUserBadgeUnreadCountSum redis err", err.Error(), uid)
|
||||
Fail++
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ func callbackOfflinePush(operationID string, userIDList []string, msg *commonPb.
|
||||
UserIDList: userIDList,
|
||||
},
|
||||
OfflinePushInfo: msg.OfflinePushInfo,
|
||||
ClientMsgID: msg.ClientMsgID,
|
||||
SendID: msg.SendID,
|
||||
GroupID: msg.GroupID,
|
||||
ContentType: msg.ContentType,
|
||||
@@ -75,6 +76,7 @@ func callbackOnlinePush(operationID string, userIDList []string, msg *commonPb.M
|
||||
UserIDList: userIDList,
|
||||
},
|
||||
OfflinePushInfo: msg.OfflinePushInfo,
|
||||
ClientMsgID: msg.ClientMsgID,
|
||||
SendID: msg.SendID,
|
||||
GroupID: msg.GroupID,
|
||||
ContentType: msg.ContentType,
|
||||
@@ -111,6 +113,7 @@ func callbackBeforeSuperGroupOnlinePush(operationID string, groupID string, msg
|
||||
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
|
||||
},
|
||||
OfflinePushInfo: msg.OfflinePushInfo,
|
||||
ClientMsgID: msg.ClientMsgID,
|
||||
SendID: msg.SendID,
|
||||
GroupID: groupID,
|
||||
ContentType: msg.ContentType,
|
||||
|
||||
@@ -171,24 +171,6 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) {
|
||||
return
|
||||
}
|
||||
pushToUserIDList = userIDList
|
||||
//getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: pushMsg.OperationID, GroupID: pushMsg.MsgData.GroupID}
|
||||
//etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, pushMsg.OperationID)
|
||||
//if etcdConn == nil {
|
||||
// errMsg := pushMsg.OperationID + "getcdv3.GetDefaultConn == nil"
|
||||
// log.NewError(pushMsg.OperationID, errMsg)
|
||||
// return
|
||||
//}
|
||||
//client := pbCache.NewCacheClient(etcdConn)
|
||||
//cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq)
|
||||
//if err != nil {
|
||||
// log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error())
|
||||
// return
|
||||
//}
|
||||
//if cacheResp.CommonResp.ErrCode != 0 {
|
||||
// log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
|
||||
// return
|
||||
//}
|
||||
//pushToUserIDList = cacheResp.UserIDList
|
||||
}
|
||||
|
||||
grpcCons := getcdv3.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), pushMsg.OperationID)
|
||||
|
||||
Reference in New Issue
Block a user