mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-07 02:26:00 +08:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 391eafb402 |
+1
-6
@@ -63,12 +63,7 @@ services:
|
|||||||
restart: always
|
restart: always
|
||||||
sysctls:
|
sysctls:
|
||||||
net.core.somaxconn: 1024
|
net.core.somaxconn: 1024
|
||||||
command: >
|
command: redis-server /usr/local/redis/config/redis.conf --requirepass openIM123 --appendonly yes
|
||||||
redis-server
|
|
||||||
--requirepass openIM123
|
|
||||||
--appendonly yes
|
|
||||||
--aof-use-rdb-preamble yes
|
|
||||||
--save ""
|
|
||||||
networks:
|
networks:
|
||||||
- openim
|
- openim
|
||||||
|
|
||||||
|
|||||||
@@ -244,7 +244,6 @@ func (s *Server) MultiTerminalLoginCheck(ctx context.Context, req *msggateway.Mu
|
|||||||
tempUserCtx.SetOperationID(mcontext.GetOperationID(ctx))
|
tempUserCtx.SetOperationID(mcontext.GetOperationID(ctx))
|
||||||
client := &Client{}
|
client := &Client{}
|
||||||
client.ctx = tempUserCtx
|
client.ctx = tempUserCtx
|
||||||
client.token = req.Token
|
|
||||||
client.UserID = req.UserID
|
client.UserID = req.UserID
|
||||||
client.PlatformID = int(req.PlatformID)
|
client.PlatformID = int(req.PlatformID)
|
||||||
i := &kickHandler{
|
i := &kickHandler{
|
||||||
|
|||||||
@@ -337,51 +337,17 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If reconnect: When multiple msgGateway instances are deployed, a client may disconnect from instance A and reconnect to instance B.
|
|
||||||
// During this process, instance A might still be executing, resulting in two clients with the same token existing simultaneously.
|
|
||||||
// This situation needs to be filtered to prevent duplicate clients.
|
|
||||||
checkSameTokenFunc := func(oldClients []*Client) []*Client {
|
|
||||||
var clientsNeedToKick []*Client
|
|
||||||
|
|
||||||
for _, c := range oldClients {
|
|
||||||
if c.token == newClient.token {
|
|
||||||
log.ZDebug(newClient.ctx, "token is same, not kick",
|
|
||||||
"userID", newClient.UserID,
|
|
||||||
"platformID", newClient.PlatformID,
|
|
||||||
"token", newClient.token)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
clientsNeedToKick = append(clientsNeedToKick, c)
|
|
||||||
}
|
|
||||||
|
|
||||||
return clientsNeedToKick
|
|
||||||
}
|
|
||||||
|
|
||||||
switch ws.msgGatewayConfig.Share.MultiLogin.Policy {
|
switch ws.msgGatewayConfig.Share.MultiLogin.Policy {
|
||||||
case constant.DefalutNotKick:
|
case constant.DefalutNotKick:
|
||||||
case constant.PCAndOther:
|
case constant.PCAndOther:
|
||||||
if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC {
|
if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
clients, ok := ws.clients.GetAll(newClient.UserID)
|
|
||||||
clientOK = ok
|
|
||||||
oldClients = make([]*Client, 0, len(clients))
|
|
||||||
for _, c := range clients {
|
|
||||||
if constant.PlatformIDToClass(c.PlatformID) == constant.TerminalPC {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
oldClients = append(oldClients, c)
|
|
||||||
}
|
|
||||||
|
|
||||||
fallthrough
|
fallthrough
|
||||||
case constant.AllLoginButSameTermKick:
|
case constant.AllLoginButSameTermKick:
|
||||||
if !clientOK {
|
if !clientOK {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
oldClients = checkSameTokenFunc(oldClients)
|
|
||||||
|
|
||||||
ws.clients.DeleteClients(newClient.UserID, oldClients)
|
ws.clients.DeleteClients(newClient.UserID, oldClients)
|
||||||
for _, c := range oldClients {
|
for _, c := range oldClients {
|
||||||
err := c.KickOnlineMessage()
|
err := c.KickOnlineMessage()
|
||||||
@@ -407,15 +373,14 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
|
|||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
var (
|
||||||
var kickClients []*Client
|
kickClients []*Client
|
||||||
|
)
|
||||||
for _, client := range clients {
|
for _, client := range clients {
|
||||||
if constant.PlatformIDToClass(client.PlatformID) == constant.PlatformIDToClass(newClient.PlatformID) {
|
if constant.PlatformIDToClass(client.PlatformID) == constant.PlatformIDToClass(newClient.PlatformID) {
|
||||||
kickClients = append(kickClients, client)
|
kickClients = append(kickClients, client)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
kickClients = checkSameTokenFunc(kickClients)
|
|
||||||
|
|
||||||
kickTokenFunc(kickClients)
|
kickTokenFunc(kickClients)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -675,19 +675,12 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.Searc
|
|||||||
func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) {
|
func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) {
|
||||||
totalMsgs := make(map[string]*sdkws.MsgData)
|
totalMsgs := make(map[string]*sdkws.MsgData)
|
||||||
for _, conversationID := range conversationIDs {
|
for _, conversationID := range conversationIDs {
|
||||||
seq, ok := seqs[conversationID]
|
seq := seqs[conversationID]
|
||||||
if !ok {
|
|
||||||
log.ZWarn(ctx, "seq not found for conversationID", errs.New("seq not found for conversation"), "conversationID", conversationID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
docID := db.msgTable.GetDocID(conversationID, seq)
|
docID := db.msgTable.GetDocID(conversationID, seq)
|
||||||
msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
|
msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZWarn(ctx, "FindOneByDocID failed", err, "conversationID", conversationID, "docID", docID, "seq", seq)
|
return nil, err
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
index := db.msgTable.GetMsgIndex(seq)
|
index := db.msgTable.GetMsgIndex(seq)
|
||||||
totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg)
|
totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,11 +5,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
|
||||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
@@ -19,6 +14,10 @@ import (
|
|||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
"github.com/openimsdk/tools/utils/jsonutil"
|
"github.com/openimsdk/tools/utils/jsonutil"
|
||||||
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewMsgMongo(db *mongo.Database) (database.Msg, error) {
|
func NewMsgMongo(db *mongo.Database) (database.Msg, error) {
|
||||||
@@ -1150,7 +1149,7 @@ func (m *MsgMgo) findBeforeDocSendTime(ctx context.Context, docID string, limit
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
for i := len(res) - 1; i >= 0; i-- {
|
for i := len(res) - 1; i > 0; i-- {
|
||||||
v := res[i]
|
v := res[i]
|
||||||
if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 {
|
if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 {
|
||||||
return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil
|
return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil
|
||||||
@@ -1165,7 +1164,7 @@ func (m *MsgMgo) findBeforeSendTime(ctx context.Context, conversationID string,
|
|||||||
limit := int64(-1)
|
limit := int64(-1)
|
||||||
if first {
|
if first {
|
||||||
first = false
|
first = false
|
||||||
limit = m.model.GetLimitForSingleDoc(seq)
|
limit = m.model.GetMsgIndex(seq)
|
||||||
}
|
}
|
||||||
docID := m.model.BuildDocIDByIndex(conversationID, i)
|
docID := m.model.BuildDocIDByIndex(conversationID, i)
|
||||||
msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit)
|
msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit)
|
||||||
|
|||||||
@@ -120,11 +120,15 @@ func (m *MsgDocModel) GetDocID(conversationID string, seq int64) string {
|
|||||||
|
|
||||||
func (m *MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 {
|
func (m *MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 {
|
||||||
t := make(map[string][]int64)
|
t := make(map[string][]int64)
|
||||||
for _, seq := range seqs {
|
for i := 0; i < len(seqs); i++ {
|
||||||
docID := m.GetDocID(conversationID, seq)
|
docID := m.GetDocID(conversationID, seqs[i])
|
||||||
t[docID] = append(t[docID], seq)
|
if value, ok := t[docID]; !ok {
|
||||||
|
var temp []int64
|
||||||
|
t[docID] = append(temp, seqs[i])
|
||||||
|
} else {
|
||||||
|
t[docID] = append(value, seqs[i])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -132,10 +136,6 @@ func (*MsgDocModel) GetMsgIndex(seq int64) int64 {
|
|||||||
return (seq - 1) % singleGocMsgNum
|
return (seq - 1) % singleGocMsgNum
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*MsgDocModel) GetLimitForSingleDoc(seq int64) int64 {
|
|
||||||
return seq % singleGocMsgNum
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
|
func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
|
||||||
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
|
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
|
||||||
}
|
}
|
||||||
|
|||||||
+1
-1
@@ -1 +1 @@
|
|||||||
v3.8.3-patch.9
|
v3.8.3-patch.7
|
||||||
Reference in New Issue
Block a user