Merge remote-tracking branch 'origin/main' into js-server

This commit is contained in:
withchao
2024-10-24 17:44:20 +08:00
135 changed files with 5665 additions and 3657 deletions
+6 -1
View File
@@ -22,6 +22,8 @@ import (
"sync/atomic"
"time"
"google.golang.org/protobuf/proto"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws"
@@ -30,7 +32,6 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/protobuf/proto"
)
var (
@@ -220,6 +221,10 @@ func (c *Client) handleMessage(message []byte) error {
resp, messageErr = c.longConnServer.SendSignalMessage(ctx, binaryReq)
case WSPullMsgBySeqList:
resp, messageErr = c.longConnServer.PullMessageBySeqList(ctx, binaryReq)
case WSPullMsg:
resp, messageErr = c.longConnServer.GetSeqMessage(ctx, binaryReq)
case WSGetConvMaxReadSeq:
resp, messageErr = c.longConnServer.GetConversationsHasReadAndMaxSeq(ctx, binaryReq)
case WsLogoutMsg:
resp, messageErr = c.longConnServer.UserLogout(ctx, binaryReq)
case WsSetBackgroundStatus:
+2
View File
@@ -39,6 +39,8 @@ const (
WSPullMsgBySeqList = 1002
WSSendMsg = 1003
WSSendSignalMsg = 1004
WSPullMsg = 1005
WSGetConvMaxReadSeq = 1006
WSPushMsg = 2001
WSKickOnlineMsg = 2002
WsLogoutMsg = 2003
+5 -1
View File
@@ -66,12 +66,16 @@ func (c *UserConnContext) Value(key any) any {
}
func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnContext {
remoteAddr := req.RemoteAddr
if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != "" {
remoteAddr += "_" + forwarded
}
return &UserConnContext{
RespWriter: respWriter,
Req: req,
Path: req.URL.Path,
Method: req.Method,
RemoteAddr: req.RemoteAddr,
RemoteAddr: remoteAddr,
ConnID: encrypt.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(timeutil.GetCurrentTimestampByMill()))),
}
}
+5 -5
View File
@@ -16,6 +16,8 @@ package msggateway
import (
"context"
"sync/atomic"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
@@ -30,7 +32,6 @@ import (
"github.com/openimsdk/tools/mq/memamq"
"github.com/openimsdk/tools/utils/datautil"
"google.golang.org/grpc"
"sync/atomic"
)
func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
@@ -111,15 +112,14 @@ func (s *Server) GetUsersOnlineStatus(
}
ps := new(msggateway.GetUsersOnlineStatusResp_SuccessDetail)
ps.Platform = constant.PlatformIDToName(client.PlatformID)
ps.Status = constant.OnlineStatus
ps.PlatformID = int32(client.PlatformID)
ps.ConnID = client.ctx.GetConnID()
ps.Token = client.token
ps.IsBackground = client.IsBackground
uresp.Status = constant.OnlineStatus
uresp.Status = constant.Online
uresp.DetailPlatformStatus = append(uresp.DetailPlatformStatus, ps)
}
if uresp.Status == constant.OnlineStatus {
if uresp.Status == constant.Online {
resp.SuccessResult = append(resp.SuccessResult, uresp)
}
}
+1 -1
View File
@@ -58,7 +58,7 @@ func Start(ctx context.Context, index int, conf *Config) error {
)
hubServer := NewServer(rpcPort, longServer, conf, func(srv *Server) error {
longServer.online = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, longServer.subscriberUserOnlineStatusChanges)
longServer.online, _ = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges)
return nil
})
+43 -2
View File
@@ -19,6 +19,8 @@ import (
"sync"
"github.com/go-playground/validator/v10"
"google.golang.org/protobuf/proto"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/msg"
@@ -27,7 +29,6 @@ import (
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/jsonutil"
"google.golang.org/protobuf/proto"
)
type Req struct {
@@ -94,6 +95,8 @@ type MessageHandler interface {
SendMessage(context context.Context, data *Req) ([]byte, error)
SendSignalMessage(context context.Context, data *Req) ([]byte, error)
PullMessageBySeqList(context context.Context, data *Req) ([]byte, error)
GetConversationsHasReadAndMaxSeq(context context.Context, data *Req) ([]byte, error)
GetSeqMessage(context context.Context, data *Req) ([]byte, error)
UserLogout(context context.Context, data *Req) ([]byte, error)
SetUserDeviceBackground(context context.Context, data *Req) ([]byte, bool, error)
}
@@ -175,7 +178,7 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]by
func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([]byte, error) {
req := sdkws.PullMessageBySeqsReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "PullMessageBySeqsReq")
return nil, errs.WrapMsg(err, "err proto unmarshal", "action", "unmarshal", "dataType", "PullMessageBySeqsReq")
}
if err := g.validate.Struct(data); err != nil {
return nil, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "PullMessageBySeqsReq")
@@ -191,6 +194,44 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([
return c, nil
}
func (g GrpcHandler) GetConversationsHasReadAndMaxSeq(context context.Context, data *Req) ([]byte, error) {
req := msg.GetConversationsHasReadAndMaxSeqReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, errs.WrapMsg(err, "err proto unmarshal", "action", "unmarshal", "dataType", "GetConversationsHasReadAndMaxSeq")
}
if err := g.validate.Struct(data); err != nil {
return nil, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "GetConversationsHasReadAndMaxSeq")
}
resp, err := g.msgRpcClient.GetConversationsHasReadAndMaxSeq(context, &req)
if err != nil {
return nil, err
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "GetConversationsHasReadAndMaxSeq")
}
return c, nil
}
func (g GrpcHandler) GetSeqMessage(context context.Context, data *Req) ([]byte, error) {
req := msg.GetSeqMessageReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "GetSeqMessage")
}
if err := g.validate.Struct(data); err != nil {
return nil, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "GetSeqMessage")
}
resp, err := g.msgRpcClient.GetSeqMessage(context, &req)
if err != nil {
return nil, err
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "GetSeqMessage")
}
return c, nil
}
func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, error) {
req := push.DelUserPushTokenReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
+13
View File
@@ -90,6 +90,19 @@ func (ws *WsServer) ChangeOnlineStatus(concurrent int) {
if _, err := ws.userClient.Client.SetUserOnlineStatus(ctx, req); err != nil {
log.ZError(ctx, "update user online status", err)
}
for _, ss := range req.Status {
for _, online := range ss.Online {
client, _, _ := ws.clients.Get(ss.UserID, int(online))
back := false
if len(client) > 0 {
back = client[0].IsBackground
}
ws.webhookAfterUserOnline(ctx, &ws.msgGatewayConfig.WebhooksConfig.AfterUserOnline, ss.UserID, int(online), back, ss.ConnID)
}
for _, offline := range ss.Offline {
ws.webhookAfterUserOffline(ctx, &ws.msgGatewayConfig.WebhooksConfig.AfterUserOffline, ss.UserID, int(offline), ss.ConnID)
}
}
}
for i := 0; i < concurrent; i++ {
+58 -19
View File
@@ -1,17 +1,3 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package msggateway
import (
@@ -212,7 +198,6 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C
if err != nil {
return err
}
wg := errgroup.Group{}
wg.SetLimit(concurrentRequest)
@@ -265,7 +250,7 @@ func (ws *WsServer) registerClient(client *Client) {
if clientOK {
ws.clients.Set(client.UserID, client)
// There is already a connection to the platform
log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID",
log.ZDebug(client.ctx, "repeat login", "userID", client.UserID, "platformID",
client.PlatformID, "old remote addr", getRemoteAdders(oldClients))
ws.onlineUserConnNum.Add(1)
} else {
@@ -293,7 +278,7 @@ func (ws *WsServer) registerClient(client *Client) {
wg.Wait()
log.ZInfo(
log.ZDebug(
client.ctx,
"user online",
"online user Num",
@@ -321,8 +306,32 @@ func (ws *WsServer) KickUserConn(client *Client) error {
}
func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Client, newClient *Client) {
switch ws.msgGatewayConfig.MsgGateway.MultiLoginPolicy {
kickTokenFunc := func(kickClients []*Client) {
var kickTokens []string
ws.clients.DeleteClients(newClient.UserID, kickClients)
for _, c := range kickClients {
kickTokens = append(kickTokens, c.token)
err := c.KickOnlineMessage()
if err != nil {
log.ZWarn(c.ctx, "KickOnlineMessage", err)
}
}
ctx := mcontext.WithMustInfoCtx(
[]string{newClient.ctx.GetOperationID(), newClient.ctx.GetUserID(),
constant.PlatformIDToName(newClient.PlatformID), newClient.ctx.GetConnID()},
)
if _, err := ws.authClient.KickTokens(ctx, kickTokens); err != nil {
log.ZWarn(newClient.ctx, "kickTokens err", err)
}
}
switch ws.msgGatewayConfig.Share.MultiLogin.Policy {
case constant.DefalutNotKick:
case constant.WebAndOther:
if constant.PlatformIDToClass(newClient.PlatformID) == constant.WebPlatformStr {
return
}
fallthrough
case constant.PCAndOther:
if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC {
return
@@ -347,6 +356,35 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
log.ZWarn(newClient.ctx, "InvalidateToken err", err, "userID", newClient.UserID,
"platformID", newClient.PlatformID)
}
case constant.PcMobileAndWeb:
clients, ok := ws.clients.GetAll(newClient.UserID)
if !ok {
return
}
var (
kickClients []*Client
)
for _, client := range clients {
if constant.PlatformIDToClass(client.PlatformID) == constant.PlatformIDToClass(newClient.PlatformID) {
kickClients = append(kickClients, client)
}
}
kickTokenFunc(kickClients)
case constant.SingleTerminalLogin:
clients, ok := ws.clients.GetAll(newClient.UserID)
if !ok {
return
}
var (
kickClients []*Client
)
for _, client := range clients {
kickClients = append(kickClients, client)
}
kickTokenFunc(kickClients)
case constant.Customize:
// todo
}
}
@@ -360,7 +398,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
ws.onlineUserConnNum.Add(-1)
ws.subscription.DelClient(client)
//ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num",
log.ZDebug(client.ctx, "user offline", "close reason", client.closedErr, "online user Num",
ws.onlineUserNum.Load(), "online user conn Num",
ws.onlineUserConnNum.Load(),
)
@@ -425,6 +463,7 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
return
}
log.ZDebug(connContext, "new conn", "token", connContext.GetToken())
// Create a WebSocket long connection object
wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout, ws.writeBufferSize)
if err := wsLongConn.GenerateLongConn(w, r); err != nil {