Files
open-im-server/internal/rpc/msg/as_read.go
T

230 lines
8.2 KiB
Go
Raw Normal View History

2023-07-04 11:15:20 +08:00
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2023-06-30 09:45:02 +08:00
package msg
import (
"context"
2024-11-22 15:49:16 +08:00
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
2024-03-04 21:32:07 +08:00
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
2024-04-19 22:23:08 +08:00
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
2024-03-05 10:51:55 +08:00
"github.com/redis/go-redis/v9"
2023-06-30 09:45:02 +08:00
)
2024-04-19 22:23:08 +08:00
func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *msg.GetConversationsHasReadAndMaxSeqReq) (*msg.GetConversationsHasReadAndMaxSeqResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
var conversationIDs []string
if len(req.ConversationIDs) == 0 {
2024-04-19 22:23:08 +08:00
var err error
conversationIDs, err = m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
}
} else {
conversationIDs = req.ConversationIDs
2023-06-30 09:45:02 +08:00
}
2024-04-19 22:23:08 +08:00
2023-06-30 09:45:02 +08:00
hasReadSeqs, err := m.MsgDatabase.GetHasReadSeqs(ctx, req.UserID, conversationIDs)
if err != nil {
return nil, err
}
2024-04-19 22:23:08 +08:00
conversations, err := m.ConversationLocalCache.GetConversations(ctx, req.UserID, conversationIDs)
2023-06-30 09:45:02 +08:00
if err != nil {
return nil, err
}
2024-04-19 22:23:08 +08:00
conversationMaxSeqMap := make(map[string]int64)
2023-06-30 09:45:02 +08:00
for _, conversation := range conversations {
if conversation.MaxSeq != 0 {
conversationMaxSeqMap[conversation.ConversationID] = conversation.MaxSeq
}
}
maxSeqs, err := m.MsgDatabase.GetMaxSeqsWithTime(ctx, conversationIDs)
2023-06-30 09:45:02 +08:00
if err != nil {
return nil, err
}
2024-04-19 22:23:08 +08:00
resp := &msg.GetConversationsHasReadAndMaxSeqResp{Seqs: make(map[string]*msg.Seqs)}
if req.ReturnPinned {
pinnedConversationIDs, err := m.ConversationLocalCache.GetPinnedConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
}
resp.PinnedConversationIDs = pinnedConversationIDs
}
2024-04-19 22:23:08 +08:00
for conversationID, maxSeq := range maxSeqs {
resp.Seqs[conversationID] = &msg.Seqs{
HasReadSeq: hasReadSeqs[conversationID],
MaxSeq: maxSeq.Seq,
MaxSeqTime: maxSeq.Time,
2023-06-30 09:45:02 +08:00
}
2024-04-19 22:23:08 +08:00
if v, ok := conversationMaxSeqMap[conversationID]; ok {
resp.Seqs[conversationID].MaxSeq = v
2023-06-30 09:45:02 +08:00
}
}
return resp, nil
}
2024-04-19 22:23:08 +08:00
func (m *msgServer) SetConversationHasReadSeq(ctx context.Context, req *msg.SetConversationHasReadSeqReq) (*msg.SetConversationHasReadSeqResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
2023-06-30 09:45:02 +08:00
maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID)
if err != nil {
2024-04-19 22:23:08 +08:00
return nil, err
2023-06-30 09:45:02 +08:00
}
if req.HasReadSeq > maxSeq {
2024-04-19 22:23:08 +08:00
return nil, errs.ErrArgs.WrapMsg("hasReadSeq must not be bigger than maxSeq")
2023-06-30 09:45:02 +08:00
}
2024-03-04 11:18:38 +08:00
if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
2023-06-30 09:45:02 +08:00
return nil, err
}
2024-04-19 22:23:08 +08:00
m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq)
2023-06-30 09:45:02 +08:00
return &msg.SetConversationHasReadSeqResp{}, nil
}
2024-04-19 22:23:08 +08:00
func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadReq) (*msg.MarkMsgsAsReadResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
2023-06-30 09:45:02 +08:00
}
maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID)
if err != nil {
2024-04-19 22:23:08 +08:00
return nil, err
2023-06-30 09:45:02 +08:00
}
hasReadSeq := req.Seqs[len(req.Seqs)-1]
if hasReadSeq > maxSeq {
2024-04-19 22:23:08 +08:00
return nil, errs.ErrArgs.WrapMsg("hasReadSeq must not be bigger than maxSeq")
2023-06-30 09:45:02 +08:00
}
conversation, err := m.ConversationLocalCache.GetConversation(ctx, req.UserID, req.ConversationID)
2023-06-30 09:45:02 +08:00
if err != nil {
2024-04-19 22:23:08 +08:00
return nil, err
2023-06-30 09:45:02 +08:00
}
2024-04-19 22:23:08 +08:00
if err := m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, req.Seqs); err != nil {
return nil, err
2023-06-30 09:45:02 +08:00
}
currentHasReadSeq, err := m.MsgDatabase.GetHasReadSeq(ctx, req.UserID, req.ConversationID)
2024-11-22 17:46:02 +08:00
if err != nil && !errors.Is(err, redis.Nil) {
2024-04-19 22:23:08 +08:00
return nil, err
2023-06-30 09:45:02 +08:00
}
if hasReadSeq > currentHasReadSeq {
err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, hasReadSeq)
if err != nil {
2024-04-19 22:23:08 +08:00
return nil, err
2023-06-30 09:45:02 +08:00
}
}
2024-04-19 22:23:08 +08:00
reqCallback := &cbapi.CallbackSingleMsgReadReq{
ConversationID: conversation.ConversationID,
UserID: req.UserID,
Seqs: req.Seqs,
ContentType: conversation.ConversationType,
}
2024-04-19 22:23:08 +08:00
m.webhookAfterSingleMsgRead(ctx, &m.config.WebhooksConfig.AfterSingleMsgRead, reqCallback)
m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID,
m.conversationAndGetRecvID(conversation, req.UserID), req.Seqs, hasReadSeq)
2023-06-30 09:45:02 +08:00
return &msg.MarkMsgsAsReadResp{}, nil
}
2024-04-19 22:23:08 +08:00
func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkConversationAsReadReq) (*msg.MarkConversationAsReadResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
conversation, err := m.ConversationLocalCache.GetConversation(ctx, req.UserID, req.ConversationID)
2023-06-30 09:45:02 +08:00
if err != nil {
return nil, err
2023-06-30 09:45:02 +08:00
}
hasReadSeq, err := m.MsgDatabase.GetHasReadSeq(ctx, req.UserID, req.ConversationID)
2024-11-29 15:50:22 +08:00
if err != nil && !errors.Is(err, redis.Nil) {
return nil, err
2023-06-30 09:45:02 +08:00
}
2023-12-11 12:18:55 +08:00
var seqs []int64
2024-04-19 22:23:08 +08:00
log.ZDebug(ctx, "MarkConversationAsRead", "hasReadSeq", hasReadSeq, "req.HasReadSeq", req.HasReadSeq)
2023-12-11 12:18:55 +08:00
if conversation.ConversationType == constant.SingleChatType {
for i := hasReadSeq + 1; i <= req.HasReadSeq; i++ {
seqs = append(seqs, i)
}
2024-04-19 22:23:08 +08:00
// avoid client missed call MarkConversationMessageAsRead by order
2023-12-11 12:18:55 +08:00
for _, val := range req.Seqs {
2024-04-19 22:23:08 +08:00
if !datautil.Contain(val, seqs...) {
2023-12-11 12:18:55 +08:00
seqs = append(seqs, val)
}
}
if len(seqs) > 0 {
log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID)
if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil {
return nil, err
}
}
if req.HasReadSeq > hasReadSeq {
err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq)
if err != nil {
return nil, err
}
hasReadSeq = req.HasReadSeq
}
2024-04-19 22:23:08 +08:00
m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID,
m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq)
} else if conversation.ConversationType == constant.ReadGroupChatType ||
2023-12-11 12:18:55 +08:00
conversation.ConversationType == constant.NotificationChatType {
if req.HasReadSeq > hasReadSeq {
err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq)
if err != nil {
return nil, err
}
hasReadSeq = req.HasReadSeq
}
2024-04-19 22:23:08 +08:00
m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID,
req.UserID, seqs, hasReadSeq)
}
2023-12-11 12:18:55 +08:00
2024-11-25 14:11:34 +08:00
if conversation.ConversationType == constant.SingleChatType {
reqCall := &cbapi.CallbackSingleMsgReadReq{
ConversationID: conversation.ConversationID,
UserID: conversation.OwnerUserID,
Seqs: req.Seqs,
ContentType: conversation.ConversationType,
}
m.webhookAfterSingleMsgRead(ctx, &m.config.WebhooksConfig.AfterSingleMsgRead, reqCall)
} else if conversation.ConversationType == constant.ReadGroupChatType {
reqCall := &cbapi.CallbackGroupMsgReadReq{
SendID: conversation.OwnerUserID,
ReceiveID: req.UserID,
UnreadMsgNum: req.HasReadSeq,
ContentType: int64(conversation.ConversationType),
}
m.webhookAfterGroupMsgRead(ctx, &m.config.WebhooksConfig.AfterGroupMsgRead, reqCall)
2023-11-28 15:26:46 +08:00
}
2023-12-11 12:18:55 +08:00
return &msg.MarkConversationAsReadResp{}, nil
2023-06-30 09:45:02 +08:00
}
2024-04-19 22:23:08 +08:00
func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversationID string, sessionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) {
2023-06-30 09:45:02 +08:00
tips := &sdkws.MarkAsReadTips{
MarkAsReadUserID: sendID,
ConversationID: conversationID,
Seqs: seqs,
HasReadSeq: hasReadSeq,
}
2024-04-19 22:23:08 +08:00
m.notificationSender.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips)
}