Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
withchao
2023-06-14 15:31:50 +08:00
13 changed files with 677 additions and 385 deletions
+4
View File
@@ -139,6 +139,10 @@ func (m *Message) GetConversationsHasReadAndMaxSeq(c *gin.Context) {
a2r.Call(msg.MsgClient.GetConversationsHasReadAndMaxSeq, m.client, c)
}
func (m *Message) SetConversationHasReadSeq(c *gin.Context) {
a2r.Call(msg.MsgClient.SetConversationHasReadSeq, m.client, c)
}
func (m *Message) ClearConversationsMsg(c *gin.Context) {
a2r.Call(msg.MsgClient.ClearConversationsMsg, m.client, c)
}
+1
View File
@@ -143,6 +143,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
msgGroup.POST("/mark_msgs_as_read", m.MarkMsgsAsRead)
msgGroup.POST("/mark_conversation_as_read", m.MarkConversationAsRead)
msgGroup.POST("/get_conversations_has_read_and_max_seq", m.GetConversationsHasReadAndMaxSeq)
msgGroup.POST("/set_conversation_has_read_seq", m.SetConversationHasReadSeq)
msgGroup.POST("/clear_conversation_msg", m.ClearConversationsMsg)
msgGroup.POST("/user_clear_all_msg", m.UserClearAllMsg)
+3
View File
@@ -91,6 +91,9 @@ func (c *UserConnContext) GetPlatformID() string {
func (c *UserConnContext) GetOperationID() string {
return c.Req.URL.Query().Get(OperationID)
}
func (c *UserConnContext) GetToken() string {
return c.Req.URL.Query().Get(Token)
}
func (c *UserConnContext) GetBackground() bool {
b, err := strconv.ParseBool(c.Req.URL.Query().Get(BackgroundStatus))
if err != nil {
+31 -5
View File
@@ -13,6 +13,7 @@ import (
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
redis "github.com/go-redis/redis/v8"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
@@ -141,9 +142,9 @@ func (ws *WsServer) registerClient(client *Client) {
clientOK bool
oldClients []*Client
)
ws.clients.Set(client.UserID, client)
oldClients, userOK, clientOK = ws.clients.Get(client.UserID, client.PlatformID)
if !userOK {
ws.clients.Set(client.UserID, client)
log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID)
atomic.AddInt64(&ws.onlineUserNum, 1)
atomic.AddInt64(&ws.onlineUserConnNum, 1)
@@ -156,10 +157,14 @@ func (ws *WsServer) registerClient(client *Client) {
}
ws.kickHandlerChan <- i
log.ZDebug(client.ctx, "user exist", "userID", client.UserID, "platformID", client.PlatformID)
if clientOK { //已经有同平台的连接存在
if clientOK {
ws.clients.Set(client.UserID, client)
//已经有同平台的连接存在
log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID", client.PlatformID, "old remote addr", getRemoteAdders(oldClients))
atomic.AddInt64(&ws.onlineUserConnNum, 1)
} else {
ws.clients.Set(client.UserID, client)
atomic.AddInt64(&ws.onlineUserConnNum, 1)
}
}
@@ -187,13 +192,35 @@ func (ws *WsServer) multiTerminalLoginChecker(info *kickHandler) {
fallthrough
case constant.AllLoginButSameTermKick:
if info.clientOK {
ws.clients.deleteClients(info.newClient.UserID, info.oldClients)
for _, c := range info.oldClients {
err := c.KickOnlineMessage()
if err != nil {
log.ZError(c.ctx, "KickOnlineMessage", err)
log.ZWarn(c.ctx, "KickOnlineMessage", err)
}
}
ws.cache.GetTokensWithoutError(info.newClient.ctx, info.newClient.UserID, info.newClient.PlatformID)
m, err := ws.cache.GetTokensWithoutError(info.newClient.ctx, info.newClient.UserID, info.newClient.PlatformID)
if err != nil && err != redis.Nil {
log.ZWarn(info.newClient.ctx, "get token from redis err", err, "userID", info.newClient.UserID, "platformID", info.newClient.PlatformID)
return
}
if m == nil {
log.ZWarn(info.newClient.ctx, "m is nil", errors.New("m is nil"), "userID", info.newClient.UserID, "platformID", info.newClient.PlatformID)
return
}
log.ZDebug(info.newClient.ctx, "get token from redis", "userID", info.newClient.UserID, "platformID", info.newClient.PlatformID, "tokenMap", m)
for k, _ := range m {
if k != info.newClient.ctx.GetToken() {
m[k] = constant.KickedToken
}
}
log.ZDebug(info.newClient.ctx, "set token map is ", "token map", m, "userID", info.newClient.UserID)
err = ws.cache.SetTokenMapByUidPid(info.newClient.ctx, info.newClient.UserID, info.newClient.PlatformID, m)
if err != nil {
log.ZWarn(info.newClient.ctx, "SetTokenMapByUidPid err", err, "userID", info.newClient.UserID, "platformID", info.newClient.PlatformID)
return
}
}
}
@@ -209,7 +236,6 @@ func (ws *WsServer) unregisterClient(client *Client) {
}
func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
defer log.ZInfo(context.Background(), "wsHandler", "remote addr", "url", r.URL.String())
connContext := newContext(w, r)
if ws.onlineUserConnNum >= ws.wsMaxConnNum {
httpError(connContext, errs.ErrConnOverMaxNumLimit)
+24
View File
@@ -3,6 +3,7 @@ package msggateway
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"sync"
)
@@ -71,6 +72,29 @@ func (u *UserMap) delete(key string, connRemoteAddr string) (isDeleteUser bool)
}
return existed
}
func (u *UserMap) deleteClients(key string, clients []*Client) (isDeleteUser bool) {
m := utils.SliceToMapAny(clients, func(c *Client) (string, struct{}) {
return c.ctx.GetRemoteAddr(), struct{}{}
})
allClients, existed := u.m.Load(key)
if existed {
oldClients := allClients.([]*Client)
var a []*Client
for _, client := range oldClients {
if _, ok := m[client.ctx.GetRemoteAddr()]; !ok {
a = append(a, client)
}
}
if len(a) == 0 {
u.m.Delete(key)
return true
} else {
u.m.Store(key, a)
return false
}
}
return existed
}
func (u *UserMap) DeleteAll(key string) {
u.m.Delete(key)
}
+27 -9
View File
@@ -35,6 +35,23 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m
return resp, nil
}
func (m *msgServer) SetConversationHasReadMaxSeq(ctx context.Context, req *msg.SetConversationHasReadSeqReq) (resp *msg.SetConversationHasReadSeqResp, err error) {
maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID)
if err != nil {
return
}
if req.HasReadSeq > maxSeq {
return nil, errs.ErrArgs.Wrap("hasReadSeq must not be bigger than maxSeq")
}
if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
return nil, err
}
if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq); err != nil {
return
}
return &msg.SetConversationHasReadSeqResp{}, nil
}
func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadReq) (resp *msg.MarkMsgsAsReadResp, err error) {
if len(req.Seqs) < 1 {
return nil, errs.ErrArgs.Wrap("seqs must not be empty")
@@ -51,8 +68,7 @@ func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadR
if err != nil {
return
}
err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, req.Seqs)
if err != nil {
if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, req.Seqs); err != nil {
return
}
currentHasReadSeq, err := m.MsgDatabase.GetHasReadSeq(ctx, req.UserID, req.ConversationID)
@@ -72,6 +88,10 @@ func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadR
}
func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkConversationAsReadReq) (resp *msg.MarkConversationAsReadResp, err error) {
conversations, err := m.Conversation.GetConversationsByConversationID(ctx, []string{req.ConversationID})
if err != nil {
return
}
hasReadSeq, err := m.MsgDatabase.GetHasReadSeq(ctx, req.UserID, req.ConversationID)
if err != nil && errors.Unwrap(err) != redis.Nil {
return
@@ -81,13 +101,11 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
for i := hasReadSeq + 1; i <= req.HasReadSeq; i++ {
seqs = append(seqs, i)
}
conversations, err := m.Conversation.GetConversationsByConversationID(ctx, []string{req.ConversationID})
if err != nil {
return
}
log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID)
if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil {
return
if len(seqs) > 0 {
log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID)
if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil {
return
}
}
if req.HasReadSeq > hasReadSeq {
err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq)
+4
View File
@@ -101,6 +101,7 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy
}
func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []string, userID string, deleteSyncOpt *msg.DeleteSyncOpt) error {
defer log.ZDebug(ctx, "clearConversation return line")
conversations, err := m.Conversation.GetConversationsByConversationID(ctx, conversationIDs)
if err != nil {
return err
@@ -135,5 +136,8 @@ func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []str
m.notificationSender.NotificationWithSesstionType(ctx, userID, m.conversationAndGetRecvID(conversation, userID), constant.ClearConversationNotification, conversation.ConversationType, tips)
}
}
if err := m.MsgDatabase.UserSetHasReadSeqs(ctx, userID, maxSeqs); err != nil {
return err
}
return nil
}
+5
View File
@@ -34,6 +34,11 @@ type msgServer struct {
notificationSender *rpcclient.NotificationSender
}
func (m *msgServer) SetConversationHasReadSeq(ctx context.Context, req *msg.SetConversationHasReadSeqReq) (*msg.SetConversationHasReadSeqResp, error) {
//TODO implement me
panic("implement me")
}
func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) {
m.Handlers = append(m.Handlers, interceptorFunc...)
}