ws and push update

This commit is contained in:
Gordon
2022-06-02 16:44:55 +08:00
parent 48410c428a
commit a18dcfce9e
10 changed files with 163 additions and 60 deletions
+98 -16
View File
@@ -25,6 +25,8 @@ type RPCServer struct {
rpcRegisterName string
etcdSchema string
etcdAddr []string
platformList []int
pushTerminal []int
}
func (r *RPCServer) onInit(rpcPort int) {
@@ -32,6 +34,8 @@ func (r *RPCServer) onInit(rpcPort int) {
r.rpcRegisterName = config.Config.RpcRegisterName.OpenImOnlineMessageRelayName
r.etcdSchema = config.Config.Etcd.EtcdSchema
r.etcdAddr = config.Config.Etcd.EtcdAddr
r.platformList = genPlatformArray()
r.pushTerminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID}
}
func (r *RPCServer) run() {
listenIP := ""
@@ -69,7 +73,7 @@ func (r *RPCServer) run() {
}
func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgReq) (*pbRelay.OnlinePushMsgResp, error) {
log.NewInfo(in.OperationID, "PushMsgToUser is arriving", in.String())
var resp []*pbRelay.SingleMsgToUser
var resp []*pbRelay.SingleMsgToUserPlatform
msgBytes, _ := proto.Marshal(in.MsgData)
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
@@ -84,22 +88,21 @@ func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgRe
}
var tag bool
recvID := in.PushToUserID
platformList := genPlatformArray()
for _, v := range platformList {
for _, v := range r.platformList {
if conn := ws.getUserConn(recvID, v); conn != nil {
tag = true
resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, v, recvID)
temp := &pbRelay.SingleMsgToUser{
temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: resultCode,
RecvID: recvID,
RecvPlatFormID: constant.PlatformNameToID(v),
RecvPlatFormID: int32(v),
}
resp = append(resp, temp)
} else {
temp := &pbRelay.SingleMsgToUser{
temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: -1,
RecvID: recvID,
RecvPlatFormID: constant.PlatformNameToID(v),
RecvPlatFormID: int32(v),
}
resp = append(resp, temp)
}
@@ -119,19 +122,19 @@ func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUser
}
var resp pbRelay.GetUsersOnlineStatusResp
for _, userID := range req.UserIDList {
platformList := genPlatformArray()
temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult)
temp.UserID = userID
for _, platform := range platformList {
if conn := ws.getUserConn(userID, platform); conn != nil {
userConnMap := ws.getUserAllCons(userID)
for platform, userConn := range userConnMap {
if userConn != nil {
ps := new(pbRelay.GetUsersOnlineStatusResp_SuccessDetail)
ps.Platform = platform
ps.Platform = constant.PlatformIDToName(platform)
ps.Status = constant.OnlineStatus
temp.Status = constant.OnlineStatus
temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps)
}
}
if temp.Status == constant.OnlineStatus {
resp.SuccessResult = append(resp.SuccessResult, temp)
}
@@ -139,11 +142,76 @@ func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUser
log.NewInfo(req.OperationID, "GetUsersOnlineStatus rpc return ", resp.String())
return &resp, nil
}
func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlinePushMsgReq, RecvPlatForm, RecvID string) (ResultCode int64) {
func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) {
log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String())
var singleUserResult []*pbRelay.SingelMsgToUserResultList
msgBytes, _ := proto.Marshal(req.MsgData)
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
OperationID: req.OperationID,
Data: msgBytes,
}
var replyBytes bytes.Buffer
enc := gob.NewEncoder(&replyBytes)
err := enc.Encode(mReply)
if err != nil {
log.NewError(req.OperationID, "data encode err", err.Error())
}
for _, v := range req.PushToUserIDList {
var resp []*pbRelay.SingleMsgToUserPlatform
userConnMap := ws.getUserAllCons(v)
for platform, userConn := range userConnMap {
if userConn != nil {
resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { //仅仅记录推送成功的平台端
temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: resultCode,
RecvID: v,
RecvPlatFormID: int32(platform),
}
resp = append(resp, temp)
}
}
}
//for _, x := range r.platformList {
// if conn := ws.getUserConn(v, x); conn != nil {
// resultCode := sendMsgBatchToUser(conn, replyBytes.Bytes(), req, x, v)
// temp := &pbRelay.SingleMsgToUserPlatform{
// ResultCode: resultCode,
// RecvID: v,
// RecvPlatFormID: constant.PlatformNameToID(x),
// }
// resp = append(resp, temp)
// } else {
// if utils.IsContain(x,r.pushTerminal) {
// temp := &pbRelay.SingleMsgToUserPlatform{
// ResultCode: -1,
// RecvID: v,
// RecvPlatFormID: constant.PlatformNameToID(x),
// }
// resp = append(resp, temp)
// }
//
// }
//}
tempT := &pbRelay.SingelMsgToUserResultList{
UserID: v,
Resp: resp,
}
singleUserResult = append(singleUserResult, tempT)
}
return &pbRelay.OnlineBatchPushOneMsgResp{
SinglePushResult: singleUserResult,
}, nil
}
func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlinePushMsgReq, RecvPlatForm int, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg)
if err != nil {
log.NewError(in.OperationID, "PushMsgToUser is failed By Ws", "Addr", conn.RemoteAddr().String(),
"error", err, "senderPlatform", constant.PlatformIDToName(in.MsgData.SenderPlatformID), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID)
"error", err, "senderPlatform", constant.PlatformIDToName(int(in.MsgData.SenderPlatformID)), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID)
ResultCode = -2
return ResultCode
} else {
@@ -153,9 +221,23 @@ func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlinePushMsgReq, Re
}
}
func genPlatformArray() (array []string) {
func sendMsgBatchToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlineBatchPushOneMsgReq, RecvPlatForm int, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg)
if err != nil {
log.NewError(in.OperationID, "PushMsgToUser is failed By Ws", "Addr", conn.RemoteAddr().String(),
"error", err, "senderPlatform", constant.PlatformIDToName(int(in.MsgData.SenderPlatformID)), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID)
ResultCode = -2
return ResultCode
} else {
log.NewDebug(in.OperationID, "PushMsgToUser is success By Ws", "args", in.String(), "recvPlatForm", RecvPlatForm, "recvID", RecvID)
ResultCode = 0
return ResultCode
}
}
func genPlatformArray() (array []int) {
for i := 1; i <= constant.LinuxPlatformID; i++ {
array = append(array, constant.PlatformIDToName(int32(i)))
array = append(array, i)
}
return array
}