send msg file modify

This commit is contained in:
Gordon
2021-12-23 17:34:32 +08:00
parent 2d56dacd0e
commit 5f09d6a26f
20 changed files with 682 additions and 989 deletions
+25 -55
View File
@@ -6,7 +6,6 @@ import (
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbRelay "Open_IM/pkg/proto/relay"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"bytes"
"context"
@@ -55,26 +54,10 @@ func (r *RPCServer) run() {
return
}
}
func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbRelay.MsgToUserResp, error) {
func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgReq) (*pbRelay.OnlinePushMsgResp, error) {
log.InfoByKv("PushMsgToUser is arriving", in.OperationID, "args", in.String())
var resp []*pbRelay.SingleMsgToUser
var RecvID string
msg := open_im_sdk.MsgData{
SendID: in.SendID,
RecvID: in.RecvID,
MsgFrom: in.MsgFrom,
ContentType: in.ContentType,
SessionType: in.SessionType,
SenderNickName: in.SenderNickName,
SenderFaceURL: in.SenderFaceURL,
ClientMsgID: in.ClientMsgID,
ServerMsgID: in.ServerMsgID,
Content: in.Content,
Seq: in.RecvSeq,
SendTime: in.SendTime,
SenderPlatformID: in.PlatformID,
}
msgBytes, _ := proto.Marshal(&msg)
msgBytes, _ := proto.Marshal(in.MsgData)
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
OperationID: in.OperationID,
@@ -86,65 +69,52 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR
if err != nil {
log.NewError(in.OperationID, "data encode err", err.Error())
}
switch in.GetSessionType() {
case constant.SingleChatType:
RecvID = in.GetRecvID()
case constant.GroupChatType:
RecvID = strings.Split(in.GetRecvID(), " ")[0]
}
var tag bool
var UIDAndPID []string
userIDList := genUidPlatformArray(RecvID)
for _, v := range userIDList {
UIDAndPID = strings.Split(v, " ")
if conn := ws.getUserConn(v); conn != nil {
recvID := in.MsgData.RecvID
platformList := genPlatformArray()
for _, v := range platformList {
if conn := ws.getUserConn(recvID, v); conn != nil {
tag = true
resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, UIDAndPID[1], UIDAndPID[0])
resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, v, recvID)
temp := &pbRelay.SingleMsgToUser{
ResultCode: resultCode,
RecvID: UIDAndPID[0],
RecvPlatFormID: constant.PlatformNameToID(UIDAndPID[1]),
RecvID: recvID,
RecvPlatFormID: constant.PlatformNameToID(v),
}
resp = append(resp, temp)
} else {
temp := &pbRelay.SingleMsgToUser{
ResultCode: -1,
RecvID: UIDAndPID[0],
RecvPlatFormID: constant.PlatformNameToID(UIDAndPID[1]),
RecvID: recvID,
RecvPlatFormID: constant.PlatformNameToID(v),
}
resp = append(resp, temp)
}
}
//Single chat sender synchronization message
if in.GetSessionType() == constant.SingleChatType && in.ContentType <= constant.Quote && in.ContentType != constant.Typing && in.ContentType != constant.HasReadReceipt {
userIDList = genUidPlatformArray(in.SendID)
for _, v := range userIDList {
UIDAndPID = strings.Split(v, " ")
if conn := ws.getUserConn(v); conn != nil {
_ = sendMsgToUser(conn, replyBytes.Bytes(), in, UIDAndPID[1], UIDAndPID[0])
}
if in.MsgData.GetSessionType() == constant.SingleChatType {
for k, v := range ws.getSingleUserAllConn(recvID) {
_ = sendMsgToUser(v, replyBytes.Bytes(), in, k, recvID)
}
}
if !tag {
log.NewError(in.OperationID, "push err ,no matched ws conn not in map", in.String())
}
return &pbRelay.MsgToUserResp{
return &pbRelay.OnlinePushMsgResp{
Resp: resp,
}, nil
}
func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUsersOnlineStatusReq) (*pbRelay.GetUsersOnlineStatusResp, error) {
log.NewDebug(req.OperationID, "rpc GetUsersOnlineStatus arrived server", req.String())
var UIDAndPID []string
var resp pbRelay.GetUsersOnlineStatusResp
for _, v1 := range req.UserIDList {
userIDList := genUidPlatformArray(v1)
for _, userID := range req.UserIDList {
platformList := genPlatformArray()
temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult)
temp.UserID = v1
for _, v2 := range userIDList {
UIDAndPID = strings.Split(v2, " ")
if conn := ws.getUserConn(v2); conn != nil {
temp.UserID = userID
for _, platform := range platformList {
if conn := ws.getUserConn(userID, platform); conn != nil {
ps := new(pbRelay.GetUsersOnlineStatusResp_SuccessDetail)
ps.Platform = UIDAndPID[1]
ps.Platform = platform
ps.Status = constant.OnlineStatus
temp.Status = constant.OnlineStatus
temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps)
@@ -157,11 +127,11 @@ func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUser
}
return &resp, nil
}
func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPlatForm, RecvID string) (ResultCode int64) {
func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlinePushMsgReq, RecvPlatForm, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg)
if err != nil {
log.ErrorByKv("PushMsgToUser is failed By Ws", "", "Addr", conn.RemoteAddr().String(),
"error", err, "senderPlatform", constant.PlatformIDToName(in.PlatformID), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID)
"error", err, "senderPlatform", constant.PlatformIDToName(in.MsgData.SenderPlatformID), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID)
ResultCode = -2
return ResultCode
} else {
@@ -171,9 +141,9 @@ func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPl
}
}
func genUidPlatformArray(uid string) (array []string) {
func genPlatformArray() (array []string) {
for i := 1; i <= constant.LinuxPlatformID; i++ {
array = append(array, uid+" "+constant.PlatformIDToName(int32(i)))
array = append(array, constant.PlatformIDToName(int32(i)))
}
return array
}