refactor: scheduled task splitting (#1299)

* optimize scheduled deletion

* optimize scheduled deletion

* optimize scheduled deletion

* fix: conflicts
This commit is contained in:
chao
2023-11-02 03:11:45 -05:00
committed by GitHub
parent d2f0af1b8b
commit 62e9980f3c
6 changed files with 175 additions and 40 deletions
+45 -9
View File
@@ -17,13 +17,11 @@ package tools
import (
"context"
"fmt"
"math"
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
"github.com/redis/go-redis/v9"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
"math"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
@@ -31,6 +29,7 @@ import (
"github.com/OpenIMSDK/tools/mw"
"github.com/OpenIMSDK/tools/tx"
"github.com/OpenIMSDK/tools/utils"
"math/rand"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
@@ -104,18 +103,55 @@ func InitMsgTool() (*MsgTool, error) {
return msgTool, nil
}
//func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
// ctx := mcontext.NewCtx(utils.GetSelfFuncName())
// log.ZInfo(ctx, "============================ start del cron task ============================")
// conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
// if err != nil {
// log.ZError(ctx, "GetAllConversationIDs failed", err)
// return
// }
// for _, conversationID := range conversationIDs {
// conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
// }
// c.ClearConversationsMsg(ctx, conversationIDs)
// log.ZInfo(ctx, "============================ start del cron finished ============================")
//}
func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
ctx := mcontext.NewCtx(utils.GetSelfFuncName())
log.ZInfo(ctx, "============================ start del cron task ============================")
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDs failed", err)
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
return
}
for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
const batchNum = 50
log.ZDebug(ctx, "GetAllConversationIDsNumber", "num", num)
if num == 0 {
return
}
count := int(num/batchNum + num/batchNum/2)
if count < 1 {
count = 1
}
maxPage := 1 + num/batchNum
if num%batchNum != 0 {
maxPage++
}
for i := 0; i < count; i++ {
pageNumber := rand.Int63() % maxPage
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, int32(pageNumber), batchNum)
if err != nil {
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue
}
log.ZDebug(ctx, "PageConversationIDs failed", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
if len(conversationIDs) == 0 {
continue
}
c.ClearConversationsMsg(ctx, conversationIDs)
}
c.ClearConversationsMsg(ctx, conversationIDs)
log.ZInfo(ctx, "============================ start del cron finished ============================")
}