mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-15 14:35:57 +08:00
feat: implement scheduled destruct msgs feature in cron task. (#2466)
* update protocol in go mod. * add debug log in writePongMsg. * update log level. * add Warn log in writePongMsg. * add debug log. * feat: update webhookBeforeMemberJoinGroup to batch method. * feat: update version field implement. * update webhook implement contents. * update method field and contents. * update callbackCommand field. * fix: add correct fields. * update struct tags. * refactor: rename friend module to relation. * feat: implement scheduled destruct msgs feature in cron task. * update log contents. * update func name and comments. * update waitgroup to errgroup. * update errgroup wait. * remove unnecessary contents. * update clearMsg logic.
This commit is contained in:
@@ -16,13 +16,16 @@ package conversation
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
||||
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
|
||||
"github.com/openimsdk/tools/db/redisutil"
|
||||
"sort"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||
@@ -40,10 +43,11 @@ import (
|
||||
)
|
||||
|
||||
type conversationServer struct {
|
||||
msgRpcClient *rpcclient.MessageRpcClient
|
||||
user *rpcclient.UserRpcClient
|
||||
groupRpcClient *rpcclient.GroupRpcClient
|
||||
conversationDatabase controller.ConversationDatabase
|
||||
msgRpcClient *rpcclient.MessageRpcClient
|
||||
user *rpcclient.UserRpcClient
|
||||
groupRpcClient *rpcclient.GroupRpcClient
|
||||
conversationDatabase controller.ConversationDatabase
|
||||
|
||||
conversationNotificationSender *ConversationNotificationSender
|
||||
config *Config
|
||||
}
|
||||
@@ -204,11 +208,11 @@ func (c *conversationServer) getConversations(ctx context.Context, ownerUserID s
|
||||
}
|
||||
|
||||
func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) {
|
||||
var conversation tablerelation.Conversation
|
||||
var conversation dbModel.Conversation
|
||||
if err := datautil.CopyStructFields(&conversation, req.Conversation); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*tablerelation.Conversation{&conversation})
|
||||
err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*dbModel.Conversation{&conversation})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -232,7 +236,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
|
||||
}
|
||||
}
|
||||
var unequal int
|
||||
var conv tablerelation.Conversation
|
||||
var conv dbModel.Conversation
|
||||
if len(req.UserIDs) == 1 {
|
||||
cs, err := c.conversationDatabase.FindConversations(ctx, req.UserIDs[0], []string{req.Conversation.ConversationID})
|
||||
if err != nil {
|
||||
@@ -243,7 +247,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
|
||||
}
|
||||
conv = *cs[0]
|
||||
}
|
||||
var conversation tablerelation.Conversation
|
||||
var conversation dbModel.Conversation
|
||||
conversation.ConversationID = req.Conversation.ConversationID
|
||||
conversation.ConversationType = req.Conversation.ConversationType
|
||||
conversation.UserID = req.Conversation.UserID
|
||||
@@ -292,7 +296,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
|
||||
}
|
||||
}
|
||||
if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.ReadGroupChatType {
|
||||
var conversations []*tablerelation.Conversation
|
||||
var conversations []*dbModel.Conversation
|
||||
for _, ownerUserID := range req.UserIDs {
|
||||
conversation2 := conversation
|
||||
conversation2.OwnerUserID = ownerUserID
|
||||
@@ -340,12 +344,12 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
|
||||
) (*pbconversation.CreateSingleChatConversationsResp, error) {
|
||||
switch req.ConversationType {
|
||||
case constant.SingleChatType:
|
||||
var conversation tablerelation.Conversation
|
||||
var conversation dbModel.Conversation
|
||||
conversation.ConversationID = req.ConversationID
|
||||
conversation.ConversationType = req.ConversationType
|
||||
conversation.OwnerUserID = req.SendID
|
||||
conversation.UserID = req.RecvID
|
||||
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation})
|
||||
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation)
|
||||
}
|
||||
@@ -353,17 +357,17 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
|
||||
conversation2 := conversation
|
||||
conversation2.OwnerUserID = req.RecvID
|
||||
conversation2.UserID = req.SendID
|
||||
err = c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation2})
|
||||
err = c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation2})
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
|
||||
}
|
||||
case constant.NotificationChatType:
|
||||
var conversation tablerelation.Conversation
|
||||
var conversation dbModel.Conversation
|
||||
conversation.ConversationID = req.ConversationID
|
||||
conversation.ConversationType = req.ConversationType
|
||||
conversation.OwnerUserID = req.RecvID
|
||||
conversation.UserID = req.SendID
|
||||
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation})
|
||||
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
|
||||
}
|
||||
@@ -584,6 +588,9 @@ func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconv
|
||||
if req.MaxSeq != nil {
|
||||
m["max_seq"] = req.MaxSeq.Value
|
||||
}
|
||||
if req.LatestMsgDestructTime != nil {
|
||||
m["latest_msg_destruct_time"] = time.UnixMilli(req.LatestMsgDestructTime.Value)
|
||||
}
|
||||
if len(m) > 0 {
|
||||
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.UserIDs, req.ConversationID, m); err != nil {
|
||||
return nil, err
|
||||
@@ -602,3 +609,53 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco
|
||||
Conversations: convert.ConversationsDB2Pb(conversations),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Context, _ *pbconversation.GetConversationsNeedDestructMsgsReq) (*pbconversation.GetConversationsNeedDestructMsgsResp, error) {
|
||||
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
|
||||
return nil, err
|
||||
}
|
||||
const batchNum = 100
|
||||
|
||||
if num == 0 {
|
||||
return nil, errs.New("Need Destruct Msg is nil").Wrap()
|
||||
}
|
||||
|
||||
maxPage := (num + batchNum - 1) / batchNum
|
||||
|
||||
temp := make([]*model.Conversation, 0, maxPage*batchNum)
|
||||
|
||||
for pageNumber := 0; pageNumber < int(maxPage); pageNumber++ {
|
||||
pagination := &sdkws.RequestPagination{
|
||||
PageNumber: int32(pageNumber),
|
||||
ShowNumber: batchNum,
|
||||
}
|
||||
|
||||
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
|
||||
continue
|
||||
}
|
||||
|
||||
log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
|
||||
if len(conversationIDs) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, conversation := range conversations {
|
||||
if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && ((time.Now().UnixMilli() > (conversation.MsgDestructTime + conversation.LatestMsgDestructTime.UnixMilli() + 8*60*60)) || // 8*60*60 is UTC+8
|
||||
conversation.LatestMsgDestructTime.IsZero()) {
|
||||
temp = append(temp, conversation)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
|
||||
}
|
||||
|
||||
+66
-19
@@ -2,16 +2,22 @@ package msg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||
"github.com/openimsdk/protocol/conversation"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
|
||||
pbconversation "github.com/openimsdk/protocol/conversation"
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/protocol/wrapperspb"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/openimsdk/tools/log"
|
||||
"strings"
|
||||
"time"
|
||||
"github.com/openimsdk/tools/mcontext"
|
||||
"github.com/openimsdk/tools/utils/idutil"
|
||||
"github.com/openimsdk/tools/utils/stringutil"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// hard delete in Database.
|
||||
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
|
||||
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
@@ -25,18 +31,6 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
|
||||
start = time.Now()
|
||||
)
|
||||
clearMsg := func(ctx context.Context) (bool, error) {
|
||||
conversationSeqs := make(map[string]struct{})
|
||||
defer func() {
|
||||
req := &conversation.UpdateConversationReq{
|
||||
MsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
|
||||
}
|
||||
for conversationID := range conversationSeqs {
|
||||
req.ConversationID = conversationID
|
||||
if err := m.Conversation.UpdateConversations(ctx, req); err != nil {
|
||||
log.ZError(ctx, "update conversation max seq failed", err, "conversationID", conversationID, "msgDestructTime", req.MsgDestructTime)
|
||||
}
|
||||
}
|
||||
}()
|
||||
msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, 100)
|
||||
if err != nil {
|
||||
return false, err
|
||||
@@ -44,6 +38,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
|
||||
if len(msgs) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
for _, msg := range msgs {
|
||||
index, err := m.MsgDatabase.DeleteDocMsgBefore(ctx, req.Timestamp, msg)
|
||||
if err != nil {
|
||||
@@ -52,15 +47,14 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
|
||||
if len(index) == 0 {
|
||||
return false, errs.ErrInternalServer.WrapMsg("delete doc msg failed")
|
||||
}
|
||||
|
||||
docNum++
|
||||
msgNum += len(index)
|
||||
conversationID := msg.DocID[:strings.LastIndex(msg.DocID, ":")]
|
||||
if _, ok := conversationSeqs[conversationID]; !ok {
|
||||
conversationSeqs[conversationID] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
for {
|
||||
keep, err := clearMsg(ctx)
|
||||
if err != nil {
|
||||
@@ -71,7 +65,60 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
|
||||
log.ZInfo(ctx, "clear msg success", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
|
||||
break
|
||||
}
|
||||
|
||||
log.ZInfo(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
|
||||
}
|
||||
return &msg.ClearMsgResp{}, nil
|
||||
}
|
||||
|
||||
// soft delete for self
|
||||
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
|
||||
temp := convert.ConversationsPb2DB(req.Conversations)
|
||||
|
||||
batchNum := 100
|
||||
|
||||
errg, _ := errgroup.WithContext(ctx)
|
||||
errg.SetLimit(100)
|
||||
|
||||
for i := 0; i < len(temp); i += batchNum {
|
||||
batch := temp[i:min(i+batchNum, len(temp))]
|
||||
|
||||
errg.Go(func() error {
|
||||
for _, conversation := range batch {
|
||||
handleCtx := mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
|
||||
log.ZDebug(handleCtx, "User MsgsDestruct",
|
||||
"conversationID", conversation.ConversationID,
|
||||
"ownerUserID", conversation.OwnerUserID,
|
||||
"msgDestructTime", conversation.MsgDestructTime,
|
||||
"lastMsgDestructTime", conversation.LatestMsgDestructTime)
|
||||
|
||||
seqs, err := m.MsgDatabase.UserMsgsDestruct(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
|
||||
if err != nil {
|
||||
log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(seqs) > 0 {
|
||||
if err := m.Conversation.UpdateConversation(handleCtx,
|
||||
&pbconversation.UpdateConversationReq{
|
||||
UserIDs: []string{conversation.OwnerUserID},
|
||||
ConversationID: conversation.ConversationID,
|
||||
LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli())}); err != nil {
|
||||
log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
|
||||
continue
|
||||
}
|
||||
|
||||
// if you need Notify SDK client userseq is update.
|
||||
// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if err := errg.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ package msg
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
||||
@@ -50,6 +51,7 @@ type (
|
||||
ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data.
|
||||
Handlers MessageInterceptorChain // Chain of handlers for processing messages.
|
||||
notificationSender *rpcclient.NotificationSender // RPC client for sending notifications.
|
||||
msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications.
|
||||
config *Config // Global configuration settings.
|
||||
webhookClient *webhook.Client
|
||||
}
|
||||
@@ -117,7 +119,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
||||
}
|
||||
|
||||
s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg))
|
||||
s.msgNotificationSender = NewMsgNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg))
|
||||
|
||||
msg.RegisterMsgServer(server, s)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user