Compare commits

...

5 Commits

7 changed files with 70 additions and 21 deletions
+6 -1
View File
@@ -63,7 +63,12 @@ services:
restart: always restart: always
sysctls: sysctls:
net.core.somaxconn: 1024 net.core.somaxconn: 1024
command: redis-server /usr/local/redis/config/redis.conf --requirepass openIM123 --appendonly yes command: >
redis-server
--requirepass openIM123
--appendonly yes
--aof-use-rdb-preamble yes
--save ""
networks: networks:
- openim - openim
+1
View File
@@ -244,6 +244,7 @@ func (s *Server) MultiTerminalLoginCheck(ctx context.Context, req *msggateway.Mu
tempUserCtx.SetOperationID(mcontext.GetOperationID(ctx)) tempUserCtx.SetOperationID(mcontext.GetOperationID(ctx))
client := &Client{} client := &Client{}
client.ctx = tempUserCtx client.ctx = tempUserCtx
client.token = req.Token
client.UserID = req.UserID client.UserID = req.UserID
client.PlatformID = int(req.PlatformID) client.PlatformID = int(req.PlatformID)
i := &kickHandler{ i := &kickHandler{
+38 -3
View File
@@ -337,17 +337,51 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
} }
} }
// If reconnect: When multiple msgGateway instances are deployed, a client may disconnect from instance A and reconnect to instance B.
// During this process, instance A might still be executing, resulting in two clients with the same token existing simultaneously.
// This situation needs to be filtered to prevent duplicate clients.
checkSameTokenFunc := func(oldClients []*Client) []*Client {
var clientsNeedToKick []*Client
for _, c := range oldClients {
if c.token == newClient.token {
log.ZDebug(newClient.ctx, "token is same, not kick",
"userID", newClient.UserID,
"platformID", newClient.PlatformID,
"token", newClient.token)
continue
}
clientsNeedToKick = append(clientsNeedToKick, c)
}
return clientsNeedToKick
}
switch ws.msgGatewayConfig.Share.MultiLogin.Policy { switch ws.msgGatewayConfig.Share.MultiLogin.Policy {
case constant.DefalutNotKick: case constant.DefalutNotKick:
case constant.PCAndOther: case constant.PCAndOther:
if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC { if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC {
return return
} }
clients, ok := ws.clients.GetAll(newClient.UserID)
clientOK = ok
oldClients = make([]*Client, 0, len(clients))
for _, c := range clients {
if constant.PlatformIDToClass(c.PlatformID) == constant.TerminalPC {
continue
}
oldClients = append(oldClients, c)
}
fallthrough fallthrough
case constant.AllLoginButSameTermKick: case constant.AllLoginButSameTermKick:
if !clientOK { if !clientOK {
return return
} }
oldClients = checkSameTokenFunc(oldClients)
ws.clients.DeleteClients(newClient.UserID, oldClients) ws.clients.DeleteClients(newClient.UserID, oldClients)
for _, c := range oldClients { for _, c := range oldClients {
err := c.KickOnlineMessage() err := c.KickOnlineMessage()
@@ -373,14 +407,15 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
if !ok { if !ok {
return return
} }
var (
kickClients []*Client var kickClients []*Client
)
for _, client := range clients { for _, client := range clients {
if constant.PlatformIDToClass(client.PlatformID) == constant.PlatformIDToClass(newClient.PlatformID) { if constant.PlatformIDToClass(client.PlatformID) == constant.PlatformIDToClass(newClient.PlatformID) {
kickClients = append(kickClients, client) kickClients = append(kickClients, client)
} }
} }
kickClients = checkSameTokenFunc(kickClients)
kickTokenFunc(kickClients) kickTokenFunc(kickClients)
} }
} }
+9 -2
View File
@@ -675,12 +675,19 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.Searc
func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) { func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) {
totalMsgs := make(map[string]*sdkws.MsgData) totalMsgs := make(map[string]*sdkws.MsgData)
for _, conversationID := range conversationIDs { for _, conversationID := range conversationIDs {
seq := seqs[conversationID] seq, ok := seqs[conversationID]
if !ok {
log.ZWarn(ctx, "seq not found for conversationID", errs.New("seq not found for conversation"), "conversationID", conversationID)
continue
}
docID := db.msgTable.GetDocID(conversationID, seq) docID := db.msgTable.GetDocID(conversationID, seq)
msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
if err != nil { if err != nil {
return nil, err log.ZWarn(ctx, "FindOneByDocID failed", err, "conversationID", conversationID, "docID", docID, "seq", seq)
continue
} }
index := db.msgTable.GetMsgIndex(seq) index := db.msgTable.GetMsgIndex(seq)
totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg) totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg)
} }
+7 -6
View File
@@ -5,6 +5,11 @@ import (
"fmt" "fmt"
"time" "time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
@@ -14,10 +19,6 @@ import (
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/jsonutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
) )
func NewMsgMongo(db *mongo.Database) (database.Msg, error) { func NewMsgMongo(db *mongo.Database) (database.Msg, error) {
@@ -1149,7 +1150,7 @@ func (m *MsgMgo) findBeforeDocSendTime(ctx context.Context, docID string, limit
if err != nil { if err != nil {
return 0, 0, err return 0, 0, err
} }
for i := len(res) - 1; i > 0; i-- { for i := len(res) - 1; i >= 0; i-- {
v := res[i] v := res[i]
if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 { if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 {
return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil
@@ -1164,7 +1165,7 @@ func (m *MsgMgo) findBeforeSendTime(ctx context.Context, conversationID string,
limit := int64(-1) limit := int64(-1)
if first { if first {
first = false first = false
limit = m.model.GetMsgIndex(seq) limit = m.model.GetLimitForSingleDoc(seq)
} }
docID := m.model.BuildDocIDByIndex(conversationID, i) docID := m.model.BuildDocIDByIndex(conversationID, i)
msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit) msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit)
+8 -8
View File
@@ -120,15 +120,11 @@ func (m *MsgDocModel) GetDocID(conversationID string, seq int64) string {
func (m *MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 { func (m *MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 {
t := make(map[string][]int64) t := make(map[string][]int64)
for i := 0; i < len(seqs); i++ { for _, seq := range seqs {
docID := m.GetDocID(conversationID, seqs[i]) docID := m.GetDocID(conversationID, seq)
if value, ok := t[docID]; !ok { t[docID] = append(t[docID], seq)
var temp []int64
t[docID] = append(temp, seqs[i])
} else {
t[docID] = append(value, seqs[i])
}
} }
return t return t
} }
@@ -136,6 +132,10 @@ func (*MsgDocModel) GetMsgIndex(seq int64) int64 {
return (seq - 1) % singleGocMsgNum return (seq - 1) % singleGocMsgNum
} }
func (*MsgDocModel) GetLimitForSingleDoc(seq int64) int64 {
return seq % singleGocMsgNum
}
func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string { func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10) return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
} }
+1 -1
View File
@@ -1 +1 @@
v3.8.3 v3.8.3-patch.9