Files
open-im-server/src/msg_gateway/gate/logic.go
T

255 lines
9.7 KiB
Go
Raw Normal View History

2021-05-26 19:24:25 +08:00
package gate
import (
"Open_IM/src/common/config"
"Open_IM/src/common/constant"
"Open_IM/src/common/log"
2021-07-15 12:33:59 +08:00
"Open_IM/src/grpc-etcdv3/getcdv3"
2021-05-26 19:24:25 +08:00
pbChat "Open_IM/src/proto/chat"
"Open_IM/src/utils"
"context"
"encoding/json"
2021-10-21 19:10:55 +08:00
"fmt"
2021-05-26 19:24:25 +08:00
"github.com/gorilla/websocket"
2021-10-21 19:10:55 +08:00
"runtime"
2021-05-26 19:24:25 +08:00
"strings"
)
2021-10-21 19:10:55 +08:00
func (ws *WServer) msgParse(conn *UserConn, jsonMsg []byte) {
2021-05-26 19:24:25 +08:00
//ws online debug data
//{"ReqIdentifier":1001,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0}
//{"ReqIdentifier":1002,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0,"SeqBegin":1,"SeqEnd":6}
//{"ReqIdentifier":1003,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b",
//"RecvID":"a87ff679a2f3e71d9181a67b7542122c","ClientMsgID":"2343","Time":"147878787","OperationID":
//"123","MsgIncr":0,"SubMsgType":101,"MsgType":100,"MsgFrom":1,"Content":"sdfsdf"}
m := Req{}
if err := json.Unmarshal(jsonMsg, &m); err != nil {
log.ErrorByKv("ws json Unmarshal err", "", "err", err.Error())
2021-10-21 19:10:55 +08:00
ws.sendErrMsg(conn, 200, err.Error(), constant.WSDataError, "")
err = conn.Close()
if err != nil {
log.NewError("", "ws close err", err.Error())
}
2021-05-26 19:24:25 +08:00
return
}
if err := validate.Struct(m); err != nil {
log.ErrorByKv("ws args validate err", "", "err", err.Error())
2021-10-21 19:10:55 +08:00
ws.sendErrMsg(conn, 201, err.Error(), m.ReqIdentifier, m.MsgIncr)
2021-05-26 19:24:25 +08:00
return
}
if !utils.VerifyToken(m.Token, m.SendID) {
2021-10-21 19:10:55 +08:00
ws.sendErrMsg(conn, 202, "token validate err", m.ReqIdentifier, m.MsgIncr)
2021-05-26 19:24:25 +08:00
return
}
2021-10-21 19:10:55 +08:00
fmt.Println("test fmt Basic Info Authentication Success", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID)
2021-05-26 19:24:25 +08:00
log.InfoByKv("Basic Info Authentication Success", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID)
switch m.ReqIdentifier {
case constant.WSGetNewestSeq:
2021-10-21 19:10:55 +08:00
go ws.newestSeqReq(conn, &m)
2021-05-26 19:24:25 +08:00
case constant.WSPullMsg:
2021-10-21 19:10:55 +08:00
go ws.pullMsgReq(conn, &m)
2021-05-26 19:24:25 +08:00
case constant.WSSendMsg:
2021-10-21 19:10:55 +08:00
sendTime := utils.GetCurrentTimestampByNano()
go ws.sendMsgReq(conn, &m, sendTime)
case constant.WSPullMsgBySeqList:
go ws.pullMsgBySeqListReq(conn, &m)
2021-05-26 19:24:25 +08:00
default:
}
2021-10-21 19:10:55 +08:00
log.NewInfo("", "goroutine num is ", runtime.NumGoroutine())
2021-05-26 19:24:25 +08:00
}
2021-10-21 19:10:55 +08:00
func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetNewSeqResp) {
2021-05-26 19:24:25 +08:00
mReply := make(map[string]interface{})
mData := make(map[string]interface{})
mReply["reqIdentifier"] = m.ReqIdentifier
mReply["msgIncr"] = m.MsgIncr
mReply["errCode"] = pb.GetErrCode()
mReply["errMsg"] = pb.GetErrMsg()
mData["seq"] = pb.GetSeq()
mReply["data"] = mData
ws.sendMsg(conn, mReply)
}
2021-10-21 19:10:55 +08:00
func (ws *WServer) newestSeqReq(conn *UserConn, m *Req) {
2021-05-26 19:24:25 +08:00
log.InfoByKv("Ws call success to getNewSeq", m.OperationID, "Parameters", m)
pbData := pbChat.GetNewSeqReq{}
pbData.UserID = m.SendID
pbData.OperationID = m.OperationID
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
if grpcConn == nil {
log.ErrorByKv("get grpcConn err", pbData.OperationID, "args", m)
}
msgClient := pbChat.NewChatClient(grpcConn)
reply, err := msgClient.GetNewSeq(context.Background(), &pbData)
if err != nil {
log.ErrorByKv("rpc call failed to getNewSeq", pbData.OperationID, "err", err, "pbData", pbData.String())
return
}
log.InfoByKv("rpc call success to getNewSeq", pbData.OperationID, "replyData", reply.String())
ws.newestSeqResp(conn, m, reply)
}
2021-10-21 19:10:55 +08:00
func (ws *WServer) pullMsgResp(conn *UserConn, m *Req, pb *pbChat.PullMessageResp) {
2021-05-26 19:24:25 +08:00
mReply := make(map[string]interface{})
msg := make(map[string]interface{})
mReply["reqIdentifier"] = m.ReqIdentifier
mReply["msgIncr"] = m.MsgIncr
mReply["errCode"] = pb.GetErrCode()
mReply["errMsg"] = pb.GetErrMsg()
//空切片
if v := pb.GetSingleUserMsg(); v != nil {
msg["single"] = v
} else {
msg["single"] = []pbChat.GatherFormat{}
}
if v := pb.GetGroupUserMsg(); v != nil {
msg["group"] = v
} else {
msg["group"] = []pbChat.GatherFormat{}
}
msg["maxSeq"] = pb.GetMaxSeq()
msg["minSeq"] = pb.GetMinSeq()
mReply["data"] = msg
ws.sendMsg(conn, mReply)
}
2021-10-21 19:10:55 +08:00
func (ws *WServer) pullMsgReq(conn *UserConn, m *Req) {
2021-05-26 19:24:25 +08:00
log.InfoByKv("Ws call success to pullMsgReq", m.OperationID, "Parameters", m)
reply := new(pbChat.PullMessageResp)
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsg)
if isPass {
pbData := pbChat.PullMessageReq{}
pbData.UserID = m.SendID
pbData.OperationID = m.OperationID
pbData.SeqBegin = data.(SeqData).SeqBegin
pbData.SeqEnd = data.(SeqData).SeqEnd
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
msgClient := pbChat.NewChatClient(grpcConn)
reply, err := msgClient.PullMessage(context.Background(), &pbData)
if err != nil {
log.ErrorByKv("PullMessage error", pbData.OperationID, "err", err.Error())
return
}
log.InfoByKv("rpc call success to pullMsgRep", pbData.OperationID, "ReplyArgs", reply.String(), "maxSeq", reply.GetMaxSeq(),
"MinSeq", reply.GetMinSeq(), "singLen", len(reply.GetSingleUserMsg()), "groupLen", len(reply.GetGroupUserMsg()))
ws.pullMsgResp(conn, m, reply)
} else {
reply.ErrCode = errCode
reply.ErrMsg = errMsg
ws.pullMsgResp(conn, m, reply)
}
}
2021-10-21 19:10:55 +08:00
// pullMsgBySeqListReq serves a WSPullMsgBySeqList request: it validates the
// arguments, calls the PullMessageBySeqList RPC for the requested sequence
// numbers and forwards the result to the client via pullMsgResp.
//
// When argument validation fails, the client receives a reply that carries
// only the validation error code and message. If no gRPC connection can be
// obtained, or the RPC fails, the error is logged and no reply is sent.
func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
	// This is a normal entry trace, so it is logged at info level.
	log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq", m)

	isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsgBySeqList)
	if !isPass {
		reply := new(pbChat.PullMessageResp)
		reply.ErrCode = errCode
		reply.ErrMsg = errMsg
		ws.pullMsgResp(conn, m, reply)
		return
	}

	pbData := pbChat.PullMessageBySeqListReq{}
	// Like the original, this assertion panics if argsValidate hands back a
	// value of another type.
	pbData.SeqList = data.(SeqListData).SeqList
	pbData.UserID = m.SendID
	pbData.OperationID = m.OperationID

	grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
	if grpcConn == nil {
		// Same guard as newestSeqReq: never issue an RPC on a nil connection.
		log.NewError(pbData.OperationID, "get grpcConn err", m)
		return
	}

	msgClient := pbChat.NewChatClient(grpcConn)
	reply, err := msgClient.PullMessageBySeqList(context.Background(), &pbData)
	if err != nil {
		log.NewError(pbData.OperationID, "pullMsgBySeqListReq err", err.Error())
		return
	}

	log.NewInfo(pbData.OperationID, "rpc call success to pullMsgBySeqListReq", reply.String(), reply.GetMaxSeq(), reply.GetMinSeq(), len(reply.GetSingleUserMsg()), len(reply.GetGroupUserMsg()))
	ws.pullMsgResp(conn, m, reply)
}
2021-05-26 19:24:25 +08:00
2021-10-21 19:10:55 +08:00
func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.UserSendMsgResp, sendTime int64) {
// := make(map[string]interface{})
2021-05-26 19:24:25 +08:00
mReplyData := make(map[string]interface{})
2021-10-21 19:10:55 +08:00
//mReply["reqIdentifier"] = m.ReqIdentifier
//mReply["msgIncr"] = m.MsgIncr
//mReply["errCode"] = pb.GetErrCode()
//mReply["errMsg"] = pb.GetErrMsg()
2021-05-26 19:24:25 +08:00
mReplyData["clientMsgID"] = pb.GetClientMsgID()
mReplyData["serverMsgID"] = pb.GetServerMsgID()
2021-10-21 19:10:55 +08:00
mReplyData["sendTime"] = utils.Int64ToString(sendTime)
//mReply["data"] = mReplyData
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
ErrCode: pb.GetErrCode(),
ErrMsg: pb.GetErrMsg(),
OperationID: m.OperationID,
Data: mReplyData,
}
fmt.Println("test fmt send msg resp", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID)
2021-05-26 19:24:25 +08:00
ws.sendMsg(conn, mReply)
}
2021-10-21 19:10:55 +08:00
func (ws *WServer) sendMsgReq(conn *UserConn, m *Req, sendTime int64) {
2021-05-26 19:24:25 +08:00
log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m)
reply := new(pbChat.UserSendMsgResp)
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg)
if isPass {
data := pData.(MsgData)
pbData := pbChat.UserSendMsgReq{
ReqIdentifier: m.ReqIdentifier,
Token: m.Token,
SendID: m.SendID,
OperationID: m.OperationID,
PlatformID: data.PlatformID,
SessionType: data.SessionType,
MsgFrom: data.MsgFrom,
ContentType: data.ContentType,
RecvID: data.RecvID,
ForceList: data.ForceList,
Content: data.Content,
Options: utils.MapToJsonString(data.Options),
ClientMsgID: data.ClientMsgID,
OffLineInfo: utils.MapToJsonString(data.OfflineInfo),
2021-10-21 19:10:55 +08:00
SendTime: sendTime,
2021-05-26 19:24:25 +08:00
}
log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m)
2021-10-21 19:10:55 +08:00
time := utils.GetCurrentTimestampBySecond()
2021-05-26 19:24:25 +08:00
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
client := pbChat.NewChatClient(etcdConn)
2021-10-21 19:10:55 +08:00
log.Info("", "", "ws UserSendMsg call, api call rpc...")
reply, err := client.UserSendMsg(context.Background(), &pbData)
if err != nil {
log.NewError(pbData.OperationID, "UserSendMsg err", err.Error())
reply.ErrCode = 100
reply.ErrMsg = "rpc err"
}
log.NewInfo(pbData.OperationID, "sendMsgReq call rpc cost time ", utils.GetCurrentTimestampBySecond()-time)
2021-05-26 19:24:25 +08:00
log.Info("", "", "api UserSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), reply.String())
2021-10-21 19:10:55 +08:00
ws.sendMsgResp(conn, m, reply, sendTime)
log.NewInfo(pbData.OperationID, "sendMsgResp end cost time ", utils.GetCurrentTimestampBySecond()-time)
2021-05-26 19:24:25 +08:00
} else {
reply.ErrCode = errCode
reply.ErrMsg = errMsg
2021-10-21 19:10:55 +08:00
ws.sendMsgResp(conn, m, reply, sendTime)
2021-05-26 19:24:25 +08:00
}
}
2021-10-21 19:10:55 +08:00
func (ws *WServer) sendMsg(conn *UserConn, mReply interface{}) {
2021-05-26 19:24:25 +08:00
bMsg, _ := json.Marshal(mReply)
err := ws.writeMsg(conn, websocket.TextMessage, bMsg)
if err != nil {
log.ErrorByKv("WS WriteMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err, "mReply", mReply)
}
}
2021-10-21 19:10:55 +08:00
func (ws *WServer) sendErrMsg(conn *UserConn, errCode int32, errMsg string, reqIdentifier int32, msgIncr string) {
2021-05-26 19:24:25 +08:00
mReply := make(map[string]interface{})
mReply["errCode"] = errCode
mReply["errMsg"] = errMsg
2021-10-21 19:10:55 +08:00
mReply["reqIdentifier"] = reqIdentifier
mReply["msgIncr"] = msgIncr
2021-05-26 19:24:25 +08:00
ws.sendMsg(conn, mReply)
}