package msgtransfer
import (
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/controller"
kfk "OpenIM/pkg/common/kafka"
"OpenIM/pkg/common/log"
"OpenIM/pkg/common/tracelog"
pbMsg "OpenIM/pkg/proto/msg"
"OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils"
"context"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
)
type OnlineHistoryMongoConsumerHandler struct {
historyConsumerGroup * kfk . MConsumerGroup
2023-02-23 17:28:57 +08:00
msgDatabase controller . MsgDatabase
2022-06-16 20:35:27 +08:00
}
func NewOnlineHistoryMongoConsumerHandler ( database controller . MsgDatabase ) * OnlineHistoryMongoConsumerHandler {
mc := & OnlineHistoryMongoConsumerHandler {
historyConsumerGroup : kfk . NewMConsumerGroup ( & kfk . MConsumerGroupConfig { KafkaVersion : sarama . V2_0_0_0 ,
OffsetsInitial : sarama . OffsetNewest , IsReturnErr : false } , [ ] string { config . Config . Kafka . MsgToMongo . Topic } ,
2023-03-08 16:35:18 +08:00
config . Config . Kafka . MsgToMongo . Addr , config . Config . Kafka . ConsumerGroupID . MsgToMongo ) ,
2023-03-03 17:42:26 +08:00
msgDatabase : database ,
}
return mc
2022-06-16 20:35:27 +08:00
}
func ( mc * OnlineHistoryMongoConsumerHandler ) handleChatWs2Mongo ( ctx context . Context , cMsg * sarama . ConsumerMessage , msgKey string , session sarama . ConsumerGroupSession ) {
2022-06-16 20:35:27 +08:00
msg := cMsg . Value
msgFromMQ := pbMsg . MsgDataToMongoByMQ { }
2023-03-03 17:42:26 +08:00
operationID := tracelog . GetOperationID ( ctx )
2022-06-16 20:35:27 +08:00
err := proto . Unmarshal ( msg , & msgFromMQ )
if err != nil {
log . Error ( "msg_transfer Unmarshal msg err" , "" , "msg" , string ( msg ) , "err" , err . Error ( ) )
return
}
2023-03-03 17:42:26 +08:00
log . Info ( operationID , "BatchInsertChat2DB userID: " , msgFromMQ . AggregationID , "msgFromMQ.LastSeq: " , msgFromMQ . LastSeq )
2023-02-15 15:52:32 +08:00
//err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq)
2023-03-03 17:42:26 +08:00
err = mc . msgDatabase . BatchInsertChat2DB ( ctx , msgFromMQ . AggregationID , msgFromMQ . Messages , msgFromMQ . LastSeq )
2022-06-16 20:35:27 +08:00
if err != nil {
2023-03-03 17:42:26 +08:00
log . NewError ( operationID , "single data insert to mongo err" , err . Error ( ) , msgFromMQ . Messages , msgFromMQ . AggregationID , msgFromMQ . TriggerID )
2023-02-15 15:52:32 +08:00
}
//err = db.DB.DeleteMessageFromCache(msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.GetTriggerID())
2023-03-03 17:42:26 +08:00
err = mc . msgDatabase . DeleteMessageFromCache ( ctx , msgFromMQ . AggregationID , msgFromMQ . Messages )
2023-02-15 15:52:32 +08:00
if err != nil {
2023-03-03 17:42:26 +08:00
log . NewError ( operationID , "remove cache msg from redis err" , err . Error ( ) , msgFromMQ . Messages , msgFromMQ . AggregationID , msgFromMQ . TriggerID )
2022-06-16 20:35:27 +08:00
}
2023-03-03 17:42:26 +08:00
for _ , v := range msgFromMQ . Messages {
2022-06-16 20:35:27 +08:00
if v . MsgData . ContentType == constant . DeleteMessageNotification {
2023-02-09 20:36:34 +08:00
tips := sdkws . TipsComm { }
DeleteMessageTips := sdkws . DeleteMessageTips { }
2022-06-16 20:35:27 +08:00
err := proto . Unmarshal ( v . MsgData . Content , & tips )
if err != nil {
2023-03-03 17:42:26 +08:00
log . NewError ( operationID , "tips unmarshal err:" , err . Error ( ) , v . String ( ) )
2022-06-16 20:35:27 +08:00
continue
}
err = proto . Unmarshal ( tips . Detail , & DeleteMessageTips )
if err != nil {
2023-03-03 17:42:26 +08:00
log . NewError ( operationID , "deleteMessageTips unmarshal err:" , err . Error ( ) , v . String ( ) )
2022-06-16 20:35:27 +08:00
continue
}
2023-02-23 17:28:57 +08:00
if totalUnExistSeqs , err := mc . msgDatabase . DelMsgBySeqs ( ctx , DeleteMessageTips . UserID , DeleteMessageTips . Seqs ) ; err != nil {
2023-03-03 17:42:26 +08:00
log . NewError ( operationID , utils . GetSelfFuncName ( ) , "DelMsgBySeqs args: " , DeleteMessageTips . UserID , DeleteMessageTips . Seqs , "error:" , err . Error ( ) , "totalUnExistSeqs: " , totalUnExistSeqs )
2022-06-16 20:35:27 +08:00
}
}
}
}
// Setup implements sarama.ConsumerGroupHandler; no per-session
// initialization is needed. Pointer receiver for consistency with
// ConsumeClaim (only *OnlineHistoryMongoConsumerHandler satisfies the
// interface either way).
func (mc *OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
// Cleanup implements sarama.ConsumerGroupHandler; no per-session teardown is
// needed. Pointer receiver for consistency with ConsumeClaim.
func (mc *OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func ( mc * OnlineHistoryMongoConsumerHandler ) ConsumeClaim ( sess sarama . ConsumerGroupSession , claim sarama . ConsumerGroupClaim ) error { // a instance in the consumer group
2022-06-16 20:35:27 +08:00
log . NewDebug ( "" , "online new session msg come" , claim . HighWaterMarkOffset ( ) , claim . Topic ( ) , claim . Partition ( ) )
for msg := range claim . Messages ( ) {
log . NewDebug ( "" , "kafka get info to mongo" , "msgTopic" , msg . Topic , "msgPartition" , msg . Partition , "msg" , string ( msg . Value ) , "key" , string ( msg . Key ) )
if len ( msg . Value ) != 0 {
2023-03-09 16:36:47 +08:00
ctx := mc . historyConsumerGroup . GetContextFromMsg ( msg , "mongoDB consumer" )
2023-03-03 17:42:26 +08:00
mc . handleChatWs2Mongo ( ctx , msg , string ( msg . Key ) , sess )
2022-06-16 20:35:27 +08:00
} else {
log . Error ( "" , "mongo msg get from kafka but is nil" , msg . Key )
}
sess . MarkMessage ( msg , "" )
}
return nil
}