2021-05-26 19:24:25 +08:00
package logic
import (
2021-10-11 22:12:01 +08:00
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
2022-05-19 12:25:46 +08:00
"Open_IM/pkg/common/db"
2021-10-11 22:12:01 +08:00
kfk "Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
2021-10-11 22:00:38 +08:00
pbMsg "Open_IM/pkg/proto/chat"
pbPush "Open_IM/pkg/proto/push"
2021-10-11 22:12:01 +08:00
"Open_IM/pkg/utils"
2021-05-26 19:24:25 +08:00
"context"
2022-05-11 18:33:48 +08:00
"errors"
2021-05-26 19:24:25 +08:00
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
2022-05-20 13:33:38 +08:00
"hash/crc32"
2021-05-26 19:24:25 +08:00
"strings"
2022-04-28 16:47:46 +08:00
"time"
2021-05-26 19:24:25 +08:00
)
2022-05-19 12:25:46 +08:00
type MsgChannelValue struct {
2022-05-20 14:42:49 +08:00
userID string
triggerID string
msgList [ ] * pbMsg . MsgDataToMQ
}
type TriggerChannelValue struct {
triggerID string
cmsgList [ ] * sarama . ConsumerMessage
2022-05-19 12:25:46 +08:00
}
type fcb func ( cMsg * sarama . ConsumerMessage , msgKey string , sess sarama . ConsumerGroupSession )
2022-05-11 18:33:48 +08:00
type Cmd2Value struct {
2022-05-11 20:49:47 +08:00
Cmd int
2022-05-11 18:33:48 +08:00
Value interface { }
}
type OnlineHistoryConsumerHandler struct {
msgHandle map [ string ] fcb
historyConsumerGroup * kfk . MConsumerGroup
cmdCh chan Cmd2Value
2022-05-19 12:25:46 +08:00
msgCh chan Cmd2Value
2022-05-20 13:33:38 +08:00
chArrays [ ChannelNum ] chan Cmd2Value
msgDistributionCh chan Cmd2Value
2021-05-26 19:24:25 +08:00
}
2022-05-11 18:33:48 +08:00
func ( och * OnlineHistoryConsumerHandler ) Init ( cmdCh chan Cmd2Value ) {
och . msgHandle = make ( map [ string ] fcb )
2022-05-20 13:33:38 +08:00
och . msgDistributionCh = make ( chan Cmd2Value ) //no buffer channel
go och . MessagesDistributionHandle ( )
2022-05-11 18:33:48 +08:00
och . cmdCh = cmdCh
2022-05-19 12:25:46 +08:00
och . msgCh = make ( chan Cmd2Value , 1000 )
2022-05-20 15:51:37 +08:00
for i := 0 ; i < ChannelNum ; i ++ {
och . chArrays [ i ] = make ( chan Cmd2Value , 1000 )
go och . Run ( i )
}
2022-05-19 12:25:46 +08:00
if config . Config . ReliableStorage {
och . msgHandle [ config . Config . Kafka . Ws2mschat . Topic ] = och . handleChatWs2Mongo
} else {
och . msgHandle [ config . Config . Kafka . Ws2mschat . Topic ] = och . handleChatWs2MongoLowReliability
2022-05-20 15:51:37 +08:00
2022-05-19 12:25:46 +08:00
}
2022-05-24 12:36:05 +08:00
och . historyConsumerGroup = kfk . NewMConsumerGroup ( & kfk . MConsumerGroupConfig { KafkaVersion : sarama . V2_0_0_0 ,
2021-05-26 19:24:25 +08:00
OffsetsInitial : sarama . OffsetNewest , IsReturnErr : false } , [ ] string { config . Config . Kafka . Ws2mschat . Topic } ,
config . Config . Kafka . Ws2mschat . Addr , config . Config . Kafka . ConsumerGroupID . MsgToMongo )
}
2022-05-11 20:49:47 +08:00
func ( och * OnlineHistoryConsumerHandler ) TriggerCmd ( status int ) {
2022-05-11 18:33:48 +08:00
operationID := utils . OperationIDGenerator ( )
2022-05-13 21:05:39 +08:00
err := sendCmd ( och . cmdCh , Cmd2Value { Cmd : status , Value : "" } , 1 )
if err != nil {
log . Error ( operationID , "TriggerCmd failed " , err . Error ( ) , status )
2022-05-11 18:33:48 +08:00
return
}
2022-05-13 21:05:39 +08:00
log . Debug ( operationID , "TriggerCmd success" , status )
2022-05-11 18:33:48 +08:00
}
func sendCmd ( ch chan Cmd2Value , value Cmd2Value , timeout int64 ) error {
var flag = 0
select {
case ch <- value :
flag = 1
case <- time . After ( time . Second * time . Duration ( timeout ) ) :
flag = 2
}
if flag == 1 {
return nil
} else {
return errors . New ( "send cmd timeout" )
}
}
2022-05-20 13:33:38 +08:00
func ( och * OnlineHistoryConsumerHandler ) Run ( channelID int ) {
2022-05-19 12:25:46 +08:00
for {
select {
2022-05-20 13:33:38 +08:00
case cmd := <- och . chArrays [ channelID ] :
2022-05-19 12:25:46 +08:00
switch cmd . Cmd {
2022-05-20 13:33:38 +08:00
case UserMessages :
2022-05-19 12:25:46 +08:00
msgChannelValue := cmd . Value . ( MsgChannelValue )
2022-05-20 13:33:38 +08:00
msgList := msgChannelValue . msgList
2022-05-20 14:42:49 +08:00
triggerID := msgChannelValue . triggerID
2022-05-20 16:07:32 +08:00
storageMsgList := make ( [ ] * pbMsg . MsgDataToMQ , 0 , 80 )
2022-05-21 15:03:26 +08:00
noStoragepushMsgList := make ( [ ] * pbMsg . MsgDataToMQ , 0 , 80 )
2022-05-20 14:42:49 +08:00
log . Debug ( triggerID , "msg arrived channel" , "channel id" , channelID , msgList , msgChannelValue . userID , len ( msgList ) )
2022-05-20 13:33:38 +08:00
for _ , v := range msgList {
2022-05-20 14:42:49 +08:00
log . Debug ( triggerID , "msg come to storage center" , v . String ( ) )
2022-05-20 13:33:38 +08:00
isHistory := utils . GetSwitchFromOptions ( v . MsgData . Options , constant . IsHistory )
isSenderSync := utils . GetSwitchFromOptions ( v . MsgData . Options , constant . IsSenderSync )
if isHistory {
storageMsgList = append ( storageMsgList , v )
2022-05-21 15:04:42 +08:00
//log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID)
2022-05-21 15:03:26 +08:00
} else {
if ! ( ! isSenderSync && msgChannelValue . userID == v . MsgData . SendID ) {
noStoragepushMsgList = append ( noStoragepushMsgList , v )
}
2022-05-20 13:33:38 +08:00
}
2022-05-21 15:03:26 +08:00
2022-05-20 13:33:38 +08:00
}
2022-05-19 18:08:44 +08:00
//switch msgChannelValue.msg.MsgData.SessionType {
//case constant.SingleChatType:
//case constant.GroupChatType:
//case constant.NotificationChatType:
//default:
// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
// return
//}
2022-05-21 15:03:26 +08:00
log . Debug ( triggerID , "msg storage length" , len ( storageMsgList ) , "push length" , len ( noStoragepushMsgList ) )
2022-05-20 14:42:49 +08:00
err := saveUserChatList ( msgChannelValue . userID , storageMsgList , triggerID )
2022-05-19 12:25:46 +08:00
if err != nil {
2022-05-20 13:33:38 +08:00
singleMsgFailedCount += uint64 ( len ( storageMsgList ) )
2022-05-20 14:42:49 +08:00
log . NewError ( triggerID , "single data insert to mongo err" , err . Error ( ) , storageMsgList )
2022-05-19 12:25:46 +08:00
} else {
singleMsgSuccessCountMutex . Lock ( )
2022-05-20 13:33:38 +08:00
singleMsgSuccessCount += uint64 ( len ( storageMsgList ) )
2022-05-19 12:25:46 +08:00
singleMsgSuccessCountMutex . Unlock ( )
2022-05-21 15:03:26 +08:00
go func ( push , storage [ ] * pbMsg . MsgDataToMQ ) {
for _ , v := range storage {
sendMessageToPush ( v , msgChannelValue . userID )
}
for _ , x := range push {
sendMessageToPush ( x , msgChannelValue . userID )
}
} ( noStoragepushMsgList , storageMsgList )
2022-05-20 13:33:38 +08:00
2022-05-19 12:25:46 +08:00
}
}
}
}
}
2022-05-20 13:33:38 +08:00
//func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
// msg := cMsg.Value
// now := time.Now()
// msgFromMQ := pbMsg.MsgDataToMQ{}
// err := proto.Unmarshal(msg, &msgFromMQ)
// if err != nil {
// log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
// return
// }
// operationID := msgFromMQ.OperationID
// log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
// //Control whether to store offline messages (mongo)
// isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
// //Control whether to store history messages (mysql)
// isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
// isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
// switch msgFromMQ.MsgData.SessionType {
// case constant.SingleChatType:
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist)
// if isHistory {
// err := saveUserChat(msgKey, &msgFromMQ)
// if err != nil {
// singleMsgFailedCount++
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
// return
// }
// singleMsgSuccessCountMutex.Lock()
// singleMsgSuccessCount++
// singleMsgSuccessCountMutex.Unlock()
// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
// }
// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
// } else {
// go sendMessageToPush(&msgFromMQ, msgKey)
// }
// log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
// case constant.GroupChatType:
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist)
// if isHistory {
// err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ)
// if err != nil {
// log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
// return
// }
// groupMsgCount++
// }
// go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
// case constant.NotificationChatType:
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist)
// if isHistory {
// err := saveUserChat(msgKey, &msgFromMQ)
// if err != nil {
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
// return
// }
// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
// }
// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
// } else {
// go sendMessageToPush(&msgFromMQ, msgKey)
// }
// log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
// default:
// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
// return
// }
// sess.MarkMessage(cMsg, "")
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
//}
func ( och * OnlineHistoryConsumerHandler ) MessagesDistributionHandle ( ) {
for {
2022-05-20 17:32:40 +08:00
UserAggregationMsgs := make ( map [ string ] [ ] * pbMsg . MsgDataToMQ , ChannelNum )
2022-05-20 13:33:38 +08:00
select {
case cmd := <- och . msgDistributionCh :
switch cmd . Cmd {
case ConsumerMsgs :
2022-05-20 14:42:49 +08:00
triggerChannelValue := cmd . Value . ( TriggerChannelValue )
triggerID := triggerChannelValue . triggerID
consumerMessages := triggerChannelValue . cmsgList
2022-05-20 13:33:38 +08:00
//Aggregation map[userid]message list
2022-05-20 14:42:49 +08:00
log . Debug ( triggerID , "batch messages come to distribution center" , len ( consumerMessages ) )
2022-05-20 13:33:38 +08:00
for i := 0 ; i < len ( consumerMessages ) ; i ++ {
msgFromMQ := pbMsg . MsgDataToMQ { }
err := proto . Unmarshal ( consumerMessages [ i ] . Value , & msgFromMQ )
if err != nil {
2022-05-20 14:42:49 +08:00
log . Error ( triggerID , "msg_transfer Unmarshal msg err" , "msg" , string ( consumerMessages [ i ] . Value ) , "err" , err . Error ( ) )
2022-05-20 13:33:38 +08:00
return
}
2022-05-20 16:07:32 +08:00
log . Debug ( triggerID , "single msg come to distribution center" , msgFromMQ . String ( ) , string ( consumerMessages [ i ] . Key ) )
2022-05-20 14:42:49 +08:00
if oldM , ok := UserAggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] ; ok {
2022-05-20 13:33:38 +08:00
oldM = append ( oldM , & msgFromMQ )
2022-05-20 14:42:49 +08:00
UserAggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] = oldM
2022-05-20 13:33:38 +08:00
} else {
2022-05-20 15:28:36 +08:00
m := make ( [ ] * pbMsg . MsgDataToMQ , 0 , 100 )
2022-05-20 13:33:38 +08:00
m = append ( m , & msgFromMQ )
2022-05-20 14:42:49 +08:00
UserAggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] = m
2022-05-20 13:33:38 +08:00
}
}
2022-05-20 14:42:49 +08:00
log . Debug ( triggerID , "generate map list users len" , len ( UserAggregationMsgs ) )
for userID , v := range UserAggregationMsgs {
2022-05-20 13:33:38 +08:00
if len ( v ) >= 0 {
2022-05-20 15:40:40 +08:00
hashCode := getHashCode ( userID )
channelID := hashCode % ChannelNum
log . Debug ( triggerID , "generate channelID" , hashCode , channelID , userID )
2022-05-20 15:47:04 +08:00
//go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
och . chArrays [ channelID ] <- Cmd2Value { Cmd : UserMessages , Value : MsgChannelValue { userID : userID , msgList : v , triggerID : triggerID } }
//}(channelID, userID, v)
2022-05-20 13:33:38 +08:00
}
}
}
}
2022-05-20 17:32:40 +08:00
2022-05-20 13:33:38 +08:00
}
}
func ( mc * OnlineHistoryConsumerHandler ) handleChatWs2Mongo ( cMsg * sarama . ConsumerMessage , msgKey string , sess sarama . ConsumerGroupSession ) {
2022-05-19 12:25:46 +08:00
msg := cMsg . Value
2022-04-28 16:47:46 +08:00
now := time . Now ( )
2021-12-23 17:34:32 +08:00
msgFromMQ := pbMsg . MsgDataToMQ { }
err := proto . Unmarshal ( msg , & msgFromMQ )
2021-05-26 19:24:25 +08:00
if err != nil {
2022-03-18 11:10:06 +08:00
log . Error ( "msg_transfer Unmarshal msg err" , "" , "msg" , string ( msg ) , "err" , err . Error ( ) )
2021-05-26 19:24:25 +08:00
return
}
2021-12-23 17:34:32 +08:00
operationID := msgFromMQ . OperationID
2022-03-18 11:10:06 +08:00
log . NewInfo ( operationID , "msg come mongo!!!" , "" , "msg" , string ( msg ) )
2021-05-26 19:24:25 +08:00
//Control whether to store offline messages (mongo)
2021-12-23 17:34:32 +08:00
isHistory := utils . GetSwitchFromOptions ( msgFromMQ . MsgData . Options , constant . IsHistory )
2021-05-26 19:24:25 +08:00
//Control whether to store history messages (mysql)
2021-12-23 17:34:32 +08:00
isPersist := utils . GetSwitchFromOptions ( msgFromMQ . MsgData . Options , constant . IsPersistent )
2022-02-15 14:31:55 +08:00
isSenderSync := utils . GetSwitchFromOptions ( msgFromMQ . MsgData . Options , constant . IsSenderSync )
2021-12-23 17:34:32 +08:00
switch msgFromMQ . MsgData . SessionType {
2021-11-10 15:24:59 +08:00
case constant . SingleChatType :
2021-12-23 17:34:32 +08:00
log . NewDebug ( msgFromMQ . OperationID , "msg_transfer msg type = SingleChatType" , isHistory , isPersist )
2021-05-26 19:24:25 +08:00
if isHistory {
2022-02-15 13:16:31 +08:00
err := saveUserChat ( msgKey , & msgFromMQ )
if err != nil {
2022-05-11 18:33:48 +08:00
singleMsgFailedCount ++
2022-02-15 13:16:31 +08:00
log . NewError ( operationID , "single data insert to mongo err" , err . Error ( ) , msgFromMQ . String ( ) )
return
2021-05-26 19:24:25 +08:00
}
2022-05-18 08:06:05 +08:00
singleMsgSuccessCountMutex . Lock ( )
2022-05-11 18:33:48 +08:00
singleMsgSuccessCount ++
2022-05-18 08:06:05 +08:00
singleMsgSuccessCountMutex . Unlock ( )
2022-04-28 16:47:46 +08:00
log . NewDebug ( msgFromMQ . OperationID , "sendMessageToPush cost time " , time . Since ( now ) )
2021-05-26 19:24:25 +08:00
}
2022-02-15 14:31:55 +08:00
if ! isSenderSync && msgKey == msgFromMQ . MsgData . SendID {
} else {
go sendMessageToPush ( & msgFromMQ , msgKey )
}
2022-05-20 13:33:38 +08:00
log . NewDebug ( operationID , "saveSingleMsg cost time " , time . Since ( now ) )
2021-11-10 15:24:59 +08:00
case constant . GroupChatType :
2021-12-23 17:34:32 +08:00
log . NewDebug ( msgFromMQ . OperationID , "msg_transfer msg type = GroupChatType" , isHistory , isPersist )
2021-08-06 14:56:41 +08:00
if isHistory {
2021-12-23 17:34:32 +08:00
err := saveUserChat ( msgFromMQ . MsgData . RecvID , & msgFromMQ )
2021-11-10 15:24:59 +08:00
if err != nil {
2021-12-23 17:34:32 +08:00
log . NewError ( operationID , "group data insert to mongo err" , msgFromMQ . String ( ) , msgFromMQ . MsgData . RecvID , err . Error ( ) )
2021-11-10 15:24:59 +08:00
return
}
2022-05-11 18:33:48 +08:00
groupMsgCount ++
2021-08-06 14:56:41 +08:00
}
2022-02-08 17:12:02 +08:00
go sendMessageToPush ( & msgFromMQ , msgFromMQ . MsgData . RecvID )
2022-05-20 13:33:38 +08:00
log . NewDebug ( operationID , "saveGroupMsg cost time " , time . Since ( now ) )
2022-03-30 18:23:05 +08:00
case constant . NotificationChatType :
log . NewDebug ( msgFromMQ . OperationID , "msg_transfer msg type = NotificationChatType" , isHistory , isPersist )
if isHistory {
err := saveUserChat ( msgKey , & msgFromMQ )
if err != nil {
log . NewError ( operationID , "single data insert to mongo err" , err . Error ( ) , msgFromMQ . String ( ) )
return
}
2022-04-28 16:47:46 +08:00
log . NewDebug ( msgFromMQ . OperationID , "sendMessageToPush cost time " , time . Since ( now ) )
2022-03-30 18:23:05 +08:00
}
if ! isSenderSync && msgKey == msgFromMQ . MsgData . SendID {
} else {
go sendMessageToPush ( & msgFromMQ , msgKey )
}
2022-04-28 16:47:46 +08:00
log . NewDebug ( operationID , "saveUserChat cost time " , time . Since ( now ) )
2021-11-10 15:24:59 +08:00
default :
2021-12-23 17:34:32 +08:00
log . NewError ( msgFromMQ . OperationID , "SessionType error" , msgFromMQ . String ( ) )
2021-11-10 15:24:59 +08:00
return
2021-05-26 19:24:25 +08:00
}
2022-05-19 12:25:46 +08:00
sess . MarkMessage ( cMsg , "" )
log . NewDebug ( msgFromMQ . OperationID , "msg_transfer handle topic data to database success..." , msgFromMQ . String ( ) )
}
2022-05-20 13:33:38 +08:00
2022-05-19 12:25:46 +08:00
func ( och * OnlineHistoryConsumerHandler ) handleChatWs2MongoLowReliability ( cMsg * sarama . ConsumerMessage , msgKey string , sess sarama . ConsumerGroupSession ) {
msg := cMsg . Value
msgFromMQ := pbMsg . MsgDataToMQ { }
err := proto . Unmarshal ( msg , & msgFromMQ )
if err != nil {
log . Error ( "msg_transfer Unmarshal msg err" , "" , "msg" , string ( msg ) , "err" , err . Error ( ) )
return
}
operationID := msgFromMQ . OperationID
log . NewInfo ( operationID , "msg come mongo!!!" , "" , "msg" , string ( msg ) )
//Control whether to store offline messages (mongo)
isHistory := utils . GetSwitchFromOptions ( msgFromMQ . MsgData . Options , constant . IsHistory )
isSenderSync := utils . GetSwitchFromOptions ( msgFromMQ . MsgData . Options , constant . IsSenderSync )
2022-05-19 18:08:44 +08:00
if isHistory {
seq , err := db . DB . IncrUserSeq ( msgKey )
if err != nil {
log . NewError ( operationID , "data insert to redis err" , err . Error ( ) , string ( msg ) )
return
2022-05-19 12:25:46 +08:00
}
2022-05-19 18:08:44 +08:00
sess . MarkMessage ( cMsg , "" )
msgFromMQ . MsgData . Seq = uint32 ( seq )
log . Debug ( operationID , "send ch msg is " , msgFromMQ . String ( ) )
2022-05-20 13:33:38 +08:00
//och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}}
2022-05-19 18:08:44 +08:00
//err := saveUserChat(msgKey, &msgFromMQ)
//if err != nil {
// singleMsgFailedCount++
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
// return
//}
//singleMsgSuccessCountMutex.Lock()
//singleMsgSuccessCount++
//singleMsgSuccessCountMutex.Unlock()
//log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
} else {
if ! ( ! isSenderSync && msgKey == msgFromMQ . MsgData . SendID ) {
2022-05-19 12:25:46 +08:00
go sendMessageToPush ( & msgFromMQ , msgKey )
}
}
2021-05-26 19:24:25 +08:00
}
2022-05-11 18:33:48 +08:00
func ( OnlineHistoryConsumerHandler ) Setup ( _ sarama . ConsumerGroupSession ) error { return nil }
func ( OnlineHistoryConsumerHandler ) Cleanup ( _ sarama . ConsumerGroupSession ) error { return nil }
2022-05-20 13:33:38 +08:00
//func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
// claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
// log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
// for msg := range claim.Messages() {
// SetOnlineTopicStatus(OnlineTopicBusy)
// //och.TriggerCmd(OnlineTopicBusy)
// log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
// och.msgHandle[msg.Topic](msg, string(msg.Key), sess)
// if claim.HighWaterMarkOffset()-msg.Offset <= 1 {
// log.Debug("", "online msg consume end", claim.HighWaterMarkOffset(), msg.Offset)
// SetOnlineTopicStatus(OnlineTopicVacancy)
// och.TriggerCmd(OnlineTopicVacancy)
// }
// }
// return nil
//}
2022-05-11 18:33:48 +08:00
func ( och * OnlineHistoryConsumerHandler ) ConsumeClaim ( sess sarama . ConsumerGroupSession ,
claim sarama . ConsumerGroupClaim ) error { // a instance in the consumer group
2022-05-22 21:27:29 +08:00
2022-05-22 21:08:57 +08:00
for {
if sess == nil {
log . NewWarn ( "" , " sess == nil, waiting " )
time . Sleep ( 100 * time . Millisecond )
} else {
break
}
2022-05-21 10:19:02 +08:00
}
2022-05-22 21:08:57 +08:00
2022-05-11 19:31:32 +08:00
log . NewDebug ( "" , "online new session msg come" , claim . HighWaterMarkOffset ( ) , claim . Topic ( ) , claim . Partition ( ) )
2022-05-21 09:58:27 +08:00
cMsg := make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
2022-05-20 21:02:18 +08:00
t := time . NewTicker ( time . Duration ( 100 ) * time . Millisecond )
2022-05-20 14:42:49 +08:00
var triggerID string
2022-05-20 21:58:23 +08:00
for {
2022-05-11 20:49:47 +08:00
//och.TriggerCmd(OnlineTopicBusy)
2022-05-20 13:33:38 +08:00
select {
2022-05-20 21:58:23 +08:00
case msg := <- claim . Messages ( ) :
2022-05-24 20:04:40 +08:00
triggerID = utils . OperationIDGenerator ( )
if msg != nil {
log . NewDebug ( triggerID , "claim.Messages " , msg )
cMsg = append ( cMsg , msg )
if len ( cMsg ) >= 1000 {
ccMsg := make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
for _ , v := range cMsg {
ccMsg = append ( ccMsg , v )
}
log . Debug ( triggerID , "length trigger msg consumer start" , len ( ccMsg ) )
och . msgDistributionCh <- Cmd2Value { Cmd : ConsumerMsgs , Value : TriggerChannelValue {
triggerID : triggerID , cmsgList : ccMsg } }
sess . MarkMessage ( msg , "" )
cMsg = make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
log . Debug ( triggerID , "length trigger msg consumer end" , len ( cMsg ) )
2022-05-20 16:53:35 +08:00
}
2022-05-24 20:04:40 +08:00
} else {
log . NewWarn ( triggerID , "msg is nil" )
2022-05-20 13:33:38 +08:00
}
2022-05-23 10:55:33 +08:00
2022-05-20 21:58:23 +08:00
case <- t . C :
if len ( cMsg ) > 0 {
2022-05-21 09:58:27 +08:00
ccMsg := make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
2022-05-20 16:53:35 +08:00
for _ , v := range cMsg {
ccMsg = append ( ccMsg , v )
}
2022-05-20 14:42:49 +08:00
triggerID = utils . OperationIDGenerator ( )
2022-05-20 21:58:23 +08:00
log . Debug ( triggerID , "timer trigger msg consumer start" , len ( ccMsg ) )
2022-05-20 14:42:49 +08:00
och . msgDistributionCh <- Cmd2Value { Cmd : ConsumerMsgs , Value : TriggerChannelValue {
2022-05-20 16:53:35 +08:00
triggerID : triggerID , cmsgList : ccMsg } }
2022-05-20 21:58:23 +08:00
sess . MarkMessage ( cMsg [ len ( cMsg ) - 1 ] , "" )
2022-05-21 09:58:27 +08:00
cMsg = make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
2022-05-20 21:58:23 +08:00
log . Debug ( triggerID , "timer trigger msg consumer end" , len ( cMsg ) )
2022-05-20 13:33:38 +08:00
}
2022-05-11 18:33:48 +08:00
}
2022-05-20 21:58:23 +08:00
//log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
2022-05-20 13:33:38 +08:00
2021-05-26 19:24:25 +08:00
}
return nil
}
2022-02-08 17:12:02 +08:00
func sendMessageToPush ( message * pbMsg . MsgDataToMQ , pushToUserID string ) {
2022-03-18 11:10:06 +08:00
log . Info ( message . OperationID , "msg_transfer send message to push" , "message" , message . String ( ) )
2022-02-08 17:12:02 +08:00
rpcPushMsg := pbPush . PushMsgReq { OperationID : message . OperationID , MsgData : message . MsgData , PushToUserID : pushToUserID }
mqPushMsg := pbMsg . PushMsgDataToMQ { OperationID : message . OperationID , MsgData : message . MsgData , PushToUserID : pushToUserID }
2021-05-26 19:24:25 +08:00
grpcConn := getcdv3 . GetConn ( config . Config . Etcd . EtcdSchema , strings . Join ( config . Config . Etcd . EtcdAddr , "," ) , config . Config . RpcRegisterName . OpenImPushName )
if grpcConn == nil {
2022-03-18 11:10:06 +08:00
log . Error ( rpcPushMsg . OperationID , "rpc dial failed" , "push data" , rpcPushMsg . String ( ) )
2021-12-23 17:34:32 +08:00
pid , offset , err := producer . SendMessage ( & mqPushMsg )
2021-05-26 19:24:25 +08:00
if err != nil {
2022-03-18 11:10:06 +08:00
log . Error ( mqPushMsg . OperationID , "kafka send failed" , "send data" , message . String ( ) , "pid" , pid , "offset" , offset , "err" , err . Error ( ) )
2021-05-26 19:24:25 +08:00
}
return
}
msgClient := pbPush . NewPushMsgServiceClient ( grpcConn )
2021-12-23 17:34:32 +08:00
_ , err := msgClient . PushMsg ( context . Background ( ) , & rpcPushMsg )
2021-05-26 19:24:25 +08:00
if err != nil {
2022-03-18 11:10:06 +08:00
log . Error ( rpcPushMsg . OperationID , "rpc send failed" , rpcPushMsg . OperationID , "push data" , rpcPushMsg . String ( ) , "err" , err . Error ( ) )
2021-12-23 17:34:32 +08:00
pid , offset , err := producer . SendMessage ( & mqPushMsg )
2021-05-26 19:24:25 +08:00
if err != nil {
2022-05-11 12:13:17 +08:00
log . Error ( message . OperationID , "kafka send failed" , mqPushMsg . OperationID , "send data" , mqPushMsg . String ( ) , "pid" , pid , "offset" , offset , "err" , err . Error ( ) )
2021-05-26 19:24:25 +08:00
}
} else {
2022-05-11 12:13:17 +08:00
log . Info ( message . OperationID , "rpc send success" , rpcPushMsg . OperationID , "push data" , rpcPushMsg . String ( ) )
2021-05-26 19:24:25 +08:00
}
}
2022-05-20 13:33:38 +08:00
// String hashes a string to a unique hashcode.
//
// crc32 returns a uint32, but for our use we need
// and non negative integer. Here we cast to an integer
// and invert it if the result is negative.
func getHashCode ( s string ) uint32 {
return crc32 . ChecksumIEEE ( [ ] byte ( s ) )
}