2023-03-03 17:42:26 +08:00
package tools
import (
"context"
2023-03-08 16:35:18 +08:00
"errors"
2023-03-03 17:42:26 +08:00
"fmt"
2023-03-27 11:03:50 +08:00
"math"
2023-03-16 10:46:06 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
2023-03-28 19:24:59 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
2023-03-16 10:46:06 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
2023-03-21 12:28:21 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
2023-03-16 10:46:06 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
2023-03-03 17:42:26 +08:00
"github.com/go-redis/redis/v8"
)
type MsgTool struct {
2023-05-08 12:39:45 +08:00
msgDatabase controller . CommonMsgDatabase
2023-03-03 17:42:26 +08:00
userDatabase controller . UserDatabase
groupDatabase controller . GroupDatabase
}
2023-03-08 16:35:18 +08:00
var errSeq = errors . New ( "cache max seq and mongo max seq is diff > 10" )
2023-05-08 12:39:45 +08:00
func NewMsgTool ( msgDatabase controller . CommonMsgDatabase , userDatabase controller . UserDatabase , groupDatabase controller . GroupDatabase ) * MsgTool {
2023-03-03 17:42:26 +08:00
return & MsgTool {
msgDatabase : msgDatabase ,
userDatabase : userDatabase ,
groupDatabase : groupDatabase ,
}
}
func InitMsgTool ( ) ( * MsgTool , error ) {
rdb , err := cache . NewRedis ( )
if err != nil {
return nil , err
}
mongo , err := unrelation . NewMongo ( )
if err != nil {
return nil , err
}
db , err := relation . NewGormDB ( )
if err != nil {
return nil , err
}
2023-03-28 19:24:59 +08:00
userDB := relation . NewUserGorm ( db )
2023-05-08 12:39:45 +08:00
msgDatabase := controller . InitCommonMsgDatabase ( rdb , mongo . GetDatabase ( ) )
2023-03-28 19:24:59 +08:00
userDatabase := controller . NewUserDatabase ( userDB , cache . NewUserCacheRedis ( rdb , relation . NewUserGorm ( db ) , cache . GetDefaultOpt ( ) ) , tx . NewGorm ( db ) )
2023-03-03 17:42:26 +08:00
groupDatabase := controller . InitGroupDatabase ( db , rdb , mongo . GetDatabase ( ) )
msgTool := NewMsgTool ( msgDatabase , userDatabase , groupDatabase )
return msgTool , nil
}
func ( c * MsgTool ) AllUserClearMsgAndFixSeq ( ) {
2023-03-21 12:28:21 +08:00
ctx := mcontext . NewCtx ( utils . GetSelfFuncName ( ) )
2023-03-27 11:03:50 +08:00
log . ZInfo ( ctx , "============================ start del cron task ============================" )
2023-03-03 17:42:26 +08:00
var err error
2023-03-27 11:03:50 +08:00
userIDs , err := c . userDatabase . GetAllUserID ( ctx )
2023-03-03 17:42:26 +08:00
if err == nil {
2023-03-27 11:03:50 +08:00
c . ClearUsersMsg ( ctx , userIDs )
2023-03-03 17:42:26 +08:00
} else {
2023-03-27 11:03:50 +08:00
log . ZError ( ctx , "ClearUsersMsg failed" , err )
2023-03-03 17:42:26 +08:00
}
// working group msg clear
2023-03-27 11:03:50 +08:00
superGroupIDs , err := c . groupDatabase . GetGroupIDsByGroupType ( ctx , constant . WorkingGroup )
2023-03-03 17:42:26 +08:00
if err == nil {
2023-03-27 11:03:50 +08:00
c . ClearSuperGroupMsg ( ctx , superGroupIDs )
2023-03-03 17:42:26 +08:00
} else {
2023-03-27 11:03:50 +08:00
log . ZError ( ctx , "ClearSuperGroupMsg failed" , err )
2023-03-03 17:42:26 +08:00
}
2023-03-27 11:03:50 +08:00
log . ZInfo ( ctx , "============================ start del cron finished ============================" )
2023-03-03 17:42:26 +08:00
}
func ( c * MsgTool ) ClearUsersMsg ( ctx context . Context , userIDs [ ] string ) {
for _ , userID := range userIDs {
2023-05-05 21:30:32 +08:00
if err := c . msgDatabase . DeleteConversationMsgsAndSetMinSeq ( ctx , userID , int64 ( config . Config . Mongo . DBRetainChatRecords * 24 * 60 * 60 ) ) ; err != nil {
2023-03-27 11:03:50 +08:00
log . ZError ( ctx , "DeleteUserMsgsAndSetMinSeq failed" , err , "userID" , userID , "DBRetainChatRecords" , config . Config . Mongo . DBRetainChatRecords )
2023-03-03 17:42:26 +08:00
}
maxSeqCache , maxSeqMongo , err := c . GetAndFixUserSeqs ( ctx , userID )
if err != nil {
continue
}
2023-05-05 21:30:32 +08:00
c . CheckMaxSeqWithMongo ( ctx , userID , maxSeqCache , maxSeqMongo )
2023-03-03 17:42:26 +08:00
}
}
func ( c * MsgTool ) ClearSuperGroupMsg ( ctx context . Context , superGroupIDs [ ] string ) {
for _ , groupID := range superGroupIDs {
userIDs , err := c . groupDatabase . FindGroupMemberUserID ( ctx , groupID )
if err != nil {
2023-03-27 11:03:50 +08:00
log . ZError ( ctx , "ClearSuperGroupMsg failed" , err , "groupID" , groupID )
2023-03-03 17:42:26 +08:00
continue
}
2023-05-06 10:42:11 +08:00
if err := c . msgDatabase . DeleteConversationMsgsAndSetMinSeq ( ctx , groupID , int64 ( config . Config . Mongo . DBRetainChatRecords * 24 * 60 * 60 ) ) ; err != nil {
2023-03-27 11:03:50 +08:00
log . ZError ( ctx , "DeleteUserSuperGroupMsgsAndSetMinSeq failed" , err , "groupID" , groupID , "userID" , userIDs , "DBRetainChatRecords" , config . Config . Mongo . DBRetainChatRecords )
2023-03-03 17:42:26 +08:00
}
if err := c . fixGroupSeq ( ctx , groupID , userIDs ) ; err != nil {
2023-03-27 11:03:50 +08:00
log . ZError ( ctx , "fixGroupSeq failed" , err , "groupID" , groupID , "userID" , userIDs )
2023-03-03 17:42:26 +08:00
}
}
}
func ( c * MsgTool ) FixGroupSeq ( ctx context . Context , groupID string ) error {
userIDs , err := c . groupDatabase . FindGroupMemberUserID ( ctx , groupID )
if err != nil {
return err
}
return c . fixGroupSeq ( ctx , groupID , userIDs )
}
func ( c * MsgTool ) fixGroupSeq ( ctx context . Context , groupID string , userIDs [ ] string ) error {
2023-05-06 10:42:11 +08:00
_ , maxSeqMongo , _ , maxSeqCache , err := c . msgDatabase . GetConversationMinMaxSeqInMongoAndCache ( ctx , groupID )
2023-03-03 17:42:26 +08:00
if err != nil {
2023-03-10 18:37:36 +08:00
if err == unrelation . ErrMsgNotFound {
2023-03-10 15:06:57 +08:00
return nil
}
2023-03-03 17:42:26 +08:00
return err
}
for _ , userID := range userIDs {
if _ , err := c . GetAndFixGroupUserSeq ( ctx , userID , groupID , maxSeqCache ) ; err != nil {
continue
}
}
2023-05-06 10:42:11 +08:00
if err := c . CheckMaxSeqWithMongo ( ctx , groupID , maxSeqCache , maxSeqMongo ) ; err != nil {
2023-03-27 11:03:50 +08:00
log . ZWarn ( ctx , "cache max seq and mongo max seq is diff > 10" , err , "groupID" , groupID , "maxSeqCache" , maxSeqCache , "maxSeqMongo" , maxSeqMongo , "constant.WriteDiffusion" , constant . WriteDiffusion )
2023-03-08 16:35:18 +08:00
}
2023-03-03 17:42:26 +08:00
return nil
}
func ( c * MsgTool ) GetAndFixUserSeqs ( ctx context . Context , userID string ) ( maxSeqCache , maxSeqMongo int64 , err error ) {
2023-05-06 10:42:11 +08:00
minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache , err := c . msgDatabase . GetConversationMinMaxSeqInMongoAndCache ( ctx , userID )
2023-03-03 17:42:26 +08:00
if err != nil {
2023-03-10 16:47:22 +08:00
if err != unrelation . ErrMsgNotFound {
2023-03-27 11:03:50 +08:00
log . ZError ( ctx , "GetUserMinMaxSeqInMongoAndCache failed" , err , "userID" , userID )
2023-03-10 16:47:22 +08:00
}
2023-03-03 17:42:26 +08:00
return 0 , 0 , err
}
2023-03-27 11:03:50 +08:00
log . ZDebug ( ctx , "userID" , userID , "minSeqMongo" , minSeqMongo , "maxSeqMongo" , maxSeqMongo , "minSeqCache" , minSeqCache , "maxSeqCache" , maxSeqCache )
2023-03-03 17:42:26 +08:00
if minSeqCache > maxSeqCache {
2023-05-06 10:42:11 +08:00
if err := c . msgDatabase . SetMinSeq ( ctx , userID , maxSeqCache ) ; err != nil {
2023-03-27 11:03:50 +08:00
log . ZError ( ctx , "SetUserMinSeq failed" , err , "userID" , userID , "minSeqCache" , minSeqCache , "maxSeqCache" , maxSeqCache )
2023-03-03 17:42:26 +08:00
} else {
2023-03-27 11:03:50 +08:00
log . ZInfo ( ctx , "SetUserMinSeq success" , "userID" , userID , "minSeqCache" , minSeqCache , "maxSeqCache" , maxSeqCache )
2023-03-03 17:42:26 +08:00
}
}
return maxSeqCache , maxSeqMongo , nil
}
func ( c * MsgTool ) GetAndFixGroupUserSeq ( ctx context . Context , userID string , groupID string , maxSeqCache int64 ) ( minSeqCache int64 , err error ) {
2023-05-06 10:42:11 +08:00
minSeqCache , err = c . msgDatabase . GetMinSeq ( ctx , groupID )
2023-03-03 17:42:26 +08:00
if err != nil {
2023-03-27 11:03:50 +08:00
log . ZError ( ctx , "GetGroupUserMinSeq failed" , err , "groupID" , groupID , "userID" , userID )
2023-03-03 17:42:26 +08:00
return 0 , err
}
if minSeqCache > maxSeqCache {
2023-05-06 10:42:11 +08:00
if err := c . msgDatabase . SetConversationUserMinSeq ( ctx , groupID , userID , maxSeqCache ) ; err != nil {
2023-03-27 11:03:50 +08:00
log . ZError ( ctx , "SetGroupUserMinSeq failed" , err , "groupID" , groupID , "userID" , userID , "minSeqCache" , minSeqCache , "maxSeqCache" , maxSeqCache )
2023-03-03 17:42:26 +08:00
} else {
2023-03-27 11:03:50 +08:00
log . ZInfo ( ctx , "SetGroupUserMinSeq success" , "groupID" , groupID , "userID" , userID , "minSeqCache" , minSeqCache , "maxSeqCache" , maxSeqCache )
2023-03-03 17:42:26 +08:00
}
}
return minSeqCache , nil
}
2023-05-05 21:30:32 +08:00
func ( c * MsgTool ) CheckMaxSeqWithMongo ( ctx context . Context , conversationID string , maxSeqCache , maxSeqMongo int64 ) error {
2023-03-03 17:42:26 +08:00
if math . Abs ( float64 ( maxSeqMongo - maxSeqCache ) ) > 10 {
2023-03-08 16:35:18 +08:00
return errSeq
2023-03-03 17:42:26 +08:00
}
2023-03-08 16:35:18 +08:00
return nil
2023-03-03 17:42:26 +08:00
}
func ( c * MsgTool ) ShowUserSeqs ( ctx context . Context , userID string ) {
}
func ( c * MsgTool ) ShowSuperGroupSeqs ( ctx context . Context , groupID string ) {
}
func ( c * MsgTool ) ShowSuperGroupUserSeqs ( ctx context . Context , groupID , userID string ) {
}
2023-03-07 17:59:34 +08:00
func ( c * MsgTool ) FixAllSeq ( ctx context . Context ) error {
2023-03-03 17:42:26 +08:00
userIDs , err := c . userDatabase . GetAllUserID ( ctx )
if err != nil {
2023-03-07 17:59:34 +08:00
return err
2023-03-03 17:42:26 +08:00
}
for _ , userID := range userIDs {
2023-05-06 10:42:11 +08:00
userCurrentMinSeq , err := c . msgDatabase . GetMinSeq ( ctx , userID )
2023-03-03 17:42:26 +08:00
if err != nil && err != redis . Nil {
continue
}
2023-05-06 10:42:11 +08:00
userCurrentMaxSeq , err := c . msgDatabase . GetMaxSeq ( ctx , userID )
2023-03-03 17:42:26 +08:00
if err != nil && err != redis . Nil {
continue
}
if userCurrentMinSeq > userCurrentMaxSeq {
2023-05-06 10:42:11 +08:00
if err = c . msgDatabase . SetMinSeq ( ctx , userID , userCurrentMaxSeq ) ; err != nil {
2023-03-03 17:42:26 +08:00
fmt . Println ( "SetUserMinSeq failed" , userID , userCurrentMaxSeq )
}
fmt . Println ( "fix" , userID , userCurrentMaxSeq )
}
}
fmt . Println ( "fix users seq success" )
groupIDs , err := c . groupDatabase . GetGroupIDsByGroupType ( ctx , constant . WorkingGroup )
if err != nil {
2023-03-07 17:59:34 +08:00
return err
2023-03-03 17:42:26 +08:00
}
for _ , groupID := range groupIDs {
2023-05-06 10:42:11 +08:00
maxSeq , err := c . msgDatabase . GetMaxSeq ( ctx , groupID )
2023-03-03 17:42:26 +08:00
if err != nil {
fmt . Println ( "GetGroupMaxSeq failed" , groupID )
continue
}
userIDs , err := c . groupDatabase . FindGroupMemberUserID ( ctx , groupID )
if err != nil {
fmt . Println ( "get groupID" , groupID , "failed, try again later" )
continue
}
for _ , userID := range userIDs {
2023-05-06 10:42:11 +08:00
userMinSeq , err := c . msgDatabase . GetMinSeq ( ctx , groupID )
2023-03-03 17:42:26 +08:00
if err != nil && err != redis . Nil {
fmt . Println ( "GetGroupUserMinSeq failed" , groupID , userID )
continue
}
if userMinSeq > maxSeq {
2023-05-06 10:42:11 +08:00
if err = c . msgDatabase . SetMinSeq ( ctx , groupID , maxSeq ) ; err != nil {
2023-03-03 17:42:26 +08:00
fmt . Println ( "SetGroupUserMinSeq failed" , err . Error ( ) , groupID , userID , maxSeq )
}
fmt . Println ( "fix" , groupID , userID , maxSeq , userMinSeq )
}
}
}
fmt . Println ( "fix all seq finished" )
2023-03-07 17:59:34 +08:00
return nil
2023-03-03 17:42:26 +08:00
}