2023-02-09 20:36:34 +08:00
package msgtransfer
2022-06-16 20:35:27 +08:00
import (
2023-02-15 15:52:32 +08:00
"context"
2023-05-23 17:11:56 +08:00
"encoding/json"
2023-05-23 17:20:34 +08:00
"errors"
2023-05-04 15:53:58 +08:00
2023-04-28 18:39:21 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
2023-04-26 18:57:41 +08:00
2023-03-16 10:46:06 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
2022-06-16 20:35:27 +08:00
"github.com/Shopify/sarama"
2023-04-28 18:33:33 +08:00
"google.golang.org/protobuf/proto"
2022-06-16 20:35:27 +08:00
)
type OnlineHistoryMongoConsumerHandler struct {
historyConsumerGroup * kfk . MConsumerGroup
2023-05-08 12:39:45 +08:00
msgDatabase controller . CommonMsgDatabase
2022-06-16 20:35:27 +08:00
}
2023-05-12 20:05:25 +08:00
func NewOnlineHistoryMongoConsumerHandler ( database controller . CommonMsgDatabase ) * OnlineHistoryMongoConsumerHandler {
2023-03-03 17:42:26 +08:00
mc := & OnlineHistoryMongoConsumerHandler {
historyConsumerGroup : kfk . NewMConsumerGroup ( & kfk . MConsumerGroupConfig { KafkaVersion : sarama . V2_0_0_0 ,
OffsetsInitial : sarama . OffsetNewest , IsReturnErr : false } , [ ] string { config . Config . Kafka . MsgToMongo . Topic } ,
2023-03-08 16:35:18 +08:00
config . Config . Kafka . MsgToMongo . Addr , config . Config . Kafka . ConsumerGroupID . MsgToMongo ) ,
2023-05-12 20:05:25 +08:00
msgDatabase : database ,
2023-03-03 17:42:26 +08:00
}
return mc
2022-06-16 20:35:27 +08:00
}
2023-03-03 17:42:26 +08:00
2023-05-22 10:08:04 +08:00
func ( mc * OnlineHistoryMongoConsumerHandler ) handleChatWs2Mongo ( ctx context . Context , cMsg * sarama . ConsumerMessage , key string , session sarama . ConsumerGroupSession ) {
2022-06-16 20:35:27 +08:00
msg := cMsg . Value
msgFromMQ := pbMsg . MsgDataToMongoByMQ { }
err := proto . Unmarshal ( msg , & msgFromMQ )
if err != nil {
2023-05-22 10:08:04 +08:00
log . ZError ( ctx , "unmarshall failed" , err , "key" , key , "len" , len ( msg ) )
2022-06-16 20:35:27 +08:00
return
}
2023-04-28 18:39:21 +08:00
if len ( msgFromMQ . MsgData ) == 0 {
2023-05-10 18:36:02 +08:00
log . ZError ( ctx , "msgFromMQ.MsgData is empty" , nil , "cMsg" , cMsg )
2023-04-28 18:39:21 +08:00
return
2022-06-16 20:35:27 +08:00
}
2023-05-10 18:36:02 +08:00
log . ZInfo ( ctx , "mongo consumer recv msg" , "msgs" , msgFromMQ . MsgData )
2023-05-12 20:05:25 +08:00
err = mc . msgDatabase . BatchInsertChat2DB ( ctx , msgFromMQ . ConversationID , msgFromMQ . MsgData , msgFromMQ . LastSeq )
if err != nil {
2023-05-19 19:43:43 +08:00
log . ZError ( ctx , "single data insert to mongo err" , err , "msg" , msgFromMQ . MsgData , "conversationID" , msgFromMQ . ConversationID )
2023-05-12 20:05:25 +08:00
}
err = mc . msgDatabase . DeleteMessageFromCache ( ctx , msgFromMQ . ConversationID , msgFromMQ . MsgData )
if err != nil {
2023-05-19 19:43:43 +08:00
log . ZError ( ctx , "remove cache msg from redis err" , err , "msg" , msgFromMQ . MsgData , "conversationID" , msgFromMQ . ConversationID )
2023-05-12 20:05:25 +08:00
}
for _ , v := range msgFromMQ . MsgData {
2023-05-23 17:11:56 +08:00
switch v . ContentType {
case constant . DeleteMessageNotification :
2023-05-12 20:05:25 +08:00
deleteMessageTips := sdkws . DeleteMessageTips { }
err := proto . Unmarshal ( v . Content , & deleteMessageTips )
if err != nil {
log . ZError ( ctx , "tips unmarshal err:" , err , "msg" , msg )
continue
2022-06-16 20:35:27 +08:00
}
2023-05-12 20:05:25 +08:00
if totalUnExistSeqs , err := mc . msgDatabase . DelMsgBySeqs ( ctx , deleteMessageTips . UserID , deleteMessageTips . Seqs ) ; err != nil {
log . ZError ( ctx , "DelMsgBySeqs" , err , "userIDs" , deleteMessageTips . UserID , "seqs" , deleteMessageTips . Seqs , "totalUnExistSeqs" , totalUnExistSeqs )
2023-05-23 17:11:56 +08:00
continue
}
case constant . MsgRevokeNotification :
var elem sdkws . NotificationElem
if err := json . Unmarshal ( v . Content , & elem ) ; err != nil {
log . ZError ( ctx , "json.Unmarshal NotificationElem" , err , "content" , string ( v . Content ) )
continue
}
var tips sdkws . RevokeMsgTips
if err := json . Unmarshal ( [ ] byte ( elem . Detail ) , & tips ) ; err != nil {
log . ZError ( ctx , "json.Unmarshal RevokeMsgTips" , err , "content" , string ( v . Content ) )
continue
}
2023-05-23 17:20:34 +08:00
msgs , err := mc . msgDatabase . GetMsgBySeqs ( ctx , tips . ConversationID , [ ] int64 { tips . Seq } )
if err != nil {
log . ZError ( ctx , "GetMsgBySeqs" , err , "conversationID" , tips . ConversationID , "seq" , tips . Seq )
continue
}
if len ( msgs ) == 0 {
log . ZError ( ctx , "GetMsgBySeqs empty" , errors . New ( "seq not found" ) , "conversationID" , tips . ConversationID , "seq" , tips . Seq )
continue
}
msgs [ 0 ] . Content = [ ] byte ( elem . Detail )
data , err := proto . Marshal ( msgs [ 0 ] )
2023-05-23 17:11:56 +08:00
if err != nil {
log . ZError ( ctx , "proto.Marshal MsgData" , err )
continue
}
if err := mc . msgDatabase . RevokeMsg ( ctx , tips . ConversationID , tips . Seq , data ) ; err != nil {
log . ZError ( ctx , "RevokeMsg" , err , "conversationID" , tips . ConversationID , "seq" , tips . Seq )
continue
2022-06-16 20:35:27 +08:00
}
}
}
}
// Setup is a no-op: it ignores the session and always returns nil.
func (OnlineHistoryMongoConsumerHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is a no-op: it ignores the session and always returns nil.
func (OnlineHistoryMongoConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
2023-03-03 17:42:26 +08:00
func ( mc * OnlineHistoryMongoConsumerHandler ) ConsumeClaim ( sess sarama . ConsumerGroupSession , claim sarama . ConsumerGroupClaim ) error { // a instance in the consumer group
2023-05-10 18:18:30 +08:00
log . ZDebug ( context . Background ( ) , "online new session msg come" , "highWaterMarkOffset" ,
claim . HighWaterMarkOffset ( ) , "topic" , claim . Topic ( ) , "partition" , claim . Partition ( ) )
2022-06-16 20:35:27 +08:00
for msg := range claim . Messages ( ) {
2023-05-10 18:18:30 +08:00
ctx := mc . historyConsumerGroup . GetContextFromMsg ( msg )
2022-06-16 20:35:27 +08:00
if len ( msg . Value ) != 0 {
2023-03-03 17:42:26 +08:00
mc . handleChatWs2Mongo ( ctx , msg , string ( msg . Key ) , sess )
2022-06-16 20:35:27 +08:00
} else {
2023-05-10 18:36:02 +08:00
log . ZError ( ctx , "mongo msg get from kafka but is nil" , nil , "conversationID" , msg . Key )
2022-06-16 20:35:27 +08:00
}
sess . MarkMessage ( msg , "" )
}
return nil
}