mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-19 00:09:02 +08:00
refactor: replace LongConn with ClientConn interface and simplify message handling
(cherry picked from commit a1dd79a459)
This commit is contained in:
@@ -2,18 +2,20 @@ package msggateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||
"github.com/openimsdk/tools/apiresp"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
|
||||
pbAuth "github.com/openimsdk/protocol/auth"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/openimsdk/tools/mcontext"
|
||||
|
||||
"github.com/go-playground/validator/v10"
|
||||
@@ -23,10 +25,11 @@ import (
|
||||
"github.com/openimsdk/protocol/msggateway"
|
||||
"github.com/openimsdk/tools/discovery"
|
||||
"github.com/openimsdk/tools/log"
|
||||
"github.com/openimsdk/tools/utils/stringutil"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var wsSuccessResponse, _ = json.Marshal(&apiresp.ApiResponse{})
|
||||
|
||||
type LongConnServer interface {
|
||||
Run(ctx context.Context) error
|
||||
wsHandler(w http.ResponseWriter, r *http.Request)
|
||||
@@ -43,6 +46,7 @@ type LongConnServer interface {
|
||||
}
|
||||
|
||||
type WsServer struct {
|
||||
websocket *websocket.Upgrader
|
||||
msgGatewayConfig *Config
|
||||
port int
|
||||
wsMaxConnNum int64
|
||||
@@ -136,9 +140,13 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer {
|
||||
o(&config)
|
||||
}
|
||||
//userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUser)
|
||||
|
||||
upgrader := &websocket.Upgrader{
|
||||
HandshakeTimeout: config.handshakeTimeout,
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
}
|
||||
v := validator.New()
|
||||
return &WsServer{
|
||||
websocket: upgrader,
|
||||
msgGatewayConfig: msgGatewayConfig,
|
||||
port: config.port,
|
||||
wsMaxConnNum: config.maxConnNum,
|
||||
@@ -260,8 +268,7 @@ func (ws *WsServer) registerClient(client *Client) {
|
||||
)
|
||||
oldClients, userOK, clientOK = ws.clients.Get(client.UserID, client.PlatformID)
|
||||
|
||||
log.ZInfo(client.ctx, "registerClient", "userID", client.UserID, "platformID", client.PlatformID,
|
||||
"sdkVersion", client.SDKVersion)
|
||||
log.ZInfo(client.ctx, "registerClient", "userID", client.UserID, "platformID", client.PlatformID)
|
||||
|
||||
if !userOK {
|
||||
ws.clients.Set(client.UserID, client)
|
||||
@@ -448,7 +455,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
|
||||
// validateRespWithRequest checks if the response matches the expected userID and platformID.
|
||||
func (ws *WsServer) validateRespWithRequest(ctx *UserConnContext, resp *pbAuth.ParseTokenResp) error {
|
||||
userID := ctx.GetUserID()
|
||||
platformID := stringutil.StringToInt32(ctx.GetPlatformID())
|
||||
platformID := int32(ctx.GetPlatformID())
|
||||
if resp.UserID != userID {
|
||||
return servererrs.ErrTokenInvalid.WrapMsg(fmt.Sprintf("token uid %s != userID %s", resp.UserID, userID))
|
||||
}
|
||||
@@ -458,19 +465,37 @@ func (ws *WsServer) validateRespWithRequest(ctx *UserConnContext, resp *pbAuth.P
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ws *WsServer) handlerError(ctx *UserConnContext, w http.ResponseWriter, r *http.Request, err error) {
|
||||
if !ctx.ShouldSendResp() {
|
||||
httpError(ctx, err)
|
||||
return
|
||||
}
|
||||
// the browser cannot get the response of upgrade failure
|
||||
data, err := json.Marshal(apiresp.ParseError(err))
|
||||
if err != nil {
|
||||
log.ZError(ctx, "json marshal failed", err)
|
||||
return
|
||||
}
|
||||
conn, upgradeErr := ws.websocket.Upgrade(w, r, nil)
|
||||
if upgradeErr != nil {
|
||||
log.ZWarn(ctx, "websocket upgrade failed", upgradeErr, "respErr", err, "resp", string(data))
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
|
||||
log.ZWarn(ctx, "WriteMessage failed", err, "respErr", err, "resp", string(data))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
// Create a new connection context
|
||||
connContext := newContext(w, r)
|
||||
|
||||
if !ws.ready.Load() {
|
||||
httpError(connContext, errs.New("ws server not ready"))
|
||||
return
|
||||
}
|
||||
|
||||
// Check if the current number of online user connections exceeds the maximum limit
|
||||
if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum {
|
||||
// If it exceeds the maximum connection number, return an error via HTTP and stop processing
|
||||
httpError(connContext, servererrs.ErrConnOverMaxNumLimit.WrapMsg("over max conn num limit"))
|
||||
ws.handlerError(connContext, w, r, servererrs.ErrConnOverMaxNumLimit.WrapMsg("over max conn num limit"))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -478,31 +503,14 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
err := connContext.ParseEssentialArgs()
|
||||
if err != nil {
|
||||
// If there's an error during parsing, return an error via HTTP and stop processing
|
||||
|
||||
httpError(connContext, err)
|
||||
return
|
||||
}
|
||||
|
||||
if ws.authClient == nil {
|
||||
httpError(connContext, errs.New("auth client is not initialized"))
|
||||
ws.handlerError(connContext, w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Call the authentication client to parse the Token obtained from the context
|
||||
resp, err := ws.authClient.ParseToken(connContext, connContext.GetToken())
|
||||
if err != nil {
|
||||
// If there's an error parsing the Token, decide whether to send the error message via WebSocket based on the context flag
|
||||
shouldSendError := connContext.ShouldSendResp()
|
||||
if shouldSendError {
|
||||
// Create a WebSocket connection object and attempt to send the error message via WebSocket
|
||||
wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout, ws.writeBufferSize)
|
||||
if err := wsLongConn.RespondWithError(err, w, r); err == nil {
|
||||
// If the error message is successfully sent via WebSocket, stop processing
|
||||
return
|
||||
}
|
||||
}
|
||||
// If sending via WebSocket is not required or fails, return the error via HTTP and stop processing
|
||||
httpError(connContext, err)
|
||||
ws.handlerError(connContext, w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -510,32 +518,30 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
err = ws.validateRespWithRequest(connContext, resp)
|
||||
if err != nil {
|
||||
// If validation fails, return an error via HTTP and stop processing
|
||||
httpError(connContext, err)
|
||||
ws.handlerError(connContext, w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
log.ZDebug(connContext, "new conn", "token", connContext.GetToken())
|
||||
// Create a WebSocket long connection object
|
||||
wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout, ws.writeBufferSize)
|
||||
if err := wsLongConn.GenerateLongConn(w, r); err != nil {
|
||||
//If the creation of the long connection fails, the error is handled internally during the handshake process.
|
||||
log.ZWarn(connContext, "long connection fails", err)
|
||||
conn, err := ws.websocket.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.ZWarn(connContext, "websocket upgrade failed", err)
|
||||
return
|
||||
} else {
|
||||
// Check if a normal response should be sent via WebSocket
|
||||
shouldSendSuccessResp := connContext.ShouldSendResp()
|
||||
if shouldSendSuccessResp {
|
||||
// Attempt to send a success message through WebSocket
|
||||
if err := wsLongConn.RespondWithSuccess(); err != nil {
|
||||
// If the success message is successfully sent, end further processing
|
||||
return
|
||||
}
|
||||
}
|
||||
if connContext.ShouldSendResp() {
|
||||
if err := conn.WriteMessage(websocket.TextMessage, wsSuccessResponse); err != nil {
|
||||
log.ZWarn(connContext, "WriteMessage first response", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Retrieve a client object from the client pool, reset its state, and associate it with the current WebSocket long connection
|
||||
client := ws.clientPool.Get().(*Client)
|
||||
client.ResetClient(connContext, wsLongConn, ws)
|
||||
log.ZDebug(connContext, "new conn", "token", connContext.GetToken())
|
||||
|
||||
var pingInterval time.Duration
|
||||
if connContext.GetPlatformID() == constant.WebPlatformID {
|
||||
pingInterval = pingPeriod
|
||||
}
|
||||
|
||||
client := new(Client)
|
||||
client.ResetClient(connContext, NewWebSocketClientConn(conn, maxMessageSize, pongWait, pingInterval), ws)
|
||||
|
||||
// Register the client with the server and start message processing
|
||||
ws.registerChan <- client
|
||||
|
||||
Reference in New Issue
Block a user