mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-08 02:55:58 +08:00
msgDestruct
This commit is contained in:
@@ -7,7 +7,7 @@ type CronTaskCmd struct {
|
||||
}
|
||||
|
||||
func NewCronTaskCmd() *CronTaskCmd {
|
||||
return &CronTaskCmd{NewRootCmd("cronTask")}
|
||||
return &CronTaskCmd{NewRootCmd("cronTask", WithCronTaskLogName())}
|
||||
}
|
||||
|
||||
func (c *CronTaskCmd) addRunE(f func() error) {
|
||||
|
||||
+28
-5
@@ -16,17 +16,40 @@ type RootCmd struct {
|
||||
prometheusPort int
|
||||
}
|
||||
|
||||
func NewRootCmd(name string) (rootCmd *RootCmd) {
|
||||
type CmdOpts struct {
|
||||
loggerPrefixName string
|
||||
}
|
||||
|
||||
func WithCronTaskLogName() func(*CmdOpts) {
|
||||
return func(opts *CmdOpts) {
|
||||
opts.loggerPrefixName = "OpenIM.CronTask.log.all"
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogName(logName string) func(*CmdOpts) {
|
||||
return func(opts *CmdOpts) {
|
||||
opts.loggerPrefixName = logName
|
||||
}
|
||||
}
|
||||
|
||||
func NewRootCmd(name string, opts ...func(*CmdOpts)) (rootCmd *RootCmd) {
|
||||
rootCmd = &RootCmd{Name: name}
|
||||
c := cobra.Command{
|
||||
Use: "start",
|
||||
Short: fmt.Sprintf(`Start %s server`, name),
|
||||
Long: fmt.Sprintf(`Start %s server`, name),
|
||||
Use: "start openIM application",
|
||||
Short: fmt.Sprintf(`Start %s `, name),
|
||||
Long: fmt.Sprintf(`Start %s `, name),
|
||||
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
|
||||
if err := rootCmd.getConfFromCmdAndInit(cmd); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := log.InitFromConfig("OpenIM.log.all", name, config.Config.Log.RemainLogLevel, config.Config.Log.IsStdout, config.Config.Log.IsJson, config.Config.Log.StorageLocation, config.Config.Log.RemainRotationCount); err != nil {
|
||||
cmdOpts := &CmdOpts{}
|
||||
for _, opt := range opts {
|
||||
opt(cmdOpts)
|
||||
}
|
||||
if cmdOpts.loggerPrefixName == "" {
|
||||
cmdOpts.loggerPrefixName = "OpenIM.log.all"
|
||||
}
|
||||
if err := log.InitFromConfig(cmdOpts.loggerPrefixName, name, config.Config.Log.RemainLogLevel, config.Config.Log.IsStdout, config.Config.Log.IsJson, config.Config.Log.StorageLocation, config.Config.Log.RemainRotationCount); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -26,7 +26,7 @@ func (a *RpcCmd) Exec() error {
|
||||
return a.Execute()
|
||||
}
|
||||
|
||||
func (a *RpcCmd) StartSvr(name string, rpcFn func(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error) error {
|
||||
func (a *RpcCmd) StartSvr(name string, rpcFn func(discov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error) error {
|
||||
if a.GetPortFlag() == 0 {
|
||||
return errors.New("port is required")
|
||||
}
|
||||
|
||||
@@ -215,6 +215,7 @@ type config struct {
|
||||
SingleMessageHasReadReceiptEnable bool `yaml:"singleMessageHasReadReceiptEnable"`
|
||||
RetainChatRecords int `yaml:"retainChatRecords"`
|
||||
ChatRecordsClearTime string `yaml:"chatRecordsClearTime"`
|
||||
MsgDestructTime string `yaml:"msgDestructTime"`
|
||||
Secret string `yaml:"secret"`
|
||||
TokenPolicy struct {
|
||||
Expire int64 `yaml:"expire"`
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
func ConversationDB2Pb(conversationDB *relation.ConversationModel) *conversation.Conversation {
|
||||
conversationPB := &conversation.Conversation{}
|
||||
conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.Unix()
|
||||
if err := utils.CopyStructFields(conversationPB, conversationDB); err != nil {
|
||||
return nil
|
||||
}
|
||||
@@ -20,6 +21,7 @@ func ConversationsDB2Pb(conversationsDB []*relation.ConversationModel) (conversa
|
||||
if err := utils.CopyStructFields(conversationPB, conversationDB); err != nil {
|
||||
continue
|
||||
}
|
||||
conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.Unix()
|
||||
conversationsPB = append(conversationsPB, conversationPB)
|
||||
}
|
||||
return conversationsPB
|
||||
|
||||
@@ -35,6 +35,7 @@ type ConversationDatabase interface {
|
||||
GetAllConversationIDs(ctx context.Context) ([]string, error)
|
||||
GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
|
||||
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationTb.ConversationModel, error)
|
||||
GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationTb.ConversationModel, error)
|
||||
}
|
||||
|
||||
func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
|
||||
@@ -275,3 +276,7 @@ func (c *conversationDatabase) GetUserAllHasReadSeqs(ctx context.Context, ownerU
|
||||
func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
|
||||
return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationTb.ConversationModel, error) {
|
||||
return c.conversationDB.GetConversationIDsNeedDestruct(ctx)
|
||||
}
|
||||
|
||||
@@ -2,9 +2,10 @@ package controller
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/convert"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
@@ -48,6 +49,9 @@ type CommonMsgDatabase interface {
|
||||
GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error)
|
||||
// 删除会话消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
|
||||
DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error
|
||||
// 用户标记删除过期消息返回标记删除的seq列表
|
||||
UserMsgsDestruct(cte context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error)
|
||||
|
||||
// 用户根据seq删除消息
|
||||
DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error
|
||||
// 物理删除消息置空
|
||||
@@ -391,44 +395,6 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversat
|
||||
return totalMsgs, nil
|
||||
}
|
||||
|
||||
// func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversationID string, delNums, rangeBegin, begin int64) (seqMsgs []*unRelationTb.MsgDataModel, err error) {
|
||||
// var reFetchSeqs []int64
|
||||
// if delNums > 0 {
|
||||
// newBeginSeq := rangeBegin - delNums
|
||||
// if newBeginSeq >= begin {
|
||||
// newEndSeq := rangeBegin - 1
|
||||
// for i := newBeginSeq; i <= newEndSeq; i++ {
|
||||
// reFetchSeqs = append(reFetchSeqs, i)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// if len(reFetchSeqs) == 0 {
|
||||
// return
|
||||
// }
|
||||
// if len(reFetchSeqs) > 0 {
|
||||
// m := db.msg.GetDocIDSeqsMap(conversationID, reFetchSeqs)
|
||||
// for docID, seqs := range m {
|
||||
// msgs, _, err := db.findMsgInfoBySeq(ctx, docID, seqs)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// for _, msg := range msgs {
|
||||
// if msg.Status != constant.MsgDeleted {
|
||||
// seqMsgs = append(seqMsgs, msg)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// if len(seqMsgs) < int(delNums) {
|
||||
// seqMsgs2, err := db.refetchDelSeqsMsgs(ctx, conversationID, delNums-int64(len(seqMsgs)), rangeBegin-1, begin)
|
||||
// if err != nil {
|
||||
// return seqMsgs, err
|
||||
// }
|
||||
// seqMsgs = append(seqMsgs, seqMsgs2...)
|
||||
// }
|
||||
// return seqMsgs, nil
|
||||
// }
|
||||
|
||||
func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, err error) {
|
||||
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs)
|
||||
for _, msg := range msgs {
|
||||
@@ -636,6 +602,40 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont
|
||||
return db.cache.SetMinSeq(ctx, conversationID, minSeq)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) {
|
||||
var index int64
|
||||
for {
|
||||
// from oldest 2 newest
|
||||
msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1)
|
||||
if err != nil || msgDocModel.DocID == "" {
|
||||
if err != nil {
|
||||
if err == unrelation.ErrMsgListNotExist {
|
||||
log.ZDebug(ctx, "deleteMsgRecursion finished", "conversationID", conversationID, "userID", userID, "index", index)
|
||||
} else {
|
||||
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
|
||||
}
|
||||
}
|
||||
// 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归
|
||||
break
|
||||
}
|
||||
//&& msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli()
|
||||
if len(msgDocModel.Msg) > 0 {
|
||||
for _, msg := range msgDocModel.Msg {
|
||||
if msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() {
|
||||
if msg.Msg.SendTime > lastMsgDestructTime.UnixMilli() && !utils.Contain(userID, msg.DelList...) {
|
||||
seqs = append(seqs, msg.Msg.Seq)
|
||||
}
|
||||
} else {
|
||||
log.ZDebug(ctx, "deleteMsgRecursion finished", "conversationID", conversationID, "userID", userID, "index", index)
|
||||
break
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
return seqs, db.DeleteUserMsgsBySeqs(ctx, userID, conversationID, seqs)
|
||||
}
|
||||
|
||||
// this is struct for recursion
|
||||
type delMsgRecursionStruct struct {
|
||||
minSeq int64
|
||||
|
||||
@@ -85,12 +85,13 @@ func (c *ConversationGorm) GetUserAllHasReadSeqs(ctx context.Context, ownerUserI
|
||||
var conversations []*relation.ConversationModel
|
||||
err = utils.Wrap(c.db(ctx).Where("owner_user_id = ?", ownerUserID).Select("conversation_id", "has_read_seq").Find(&conversations).Error, "")
|
||||
hasReadSeqs = make(map[string]int64, len(conversations))
|
||||
// for _, conversation := range conversations {
|
||||
// hasReadSeqs[conversation.ConversationID] = conversation.HasReadSeq
|
||||
// }
|
||||
return hasReadSeqs, err
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) (conversations []*relation.ConversationModel, err error) {
|
||||
return conversations, utils.Wrap(c.db(ctx).Where("conversation_id IN (?)", conversationIDs).Find(&conversations).Error, "")
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) GetConversationIDsNeedDestruct(ctx context.Context) (conversations []*relation.ConversationModel, err error) {
|
||||
return conversations, utils.Wrap(c.db(ctx).Where("UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) && msg_destruct_time != 0").Error, "")
|
||||
}
|
||||
|
||||
@@ -10,21 +10,23 @@ const (
|
||||
)
|
||||
|
||||
type ConversationModel struct {
|
||||
OwnerUserID string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"`
|
||||
ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"`
|
||||
ConversationType int32 `gorm:"column:conversation_type" json:"conversationType"`
|
||||
UserID string `gorm:"column:user_id;type:char(64)" json:"userID"`
|
||||
GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"`
|
||||
RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"`
|
||||
IsPinned bool `gorm:"column:is_pinned" json:"isPinned"`
|
||||
IsPrivateChat bool `gorm:"column:is_private_chat" json:"isPrivateChat"`
|
||||
BurnDuration int32 `gorm:"column:burn_duration;default:30" json:"burnDuration"`
|
||||
GroupAtType int32 `gorm:"column:group_at_type" json:"groupAtType"`
|
||||
AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"`
|
||||
Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"`
|
||||
MaxSeq int64 `gorm:"column:max_seq" json:"maxSeq"`
|
||||
MinSeq int64 `gorm:"column:min_seq" json:"minSeq"`
|
||||
CreateTime time.Time `gorm:"column:create_time;index:create_time;autoCreateTime"`
|
||||
OwnerUserID string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"`
|
||||
ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"`
|
||||
ConversationType int32 `gorm:"column:conversation_type" json:"conversationType"`
|
||||
UserID string `gorm:"column:user_id;type:char(64)" json:"userID"`
|
||||
GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"`
|
||||
RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"`
|
||||
IsPinned bool `gorm:"column:is_pinned" json:"isPinned"`
|
||||
IsPrivateChat bool `gorm:"column:is_private_chat" json:"isPrivateChat"`
|
||||
BurnDuration int32 `gorm:"column:burn_duration;default:30" json:"burnDuration"`
|
||||
GroupAtType int32 `gorm:"column:group_at_type" json:"groupAtType"`
|
||||
AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"`
|
||||
Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"`
|
||||
MaxSeq int64 `gorm:"column:max_seq" json:"maxSeq"`
|
||||
MinSeq int64 `gorm:"column:min_seq" json:"minSeq"`
|
||||
CreateTime time.Time `gorm:"column:create_time;index:create_time;autoCreateTime"`
|
||||
MsgDestructTime int64 `gorm:"column:msg_destruct_time;default:0"`
|
||||
LatestMsgDestructTime time.Time `gorm:"column:latest_msg_destruct_time;autoCreateTime"`
|
||||
}
|
||||
|
||||
func (ConversationModel) TableName() string {
|
||||
@@ -48,5 +50,6 @@ type ConversationModelInterface interface {
|
||||
GetAllConversationIDs(ctx context.Context) ([]string, error)
|
||||
GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error)
|
||||
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*ConversationModel, error)
|
||||
GetConversationIDsNeedDestruct(ctx context.Context) ([]*ConversationModel, error)
|
||||
NewTx(tx any) ConversationModelInterface
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user