mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-18 07:49:01 +08:00
Merge remote-tracking branch 'upstream/main'
This commit is contained in:
@@ -19,10 +19,10 @@ import (
|
||||
"time"
|
||||
|
||||
cbapi "github.com/OpenIMSDK/Open-IM-Server/pkg/callbackstruct"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/http"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||
"github.com/OpenIMSDK/tools/config"
|
||||
"github.com/OpenIMSDK/tools/constant"
|
||||
"github.com/OpenIMSDK/tools/mcontext"
|
||||
)
|
||||
|
||||
func url() string {
|
||||
|
||||
@@ -23,18 +23,20 @@ import (
|
||||
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/apiresp"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
"github.com/OpenIMSDK/tools/apiresp"
|
||||
"github.com/OpenIMSDK/tools/constant"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/mcontext"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
)
|
||||
|
||||
var ErrConnClosed = errors.New("conn has closed")
|
||||
var ErrNotSupportMessageProtocol = errors.New("not support message protocol")
|
||||
var ErrClientClosed = errors.New("client actively close the connection")
|
||||
var ErrPanic = errors.New("panic error")
|
||||
var (
|
||||
ErrConnClosed = errors.New("conn has closed")
|
||||
ErrNotSupportMessageProtocol = errors.New("not support message protocol")
|
||||
ErrClientClosed = errors.New("client actively close the connection")
|
||||
ErrPanic = errors.New("panic error")
|
||||
)
|
||||
|
||||
const (
|
||||
// MessageText is for UTF-8 encoded text messages like JSON.
|
||||
@@ -102,10 +104,12 @@ func (c *Client) ResetClient(
|
||||
c.closedErr = nil
|
||||
c.token = token
|
||||
}
|
||||
|
||||
func (c *Client) pongHandler(_ string) error {
|
||||
c.conn.SetReadDeadline(pongWait)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) readMessage() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
@@ -124,7 +128,7 @@ func (c *Client) readMessage() {
|
||||
return
|
||||
}
|
||||
log.ZDebug(c.ctx, "readMessage", "messageType", messageType)
|
||||
if c.closed == true { //连接刚置位已经关闭,但是协程还没退出的场景
|
||||
if c.closed == true { // 连接刚置位已经关闭,但是协程还没退出的场景
|
||||
c.closedErr = ErrConnClosed
|
||||
return
|
||||
}
|
||||
@@ -148,8 +152,8 @@ func (c *Client) readMessage() {
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (c *Client) handleMessage(message []byte) error {
|
||||
if c.IsCompress {
|
||||
var decompressErr error
|
||||
@@ -198,26 +202,26 @@ func (c *Client) handleMessage(message []byte) error {
|
||||
}
|
||||
c.replyMessage(ctx, &binaryReq, messageErr, resp)
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (c *Client) setAppBackgroundStatus(ctx context.Context, req Req) ([]byte, error) {
|
||||
resp, isBackground, messageErr := c.longConnServer.SetUserDeviceBackground(ctx, req)
|
||||
if messageErr != nil {
|
||||
return nil, messageErr
|
||||
}
|
||||
c.IsBackground = isBackground
|
||||
//todo callback
|
||||
// todo callback
|
||||
return resp, nil
|
||||
|
||||
}
|
||||
|
||||
func (c *Client) close() {
|
||||
c.w.Lock()
|
||||
defer c.w.Unlock()
|
||||
c.closed = true
|
||||
c.conn.Close()
|
||||
c.longConnServer.UnRegister(c)
|
||||
|
||||
}
|
||||
|
||||
func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, resp []byte) {
|
||||
errResp := apiresp.ParseError(err)
|
||||
mReply := Resp{
|
||||
@@ -234,6 +238,7 @@ func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, re
|
||||
log.ZWarn(ctx, "wireBinaryMsg replyMessage", err, "resp", mReply.String())
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) PushMessage(ctx context.Context, msgData *sdkws.MsgData) error {
|
||||
var msg sdkws.PushMessages
|
||||
conversationID := utils.GetConversationIDByMsg(msgData)
|
||||
@@ -296,5 +301,4 @@ func (c *Client) writePongMsg() error {
|
||||
}
|
||||
_ = c.conn.SetWriteDeadline(writeWait)
|
||||
return c.conn.WriteMessage(PongMessage, nil)
|
||||
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ import (
|
||||
"compress/gzip"
|
||||
"io"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
)
|
||||
|
||||
type Compressor interface {
|
||||
@@ -33,6 +33,7 @@ type GzipCompressor struct {
|
||||
func NewGzipCompressor() *GzipCompressor {
|
||||
return &GzipCompressor{compressProtocol: "gzip"}
|
||||
}
|
||||
|
||||
func (g *GzipCompressor) Compress(rawData []byte) ([]byte, error) {
|
||||
gzipBuffer := bytes.Buffer{}
|
||||
gz := gzip.NewWriter(&gzipBuffer)
|
||||
|
||||
@@ -27,11 +27,13 @@ const (
|
||||
GzipCompressionProtocol = "gzip"
|
||||
BackgroundStatus = "isBackground"
|
||||
)
|
||||
|
||||
const (
|
||||
WebSocket = iota + 1
|
||||
)
|
||||
|
||||
const (
|
||||
//Websocket Protocol
|
||||
// Websocket Protocol.
|
||||
WSGetNewestSeq = 1001
|
||||
WSPullMsgBySeqList = 1002
|
||||
WSSendMsg = 1003
|
||||
|
||||
@@ -19,8 +19,8 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/OpenIMSDK/tools/constant"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
)
|
||||
|
||||
type UserConnContext struct {
|
||||
@@ -71,9 +71,11 @@ func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnCont
|
||||
ConnID: utils.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *UserConnContext) GetRemoteAddr() string {
|
||||
return c.RemoteAddr
|
||||
}
|
||||
|
||||
func (c *UserConnContext) Query(key string) (string, bool) {
|
||||
var value string
|
||||
if value = c.Req.URL.Query().Get(key); value == "" {
|
||||
@@ -81,6 +83,7 @@ func (c *UserConnContext) Query(key string) (string, bool) {
|
||||
}
|
||||
return value, true
|
||||
}
|
||||
|
||||
func (c *UserConnContext) GetHeader(key string) (string, bool) {
|
||||
var value string
|
||||
if value = c.Req.Header.Get(key); value == "" {
|
||||
@@ -88,27 +91,35 @@ func (c *UserConnContext) GetHeader(key string) (string, bool) {
|
||||
}
|
||||
return value, true
|
||||
}
|
||||
|
||||
func (c *UserConnContext) SetHeader(key, value string) {
|
||||
c.RespWriter.Header().Set(key, value)
|
||||
}
|
||||
|
||||
func (c *UserConnContext) ErrReturn(error string, code int) {
|
||||
http.Error(c.RespWriter, error, code)
|
||||
}
|
||||
|
||||
func (c *UserConnContext) GetConnID() string {
|
||||
return c.ConnID
|
||||
}
|
||||
|
||||
func (c *UserConnContext) GetUserID() string {
|
||||
return c.Req.URL.Query().Get(WsUserID)
|
||||
}
|
||||
|
||||
func (c *UserConnContext) GetPlatformID() string {
|
||||
return c.Req.URL.Query().Get(PlatformID)
|
||||
}
|
||||
|
||||
func (c *UserConnContext) GetOperationID() string {
|
||||
return c.Req.URL.Query().Get(OperationID)
|
||||
}
|
||||
|
||||
func (c *UserConnContext) GetToken() string {
|
||||
return c.Req.URL.Query().Get(Token)
|
||||
}
|
||||
|
||||
func (c *UserConnContext) GetBackground() bool {
|
||||
b, err := strconv.ParseBool(c.Req.URL.Query().Get(BackgroundStatus))
|
||||
if err != nil {
|
||||
|
||||
@@ -18,7 +18,7 @@ import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
)
|
||||
|
||||
type Encoder interface {
|
||||
@@ -26,12 +26,12 @@ type Encoder interface {
|
||||
Decode(encodeData []byte, decodeData interface{}) error
|
||||
}
|
||||
|
||||
type GobEncoder struct {
|
||||
}
|
||||
type GobEncoder struct{}
|
||||
|
||||
func NewGobEncoder() *GobEncoder {
|
||||
return &GobEncoder{}
|
||||
}
|
||||
|
||||
func (g *GobEncoder) Encode(data interface{}) ([]byte, error) {
|
||||
buff := bytes.Buffer{}
|
||||
enc := gob.NewEncoder(&buff)
|
||||
@@ -41,6 +41,7 @@ func (g *GobEncoder) Encode(data interface{}) ([]byte, error) {
|
||||
}
|
||||
return buff.Bytes(), nil
|
||||
}
|
||||
|
||||
func (g *GobEncoder) Decode(encodeData []byte, decodeData interface{}) error {
|
||||
buff := bytes.NewBuffer(encodeData)
|
||||
dec := gob.NewDecoder(buff)
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
package msggateway
|
||||
|
||||
import "github.com/OpenIMSDK/Open-IM-Server/pkg/apiresp"
|
||||
import "github.com/OpenIMSDK/tools/apiresp"
|
||||
|
||||
func httpError(ctx *UserConnContext, err error) {
|
||||
apiresp.HttpError(ctx.RespWriter, err)
|
||||
|
||||
@@ -18,19 +18,19 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
"github.com/OpenIMSDK/tools/tokenverify"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msggateway"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/startrpc"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/OpenIMSDK/protocol/msggateway"
|
||||
"github.com/OpenIMSDK/tools/config"
|
||||
"github.com/OpenIMSDK/tools/constant"
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/startrpc"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
)
|
||||
|
||||
func (s *Server) InitServer(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
@@ -181,13 +181,12 @@ func (s *Server) KickUserOffline(
|
||||
if clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)); ok {
|
||||
for _, client := range clients {
|
||||
log.ZDebug(ctx, "kick user offline", "userID", v, "platformID", req.PlatformID, "client", client)
|
||||
err := client.KickOnlineMessage()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if err := client.longConnServer.KickUserConn(client); err != nil {
|
||||
log.ZWarn(ctx, "kick user offline failed", err, "userID", v, "platformID", req.PlatformID)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.ZWarn(ctx, "conn not exist", nil, "userID", v, "platformID", req.PlatformID)
|
||||
log.ZInfo(ctx, "conn not exist", "userID", v, "platformID", req.PlatformID)
|
||||
}
|
||||
}
|
||||
return &msggateway.KickUserOfflineResp{}, nil
|
||||
@@ -197,6 +196,6 @@ func (s *Server) MultiTerminalLoginCheck(
|
||||
ctx context.Context,
|
||||
req *msggateway.MultiTerminalLoginCheckReq,
|
||||
) (*msggateway.MultiTerminalLoginCheckResp, error) {
|
||||
//TODO implement me
|
||||
// TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/tools/config"
|
||||
)
|
||||
|
||||
func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error {
|
||||
|
||||
@@ -22,14 +22,14 @@ import (
|
||||
)
|
||||
|
||||
type LongConn interface {
|
||||
//Close this connection
|
||||
// Close this connection
|
||||
Close() error
|
||||
// WriteMessage Write message to connection,messageType means data type,can be set binary(2) and text(1).
|
||||
WriteMessage(messageType int, message []byte) error
|
||||
// ReadMessage Read message from connection.
|
||||
ReadMessage() (int, []byte, error)
|
||||
// SetReadDeadline sets the read deadline on the underlying network connection,
|
||||
//after a read has timed out, will return an error.
|
||||
// after a read has timed out, will return an error.
|
||||
SetReadDeadline(timeout time.Duration) error
|
||||
// SetWriteDeadline sets to write deadline when send message,when read has timed out,will return error.
|
||||
SetWriteDeadline(timeout time.Duration) error
|
||||
@@ -58,6 +58,7 @@ func newGWebSocket(protocolType int, handshakeTimeout time.Duration) *GWebSocket
|
||||
func (d *GWebSocket) Close() error {
|
||||
return d.conn.Close()
|
||||
}
|
||||
|
||||
func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) error {
|
||||
upgrader := &websocket.Upgrader{
|
||||
HandshakeTimeout: d.handshakeTimeout,
|
||||
@@ -69,10 +70,10 @@ func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) er
|
||||
}
|
||||
d.conn = conn
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (d *GWebSocket) WriteMessage(messageType int, message []byte) error {
|
||||
//d.setSendConn(d.conn)
|
||||
// d.setSendConn(d.conn)
|
||||
return d.conn.WriteMessage(messageType, message)
|
||||
}
|
||||
|
||||
@@ -83,6 +84,7 @@ func (d *GWebSocket) WriteMessage(messageType int, message []byte) error {
|
||||
func (d *GWebSocket) ReadMessage() (int, []byte, error) {
|
||||
return d.conn.ReadMessage()
|
||||
}
|
||||
|
||||
func (d *GWebSocket) SetReadDeadline(timeout time.Duration) error {
|
||||
return d.conn.SetReadDeadline(time.Now().Add(timeout))
|
||||
}
|
||||
@@ -97,7 +99,6 @@ func (d *GWebSocket) Dial(urlStr string, requestHeader http.Header) (*http.Respo
|
||||
d.conn = conn
|
||||
}
|
||||
return httpResp, err
|
||||
|
||||
}
|
||||
|
||||
func (d *GWebSocket) IsNil() bool {
|
||||
@@ -110,9 +111,11 @@ func (d *GWebSocket) IsNil() bool {
|
||||
func (d *GWebSocket) SetConnNil() {
|
||||
d.conn = nil
|
||||
}
|
||||
|
||||
func (d *GWebSocket) SetReadLimit(limit int64) {
|
||||
d.conn.SetReadLimit(limit)
|
||||
}
|
||||
|
||||
func (d *GWebSocket) SetPongHandler(handler PongHandler) {
|
||||
d.conn.SetPongHandler(handler)
|
||||
}
|
||||
|
||||
@@ -17,16 +17,16 @@ package msggateway
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push"
|
||||
"github.com/OpenIMSDK/protocol/push"
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
|
||||
"github.com/go-playground/validator/v10"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/OpenIMSDK/protocol/msg"
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
)
|
||||
|
||||
type Req struct {
|
||||
@@ -75,8 +75,10 @@ type GrpcHandler struct {
|
||||
func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry) *GrpcHandler {
|
||||
msgRpcClient := rpcclient.NewMessageRpcClient(client)
|
||||
pushRpcClient := rpcclient.NewPushRpcClient(client)
|
||||
return &GrpcHandler{msgRpcClient: &msgRpcClient,
|
||||
pushClient: &pushRpcClient, validate: validate}
|
||||
return &GrpcHandler{
|
||||
msgRpcClient: &msgRpcClient,
|
||||
pushClient: &pushRpcClient, validate: validate,
|
||||
}
|
||||
}
|
||||
|
||||
func (g GrpcHandler) GetSeq(context context.Context, data Req) ([]byte, error) {
|
||||
@@ -164,6 +166,7 @@ func (g GrpcHandler) UserLogout(context context.Context, data Req) ([]byte, erro
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (g GrpcHandler) SetUserDeviceBackground(_ context.Context, data Req) ([]byte, bool, error) {
|
||||
req := sdkws.SetAppBackgroundStatusReq{}
|
||||
if err := proto.Unmarshal(data.Data, &req); err != nil {
|
||||
|
||||
@@ -23,20 +23,20 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
"github.com/OpenIMSDK/tools/config"
|
||||
"github.com/OpenIMSDK/tools/constant"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
|
||||
"github.com/go-playground/validator/v10"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/tokenverify"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
)
|
||||
|
||||
type LongConnServer interface {
|
||||
@@ -47,6 +47,7 @@ type LongConnServer interface {
|
||||
Validate(s interface{}) error
|
||||
SetCacheHandler(cache cache.MsgModel)
|
||||
SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry)
|
||||
KickUserConn(client *Client) error
|
||||
UnRegister(c *Client)
|
||||
Compressor
|
||||
Encoder
|
||||
@@ -86,6 +87,7 @@ type kickHandler struct {
|
||||
func (ws *WsServer) SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) {
|
||||
ws.MessageHandler = NewGrpcHandler(ws.validate, client)
|
||||
}
|
||||
|
||||
func (ws *WsServer) SetCacheHandler(cache cache.MsgModel) {
|
||||
ws.cache = cache
|
||||
}
|
||||
@@ -113,7 +115,6 @@ func NewWsServer(opts ...Option) (*WsServer, error) {
|
||||
}
|
||||
if config.port < 1024 {
|
||||
return nil, errors.New("port not allow to listen")
|
||||
|
||||
}
|
||||
v := validator.New()
|
||||
return &WsServer{
|
||||
@@ -134,6 +135,7 @@ func NewWsServer(opts ...Option) (*WsServer, error) {
|
||||
Encoder: NewGobEncoder(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (ws *WsServer) Run() error {
|
||||
var client *Client
|
||||
go func() {
|
||||
@@ -144,13 +146,13 @@ func (ws *WsServer) Run() error {
|
||||
case client = <-ws.unregisterChan:
|
||||
ws.unregisterClient(client)
|
||||
case onlineInfo := <-ws.kickHandlerChan:
|
||||
ws.multiTerminalLoginChecker(onlineInfo)
|
||||
ws.multiTerminalLoginChecker(onlineInfo.clientOK, onlineInfo.oldClients, onlineInfo.newClient)
|
||||
}
|
||||
}
|
||||
}()
|
||||
http.HandleFunc("/", ws.wsHandler)
|
||||
// http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {})
|
||||
return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) //Start listening
|
||||
return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) // Start listening
|
||||
}
|
||||
|
||||
func (ws *WsServer) registerClient(client *Client) {
|
||||
@@ -165,7 +167,6 @@ func (ws *WsServer) registerClient(client *Client) {
|
||||
log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID)
|
||||
atomic.AddInt64(&ws.onlineUserNum, 1)
|
||||
atomic.AddInt64(&ws.onlineUserConnNum, 1)
|
||||
|
||||
} else {
|
||||
i := &kickHandler{
|
||||
clientOK: clientOK,
|
||||
@@ -176,7 +177,7 @@ func (ws *WsServer) registerClient(client *Client) {
|
||||
log.ZDebug(client.ctx, "user exist", "userID", client.UserID, "platformID", client.PlatformID)
|
||||
if clientOK {
|
||||
ws.clients.Set(client.UserID, client)
|
||||
//已经有同平台的连接存在
|
||||
// 已经有同平台的连接存在
|
||||
log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID", client.PlatformID, "old remote addr", getRemoteAdders(oldClients))
|
||||
atomic.AddInt64(&ws.onlineUserConnNum, 1)
|
||||
} else {
|
||||
@@ -194,6 +195,7 @@ func (ws *WsServer) registerClient(client *Client) {
|
||||
ws.onlineUserConnNum,
|
||||
)
|
||||
}
|
||||
|
||||
func getRemoteAdders(client []*Client) string {
|
||||
var ret string
|
||||
for i, c := range client {
|
||||
@@ -206,86 +208,83 @@ func getRemoteAdders(client []*Client) string {
|
||||
return ret
|
||||
}
|
||||
|
||||
func (ws *WsServer) multiTerminalLoginChecker(info *kickHandler) {
|
||||
func (ws *WsServer) KickUserConn(client *Client) error {
|
||||
ws.clients.deleteClients(client.UserID, []*Client{client})
|
||||
return client.KickOnlineMessage()
|
||||
}
|
||||
|
||||
func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Client, newClient *Client) {
|
||||
switch config.Config.MultiLoginPolicy {
|
||||
case constant.DefalutNotKick:
|
||||
case constant.PCAndOther:
|
||||
if constant.PlatformIDToClass(info.newClient.PlatformID) == constant.TerminalPC {
|
||||
if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC {
|
||||
return
|
||||
}
|
||||
fallthrough
|
||||
case constant.AllLoginButSameTermKick:
|
||||
if info.clientOK {
|
||||
ws.clients.deleteClients(info.newClient.UserID, info.oldClients)
|
||||
for _, c := range info.oldClients {
|
||||
if clientOK {
|
||||
ws.clients.deleteClients(newClient.UserID, oldClients)
|
||||
for _, c := range oldClients {
|
||||
err := c.KickOnlineMessage()
|
||||
if err != nil {
|
||||
log.ZWarn(c.ctx, "KickOnlineMessage", err)
|
||||
}
|
||||
}
|
||||
m, err := ws.cache.GetTokensWithoutError(
|
||||
info.newClient.ctx,
|
||||
info.newClient.UserID,
|
||||
info.newClient.PlatformID,
|
||||
newClient.ctx,
|
||||
newClient.UserID,
|
||||
newClient.PlatformID,
|
||||
)
|
||||
if err != nil && err != redis.Nil {
|
||||
log.ZWarn(
|
||||
info.newClient.ctx,
|
||||
newClient.ctx,
|
||||
"get token from redis err",
|
||||
err,
|
||||
"userID",
|
||||
info.newClient.UserID,
|
||||
newClient.UserID,
|
||||
"platformID",
|
||||
info.newClient.PlatformID,
|
||||
newClient.PlatformID,
|
||||
)
|
||||
return
|
||||
}
|
||||
if m == nil {
|
||||
log.ZWarn(
|
||||
info.newClient.ctx,
|
||||
newClient.ctx,
|
||||
"m is nil",
|
||||
errors.New("m is nil"),
|
||||
"userID",
|
||||
info.newClient.UserID,
|
||||
newClient.UserID,
|
||||
"platformID",
|
||||
info.newClient.PlatformID,
|
||||
newClient.PlatformID,
|
||||
)
|
||||
return
|
||||
}
|
||||
log.ZDebug(
|
||||
info.newClient.ctx,
|
||||
newClient.ctx,
|
||||
"get token from redis",
|
||||
"userID",
|
||||
info.newClient.UserID,
|
||||
newClient.UserID,
|
||||
"platformID",
|
||||
info.newClient.PlatformID,
|
||||
newClient.PlatformID,
|
||||
"tokenMap",
|
||||
m,
|
||||
)
|
||||
|
||||
for k := range m {
|
||||
if k != info.newClient.ctx.GetToken() {
|
||||
if k != newClient.ctx.GetToken() {
|
||||
m[k] = constant.KickedToken
|
||||
}
|
||||
}
|
||||
log.ZDebug(info.newClient.ctx, "set token map is ", "token map", m, "userID", info.newClient.UserID)
|
||||
err = ws.cache.SetTokenMapByUidPid(info.newClient.ctx, info.newClient.UserID, info.newClient.PlatformID, m)
|
||||
log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID", newClient.UserID)
|
||||
err = ws.cache.SetTokenMapByUidPid(newClient.ctx, newClient.UserID, newClient.PlatformID, m)
|
||||
if err != nil {
|
||||
log.ZWarn(
|
||||
info.newClient.ctx,
|
||||
"SetTokenMapByUidPid err",
|
||||
err,
|
||||
"userID",
|
||||
info.newClient.UserID,
|
||||
"platformID",
|
||||
info.newClient.PlatformID,
|
||||
)
|
||||
log.ZWarn(newClient.ctx, "SetTokenMapByUidPid err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (ws *WsServer) unregisterClient(client *Client) {
|
||||
defer ws.clientPool.Put(client)
|
||||
isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr())
|
||||
|
||||
@@ -16,33 +16,38 @@ package msggateway
|
||||
|
||||
import "time"
|
||||
|
||||
type Option func(opt *configs)
|
||||
type configs struct {
|
||||
//长连接监听端口
|
||||
port int
|
||||
//长连接允许最大链接数
|
||||
maxConnNum int64
|
||||
//连接握手超时时间
|
||||
handshakeTimeout time.Duration
|
||||
//允许消息最大长度
|
||||
messageMaxMsgLength int
|
||||
}
|
||||
type (
|
||||
Option func(opt *configs)
|
||||
configs struct {
|
||||
// 长连接监听端口
|
||||
port int
|
||||
// 长连接允许最大链接数
|
||||
maxConnNum int64
|
||||
// 连接握手超时时间
|
||||
handshakeTimeout time.Duration
|
||||
// 允许消息最大长度
|
||||
messageMaxMsgLength int
|
||||
}
|
||||
)
|
||||
|
||||
func WithPort(port int) Option {
|
||||
return func(opt *configs) {
|
||||
opt.port = port
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxConnNum(num int64) Option {
|
||||
return func(opt *configs) {
|
||||
opt.maxConnNum = num
|
||||
}
|
||||
}
|
||||
|
||||
func WithHandshakeTimeout(t time.Duration) Option {
|
||||
return func(opt *configs) {
|
||||
opt.handshakeTimeout = t
|
||||
}
|
||||
}
|
||||
|
||||
func WithMessageMaxMsgLength(length int) Option {
|
||||
return func(opt *configs) {
|
||||
opt.messageMaxMsgLength = length
|
||||
|
||||
@@ -18,8 +18,8 @@ import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
)
|
||||
|
||||
type UserMap struct {
|
||||
@@ -29,6 +29,7 @@ type UserMap struct {
|
||||
func newUserMap() *UserMap {
|
||||
return &UserMap{}
|
||||
}
|
||||
|
||||
func (u *UserMap) GetAll(key string) ([]*Client, bool) {
|
||||
allClients, ok := u.m.Load(key)
|
||||
if ok {
|
||||
@@ -36,6 +37,7 @@ func (u *UserMap) GetAll(key string) ([]*Client, bool) {
|
||||
}
|
||||
return nil, ok
|
||||
}
|
||||
|
||||
func (u *UserMap) Get(key string, platformID int) ([]*Client, bool, bool) {
|
||||
allClients, userExisted := u.m.Load(key)
|
||||
if userExisted {
|
||||
@@ -47,12 +49,12 @@ func (u *UserMap) Get(key string, platformID int) ([]*Client, bool, bool) {
|
||||
}
|
||||
if len(clients) > 0 {
|
||||
return clients, userExisted, true
|
||||
|
||||
}
|
||||
return clients, userExisted, false
|
||||
}
|
||||
return nil, userExisted, false
|
||||
}
|
||||
|
||||
func (u *UserMap) Set(key string, v *Client) {
|
||||
allClients, existed := u.m.Load(key)
|
||||
if existed {
|
||||
@@ -67,6 +69,7 @@ func (u *UserMap) Set(key string, v *Client) {
|
||||
u.m.Store(key, clients)
|
||||
}
|
||||
}
|
||||
|
||||
func (u *UserMap) delete(key string, connRemoteAddr string) (isDeleteUser bool) {
|
||||
allClients, existed := u.m.Load(key)
|
||||
if existed {
|
||||
@@ -87,6 +90,7 @@ func (u *UserMap) delete(key string, connRemoteAddr string) (isDeleteUser bool)
|
||||
}
|
||||
return existed
|
||||
}
|
||||
|
||||
func (u *UserMap) deleteClients(key string, clients []*Client) (isDeleteUser bool) {
|
||||
m := utils.SliceToMapAny(clients, func(c *Client) (string, struct{}) {
|
||||
return c.ctx.GetRemoteAddr(), struct{}{}
|
||||
@@ -110,6 +114,7 @@ func (u *UserMap) deleteClients(key string, clients []*Client) (isDeleteUser boo
|
||||
}
|
||||
return existed
|
||||
}
|
||||
|
||||
func (u *UserMap) DeleteAll(key string) {
|
||||
u.m.Delete(key)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user