Merge branch 'main' into localcache

# Conflicts:
#	go.mod
#	go.sum
#	internal/msggateway/hub_server.go
#	internal/push/consumer_init.go
#	internal/push/offlinepush/fcm/push.go
#	internal/push/offlinepush/getui/push.go
#	internal/push/offlinepush/jpush/push.go
#	internal/push/push_handler.go
#	internal/push/push_rpc_server.go
#	internal/push/push_to_client.go
#	internal/rpc/friend/friend.go
#	internal/rpc/msg/server.go
#	internal/rpc/msg/verify.go
#	pkg/common/config/config.go
#	pkg/common/db/cache/conversation.go
#	pkg/common/db/cache/meta_cache.go
#	pkg/common/db/cache/msg.go
#	pkg/common/db/localcache/conversation.go
#	pkg/common/db/localcache/group.go
#	pkg/rpcclient/conversation.go
#	pkg/rpcclient/group.go
This commit is contained in:
withchao
2024-03-07 11:59:53 +08:00
326 changed files with 7738 additions and 3878 deletions
+9 -14
View File
@@ -20,18 +20,13 @@ import (
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/mcontext"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
)
func callBackURL() string {
return config.Config.Callback.CallbackUrl
}
func CallbackUserOnline(ctx context.Context, userID string, platformID int, isAppBackground bool, connID string) error {
if !config.Config.Callback.CallbackUserOnline.Enable {
func CallbackUserOnline(ctx context.Context, globalConfig *config.GlobalConfig, userID string, platformID int, isAppBackground bool, connID string) error {
if !globalConfig.Callback.CallbackUserOnline.Enable {
return nil
}
req := cbapi.CallbackUserOnlineReq{
@@ -49,14 +44,14 @@ func CallbackUserOnline(ctx context.Context, userID string, platformID int, isAp
ConnID: connID,
}
resp := cbapi.CommonCallbackResp{}
if err := http.CallBackPostReturn(ctx, callBackURL(), &req, &resp, config.Config.Callback.CallbackUserOnline); err != nil {
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, &req, &resp, globalConfig.Callback.CallbackUserOnline); err != nil {
return err
}
return nil
}
func CallbackUserOffline(ctx context.Context, userID string, platformID int, connID string) error {
if !config.Config.Callback.CallbackUserOffline.Enable {
func CallbackUserOffline(ctx context.Context, globalConfig *config.GlobalConfig, userID string, platformID int, connID string) error {
if !globalConfig.Callback.CallbackUserOffline.Enable {
return nil
}
req := &cbapi.CallbackUserOfflineReq{
@@ -73,14 +68,14 @@ func CallbackUserOffline(ctx context.Context, userID string, platformID int, con
ConnID: connID,
}
resp := &cbapi.CallbackUserOfflineResp{}
if err := http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline); err != nil {
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackUserOffline); err != nil {
return err
}
return nil
}
func CallbackUserKickOff(ctx context.Context, userID string, platformID int) error {
if !config.Config.Callback.CallbackUserKickOff.Enable {
func CallbackUserKickOff(ctx context.Context, globalConfig *config.GlobalConfig, userID string, platformID int) error {
if !globalConfig.Callback.CallbackUserKickOff.Enable {
return nil
}
req := &cbapi.CallbackUserKickOffReq{
@@ -96,7 +91,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err
Seq: time.Now().UnixMilli(),
}
resp := &cbapi.CommonCallbackResp{}
if err := http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline); err != nil {
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackUserOffline); err != nil {
return err
}
return nil
+35 -34
View File
@@ -22,16 +22,15 @@ import (
"sync"
"sync/atomic"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"google.golang.org/protobuf/proto"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/apiresp"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"google.golang.org/protobuf/proto"
)
var (
@@ -76,25 +75,20 @@ type Client struct {
token string
}
func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client {
return &Client{
w: new(sync.Mutex),
conn: conn,
PlatformID: utils.StringToInt(ctx.GetPlatformID()),
IsCompress: isCompress,
UserID: ctx.GetUserID(),
ctx: ctx,
}
}
// function not used
// func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client {
// return &Client{
// w: new(sync.Mutex),
// conn: conn,
// PlatformID: utils.StringToInt(ctx.GetPlatformID()),
// IsCompress: isCompress,
// UserID: ctx.GetUserID(),
// ctx: ctx,
// }
// }
// ResetClient updates the client's state with new connection and context information.
func (c *Client) ResetClient(
ctx *UserConnContext,
conn LongConn,
isBackground, isCompress bool,
longConnServer LongConnServer,
token string,
) {
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isBackground, isCompress bool, longConnServer LongConnServer, token string) {
c.w = new(sync.Mutex)
c.conn = conn
c.PlatformID = utils.StringToInt(ctx.GetPlatformID())
@@ -109,9 +103,11 @@ func (c *Client) ResetClient(
c.token = token
}
// pingHandler handles ping messages and sends pong responses.
func (c *Client) pingHandler(_ string) error {
_ = c.conn.SetReadDeadline(pongWait)
if err := c.conn.SetReadDeadline(pongWait); err != nil {
return err
}
return c.writePongMsg()
}
@@ -138,7 +134,8 @@ func (c *Client) readMessage() {
}
log.ZDebug(c.ctx, "readMessage", "messageType", messageType)
if c.closed.Load() { // 连接刚置位已经关闭,但是协程还没退出的场景
if c.closed.Load() {
// The scenario where the connection has just been closed, but the coroutine has not exited
c.closedErr = ErrConnClosed
return
}
@@ -173,7 +170,7 @@ func (c *Client) handleMessage(message []byte) error {
var err error
message, err = c.longConnServer.DecompressWithPool(message)
if err != nil {
return utils.Wrap(err, "")
return errs.Wrap(err)
}
}
@@ -182,15 +179,15 @@ func (c *Client) handleMessage(message []byte) error {
err := c.longConnServer.Decode(message, binaryReq)
if err != nil {
return utils.Wrap(err, "")
return err
}
if err := c.longConnServer.Validate(binaryReq); err != nil {
return utils.Wrap(err, "")
return err
}
if binaryReq.SendID != c.UserID {
return utils.Wrap(errors.New("exception conn userID not same to req userID"), binaryReq.String())
return errs.Wrap(errors.New("exception conn userID not same to req userID"), binaryReq.String())
}
ctx := mcontext.WithMustInfoCtx(
@@ -236,7 +233,7 @@ func (c *Client) setAppBackgroundStatus(ctx context.Context, req *Req) ([]byte,
}
c.IsBackground = isBackground
// todo callback
// TODO: callback
return resp, nil
}
@@ -270,7 +267,7 @@ func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, re
}
if binaryReq.ReqIdentifier == WsLogoutMsg {
return errors.New("user logout")
return errs.Wrap(errors.New("user logout"))
}
return nil
}
@@ -313,17 +310,21 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
encodedBuf, err := c.longConnServer.Encode(resp)
if err != nil {
return utils.Wrap(err, "")
return err
}
c.w.Lock()
defer c.w.Unlock()
_ = c.conn.SetWriteDeadline(writeWait)
err = c.conn.SetWriteDeadline(writeWait)
if err != nil {
return err
}
if c.IsCompress {
resultBuf, compressErr := c.longConnServer.CompressWithPool(encodedBuf)
if compressErr != nil {
return utils.Wrap(compressErr, "")
return compressErr
}
return c.conn.WriteMessage(MessageBinary, resultBuf)
}
@@ -341,7 +342,7 @@ func (c *Client) writePongMsg() error {
err := c.conn.SetWriteDeadline(writeWait)
if err != nil {
return utils.Wrap(err, "")
return err
}
return c.conn.WriteMessage(PongMessage, nil)
+25 -19
View File
@@ -17,11 +17,10 @@ package msggateway
import (
"bytes"
"compress/gzip"
"errors"
"io"
"sync"
"github.com/OpenIMSDK/tools/utils"
"github.com/OpenIMSDK/tools/errs"
)
var (
@@ -46,12 +45,15 @@ func NewGzipCompressor() *GzipCompressor {
func (g *GzipCompressor) Compress(rawData []byte) ([]byte, error) {
gzipBuffer := bytes.Buffer{}
gz := gzip.NewWriter(&gzipBuffer)
if _, err := gz.Write(rawData); err != nil {
return nil, utils.Wrap(err, "")
return nil, errs.Wrap(err, "GzipCompressor.Compress: writing to gzip writer failed")
}
if err := gz.Close(); err != nil {
return nil, utils.Wrap(err, "")
return nil, errs.Wrap(err, "GzipCompressor.Compress: closing gzip writer failed")
}
return gzipBuffer.Bytes(), nil
}
@@ -63,10 +65,10 @@ func (g *GzipCompressor) CompressWithPool(rawData []byte) ([]byte, error) {
gz.Reset(&gzipBuffer)
if _, err := gz.Write(rawData); err != nil {
return nil, utils.Wrap(err, "")
return nil, errs.Wrap(err, "GzipCompressor.CompressWithPool: error writing data")
}
if err := gz.Close(); err != nil {
return nil, utils.Wrap(err, "")
return nil, errs.Wrap(err, "GzipCompressor.CompressWithPool: error closing gzip writer")
}
return gzipBuffer.Bytes(), nil
}
@@ -75,32 +77,36 @@ func (g *GzipCompressor) DeCompress(compressedData []byte) ([]byte, error) {
buff := bytes.NewBuffer(compressedData)
reader, err := gzip.NewReader(buff)
if err != nil {
return nil, utils.Wrap(err, "NewReader failed")
return nil, errs.Wrap(err, "GzipCompressor.DeCompress: NewReader creation failed")
}
compressedData, err = io.ReadAll(reader)
decompressedData, err := io.ReadAll(reader)
if err != nil {
return nil, utils.Wrap(err, "ReadAll failed")
return nil, errs.Wrap(err, "GzipCompressor.DeCompress: reading from gzip reader failed")
}
_ = reader.Close()
return compressedData, nil
if err = reader.Close(); err != nil {
// Even if closing the reader fails, we've successfully read the data,
// so we return the decompressed data and an error indicating the close failure.
return decompressedData, errs.Wrap(err, "GzipCompressor.DeCompress: closing gzip reader failed")
}
return decompressedData, nil
}
func (g *GzipCompressor) DecompressWithPool(compressedData []byte) ([]byte, error) {
reader := gzipReaderPool.Get().(*gzip.Reader)
if reader == nil {
return nil, errors.New("NewReader failed")
}
defer gzipReaderPool.Put(reader)
err := reader.Reset(bytes.NewReader(compressedData))
if err != nil {
return nil, utils.Wrap(err, "NewReader failed")
return nil, errs.Wrap(err, "GzipCompressor.DecompressWithPool: resetting gzip reader failed")
}
compressedData, err = io.ReadAll(reader)
decompressedData, err := io.ReadAll(reader)
if err != nil {
return nil, utils.Wrap(err, "ReadAll failed")
return nil, errs.Wrap(err, "GzipCompressor.DecompressWithPool: reading from pooled gzip reader failed")
}
_ = reader.Close()
return compressedData, nil
if err = reader.Close(); err != nil {
// Similar to DeCompress, return the data and error for close failure.
return decompressedData, errs.Wrap(err, "GzipCompressor.DecompressWithPool: closing pooled gzip reader failed")
}
return decompressedData, nil
}
+13
View File
@@ -37,10 +37,16 @@ func TestCompressDecompress(t *testing.T) {
// compress
dest, err := compressor.CompressWithPool(src)
if err != nil {
t.Log(err)
}
assert.Equal(t, nil, err)
// decompress
res, err := compressor.DecompressWithPool(dest)
if err != nil {
t.Log(err)
}
assert.Equal(t, nil, err)
// check
@@ -60,10 +66,16 @@ func TestCompressDecompressWithConcurrency(t *testing.T) {
// compress
dest, err := compressor.CompressWithPool(src)
if err != nil {
t.Log(err)
}
assert.Equal(t, nil, err)
// decompress
res, err := compressor.DecompressWithPool(dest)
if err != nil {
t.Log(err)
}
assert.Equal(t, nil, err)
// check
@@ -99,6 +111,7 @@ func BenchmarkDecompress(b *testing.B) {
compressor := NewGzipCompressor()
comdata, err := compressor.Compress(src)
assert.Equal(b, nil, err)
for i := 0; i < b.N; i++ {
+3 -3
View File
@@ -18,7 +18,7 @@ import (
"bytes"
"encoding/gob"
"github.com/OpenIMSDK/tools/utils"
"github.com/OpenIMSDK/tools/errs"
)
type Encoder interface {
@@ -37,7 +37,7 @@ func (g *GobEncoder) Encode(data any) ([]byte, error) {
enc := gob.NewEncoder(&buff)
err := enc.Encode(data)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "GobEncoder.Encode failed")
}
return buff.Bytes(), nil
}
@@ -47,7 +47,7 @@ func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
dec := gob.NewDecoder(buff)
err := dec.Decode(decodeData)
if err != nil {
return utils.Wrap(err, "")
return errs.Wrap(err, "GobEncoder.Decode failed")
}
return nil
}
+14 -9
View File
@@ -17,38 +17,39 @@ package msggateway
import (
"context"
"google.golang.org/grpc"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/msggateway"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"google.golang.org/grpc"
)
func (s *Server) InitServer(disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis()
func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(config)
if err != nil {
return err
}
msgModel := cache.NewMsgCacheModel(rdb)
s.LongConnServer.SetDiscoveryRegistry(disCov)
msgModel := cache.NewMsgCacheModel(rdb, config)
s.LongConnServer.SetDiscoveryRegistry(disCov, config)
s.LongConnServer.SetCacheHandler(msgModel)
msggateway.RegisterMsgGatewayServer(server, s)
return nil
}
func (s *Server) Start() error {
func (s *Server) Start(conf *config.GlobalConfig) error {
return startrpc.Start(
s.rpcPort,
config.Config.RpcRegisterName.OpenImMessageGatewayName,
conf.RpcRegisterName.OpenImMessageGatewayName,
s.prometheusPort,
conf,
s.InitServer,
)
}
@@ -57,6 +58,7 @@ type Server struct {
rpcPort int
prometheusPort int
LongConnServer LongConnServer
config *config.GlobalConfig
pushTerminal map[int]struct{}
}
@@ -66,10 +68,13 @@ func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
func NewServer(rpcPort int, proPort int, longConnServer LongConnServer) *Server {
s := &Server{
func NewServer(rpcPort int, proPort int, longConnServer LongConnServer, config *config.GlobalConfig) *Server {
return &Server{
rpcPort: rpcPort,
prometheusPort: proPort,
LongConnServer: longConnServer,
pushTerminal: make(map[int]struct{}),
config: config,
}
s.pushTerminal[constant.IOSPlatformID] = struct{}{}
s.pushTerminal[constant.AndroidPlatformID] = struct{}{}
@@ -87,7 +92,7 @@ func (s *Server) GetUsersOnlineStatus(
ctx context.Context,
req *msggateway.GetUsersOnlineStatusReq,
) (*msggateway.GetUsersOnlineStatusResp, error) {
if !authverify.IsAppManagerUid(ctx) {
if !authverify.IsAppManagerUid(ctx, s.config) {
return nil, errs.ErrNoPermission.Wrap("only app manager")
}
var resp msggateway.GetUsersOnlineStatusResp
+14 -33
View File
@@ -18,48 +18,29 @@ import (
"fmt"
"time"
"github.com/OpenIMSDK/tools/utils"
"golang.org/x/sync/errgroup"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
// RunWsAndServer run ws server.
func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error {
fmt.Println(
"start rpc/msg_gateway server, port: ",
rpcPort,
wsPort,
prometheusPort,
", OpenIM version: ",
config.Version,
)
func RunWsAndServer(conf *config.GlobalConfig, rpcPort, wsPort, prometheusPort int) error {
fmt.Println("start rpc/msg_gateway server, port: ", rpcPort, wsPort, prometheusPort, ", OpenIM version: ", config.Version)
longServer, err := NewWsServer(
conf,
WithPort(wsPort),
WithMaxConnNum(int64(config.Config.LongConnSvr.WebsocketMaxConnNum)),
WithHandshakeTimeout(time.Duration(config.Config.LongConnSvr.WebsocketTimeout)*time.Second),
WithMessageMaxMsgLength(config.Config.LongConnSvr.WebsocketMaxMsgLen),
WithWriteBufferSize(config.Config.LongConnSvr.WebsocketWriteBufferSize),
WithMaxConnNum(int64(conf.LongConnSvr.WebsocketMaxConnNum)),
WithHandshakeTimeout(time.Duration(conf.LongConnSvr.WebsocketTimeout)*time.Second),
WithMessageMaxMsgLength(conf.LongConnSvr.WebsocketMaxMsgLen),
WithWriteBufferSize(conf.LongConnSvr.WebsocketWriteBufferSize),
)
if err != nil {
return err
}
hubServer := NewServer(rpcPort, prometheusPort, longServer)
wg := errgroup.Group{}
wg.Go(func() error {
err = hubServer.Start()
if err != nil {
return utils.Wrap1(err)
}
return err
})
wg.Go(func() error {
return hubServer.LongConnServer.Run()
})
err = wg.Wait()
return err
hubServer := NewServer(rpcPort, prometheusPort, longServer, conf)
netDone := make(chan error)
go func() {
err = hubServer.Start(conf)
netDone <- err
}()
return hubServer.LongConnServer.Run(netDone)
}
+20 -6
View File
@@ -15,9 +15,11 @@
package msggateway
import (
"errors"
"net/http"
"time"
"github.com/OpenIMSDK/tools/errs"
"github.com/gorilla/websocket"
)
@@ -72,7 +74,8 @@ func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) er
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return err
// The upgrader.Upgrade method usually returns enough error messages to diagnose problems that may occur during the upgrade
return errs.Wrap(err, "GenerateLongConn: WebSocket upgrade failed")
}
d.conn = conn
return nil
@@ -96,7 +99,16 @@ func (d *GWebSocket) SetReadDeadline(timeout time.Duration) error {
}
func (d *GWebSocket) SetWriteDeadline(timeout time.Duration) error {
return d.conn.SetWriteDeadline(time.Now().Add(timeout))
// TODO add error
if timeout <= 0 {
return errs.Wrap(errors.New("timeout must be greater than 0"))
}
// TODO SetWriteDeadline Future add error handling
if err := d.conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
return errs.Wrap(err, "GWebSocket.SetWriteDeadline failed")
}
return nil
}
func (d *GWebSocket) Dial(urlStr string, requestHeader http.Header) (*http.Response, error) {
@@ -108,10 +120,12 @@ func (d *GWebSocket) Dial(urlStr string, requestHeader http.Header) (*http.Respo
}
func (d *GWebSocket) IsNil() bool {
if d.conn != nil {
return false
}
return true
return d.conn == nil
//
// if d.conn != nil {
// return false
// }
// return true
}
func (d *GWebSocket) SetConnNil() {
+35 -27
View File
@@ -18,17 +18,16 @@ import (
"context"
"sync"
"github.com/OpenIMSDK/protocol/push"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/go-playground/validator/v10"
"google.golang.org/protobuf/proto"
"github.com/OpenIMSDK/protocol/msg"
"github.com/OpenIMSDK/protocol/push"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/utils"
"github.com/go-playground/validator/v10"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"google.golang.org/protobuf/proto"
)
type Req struct {
@@ -107,9 +106,9 @@ type GrpcHandler struct {
validate *validator.Validate
}
func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry) *GrpcHandler {
msgRpcClient := rpcclient.NewMessageRpcClient(client)
pushRpcClient := rpcclient.NewPushRpcClient(client)
func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *GrpcHandler {
msgRpcClient := rpcclient.NewMessageRpcClient(client, config)
pushRpcClient := rpcclient.NewPushRpcClient(client, config)
return &GrpcHandler{
msgRpcClient: &msgRpcClient,
pushClient: &pushRpcClient, validate: validate,
@@ -119,10 +118,10 @@ func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDi
func (g GrpcHandler) GetSeq(context context.Context, data *Req) ([]byte, error) {
req := sdkws.GetMaxSeqReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, err
return nil, errs.Wrap(err, "GetSeq: error unmarshaling request")
}
if err := g.validate.Struct(&req); err != nil {
return nil, err
return nil, errs.Wrap(err, "GetSeq: validation failed")
}
resp, err := g.msgRpcClient.GetMaxSeq(context, &req)
if err != nil {
@@ -130,28 +129,37 @@ func (g GrpcHandler) GetSeq(context context.Context, data *Req) ([]byte, error)
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "GetSeq: error marshaling response")
}
return c, nil
}
func (g GrpcHandler) SendMessage(context context.Context, data *Req) ([]byte, error) {
msgData := sdkws.MsgData{}
// SendMessage handles the sending of messages through gRPC. It unmarshals the request data,
// validates the message, and then sends it using the message RPC client.
func (g GrpcHandler) SendMessage(ctx context.Context, data *Req) ([]byte, error) {
// Unmarshal the message data from the request.
var msgData sdkws.MsgData
if err := proto.Unmarshal(data.Data, &msgData); err != nil {
return nil, err
return nil, errs.Wrap(err, "error unmarshalling message data")
}
// Validate the message data structure.
if err := g.validate.Struct(&msgData); err != nil {
return nil, err
return nil, errs.Wrap(err, "message data validation failed")
}
req := msg.SendMsgReq{MsgData: &msgData}
resp, err := g.msgRpcClient.SendMsg(context, &req)
resp, err := g.msgRpcClient.SendMsg(ctx, &req)
if err != nil {
return nil, err
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "error marshaling response")
}
return c, nil
}
@@ -162,7 +170,7 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]by
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "error marshaling response")
}
return c, nil
}
@@ -170,10 +178,10 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]by
func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([]byte, error) {
req := sdkws.PullMessageBySeqsReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, err
return nil, errs.Wrap(err, "error unmarshaling request")
}
if err := g.validate.Struct(data); err != nil {
return nil, err
return nil, errs.Wrap(err, "validation failed")
}
resp, err := g.msgRpcClient.PullMessageBySeqList(context, &req)
if err != nil {
@@ -181,7 +189,7 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "error marshaling response")
}
return c, nil
}
@@ -189,7 +197,7 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([
func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, error) {
req := push.DelUserPushTokenReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, err
return nil, errs.Wrap(err, "error unmarshaling request")
}
resp, err := g.pushClient.DelUserPushToken(context, &req)
if err != nil {
@@ -197,7 +205,7 @@ func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, err
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "error marshaling response")
}
return c, nil
}
@@ -205,10 +213,10 @@ func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, err
func (g GrpcHandler) SetUserDeviceBackground(_ context.Context, data *Req) ([]byte, bool, error) {
req := sdkws.SetAppBackgroundStatusReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, false, err
return nil, false, errs.Wrap(err, "error unmarshaling request")
}
if err := g.validate.Struct(data); err != nil {
return nil, false, err
return nil, false, errs.Wrap(err, "validation failed")
}
return nil, req.IsBackground, nil
}
+59 -63
View File
@@ -20,42 +20,36 @@ import (
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/OpenIMSDK/tools/apiresp"
"github.com/go-playground/validator/v10"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/msggateway"
"github.com/OpenIMSDK/tools/apiresp"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/utils"
"github.com/go-playground/validator/v10"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
)
type LongConnServer interface {
Run() error
Run(done chan error) error
wsHandler(w http.ResponseWriter, r *http.Request)
GetUserAllCons(userID string) ([]*Client, bool)
GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool)
Validate(s any) error
SetCacheHandler(cache cache.MsgModel)
SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry)
SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig)
KickUserConn(client *Client) error
UnRegister(c *Client)
SetKickHandlerInfo(i *kickHandler)
@@ -64,13 +58,15 @@ type LongConnServer interface {
MessageHandler
}
var bufferPool = sync.Pool{
New: func() any {
return make([]byte, 1024)
},
}
// bufferPool is unused
// var bufferPool = sync.Pool{
// New: func() any {
// return make([]byte, 1024)
// },
// }
type WsServer struct {
globalConfig *config.GlobalConfig
port int
wsMaxConnNum int64
registerChan chan *Client
@@ -90,15 +86,16 @@ type WsServer struct {
Encoder
MessageHandler
}
type kickHandler struct {
clientOK bool
oldClients []*Client
newClient *Client
}
func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry) {
ws.MessageHandler = NewGrpcHandler(ws.validate, disCov)
u := rpcclient.NewUserRpcClient(disCov)
func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) {
ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, config)
u := rpcclient.NewUserRpcClient(disCov, config)
ws.userClient = &u
ws.disCov = disCov
}
@@ -110,12 +107,12 @@ func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, sta
}
switch status {
case constant.Online:
err := CallbackUserOnline(ctx, client.UserID, client.PlatformID, client.IsBackground, client.ctx.GetConnID())
err := CallbackUserOnline(ctx, ws.globalConfig, client.UserID, client.PlatformID, client.IsBackground, client.ctx.GetConnID())
if err != nil {
log.ZWarn(ctx, "CallbackUserOnline err", err)
}
case constant.Offline:
err := CallbackUserOffline(ctx, client.UserID, client.PlatformID, client.ctx.GetConnID())
err := CallbackUserOffline(ctx, ws.globalConfig, client.UserID, client.PlatformID, client.ctx.GetConnID())
if err != nil {
log.ZWarn(ctx, "CallbackUserOffline err", err)
}
@@ -131,7 +128,9 @@ func (ws *WsServer) UnRegister(c *Client) {
}
func (ws *WsServer) Validate(s any) error {
//?question?
if s == nil {
return errs.Wrap(errors.New("input cannot be nil"))
}
return nil
}
@@ -143,13 +142,14 @@ func (ws *WsServer) GetUserPlatformCons(userID string, platform int) ([]*Client,
return ws.clients.Get(userID, platform)
}
func NewWsServer(opts ...Option) (*WsServer, error) {
func NewWsServer(globalConfig *config.GlobalConfig, opts ...Option) (*WsServer, error) {
var config configs
for _, o := range opts {
o(&config)
}
v := validator.New()
return &WsServer{
globalConfig: globalConfig,
port: config.port,
wsMaxConnNum: config.maxConnNum,
writeBufferSize: config.writeBufferSize,
@@ -169,23 +169,20 @@ func NewWsServer(opts ...Option) (*WsServer, error) {
}, nil
}
func (ws *WsServer) Run() error {
func (ws *WsServer) Run(done chan error) error {
var (
client *Client
wg errgroup.Group
sigs = make(chan os.Signal, 1)
done = make(chan struct{}, 1)
client *Client
netErr error
shutdownDone = make(chan struct{}, 1)
)
server := http.Server{Addr: ":" + utils.IntToString(ws.port), Handler: nil}
wg.Go(func() error {
go func() {
for {
select {
case <-done:
return nil
case <-shutdownDone:
return
case client = <-ws.registerChan:
ws.registerClient(client)
case client = <-ws.unregisterChan:
@@ -194,40 +191,39 @@ func (ws *WsServer) Run() error {
ws.multiTerminalLoginChecker(onlineInfo.clientOK, onlineInfo.oldClients, onlineInfo.newClient)
}
}
})
wg.Go(func() error {
http.HandleFunc("/", ws.wsHandler)
return server.ListenAndServe()
})
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-sigs
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
// graceful exit operation for server
_ = server.Shutdown(ctx)
_ = wg.Wait()
close(done)
}()
netDone := make(chan struct{}, 1)
go func() {
http.HandleFunc("/", ws.wsHandler)
err := server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
netErr = errs.Wrap(err, "ws start err", server.Addr)
close(netDone)
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
var err error
select {
case <-done:
return nil
case <-time.After(15 * time.Second):
return utils.Wrap1(errors.New("timeout exit"))
case err = <-done:
sErr := server.Shutdown(ctx)
if sErr != nil {
return errs.Wrap(sErr, "shutdown err")
}
close(shutdownDone)
if err != nil {
return err
}
case <-netDone:
}
return netErr
}
var concurrentRequest = 3
func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error {
conns, err := ws.disCov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
conns, err := ws.disCov.GetConns(ctx, ws.globalConfig.RpcRegisterName.OpenImMessageGatewayName)
if err != nil {
return err
}
@@ -282,7 +278,7 @@ func (ws *WsServer) registerClient(client *Client) {
log.ZDebug(client.ctx, "user exist", "userID", client.UserID, "platformID", client.PlatformID)
if clientOK {
ws.clients.Set(client.UserID, client)
// 已经有同平台的连接存在
// There is already a connection to the platform
log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID", client.PlatformID, "old remote addr", getRemoteAdders(oldClients))
ws.onlineUserConnNum.Add(1)
} else {
@@ -292,7 +288,7 @@ func (ws *WsServer) registerClient(client *Client) {
}
wg := sync.WaitGroup{}
if config.Config.Envs.Discovery == "zookeeper" {
if ws.globalConfig.Envs.Discovery == "zookeeper" {
wg.Add(1)
go func() {
defer wg.Done()
@@ -335,7 +331,7 @@ func (ws *WsServer) KickUserConn(client *Client) error {
}
func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Client, newClient *Client) {
switch config.Config.MultiLoginPolicy {
switch ws.globalConfig.MultiLoginPolicy {
case constant.DefalutNotKick:
case constant.PCAndOther:
if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC {
@@ -447,7 +443,7 @@ func (ws *WsServer) ParseWSArgs(r *http.Request) (args *WSArgs, err error) {
return nil, errs.ErrConnArgsErr.Wrap("platformID is not int")
}
v.PlatformID = platformID
if err = authverify.WsVerifyToken(v.Token, v.UserID, platformID); err != nil {
if err = authverify.WsVerifyToken(v.Token, v.UserID, ws.globalConfig.Secret, platformID); err != nil {
return nil, err
}
if query.Get(Compression) == GzipCompressionProtocol {
+5 -5
View File
@@ -19,15 +19,15 @@ import "time"
type (
Option func(opt *configs)
configs struct {
// 长连接监听端口
// Long connection listening port
port int
// 长连接允许最大链接数
// Maximum number of connections allowed for long connection
maxConnNum int64
// 连接握手超时时间
// Connection handshake timeout
handshakeTimeout time.Duration
// 允许消息最大长度
// Maximum length allowed for messages
messageMaxMsgLength int
// websocket write buffer, default: 4096, 4kb.
// Websocket write buffer, default: 4096, 4kb.
writeBufferSize int
}
)
+2 -2
View File
@@ -58,12 +58,12 @@ func (u *UserMap) Get(key string, platformID int) ([]*Client, bool, bool) {
func (u *UserMap) Set(key string, v *Client) {
allClients, existed := u.m.Load(key)
if existed {
log.ZDebug(context.Background(), "Set existed", "user_id", key, "client", *v)
log.ZDebug(context.Background(), "Set existed", "user_id", key, "client_user_id", v.UserID)
oldClients := allClients.([]*Client)
oldClients = append(oldClients, v)
u.m.Store(key, oldClients)
} else {
log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client", *v)
log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client_user_id", v.UserID)
var clients []*Client
clients = append(clients, v)
u.m.Store(key, clients)