2021-05-26 19:24:25 +08:00
package gate
import (
"Open_IM/src/common/config"
"Open_IM/src/common/constant"
"Open_IM/src/common/log"
2021-07-15 12:33:59 +08:00
"Open_IM/src/grpc-etcdv3/getcdv3"
2021-05-26 19:24:25 +08:00
pbChat "Open_IM/src/proto/chat"
2021-10-25 16:03:14 +08:00
pbWs "Open_IM/src/proto/sdk_ws"
2021-05-26 19:24:25 +08:00
"Open_IM/src/utils"
2021-10-22 20:02:29 +08:00
"bytes"
2021-05-26 19:24:25 +08:00
"context"
2021-10-22 20:02:29 +08:00
"encoding/gob"
2021-10-21 19:10:55 +08:00
"fmt"
2021-10-25 16:03:14 +08:00
"github.com/golang/protobuf/proto"
2021-05-26 19:24:25 +08:00
"github.com/gorilla/websocket"
2021-10-21 19:10:55 +08:00
"runtime"
2021-05-26 19:24:25 +08:00
"strings"
)
2021-10-22 20:02:29 +08:00
func ( ws * WServer ) msgParse ( conn * UserConn , binaryMsg [ ] byte ) {
2021-05-26 19:24:25 +08:00
//ws online debug data
//{"ReqIdentifier":1001,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0}
//{"ReqIdentifier":1002,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0,"SeqBegin":1,"SeqEnd":6}
//{"ReqIdentifier":1003,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b",
//"RecvID":"a87ff679a2f3e71d9181a67b7542122c","ClientMsgID":"2343","Time":"147878787","OperationID":
//"123","MsgIncr":0,"SubMsgType":101,"MsgType":100,"MsgFrom":1,"Content":"sdfsdf"}
2021-10-22 20:02:29 +08:00
b := bytes . NewBuffer ( binaryMsg )
2021-05-26 19:24:25 +08:00
m := Req { }
2021-10-22 20:02:29 +08:00
dec := gob . NewDecoder ( b )
err := dec . Decode ( & m )
if err != nil {
2021-05-26 19:24:25 +08:00
log . ErrorByKv ( "ws json Unmarshal err" , "" , "err" , err . Error ( ) )
2021-10-21 19:10:55 +08:00
ws . sendErrMsg ( conn , 200 , err . Error ( ) , constant . WSDataError , "" )
err = conn . Close ( )
if err != nil {
log . NewError ( "" , "ws close err" , err . Error ( ) )
}
2021-05-26 19:24:25 +08:00
return
}
if err := validate . Struct ( m ) ; err != nil {
log . ErrorByKv ( "ws args validate err" , "" , "err" , err . Error ( ) )
2021-10-21 19:10:55 +08:00
ws . sendErrMsg ( conn , 201 , err . Error ( ) , m . ReqIdentifier , m . MsgIncr )
2021-05-26 19:24:25 +08:00
return
}
if ! utils . VerifyToken ( m . Token , m . SendID ) {
2021-10-21 19:10:55 +08:00
ws . sendErrMsg ( conn , 202 , "token validate err" , m . ReqIdentifier , m . MsgIncr )
2021-05-26 19:24:25 +08:00
return
}
2021-10-21 19:10:55 +08:00
fmt . Println ( "test fmt Basic Info Authentication Success" , m . OperationID , "reqIdentifier" , m . ReqIdentifier , "sendID" , m . SendID )
2021-10-25 20:12:50 +08:00
log . InfoByKv ( "Basic Info Authentication Success" , m . OperationID , "reqIdentifier" , m . ReqIdentifier , "sendID" , m . SendID , "msgIncr" , m . MsgIncr )
2021-05-26 19:24:25 +08:00
switch m . ReqIdentifier {
case constant . WSGetNewestSeq :
2021-10-21 19:10:55 +08:00
go ws . newestSeqReq ( conn , & m )
2021-05-26 19:24:25 +08:00
case constant . WSPullMsg :
2021-10-21 19:10:55 +08:00
go ws . pullMsgReq ( conn , & m )
2021-05-26 19:24:25 +08:00
case constant . WSSendMsg :
2021-10-21 19:10:55 +08:00
sendTime := utils . GetCurrentTimestampByNano ( )
go ws . sendMsgReq ( conn , & m , sendTime )
case constant . WSPullMsgBySeqList :
go ws . pullMsgBySeqListReq ( conn , & m )
2021-05-26 19:24:25 +08:00
default :
}
2021-10-21 19:10:55 +08:00
log . NewInfo ( "" , "goroutine num is " , runtime . NumGoroutine ( ) )
2021-05-26 19:24:25 +08:00
}
2021-10-21 19:10:55 +08:00
func ( ws * WServer ) newestSeqResp ( conn * UserConn , m * Req , pb * pbChat . GetNewSeqResp ) {
2021-10-25 16:03:14 +08:00
var mReplyData pbWs . GetNewSeqResp
mReplyData . Seq = pb . GetSeq ( )
b , _ := proto . Marshal ( & mReplyData )
mReply := Resp {
ReqIdentifier : m . ReqIdentifier ,
MsgIncr : m . MsgIncr ,
ErrCode : pb . GetErrCode ( ) ,
ErrMsg : pb . GetErrMsg ( ) ,
OperationID : m . OperationID ,
Data : b ,
}
2021-05-26 19:24:25 +08:00
ws . sendMsg ( conn , mReply )
}
2021-10-21 19:10:55 +08:00
func ( ws * WServer ) newestSeqReq ( conn * UserConn , m * Req ) {
2021-05-26 19:24:25 +08:00
log . InfoByKv ( "Ws call success to getNewSeq" , m . OperationID , "Parameters" , m )
pbData := pbChat . GetNewSeqReq { }
pbData . UserID = m . SendID
pbData . OperationID = m . OperationID
grpcConn := getcdv3 . GetConn ( config . Config . Etcd . EtcdSchema , strings . Join ( config . Config . Etcd . EtcdAddr , "," ) , config . Config . RpcRegisterName . OpenImOfflineMessageName )
if grpcConn == nil {
log . ErrorByKv ( "get grpcConn err" , pbData . OperationID , "args" , m )
}
msgClient := pbChat . NewChatClient ( grpcConn )
reply , err := msgClient . GetNewSeq ( context . Background ( ) , & pbData )
if err != nil {
log . ErrorByKv ( "rpc call failed to getNewSeq" , pbData . OperationID , "err" , err , "pbData" , pbData . String ( ) )
return
}
log . InfoByKv ( "rpc call success to getNewSeq" , pbData . OperationID , "replyData" , reply . String ( ) )
ws . newestSeqResp ( conn , m , reply )
}
2021-10-21 19:10:55 +08:00
func ( ws * WServer ) pullMsgResp ( conn * UserConn , m * Req , pb * pbChat . PullMessageResp ) {
2021-10-25 16:03:14 +08:00
var mReplyData pbWs . PullMessageBySeqListResp
2021-10-25 20:24:26 +08:00
b , err := proto . Marshal ( pb )
2021-10-25 20:12:50 +08:00
if err != nil {
log . NewError ( m . OperationID , "GetSingleUserMsg,json marshal,err" , err . Error ( ) )
}
log . NewInfo ( m . OperationID , "pullMsgResp json is " , string ( b ) )
2021-10-25 20:24:26 +08:00
err = proto . Unmarshal ( b , & mReplyData )
2021-10-25 16:03:14 +08:00
if err != nil {
log . NewError ( m . OperationID , "SingleUserMsg,json Unmarshal,err" , err . Error ( ) )
2021-05-26 19:24:25 +08:00
}
2021-10-25 20:36:11 +08:00
c , err := proto . Marshal ( & mReplyData )
if err != nil {
log . NewError ( m . OperationID , "mReplyData,json marshal,err" , err . Error ( ) )
}
2021-10-25 16:03:14 +08:00
mReply := Resp {
ReqIdentifier : m . ReqIdentifier ,
MsgIncr : m . MsgIncr ,
ErrCode : pb . GetErrCode ( ) ,
ErrMsg : pb . GetErrMsg ( ) ,
OperationID : m . OperationID ,
Data : c ,
2021-05-26 19:24:25 +08:00
}
ws . sendMsg ( conn , mReply )
}
2021-10-21 19:10:55 +08:00
func ( ws * WServer ) pullMsgReq ( conn * UserConn , m * Req ) {
2021-05-26 19:24:25 +08:00
log . InfoByKv ( "Ws call success to pullMsgReq" , m . OperationID , "Parameters" , m )
reply := new ( pbChat . PullMessageResp )
isPass , errCode , errMsg , data := ws . argsValidate ( m , constant . WSPullMsg )
if isPass {
pbData := pbChat . PullMessageReq { }
pbData . UserID = m . SendID
pbData . OperationID = m . OperationID
pbData . SeqBegin = data . ( SeqData ) . SeqBegin
pbData . SeqEnd = data . ( SeqData ) . SeqEnd
grpcConn := getcdv3 . GetConn ( config . Config . Etcd . EtcdSchema , strings . Join ( config . Config . Etcd . EtcdAddr , "," ) , config . Config . RpcRegisterName . OpenImOfflineMessageName )
msgClient := pbChat . NewChatClient ( grpcConn )
reply , err := msgClient . PullMessage ( context . Background ( ) , & pbData )
if err != nil {
log . ErrorByKv ( "PullMessage error" , pbData . OperationID , "err" , err . Error ( ) )
return
}
log . InfoByKv ( "rpc call success to pullMsgRep" , pbData . OperationID , "ReplyArgs" , reply . String ( ) , "maxSeq" , reply . GetMaxSeq ( ) ,
"MinSeq" , reply . GetMinSeq ( ) , "singLen" , len ( reply . GetSingleUserMsg ( ) ) , "groupLen" , len ( reply . GetGroupUserMsg ( ) ) )
ws . pullMsgResp ( conn , m , reply )
} else {
reply . ErrCode = errCode
reply . ErrMsg = errMsg
ws . pullMsgResp ( conn , m , reply )
}
}
2021-10-21 19:10:55 +08:00
func ( ws * WServer ) pullMsgBySeqListReq ( conn * UserConn , m * Req ) {
log . NewError ( m . OperationID , "Ws call success to pullMsgBySeqListReq" , m )
reply := new ( pbChat . PullMessageResp )
isPass , errCode , errMsg , data := ws . argsValidate ( m , constant . WSPullMsgBySeqList )
if isPass {
pbData := pbChat . PullMessageBySeqListReq { }
2021-10-25 16:03:14 +08:00
pbData . SeqList = data . ( pbWs . PullMessageBySeqListReq ) . SeqList
2021-10-21 19:10:55 +08:00
pbData . UserID = m . SendID
pbData . OperationID = m . OperationID
grpcConn := getcdv3 . GetConn ( config . Config . Etcd . EtcdSchema , strings . Join ( config . Config . Etcd . EtcdAddr , "," ) , config . Config . RpcRegisterName . OpenImOfflineMessageName )
msgClient := pbChat . NewChatClient ( grpcConn )
reply , err := msgClient . PullMessageBySeqList ( context . Background ( ) , & pbData )
if err != nil {
log . NewError ( pbData . OperationID , "pullMsgBySeqListReq err" , err . Error ( ) )
return
}
log . NewInfo ( pbData . OperationID , "rpc call success to pullMsgBySeqListReq" , reply . String ( ) , reply . GetMaxSeq ( ) , reply . GetMinSeq ( ) , len ( reply . GetSingleUserMsg ( ) ) , len ( reply . GetGroupUserMsg ( ) ) )
ws . pullMsgResp ( conn , m , reply )
} else {
reply . ErrCode = errCode
reply . ErrMsg = errMsg
ws . pullMsgResp ( conn , m , reply )
}
}
2021-05-26 19:24:25 +08:00
2021-10-21 19:10:55 +08:00
func ( ws * WServer ) sendMsgResp ( conn * UserConn , m * Req , pb * pbChat . UserSendMsgResp , sendTime int64 ) {
// := make(map[string]interface{})
2021-10-25 16:03:14 +08:00
var mReplyData pbWs . UserSendMsgResp
mReplyData . ClientMsgID = pb . GetClientMsgID ( )
mReplyData . ServerMsgID = pb . GetServerMsgID ( )
mReplyData . SendTime = sendTime
b , _ := proto . Marshal ( & mReplyData )
2021-10-21 19:10:55 +08:00
mReply := Resp {
ReqIdentifier : m . ReqIdentifier ,
MsgIncr : m . MsgIncr ,
ErrCode : pb . GetErrCode ( ) ,
ErrMsg : pb . GetErrMsg ( ) ,
OperationID : m . OperationID ,
2021-10-25 16:03:14 +08:00
Data : b ,
2021-10-21 19:10:55 +08:00
}
fmt . Println ( "test fmt send msg resp" , m . OperationID , "reqIdentifier" , m . ReqIdentifier , "sendID" , m . SendID )
2021-05-26 19:24:25 +08:00
ws . sendMsg ( conn , mReply )
}
2021-10-21 19:10:55 +08:00
func ( ws * WServer ) sendMsgReq ( conn * UserConn , m * Req , sendTime int64 ) {
2021-05-26 19:24:25 +08:00
log . InfoByKv ( "Ws call success to sendMsgReq" , m . OperationID , "Parameters" , m )
reply := new ( pbChat . UserSendMsgResp )
isPass , errCode , errMsg , pData := ws . argsValidate ( m , constant . WSSendMsg )
if isPass {
2021-10-25 16:03:14 +08:00
data := pData . ( pbWs . UserSendMsgReq )
2021-05-26 19:24:25 +08:00
pbData := pbChat . UserSendMsgReq {
ReqIdentifier : m . ReqIdentifier ,
Token : m . Token ,
SendID : m . SendID ,
OperationID : m . OperationID ,
PlatformID : data . PlatformID ,
SessionType : data . SessionType ,
MsgFrom : data . MsgFrom ,
ContentType : data . ContentType ,
RecvID : data . RecvID ,
ForceList : data . ForceList ,
Content : data . Content ,
2021-10-25 16:03:14 +08:00
Options : utils . MapIntToJsonString ( data . Options ) ,
2021-05-26 19:24:25 +08:00
ClientMsgID : data . ClientMsgID ,
2021-10-21 19:10:55 +08:00
SendTime : sendTime ,
2021-05-26 19:24:25 +08:00
}
2021-10-21 19:10:55 +08:00
time := utils . GetCurrentTimestampBySecond ( )
2021-05-26 19:24:25 +08:00
etcdConn := getcdv3 . GetConn ( config . Config . Etcd . EtcdSchema , strings . Join ( config . Config . Etcd . EtcdAddr , "," ) , config . Config . RpcRegisterName . OpenImOfflineMessageName )
client := pbChat . NewChatClient ( etcdConn )
2021-10-21 19:10:55 +08:00
log . Info ( "" , "" , "ws UserSendMsg call, api call rpc..." )
reply , err := client . UserSendMsg ( context . Background ( ) , & pbData )
if err != nil {
log . NewError ( pbData . OperationID , "UserSendMsg err" , err . Error ( ) )
reply . ErrCode = 100
reply . ErrMsg = "rpc err"
}
log . NewInfo ( pbData . OperationID , "sendMsgReq call rpc cost time " , utils . GetCurrentTimestampBySecond ( ) - time )
2021-05-26 19:24:25 +08:00
log . Info ( "" , "" , "api UserSendMsg call end..., [data: %s] [reply: %s]" , pbData . String ( ) , reply . String ( ) )
2021-10-21 19:10:55 +08:00
ws . sendMsgResp ( conn , m , reply , sendTime )
log . NewInfo ( pbData . OperationID , "sendMsgResp end cost time " , utils . GetCurrentTimestampBySecond ( ) - time )
2021-05-26 19:24:25 +08:00
} else {
reply . ErrCode = errCode
reply . ErrMsg = errMsg
2021-10-21 19:10:55 +08:00
ws . sendMsgResp ( conn , m , reply , sendTime )
2021-05-26 19:24:25 +08:00
}
}
2021-10-21 19:10:55 +08:00
func ( ws * WServer ) sendMsg ( conn * UserConn , mReply interface { } ) {
2021-10-22 20:02:29 +08:00
var b bytes . Buffer
enc := gob . NewEncoder ( & b )
err := enc . Encode ( mReply )
if err != nil {
fmt . Println ( err )
}
err = ws . writeMsg ( conn , websocket . BinaryMessage , b . Bytes ( ) )
2021-05-26 19:24:25 +08:00
if err != nil {
log . ErrorByKv ( "WS WriteMsg error" , "" , "userIP" , conn . RemoteAddr ( ) . String ( ) , "userUid" , ws . getUserUid ( conn ) , "error" , err , "mReply" , mReply )
}
}
2021-10-21 19:10:55 +08:00
func ( ws * WServer ) sendErrMsg ( conn * UserConn , errCode int32 , errMsg string , reqIdentifier int32 , msgIncr string ) {
2021-05-26 19:24:25 +08:00
mReply := make ( map [ string ] interface { } )
mReply [ "errCode" ] = errCode
mReply [ "errMsg" ] = errMsg
2021-10-21 19:10:55 +08:00
mReply [ "reqIdentifier" ] = reqIdentifier
mReply [ "msgIncr" ] = msgIncr
2021-05-26 19:24:25 +08:00
ws . sendMsg ( conn , mReply )
}