2022-08-09 16:38:33 +08:00
package cronTask
2022-08-08 11:30:10 +08:00
import (
"Open_IM/pkg/common/config"
2022-08-10 19:31:57 +08:00
"Open_IM/pkg/common/constant"
2022-08-08 11:30:10 +08:00
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
server_api_params "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
2022-08-10 19:31:57 +08:00
"math"
2022-11-03 19:50:06 +08:00
"strconv"
"strings"
2022-12-21 16:46:16 +08:00
goRedis "github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
2022-08-08 11:30:10 +08:00
)
const oldestList = 0
const newestList = - 1
2022-08-10 12:02:50 +08:00
func ResetUserGroupMinSeq ( operationID , groupID string , userIDList [ ] string ) error {
2022-11-04 16:18:45 +08:00
var delStruct delMsgRecursionStruct
minSeq , err := deleteMongoMsg ( operationID , groupID , oldestList , & delStruct )
2022-08-10 12:02:50 +08:00
if err != nil {
log . NewError ( operationID , utils . GetSelfFuncName ( ) , groupID , "deleteMongoMsg failed" )
}
2022-08-11 12:09:44 +08:00
if minSeq == 0 {
return nil
}
2022-11-04 16:18:45 +08:00
log . NewDebug ( operationID , utils . GetSelfFuncName ( ) , "delMsgIDList:" , delStruct , "minSeq" , minSeq )
2022-08-10 12:02:50 +08:00
for _ , userID := range userIDList {
userMinSeq , err := db . DB . GetGroupUserMinSeq ( groupID , userID )
2022-08-11 12:09:44 +08:00
if err != nil && err != goRedis . Nil {
2022-08-10 12:02:50 +08:00
log . NewError ( operationID , utils . GetSelfFuncName ( ) , "GetGroupUserMinSeq failed" , groupID , userID , err . Error ( ) )
continue
}
if userMinSeq > uint64 ( minSeq ) {
err = db . DB . SetGroupUserMinSeq ( groupID , userID , userMinSeq )
} else {
err = db . DB . SetGroupUserMinSeq ( groupID , userID , uint64 ( minSeq ) )
}
if err != nil {
log . NewError ( operationID , utils . GetSelfFuncName ( ) , err . Error ( ) , groupID , userID , userMinSeq , minSeq )
}
}
2022-08-09 18:48:11 +08:00
return nil
}
func DeleteMongoMsgAndResetRedisSeq ( operationID , userID string ) error {
2022-11-04 16:18:45 +08:00
var delStruct delMsgRecursionStruct
minSeq , err := deleteMongoMsg ( operationID , userID , oldestList , & delStruct )
2022-08-08 11:30:10 +08:00
if err != nil {
return utils . Wrap ( err , "" )
}
2022-08-11 12:09:44 +08:00
if minSeq == 0 {
return nil
}
2022-12-02 16:40:11 +08:00
log . NewDebug ( operationID , utils . GetSelfFuncName ( ) , "delMsgIDStruct: " , delStruct , "minSeq" , minSeq )
2022-08-09 18:48:11 +08:00
err = db . DB . SetUserMinSeq ( userID , minSeq )
2022-11-03 18:31:29 +08:00
return utils . Wrap ( err , "" )
2022-08-08 11:30:10 +08:00
}
2022-11-03 18:31:29 +08:00
// del list
2022-11-04 16:18:45 +08:00
func delMongoMsgsPhysical ( uidList [ ] string ) error {
if len ( uidList ) > 0 {
err := db . DB . DelMongoMsgs ( uidList )
2022-08-10 15:14:51 +08:00
if err != nil {
return utils . Wrap ( err , "DelMongoMsgs failed" )
}
}
return nil
}
2022-11-04 16:18:45 +08:00
type delMsgRecursionStruct struct {
minSeq uint32
delUidList [ ] string
}
func ( d * delMsgRecursionStruct ) getSetMinSeq ( ) uint32 {
return d . minSeq
}
2022-11-03 18:31:29 +08:00
// index 0....19(del) 20...69
// seq 70
// set minSeq 21
2022-12-01 19:34:54 +08:00
// recursion 删除list并且返回设置的最小seq
2022-11-04 16:18:45 +08:00
func deleteMongoMsg ( operationID string , ID string , index int64 , delStruct * delMsgRecursionStruct ) ( uint32 , error ) {
2022-11-03 18:31:29 +08:00
// find from oldest list
2022-08-08 11:30:10 +08:00
msgs , err := db . DB . GetUserMsgListByIndex ( ID , index )
2022-08-10 15:14:51 +08:00
if err != nil || msgs . UID == "" {
if err != nil {
2022-11-02 11:17:20 +08:00
if err == db . ErrMsgListNotExist {
2022-11-04 16:18:45 +08:00
log . NewInfo ( operationID , utils . GetSelfFuncName ( ) , "ID:" , ID , "index:" , index , err . Error ( ) )
2022-11-02 11:17:20 +08:00
} else {
log . NewError ( operationID , utils . GetSelfFuncName ( ) , "GetUserMsgListByIndex failed" , err . Error ( ) , index , ID )
}
2022-08-10 15:14:51 +08:00
}
2022-11-03 18:31:29 +08:00
// 获取报错,或者获取不到了,物理删除并且返回seq
2022-11-04 16:18:45 +08:00
err = delMongoMsgsPhysical ( delStruct . delUidList )
2022-11-03 18:31:29 +08:00
if err != nil {
return 0 , err
}
2022-12-26 10:25:42 +08:00
return delStruct . getSetMinSeq ( ) + 1 , nil
2022-11-01 19:39:56 +08:00
}
2022-11-02 11:17:20 +08:00
log . NewDebug ( operationID , "ID:" , ID , "index:" , index , "uid:" , msgs . UID , "len:" , len ( msgs . Msg ) )
2022-08-10 12:02:50 +08:00
if len ( msgs . Msg ) > db . GetSingleGocMsgNum ( ) {
log . NewWarn ( operationID , utils . GetSelfFuncName ( ) , "msgs too large" , len ( msgs . Msg ) , msgs . UID )
}
2022-12-26 10:25:42 +08:00
if msgs . Msg [ len ( msgs . Msg ) - 1 ] . SendTime + ( int64 ( config . Config . Mongo . DBRetainChatRecords ) * 24 * 60 * 60 * 1000 ) > utils . GetCurrentTimestampByMill ( ) && msgListIsFull ( msgs ) {
2022-11-04 16:18:45 +08:00
delStruct . delUidList = append ( delStruct . delUidList , msgs . UID )
2022-12-26 10:25:42 +08:00
lastMsgPb := & server_api_params . MsgData { }
err = proto . Unmarshal ( msgs . Msg [ len ( msgs . Msg ) - 1 ] . Msg , lastMsgPb )
if err != nil {
log . NewError ( operationID , utils . GetSelfFuncName ( ) , err . Error ( ) , len ( msgs . Msg ) - 1 , msgs . UID )
return 0 , utils . Wrap ( err , "proto.Unmarshal failed" )
}
delStruct . minSeq = lastMsgPb . Seq
2022-12-01 19:34:54 +08:00
} else {
2022-12-26 10:25:42 +08:00
var hasMarkDelFlag bool
2022-12-28 11:02:14 +08:00
for index , msg := range msgs . Msg {
2022-12-26 10:25:42 +08:00
msgPb := & server_api_params . MsgData { }
err = proto . Unmarshal ( msg . Msg , msgPb )
2022-12-01 19:34:54 +08:00
if err != nil {
2022-12-26 10:25:42 +08:00
log . NewError ( operationID , utils . GetSelfFuncName ( ) , err . Error ( ) , len ( msgs . Msg ) - 1 , msgs . UID )
return 0 , utils . Wrap ( err , "proto.Unmarshal failed" )
}
if utils . GetCurrentTimestampByMill ( ) > msg . SendTime + ( int64 ( config . Config . Mongo . DBRetainChatRecords ) * 24 * 60 * 60 * 1000 ) {
msgPb . Status = constant . MsgDeleted
bytes , _ := proto . Marshal ( msgPb )
msg . Msg = bytes
msg . SendTime = 0
hasMarkDelFlag = true
} else {
if err := delMongoMsgsPhysical ( delStruct . delUidList ) ; err != nil {
return 0 , err
2022-12-01 19:34:54 +08:00
}
2022-12-26 10:25:42 +08:00
if hasMarkDelFlag {
2022-12-28 11:33:49 +08:00
log . NewInfo ( operationID , ID , "hasMarkDelFlag" , "index:" , index , "msgPb:" , msgPb , msgs . UID )
2022-12-26 10:25:42 +08:00
if err := db . DB . UpdateOneMsgList ( msgs ) ; err != nil {
return delStruct . getSetMinSeq ( ) , utils . Wrap ( err , "" )
}
}
2022-12-28 11:15:15 +08:00
return msgPb . Seq , nil
2022-12-01 19:34:54 +08:00
}
}
2022-08-10 12:02:50 +08:00
}
2022-12-01 19:34:54 +08:00
log . NewDebug ( operationID , ID , "continue to" , delStruct )
2022-11-03 18:31:29 +08:00
// 继续递归 index+1
2022-11-04 16:18:45 +08:00
seq , err := deleteMongoMsg ( operationID , ID , index + 1 , delStruct )
2022-12-01 19:34:54 +08:00
return seq , utils . Wrap ( err , "deleteMongoMsg failed" )
2022-08-08 11:30:10 +08:00
}
2022-11-03 19:50:06 +08:00
func msgListIsFull ( chat * db . UserChat ) bool {
index , _ := strconv . Atoi ( strings . Split ( chat . UID , ":" ) [ 1 ] )
if index == 0 {
if len ( chat . Msg ) >= 4999 {
return true
}
}
if len ( chat . Msg ) >= 5000 {
return true
}
return false
}
2022-08-10 19:31:57 +08:00
func checkMaxSeqWithMongo ( operationID , ID string , diffusionType int ) error {
2022-11-03 19:19:43 +08:00
var seqRedis uint64
2022-08-10 19:31:57 +08:00
var err error
if diffusionType == constant . WriteDiffusion {
2022-11-03 19:19:43 +08:00
seqRedis , err = db . DB . GetUserMaxSeq ( ID )
2022-08-10 19:31:57 +08:00
} else {
2022-11-03 19:19:43 +08:00
seqRedis , err = db . DB . GetGroupMaxSeq ( ID )
2022-08-10 19:31:57 +08:00
}
if err != nil {
2022-08-11 12:09:44 +08:00
if err == goRedis . Nil {
return nil
}
2022-08-10 19:31:57 +08:00
return utils . Wrap ( err , "GetUserMaxSeq failed" )
}
msg , err := db . DB . GetNewestMsg ( ID )
if err != nil {
return utils . Wrap ( err , "GetNewestMsg failed" )
}
2022-11-03 19:26:41 +08:00
if msg == nil {
return nil
}
2022-11-29 18:08:47 +08:00
if math . Abs ( float64 ( msg . Seq - uint32 ( seqRedis ) ) ) > 10 {
log . NewWarn ( operationID , utils . GetSelfFuncName ( ) , "seqMongo, seqRedis" , msg . Seq , seqRedis , ID , "redis maxSeq is different with msg.Seq > 10" , "status: " , msg . Status , msg . SendTime )
2022-11-02 11:17:20 +08:00
} else {
2022-11-29 18:08:47 +08:00
log . NewInfo ( operationID , utils . GetSelfFuncName ( ) , "seqMongo, seqRedis" , msg . Seq , seqRedis , ID , "seq and msg OK" , "status:" , msg . Status , msg . SendTime )
2022-08-10 19:31:57 +08:00
}
return nil
}