refactoring scheduled tasks

This commit is contained in:
withchao
2024-12-19 11:28:49 +08:00
parent f03cbec653
commit 705bc37f99
9 changed files with 180 additions and 147 deletions
+33 -42
View File
@@ -2,6 +2,7 @@ package msg
import (
"context"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
@@ -26,52 +27,42 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq)
if req.Timestamp > time.Now().UnixMilli() {
return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error")
}
var (
docNum int
msgNum int
start = time.Now()
getLimit = 5000
)
destructMsg := func(ctx context.Context) (bool, error) {
docIDs, err := m.MsgDatabase.GetDocIDs(ctx)
if err != nil {
return false, err
}
msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, getLimit)
if err != nil {
return false, err
}
if len(msgs) == 0 {
return false, nil
}
for _, msg := range msgs {
index, err := m.MsgDatabase.DeleteDocMsgBefore(ctx, req.Timestamp, msg)
if err != nil {
return false, err
}
if len(index) == 0 {
return false, errs.ErrInternalServer.WrapMsg("delete doc msg failed")
}
docNum++
msgNum += len(index)
}
return true, nil
if req.Limit <= 0 {
return nil, errs.ErrArgs.WrapMsg("request limit error")
}
_, err = destructMsg(ctx)
docs, err := m.MsgDatabase.GetRandBeforeMsg(ctx, req.Timestamp, int(req.Limit))
if err != nil {
log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
return nil, err
}
log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
return &msg.DestructMsgsResp{}, nil
for _, doc := range docs {
if err := m.MsgDatabase.DeleteDoc(ctx, doc.DocID); err != nil {
return nil, err
}
index := strings.LastIndex(doc.DocID, ":")
if index < 0 {
continue
}
var minSeq int64
for _, model := range doc.Msg {
if model.Msg == nil {
continue
}
if model.Msg.Seq > minSeq {
minSeq = model.Msg.Seq
}
}
if minSeq <= 0 {
continue
}
conversationID := doc.DocID[:index]
if conversationID == "" {
continue
}
if err := m.MsgDatabase.SetMinSeq(ctx, conversationID, minSeq); err != nil {
return nil, err
}
}
return &msg.DestructMsgsResp{Count: int32(len(docs))}, nil
}
// soft delete for user self
+5 -1
View File
@@ -19,6 +19,7 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"path"
"strconv"
"time"
@@ -284,10 +285,13 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
}
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil {
return nil, err
}
engine := t.config.RpcConfig.Object.Enable
expireTime := time.UnixMilli(req.ExpireTime)
// Find all expired data in S3 database
models, err := t.s3dataBase.FindExpirationObject(ctx, engine, expireTime, req.ObjectGroup, int64(req.Count))
models, err := t.s3dataBase.FindExpirationObject(ctx, engine, expireTime, req.ObjectGroup, int64(req.Limit))
if err != nil {
return nil, err
}
+19 -6
View File
@@ -12,12 +12,25 @@ import (
func (c *cronServer) deleteMsg() {
now := time.Now()
deltime := now.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.RetainChatRecords))
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
operationID := fmt.Sprintf("cron_msg_%d_%d", os.Getpid(), deltime.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
if _, err := c.msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli()}); err != nil {
log.ZError(ctx, "cron destruct chat records failed", err, "deltime", deltime, "cont", time.Since(now))
return
const (
deleteCount = 20
deleteLimit = 50
)
var count int
for i := 1; i <= deleteCount; i++ {
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i))
resp, err := c.msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli(), Limit: deleteLimit})
if err != nil {
log.ZError(ctx, "cron destruct chat records failed", err)
break
}
count += int(resp.Count)
if resp.Count <= deleteLimit {
break
}
}
log.ZDebug(ctx, "cron destruct chat records success", "deltime", deltime, "cont", time.Since(now))
log.ZDebug(ctx, "cron destruct chat records end", "deltime", deltime, "cont", time.Since(now), "deleteDocs", count)
}
+6 -5
View File
@@ -15,20 +15,21 @@ func (c *cronServer) clearS3() {
// number of pagination. if need modify, need update value in third.DeleteOutdatedData
pageShowNumber := 500
deleteTime := start.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.FileExpireTime))
operationID := fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())
operationID := fmt.Sprintf("cron_s3_%d_%d", os.Getpid(), deleteTime.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
var count int
for i := 1; i <= executeNum; i++ {
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i))
resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Count: int32(pageShowNumber)})
resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Limit: int32(pageShowNumber)})
if err != nil {
log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(start))
log.ZError(ctx, "cron deleteoutDatedData failed", err)
return
}
count += int(resp.Count)
if resp.Count < int32(pageShowNumber) {
break
}
}
log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(start))
log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(start), "count", count)
}
+3 -3
View File
@@ -12,9 +12,9 @@ import (
func (c *cronServer) clearUserMsg() {
now := time.Now()
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli()))
log.ZDebug(ctx, "clear msg cron start", "now", now)
operationID := fmt.Sprintf("cron_user_msg_%d_%d", os.Getpid(), now.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "clear msg cron start")
conversations, err := c.conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{})
if err != nil {
log.ZError(ctx, "Get conversation need Destruct msgs failed.", err)