Files
open-im-server/internal/msg_gateway/gate/relay_rpc_server.go
T

373 lines
14 KiB
Go
Raw Normal View History

2021-05-26 19:24:25 +08:00
package gate
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
2022-09-15 01:22:20 +08:00
promePkg "Open_IM/pkg/common/prometheus"
2021-12-28 20:44:19 +08:00
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
2021-10-11 22:00:38 +08:00
pbRelay "Open_IM/pkg/proto/relay"
2022-06-09 18:08:42 +08:00
sdk_ws "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"bytes"
2021-05-26 19:24:25 +08:00
"context"
"encoding/gob"
2021-05-26 19:24:25 +08:00
"net"
2022-05-07 17:05:05 +08:00
"strconv"
2021-05-26 19:24:25 +08:00
"strings"
2022-09-15 01:22:20 +08:00
"github.com/golang/protobuf/proto"
2022-09-15 16:27:36 +08:00
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
2022-09-15 01:22:20 +08:00
"github.com/gorilla/websocket"
"google.golang.org/grpc"
2021-05-26 19:24:25 +08:00
)
type RPCServer struct {
rpcPort int
rpcRegisterName string
etcdSchema string
etcdAddr []string
2022-06-02 16:44:55 +08:00
platformList []int
pushTerminal []int
2022-07-29 15:47:19 +08:00
target string
2021-05-26 19:24:25 +08:00
}
2022-09-15 12:07:28 +08:00
func initPrometheus() {
promePkg.NewMsgRecvTotalCounter()
promePkg.NewGetNewestSeqTotalCounter()
promePkg.NewPullMsgBySeqListTotalCounter()
promePkg.NewMsgOnlinePushSuccessCounter()
2022-09-15 17:58:51 +08:00
promePkg.NewOnlineUserGauges()
2022-09-15 12:07:28 +08:00
//promePkg.NewSingleChatMsgRecvSuccessCounter()
//promePkg.NewGroupChatMsgRecvSuccessCounter()
//promePkg.NewWorkSuperGroupChatMsgRecvSuccessCounter()
}
2021-05-26 19:24:25 +08:00
func (r *RPCServer) onInit(rpcPort int) {
r.rpcPort = rpcPort
2022-07-20 20:59:52 +08:00
r.rpcRegisterName = config.Config.RpcRegisterName.OpenImRelayName
2021-05-26 19:24:25 +08:00
r.etcdSchema = config.Config.Etcd.EtcdSchema
r.etcdAddr = config.Config.Etcd.EtcdAddr
2022-06-02 16:44:55 +08:00
r.platformList = genPlatformArray()
r.pushTerminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID}
2021-05-26 19:24:25 +08:00
}
func (r *RPCServer) run() {
2022-05-07 17:05:05 +08:00
listenIP := ""
if config.Config.ListenIP == "" {
listenIP = "0.0.0.0"
} else {
listenIP = config.Config.ListenIP
}
address := listenIP + ":" + strconv.Itoa(r.rpcPort)
listener, err := net.Listen("tcp", address)
2021-05-26 19:24:25 +08:00
if err != nil {
2022-05-10 09:09:37 +08:00
panic("listening err:" + err.Error() + r.rpcRegisterName)
2021-05-26 19:24:25 +08:00
}
defer listener.Close()
2022-09-15 01:22:20 +08:00
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
2022-09-15 08:45:10 +08:00
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
2022-09-15 16:27:36 +08:00
grpcOpts = append(grpcOpts, []grpc.ServerOption{
2022-09-15 16:39:49 +08:00
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
2022-09-15 16:27:36 +08:00
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
}...)
2022-09-15 01:22:20 +08:00
}
srv := grpc.NewServer(grpcOpts...)
2021-05-26 19:24:25 +08:00
defer srv.GracefulStop()
2022-07-20 20:59:52 +08:00
pbRelay.RegisterRelayServer(srv, r)
2022-05-07 17:05:05 +08:00
2022-06-23 09:24:05 +08:00
rpcRegisterIP := config.Config.RpcRegisterIP
2022-05-07 17:05:05 +08:00
if config.Config.RpcRegisterIP == "" {
rpcRegisterIP, err = utils.GetLocalIP()
if err != nil {
log.Error("", "GetLocalIP failed ", err.Error())
}
}
err = getcdv3.RegisterEtcd4Unique(r.etcdSchema, strings.Join(r.etcdAddr, ","), rpcRegisterIP, r.rpcPort, r.rpcRegisterName, 10)
2021-05-26 19:24:25 +08:00
if err != nil {
2022-05-07 17:05:05 +08:00
log.Error("", "register push message rpc to etcd err", "", "err", err.Error(), r.etcdSchema, strings.Join(r.etcdAddr, ","), rpcRegisterIP, r.rpcPort, r.rpcRegisterName)
2022-08-26 17:41:58 +08:00
panic(utils.Wrap(err, "register msg_gataway module rpc to etcd err"))
2021-05-26 19:24:25 +08:00
}
2022-07-29 15:47:19 +08:00
r.target = getcdv3.GetTarget(r.etcdSchema, rpcRegisterIP, r.rpcPort, r.rpcRegisterName)
2021-05-26 19:24:25 +08:00
err = srv.Serve(listener)
if err != nil {
2022-05-07 17:05:05 +08:00
log.Error("", "push message rpc listening err", "", "err", err.Error())
2021-05-26 19:24:25 +08:00
return
}
}
2021-12-23 17:34:32 +08:00
func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgReq) (*pbRelay.OnlinePushMsgResp, error) {
2022-05-05 16:13:45 +08:00
log.NewInfo(in.OperationID, "PushMsgToUser is arriving", in.String())
2022-06-02 16:44:55 +08:00
var resp []*pbRelay.SingleMsgToUserPlatform
2021-12-23 17:34:32 +08:00
msgBytes, _ := proto.Marshal(in.MsgData)
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
OperationID: in.OperationID,
Data: msgBytes,
}
var replyBytes bytes.Buffer
enc := gob.NewEncoder(&replyBytes)
err := enc.Encode(mReply)
if err != nil {
log.NewError(in.OperationID, "data encode err", err.Error())
}
var tag bool
2022-02-08 17:12:02 +08:00
recvID := in.PushToUserID
2022-06-02 16:44:55 +08:00
for _, v := range r.platformList {
2021-12-23 17:34:32 +08:00
if conn := ws.getUserConn(recvID, v); conn != nil {
tag = true
2021-12-23 17:34:32 +08:00
resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, v, recvID)
2022-06-02 16:44:55 +08:00
temp := &pbRelay.SingleMsgToUserPlatform{
2021-06-28 15:34:08 +08:00
ResultCode: resultCode,
2021-12-23 17:34:32 +08:00
RecvID: recvID,
2022-06-02 16:44:55 +08:00
RecvPlatFormID: int32(v),
2021-05-26 19:24:25 +08:00
}
2021-06-28 15:34:08 +08:00
resp = append(resp, temp)
} else {
2022-06-02 16:44:55 +08:00
temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: -1,
2021-12-23 17:34:32 +08:00
RecvID: recvID,
2022-06-02 16:44:55 +08:00
RecvPlatFormID: int32(v),
}
resp = append(resp, temp)
2021-05-26 19:24:25 +08:00
}
}
if !tag {
2022-05-05 16:13:45 +08:00
log.NewDebug(in.OperationID, "push err ,no matched ws conn not in map", in.String())
}
2021-12-23 17:34:32 +08:00
return &pbRelay.OnlinePushMsgResp{
2021-05-26 19:24:25 +08:00
Resp: resp,
}, nil
}
2021-11-29 16:26:57 +08:00
func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUsersOnlineStatusReq) (*pbRelay.GetUsersOnlineStatusResp, error) {
2021-12-28 20:44:19 +08:00
log.NewInfo(req.OperationID, "rpc GetUsersOnlineStatus arrived server", req.String())
2022-04-16 20:10:10 +08:00
if !token_verify.IsManagerUserID(req.OpUserID) {
2021-12-28 20:44:19 +08:00
log.NewError(req.OperationID, "no permission GetUsersOnlineStatus ", req.OpUserID)
return &pbRelay.GetUsersOnlineStatusResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
}
2021-11-29 16:26:57 +08:00
var resp pbRelay.GetUsersOnlineStatusResp
2021-12-23 17:34:32 +08:00
for _, userID := range req.UserIDList {
2021-11-29 16:26:57 +08:00
temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult)
2021-12-23 17:34:32 +08:00
temp.UserID = userID
2022-06-02 16:44:55 +08:00
userConnMap := ws.getUserAllCons(userID)
for platform, userConn := range userConnMap {
if userConn != nil {
2021-11-29 16:26:57 +08:00
ps := new(pbRelay.GetUsersOnlineStatusResp_SuccessDetail)
2022-06-02 16:44:55 +08:00
ps.Platform = constant.PlatformIDToName(platform)
2021-11-29 16:26:57 +08:00
ps.Status = constant.OnlineStatus
temp.Status = constant.OnlineStatus
temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps)
}
}
2022-06-02 16:44:55 +08:00
2021-11-29 16:26:57 +08:00
if temp.Status == constant.OnlineStatus {
resp.SuccessResult = append(resp.SuccessResult, temp)
}
}
2021-12-28 20:44:19 +08:00
log.NewInfo(req.OperationID, "GetUsersOnlineStatus rpc return ", resp.String())
2021-11-29 16:26:57 +08:00
return &resp, nil
}
2022-06-08 20:26:08 +08:00
2022-06-09 18:24:32 +08:00
func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) {
log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String())
var singleUserResult []*pbRelay.SingelMsgToUserResultList
//r.GetBatchMsgForPush(req.OperationID,req.MsgData,req.PushToUserIDList,)
msgBytes, _ := proto.Marshal(req.MsgData)
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
OperationID: req.OperationID,
Data: msgBytes,
}
var replyBytes bytes.Buffer
enc := gob.NewEncoder(&replyBytes)
err := enc.Encode(mReply)
if err != nil {
log.NewError(req.OperationID, "data encode err", err.Error())
}
for _, v := range req.PushToUserIDList {
var resp []*pbRelay.SingleMsgToUserPlatform
tempT := &pbRelay.SingelMsgToUserResultList{
UserID: v,
}
userConnMap := ws.getUserAllCons(v)
for platform, userConn := range userConnMap {
if userConn != nil {
resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true
2022-09-15 12:07:28 +08:00
promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter)
2022-06-09 18:24:32 +08:00
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: resultCode,
RecvID: v,
RecvPlatFormID: int32(platform),
}
resp = append(resp, temp)
}
}
}
tempT.Resp = resp
singleUserResult = append(singleUserResult, tempT)
}
return &pbRelay.OnlineBatchPushOneMsgResp{
SinglePushResult: singleUserResult,
}, nil
}
2022-06-02 16:44:55 +08:00
func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) {
log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String())
var singleUserResult []*pbRelay.SingelMsgToUserResultList
2022-06-09 18:08:42 +08:00
2022-06-02 16:44:55 +08:00
for _, v := range req.PushToUserIDList {
var resp []*pbRelay.SingleMsgToUserPlatform
2022-06-02 17:37:00 +08:00
tempT := &pbRelay.SingelMsgToUserResultList{
UserID: v,
}
2022-06-02 16:44:55 +08:00
userConnMap := ws.getUserAllCons(v)
2022-06-09 18:08:42 +08:00
var platformList []int
for k, _ := range userConnMap {
platformList = append(platformList, k)
}
2022-06-11 12:50:20 +08:00
log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms begin", req.MsgData.Seq, v, platformList, req.MsgData.String())
2022-06-09 18:08:42 +08:00
needPushMapList := r.GetSingleUserMsgForPushPlatforms(req.OperationID, req.MsgData, v, platformList)
2022-06-11 10:42:26 +08:00
log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms end", req.MsgData.Seq, v, platformList, len(needPushMapList))
2022-06-09 18:08:42 +08:00
for platform, list := range needPushMapList {
if list != nil {
2022-06-13 15:19:04 +08:00
log.Debug(req.OperationID, "needPushMapList ", "userID: ", v, "platform: ", platform, "push msg num:")
//for _, v := range list {
// log.Debug(req.OperationID, "req.MsgData.MsgDataList begin", "len: ", len(req.MsgData.MsgDataList), v.String())
// req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v)
// log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList))
//}
msgBytes, err := proto.Marshal(list)
if err != nil {
log.Error(req.OperationID, "proto marshal err", err.Error())
continue
2022-06-11 13:25:32 +08:00
}
2022-06-13 15:19:04 +08:00
req.MsgData.MsgDataList = msgBytes
//req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v)
2022-06-11 12:09:50 +08:00
log.Debug(req.OperationID, "r.encodeWsData no string")
2022-06-13 15:19:04 +08:00
//log.Debug(req.OperationID, "r.encodeWsData data0 list ", req.MsgData.MsgDataList[0].String())
2022-06-11 13:25:32 +08:00
2022-06-11 12:09:50 +08:00
log.Debug(req.OperationID, "r.encodeWsData ", req.MsgData.String())
2022-06-09 18:08:42 +08:00
replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID)
if err != nil {
2022-06-10 13:32:40 +08:00
log.Error(req.OperationID, "encodeWsData failed ", req.MsgData.String())
2022-06-09 18:08:42 +08:00
continue
}
2022-06-11 11:53:04 +08:00
log.Debug(req.OperationID, "encodeWsData", "len: ", replyBytes.Len())
2022-06-09 18:08:42 +08:00
resultCode := sendMsgBatchToUser(userConnMap[platform], replyBytes.Bytes(), req, platform, v)
2022-06-02 17:37:00 +08:00
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true
2022-06-10 10:48:03 +08:00
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recv PlatForm", constant.PlatformIDToName(platform), "recvID", v)
2022-06-02 16:44:55 +08:00
temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: resultCode,
RecvID: v,
RecvPlatFormID: int32(platform),
}
resp = append(resp, temp)
}
2022-06-09 18:08:42 +08:00
} else {
if utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true
temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: 0,
RecvID: v,
RecvPlatFormID: int32(platform),
}
resp = append(resp, temp)
}
2022-06-02 16:44:55 +08:00
}
}
2022-06-02 17:37:00 +08:00
tempT.Resp = resp
2022-06-02 16:44:55 +08:00
singleUserResult = append(singleUserResult, tempT)
}
return &pbRelay.OnlineBatchPushOneMsgResp{
SinglePushResult: singleUserResult,
}, nil
}
2022-06-09 18:08:42 +08:00
func (r *RPCServer) encodeWsData(wsData *sdk_ws.MsgData, operationID string) (bytes.Buffer, error) {
2022-06-11 12:03:12 +08:00
log.Debug(operationID, "encodeWsData begin", wsData.String())
msgBytes, err := proto.Marshal(wsData)
if err != nil {
log.NewError(operationID, "Marshal", err.Error())
return bytes.Buffer{}, utils.Wrap(err, "")
}
log.Debug(operationID, "encodeWsData begin", wsData.String())
2022-06-09 18:08:42 +08:00
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
OperationID: operationID,
Data: msgBytes,
}
var replyBytes bytes.Buffer
enc := gob.NewEncoder(&replyBytes)
2022-06-11 12:03:12 +08:00
err = enc.Encode(mReply)
2022-06-09 18:08:42 +08:00
if err != nil {
log.NewError(operationID, "data encode err", err.Error())
2022-06-11 12:03:12 +08:00
return bytes.Buffer{}, utils.Wrap(err, "")
2022-06-09 18:08:42 +08:00
}
return replyBytes, nil
}
2022-06-07 10:38:01 +08:00
func (r *RPCServer) KickUserOffline(_ context.Context, req *pbRelay.KickUserOfflineReq) (*pbRelay.KickUserOfflineResp, error) {
2022-06-07 16:17:08 +08:00
log.NewInfo(req.OperationID, "KickUserOffline is arriving", req.String())
2022-06-07 16:05:59 +08:00
for _, v := range req.KickUserIDList {
log.NewWarn(req.OperationID, "SetTokenKicked ", v, req.PlatformID, req.OperationID)
SetTokenKicked(v, int(req.PlatformID), req.OperationID)
2022-06-07 16:05:59 +08:00
oldConnMap := ws.getUserAllCons(v)
if conn, ok := oldConnMap[int(req.PlatformID)]; ok { // user->map[platform->conn]
2022-06-23 09:24:05 +08:00
log.NewWarn(req.OperationID, "send kick msg, close connection ", req.PlatformID, v)
ws.sendKickMsg(conn)
2022-06-20 18:12:02 +08:00
conn.Close()
2022-06-07 16:05:59 +08:00
}
}
2022-06-07 16:28:51 +08:00
return &pbRelay.KickUserOfflineResp{}, nil
2022-06-07 10:38:01 +08:00
}
2022-06-20 18:12:02 +08:00
2022-07-29 15:47:19 +08:00
func (r *RPCServer) MultiTerminalLoginCheck(ctx context.Context, req *pbRelay.MultiTerminalLoginCheckReq) (*pbRelay.MultiTerminalLoginCheckResp, error) {
2022-08-11 11:14:30 +08:00
2022-07-29 15:47:19 +08:00
ws.MultiTerminalLoginCheckerWithLock(req.UserID, int(req.PlatformID), req.Token, req.OperationID)
return &pbRelay.MultiTerminalLoginCheckResp{}, nil
}
2022-06-02 16:44:55 +08:00
func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlinePushMsgReq, RecvPlatForm int, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg)
if err != nil {
log.NewError(in.OperationID, "PushMsgToUser is failed By Ws", "Addr", conn.RemoteAddr().String(),
"error", err, "senderPlatform", constant.PlatformIDToName(int(in.MsgData.SenderPlatformID)), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID)
ResultCode = -2
return ResultCode
} else {
log.NewDebug(in.OperationID, "PushMsgToUser is success By Ws", "args", in.String(), "recvPlatForm", RecvPlatForm, "recvID", RecvID)
ResultCode = 0
return ResultCode
}
}
func sendMsgBatchToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlineBatchPushOneMsgReq, RecvPlatForm int, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg)
2021-05-26 19:24:25 +08:00
if err != nil {
2022-05-05 16:13:45 +08:00
log.NewError(in.OperationID, "PushMsgToUser is failed By Ws", "Addr", conn.RemoteAddr().String(),
2022-06-10 13:32:40 +08:00
"error", err, "senderPlatform", constant.PlatformIDToName(int(in.MsgData.SenderPlatformID)), "recv Platform", RecvPlatForm, "args", in.String(), "recvID", RecvID)
2021-05-26 19:24:25 +08:00
ResultCode = -2
return ResultCode
} else {
2022-06-10 13:32:40 +08:00
log.NewDebug(in.OperationID, "PushMsgToUser is success By Ws", "args", in.String(), "recv PlatForm", RecvPlatForm, "recvID", RecvID)
2021-05-26 19:24:25 +08:00
ResultCode = 0
return ResultCode
}
}
2022-06-02 16:44:55 +08:00
func genPlatformArray() (array []int) {
2021-11-25 14:12:52 +08:00
for i := 1; i <= constant.LinuxPlatformID; i++ {
2022-06-02 16:44:55 +08:00
array = append(array, i)
}
return array
}