This commit is contained in:
wangchuxiao
2023-02-09 16:11:18 +08:00
parent 1075754458
commit ccc7e6b3c8
73 changed files with 153 additions and 160 deletions
+117
View File
@@ -0,0 +1,117 @@
package gate
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/getcdv3"
pbChat "Open_IM/pkg/proto/msg"
sdk_ws "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
"strings"
)
var MaxPullMsgNum = 100
func (r *RPCServer) GenPullSeqList(currentSeq uint32, operationID string, userID string) ([]uint32, error) {
maxSeq, err := db.DB.GetUserMaxSeq(userID)
if err != nil {
log.Error(operationID, "GetUserMaxSeq failed ", userID, err.Error())
return nil, utils.Wrap(err, "")
}
var seqList []uint32
num := 0
for i := currentSeq + 1; i < uint32(maxSeq); i++ {
seqList = append(seqList, i)
num++
if num == MaxPullMsgNum {
break
}
}
log.Info(operationID, "GenPullSeqList ", seqList, "current seq", currentSeq)
return seqList, nil
}
func (r *RPCServer) GetSingleUserMsgForPushPlatforms(operationID string, msgData *sdk_ws.MsgData, pushToUserID string, platformIDList []int) map[int]*sdk_ws.MsgDataList {
user2PushMsg := make(map[int]*sdk_ws.MsgDataList, 0)
for _, v := range platformIDList {
user2PushMsg[v] = r.GetSingleUserMsgForPush(operationID, msgData, pushToUserID, v)
//log.Info(operationID, "GetSingleUserMsgForPush", msgData.Seq, pushToUserID, v, "len:", len(user2PushMsg[v]))
}
return user2PushMsg
}
func (r *RPCServer) GetSingleUserMsgForPush(operationID string, msgData *sdk_ws.MsgData, pushToUserID string, platformID int) *sdk_ws.MsgDataList {
//msgData.MsgDataList = nil
return &sdk_ws.MsgDataList{MsgDataList: []*sdk_ws.MsgData{msgData}}
//userConn := ws.getUserConn(pushToUserID, platformID)
//if userConn == nil {
// log.Debug(operationID, "userConn == nil")
// return []*sdk_ws.MsgData{msgData}
//}
//
//if msgData.Seq <= userConn.PushedMaxSeq {
// log.Debug(operationID, "msgData.Seq <= userConn.PushedMaxSeq", msgData.Seq, userConn.PushedMaxSeq)
// return nil
//}
//
//msgList := r.GetSingleUserMsg(operationID, msgData.Seq, pushToUserID)
//if msgList == nil {
// log.Debug(operationID, "GetSingleUserMsg msgList == nil", msgData.Seq, userConn.PushedMaxSeq)
// userConn.PushedMaxSeq = msgData.Seq
// return []*sdk_ws.MsgData{msgData}
//}
//msgList = append(msgList, msgData)
//
//for _, v := range msgList {
// if v.Seq > userConn.PushedMaxSeq {
// userConn.PushedMaxSeq = v.Seq
// }
//}
//log.Debug(operationID, "GetSingleUserMsg msgList len ", len(msgList), userConn.PushedMaxSeq)
//return msgList
}
func (r *RPCServer) GetSingleUserMsg(operationID string, currentMsgSeq uint32, userID string) []*sdk_ws.MsgData {
seqList, err := r.GenPullSeqList(currentMsgSeq, operationID, userID)
if err != nil {
log.Error(operationID, "GenPullSeqList failed ", err.Error(), currentMsgSeq, userID)
return nil
}
if len(seqList) == 0 {
log.Error(operationID, "GenPullSeqList len == 0 ", currentMsgSeq, userID)
return nil
}
rpcReq := sdk_ws.PullMessageBySeqListReq{}
rpcReq.SeqList = seqList
rpcReq.UserID = userID
rpcReq.OperationID = operationID
grpcConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, rpcReq.OperationID)
if grpcConn == nil {
errMsg := "getcdv3.GetDefaultConn == nil"
log.NewError(rpcReq.OperationID, errMsg)
return nil
}
msgClient := pbChat.NewMsgClient(grpcConn)
reply, err := msgClient.PullMessageBySeqList(context.Background(), &rpcReq)
if err != nil {
log.Error(operationID, "PullMessageBySeqList failed ", err.Error(), rpcReq.String())
return nil
}
if len(reply.List) == 0 {
return nil
}
return reply.List
}
//func (r *RPCServer) GetBatchUserMsgForPush(operationID string, msgData *sdk_ws.MsgData, pushToUserIDList []string, platformID int) map[string][]*sdk_ws.MsgData {
// user2PushMsg := make(map[string][]*sdk_ws.MsgData, 0)
// for _, v := range pushToUserIDList {
// user2PushMsg[v] = r.GetSingleUserMsgForPush(operationID, msgData, v, platformID)
// }
// return user2PushMsg
//}
+89
View File
@@ -0,0 +1,89 @@
package gate
import (
cbApi "Open_IM/pkg/callback_struct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/http"
http2 "net/http"
"time"
)
func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackground bool, connID string) cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
if !config.Config.Callback.CallbackUserOnline.Enable {
return callbackResp
}
callbackUserOnlineReq := cbApi.CallbackUserOnlineReq{
Token: token,
UserStatusCallbackReq: cbApi.UserStatusCallbackReq{
UserStatusBaseCallback: cbApi.UserStatusBaseCallback{
CallbackCommand: constant.CallbackUserOnlineCommand,
OperationID: operationID,
PlatformID: int32(platformID),
Platform: constant.PlatformIDToName(platformID),
},
UserID: userID,
},
Seq: int(time.Now().UnixNano() / 1e6),
IsAppBackground: isAppBackground,
ConnID: connID,
}
callbackUserOnlineResp := &cbApi.CallbackUserOnlineResp{CommonCallbackResp: &callbackResp}
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackUserOnlineCommand, callbackUserOnlineReq, callbackUserOnlineResp, config.Config.Callback.CallbackUserOnline.CallbackTimeOut); err != nil {
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
}
return callbackResp
}
func callbackUserOffline(operationID, userID string, platformID int, connID string) cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
if !config.Config.Callback.CallbackUserOffline.Enable {
return callbackResp
}
callbackOfflineReq := cbApi.CallbackUserOfflineReq{
UserStatusCallbackReq: cbApi.UserStatusCallbackReq{
UserStatusBaseCallback: cbApi.UserStatusBaseCallback{
CallbackCommand: constant.CallbackUserOfflineCommand,
OperationID: operationID,
PlatformID: int32(platformID),
Platform: constant.PlatformIDToName(platformID),
},
UserID: userID,
},
Seq: int(time.Now().UnixNano() / 1e6),
ConnID: connID,
}
callbackUserOfflineResp := &cbApi.CallbackUserOfflineResp{CommonCallbackResp: &callbackResp}
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackUserOfflineCommand, callbackOfflineReq, callbackUserOfflineResp, config.Config.Callback.CallbackUserOffline.CallbackTimeOut); err != nil {
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
}
return callbackResp
}
func callbackUserKickOff(operationID string, userID string, platformID int) cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
if !config.Config.Callback.CallbackUserKickOff.Enable {
return callbackResp
}
callbackUserKickOffReq := cbApi.CallbackUserKickOffReq{
UserStatusCallbackReq: cbApi.UserStatusCallbackReq{
UserStatusBaseCallback: cbApi.UserStatusBaseCallback{
CallbackCommand: constant.CallbackUserKickOffCommand,
OperationID: operationID,
PlatformID: int32(platformID),
Platform: constant.PlatformIDToName(platformID),
},
UserID: userID,
},
Seq: int(time.Now().UnixNano() / 1e6),
}
callbackUserKickOffResp := &cbApi.CallbackUserKickOffResp{CommonCallbackResp: &callbackResp}
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackUserKickOffCommand, callbackUserKickOffReq, callbackUserKickOffResp, config.Config.Callback.CallbackUserOffline.CallbackTimeOut); err != nil {
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
}
return callbackResp
}
+48
View File
@@ -0,0 +1,48 @@
package gate
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/statistics"
"fmt"
"sync"
promePkg "Open_IM/pkg/common/prometheus"
"github.com/go-playground/validator/v10"
)
var (
rwLock *sync.RWMutex
validate *validator.Validate
ws WServer
rpcSvr RPCServer
sendMsgAllCount uint64
sendMsgFailedCount uint64
sendMsgSuccessCount uint64
userCount uint64
sendMsgAllCountLock sync.RWMutex
)
func Init(rpcPort, wsPort int) {
rwLock = new(sync.RWMutex)
validate = validator.New()
statistics.NewStatistics(&sendMsgAllCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
statistics.NewStatistics(&userCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second add user conn", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
ws.onInit(wsPort)
rpcSvr.onInit(rpcPort)
initPrometheus()
}
func Run(promethuesPort int) {
go ws.run()
go rpcSvr.run()
go func() {
err := promePkg.StartPromeSrv(promethuesPort)
if err != nil {
panic(err)
}
}()
}
+429
View File
@@ -0,0 +1,429 @@
package gate
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/getcdv3"
pbChat "Open_IM/pkg/proto/msg"
push "Open_IM/pkg/proto/push"
pbRtc "Open_IM/pkg/proto/rtc"
sdk_ws "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"bytes"
"context"
"encoding/gob"
"runtime"
"strings"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
"google.golang.org/grpc"
)
func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
b := bytes.NewBuffer(binaryMsg)
m := Req{}
dec := gob.NewDecoder(b)
err := dec.Decode(&m)
if err != nil {
log.NewError("", "ws Decode err", err.Error())
err = conn.Close()
if err != nil {
log.NewError("", "ws close err", err.Error())
}
return
}
if err := validate.Struct(m); err != nil {
log.NewError("", "ws args validate err", err.Error())
ws.sendErrMsg(conn, 201, err.Error(), m.ReqIdentifier, m.MsgIncr, m.OperationID)
return
}
log.NewInfo(m.OperationID, "Basic Info Authentication Success", m.SendID, m.MsgIncr, m.ReqIdentifier)
if m.SendID != conn.userID {
if err = conn.Close(); err != nil {
log.NewError(m.OperationID, "close ws conn failed", conn.userID, "send id", m.SendID, err.Error())
return
}
}
switch m.ReqIdentifier {
case constant.WSGetNewestSeq:
log.NewInfo(m.OperationID, "getSeqReq ", m.SendID, m.MsgIncr, m.ReqIdentifier)
ws.getSeqReq(conn, &m)
promePkg.PromeInc(promePkg.GetNewestSeqTotalCounter)
case constant.WSSendMsg:
log.NewInfo(m.OperationID, "sendMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier)
ws.sendMsgReq(conn, &m)
promePkg.PromeInc(promePkg.MsgRecvTotalCounter)
case constant.WSSendSignalMsg:
log.NewInfo(m.OperationID, "sendSignalMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier)
ws.sendSignalMsgReq(conn, &m)
case constant.WSPullMsgBySeqList:
log.NewInfo(m.OperationID, "pullMsgBySeqListReq ", m.SendID, m.MsgIncr, m.ReqIdentifier)
ws.pullMsgBySeqListReq(conn, &m)
promePkg.PromeInc(promePkg.PullMsgBySeqListTotalCounter)
case constant.WsLogoutMsg:
log.NewInfo(m.OperationID, "conn.Close()", m.SendID, m.MsgIncr, m.ReqIdentifier)
ws.userLogoutReq(conn, &m)
case constant.WsSetBackgroundStatus:
log.NewInfo(m.OperationID, "WsSetBackgroundStatus", m.SendID, m.MsgIncr, m.ReqIdentifier)
ws.setUserDeviceBackground(conn, &m)
default:
log.Error(m.OperationID, "ReqIdentifier failed ", m.SendID, m.MsgIncr, m.ReqIdentifier)
}
log.NewInfo(m.OperationID, "goroutine num is ", runtime.NumGoroutine())
}
func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier)
nReply := new(sdk_ws.GetMaxAndMinSeqResp)
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSGetNewestSeq, m.OperationID)
log.Info(m.OperationID, "argsValidate ", isPass, errCode, errMsg)
if isPass {
rpcReq := sdk_ws.GetMaxAndMinSeqReq{}
rpcReq.GroupIDList = data.(sdk_ws.GetMaxAndMinSeqReq).GroupIDList
rpcReq.UserID = m.SendID
rpcReq.OperationID = m.OperationID
log.Debug(m.OperationID, "Ws call success to getMaxAndMinSeq", m.SendID, m.ReqIdentifier, m.MsgIncr, data.(sdk_ws.GetMaxAndMinSeqReq).GroupIDList)
grpcConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, rpcReq.OperationID)
if grpcConn == nil {
errMsg := rpcReq.OperationID + "getcdv3.GetDefaultConn == nil"
nReply.ErrCode = 500
nReply.ErrMsg = errMsg
log.NewError(rpcReq.OperationID, errMsg)
ws.getSeqResp(conn, m, nReply)
return
}
msgClient := pbChat.NewMsgClient(grpcConn)
rpcReply, err := msgClient.GetMaxAndMinSeq(context.Background(), &rpcReq)
if err != nil {
nReply.ErrCode = 500
nReply.ErrMsg = err.Error()
log.Error(rpcReq.OperationID, "rpc call failed to GetMaxAndMinSeq ", nReply.String())
ws.getSeqResp(conn, m, nReply)
} else {
log.NewInfo(rpcReq.OperationID, "rpc call success to getSeqReq", rpcReply.String())
ws.getSeqResp(conn, m, rpcReply)
}
} else {
nReply.ErrCode = errCode
nReply.ErrMsg = errMsg
log.Error(m.OperationID, "argsValidate failed send resp: ", nReply.String())
ws.getSeqResp(conn, m, nReply)
}
}
func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeqResp) {
b, _ := proto.Marshal(pb)
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
ErrCode: pb.GetErrCode(),
ErrMsg: pb.GetErrMsg(),
OperationID: m.OperationID,
Data: b,
}
log.Debug(m.OperationID, "getSeqResp come here req: ", pb.String(), "send resp: ",
mReply.ReqIdentifier, mReply.MsgIncr, mReply.ErrCode, mReply.ErrMsg)
ws.sendMsg(conn, mReply)
}
func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq start", m.SendID, m.ReqIdentifier, m.MsgIncr, string(m.Data))
nReply := new(sdk_ws.PullMessageBySeqListResp)
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsgBySeqList, m.OperationID)
if isPass {
rpcReq := sdk_ws.PullMessageBySeqListReq{}
rpcReq.SeqList = data.(sdk_ws.PullMessageBySeqListReq).SeqList
rpcReq.UserID = m.SendID
rpcReq.OperationID = m.OperationID
rpcReq.GroupSeqList = data.(sdk_ws.PullMessageBySeqListReq).GroupSeqList
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq middle", m.SendID, m.ReqIdentifier, m.MsgIncr, data.(sdk_ws.PullMessageBySeqListReq).SeqList)
grpcConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, m.OperationID)
if grpcConn == nil {
errMsg := rpcReq.OperationID + "getcdv3.GetDefaultConn == nil"
nReply.ErrCode = 500
nReply.ErrMsg = errMsg
log.NewError(rpcReq.OperationID, errMsg)
ws.pullMsgBySeqListResp(conn, m, nReply)
return
}
msgClient := pbChat.NewMsgClient(grpcConn)
maxSizeOption := grpc.MaxCallRecvMsgSize(1024 * 1024 * 20)
reply, err := msgClient.PullMessageBySeqList(context.Background(), &rpcReq, maxSizeOption)
if err != nil {
log.NewError(rpcReq.OperationID, "pullMsgBySeqListReq err", err.Error())
nReply.ErrCode = 200
nReply.ErrMsg = err.Error()
ws.pullMsgBySeqListResp(conn, m, nReply)
} else {
log.NewInfo(rpcReq.OperationID, "rpc call success to pullMsgBySeqListReq", reply.String(), len(reply.List))
ws.pullMsgBySeqListResp(conn, m, reply)
}
} else {
nReply.ErrCode = errCode
nReply.ErrMsg = errMsg
ws.pullMsgBySeqListResp(conn, m, nReply)
}
}
func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullMessageBySeqListResp) {
log.NewInfo(m.OperationID, "pullMsgBySeqListResp come here ", pb.String())
c, _ := proto.Marshal(pb)
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
ErrCode: pb.GetErrCode(),
ErrMsg: pb.GetErrMsg(),
OperationID: m.OperationID,
Data: c,
}
log.NewInfo(m.OperationID, "pullMsgBySeqListResp all data is ", mReply.ReqIdentifier, mReply.MsgIncr, mReply.ErrCode, mReply.ErrMsg,
len(mReply.Data))
ws.sendMsg(conn, mReply)
}
func (ws *WServer) userLogoutReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to userLogoutReq start", m.SendID, m.ReqIdentifier, m.MsgIncr, string(m.Data))
rpcReq := push.DelUserPushTokenReq{}
rpcReq.UserID = m.SendID
rpcReq.PlatformID = conn.PlatformID
rpcReq.OperationID = m.OperationID
grpcConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, m.OperationID)
if grpcConn == nil {
errMsg := rpcReq.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(rpcReq.OperationID, errMsg)
ws.userLogoutResp(conn, m)
return
}
msgClient := push.NewPushMsgServiceClient(grpcConn)
reply, err := msgClient.DelUserPushToken(context.Background(), &rpcReq)
if err != nil {
log.NewError(rpcReq.OperationID, "DelUserPushToken err", err.Error())
ws.userLogoutResp(conn, m)
} else {
log.NewInfo(rpcReq.OperationID, "rpc call success to DelUserPushToken", reply.String())
ws.userLogoutResp(conn, m)
}
ws.userLogoutResp(conn, m)
}
func (ws *WServer) userLogoutResp(conn *UserConn, m *Req) {
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
OperationID: m.OperationID,
}
ws.sendMsg(conn, mReply)
_ = conn.Close()
}
func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
sendMsgAllCountLock.Lock()
sendMsgAllCount++
sendMsgAllCountLock.Unlock()
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID)
nReply := new(pbChat.SendMsgResp)
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg, m.OperationID)
if isPass {
data := pData.(sdk_ws.MsgData)
pbData := pbChat.SendMsgReq{
Token: m.Token,
OperationID: m.OperationID,
MsgData: &data,
}
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq middle", m.ReqIdentifier, m.SendID, m.MsgIncr, data.String())
etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, m.OperationID)
if etcdConn == nil {
errMsg := m.OperationID + "getcdv3.GetDefaultConn == nil"
nReply.ErrCode = 500
nReply.ErrMsg = errMsg
log.NewError(m.OperationID, errMsg)
ws.sendMsgResp(conn, m, nReply)
return
}
client := pbChat.NewMsgClient(etcdConn)
reply, err := client.SendMsg(context.Background(), &pbData)
if err != nil {
log.NewError(pbData.OperationID, "UserSendMsg err", err.Error())
nReply.ErrCode = 200
nReply.ErrMsg = err.Error()
ws.sendMsgResp(conn, m, nReply)
} else {
log.NewInfo(pbData.OperationID, "rpc call success to sendMsgReq", reply.String())
ws.sendMsgResp(conn, m, reply)
}
} else {
nReply.ErrCode = errCode
nReply.ErrMsg = errMsg
ws.sendMsgResp(conn, m, nReply)
}
}
func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.SendMsgResp) {
var mReplyData sdk_ws.UserSendMsgResp
mReplyData.ClientMsgID = pb.GetClientMsgID()
mReplyData.ServerMsgID = pb.GetServerMsgID()
mReplyData.SendTime = pb.GetSendTime()
b, _ := proto.Marshal(&mReplyData)
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
ErrCode: pb.GetErrCode(),
ErrMsg: pb.GetErrMsg(),
OperationID: m.OperationID,
Data: b,
}
ws.sendMsg(conn, mReply)
}
func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to sendSignalMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, string(m.Data))
nReply := new(pbChat.SendMsgResp)
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendSignalMsg, m.OperationID)
if isPass {
signalResp := pbRtc.SignalResp{}
etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImRealTimeCommName, m.OperationID)
if etcdConn == nil {
errMsg := m.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(m.OperationID, errMsg)
ws.sendSignalMsgResp(conn, 204, errMsg, m, &signalResp)
return
}
rtcClient := pbRtc.NewRtcServiceClient(etcdConn)
req := &pbRtc.SignalMessageAssembleReq{
SignalReq: pData.(*pbRtc.SignalReq),
OperationID: m.OperationID,
}
respPb, err := rtcClient.SignalMessageAssemble(context.Background(), req)
if err != nil {
log.NewError(m.OperationID, utils.GetSelfFuncName(), "SignalMessageAssemble", err.Error(), config.Config.RpcRegisterName.OpenImRealTimeCommName)
ws.sendSignalMsgResp(conn, 204, "grpc SignalMessageAssemble failed: "+err.Error(), m, &signalResp)
return
}
signalResp.Payload = respPb.SignalResp.Payload
msgData := sdk_ws.MsgData{}
utils.CopyStructFields(&msgData, respPb.MsgData)
log.NewInfo(m.OperationID, utils.GetSelfFuncName(), respPb.String())
if respPb.IsPass {
pbData := pbChat.SendMsgReq{
Token: m.Token,
OperationID: m.OperationID,
MsgData: &msgData,
}
log.NewInfo(m.OperationID, utils.GetSelfFuncName(), "pbData: ", pbData)
log.NewInfo(m.OperationID, "Ws call success to sendSignalMsgReq middle", m.ReqIdentifier, m.SendID, m.MsgIncr, msgData)
etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, m.OperationID)
if etcdConn == nil {
errMsg := m.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(m.OperationID, errMsg)
ws.sendSignalMsgResp(conn, 200, errMsg, m, &signalResp)
return
}
client := pbChat.NewMsgClient(etcdConn)
reply, err := client.SendMsg(context.Background(), &pbData)
if err != nil {
log.NewError(pbData.OperationID, utils.GetSelfFuncName(), "rpc sendMsg err", err.Error())
nReply.ErrCode = 200
nReply.ErrMsg = err.Error()
ws.sendSignalMsgResp(conn, 200, err.Error(), m, &signalResp)
} else {
log.NewInfo(pbData.OperationID, "rpc call success to sendMsgReq", reply.String(), signalResp.String(), m)
ws.sendSignalMsgResp(conn, 0, "", m, &signalResp)
}
} else {
log.NewError(m.OperationID, utils.GetSelfFuncName(), respPb.IsPass, respPb.CommonResp.ErrCode, respPb.CommonResp.ErrMsg)
ws.sendSignalMsgResp(conn, respPb.CommonResp.ErrCode, respPb.CommonResp.ErrMsg, m, &signalResp)
}
} else {
ws.sendSignalMsgResp(conn, errCode, errMsg, m, nil)
}
}
func (ws *WServer) sendSignalMsgResp(conn *UserConn, errCode int32, errMsg string, m *Req, pb *pbRtc.SignalResp) {
// := make(map[string]interface{})
log.Debug(m.OperationID, "sendSignalMsgResp is", pb.String())
b, _ := proto.Marshal(pb)
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
ErrCode: errCode,
ErrMsg: errMsg,
OperationID: m.OperationID,
Data: b,
}
ws.sendMsg(conn, mReply)
}
func (ws *WServer) sendMsg(conn *UserConn, mReply interface{}) {
var b bytes.Buffer
enc := gob.NewEncoder(&b)
err := enc.Encode(mReply)
if err != nil {
// uid, platform := ws.getUserUid(conn)
log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "Encode Msg error", conn.RemoteAddr().String(), err.Error())
return
}
err = ws.writeMsg(conn, websocket.BinaryMessage, b.Bytes())
if err != nil {
// uid, platform := ws.getUserUid(conn)
log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws writeMsg error", conn.RemoteAddr().String(), err.Error())
} else {
log.Debug(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws write response success")
}
}
func (ws *WServer) sendErrMsg(conn *UserConn, errCode int32, errMsg string, reqIdentifier int32, msgIncr string, operationID string) {
mReply := Resp{
ReqIdentifier: reqIdentifier,
MsgIncr: msgIncr,
ErrCode: errCode,
ErrMsg: errMsg,
OperationID: operationID,
}
ws.sendMsg(conn, mReply)
}
func SetTokenKicked(userID string, platformID int, operationID string) {
m, err := db.DB.GetTokenMapByUidPid(userID, constant.PlatformIDToName(platformID))
if err != nil {
log.Error(operationID, "GetTokenMapByUidPid failed ", err.Error(), userID, constant.PlatformIDToName(platformID))
return
}
for k, _ := range m {
m[k] = constant.KickedToken
}
err = db.DB.SetTokenMapByUidPid(userID, platformID, m)
if err != nil {
log.Error(operationID, "SetTokenMapByUidPid failed ", err.Error(), userID, constant.PlatformIDToName(platformID))
return
}
}
func (ws *WServer) setUserDeviceBackground(conn *UserConn, m *Req) {
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WsSetBackgroundStatus, m.OperationID)
if isPass {
req := pData.(*sdk_ws.SetAppBackgroundStatusReq)
conn.IsBackground = req.IsBackground
callbackResp := callbackUserOnline(m.OperationID, conn.userID, int(conn.PlatformID), conn.token, conn.IsBackground, conn.connID)
if callbackResp.ErrCode != 0 {
log.NewError(m.OperationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp)
}
log.NewInfo(m.OperationID, "SetUserDeviceBackground", "success", *conn, req.IsBackground)
}
ws.setUserDeviceBackgroundResp(conn, m, errCode, errMsg)
}
func (ws *WServer) setUserDeviceBackgroundResp(conn *UserConn, m *Req, errCode int32, errMsg string) {
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
OperationID: m.OperationID,
ErrCode: errCode,
ErrMsg: errMsg,
}
ws.sendMsg(conn, mReply)
}
@@ -0,0 +1,22 @@
package open_im_media
const (
// Address gRPC服务地址
Address = "127.0.0.1:11300"
)
type Media struct {
}
func NewMedia() *Media {
return &Media{}
}
func init() {
}
func (m *Media) CreateRoom(roomName string) (error, error) {
return nil, nil
}
@@ -0,0 +1,377 @@
package gate
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
pbRelay "Open_IM/pkg/proto/relay"
sdk_ws "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"bytes"
"context"
"encoding/gob"
"net"
"strconv"
"strings"
"github.com/golang/protobuf/proto"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/gorilla/websocket"
"google.golang.org/grpc"
)
type RPCServer struct {
rpcPort int
rpcRegisterName string
etcdSchema string
etcdAddr []string
platformList []int
pushTerminal []int
target string
}
func initPrometheus() {
promePkg.NewMsgRecvTotalCounter()
promePkg.NewGetNewestSeqTotalCounter()
promePkg.NewPullMsgBySeqListTotalCounter()
promePkg.NewMsgOnlinePushSuccessCounter()
promePkg.NewOnlineUserGauges()
//promePkg.NewSingleChatMsgRecvSuccessCounter()
//promePkg.NewGroupChatMsgRecvSuccessCounter()
//promePkg.NewWorkSuperGroupChatMsgRecvSuccessCounter()
}
func (r *RPCServer) onInit(rpcPort int) {
r.rpcPort = rpcPort
r.rpcRegisterName = config.Config.RpcRegisterName.OpenImRelayName
r.etcdSchema = config.Config.Etcd.EtcdSchema
r.etcdAddr = config.Config.Etcd.EtcdAddr
r.platformList = genPlatformArray()
r.pushTerminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID}
}
func (r *RPCServer) run() {
listenIP := ""
if config.Config.ListenIP == "" {
listenIP = "0.0.0.0"
} else {
listenIP = config.Config.ListenIP
}
address := listenIP + ":" + strconv.Itoa(r.rpcPort)
listener, err := net.Listen("tcp", address)
if err != nil {
panic("listening err:" + err.Error() + r.rpcRegisterName)
}
defer listener.Close()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, []grpc.ServerOption{
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
}...)
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
pbRelay.RegisterRelayServer(srv, r)
rpcRegisterIP := config.Config.RpcRegisterIP
if config.Config.RpcRegisterIP == "" {
rpcRegisterIP, err = utils.GetLocalIP()
if err != nil {
log.Error("", "GetLocalIP failed ", err.Error())
}
}
err = rpc.RegisterEtcd4Unique(r.etcdSchema, strings.Join(r.etcdAddr, ","), rpcRegisterIP, r.rpcPort, r.rpcRegisterName, 10)
if err != nil {
log.Error("", "register push message rpc to etcd err", "", "err", err.Error(), r.etcdSchema, strings.Join(r.etcdAddr, ","), rpcRegisterIP, r.rpcPort, r.rpcRegisterName)
panic(utils.Wrap(err, "register msg_gataway module rpc to etcd err"))
}
r.target = rpc.GetTarget(r.etcdSchema, rpcRegisterIP, r.rpcPort, r.rpcRegisterName)
err = srv.Serve(listener)
if err != nil {
log.Error("", "push message rpc listening err", "", "err", err.Error())
return
}
}
func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgReq) (*pbRelay.OnlinePushMsgResp, error) {
log.NewInfo(in.OperationID, "PushMsgToUser is arriving", in.String())
var resp []*pbRelay.SingleMsgToUserPlatform
msgBytes, _ := proto.Marshal(in.MsgData)
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
OperationID: in.OperationID,
Data: msgBytes,
}
var replyBytes bytes.Buffer
enc := gob.NewEncoder(&replyBytes)
err := enc.Encode(mReply)
if err != nil {
log.NewError(in.OperationID, "data encode err", err.Error())
}
var tag bool
recvID := in.PushToUserID
for _, v := range r.platformList {
if conn := ws.getUserConn(recvID, v); conn != nil {
tag = true
resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, v, recvID)
temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: resultCode,
RecvID: recvID,
RecvPlatFormID: int32(v),
}
resp = append(resp, temp)
} else {
temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: -1,
RecvID: recvID,
RecvPlatFormID: int32(v),
}
resp = append(resp, temp)
}
}
if !tag {
log.NewDebug(in.OperationID, "push err ,no matched ws conn not in map", in.String())
}
return &pbRelay.OnlinePushMsgResp{
Resp: resp,
}, nil
}
func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUsersOnlineStatusReq) (*pbRelay.GetUsersOnlineStatusResp, error) {
log.NewInfo(req.OperationID, "rpc GetUsersOnlineStatus arrived server", req.String())
if !tokenverify.IsManagerUserID(req.OpUserID) {
log.NewError(req.OperationID, "no permission GetUsersOnlineStatus ", req.OpUserID)
return &pbRelay.GetUsersOnlineStatusResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
}
var resp pbRelay.GetUsersOnlineStatusResp
for _, userID := range req.UserIDList {
temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult)
temp.UserID = userID
userConnMap := ws.getUserAllCons(userID)
for platform, userConn := range userConnMap {
if userConn != nil {
ps := new(pbRelay.GetUsersOnlineStatusResp_SuccessDetail)
ps.Platform = constant.PlatformIDToName(platform)
ps.Status = constant.OnlineStatus
ps.ConnID = userConn.connID
ps.IsBackground = userConn.IsBackground
temp.Status = constant.OnlineStatus
temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps)
}
}
if temp.Status == constant.OnlineStatus {
resp.SuccessResult = append(resp.SuccessResult, temp)
}
}
log.NewInfo(req.OperationID, "GetUsersOnlineStatus rpc return ", resp.String())
return &resp, nil
}
func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) {
log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String())
var singleUserResult []*pbRelay.SingelMsgToUserResultList
//r.GetBatchMsgForPush(req.OperationID,req.MsgData,req.PushToUserIDList,)
msgBytes, _ := proto.Marshal(req.MsgData)
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
OperationID: req.OperationID,
Data: msgBytes,
}
var replyBytes bytes.Buffer
enc := gob.NewEncoder(&replyBytes)
err := enc.Encode(mReply)
if err != nil {
log.NewError(req.OperationID, "data encode err", err.Error())
}
for _, v := range req.PushToUserIDList {
var resp []*pbRelay.SingleMsgToUserPlatform
tempT := &pbRelay.SingelMsgToUserResultList{
UserID: v,
}
userConnMap := ws.getUserAllCons(v)
for platform, userConn := range userConnMap {
if userConn != nil {
temp := &pbRelay.SingleMsgToUserPlatform{
RecvID: v,
RecvPlatFormID: int32(platform),
}
if !userConn.IsBackground {
resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true
promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter)
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
temp.ResultCode = resultCode
resp = append(resp, temp)
}
} else {
temp.ResultCode = -2
resp = append(resp, temp)
}
}
}
tempT.Resp = resp
singleUserResult = append(singleUserResult, tempT)
}
return &pbRelay.OnlineBatchPushOneMsgResp{
SinglePushResult: singleUserResult,
}, nil
}
func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) {
log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String())
var singleUserResult []*pbRelay.SingelMsgToUserResultList
for _, v := range req.PushToUserIDList {
var resp []*pbRelay.SingleMsgToUserPlatform
tempT := &pbRelay.SingelMsgToUserResultList{
UserID: v,
}
userConnMap := ws.getUserAllCons(v)
var platformList []int
for k, _ := range userConnMap {
platformList = append(platformList, k)
}
log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms begin", req.MsgData.Seq, v, platformList, req.MsgData.String())
needPushMapList := r.GetSingleUserMsgForPushPlatforms(req.OperationID, req.MsgData, v, platformList)
log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms end", req.MsgData.Seq, v, platformList, len(needPushMapList))
for platform, list := range needPushMapList {
if list != nil {
log.Debug(req.OperationID, "needPushMapList ", "userID: ", v, "platform: ", platform, "push msg num:")
//for _, v := range list {
// log.Debug(req.OperationID, "req.MsgData.MsgDataList begin", "len: ", len(req.MsgData.MsgDataList), v.String())
// req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v)
// log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList))
//}
msgBytes, err := proto.Marshal(list)
if err != nil {
log.Error(req.OperationID, "proto marshal err", err.Error())
continue
}
req.MsgData.MsgDataList = msgBytes
//req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v)
log.Debug(req.OperationID, "r.encodeWsData no string")
//log.Debug(req.OperationID, "r.encodeWsData data0 list ", req.MsgData.MsgDataList[0].String())
log.Debug(req.OperationID, "r.encodeWsData ", req.MsgData.String())
replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID)
if err != nil {
log.Error(req.OperationID, "encodeWsData failed ", req.MsgData.String())
continue
}
log.Debug(req.OperationID, "encodeWsData", "len: ", replyBytes.Len())
resultCode := sendMsgBatchToUser(userConnMap[platform], replyBytes.Bytes(), req, platform, v)
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recv PlatForm", constant.PlatformIDToName(platform), "recvID", v)
temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: resultCode,
RecvID: v,
RecvPlatFormID: int32(platform),
}
resp = append(resp, temp)
}
} else {
if utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true
temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: 0,
RecvID: v,
RecvPlatFormID: int32(platform),
}
resp = append(resp, temp)
}
}
}
tempT.Resp = resp
singleUserResult = append(singleUserResult, tempT)
}
return &pbRelay.OnlineBatchPushOneMsgResp{
SinglePushResult: singleUserResult,
}, nil
}
func (r *RPCServer) encodeWsData(wsData *sdk_ws.MsgData, operationID string) (bytes.Buffer, error) {
log.Debug(operationID, "encodeWsData begin", wsData.String())
msgBytes, err := proto.Marshal(wsData)
if err != nil {
log.NewError(operationID, "Marshal", err.Error())
return bytes.Buffer{}, utils.Wrap(err, "")
}
log.Debug(operationID, "encodeWsData begin", wsData.String())
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
OperationID: operationID,
Data: msgBytes,
}
var replyBytes bytes.Buffer
enc := gob.NewEncoder(&replyBytes)
err = enc.Encode(mReply)
if err != nil {
log.NewError(operationID, "data encode err", err.Error())
return bytes.Buffer{}, utils.Wrap(err, "")
}
return replyBytes, nil
}
func (r *RPCServer) KickUserOffline(_ context.Context, req *pbRelay.KickUserOfflineReq) (*pbRelay.KickUserOfflineResp, error) {
log.NewInfo(req.OperationID, "KickUserOffline is arriving", req.String())
for _, v := range req.KickUserIDList {
log.NewWarn(req.OperationID, "SetTokenKicked ", v, req.PlatformID, req.OperationID)
SetTokenKicked(v, int(req.PlatformID), req.OperationID)
oldConnMap := ws.getUserAllCons(v)
if conn, ok := oldConnMap[int(req.PlatformID)]; ok { // user->map[platform->conn]
log.NewWarn(req.OperationID, "send kick msg, close connection ", req.PlatformID, v)
ws.sendKickMsg(conn)
conn.Close()
}
}
return &pbRelay.KickUserOfflineResp{}, nil
}
func (r *RPCServer) MultiTerminalLoginCheck(ctx context.Context, req *pbRelay.MultiTerminalLoginCheckReq) (*pbRelay.MultiTerminalLoginCheckResp, error) {
ws.MultiTerminalLoginCheckerWithLock(req.UserID, int(req.PlatformID), req.Token, req.OperationID)
return &pbRelay.MultiTerminalLoginCheckResp{}, nil
}
func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlinePushMsgReq, RecvPlatForm int, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg)
if err != nil {
log.NewError(in.OperationID, "PushMsgToUser is failed By Ws", "Addr", conn.RemoteAddr().String(),
"error", err, "senderPlatform", constant.PlatformIDToName(int(in.MsgData.SenderPlatformID)), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID)
ResultCode = -2
return ResultCode
} else {
log.NewDebug(in.OperationID, "PushMsgToUser is success By Ws", "args", in.String(), "recvPlatForm", RecvPlatForm, "recvID", RecvID)
ResultCode = 0
return ResultCode
}
}
func sendMsgBatchToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlineBatchPushOneMsgReq, RecvPlatForm int, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg)
if err != nil {
log.NewError(in.OperationID, "PushMsgToUser is failed By Ws", "Addr", conn.RemoteAddr().String(),
"error", err, "senderPlatform", constant.PlatformIDToName(int(in.MsgData.SenderPlatformID)), "recv Platform", RecvPlatForm, "args", in.String(), "recvID", RecvID)
ResultCode = -2
return ResultCode
} else {
log.NewDebug(in.OperationID, "PushMsgToUser is success By Ws", "args", in.String(), "recv PlatForm", RecvPlatForm, "recvID", RecvID)
ResultCode = 0
return ResultCode
}
}
func genPlatformArray() (array []int) {
for i := 1; i <= constant.LinuxPlatformID; i++ {
array = append(array, i)
}
return array
}
+125
View File
@@ -0,0 +1,125 @@
/*
** description("").
** copyright('Open_IM,www.Open_IM.io').
** author("fg,Gordon@tuoyun.net").
** time(2021/5/21 15:29).
*/
package gate
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
pbRtc "Open_IM/pkg/proto/rtc"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
"github.com/golang/protobuf/proto"
)
type Req struct {
ReqIdentifier int32 `json:"reqIdentifier" validate:"required"`
Token string `json:"token" `
SendID string `json:"sendID" validate:"required"`
OperationID string `json:"operationID" validate:"required"`
MsgIncr string `json:"msgIncr" validate:"required"`
Data []byte `json:"data"`
}
type Resp struct {
ReqIdentifier int32 `json:"reqIdentifier"`
MsgIncr string `json:"msgIncr"`
OperationID string `json:"operationID"`
ErrCode int32 `json:"errCode"`
ErrMsg string `json:"errMsg"`
Data []byte `json:"data"`
}
type SeqData struct {
SeqBegin int64 `mapstructure:"seqBegin" validate:"required"`
SeqEnd int64 `mapstructure:"seqEnd" validate:"required"`
}
type MsgData struct {
PlatformID int32 `mapstructure:"platformID" validate:"required"`
SessionType int32 `mapstructure:"sessionType" validate:"required"`
MsgFrom int32 `mapstructure:"msgFrom" validate:"required"`
ContentType int32 `mapstructure:"contentType" validate:"required"`
RecvID string `mapstructure:"recvID" validate:"required"`
ForceList []string `mapstructure:"forceList"`
Content string `mapstructure:"content" validate:"required"`
Options map[string]interface{} `mapstructure:"options" validate:"required"`
ClientMsgID string `mapstructure:"clientMsgID" validate:"required"`
OfflineInfo map[string]interface{} `mapstructure:"offlineInfo" validate:"required"`
Ext map[string]interface{} `mapstructure:"ext"`
}
type MaxSeqResp struct {
MaxSeq int64 `json:"maxSeq"`
}
type PullMessageResp struct {
}
type SeqListData struct {
SeqList []int64 `mapstructure:"seqList" validate:"required"`
}
func (ws *WServer) argsValidate(m *Req, r int32, operationID string) (isPass bool, errCode int32, errMsg string, returnData interface{}) {
switch r {
case constant.WSGetNewestSeq:
data := open_im_sdk.GetMaxAndMinSeqReq{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.Error(operationID, "Decode Data struct err", err.Error(), r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.Error(operationID, "data args validate err", err.Error(), r)
return false, 204, err.Error(), nil
}
return true, 0, "", data
case constant.WSSendMsg:
data := open_im_sdk.MsgData{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.Error(operationID, "Decode Data struct err", err.Error(), r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.Error(operationID, "data args validate err", err.Error(), r)
return false, 204, err.Error(), nil
}
return true, 0, "", data
case constant.WSSendSignalMsg:
data := pbRtc.SignalReq{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.Error(operationID, "Decode Data struct err", err.Error(), r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.Error(operationID, "data args validate err", err.Error(), r)
return false, 204, err.Error(), nil
}
return true, 0, "", &data
case constant.WSPullMsgBySeqList:
data := open_im_sdk.PullMessageBySeqListReq{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.Error(operationID, "Decode Data struct err", err.Error(), r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.Error(operationID, "data args validate err", err.Error(), r)
return false, 204, err.Error(), nil
}
return true, 0, "", data
case constant.WsSetBackgroundStatus:
data := open_im_sdk.SetAppBackgroundStatusReq{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.Error(operationID, "Decode Data struct err", err.Error(), r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.Error(operationID, "data args validate err", err.Error(), r)
return false, 204, err.Error(), nil
}
return true, 0, "", &data
default:
}
return false, 204, "args err", nil
}
+515
View File
@@ -0,0 +1,515 @@
package gate
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
pbRelay "Open_IM/pkg/proto/relay"
"Open_IM/pkg/utils"
"bytes"
"compress/gzip"
"context"
"encoding/gob"
"io/ioutil"
"strconv"
"strings"
go_redis "github.com/go-redis/redis/v8"
"github.com/pkg/errors"
//"gopkg.in/errgo.v2/errors"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
type UserConn struct {
*websocket.Conn
w *sync.Mutex
PlatformID int32
PushedMaxSeq uint32
IsCompress bool
userID string
IsBackground bool
token string
connID string
}
type WServer struct {
wsAddr string
wsMaxConnNum int
wsUpGrader *websocket.Upgrader
wsConnToUser map[*UserConn]map[int]string
wsUserToConn map[string]map[int]*UserConn
}
func (ws *WServer) onInit(wsPort int) {
ws.wsAddr = ":" + utils.IntToString(wsPort)
ws.wsMaxConnNum = config.Config.LongConnSvr.WebsocketMaxConnNum
ws.wsConnToUser = make(map[*UserConn]map[int]string)
ws.wsUserToConn = make(map[string]map[int]*UserConn)
ws.wsUpGrader = &websocket.Upgrader{
HandshakeTimeout: time.Duration(config.Config.LongConnSvr.WebsocketTimeOut) * time.Second,
ReadBufferSize: config.Config.LongConnSvr.WebsocketMaxMsgLen,
CheckOrigin: func(r *http.Request) bool { return true },
}
}
func (ws *WServer) run() {
http.HandleFunc("/", ws.wsHandler) //Get request from client to handle by wsHandler
err := http.ListenAndServe(ws.wsAddr, nil) //Start listening
if err != nil {
panic("Ws listening err:" + err.Error())
}
}
func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
operationID := ""
if len(query["operationID"]) != 0 {
operationID = query["operationID"][0]
} else {
operationID = utils.OperationIDGenerator()
}
log.Debug(operationID, utils.GetSelfFuncName(), " args: ", query)
if isPass, compression := ws.headerCheck(w, r, operationID); isPass {
conn, err := ws.wsUpGrader.Upgrade(w, r, nil) //Conn is obtained through the upgraded escalator
if err != nil {
log.Error(operationID, "upgrade http conn err", err.Error(), query)
return
} else {
newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0, compression, query["sendID"][0], false, query["token"][0], conn.RemoteAddr().String() + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))}
userCount++
ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], newConn.connID, operationID)
go ws.readMsg(newConn)
}
} else {
log.Error(operationID, "headerCheck failed ")
}
}
func (ws *WServer) readMsg(conn *UserConn) {
for {
messageType, msg, err := conn.ReadMessage()
if messageType == websocket.PingMessage {
log.NewInfo("", "this is a pingMessage")
}
if err != nil {
log.NewWarn("", "WS ReadMsg error ", " userIP", conn.RemoteAddr().String(), "userUid", "platform", "error", err.Error())
userCount--
ws.delUserConn(conn)
return
}
log.NewDebug("", "size", utils.ByteSize(uint64(len(msg))))
if conn.IsCompress {
buff := bytes.NewBuffer(msg)
reader, err := gzip.NewReader(buff)
if err != nil {
log.NewWarn("", "un gzip read failed")
continue
}
msg, err = ioutil.ReadAll(reader)
if err != nil {
log.NewWarn("", "ReadAll failed")
continue
}
err = reader.Close()
if err != nil {
log.NewWarn("", "reader close failed")
}
}
ws.msgParse(conn, msg)
}
}
func (ws *WServer) SetWriteTimeout(conn *UserConn, timeout int) {
conn.w.Lock()
defer conn.w.Unlock()
conn.SetWriteDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
}
func (ws *WServer) writeMsg(conn *UserConn, a int, msg []byte) error {
conn.w.Lock()
defer conn.w.Unlock()
if conn.IsCompress {
var buffer bytes.Buffer
gz := gzip.NewWriter(&buffer)
if _, err := gz.Write(msg); err != nil {
return utils.Wrap(err, "")
}
if err := gz.Close(); err != nil {
return utils.Wrap(err, "")
}
msg = buffer.Bytes()
}
conn.SetWriteDeadline(time.Now().Add(time.Duration(60) * time.Second))
return conn.WriteMessage(a, msg)
}
func (ws *WServer) SetWriteTimeoutWriteMsg(conn *UserConn, a int, msg []byte, timeout int) error {
conn.w.Lock()
defer conn.w.Unlock()
conn.SetWriteDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
return conn.WriteMessage(a, msg)
}
func (ws *WServer) MultiTerminalLoginRemoteChecker(userID string, platformID int32, token string, operationID string) {
grpcCons := rpc.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), operationID)
log.NewInfo(operationID, utils.GetSelfFuncName(), "args grpcCons: ", userID, platformID, grpcCons)
for _, v := range grpcCons {
if v.Target() == rpcSvr.target {
log.Debug(operationID, "Filter out this node ", rpcSvr.target)
continue
}
log.Debug(operationID, "call this node ", v.Target(), rpcSvr.target)
client := pbRelay.NewRelayClient(v)
req := &pbRelay.MultiTerminalLoginCheckReq{OperationID: operationID, PlatformID: platformID, UserID: userID, Token: token}
log.NewInfo(operationID, "MultiTerminalLoginCheckReq ", client, req.String())
resp, err := client.MultiTerminalLoginCheck(context.Background(), req)
if err != nil {
log.Error(operationID, "MultiTerminalLoginCheck failed ", err.Error())
continue
}
if resp.ErrCode != 0 {
log.Error(operationID, "MultiTerminalLoginCheck errCode, errMsg: ", resp.ErrCode, resp.ErrMsg)
continue
}
log.Debug(operationID, "MultiTerminalLoginCheck resp ", resp.String())
}
}
func (ws *WServer) MultiTerminalLoginCheckerWithLock(uid string, platformID int, token string, operationID string) {
rwLock.Lock()
defer rwLock.Unlock()
log.NewInfo(operationID, utils.GetSelfFuncName(), " rpc args: ", uid, platformID, token)
switch config.Config.MultiLoginPolicy {
case constant.PCAndOther:
if constant.PlatformNameToClass(constant.PlatformIDToName(platformID)) == constant.TerminalPC {
return
}
fallthrough
case constant.AllLoginButSameTermKick:
if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn]
if oldConn, ok := oldConnMap[platformID]; ok {
log.NewDebug(operationID, uid, platformID, "kick old conn")
m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID))
if err != nil && err != go_redis.Nil {
log.NewError(operationID, "get token from redis err", err.Error(), uid, constant.PlatformIDToName(platformID))
return
}
if m == nil {
log.NewError(operationID, "get token from redis err", "m is nil", uid, constant.PlatformIDToName(platformID))
return
}
log.NewDebug(operationID, "get token map is ", m, uid, constant.PlatformIDToName(platformID))
for k, _ := range m {
if k != token {
m[k] = constant.KickedToken
}
}
log.NewDebug(operationID, "set token map is ", m, uid, constant.PlatformIDToName(platformID))
err = db.DB.SetTokenMapByUidPid(uid, platformID, m)
if err != nil {
log.NewError(operationID, "SetTokenMapByUidPid err", err.Error(), uid, platformID, m)
return
}
err = oldConn.Close()
//delete(oldConnMap, platformID)
ws.wsUserToConn[uid] = oldConnMap
if len(oldConnMap) == 0 {
delete(ws.wsUserToConn, uid)
}
delete(ws.wsConnToUser, oldConn)
if err != nil {
log.NewError(operationID, "conn close err", err.Error(), uid, platformID)
}
} else {
log.NewWarn(operationID, "abnormal uid-conn ", uid, platformID, oldConnMap[platformID])
}
} else {
log.NewDebug(operationID, "no other conn", ws.wsUserToConn, uid, platformID)
}
case constant.SingleTerminalLogin:
case constant.WebAndOther:
}
}
func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn *UserConn, token string, operationID string) {
switch config.Config.MultiLoginPolicy {
case constant.PCAndOther:
if constant.PlatformNameToClass(constant.PlatformIDToName(platformID)) == constant.TerminalPC {
return
}
fallthrough
case constant.AllLoginButSameTermKick:
if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn]
if oldConn, ok := oldConnMap[platformID]; ok {
log.NewDebug(operationID, uid, platformID, "kick old conn")
ws.sendKickMsg(oldConn)
m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID))
if err != nil && err != go_redis.Nil {
log.NewError(operationID, "get token from redis err", err.Error(), uid, constant.PlatformIDToName(platformID))
return
}
if m == nil {
log.NewError(operationID, "get token from redis err", "m is nil", uid, constant.PlatformIDToName(platformID))
return
}
log.NewDebug(operationID, "get token map is ", m, uid, constant.PlatformIDToName(platformID))
for k, _ := range m {
if k != token {
m[k] = constant.KickedToken
}
}
log.NewDebug(operationID, "set token map is ", m, uid, constant.PlatformIDToName(platformID))
err = db.DB.SetTokenMapByUidPid(uid, platformID, m)
if err != nil {
log.NewError(operationID, "SetTokenMapByUidPid err", err.Error(), uid, platformID, m)
return
}
err = oldConn.Close()
delete(oldConnMap, platformID)
ws.wsUserToConn[uid] = oldConnMap
if len(oldConnMap) == 0 {
delete(ws.wsUserToConn, uid)
}
delete(ws.wsConnToUser, oldConn)
if err != nil {
log.NewError(operationID, "conn close err", err.Error(), uid, platformID)
}
callbackResp := callbackUserKickOff(operationID, uid, platformID)
if callbackResp.ErrCode != 0 {
log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp)
}
} else {
log.Debug(operationID, "normal uid-conn ", uid, platformID, oldConnMap[platformID])
}
} else {
log.NewDebug(operationID, "no other conn", ws.wsUserToConn, uid, platformID)
}
case constant.SingleTerminalLogin:
case constant.WebAndOther:
}
}
func (ws *WServer) sendKickMsg(oldConn *UserConn) {
mReply := Resp{
ReqIdentifier: constant.WSKickOnlineMsg,
ErrCode: constant.ErrTokenInvalid.ErrCode,
ErrMsg: constant.ErrTokenInvalid.ErrMsg,
}
var b bytes.Buffer
enc := gob.NewEncoder(&b)
err := enc.Encode(mReply)
if err != nil {
log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "Encode Msg error", oldConn.RemoteAddr().String(), err.Error())
return
}
err = ws.writeMsg(oldConn, websocket.BinaryMessage, b.Bytes())
if err != nil {
log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "sendKickMsg WS WriteMsg error", oldConn.RemoteAddr().String(), err.Error())
}
}
func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, connID, operationID string) {
rwLock.Lock()
defer rwLock.Unlock()
log.Info(operationID, utils.GetSelfFuncName(), " args: ", uid, platformID, conn, token, "ip: ", conn.RemoteAddr().String())
callbackResp := callbackUserOnline(operationID, uid, platformID, token, false, connID)
if callbackResp.ErrCode != 0 {
log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOnline resp:", callbackResp)
}
go ws.MultiTerminalLoginRemoteChecker(uid, int32(platformID), token, operationID)
ws.MultiTerminalLoginChecker(uid, platformID, conn, token, operationID)
if oldConnMap, ok := ws.wsUserToConn[uid]; ok {
oldConnMap[platformID] = conn
ws.wsUserToConn[uid] = oldConnMap
log.Debug(operationID, "user not first come in, add conn ", uid, platformID, conn, oldConnMap)
} else {
i := make(map[int]*UserConn)
i[platformID] = conn
ws.wsUserToConn[uid] = i
log.Debug(operationID, "user first come in, new user, conn", uid, platformID, conn, ws.wsUserToConn[uid])
}
if oldStringMap, ok := ws.wsConnToUser[conn]; ok {
oldStringMap[platformID] = uid
ws.wsConnToUser[conn] = oldStringMap
} else {
i := make(map[int]string)
i[platformID] = uid
ws.wsConnToUser[conn] = i
}
count := 0
for _, v := range ws.wsUserToConn {
count = count + len(v)
}
promePkg.PromeGaugeInc(promePkg.OnlineUserGauge)
log.Debug(operationID, "WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
}
func (ws *WServer) delUserConn(conn *UserConn) {
rwLock.Lock()
defer rwLock.Unlock()
operationID := utils.OperationIDGenerator()
var uid string
var platform int
if oldStringMap, okg := ws.wsConnToUser[conn]; okg {
for k, v := range oldStringMap {
platform = k
uid = v
}
if oldConnMap, ok := ws.wsUserToConn[uid]; ok {
delete(oldConnMap, platform)
ws.wsUserToConn[uid] = oldConnMap
if len(oldConnMap) == 0 {
delete(ws.wsUserToConn, uid)
}
count := 0
for _, v := range ws.wsUserToConn {
count = count + len(v)
}
log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
} else {
log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn))
}
delete(ws.wsConnToUser, conn)
}
err := conn.Close()
if err != nil {
log.Error(operationID, " close err", "", "uid", uid, "platform", platform)
}
if conn.PlatformID == 0 || conn.connID == "" {
log.NewWarn(operationID, utils.GetSelfFuncName(), "PlatformID or connID is null", conn.PlatformID, conn.connID)
}
callbackResp := callbackUserOffline(operationID, conn.userID, int(conn.PlatformID), conn.connID)
if callbackResp.ErrCode != 0 {
log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp)
}
promePkg.PromeGaugeDec(promePkg.OnlineUserGauge)
}
func (ws *WServer) getUserConn(uid string, platform int) *UserConn {
rwLock.RLock()
defer rwLock.RUnlock()
if connMap, ok := ws.wsUserToConn[uid]; ok {
if conn, flag := connMap[platform]; flag {
return conn
}
}
return nil
}
func (ws *WServer) getUserAllCons(uid string) map[int]*UserConn {
rwLock.RLock()
defer rwLock.RUnlock()
if connMap, ok := ws.wsUserToConn[uid]; ok {
newConnMap := make(map[int]*UserConn)
for k, v := range connMap {
newConnMap[k] = v
}
return newConnMap
}
return nil
}
// func (ws *WServer) getUserUid(conn *UserConn) (uid string, platform int) {
// rwLock.RLock()
// defer rwLock.RUnlock()
//
// if stringMap, ok := ws.wsConnToUser[conn]; ok {
// for k, v := range stringMap {
// platform = k
// uid = v
// }
// return uid, platform
// }
// return "", 0
// }
func WsVerifyToken(token, uid string, platformID string, operationID string) (bool, error, string) {
}
func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request, operationID string) (isPass, compression bool) {
status := http.StatusUnauthorized
query := r.URL.Query()
if len(query["token"]) != 0 && len(query["sendID"]) != 0 && len(query["platformID"]) != 0 {
if ok, err, msg := tokenverify.WsVerifyToken(query["token"][0], query["sendID"][0], query["platformID"][0], operationID); !ok {
if errors.Is(err, constant.ErrTokenExpired) {
status = int(constant.ErrTokenExpired.ErrCode)
}
if errors.Is(err, constant.ErrTokenInvalid) {
status = int(constant.ErrTokenInvalid.ErrCode)
}
if errors.Is(err, constant.ErrTokenMalformed) {
status = int(constant.ErrTokenMalformed.ErrCode)
}
if errors.Is(err, constant.ErrTokenNotValidYet) {
status = int(constant.ErrTokenNotValidYet.ErrCode)
}
if errors.Is(err, constant.ErrTokenUnknown) {
status = int(constant.ErrTokenUnknown.ErrCode)
}
if errors.Is(err, constant.ErrTokenKicked) {
status = int(constant.ErrTokenKicked.ErrCode)
}
if errors.Is(err, constant.ErrTokenDifferentPlatformID) {
status = int(constant.ErrTokenDifferentPlatformID.ErrCode)
}
if errors.Is(err, constant.ErrTokenDifferentUserID) {
status = int(constant.ErrTokenDifferentUserID.ErrCode)
}
//switch errors.Cause(err) {
//case constant.ErrTokenExpired:
// status = int(constant.ErrTokenExpired.ErrCode)
//case constant.ErrTokenInvalid:
// status = int(constant.ErrTokenInvalid.ErrCode)
//case constant.ErrTokenMalformed:
// status = int(constant.ErrTokenMalformed.ErrCode)
//case constant.ErrTokenNotValidYet:
// status = int(constant.ErrTokenNotValidYet.ErrCode)
//case constant.ErrTokenUnknown:
// status = int(constant.ErrTokenUnknown.ErrCode)
//case constant.ErrTokenKicked:
// status = int(constant.ErrTokenKicked.ErrCode)
//case constant.ErrTokenDifferentPlatformID:
// status = int(constant.ErrTokenDifferentPlatformID.ErrCode)
//case constant.ErrTokenDifferentUserID:
// status = int(constant.ErrTokenDifferentUserID.ErrCode)
//}
log.Error(operationID, "Token verify failed ", "query ", query, msg, err.Error(), "status: ", status)
w.Header().Set("Sec-Websocket-Version", "13")
w.Header().Set("ws_err_msg", err.Error())
http.Error(w, err.Error(), status)
return false, false
} else {
if r.Header.Get("compression") == "gzip" {
compression = true
}
if len(query["compression"]) != 0 && query["compression"][0] == "gzip" {
compression = true
}
log.Info(operationID, "Connection Authentication Success", "", "token ", query["token"][0], "userID ", query["sendID"][0], "platformID ", query["platformID"][0], "compression", compression)
return true, compression
}
} else {
status = int(constant.ErrArgs.ErrCode)
log.Error(operationID, "Args err ", "query ", query)
w.Header().Set("Sec-Websocket-Version", "13")
errMsg := "args err, need token, sendID, platformID"
w.Header().Set("ws_err_msg", errMsg)
http.Error(w, errMsg, status)
return false, false
}
}