2023-02-09 20:36:34 +08:00
package msgtransfer
2021-05-26 19:24:25 +08:00
import (
2023-03-16 10:46:06 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
2023-03-21 12:28:21 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
2023-03-16 10:46:06 +08:00
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
2021-05-26 19:24:25 +08:00
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
2022-05-25 20:29:32 +08:00
"sync"
2022-04-28 16:47:46 +08:00
"time"
2021-05-26 19:24:25 +08:00
)
2023-02-15 15:52:32 +08:00
const ConsumerMsgs = 3
const AggregationMessages = 4
const MongoMessages = 5
const ChannelNum = 100
2022-05-19 12:25:46 +08:00
type MsgChannelValue struct {
2022-05-28 18:10:08 +08:00
aggregationID string //maybe userID or super groupID
triggerID string
msgList [ ] * pbMsg . MsgDataToMQ
lastSeq uint64
2022-05-20 14:42:49 +08:00
}
2023-02-15 15:52:32 +08:00
2022-05-20 14:42:49 +08:00
type TriggerChannelValue struct {
triggerID string
2023-02-15 15:52:32 +08:00
cMsgList [ ] * sarama . ConsumerMessage
2022-05-19 12:25:46 +08:00
}
2023-02-15 15:52:32 +08:00
2022-05-11 18:33:48 +08:00
type Cmd2Value struct {
2022-05-11 20:49:47 +08:00
Cmd int
2022-05-11 18:33:48 +08:00
Value interface { }
}
2023-02-15 15:52:32 +08:00
2022-06-16 20:35:27 +08:00
type OnlineHistoryRedisConsumerHandler struct {
2023-02-15 15:52:32 +08:00
historyConsumerGroup * kafka . MConsumerGroup
2022-05-20 13:33:38 +08:00
chArrays [ ChannelNum ] chan Cmd2Value
2022-05-25 18:46:53 +08:00
msgDistributionCh chan Cmd2Value
2023-02-15 15:52:32 +08:00
singleMsgSuccessCount uint64
singleMsgFailedCount uint64
singleMsgSuccessCountMutex sync . Mutex
singleMsgFailedCountMutex sync . Mutex
2023-03-13 15:39:47 +08:00
//producerToPush *kafka.Producer
//producerToModify *kafka.Producer
//producerToMongo *kafka.Producer
2023-02-15 15:52:32 +08:00
2023-02-23 17:28:57 +08:00
msgDatabase controller . MsgDatabase
2021-05-26 19:24:25 +08:00
}
2023-03-03 17:42:26 +08:00
func NewOnlineHistoryRedisConsumerHandler ( database controller . MsgDatabase ) * OnlineHistoryRedisConsumerHandler {
var och OnlineHistoryRedisConsumerHandler
och . msgDatabase = database
2022-05-25 18:46:53 +08:00
och . msgDistributionCh = make ( chan Cmd2Value ) //no buffer channel
go och . MessagesDistributionHandle ( )
2022-05-20 15:51:37 +08:00
for i := 0 ; i < ChannelNum ; i ++ {
2022-05-25 22:42:44 +08:00
och . chArrays [ i ] = make ( chan Cmd2Value , 50 )
2022-05-20 15:51:37 +08:00
go och . Run ( i )
}
2023-03-13 15:39:47 +08:00
//och.producerToPush = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
//och.producerToModify = kafka.NewKafkaProducer(config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.MsgToModify.Topic)
//och.producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic)
2023-02-15 15:52:32 +08:00
och . historyConsumerGroup = kafka . NewMConsumerGroup ( & kafka . MConsumerGroupConfig { KafkaVersion : sarama . V2_0_0_0 ,
2021-05-26 19:24:25 +08:00
OffsetsInitial : sarama . OffsetNewest , IsReturnErr : false } , [ ] string { config . Config . Kafka . Ws2mschat . Topic } ,
2022-06-16 20:35:27 +08:00
config . Config . Kafka . Ws2mschat . Addr , config . Config . Kafka . ConsumerGroupID . MsgToRedis )
2023-03-16 10:20:40 +08:00
//statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
2023-03-03 17:42:26 +08:00
return & och
2022-05-11 18:33:48 +08:00
}
2023-02-15 15:52:32 +08:00
2022-06-16 20:35:27 +08:00
func ( och * OnlineHistoryRedisConsumerHandler ) Run ( channelID int ) {
2022-05-19 12:25:46 +08:00
for {
select {
2022-05-20 13:33:38 +08:00
case cmd := <- och . chArrays [ channelID ] :
2022-05-19 12:25:46 +08:00
switch cmd . Cmd {
2022-05-28 18:10:08 +08:00
case AggregationMessages :
2022-05-19 12:25:46 +08:00
msgChannelValue := cmd . Value . ( MsgChannelValue )
2022-05-20 13:33:38 +08:00
msgList := msgChannelValue . msgList
2022-05-20 14:42:49 +08:00
triggerID := msgChannelValue . triggerID
2022-05-20 16:07:32 +08:00
storageMsgList := make ( [ ] * pbMsg . MsgDataToMQ , 0 , 80 )
2022-05-28 18:10:08 +08:00
notStoragePushMsgList := make ( [ ] * pbMsg . MsgDataToMQ , 0 , 80 )
log . Debug ( triggerID , "msg arrived channel" , "channel id" , channelID , msgList , msgChannelValue . aggregationID , len ( msgList ) )
2022-12-12 19:22:50 +08:00
var modifyMsgList [ ] * pbMsg . MsgDataToMQ
2023-03-21 12:28:21 +08:00
ctx := mcontext . NewCtx ( "redis consumer" )
mcontext . SetOperationID ( ctx , triggerID )
2022-05-20 13:33:38 +08:00
for _ , v := range msgList {
2022-05-20 14:42:49 +08:00
log . Debug ( triggerID , "msg come to storage center" , v . String ( ) )
2022-05-20 13:33:38 +08:00
isHistory := utils . GetSwitchFromOptions ( v . MsgData . Options , constant . IsHistory )
isSenderSync := utils . GetSwitchFromOptions ( v . MsgData . Options , constant . IsSenderSync )
if isHistory {
storageMsgList = append ( storageMsgList , v )
2022-05-21 15:04:42 +08:00
//log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID)
2022-05-21 15:03:26 +08:00
} else {
2022-05-28 18:10:08 +08:00
if ! ( ! isSenderSync && msgChannelValue . aggregationID == v . MsgData . SendID ) {
notStoragePushMsgList = append ( notStoragePushMsgList , v )
2022-05-21 15:03:26 +08:00
}
2022-05-20 13:33:38 +08:00
}
2022-12-12 19:22:50 +08:00
if v . MsgData . ContentType == constant . ReactionMessageModifier || v . MsgData . ContentType == constant . ReactionMessageDeleter {
modifyMsgList = append ( modifyMsgList , v )
}
}
if len ( modifyMsgList ) > 0 {
2023-03-13 15:39:47 +08:00
och . msgDatabase . MsgToModifyMQ ( ctx , msgChannelValue . aggregationID , triggerID , modifyMsgList )
2022-05-20 13:33:38 +08:00
}
2022-05-28 18:10:08 +08:00
log . Debug ( triggerID , "msg storage length" , len ( storageMsgList ) , "push length" , len ( notStoragePushMsgList ) )
2022-07-25 12:53:49 +08:00
if len ( storageMsgList ) > 0 {
2023-02-23 17:28:57 +08:00
lastSeq , err := och . msgDatabase . BatchInsertChat2Cache ( ctx , msgChannelValue . aggregationID , storageMsgList )
2022-07-25 12:53:49 +08:00
if err != nil {
2023-03-02 12:07:32 +08:00
log . NewError ( triggerID , "single data insert to redis err" , err . Error ( ) , storageMsgList )
2023-02-15 15:52:32 +08:00
och . singleMsgFailedCountMutex . Lock ( )
och . singleMsgFailedCount += uint64 ( len ( storageMsgList ) )
och . singleMsgFailedCountMutex . Unlock ( )
2022-07-25 12:53:49 +08:00
} else {
2023-02-15 15:52:32 +08:00
och . singleMsgSuccessCountMutex . Lock ( )
och . singleMsgSuccessCount += uint64 ( len ( storageMsgList ) )
och . singleMsgSuccessCountMutex . Unlock ( )
2023-03-13 15:39:47 +08:00
och . msgDatabase . MsgToMongoMQ ( ctx , msgChannelValue . aggregationID , triggerID , storageMsgList , lastSeq )
2022-07-26 15:52:38 +08:00
for _ , v := range storageMsgList {
2023-03-13 15:39:47 +08:00
och . msgDatabase . MsgToPushMQ ( ctx , msgChannelValue . aggregationID , v )
2022-07-26 15:52:38 +08:00
}
2023-03-13 15:39:47 +08:00
for _ , v := range notStoragePushMsgList {
och . msgDatabase . MsgToPushMQ ( ctx , msgChannelValue . aggregationID , v )
2022-07-26 15:52:38 +08:00
}
2022-07-25 12:53:49 +08:00
}
2022-05-19 12:25:46 +08:00
} else {
2023-02-15 15:52:32 +08:00
for _ , v := range notStoragePushMsgList {
2023-03-13 15:39:47 +08:00
och . msgDatabase . MsgToPushMQ ( ctx , msgChannelValue . aggregationID , v )
2022-07-26 15:52:38 +08:00
}
2022-05-19 12:25:46 +08:00
}
}
}
}
}
2022-07-26 15:52:38 +08:00
2022-06-16 20:35:27 +08:00
func ( och * OnlineHistoryRedisConsumerHandler ) MessagesDistributionHandle ( ) {
2022-05-25 18:46:53 +08:00
for {
2022-05-28 18:10:08 +08:00
aggregationMsgs := make ( map [ string ] [ ] * pbMsg . MsgDataToMQ , ChannelNum )
2022-05-25 18:46:53 +08:00
select {
case cmd := <- och . msgDistributionCh :
switch cmd . Cmd {
case ConsumerMsgs :
triggerChannelValue := cmd . Value . ( TriggerChannelValue )
triggerID := triggerChannelValue . triggerID
2023-02-15 15:52:32 +08:00
consumerMessages := triggerChannelValue . cMsgList
2022-05-25 18:46:53 +08:00
//Aggregation map[userid]message list
log . Debug ( triggerID , "batch messages come to distribution center" , len ( consumerMessages ) )
for i := 0 ; i < len ( consumerMessages ) ; i ++ {
msgFromMQ := pbMsg . MsgDataToMQ { }
err := proto . Unmarshal ( consumerMessages [ i ] . Value , & msgFromMQ )
if err != nil {
log . Error ( triggerID , "msg_transfer Unmarshal msg err" , "msg" , string ( consumerMessages [ i ] . Value ) , "err" , err . Error ( ) )
return
}
log . Debug ( triggerID , "single msg come to distribution center" , msgFromMQ . String ( ) , string ( consumerMessages [ i ] . Key ) )
2022-05-28 18:10:08 +08:00
if oldM , ok := aggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] ; ok {
2022-05-25 18:46:53 +08:00
oldM = append ( oldM , & msgFromMQ )
2022-05-28 18:10:08 +08:00
aggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] = oldM
2022-05-25 18:46:53 +08:00
} else {
m := make ( [ ] * pbMsg . MsgDataToMQ , 0 , 100 )
m = append ( m , & msgFromMQ )
2022-05-28 18:10:08 +08:00
aggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] = m
2022-05-25 18:46:53 +08:00
}
}
2022-05-28 18:10:08 +08:00
log . Debug ( triggerID , "generate map list users len" , len ( aggregationMsgs ) )
for aggregationID , v := range aggregationMsgs {
2022-05-25 18:46:53 +08:00
if len ( v ) >= 0 {
2023-02-15 15:52:32 +08:00
hashCode := utils . GetHashCode ( aggregationID )
2022-05-25 18:46:53 +08:00
channelID := hashCode % ChannelNum
2022-05-28 18:10:08 +08:00
log . Debug ( triggerID , "generate channelID" , hashCode , channelID , aggregationID )
och . chArrays [ channelID ] <- Cmd2Value { Cmd : AggregationMessages , Value : MsgChannelValue { aggregationID : aggregationID , msgList : v , triggerID : triggerID } }
2022-05-25 18:46:53 +08:00
}
}
}
}
2022-05-19 12:25:46 +08:00
}
2021-05-26 19:24:25 +08:00
}
2022-06-16 20:35:27 +08:00
func ( OnlineHistoryRedisConsumerHandler ) Setup ( _ sarama . ConsumerGroupSession ) error { return nil }
func ( OnlineHistoryRedisConsumerHandler ) Cleanup ( _ sarama . ConsumerGroupSession ) error { return nil }
2022-05-20 13:33:38 +08:00
2023-02-15 15:52:32 +08:00
func ( och * OnlineHistoryRedisConsumerHandler ) ConsumeClaim ( sess sarama . ConsumerGroupSession , claim sarama . ConsumerGroupClaim ) error { // a instance in the consumer group
2022-05-22 21:08:57 +08:00
for {
if sess == nil {
log . NewWarn ( "" , " sess == nil, waiting " )
time . Sleep ( 100 * time . Millisecond )
} else {
break
}
2022-05-21 10:19:02 +08:00
}
2022-05-25 20:29:32 +08:00
rwLock := new ( sync . RWMutex )
2022-05-11 19:31:32 +08:00
log . NewDebug ( "" , "online new session msg come" , claim . HighWaterMarkOffset ( ) , claim . Topic ( ) , claim . Partition ( ) )
2022-05-25 18:46:53 +08:00
cMsg := make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
t := time . NewTicker ( time . Duration ( 100 ) * time . Millisecond )
2022-05-20 14:42:49 +08:00
var triggerID string
2022-05-25 20:29:32 +08:00
go func ( ) {
2022-05-25 21:15:17 +08:00
for {
select {
case <- t . C :
if len ( cMsg ) > 0 {
rwLock . Lock ( )
ccMsg := make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
for _ , v := range cMsg {
ccMsg = append ( ccMsg , v )
}
cMsg = make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
rwLock . Unlock ( )
split := 1000
triggerID = utils . OperationIDGenerator ( )
2022-08-12 21:48:13 +08:00
log . Debug ( triggerID , "timer trigger msg consumer start" , len ( ccMsg ) )
2022-05-25 21:15:17 +08:00
for i := 0 ; i < len ( ccMsg ) / split ; i ++ {
//log.Debug()
och . msgDistributionCh <- Cmd2Value { Cmd : ConsumerMsgs , Value : TriggerChannelValue {
2023-02-15 15:52:32 +08:00
triggerID : triggerID , cMsgList : ccMsg [ i * split : ( i + 1 ) * split ] } }
2022-05-25 21:15:17 +08:00
}
if ( len ( ccMsg ) % split ) > 0 {
och . msgDistributionCh <- Cmd2Value { Cmd : ConsumerMsgs , Value : TriggerChannelValue {
2023-02-15 15:52:32 +08:00
triggerID : triggerID , cMsgList : ccMsg [ split * ( len ( ccMsg ) / split ) : ] } }
2022-05-25 21:15:17 +08:00
}
2022-08-12 21:48:13 +08:00
log . Debug ( triggerID , "timer trigger msg consumer end" , len ( cMsg ) )
2022-05-25 21:03:48 +08:00
}
2022-05-25 18:46:53 +08:00
}
}
2022-05-25 20:29:32 +08:00
} ( )
2022-05-25 21:23:10 +08:00
for msg := range claim . Messages ( ) {
rwLock . Lock ( )
2022-06-08 17:11:17 +08:00
if len ( msg . Value ) != 0 {
cMsg = append ( cMsg , msg )
}
2022-05-25 21:23:10 +08:00
rwLock . Unlock ( )
sess . MarkMessage ( msg , "" )
}
2021-05-26 19:24:25 +08:00
return nil
}