mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-11 12:36:00 +08:00
ws add logout remove push token
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
promePkg "Open_IM/pkg/common/prometheus"
|
||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||
pbChat "Open_IM/pkg/proto/msg"
|
||||
push "Open_IM/pkg/proto/push"
|
||||
pbRtc "Open_IM/pkg/proto/rtc"
|
||||
sdk_ws "Open_IM/pkg/proto/sdk_ws"
|
||||
"Open_IM/pkg/utils"
|
||||
@@ -58,7 +59,7 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
|
||||
promePkg.PromeInc(promePkg.PullMsgBySeqListTotalCounter)
|
||||
case constant.WsLogoutMsg:
|
||||
log.NewInfo(m.OperationID, "conn.Close()", m.SendID, m.MsgIncr, m.ReqIdentifier)
|
||||
// conn.Close()
|
||||
ws.userLogoutReq(conn, &m)
|
||||
default:
|
||||
log.Error(m.OperationID, "ReqIdentifier failed ", m.SendID, m.MsgIncr, m.ReqIdentifier)
|
||||
}
|
||||
@@ -157,7 +158,6 @@ func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
|
||||
ws.pullMsgBySeqListResp(conn, m, nReply)
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullMessageBySeqListResp) {
|
||||
log.NewInfo(m.OperationID, "pullMsgBySeqListResp come here ", pb.String())
|
||||
c, _ := proto.Marshal(pb)
|
||||
@@ -173,7 +173,40 @@ func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullM
|
||||
len(mReply.Data))
|
||||
ws.sendMsg(conn, mReply)
|
||||
}
|
||||
func (ws *WServer) userLogoutReq(conn *UserConn, m *Req) {
|
||||
log.NewInfo(m.OperationID, "Ws call success to userLogoutReq start", m.SendID, m.ReqIdentifier, m.MsgIncr, string(m.Data))
|
||||
rpcReq := push.DelUserPushTokenReq{}
|
||||
rpcReq.UserID = m.SendID
|
||||
rpcReq.OperationID = m.OperationID
|
||||
grpcConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, m.OperationID)
|
||||
if grpcConn == nil {
|
||||
errMsg := rpcReq.OperationID + "getcdv3.GetDefaultConn == nil"
|
||||
log.NewError(rpcReq.OperationID, errMsg)
|
||||
ws.userLogoutResp(conn, m)
|
||||
return
|
||||
}
|
||||
msgClient := push.NewPushMsgServiceClient(grpcConn)
|
||||
reply, err := msgClient.DelUserPushToken(context.Background(), &rpcReq)
|
||||
if err != nil {
|
||||
log.NewError(rpcReq.OperationID, "DelUserPushToken err", err.Error())
|
||||
|
||||
ws.userLogoutResp(conn, m)
|
||||
} else {
|
||||
log.NewInfo(rpcReq.OperationID, "rpc call success to DelUserPushToken", reply.String())
|
||||
ws.userLogoutResp(conn, m)
|
||||
}
|
||||
ws.userLogoutResp(conn, m)
|
||||
|
||||
}
|
||||
func (ws *WServer) userLogoutResp(conn *UserConn, m *Req) {
|
||||
mReply := Resp{
|
||||
ReqIdentifier: m.ReqIdentifier,
|
||||
MsgIncr: m.MsgIncr,
|
||||
OperationID: m.OperationID,
|
||||
}
|
||||
ws.sendMsg(conn, mReply)
|
||||
_ = conn.Close()
|
||||
}
|
||||
func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
|
||||
sendMsgAllCountLock.Lock()
|
||||
sendMsgAllCount++
|
||||
@@ -233,6 +266,7 @@ func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.SendMsgResp) {
|
||||
Data: b,
|
||||
}
|
||||
ws.sendMsg(conn, mReply)
|
||||
|
||||
}
|
||||
|
||||
func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) {
|
||||
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
type UserConn struct {
|
||||
*websocket.Conn
|
||||
w *sync.Mutex
|
||||
platformID int32
|
||||
PushedMaxSeq uint32
|
||||
}
|
||||
type WServer struct {
|
||||
@@ -74,7 +75,7 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
log.Error(operationID, "upgrade http conn err", err.Error(), query)
|
||||
return
|
||||
} else {
|
||||
newConn := &UserConn{conn, new(sync.Mutex), 0}
|
||||
newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0}
|
||||
userCount++
|
||||
ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], operationID)
|
||||
go ws.readMsg(newConn)
|
||||
|
||||
@@ -3,6 +3,7 @@ package logic
|
||||
import (
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/db"
|
||||
"Open_IM/pkg/common/log"
|
||||
promePkg "Open_IM/pkg/common/prometheus"
|
||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||
@@ -10,7 +11,6 @@ import (
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
"net"
|
||||
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
@@ -91,3 +91,15 @@ func (r *RPCServer) PushMsg(_ context.Context, pbData *pbPush.PushMsgReq) (*pbPu
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
func (r *RPCServer) DelUserPushToken(c context.Context, req *pbPush.DelUserPushTokenReq) (*pbPush.DelUserPushTokenResp, error) {
|
||||
var resp pbPush.DelUserPushTokenResp
|
||||
err := db.DB.DelFcmToken(req.UserID, int(req.PlatformID))
|
||||
if err != nil {
|
||||
errMsg := req.OperationID + " " + "SetFcmToken failed " + err.Error()
|
||||
log.NewError(req.OperationID, errMsg)
|
||||
resp.ErrCode = 500
|
||||
resp.ErrMsg = errMsg
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user