2023-02-09 20:36:34 +08:00
package msgtransfer
2022-12-12 19:22:50 +08:00
import (
2023-02-23 19:15:30 +08:00
"OpenIM/pkg/apistruct"
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/controller"
unRelationTb "OpenIM/pkg/common/db/table/unrelation"
kfk "OpenIM/pkg/common/kafka"
"OpenIM/pkg/common/log"
"OpenIM/pkg/common/tracelog"
pbMsg "OpenIM/pkg/proto/msg"
"OpenIM/pkg/utils"
2023-02-15 15:52:32 +08:00
"context"
2022-12-12 19:22:50 +08:00
"encoding/json"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
)
type ModifyMsgConsumerHandler struct {
modifyMsgConsumerGroup * kfk . MConsumerGroup
2023-02-15 15:52:32 +08:00
2023-03-01 15:32:26 +08:00
extendMsgDatabase controller . ExtendMsgDatabase
2023-03-06 16:23:16 +08:00
extendSetMsgModel unRelationTb . ExtendMsgSetModel
2022-12-12 19:22:50 +08:00
}
2023-03-03 17:42:26 +08:00
func NewModifyMsgConsumerHandler ( database controller . ExtendMsgDatabase ) * ModifyMsgConsumerHandler {
return & ModifyMsgConsumerHandler {
modifyMsgConsumerGroup : kfk . NewMConsumerGroup ( & kfk . MConsumerGroupConfig { KafkaVersion : sarama . V2_0_0_0 ,
OffsetsInitial : sarama . OffsetNewest , IsReturnErr : false } , [ ] string { config . Config . Kafka . MsgToModify . Topic } ,
config . Config . Kafka . MsgToModify . Addr , config . Config . Kafka . ConsumerGroupID . MsgToModify ) ,
extendMsgDatabase : database ,
}
2022-12-12 19:22:50 +08:00
}
// Setup is the sarama consumer-group hook invoked at the start of a session.
// This handler needs no per-session initialisation, so it always returns nil.
func (ModifyMsgConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is the sarama consumer-group hook invoked at the end of a session.
// This handler holds no per-session resources, so it always returns nil.
func (ModifyMsgConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
func ( mmc * ModifyMsgConsumerHandler ) ConsumeClaim ( sess sarama . ConsumerGroupSession ,
claim sarama . ConsumerGroupClaim ) error {
for msg := range claim . Messages ( ) {
log . NewDebug ( "" , "kafka get info to mysql" , "ModifyMsgConsumerHandler" , msg . Topic , "msgPartition" , msg . Partition , "msg" , string ( msg . Value ) , "key" , string ( msg . Key ) )
if len ( msg . Value ) != 0 {
2023-03-09 16:36:47 +08:00
ctx := mmc . modifyMsgConsumerGroup . GetContextFromMsg ( msg , "modify consumer" )
2023-03-03 17:42:26 +08:00
mmc . ModifyMsg ( ctx , msg , string ( msg . Key ) , sess )
2022-12-12 19:22:50 +08:00
} else {
log . Error ( "" , "msg get from kafka but is nil" , msg . Key )
}
sess . MarkMessage ( msg , "" )
}
return nil
}
2023-03-03 17:42:26 +08:00
func ( mmc * ModifyMsgConsumerHandler ) ModifyMsg ( ctx context . Context , cMsg * sarama . ConsumerMessage , msgKey string , _ sarama . ConsumerGroupSession ) {
2022-12-12 19:22:50 +08:00
log . NewInfo ( "msg come here ModifyMsg!!!" , "" , "msg" , string ( cMsg . Value ) , msgKey )
msgFromMQ := pbMsg . MsgDataToModifyByMQ { }
2023-03-03 17:42:26 +08:00
operationID := tracelog . GetOperationID ( ctx )
2022-12-12 19:22:50 +08:00
err := proto . Unmarshal ( cMsg . Value , & msgFromMQ )
if err != nil {
log . NewError ( msgFromMQ . TriggerID , "msg_transfer Unmarshal msg err" , "msg" , string ( cMsg . Value ) , "err" , err . Error ( ) )
return
}
log . Debug ( msgFromMQ . TriggerID , "proto.Unmarshal MsgDataToMQ" , msgFromMQ . String ( ) )
2023-03-03 17:42:26 +08:00
for _ , msgDataToMQ := range msgFromMQ . Messages {
2022-12-23 16:09:59 +08:00
isReactionFromCache := utils . GetSwitchFromOptions ( msgDataToMQ . MsgData . Options , constant . IsReactionFromCache )
if ! isReactionFromCache {
continue
}
2023-03-03 17:42:26 +08:00
tracelog . SetOperationID ( ctx , operationID )
2022-12-12 19:22:50 +08:00
if msgDataToMQ . MsgData . ContentType == constant . ReactionMessageModifier {
2023-02-09 16:11:18 +08:00
notification := & apistruct . ReactionMessageModifierNotification { }
2022-12-12 19:22:50 +08:00
if err := json . Unmarshal ( msgDataToMQ . MsgData . Content , notification ) ; err != nil {
continue
}
if notification . IsExternalExtensions {
2023-03-03 17:42:26 +08:00
log . NewInfo ( operationID , "msg:" , notification , "this is external extensions" )
2022-12-12 19:22:50 +08:00
continue
}
if ! notification . IsReact {
// first time to modify
2023-02-15 15:52:32 +08:00
var reactionExtensionList = make ( map [ string ] unRelationTb . KeyValueModel )
extendMsg := unRelationTb . ExtendMsgModel {
2022-12-12 19:22:50 +08:00
ReactionExtensionList : reactionExtensionList ,
ClientMsgID : notification . ClientMsgID ,
MsgFirstModifyTime : notification . MsgFirstModifyTime ,
}
2023-03-06 16:23:16 +08:00
for _ , v := range notification . SuccessReactionExtensions {
2023-02-15 15:52:32 +08:00
reactionExtensionList [ v . TypeKey ] = unRelationTb . KeyValueModel {
2022-12-12 19:22:50 +08:00
TypeKey : v . TypeKey ,
Value : v . Value ,
LatestUpdateTime : v . LatestUpdateTime ,
}
}
2023-03-01 15:32:26 +08:00
if err := mmc . extendMsgDatabase . InsertExtendMsg ( ctx , notification . SourceID , notification . SessionType , & extendMsg ) ; err != nil {
2023-03-03 17:42:26 +08:00
log . NewError ( operationID , "MsgFirstModify InsertExtendMsg failed" , notification . SourceID , notification . SessionType , extendMsg , err . Error ( ) )
2022-12-12 19:22:50 +08:00
continue
}
} else {
2023-03-06 16:23:16 +08:00
if err := mmc . extendMsgDatabase . InsertOrUpdateReactionExtendMsgSet ( ctx , notification . SourceID , notification . SessionType , notification . ClientMsgID , notification . MsgFirstModifyTime , mmc . extendSetMsgModel . Pb2Model ( notification . SuccessReactionExtensions ) ) ; err != nil {
2023-03-03 17:42:26 +08:00
log . NewError ( operationID , "InsertOrUpdateReactionExtendMsgSet failed" )
2022-12-12 19:22:50 +08:00
}
}
} else if msgDataToMQ . MsgData . ContentType == constant . ReactionMessageDeleter {
2023-02-09 16:11:18 +08:00
notification := & apistruct . ReactionMessageDeleteNotification { }
2022-12-12 19:22:50 +08:00
if err := json . Unmarshal ( msgDataToMQ . MsgData . Content , notification ) ; err != nil {
continue
}
2023-03-06 16:23:16 +08:00
if err := mmc . extendMsgDatabase . DeleteReactionExtendMsgSet ( ctx , notification . SourceID , notification . SessionType , notification . ClientMsgID , notification . MsgFirstModifyTime , mmc . extendSetMsgModel . Pb2Model ( notification . SuccessReactionExtensions ) ) ; err != nil {
2023-03-03 17:42:26 +08:00
log . NewError ( operationID , "InsertOrUpdateReactionExtendMsgSet failed" )
2022-12-12 19:22:50 +08:00
}
}
}
}