// Source file: open-im-server/internal/msg_gateway/gate/rpc_server.go

package gate
import (
	"bytes"
	"context"
	"encoding/gob"
	"fmt"
	"net"
	"strings"

	"Open_IM/pkg/common/config"
	"Open_IM/pkg/common/constant"
	"Open_IM/pkg/common/log"
	"Open_IM/pkg/common/token_verify"
	"Open_IM/pkg/grpc-etcdv3/getcdv3"
	pbRelay "Open_IM/pkg/proto/relay"
	"Open_IM/pkg/utils"

	"github.com/golang/protobuf/proto"
	"github.com/gorilla/websocket"
	"google.golang.org/grpc"
)
// RPCServer carries the settings used to serve this gateway over gRPC and to
// register it in etcd. The fields are filled in by onInit and read by run.
type RPCServer struct {
	// rpcPort is the TCP port the gRPC listener binds to.
	rpcPort int
	// rpcRegisterName is the service name handed to the etcd registration.
	rpcRegisterName string
	// etcdSchema is the schema handed to the etcd registration.
	etcdSchema string
	// etcdAddr lists the etcd endpoints; run joins them with commas.
	etcdAddr []string
}
// onInit records the port to listen on and copies the relay service name and
// the etcd schema and endpoints from the global configuration into r.
func (r *RPCServer) onInit(rpcPort int) {
	etcdCfg := config.Config.Etcd
	r.etcdAddr = etcdCfg.EtcdAddr
	r.etcdSchema = etcdCfg.EtcdSchema
	r.rpcRegisterName = config.Config.RpcRegisterName.OpenImOnlineMessageRelayName
	r.rpcPort = rpcPort
}
func (r *RPCServer) run() {
2021-12-17 15:55:43 +08:00
ip := utils.ServerIP
registerAddress := ip + ":" + utils.IntToString(r.rpcPort)
2021-05-26 19:24:25 +08:00
listener, err := net.Listen("tcp", registerAddress)
if err != nil {
log.ErrorByArgs(fmt.Sprintf("fail to listening consumer, err:%v\n", err))
return
}
defer listener.Close()
srv := grpc.NewServer()
defer srv.GracefulStop()
pbRelay.RegisterOnlineMessageRelayServiceServer(srv, r)
2021-12-17 15:55:43 +08:00
err = getcdv3.RegisterEtcd4Unique(r.etcdSchema, strings.Join(r.etcdAddr, ","), ip, r.rpcPort, r.rpcRegisterName, 10)
2021-05-26 19:24:25 +08:00
if err != nil {
log.ErrorByKv("register push message rpc to etcd err", "", "err", err.Error())
}
err = srv.Serve(listener)
if err != nil {
log.ErrorByKv("push message rpc listening err", "", "err", err.Error())
return
}
}
2021-12-23 17:34:32 +08:00
func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgReq) (*pbRelay.OnlinePushMsgResp, error) {
2021-05-26 19:24:25 +08:00
log.InfoByKv("PushMsgToUser is arriving", in.OperationID, "args", in.String())
var resp []*pbRelay.SingleMsgToUser
2021-12-23 17:34:32 +08:00
msgBytes, _ := proto.Marshal(in.MsgData)
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
OperationID: in.OperationID,
Data: msgBytes,
}
var replyBytes bytes.Buffer
enc := gob.NewEncoder(&replyBytes)
err := enc.Encode(mReply)
if err != nil {
log.NewError(in.OperationID, "data encode err", err.Error())
}
var tag bool
2021-12-23 17:34:32 +08:00
recvID := in.MsgData.RecvID
platformList := genPlatformArray()
for _, v := range platformList {
if conn := ws.getUserConn(recvID, v); conn != nil {
tag = true
2021-12-23 17:34:32 +08:00
resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, v, recvID)
2021-06-28 15:34:08 +08:00
temp := &pbRelay.SingleMsgToUser{
ResultCode: resultCode,
2021-12-23 17:34:32 +08:00
RecvID: recvID,
RecvPlatFormID: constant.PlatformNameToID(v),
2021-05-26 19:24:25 +08:00
}
2021-06-28 15:34:08 +08:00
resp = append(resp, temp)
} else {
temp := &pbRelay.SingleMsgToUser{
ResultCode: -1,
2021-12-23 17:34:32 +08:00
RecvID: recvID,
RecvPlatFormID: constant.PlatformNameToID(v),
}
resp = append(resp, temp)
2021-05-26 19:24:25 +08:00
}
}
2021-12-10 17:00:24 +08:00
//Single chat sender synchronization message
2021-12-23 17:34:32 +08:00
if in.MsgData.GetSessionType() == constant.SingleChatType {
for k, v := range ws.getSingleUserAllConn(recvID) {
_ = sendMsgToUser(v, replyBytes.Bytes(), in, k, recvID)
2021-12-10 17:00:24 +08:00
}
}
if !tag {
log.NewError(in.OperationID, "push err ,no matched ws conn not in map", in.String())
}
2021-12-23 17:34:32 +08:00
return &pbRelay.OnlinePushMsgResp{
2021-05-26 19:24:25 +08:00
Resp: resp,
}, nil
}
2021-11-29 16:26:57 +08:00
func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUsersOnlineStatusReq) (*pbRelay.GetUsersOnlineStatusResp, error) {
2021-12-28 20:44:19 +08:00
log.NewInfo(req.OperationID, "rpc GetUsersOnlineStatus arrived server", req.String())
if !token_verify.IsMangerUserID(req.OpUserID) {
log.NewError(req.OperationID, "no permission GetUsersOnlineStatus ", req.OpUserID)
return &pbRelay.GetUsersOnlineStatusResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
}
2021-11-29 16:26:57 +08:00
var resp pbRelay.GetUsersOnlineStatusResp
2021-12-23 17:34:32 +08:00
for _, userID := range req.UserIDList {
platformList := genPlatformArray()
2021-11-29 16:26:57 +08:00
temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult)
2021-12-23 17:34:32 +08:00
temp.UserID = userID
for _, platform := range platformList {
if conn := ws.getUserConn(userID, platform); conn != nil {
2021-11-29 16:26:57 +08:00
ps := new(pbRelay.GetUsersOnlineStatusResp_SuccessDetail)
2021-12-23 17:34:32 +08:00
ps.Platform = platform
2021-11-29 16:26:57 +08:00
ps.Status = constant.OnlineStatus
temp.Status = constant.OnlineStatus
temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps)
2021-05-26 19:24:25 +08:00
2021-11-29 16:26:57 +08:00
}
}
if temp.Status == constant.OnlineStatus {
resp.SuccessResult = append(resp.SuccessResult, temp)
}
}
2021-12-28 20:44:19 +08:00
log.NewInfo(req.OperationID, "GetUsersOnlineStatus rpc return ", resp.String())
2021-11-29 16:26:57 +08:00
return &resp, nil
}
2021-12-23 17:34:32 +08:00
func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlinePushMsgReq, RecvPlatForm, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg)
2021-05-26 19:24:25 +08:00
if err != nil {
log.ErrorByKv("PushMsgToUser is failed By Ws", "", "Addr", conn.RemoteAddr().String(),
2021-12-23 17:34:32 +08:00
"error", err, "senderPlatform", constant.PlatformIDToName(in.MsgData.SenderPlatformID), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID)
2021-05-26 19:24:25 +08:00
ResultCode = -2
return ResultCode
} else {
log.InfoByKv("PushMsgToUser is success By Ws", in.OperationID, "args", in.String(), "recvPlatForm", RecvPlatForm, "recvID", RecvID)
2021-05-26 19:24:25 +08:00
ResultCode = 0
return ResultCode
}
}
2021-12-23 17:34:32 +08:00
func genPlatformArray() (array []string) {
2021-11-25 14:12:52 +08:00
for i := 1; i <= constant.LinuxPlatformID; i++ {
2021-12-23 17:34:32 +08:00
array = append(array, constant.PlatformIDToName(int32(i)))
}
return array
}