Files
open-im-server/internal/msggateway/new/n_ws_server.go
T

191 lines
4.9 KiB
Go
Raw Normal View History

2023-02-14 21:08:36 +08:00
package new
import (
2023-02-23 19:15:30 +08:00
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/tokenverify"
"OpenIM/pkg/utils"
2023-02-14 21:08:36 +08:00
"errors"
2023-02-16 16:32:31 +08:00
"fmt"
"github.com/go-playground/validator/v10"
2023-02-14 21:08:36 +08:00
"github.com/gorilla/websocket"
"net/http"
"sync"
2023-02-15 19:57:16 +08:00
"sync/atomic"
2023-02-14 21:08:36 +08:00
"time"
)
2023-02-15 19:57:16 +08:00
var bufferPool = sync.Pool{
New: func() interface{} {
2023-02-22 21:06:55 +08:00
return make([]byte, 1024)
2023-02-15 19:57:16 +08:00
},
}
2023-02-16 16:32:31 +08:00
2023-02-14 21:08:36 +08:00
type LongConnServer interface {
Run() error
}
type Server struct {
rpcPort int
wsMaxConnNum int
longConnServer *LongConnServer
2023-02-22 21:06:55 +08:00
//rpcServer *RpcServer
2023-02-14 21:08:36 +08:00
}
type WsServer struct {
2023-02-16 16:32:31 +08:00
port int
wsMaxConnNum int64
wsUpGrader *websocket.Upgrader
registerChan chan *Client
unregisterChan chan *Client
clients *UserMap
clientPool sync.Pool
onlineUserNum int64
onlineUserConnNum int64
gzipCompressor Compressor
2023-02-22 21:06:55 +08:00
encoder Encoder
2023-02-16 16:32:31 +08:00
handler MessageHandler
handshakeTimeout time.Duration
readBufferSize, WriteBufferSize int
2023-02-22 21:06:55 +08:00
validate *validator.Validate
2023-02-14 21:08:36 +08:00
}
func newWsServer(opts ...Option) (*WsServer, error) {
var config configs
for _, o := range opts {
o(&config)
}
if config.port < 1024 {
return nil, errors.New("port not allow to listen")
}
return &WsServer{
2023-02-16 16:32:31 +08:00
port: config.port,
wsMaxConnNum: config.maxConnNum,
handshakeTimeout: config.handshakeTimeout,
readBufferSize: config.messageMaxMsgLength,
2023-02-14 21:08:36 +08:00
clientPool: sync.Pool{
New: func() interface{} {
return new(Client)
},
},
2023-02-16 16:32:31 +08:00
validate: validator.New(),
2023-02-22 21:06:55 +08:00
clients: newUserMap(),
//handler: NewGrpcHandler(validate),
2023-02-14 21:08:36 +08:00
}, nil
}
func (ws *WsServer) Run() error {
2023-02-15 19:57:16 +08:00
var client *Client
go func() {
for {
select {
case client = <-ws.registerChan:
ws.registerClient(client)
2023-02-16 16:32:31 +08:00
case client = <-ws.unregisterChan:
ws.unregisterClient(client)
2023-02-15 19:57:16 +08:00
}
}
}()
2023-02-22 21:06:55 +08:00
http.HandleFunc("/", ws.wsHandler) //Get request from client to handle by wsHandler
2023-02-16 16:32:31 +08:00
return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) //Start listening
2023-02-15 19:57:16 +08:00
}
func (ws *WsServer) registerClient(client *Client) {
var (
2023-02-22 21:06:55 +08:00
userOK bool
2023-02-16 16:32:31 +08:00
clientOK bool
2023-02-22 21:06:55 +08:00
cli *Client
2023-02-15 19:57:16 +08:00
)
2023-02-22 21:06:55 +08:00
cli, userOK, clientOK = ws.clients.Get(client.userID, client.platformID)
if !userOK {
ws.clients.Set(client.userID, client)
2023-02-16 16:32:31 +08:00
atomic.AddInt64(&ws.onlineUserNum, 1)
atomic.AddInt64(&ws.onlineUserConnNum, 1)
fmt.Println("R在线用户数量:", ws.onlineUserNum)
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
2023-02-22 21:06:55 +08:00
} else {
if clientOK { //已经有同平台的连接存在
ws.clients.Set(client.userID, client)
2023-02-16 16:32:31 +08:00
ws.multiTerminalLoginChecker(cli)
2023-02-22 21:06:55 +08:00
} else {
ws.clients.Set(client.userID, client)
2023-02-16 16:32:31 +08:00
atomic.AddInt64(&ws.onlineUserConnNum, 1)
fmt.Println("R在线用户数量:", ws.onlineUserNum)
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
}
2023-02-15 19:57:16 +08:00
}
2023-02-16 16:32:31 +08:00
}
func (ws *WsServer) multiTerminalLoginChecker(client *Client) {
}
func (ws *WsServer) unregisterClient(client *Client) {
2023-02-22 21:06:55 +08:00
isDeleteUser := ws.clients.delete(client.userID, client.platformID)
if isDeleteUser {
2023-02-16 16:32:31 +08:00
atomic.AddInt64(&ws.onlineUserNum, -1)
2023-02-15 19:57:16 +08:00
}
2023-02-16 16:32:31 +08:00
atomic.AddInt64(&ws.onlineUserConnNum, -1)
fmt.Println("R在线用户数量:", ws.onlineUserNum)
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
2023-02-15 19:57:16 +08:00
}
2023-02-16 16:32:31 +08:00
2023-02-14 21:08:36 +08:00
func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
context := newContext(w, r)
2023-02-16 16:32:31 +08:00
if ws.onlineUserConnNum >= ws.wsMaxConnNum {
httpError(context, constant.ErrConnOverMaxNumLimit)
return
}
var (
2023-02-22 21:06:55 +08:00
token string
userID string
platformID string
exists bool
compression bool
compressor Compressor
2023-02-16 16:32:31 +08:00
)
token, exists = context.Query(TOKEN)
if !exists {
httpError(context, constant.ErrConnArgsErr)
return
}
2023-02-22 21:06:55 +08:00
userID, exists = context.Query(WS_USERID)
2023-02-16 16:32:31 +08:00
if !exists {
httpError(context, constant.ErrConnArgsErr)
return
}
platformID, exists = context.Query(PLATFORM_ID)
if !exists {
httpError(context, constant.ErrConnArgsErr)
return
}
err := tokenverify.WsVerifyToken(token, userID, platformID)
if err != nil {
httpError(context, err)
return
}
2023-02-22 21:06:55 +08:00
wsLongConn := newGWebSocket(constant.WebSocket, ws.handshakeTimeout, ws.readBufferSize)
2023-02-16 16:32:31 +08:00
err = wsLongConn.GenerateLongConn(w, r)
if err != nil {
httpError(context, err)
return
}
compressProtoc, exists := context.Query(COMPRESSION)
if exists {
2023-02-22 21:06:55 +08:00
if compressProtoc == GZIP_COMPRESSION_PROTOCAL {
2023-02-16 16:32:31 +08:00
compression = true
compressor = ws.gzipCompressor
}
}
compressProtoc, exists = context.GetHeader(COMPRESSION)
if exists {
2023-02-22 21:06:55 +08:00
if compressProtoc == GZIP_COMPRESSION_PROTOCAL {
2023-02-16 16:32:31 +08:00
compression = true
compressor = ws.gzipCompressor
2023-02-14 21:08:36 +08:00
}
}
2023-02-22 21:06:55 +08:00
client := ws.clientPool.Get().(*Client)
client.ResetClient(context, wsLongConn, compression, compressor, ws.encoder, ws.handler, ws.unregisterChan, ws.validate)
2023-02-16 16:32:31 +08:00
ws.registerChan <- client
go client.readMessage()
2023-02-14 21:08:36 +08:00
}