2021-05-26 19:24:25 +08:00
package logic
import (
2021-10-11 22:12:01 +08:00
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
kfk "Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
2021-10-11 22:00:38 +08:00
pbMsg "Open_IM/pkg/proto/chat"
pbPush "Open_IM/pkg/proto/push"
2022-02-25 19:50:18 +08:00
"Open_IM/pkg/statistics"
2021-10-11 22:12:01 +08:00
"Open_IM/pkg/utils"
2021-05-26 19:24:25 +08:00
"context"
2022-05-09 18:23:06 +08:00
"fmt"
2021-05-26 19:24:25 +08:00
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
"strings"
2022-04-28 16:47:46 +08:00
"time"
2021-05-26 19:24:25 +08:00
)
type fcb func ( msg [ ] byte , msgKey string )
type HistoryConsumerHandler struct {
2022-05-09 18:23:06 +08:00
msgHandle map [ string ] fcb
historyConsumerGroup * kfk . MConsumerGroup
singleMsgFailedCount uint64
singleMsgSuccessCount uint64
groupMsgCount uint64
2021-05-26 19:24:25 +08:00
}
func ( mc * HistoryConsumerHandler ) Init ( ) {
2022-05-09 18:23:06 +08:00
statistics . NewStatistics ( & mc . singleMsgSuccessCount , config . Config . ModuleName . MsgTransferName , fmt . Sprintf ( "%d second singleMsgCount insert to mongo" , constant . StatisticsTimeInterval ) , constant . StatisticsTimeInterval )
statistics . NewStatistics ( & mc . groupMsgCount , config . Config . ModuleName . MsgTransferName , fmt . Sprintf ( "%d second groupMsgCount insert to mongo" , constant . StatisticsTimeInterval ) , constant . StatisticsTimeInterval )
2022-02-25 19:50:18 +08:00
2021-05-26 19:24:25 +08:00
mc . msgHandle = make ( map [ string ] fcb )
mc . msgHandle [ config . Config . Kafka . Ws2mschat . Topic ] = mc . handleChatWs2Mongo
mc . historyConsumerGroup = kfk . NewMConsumerGroup ( & kfk . MConsumerGroupConfig { KafkaVersion : sarama . V0_10_2_0 ,
OffsetsInitial : sarama . OffsetNewest , IsReturnErr : false } , [ ] string { config . Config . Kafka . Ws2mschat . Topic } ,
config . Config . Kafka . Ws2mschat . Addr , config . Config . Kafka . ConsumerGroupID . MsgToMongo )
}
func ( mc * HistoryConsumerHandler ) handleChatWs2Mongo ( msg [ ] byte , msgKey string ) {
2022-04-28 16:47:46 +08:00
now := time . Now ( )
2021-12-23 17:34:32 +08:00
msgFromMQ := pbMsg . MsgDataToMQ { }
err := proto . Unmarshal ( msg , & msgFromMQ )
2021-05-26 19:24:25 +08:00
if err != nil {
2022-03-18 11:10:06 +08:00
log . Error ( "msg_transfer Unmarshal msg err" , "" , "msg" , string ( msg ) , "err" , err . Error ( ) )
2021-05-26 19:24:25 +08:00
return
}
2021-12-23 17:34:32 +08:00
operationID := msgFromMQ . OperationID
2022-03-18 11:10:06 +08:00
log . NewInfo ( operationID , "msg come mongo!!!" , "" , "msg" , string ( msg ) )
2021-05-26 19:24:25 +08:00
//Control whether to store offline messages (mongo)
2021-12-23 17:34:32 +08:00
isHistory := utils . GetSwitchFromOptions ( msgFromMQ . MsgData . Options , constant . IsHistory )
2021-05-26 19:24:25 +08:00
//Control whether to store history messages (mysql)
2021-12-23 17:34:32 +08:00
isPersist := utils . GetSwitchFromOptions ( msgFromMQ . MsgData . Options , constant . IsPersistent )
2022-02-15 14:31:55 +08:00
isSenderSync := utils . GetSwitchFromOptions ( msgFromMQ . MsgData . Options , constant . IsSenderSync )
2021-12-23 17:34:32 +08:00
switch msgFromMQ . MsgData . SessionType {
2021-11-10 15:24:59 +08:00
case constant . SingleChatType :
2021-12-23 17:34:32 +08:00
log . NewDebug ( msgFromMQ . OperationID , "msg_transfer msg type = SingleChatType" , isHistory , isPersist )
2021-05-26 19:24:25 +08:00
if isHistory {
2022-02-15 13:16:31 +08:00
err := saveUserChat ( msgKey , & msgFromMQ )
if err != nil {
2022-05-09 18:23:06 +08:00
mc . singleMsgFailedCount ++
2022-02-15 13:16:31 +08:00
log . NewError ( operationID , "single data insert to mongo err" , err . Error ( ) , msgFromMQ . String ( ) )
return
2021-05-26 19:24:25 +08:00
}
2022-05-09 18:23:06 +08:00
mc . singleMsgSuccessCount ++
2022-04-28 16:47:46 +08:00
log . NewDebug ( msgFromMQ . OperationID , "sendMessageToPush cost time " , time . Since ( now ) )
2021-05-26 19:24:25 +08:00
}
2022-02-15 14:31:55 +08:00
if ! isSenderSync && msgKey == msgFromMQ . MsgData . SendID {
} else {
go sendMessageToPush ( & msgFromMQ , msgKey )
}
2022-04-28 16:47:46 +08:00
log . NewDebug ( operationID , "saveUserChat cost time " , time . Since ( now ) )
2021-11-10 15:24:59 +08:00
case constant . GroupChatType :
2021-12-23 17:34:32 +08:00
log . NewDebug ( msgFromMQ . OperationID , "msg_transfer msg type = GroupChatType" , isHistory , isPersist )
2021-08-06 14:56:41 +08:00
if isHistory {
2021-12-23 17:34:32 +08:00
err := saveUserChat ( msgFromMQ . MsgData . RecvID , & msgFromMQ )
2021-11-10 15:24:59 +08:00
if err != nil {
2021-12-23 17:34:32 +08:00
log . NewError ( operationID , "group data insert to mongo err" , msgFromMQ . String ( ) , msgFromMQ . MsgData . RecvID , err . Error ( ) )
2021-11-10 15:24:59 +08:00
return
}
2022-02-25 19:50:18 +08:00
mc . groupMsgCount ++
2021-08-06 14:56:41 +08:00
}
2022-02-08 17:12:02 +08:00
go sendMessageToPush ( & msgFromMQ , msgFromMQ . MsgData . RecvID )
2022-03-30 18:23:05 +08:00
case constant . NotificationChatType :
log . NewDebug ( msgFromMQ . OperationID , "msg_transfer msg type = NotificationChatType" , isHistory , isPersist )
if isHistory {
err := saveUserChat ( msgKey , & msgFromMQ )
if err != nil {
log . NewError ( operationID , "single data insert to mongo err" , err . Error ( ) , msgFromMQ . String ( ) )
return
}
2022-04-28 16:47:46 +08:00
log . NewDebug ( msgFromMQ . OperationID , "sendMessageToPush cost time " , time . Since ( now ) )
2022-03-30 18:23:05 +08:00
}
if ! isSenderSync && msgKey == msgFromMQ . MsgData . SendID {
} else {
go sendMessageToPush ( & msgFromMQ , msgKey )
}
2022-04-28 16:47:46 +08:00
log . NewDebug ( operationID , "saveUserChat cost time " , time . Since ( now ) )
2021-11-10 15:24:59 +08:00
default :
2021-12-23 17:34:32 +08:00
log . NewError ( msgFromMQ . OperationID , "SessionType error" , msgFromMQ . String ( ) )
2021-11-10 15:24:59 +08:00
return
2021-05-26 19:24:25 +08:00
}
2021-12-23 17:34:32 +08:00
log . NewDebug ( msgFromMQ . OperationID , "msg_transfer handle topic data to database success..." , msgFromMQ . String ( ) )
2021-05-26 19:24:25 +08:00
}
func ( HistoryConsumerHandler ) Setup ( _ sarama . ConsumerGroupSession ) error { return nil }
func ( HistoryConsumerHandler ) Cleanup ( _ sarama . ConsumerGroupSession ) error { return nil }
func ( mc * HistoryConsumerHandler ) ConsumeClaim ( sess sarama . ConsumerGroupSession ,
claim sarama . ConsumerGroupClaim ) error {
for msg := range claim . Messages ( ) {
2022-05-10 09:09:37 +08:00
log . NewDebug ( "" , "kafka get info to mongo" , "msgTopic" , msg . Topic , "msgPartition" , msg . Partition , "msg" , string ( msg . Value ) )
2021-05-26 19:24:25 +08:00
mc . msgHandle [ msg . Topic ] ( msg . Value , string ( msg . Key ) )
2021-09-13 15:48:19 +08:00
sess . MarkMessage ( msg , "" )
2021-05-26 19:24:25 +08:00
}
return nil
}
2022-02-08 17:12:02 +08:00
func sendMessageToPush ( message * pbMsg . MsgDataToMQ , pushToUserID string ) {
2022-03-18 11:10:06 +08:00
log . Info ( message . OperationID , "msg_transfer send message to push" , "message" , message . String ( ) )
2022-02-08 17:12:02 +08:00
rpcPushMsg := pbPush . PushMsgReq { OperationID : message . OperationID , MsgData : message . MsgData , PushToUserID : pushToUserID }
mqPushMsg := pbMsg . PushMsgDataToMQ { OperationID : message . OperationID , MsgData : message . MsgData , PushToUserID : pushToUserID }
2021-05-26 19:24:25 +08:00
grpcConn := getcdv3 . GetConn ( config . Config . Etcd . EtcdSchema , strings . Join ( config . Config . Etcd . EtcdAddr , "," ) , config . Config . RpcRegisterName . OpenImPushName )
if grpcConn == nil {
2022-03-18 11:10:06 +08:00
log . Error ( rpcPushMsg . OperationID , "rpc dial failed" , "push data" , rpcPushMsg . String ( ) )
2021-12-23 17:34:32 +08:00
pid , offset , err := producer . SendMessage ( & mqPushMsg )
2021-05-26 19:24:25 +08:00
if err != nil {
2022-03-18 11:10:06 +08:00
log . Error ( mqPushMsg . OperationID , "kafka send failed" , "send data" , message . String ( ) , "pid" , pid , "offset" , offset , "err" , err . Error ( ) )
2021-05-26 19:24:25 +08:00
}
return
}
msgClient := pbPush . NewPushMsgServiceClient ( grpcConn )
2021-12-23 17:34:32 +08:00
_ , err := msgClient . PushMsg ( context . Background ( ) , & rpcPushMsg )
2021-05-26 19:24:25 +08:00
if err != nil {
2022-03-18 11:10:06 +08:00
log . Error ( rpcPushMsg . OperationID , "rpc send failed" , rpcPushMsg . OperationID , "push data" , rpcPushMsg . String ( ) , "err" , err . Error ( ) )
2021-12-23 17:34:32 +08:00
pid , offset , err := producer . SendMessage ( & mqPushMsg )
2021-05-26 19:24:25 +08:00
if err != nil {
2022-03-18 11:10:06 +08:00
log . Error ( "kafka send failed" , mqPushMsg . OperationID , "send data" , mqPushMsg . String ( ) , "pid" , pid , "offset" , offset , "err" , err . Error ( ) )
2021-05-26 19:24:25 +08:00
}
} else {
2022-03-18 11:10:06 +08:00
log . Info ( "rpc send success" , rpcPushMsg . OperationID , "push data" , rpcPushMsg . String ( ) )
2021-05-26 19:24:25 +08:00
}
}