mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-05 17:45:59 +08:00
fix: update set seq implement. (#2911)
* update set seq implement. * revert seq logic. * Update func logic. * add log print.
This commit is contained in:
+23
-12
@@ -12,13 +12,14 @@ import (
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/openimsdk/tools/log"
|
||||
"github.com/openimsdk/tools/mcontext"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
"github.com/openimsdk/tools/utils/idutil"
|
||||
"github.com/openimsdk/tools/utils/stringutil"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// hard delete in Database.
|
||||
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
|
||||
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
|
||||
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -26,18 +27,19 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
|
||||
return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error")
|
||||
}
|
||||
var (
|
||||
docNum int
|
||||
msgNum int
|
||||
start = time.Now()
|
||||
docNum int
|
||||
msgNum int
|
||||
start = time.Now()
|
||||
getLimit = 5000
|
||||
)
|
||||
|
||||
clearMsg := func(ctx context.Context) (bool, error) {
|
||||
destructMsg := func(ctx context.Context) (bool, error) {
|
||||
docIDs, err := m.MsgDatabase.GetDocIDs(ctx)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, 5000)
|
||||
msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, getLimit)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@@ -61,7 +63,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
|
||||
return true, nil
|
||||
}
|
||||
|
||||
_, err = clearMsg(ctx)
|
||||
_, err = destructMsg(ctx)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
|
||||
return nil, err
|
||||
@@ -69,11 +71,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
|
||||
|
||||
log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
|
||||
|
||||
return &msg.ClearMsgResp{}, nil
|
||||
return &msg.DestructMsgsResp{}, nil
|
||||
}
|
||||
|
||||
// soft delete for self
|
||||
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
|
||||
// soft delete for user self
|
||||
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
|
||||
temp := convert.ConversationsPb2DB(req.Conversations)
|
||||
|
||||
batchNum := 100
|
||||
@@ -93,22 +95,31 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq)
|
||||
"msgDestructTime", conversation.MsgDestructTime,
|
||||
"lastMsgDestructTime", conversation.LatestMsgDestructTime)
|
||||
|
||||
seqs, err := m.MsgDatabase.UserMsgsDestruct(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
|
||||
seqs, err := m.MsgDatabase.ClearUserMsgs(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
|
||||
if err != nil {
|
||||
log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(seqs) > 0 {
|
||||
minseq := datautil.Max(seqs...)
|
||||
|
||||
// update
|
||||
if err := m.Conversation.UpdateConversation(handleCtx,
|
||||
&pbconversation.UpdateConversationReq{
|
||||
UserIDs: []string{conversation.OwnerUserID},
|
||||
ConversationID: conversation.ConversationID,
|
||||
LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli())}); err != nil {
|
||||
LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
|
||||
MinSeq: wrapperspb.Int64(minseq),
|
||||
}); err != nil {
|
||||
log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := m.Conversation.SetConversationMinSeq(handleCtx, []string{conversation.OwnerUserID}, conversation.ConversationID, minseq); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if you need Notify SDK client userseq is update.
|
||||
// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user