2022-08-09 16:38:33 +08:00
package cronTask
2022-08-08 11:30:10 +08:00
import (
"Open_IM/pkg/common/config"
2022-08-10 19:31:57 +08:00
"Open_IM/pkg/common/constant"
2023-02-15 15:52:32 +08:00
"Open_IM/pkg/common/db/controller"
2022-08-08 11:30:10 +08:00
"Open_IM/pkg/common/log"
2023-02-15 15:52:32 +08:00
"Open_IM/pkg/common/tracelog"
2022-08-08 11:30:10 +08:00
"Open_IM/pkg/utils"
2023-02-15 15:52:32 +08:00
"context"
2022-08-10 19:31:57 +08:00
"math"
2022-08-08 11:30:10 +08:00
)
2022-08-09 18:48:11 +08:00
2023-02-16 15:20:59 +08:00
type ClearMsgTool struct {
msgInterface controller . MsgInterface
userInterface controller . UserInterface
groupInterface controller . GroupInterface
2022-08-08 11:30:10 +08:00
}
2023-02-16 15:20:59 +08:00
func ( c * ClearMsgTool ) getCronTaskOperationID ( ) string {
2023-02-15 15:52:32 +08:00
return cronTaskOperationID + utils . OperationIDGenerator ( )
2022-08-10 15:14:51 +08:00
}
2023-02-16 15:20:59 +08:00
func ( c * ClearMsgTool ) ClearAll ( ) {
2023-02-15 15:52:32 +08:00
operationID := c . getCronTaskOperationID ( )
ctx := context . Background ( )
tracelog . SetOperationID ( ctx , operationID )
2023-02-16 15:20:59 +08:00
log . NewInfo ( operationID , "============================ start del cron task ============================" )
2023-02-15 15:52:32 +08:00
var err error
2023-02-16 15:20:59 +08:00
userIDList , err := c . userInterface . GetAllUserID ( ctx )
2023-02-15 15:52:32 +08:00
if err == nil {
2023-02-16 15:20:59 +08:00
c . ClearUsersMsg ( ctx , userIDList )
2023-02-15 15:52:32 +08:00
} else {
log . NewError ( operationID , utils . GetSelfFuncName ( ) , err . Error ( ) )
}
// working group msg clear
2023-02-16 15:20:59 +08:00
workingGroupIDList , err := c . groupInterface . GetGroupIDsByGroupType ( ctx , constant . WorkingGroup )
2023-02-15 15:52:32 +08:00
if err == nil {
2023-02-16 15:20:59 +08:00
c . ClearSuperGroupMsg ( ctx , workingGroupIDList )
2023-02-15 15:52:32 +08:00
} else {
log . NewError ( operationID , utils . GetSelfFuncName ( ) , err . Error ( ) )
}
2023-02-16 15:20:59 +08:00
log . NewInfo ( operationID , "============================ start del cron finished ============================" )
2022-11-04 16:18:45 +08:00
}
2023-02-16 15:20:59 +08:00
func ( c * ClearMsgTool ) ClearUsersMsg ( ctx context . Context , userIDList [ ] string ) {
2023-02-15 15:52:32 +08:00
for _ , userID := range userIDList {
2023-02-16 15:20:59 +08:00
if err := c . msgInterface . DeleteUserMsgsAndSetMinSeq ( ctx , userID , int64 ( config . Config . Mongo . DBRetainChatRecords * 24 * 60 * 60 ) ) ; err != nil {
log . NewError ( tracelog . GetOperationID ( ctx ) , utils . GetSelfFuncName ( ) , err . Error ( ) , userID )
2022-08-10 15:14:51 +08:00
}
2023-02-16 15:20:59 +08:00
minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache , err := c . msgInterface . GetUserMinMaxSeqInMongoAndCache ( ctx , userID )
if err != nil {
log . NewError ( tracelog . GetOperationID ( ctx ) , utils . GetSelfFuncName ( ) , err . Error ( ) , "GetUserMinMaxSeqInMongoAndCache failed" , userID )
continue
2022-11-03 18:31:29 +08:00
}
2023-02-16 15:20:59 +08:00
if
2022-11-01 19:39:56 +08:00
}
2023-02-15 15:52:32 +08:00
}
2023-02-16 15:20:59 +08:00
func ( c * ClearMsgTool ) ClearSuperGroupMsg ( ctx context . Context , workingGroupIDList [ ] string ) {
2023-02-15 15:52:32 +08:00
for _ , groupID := range workingGroupIDList {
2023-02-16 15:20:59 +08:00
userIDs , err := c . groupInterface . FindGroupMemberUserID ( ctx , groupID )
2022-12-26 10:25:42 +08:00
if err != nil {
2023-02-16 15:20:59 +08:00
log . NewError ( tracelog . GetOperationID ( ctx ) , utils . GetSelfFuncName ( ) , "FindGroupMemberUserID" , err . Error ( ) , groupID )
2023-02-15 15:52:32 +08:00
continue
2022-12-26 10:25:42 +08:00
}
2023-02-16 15:20:59 +08:00
if err := c . msgInterface . DeleteUserSuperGroupMsgsAndSetMinSeq ( ctx , groupID , userIDs , int64 ( config . Config . Mongo . DBRetainChatRecords * 24 * 60 * 60 ) ) ; err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList)
2022-11-03 19:50:06 +08:00
}
2023-02-16 15:20:59 +08:00
minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache , err := c . msgInterface . GetSuperGroupMinMaxSeqInMongoAndCache ( ctx , groupID )
2022-11-03 19:50:06 +08:00
}
}
2023-02-16 15:20:59 +08:00
func ( c * ClearMsgTool ) checkMaxSeqWithMongo ( ctx context . Context , sourceID string , diffusionType int ) error {
2022-11-03 19:19:43 +08:00
var seqRedis uint64
2022-08-10 19:31:57 +08:00
var err error
if diffusionType == constant . WriteDiffusion {
2023-02-15 15:52:32 +08:00
seqRedis , err = db . DB . GetUserMaxSeq ( sourceID )
2022-08-10 19:31:57 +08:00
} else {
2023-02-15 15:52:32 +08:00
seqRedis , err = db . DB . GetGroupMaxSeq ( sourceID )
2022-08-10 19:31:57 +08:00
}
if err != nil {
2022-08-11 12:09:44 +08:00
if err == goRedis . Nil {
return nil
}
2022-08-10 19:31:57 +08:00
return utils . Wrap ( err , "GetUserMaxSeq failed" )
}
2023-02-15 15:52:32 +08:00
msg , err := db . DB . GetNewestMsg ( sourceID )
2022-08-10 19:31:57 +08:00
if err != nil {
return utils . Wrap ( err , "GetNewestMsg failed" )
}
2022-11-03 19:26:41 +08:00
if msg == nil {
return nil
}
2022-11-29 18:08:47 +08:00
if math . Abs ( float64 ( msg . Seq - uint32 ( seqRedis ) ) ) > 10 {
2023-02-15 15:52:32 +08:00
log . NewWarn ( operationID , utils . GetSelfFuncName ( ) , "seqMongo, seqRedis" , msg . Seq , seqRedis , sourceID , "redis maxSeq is different with msg.Seq > 10" , "status: " , msg . Status , msg . SendTime )
2022-11-02 11:17:20 +08:00
} else {
2023-02-15 15:52:32 +08:00
log . NewInfo ( operationID , utils . GetSelfFuncName ( ) , "seqMongo, seqRedis" , msg . Seq , seqRedis , sourceID , "seq and msg OK" , "status:" , msg . Status , msg . SendTime )
2022-08-10 19:31:57 +08:00
}
return nil
}