feat: Optimize Scheduled Task (#2985)

* pb

* fix: Modifying other fields while setting IsPrivateChat does not take effect

* fix: quote message error revoke

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks
This commit is contained in:
chao
2024-12-20 15:45:37 +08:00
committed by GitHub
parent 04f3c99697
commit e9895062cb
22 changed files with 645 additions and 398 deletions
@@ -74,6 +74,8 @@ type ConversationDatabase interface {
GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error)
// GetPinnedConversationIDs gets pinned conversationIDs by userID
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
// FindRandConversation finds random conversations based on the specified timestamp and limit.
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error)
}
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
@@ -401,3 +403,7 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use
}
return conversationIDs, nil
}
func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) {
return c.conversationDB.FindRandConversation(ctx, ts, limit)
}
+52 -56
View File
@@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"errors"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
@@ -69,6 +68,7 @@ type CommonMsgDatabase interface {
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeqs(ctx context.Context, seqs map[string]int64) error
SetMinSeq(ctx context.Context, conversationID string, seq int64) error
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
@@ -95,13 +95,16 @@ type CommonMsgDatabase interface {
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
// get Msg when destruct msg before
GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error)
DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error)
//DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error)
GetDocIDs(ctx context.Context) ([]string, error)
GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error)
SetUserConversationsMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
DeleteDoc(ctx context.Context, docID string) error
GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error)
}
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
@@ -806,9 +809,10 @@ func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID strin
return db.seqConversation.GetMaxSeq(ctx, conversationID)
}
func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq)
}
//
//func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
// return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq)
//}
func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
return db.seqConversation.SetMinSeqs(ctx, seqs)
@@ -947,56 +951,40 @@ func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversation
db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
}
func (db *commonMsgDatabase) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) {
var msgs []*model.MsgDocModel
for i := 0; i < len(docIDs); i += 1000 {
end := i + 1000
if end > len(docIDs) {
end = len(docIDs)
}
res, err := db.msgDocDatabase.GetBeforeMsg(ctx, ts, docIDs[i:end], limit)
if err != nil {
return nil, err
}
msgs = append(msgs, res...)
if len(msgs) >= limit {
return msgs[:limit], nil
}
}
return msgs, nil
func (db *commonMsgDatabase) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) {
return db.msgDocDatabase.GetRandBeforeMsg(ctx, ts, limit)
}
func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) {
var notNull int
index := make([]int, 0, len(doc.Msg))
for i, message := range doc.Msg {
if message.Msg != nil {
notNull++
if message.Msg.SendTime < ts {
index = append(index, i)
}
}
}
if len(index) == 0 {
return index, nil
}
maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq
conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")]
if err := db.setMinSeq(ctx, conversationID, maxSeq+1); err != nil {
return index, err
}
if len(index) == notNull {
log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID)
} else {
log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index)
}
}
//
//func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) {
// var notNull int
// index := make([]int, 0, len(doc.Msg))
// for i, message := range doc.Msg {
// if message.Msg != nil {
// notNull++
// if message.Msg.SendTime < ts {
// index = append(index, i)
// }
// }
// }
// if len(index) == 0 {
// return index, nil
// }
// maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq
// conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")]
// if err := db.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil {
// return index, err
// }
// if len(index) == notNull {
// log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
// return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID)
// } else {
// log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
// return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index)
// }
//}
func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID string, seq int64) error {
func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID)
if err != nil {
if errors.Is(errs.Unwrap(err), redis.Nil) {
@@ -1010,8 +998,8 @@ func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID strin
return db.seqConversation.SetMinSeq(ctx, conversationID, seq)
}
func (db *commonMsgDatabase) GetDocIDs(ctx context.Context) ([]string, error) {
return db.msgDocDatabase.GetDocIDs(ctx)
func (db *commonMsgDatabase) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) {
return db.msgDocDatabase.GetRandDocIDs(ctx, limit)
}
func (db *commonMsgDatabase) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
@@ -1026,3 +1014,11 @@ func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversatio
// todo: only the time in the redis cache will be taken, not the message time
return db.seqConversation.GetMaxSeqsWithTime(ctx, conversationIDs)
}
func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error {
return db.msgDocDatabase.DeleteDoc(ctx, docID)
}
func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) {
return db.msgDocDatabase.GetLastMessageSeqByTime(ctx, conversationID, time)
}
+10 -14
View File
@@ -24,7 +24,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/s3"
"github.com/openimsdk/tools/s3/cont"
"github.com/redis/go-redis/v9"
@@ -40,11 +39,10 @@ type S3Database interface {
SetObject(ctx context.Context, info *model.Object) error
StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error)
DeleteObject(ctx context.Context, name string) error
DeleteSpecifiedData(ctx context.Context, engine string, name string) error
FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error)
FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error)
DeleteSpecifiedData(ctx context.Context, engine string, name []string) error
DelS3Key(ctx context.Context, engine string, keys ...string) error
GetKeyCount(ctx context.Context, engine string, key string) (int64, error)
}
func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database {
@@ -120,21 +118,19 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf
func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
return s.s3.FormData(ctx, name, size, contentType, duration)
}
func (s *s3Database) FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) {
return s.db.FindNeedDeleteObjectByDB(ctx, duration, needDelType, pagination)
func (s *s3Database) FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error) {
return s.db.FindExpirationObject(ctx, engine, expiration, needDelType, count)
}
func (s *s3Database) DeleteObject(ctx context.Context, name string) error {
return s.s3.DeleteObject(ctx, name)
func (s *s3Database) GetKeyCount(ctx context.Context, engine string, key string) (int64, error) {
return s.db.GetKeyCount(ctx, engine, key)
}
func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name string) error {
func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name []string) error {
return s.db.Delete(ctx, engine, name)
}
func (s *s3Database) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) {
return s.db.FindModelsByKey(ctx, key)
}
func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error {
return s.s3cache.DelS3Key(ctx, engine, keys...)
}