Merge branch 'tuoyun' of github.com:OpenIMSDK/Open-IM-Server into tuoyun

This commit is contained in:
wangchuxiao
2022-05-10 10:45:21 +08:00
36 changed files with 158 additions and 82 deletions
+26
View File
@@ -4,6 +4,7 @@ import (
api "Open_IM/pkg/base_info"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
rpc "Open_IM/pkg/proto/auth"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
@@ -96,3 +97,28 @@ func UserToken(c *gin.Context) {
log.NewInfo(req.OperationID, "UserToken return ", resp)
c.JSON(http.StatusOK, resp)
}
// ParseToken validates the token supplied in the "token" request header
// and, when it is valid, responds with the token's expiry time.
// Responds 400 when the JSON body cannot be bound, 500 when token
// verification fails, and 200 with an api.ParseTokenResp on success.
func ParseToken(c *gin.Context) {
	req := api.ParseTokenReq{}
	if err := c.BindJSON(&req); err != nil {
		errMsg := " BindJSON failed " + err.Error()
		// OperationID is unknown here because binding failed, so log under "0".
		log.NewError("0", errMsg)
		c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": errMsg})
		return
	}
	// The token travels in the request header, not the JSON body.
	token := c.Request.Header.Get("token")
	ok, _, errInfo, expireTime := token_verify.GetUserIDFromTokenExpireTime(token, req.OperationID)
	if !ok {
		errMsg := req.OperationID + " " + "GetUserIDFromTokenExpireTime failed " + errInfo + " token:" + token
		log.NewError(req.OperationID, errMsg)
		c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": errMsg})
		return
	}
	resp := api.ParseTokenResp{CommResp: api.CommResp{ErrCode: 0, ErrMsg: ""}, ExpireTime: expireTime}
	log.NewInfo(req.OperationID, "ParseToken return ", resp)
	c.JSON(http.StatusOK, resp)
}
-4
View File
@@ -35,10 +35,6 @@ func GetSeq(c *gin.Context) {
pbData.UserID = params.SendID
pbData.OperationID = params.OperationID
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
if grpcConn == nil {
log.ErrorByKv("get grpcConn err", pbData.OperationID, "args", params)
}
msgClient := pbMsg.NewChatClient(grpcConn)
reply, err := msgClient.GetMaxAndMinSeq(context.Background(), &pbData)
if err != nil {
+2 -2
View File
@@ -55,10 +55,10 @@ func PullMsgBySeqList(c *gin.Context) {
msgClient := pbChat.NewChatClient(grpcConn)
reply, err := msgClient.PullMessageBySeqList(context.Background(), &pbData)
if err != nil {
log.ErrorByKv("PullMessageBySeqList error", pbData.OperationID, "err", err.Error())
log.Error(pbData.OperationID, "PullMessageBySeqList error", err.Error())
return
}
log.InfoByKv("rpc call success to PullMessageBySeqList", pbData.OperationID, "ReplyArgs", reply.String(), len(reply.List))
log.NewInfo(pbData.OperationID, "rpc call success to PullMessageBySeqList", reply.String(), len(reply.List))
c.JSON(http.StatusOK, gin.H{
"errCode": reply.ErrCode,
"errMsg": reply.ErrMsg,
+2 -3
View File
@@ -63,13 +63,12 @@ func SendMsg(c *gin.Context) {
if err := c.BindJSON(&params); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
log.ErrorByKv("json unmarshal err", "", "err", err.Error(), "data", c.PostForm("data"))
log.Error("0", "BindJSON failed ", err.Error())
return
}
token := c.Request.Header.Get("token")
log.InfoByKv("api call success to sendMsgReq", params.OperationID, "Parameters", params)
log.NewInfo(params.OperationID, "api call success to sendMsgReq", params)
pbData := newUserSendMsgReq(token, &params)
log.Info("", "", "api SendMsg call start..., [data: %s]", pbData.String())
+1 -1
View File
@@ -166,7 +166,7 @@ func ManagementSendMsg(c *gin.Context) {
}
}
log.InfoByKv("Ws call success to ManagementSendMsgReq", params.OperationID, "Parameters", params)
log.NewInfo(params.OperationID, "Ws call success to ManagementSendMsgReq", params)
pbData := newUserSendMsgReq(&params)
log.Info("", "", "api ManagementSendMsg call start..., [data: %s]", pbData.String())
+2 -2
View File
@@ -35,14 +35,14 @@ func Verify(c *gin.Context) {
}
if params.VerificationCode == config.Config.Demo.SuperCode {
log.InfoByKv("Super Code Verified successfully", account)
log.NewInfo(params.OperationID, "Super Code Verified successfully", account)
data := make(map[string]interface{})
data["account"] = account
data["verificationCode"] = params.VerificationCode
c.JSON(http.StatusOK, gin.H{"errCode": constant.NoError, "errMsg": "Verified successfully!", "data": data})
return
}
log.NewInfo("0", " params.VerificationCode != config.Config.Demo.SuperCode", params.VerificationCode, config.Config.Demo)
log.NewInfo(params.OperationID, " params.VerificationCode != config.Config.Demo.SuperCode", params.VerificationCode, config.Config.Demo)
log.NewInfo(params.OperationID, "begin get form redis", account)
if params.UsedFor == 0 {
params.UsedFor = constant.VerificationCodeForRegister
+11 -8
View File
@@ -2,6 +2,7 @@ package gate
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/statistics"
"fmt"
@@ -10,12 +11,14 @@ import (
)
var (
rwLock *sync.RWMutex
validate *validator.Validate
ws WServer
rpcSvr RPCServer
sendMsgCount uint64
userCount uint64
rwLock *sync.RWMutex
validate *validator.Validate
ws WServer
rpcSvr RPCServer
sendMsgAllCount uint64
sendMsgFailedCount uint64
sendMsgSuccessCount uint64
userCount uint64
)
func Init(rpcPort, wsPort int) {
@@ -23,8 +26,8 @@ func Init(rpcPort, wsPort int) {
rwLock = new(sync.RWMutex)
validate = validator.New()
statistics.NewStatistics(&sendMsgCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", sendMsgCount), 300)
statistics.NewStatistics(&userCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second add user conn", userCount), 300)
statistics.NewStatistics(&sendMsgAllCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
statistics.NewStatistics(&userCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second add user conn", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
ws.onInit(wsPort)
rpcSvr.onInit(rpcPort)
}
+2 -2
View File
@@ -80,7 +80,7 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
nReply.ErrMsg = err.Error()
ws.getSeqResp(conn, m, nReply)
} else {
log.InfoByKv("rpc call success to getSeqReq", rpcReq.OperationID, "replyData", rpcReply.String())
log.NewInfo(rpcReq.OperationID, "rpc call success to getSeqReq", rpcReply.String())
ws.getSeqResp(conn, m, rpcReply)
}
}
@@ -146,7 +146,7 @@ func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullM
}
func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
sendMsgCount++
sendMsgAllCount++
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data)
nReply := new(pbChat.SendMsgResp)
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg)
+1 -2
View File
@@ -43,8 +43,7 @@ func (r *RPCServer) run() {
address := listenIP + ":" + strconv.Itoa(r.rpcPort)
listener, err := net.Listen("tcp", address)
if err != nil {
log.Error("", "fail to listening consumer failed ", err.Error(), address)
return
panic("listening err:" + err.Error() + r.rpcRegisterName)
}
defer listener.Close()
srv := grpc.NewServer()
+4 -4
View File
@@ -62,6 +62,7 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
//Initialize a lock for each user
newConn := &UserConn{conn, new(sync.Mutex)}
userCount++
ws.addUserConn(query["sendID"][0], int32(utils.StringToInt64(query["platformID"][0])), newConn, query["token"][0])
go ws.readMsg(newConn)
}
@@ -77,6 +78,7 @@ func (ws *WServer) readMsg(conn *UserConn) {
if err != nil {
uid, platform := ws.getUserUid(conn)
log.ErrorByKv("WS ReadMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", uid, "platform", platform, "error", err.Error())
userCount--
ws.delUserConn(conn)
return
} else {
@@ -189,7 +191,6 @@ func (ws *WServer) addUserConn(uid string, platformID int32, conn *UserConn, tok
count = count + len(v)
}
log.Debug(operationID, "WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
userCount = uint64(len(ws.wsUserToConn))
}
@@ -213,11 +214,10 @@ func (ws *WServer) delUserConn(conn *UserConn) {
for _, v := range ws.wsUserToConn {
count = count + len(v)
}
log.NewWarn(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
} else {
log.NewWarn(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn))
log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn))
}
userCount = uint64(len(ws.wsUserToConn))
delete(ws.wsConnToUser, conn)
}
@@ -11,6 +11,7 @@ import (
"Open_IM/pkg/statistics"
"Open_IM/pkg/utils"
"context"
"fmt"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
"strings"
@@ -20,15 +21,16 @@ import (
type fcb func(msg []byte, msgKey string)
type HistoryConsumerHandler struct {
msgHandle map[string]fcb
historyConsumerGroup *kfk.MConsumerGroup
singleMsgCount uint64
groupMsgCount uint64
msgHandle map[string]fcb
historyConsumerGroup *kfk.MConsumerGroup
singleMsgFailedCount uint64
singleMsgSuccessCount uint64
groupMsgCount uint64
}
func (mc *HistoryConsumerHandler) Init() {
statistics.NewStatistics(&mc.singleMsgCount, config.Config.ModuleName.MsgTransferName, "singleMsgCount insert to mongo ", 300)
statistics.NewStatistics(&mc.groupMsgCount, config.Config.ModuleName.MsgTransferName, "groupMsgCount insert to mongo ", 300)
statistics.NewStatistics(&mc.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
statistics.NewStatistics(&mc.groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
mc.msgHandle = make(map[string]fcb)
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo
@@ -59,10 +61,11 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
if isHistory {
err := saveUserChat(msgKey, &msgFromMQ)
if err != nil {
mc.singleMsgFailedCount++
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
return
}
mc.singleMsgCount++
mc.singleMsgSuccessCount++
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
}
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
@@ -89,7 +92,6 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
return
}
mc.singleMsgCount++
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
}
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
@@ -109,7 +111,7 @@ func (HistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { ret
func (mc *HistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.InfoByKv("kafka get info to mongo", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
mc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
sess.MarkMessage(msg, "")
}
@@ -51,7 +51,7 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey strin
tag = true
}
case constant.GroupChatType:
if msgKey == msgFromMQ.MsgData.SendID || utils.IsContain(msgFromMQ.MsgData.SendID, config.Config.Manager.AppManagerUid) {
if msgKey == msgFromMQ.MsgData.SendID {
tag = true
}
}
@@ -70,7 +70,7 @@ func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.InfoByKv("kafka get info to mysql", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
pc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
sess.MarkMessage(msg, "")
}
+2 -2
View File
@@ -19,7 +19,7 @@ var (
pushCh PushConsumerHandler
pushTerminal []int32
producer *kafka.Producer
count uint64
successCount uint64
)
func Init(rpcPort int) {
@@ -30,7 +30,7 @@ func Init(rpcPort int) {
}
func init() {
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic)
statistics.NewStatistics(&count, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", 300), 300)
statistics.NewStatistics(&successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
}
func Run() {
+2 -2
View File
@@ -31,7 +31,7 @@ func (ms *PushConsumerHandler) Init() {
config.Config.Kafka.ConsumerGroupID.MsgToPush)
}
func (ms *PushConsumerHandler) handleMs2PsChat(msg []byte) {
log.InfoByKv("msg come from kafka And push!!!", "", "msg", string(msg))
log.NewDebug("", "msg come from kafka And push!!!", "msg", string(msg))
msgFromMQ := pbChat.PushMsgDataToMQ{}
if err := proto.Unmarshal(msg, &msgFromMQ); err != nil {
log.ErrorByKv("push Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
@@ -45,7 +45,7 @@ func (PushConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return
func (ms *PushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.InfoByKv("kafka get info to mysql", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
ms.msgHandle[msg.Topic](msg.Value)
}
return nil
+1 -2
View File
@@ -37,8 +37,7 @@ func (r *RPCServer) run() {
listener, err := net.Listen("tcp", address)
if err != nil {
log.Error("", "push module rpc listening port err", err.Error(), address)
return
panic("listening err:" + err.Error() + r.rpcRegisterName)
}
defer listener.Close()
srv := grpc.NewServer()
+1 -1
View File
@@ -53,7 +53,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
}
}
log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData)
count++
successCount++
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
for _, v := range wsResult {
if v.ResultCode == 0 {
+1 -2
View File
@@ -46,8 +46,7 @@ func (s *adminCMSServer) Run() {
//listener network
listener, err := net.Listen("tcp", address)
if err != nil {
log.NewError("0", "Listen failed ", err.Error(), address)
return
panic("listening err:" + err.Error() + s.rpcRegisterName)
}
log.NewInfo("0", "listen network success, ", address, listener)
defer listener.Close()
+1 -2
View File
@@ -88,8 +88,7 @@ func (rpc *rpcAuth) Run() {
address := listenIP + ":" + strconv.Itoa(rpc.rpcPort)
listener, err := net.Listen("tcp", address)
if err != nil {
log.NewError(operationID, "listen network failed ", err.Error(), address)
return
panic("listening err:" + err.Error() + rpc.rpcRegisterName)
}
log.NewInfo(operationID, "listen network success, ", address, listener)
//grpc server
+1 -2
View File
@@ -47,8 +47,7 @@ func (s *cacheServer) Run() {
//listener network
listener, err := net.Listen("tcp", address)
if err != nil {
log.NewError("0", "Listen failed ", err.Error(), address)
return
panic("listening err:" + err.Error() + s.rpcRegisterName)
}
log.NewInfo("0", "listen network success, ", address, listener)
defer listener.Close()
+1 -2
View File
@@ -148,8 +148,7 @@ func (rpc *rpcConversation) Run() {
listener, err := net.Listen("tcp", address)
if err != nil {
log.NewError("0", "listen network failed ", err.Error(), address)
return
panic("listening err:" + err.Error() + rpc.rpcRegisterName)
}
log.NewInfo("0", "listen network success, ", address, listener)
//grpc server
+1 -2
View File
@@ -53,8 +53,7 @@ func (s *friendServer) Run() {
//listener network
listener, err := net.Listen("tcp", address)
if err != nil {
log.NewError("0", "Listen failed ", err.Error(), address)
return
panic("listening err:" + err.Error() + s.rpcRegisterName)
}
log.NewInfo("0", "listen ok ", address)
defer listener.Close()
+1 -2
View File
@@ -54,8 +54,7 @@ func (s *groupServer) Run() {
//listener network
listener, err := net.Listen("tcp", address)
if err != nil {
log.NewError("", "Listen failed ", err.Error(), address)
return
panic("listening err:" + err.Error() + s.rpcRegisterName)
}
log.NewInfo("", "listen network success, ", address, listener)
defer listener.Close()
+1 -2
View File
@@ -54,8 +54,7 @@ func (s *messageCMSServer) Run() {
//listener network
listener, err := net.Listen("tcp", address)
if err != nil {
log.NewError("0", "Listen failed ", err.Error(), address)
return
panic("listening err:" + err.Error() + s.rpcRegisterName)
}
log.NewInfo("0", "listen network success, ", address, listener)
defer listener.Close()
+1 -1
View File
@@ -11,7 +11,7 @@ import (
)
func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *pbMsg.GetMaxAndMinSeqReq) (*pbMsg.GetMaxAndMinSeqResp, error) {
log.InfoByKv("rpc getMaxAndMinSeq is arriving", in.OperationID, in.String())
log.NewInfo(in.OperationID, "rpc getMaxAndMinSeq is arriving", in.String())
//seq, err := model.GetBiggestSeqFromReceive(in.UserID)
maxSeq, err1 := commonDB.DB.GetUserMaxSeq(in.UserID)
minSeq, err2 := commonDB.DB.GetUserMinSeq(in.UserID)
+1 -2
View File
@@ -45,8 +45,7 @@ func (rpc *rpcChat) Run() {
address := listenIP + ":" + strconv.Itoa(rpc.rpcPort)
listener, err := net.Listen("tcp", address)
if err != nil {
log.Error("", "listen network failed ", err.Error(), address)
return
panic("listening err:" + err.Error() + rpc.rpcRegisterName)
}
log.Info("", "listen network success, address ", address)
+1 -2
View File
@@ -49,8 +49,7 @@ func (s *officeServer) Run() {
//listener network
listener, err := net.Listen("tcp", address)
if err != nil {
log.NewError("0", "Listen failed ", err.Error(), address)
return
panic("listening err:" + err.Error() + s.rpcRegisterName)
}
log.NewInfo("0", "listen network success, ", address, listener)
defer listener.Close()
+1 -2
View File
@@ -50,8 +50,7 @@ func (s *organizationServer) Run() {
//listener network
listener, err := net.Listen("tcp", address)
if err != nil {
log.NewError("", "Listen failed ", err.Error(), address)
return
panic("listening err:" + err.Error() + s.rpcRegisterName)
}
log.NewInfo("", "listen network success, ", address, listener)
defer listener.Close()
+1 -2
View File
@@ -58,8 +58,7 @@ func (s *statisticsServer) Run() {
//listener network
listener, err := net.Listen("tcp", address)
if err != nil {
log.NewError("0", "Listen failed ", err.Error(), address)
return
panic("listening err:" + err.Error() + s.rpcRegisterName)
}
log.NewInfo("0", "listen network success, ", address, listener)
defer listener.Close()
+1 -2
View File
@@ -54,8 +54,7 @@ func (s *userServer) Run() {
//listener network
listener, err := net.Listen("tcp", address)
if err != nil {
log.NewError("0", "listen network failed ", err.Error(), address)
return
panic("listening err:" + err.Error() + s.rpcRegisterName)
}
log.NewInfo("0", "listen network success, address ", address, listener)
defer listener.Close()