This commit is contained in:
wangchuxiao
2023-02-28 15:53:38 +08:00
parent c2cfb32c45
commit 0c417d3471
8 changed files with 173 additions and 171 deletions
+7 -7
View File
@@ -12,7 +12,7 @@ import (
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/log"
"OpenIM/pkg/common/tokenverify"
pbRelay "OpenIM/pkg/proto/relay"
msggateway "OpenIM/pkg/proto/relay"
rpc "OpenIM/pkg/proto/user"
"OpenIM/pkg/utils"
"context"
@@ -138,7 +138,7 @@ func GetUsersOnlineStatus(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
return
}
req := &pbRelay.GetUsersOnlineStatusReq{}
req := &msggateway.GetUsersOnlineStatusReq{}
utils.CopyStructFields(req, &params)
var ok bool
@@ -152,12 +152,12 @@ func GetUsersOnlineStatus(c *gin.Context) {
}
log.NewInfo(params.OperationID, "GetUsersOnlineStatus args ", req.String())
var wsResult []*pbRelay.GetUsersOnlineStatusResp_SuccessResult
var respResult []*pbRelay.GetUsersOnlineStatusResp_SuccessResult
var wsResult []*msggateway.GetUsersOnlineStatusResp_SuccessResult
var respResult []*msggateway.GetUsersOnlineStatusResp_SuccessResult
flag := false
grpcCons := rpc.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), params.OperationID)
for _, v := range grpcCons {
client := pbRelay.NewRelayClient(v)
client := msggateway.NewRelayClient(v)
reply, err := client.GetUsersOnlineStatus(context.Background(), req)
if err != nil {
log.NewError(params.OperationID, "GetUsersOnlineStatus rpc err", req.String(), err.Error())
@@ -172,7 +172,7 @@ func GetUsersOnlineStatus(c *gin.Context) {
//Online data merge of each node
for _, v1 := range params.UserIDList {
flag = false
temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult)
temp := new(msggateway.GetUsersOnlineStatusResp_SuccessResult)
for _, v2 := range wsResult {
if v2.UserID == v1 {
flag = true
@@ -190,7 +190,7 @@ func GetUsersOnlineStatus(c *gin.Context) {
}
resp := api.GetUsersOnlineStatusResp{CommResp: api.CommResp{ErrCode: 0, ErrMsg: ""}, SuccessResult: respResult}
if len(respResult) == 0 {
resp.SuccessResult = []*pbRelay.GetUsersOnlineStatusResp_SuccessResult{}
resp.SuccessResult = []*msggateway.GetUsersOnlineStatusResp_SuccessResult{}
}
log.NewInfo(req.OperationID, "GetUsersOnlineStatus api return", resp)
c.JSON(http.StatusOK, resp)
+1 -1
View File
@@ -8,7 +8,7 @@ import (
pbChat "OpenIM/pkg/proto/msg"
push "OpenIM/pkg/proto/push"
pbRtc "OpenIM/pkg/proto/rtc"
sdkws "OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils"
"bytes"
"context"
+31 -31
View File
@@ -6,7 +6,7 @@ import (
"OpenIM/pkg/common/log"
prome "OpenIM/pkg/common/prome"
"OpenIM/pkg/common/tokenverify"
pbRelay "OpenIM/pkg/proto/relay"
"OpenIM/pkg/proto/msggateway"
sdkws "OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils"
"bytes"
@@ -76,7 +76,7 @@ func (r *RPCServer) run() {
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
pbRelay.RegisterRelayServer(srv, r)
msggateway.RegisterRelayServer(srv, r)
rpcRegisterIP := config.Config.RpcRegisterIP
if config.Config.RpcRegisterIP == "" {
@@ -97,9 +97,9 @@ func (r *RPCServer) run() {
return
}
}
func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgReq) (*pbRelay.OnlinePushMsgResp, error) {
func (r *RPCServer) OnlinePushMsg(_ context.Context, in *msggateway.OnlinePushMsgReq) (*msggateway.OnlinePushMsgResp, error) {
log.NewInfo(in.OperationID, "PushMsgToUser is arriving", in.String())
var resp []*pbRelay.SingleMsgToUserPlatform
var resp []*msggateway.SingleMsgToUserPlatform
msgBytes, _ := proto.Marshal(in.MsgData)
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
@@ -118,14 +118,14 @@ func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgRe
if conn := ws.getUserConn(recvID, v); conn != nil {
tag = true
resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, v, recvID)
temp := &pbRelay.SingleMsgToUserPlatform{
temp := &msggateway.SingleMsgToUserPlatform{
ResultCode: resultCode,
RecvID: recvID,
RecvPlatFormID: int32(v),
}
resp = append(resp, temp)
} else {
temp := &pbRelay.SingleMsgToUserPlatform{
temp := &msggateway.SingleMsgToUserPlatform{
ResultCode: -1,
RecvID: recvID,
RecvPlatFormID: int32(v),
@@ -136,24 +136,24 @@ func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgRe
if !tag {
log.NewDebug(in.OperationID, "push err ,no matched ws conn not in map", in.String())
}
return &pbRelay.OnlinePushMsgResp{
return &msggateway.OnlinePushMsgResp{
Resp: resp,
}, nil
}
func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUsersOnlineStatusReq) (*pbRelay.GetUsersOnlineStatusResp, error) {
func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *msggateway.GetUsersOnlineStatusReq) (*msggateway.GetUsersOnlineStatusResp, error) {
log.NewInfo(req.OperationID, "rpc GetUsersOnlineStatus arrived server", req.String())
if !tokenverify.IsManagerUserID(req.OpUserID) {
log.NewError(req.OperationID, "no permission GetUsersOnlineStatus ", req.OpUserID)
return &pbRelay.GetUsersOnlineStatusResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
return &msggateway.GetUsersOnlineStatusResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
}
var resp pbRelay.GetUsersOnlineStatusResp
var resp msggateway.GetUsersOnlineStatusResp
for _, userID := range req.UserIDList {
temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult)
temp := new(msggateway.GetUsersOnlineStatusResp_SuccessResult)
temp.UserID = userID
userConnMap := ws.getUserAllCons(userID)
for platform, userConn := range userConnMap {
if userConn != nil {
ps := new(pbRelay.GetUsersOnlineStatusResp_SuccessDetail)
ps := new(msggateway.GetUsersOnlineStatusResp_SuccessDetail)
ps.Platform = constant.PlatformIDToName(platform)
ps.Status = constant.OnlineStatus
ps.ConnID = userConn.connID
@@ -171,9 +171,9 @@ func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUser
return &resp, nil
}
func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) {
func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) {
log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String())
var singleUserResult []*pbRelay.SingelMsgToUserResultList
var singleUserResult []*msggateway.SingelMsgToUserResultList
//r.GetBatchMsgForPush(req.OperationID,req.MsgData,req.PushToUserIDList,)
msgBytes, _ := proto.Marshal(req.MsgData)
mReply := Resp{
@@ -188,14 +188,14 @@ func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *pbRe
log.NewError(req.OperationID, "data encode err", err.Error())
}
for _, v := range req.PushToUserIDList {
var resp []*pbRelay.SingleMsgToUserPlatform
tempT := &pbRelay.SingelMsgToUserResultList{
var resp []*msggateway.SingleMsgToUserPlatform
tempT := &msggateway.SingelMsgToUserResultList{
UserID: v,
}
userConnMap := ws.getUserAllCons(v)
for platform, userConn := range userConnMap {
if userConn != nil {
temp := &pbRelay.SingleMsgToUserPlatform{
temp := &msggateway.SingleMsgToUserPlatform{
RecvID: v,
RecvPlatFormID: int32(platform),
}
@@ -218,17 +218,17 @@ func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *pbRe
singleUserResult = append(singleUserResult, tempT)
}
return &pbRelay.OnlineBatchPushOneMsgResp{
return &msggateway.OnlineBatchPushOneMsgResp{
SinglePushResult: singleUserResult,
}, nil
}
func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) {
func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) {
log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String())
var singleUserResult []*pbRelay.SingelMsgToUserResultList
var singleUserResult []*msggateway.SingelMsgToUserResultList
for _, v := range req.PushToUserIDList {
var resp []*pbRelay.SingleMsgToUserPlatform
tempT := &pbRelay.SingelMsgToUserResultList{
var resp []*msggateway.SingleMsgToUserPlatform
tempT := &msggateway.SingelMsgToUserResultList{
UserID: v,
}
userConnMap := ws.getUserAllCons(v)
@@ -268,7 +268,7 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recv PlatForm", constant.PlatformIDToName(platform), "recvID", v)
temp := &pbRelay.SingleMsgToUserPlatform{
temp := &msggateway.SingleMsgToUserPlatform{
ResultCode: resultCode,
RecvID: v,
RecvPlatFormID: int32(platform),
@@ -278,7 +278,7 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online
} else {
if utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true
temp := &pbRelay.SingleMsgToUserPlatform{
temp := &msggateway.SingleMsgToUserPlatform{
ResultCode: 0,
RecvID: v,
RecvPlatFormID: int32(platform),
@@ -290,7 +290,7 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online
tempT.Resp = resp
singleUserResult = append(singleUserResult, tempT)
}
return &pbRelay.OnlineBatchPushOneMsgResp{
return &msggateway.OnlineBatchPushOneMsgResp{
SinglePushResult: singleUserResult,
}, nil
}
@@ -317,7 +317,7 @@ func (r *RPCServer) encodeWsData(wsData *sdkws.MsgData, operationID string) (byt
return replyBytes, nil
}
func (r *RPCServer) KickUserOffline(_ context.Context, req *pbRelay.KickUserOfflineReq) (*pbRelay.KickUserOfflineResp, error) {
func (r *RPCServer) KickUserOffline(_ context.Context, req *msggateway.KickUserOfflineReq) (*msggateway.KickUserOfflineResp, error) {
log.NewInfo(req.OperationID, "KickUserOffline is arriving", req.String())
for _, v := range req.KickUserIDList {
log.NewWarn(req.OperationID, "SetTokenKicked ", v, req.PlatformID, req.OperationID)
@@ -329,16 +329,16 @@ func (r *RPCServer) KickUserOffline(_ context.Context, req *pbRelay.KickUserOffl
conn.Close()
}
}
return &pbRelay.KickUserOfflineResp{}, nil
return &msggateway.KickUserOfflineResp{}, nil
}
func (r *RPCServer) MultiTerminalLoginCheck(ctx context.Context, req *pbRelay.MultiTerminalLoginCheckReq) (*pbRelay.MultiTerminalLoginCheckResp, error) {
func (r *RPCServer) MultiTerminalLoginCheck(ctx context.Context, req *msggateway.MultiTerminalLoginCheckReq) (*msggateway.MultiTerminalLoginCheckResp, error) {
ws.MultiTerminalLoginCheckerWithLock(req.UserID, int(req.PlatformID), req.Token, req.OperationID)
return &pbRelay.MultiTerminalLoginCheckResp{}, nil
return &msggateway.MultiTerminalLoginCheckResp{}, nil
}
func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlinePushMsgReq, RecvPlatForm int, RecvID string) (ResultCode int64) {
func sendMsgToUser(conn *UserConn, bMsg []byte, in *msggateway.OnlinePushMsgReq, RecvPlatForm int, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg)
if err != nil {
log.NewError(in.OperationID, "PushMsgToUser is failed By Ws", "Addr", conn.RemoteAddr().String(),
@@ -352,7 +352,7 @@ func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlinePushMsgReq, Re
}
}
func sendMsgBatchToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlineBatchPushOneMsgReq, RecvPlatForm int, RecvID string) (ResultCode int64) {
func sendMsgBatchToUser(conn *UserConn, bMsg []byte, in *msggateway.OnlineBatchPushOneMsgReq, RecvPlatForm int, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg)
if err != nil {
log.NewError(in.OperationID, "PushMsgToUser is failed By Ws", "Addr", conn.RemoteAddr().String(),
+3 -3
View File
@@ -6,7 +6,7 @@ import (
"OpenIM/pkg/common/log"
prome "OpenIM/pkg/common/prome"
"OpenIM/pkg/common/tokenverify"
pbRelay "OpenIM/pkg/proto/relay"
msggateway "OpenIM/pkg/proto/relay"
"OpenIM/pkg/utils"
"bytes"
"compress/gzip"
@@ -166,8 +166,8 @@ func (ws *WServer) MultiTerminalLoginRemoteChecker(userID string, platformID int
continue
}
log.Debug(operationID, "call this node ", v.Target(), rpcSvr.target)
client := pbRelay.NewRelayClient(v)
req := &pbRelay.MultiTerminalLoginCheckReq{OperationID: operationID, PlatformID: platformID, UserID: userID, Token: token}
client := msggateway.NewRelayClient(v)
req := &msggateway.MultiTerminalLoginCheckReq{OperationID: operationID, PlatformID: platformID, UserID: userID, Token: token}
log.NewInfo(operationID, "MultiTerminalLoginCheckReq ", client, req.String())
resp, err := client.MultiTerminalLoginCheck(context.Background(), req)
if err != nil {
+3 -3
View File
@@ -13,7 +13,7 @@ import (
"OpenIM/pkg/common/tracelog"
discoveryRegistry "OpenIM/pkg/discoveryregistry"
pbAuth "OpenIM/pkg/proto/auth"
pbRelay "OpenIM/pkg/proto/relay"
msggateway "OpenIM/pkg/proto/relay"
"OpenIM/pkg/utils"
"context"
"github.com/OpenIMSDK/openKeeper"
@@ -108,8 +108,8 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID
return err
}
for _, v := range grpcCons {
client := pbRelay.NewRelayClient(v)
kickReq := &pbRelay.KickUserOfflineReq{OperationID: operationID, KickUserIDList: []string{userID}, PlatformID: platformID}
client := msggateway.NewRelayClient(v)
kickReq := &msggateway.KickUserOfflineReq{OperationID: operationID, KickUserIDList: []string{userID}, PlatformID: platformID}
log.NewInfo(operationID, "KickUserOffline ", client, kickReq.String())
_, err := client.KickUserOffline(ctx, kickReq)
return utils.Wrap(err, "")