mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-19 00:09:02 +08:00
Merge branch 'tuoyun' of github.com:OpenIMSDK/Open-IM-Server into tuoyun
This commit is contained in:
@@ -147,6 +147,7 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
|
||||
sendMsgAllCount++
|
||||
sendMsgAllCountLock.Unlock()
|
||||
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data)
|
||||
|
||||
nReply := new(pbChat.SendMsgResp)
|
||||
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg)
|
||||
if isPass {
|
||||
|
||||
@@ -19,5 +19,10 @@ func saveUserChat(uid string, msg *pbMsg.MsgDataToMQ) error {
|
||||
pbSaveData.MsgData = msg.MsgData
|
||||
log.NewInfo(msg.OperationID, "IncrUserSeq cost time", utils.GetCurrentTimestampByMill()-time)
|
||||
return db.DB.SaveUserChatMongo2(uid, pbSaveData.MsgData.SendTime, &pbSaveData)
|
||||
// return db.DB.SaveUserChatMongo2(uid, pbSaveData.MsgData.SendTime, &pbSaveData)
|
||||
// return db.DB.SaveUserChatMongo2(uid, pbSaveData.MsgData.SendTime, &pbSaveData)
|
||||
}
|
||||
|
||||
func saveUserChatList(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) error {
|
||||
log.Info(operationID, utils.GetSelfFuncName(), "args ", userID, len(msgList))
|
||||
return db.DB.BatchInsertChat(userID, msgList, operationID)
|
||||
}
|
||||
|
||||
@@ -12,6 +12,10 @@ import (
|
||||
|
||||
const OnlineTopicBusy = 1
|
||||
const OnlineTopicVacancy = 0
|
||||
const Msg = 2
|
||||
const ConsumerMsgs = 3
|
||||
const UserMessages = 4
|
||||
const ChannelNum = 100
|
||||
|
||||
var (
|
||||
persistentCH PersistentConsumerHandler
|
||||
@@ -35,7 +39,7 @@ func Init() {
|
||||
historyCH.Init(cmdCh)
|
||||
onlineTopicStatus = OnlineTopicVacancy
|
||||
log.Debug("come msg transfer ts", config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline, config.Config.Kafka.Ws2mschatOffline.Topic)
|
||||
offlineHistoryCH.Init(cmdCh)
|
||||
//offlineHistoryCH.Init(cmdCh)
|
||||
statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
|
||||
statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
|
||||
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
|
||||
@@ -48,7 +52,7 @@ func Run() {
|
||||
fmt.Println("not start mysql consumer")
|
||||
}
|
||||
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
|
||||
go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
|
||||
//go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
|
||||
}
|
||||
func SetOnlineTopicStatus(status int) {
|
||||
w.Lock()
|
||||
|
||||
@@ -3,6 +3,7 @@ package logic
|
||||
import (
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/db"
|
||||
kfk "Open_IM/pkg/common/kafka"
|
||||
"Open_IM/pkg/common/log"
|
||||
pbMsg "Open_IM/pkg/proto/chat"
|
||||
@@ -14,21 +15,129 @@ import (
|
||||
|
||||
type OfflineHistoryConsumerHandler struct {
|
||||
msgHandle map[string]fcb
|
||||
cmdCh chan Cmd2Value
|
||||
historyConsumerGroup *kfk.MConsumerGroup
|
||||
cmdCh chan Cmd2Value
|
||||
msgCh chan Cmd2Value
|
||||
chArrays [ChannelNum]chan Cmd2Value
|
||||
msgDistributionCh chan Cmd2Value
|
||||
}
|
||||
|
||||
func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
|
||||
mc.msgHandle = make(map[string]fcb)
|
||||
mc.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
|
||||
go mc.MessagesDistributionHandle()
|
||||
mc.cmdCh = cmdCh
|
||||
mc.msgHandle[config.Config.Kafka.Ws2mschatOffline.Topic] = mc.handleChatWs2Mongo
|
||||
mc.msgCh = make(chan Cmd2Value, 1000)
|
||||
for i := 0; i < ChannelNum; i++ {
|
||||
mc.chArrays[i] = make(chan Cmd2Value, 1000)
|
||||
go mc.Run(i)
|
||||
}
|
||||
if config.Config.ReliableStorage {
|
||||
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo
|
||||
} else {
|
||||
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2MongoLowReliability
|
||||
|
||||
}
|
||||
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschatOffline.Topic},
|
||||
config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline)
|
||||
|
||||
}
|
||||
func (och *OfflineHistoryConsumerHandler) Run(channelID int) {
|
||||
for {
|
||||
select {
|
||||
case cmd := <-och.chArrays[channelID]:
|
||||
switch cmd.Cmd {
|
||||
case UserMessages:
|
||||
msgChannelValue := cmd.Value.(MsgChannelValue)
|
||||
msgList := msgChannelValue.msgList
|
||||
triggerID := msgChannelValue.triggerID
|
||||
storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
|
||||
pushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
|
||||
log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList))
|
||||
for _, v := range msgList {
|
||||
log.Debug(triggerID, "msg come to storage center", v.String())
|
||||
isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory)
|
||||
isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync)
|
||||
if isHistory {
|
||||
storageMsgList = append(storageMsgList, v)
|
||||
}
|
||||
if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) {
|
||||
pushMsgList = append(pushMsgList, v)
|
||||
}
|
||||
}
|
||||
|
||||
func (mc *OfflineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) {
|
||||
//switch msgChannelValue.msg.MsgData.SessionType {
|
||||
//case constant.SingleChatType:
|
||||
//case constant.GroupChatType:
|
||||
//case constant.NotificationChatType:
|
||||
//default:
|
||||
// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
|
||||
// return
|
||||
//}
|
||||
|
||||
err := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID)
|
||||
if err != nil {
|
||||
singleMsgFailedCount += uint64(len(storageMsgList))
|
||||
log.NewError(triggerID, "single data insert to mongo err", err.Error(), storageMsgList)
|
||||
} else {
|
||||
singleMsgSuccessCountMutex.Lock()
|
||||
singleMsgSuccessCount += uint64(len(storageMsgList))
|
||||
singleMsgSuccessCountMutex.Unlock()
|
||||
for _, v := range pushMsgList {
|
||||
sendMessageToPush(v, msgChannelValue.userID)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
func (och *OfflineHistoryConsumerHandler) MessagesDistributionHandle() {
|
||||
UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)
|
||||
for {
|
||||
select {
|
||||
case cmd := <-och.msgDistributionCh:
|
||||
switch cmd.Cmd {
|
||||
case ConsumerMsgs:
|
||||
triggerChannelValue := cmd.Value.(TriggerChannelValue)
|
||||
triggerID := triggerChannelValue.triggerID
|
||||
consumerMessages := triggerChannelValue.cmsgList
|
||||
//Aggregation map[userid]message list
|
||||
log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages))
|
||||
for i := 0; i < len(consumerMessages); i++ {
|
||||
msgFromMQ := pbMsg.MsgDataToMQ{}
|
||||
err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ)
|
||||
if err != nil {
|
||||
log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error())
|
||||
return
|
||||
}
|
||||
log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String())
|
||||
if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok {
|
||||
oldM = append(oldM, &msgFromMQ)
|
||||
UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM
|
||||
} else {
|
||||
m := make([]*pbMsg.MsgDataToMQ, 0, 100)
|
||||
m = append(m, &msgFromMQ)
|
||||
UserAggregationMsgs[string(consumerMessages[i].Key)] = m
|
||||
}
|
||||
}
|
||||
log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs))
|
||||
for userID, v := range UserAggregationMsgs {
|
||||
if len(v) >= 0 {
|
||||
channelID := getHashCode(userID) % ChannelNum
|
||||
go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
|
||||
och.chArrays[cID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID}}
|
||||
}(channelID, userID, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
func (mc *OfflineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
|
||||
msg := cMsg.Value
|
||||
now := time.Now()
|
||||
msgFromMQ := pbMsg.MsgDataToMQ{}
|
||||
err := proto.Unmarshal(msg, &msgFromMQ)
|
||||
@@ -95,34 +204,110 @@ func (mc *OfflineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey s
|
||||
log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
|
||||
return
|
||||
}
|
||||
sess.MarkMessage(cMsg, "")
|
||||
log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
|
||||
}
|
||||
func (mc *OfflineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
|
||||
msg := cMsg.Value
|
||||
msgFromMQ := pbMsg.MsgDataToMQ{}
|
||||
err := proto.Unmarshal(msg, &msgFromMQ)
|
||||
if err != nil {
|
||||
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
|
||||
return
|
||||
}
|
||||
operationID := msgFromMQ.OperationID
|
||||
log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
|
||||
//Control whether to store offline messages (mongo)
|
||||
isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
|
||||
isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
|
||||
if isHistory {
|
||||
seq, err := db.DB.IncrUserSeq(msgKey)
|
||||
if err != nil {
|
||||
log.NewError(operationID, "data insert to redis err", err.Error(), string(msg))
|
||||
return
|
||||
}
|
||||
sess.MarkMessage(cMsg, "")
|
||||
msgFromMQ.MsgData.Seq = uint32(seq)
|
||||
log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
|
||||
//mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}}
|
||||
//err := saveUserChat(msgKey, &msgFromMQ)
|
||||
//if err != nil {
|
||||
// singleMsgFailedCount++
|
||||
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
|
||||
// return
|
||||
//}
|
||||
//singleMsgSuccessCountMutex.Lock()
|
||||
//singleMsgSuccessCount++
|
||||
//singleMsgSuccessCountMutex.Unlock()
|
||||
//log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
|
||||
} else {
|
||||
if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) {
|
||||
go sendMessageToPush(&msgFromMQ, msgKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (OfflineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
|
||||
//log.NewDebug("", "offline new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
|
||||
//for msg := range claim.Messages() {
|
||||
// log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline")
|
||||
// //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
|
||||
//}
|
||||
for msg := range claim.Messages() {
|
||||
if GetOnlineTopicStatus() == OnlineTopicVacancy {
|
||||
log.NewDebug("", "vacancy offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
||||
mc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
|
||||
sess.MarkMessage(msg, "")
|
||||
} else {
|
||||
select {
|
||||
case <-mc.cmdCh:
|
||||
log.NewDebug("", "cmd offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
||||
case <-time.After(time.Millisecond * time.Duration(100)):
|
||||
log.NewDebug("", "timeout offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
||||
}
|
||||
mc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
|
||||
sess.MarkMessage(msg, "")
|
||||
}
|
||||
}
|
||||
|
||||
//func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
// claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
|
||||
// //log.NewDebug("", "offline new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
|
||||
// //for msg := range claim.Messages() {
|
||||
// // log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline")
|
||||
// // //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
|
||||
// //}
|
||||
// for msg := range claim.Messages() {
|
||||
// if GetOnlineTopicStatus() == OnlineTopicVacancy {
|
||||
// log.NewDebug("", "vacancy offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
||||
// mc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
|
||||
// } else {
|
||||
// select {
|
||||
// case <-mc.cmdCh:
|
||||
// log.NewDebug("", "cmd offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
||||
// case <-time.After(time.Millisecond * time.Duration(100)):
|
||||
// log.NewDebug("", "timeout offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
||||
// }
|
||||
// mc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return nil
|
||||
//}
|
||||
func (och *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
|
||||
log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
|
||||
cMsg := make([]*sarama.ConsumerMessage, 0, 500)
|
||||
t := time.NewTicker(time.Duration(500) * time.Millisecond)
|
||||
var triggerID string
|
||||
for msg := range claim.Messages() {
|
||||
//och.TriggerCmd(OnlineTopicBusy)
|
||||
cMsg = append(cMsg, msg)
|
||||
select {
|
||||
case <-t.C:
|
||||
if len(cMsg) >= 0 {
|
||||
triggerID = utils.OperationIDGenerator()
|
||||
log.Debug(triggerID, "timer trigger msg consumer start", len(cMsg))
|
||||
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
|
||||
triggerID: triggerID, cmsgList: cMsg}}
|
||||
sess.MarkMessage(msg, "")
|
||||
cMsg = cMsg[0:0]
|
||||
log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg))
|
||||
}
|
||||
default:
|
||||
if len(cMsg) >= 500 {
|
||||
triggerID = utils.OperationIDGenerator()
|
||||
log.Debug(triggerID, "length trigger msg consumer start", len(cMsg))
|
||||
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
|
||||
triggerID: triggerID, cmsgList: cMsg}}
|
||||
sess.MarkMessage(msg, "")
|
||||
cMsg = cMsg[0:0]
|
||||
log.Debug(triggerID, "length trigger msg consumer end", len(cMsg))
|
||||
}
|
||||
|
||||
}
|
||||
log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package logic
|
||||
import (
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/db"
|
||||
kfk "Open_IM/pkg/common/kafka"
|
||||
"Open_IM/pkg/common/log"
|
||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||
@@ -13,11 +14,21 @@ import (
|
||||
"errors"
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"hash/crc32"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type fcb func(msg []byte, msgKey string)
|
||||
type MsgChannelValue struct {
|
||||
userID string
|
||||
triggerID string
|
||||
msgList []*pbMsg.MsgDataToMQ
|
||||
}
|
||||
type TriggerChannelValue struct {
|
||||
triggerID string
|
||||
cmsgList []*sarama.ConsumerMessage
|
||||
}
|
||||
type fcb func(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession)
|
||||
type Cmd2Value struct {
|
||||
Cmd int
|
||||
Value interface{}
|
||||
@@ -26,12 +37,27 @@ type OnlineHistoryConsumerHandler struct {
|
||||
msgHandle map[string]fcb
|
||||
historyConsumerGroup *kfk.MConsumerGroup
|
||||
cmdCh chan Cmd2Value
|
||||
msgCh chan Cmd2Value
|
||||
chArrays [ChannelNum]chan Cmd2Value
|
||||
msgDistributionCh chan Cmd2Value
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
|
||||
och.msgHandle = make(map[string]fcb)
|
||||
och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
|
||||
go och.MessagesDistributionHandle()
|
||||
och.cmdCh = cmdCh
|
||||
och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo
|
||||
och.msgCh = make(chan Cmd2Value, 1000)
|
||||
for i := 0; i < ChannelNum; i++ {
|
||||
och.chArrays[i] = make(chan Cmd2Value, 1000)
|
||||
go och.Run(i)
|
||||
}
|
||||
if config.Config.ReliableStorage {
|
||||
och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo
|
||||
} else {
|
||||
och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2MongoLowReliability
|
||||
|
||||
}
|
||||
och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
|
||||
@@ -61,7 +87,184 @@ func sendCmd(ch chan Cmd2Value, value Cmd2Value, timeout int64) error {
|
||||
return errors.New("send cmd timeout")
|
||||
}
|
||||
}
|
||||
func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) {
|
||||
func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
|
||||
for {
|
||||
select {
|
||||
case cmd := <-och.chArrays[channelID]:
|
||||
switch cmd.Cmd {
|
||||
case UserMessages:
|
||||
msgChannelValue := cmd.Value.(MsgChannelValue)
|
||||
msgList := msgChannelValue.msgList
|
||||
triggerID := msgChannelValue.triggerID
|
||||
storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
|
||||
noStoragepushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
|
||||
log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList))
|
||||
for _, v := range msgList {
|
||||
log.Debug(triggerID, "msg come to storage center", v.String())
|
||||
isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory)
|
||||
isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync)
|
||||
if isHistory {
|
||||
storageMsgList = append(storageMsgList, v)
|
||||
//log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID)
|
||||
} else {
|
||||
if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) {
|
||||
noStoragepushMsgList = append(noStoragepushMsgList, v)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//switch msgChannelValue.msg.MsgData.SessionType {
|
||||
//case constant.SingleChatType:
|
||||
//case constant.GroupChatType:
|
||||
//case constant.NotificationChatType:
|
||||
//default:
|
||||
// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
|
||||
// return
|
||||
//}
|
||||
log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(noStoragepushMsgList))
|
||||
err := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID)
|
||||
if err != nil {
|
||||
singleMsgFailedCount += uint64(len(storageMsgList))
|
||||
log.NewError(triggerID, "single data insert to mongo err", err.Error(), storageMsgList)
|
||||
} else {
|
||||
singleMsgSuccessCountMutex.Lock()
|
||||
singleMsgSuccessCount += uint64(len(storageMsgList))
|
||||
singleMsgSuccessCountMutex.Unlock()
|
||||
go func(push, storage []*pbMsg.MsgDataToMQ) {
|
||||
for _, v := range storage {
|
||||
sendMessageToPush(v, msgChannelValue.userID)
|
||||
}
|
||||
for _, x := range push {
|
||||
sendMessageToPush(x, msgChannelValue.userID)
|
||||
}
|
||||
|
||||
}(noStoragepushMsgList, storageMsgList)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
|
||||
// msg := cMsg.Value
|
||||
// now := time.Now()
|
||||
// msgFromMQ := pbMsg.MsgDataToMQ{}
|
||||
// err := proto.Unmarshal(msg, &msgFromMQ)
|
||||
// if err != nil {
|
||||
// log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
|
||||
// return
|
||||
// }
|
||||
// operationID := msgFromMQ.OperationID
|
||||
// log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
|
||||
// //Control whether to store offline messages (mongo)
|
||||
// isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
|
||||
// //Control whether to store history messages (mysql)
|
||||
// isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
|
||||
// isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
|
||||
// switch msgFromMQ.MsgData.SessionType {
|
||||
// case constant.SingleChatType:
|
||||
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist)
|
||||
// if isHistory {
|
||||
// err := saveUserChat(msgKey, &msgFromMQ)
|
||||
// if err != nil {
|
||||
// singleMsgFailedCount++
|
||||
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
|
||||
// return
|
||||
// }
|
||||
// singleMsgSuccessCountMutex.Lock()
|
||||
// singleMsgSuccessCount++
|
||||
// singleMsgSuccessCountMutex.Unlock()
|
||||
// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
|
||||
// }
|
||||
// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
|
||||
// } else {
|
||||
// go sendMessageToPush(&msgFromMQ, msgKey)
|
||||
// }
|
||||
// log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
|
||||
// case constant.GroupChatType:
|
||||
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist)
|
||||
// if isHistory {
|
||||
// err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ)
|
||||
// if err != nil {
|
||||
// log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
|
||||
// return
|
||||
// }
|
||||
// groupMsgCount++
|
||||
// }
|
||||
// go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
|
||||
// case constant.NotificationChatType:
|
||||
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist)
|
||||
// if isHistory {
|
||||
// err := saveUserChat(msgKey, &msgFromMQ)
|
||||
// if err != nil {
|
||||
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
|
||||
// return
|
||||
// }
|
||||
// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
|
||||
// }
|
||||
// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
|
||||
// } else {
|
||||
// go sendMessageToPush(&msgFromMQ, msgKey)
|
||||
// }
|
||||
// log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
|
||||
// default:
|
||||
// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
|
||||
// return
|
||||
// }
|
||||
// sess.MarkMessage(cMsg, "")
|
||||
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
|
||||
//}
|
||||
|
||||
func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
|
||||
for {
|
||||
UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)
|
||||
select {
|
||||
case cmd := <-och.msgDistributionCh:
|
||||
switch cmd.Cmd {
|
||||
case ConsumerMsgs:
|
||||
triggerChannelValue := cmd.Value.(TriggerChannelValue)
|
||||
triggerID := triggerChannelValue.triggerID
|
||||
consumerMessages := triggerChannelValue.cmsgList
|
||||
//Aggregation map[userid]message list
|
||||
log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages))
|
||||
for i := 0; i < len(consumerMessages); i++ {
|
||||
msgFromMQ := pbMsg.MsgDataToMQ{}
|
||||
err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ)
|
||||
if err != nil {
|
||||
log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error())
|
||||
return
|
||||
}
|
||||
log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key))
|
||||
if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok {
|
||||
oldM = append(oldM, &msgFromMQ)
|
||||
UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM
|
||||
} else {
|
||||
m := make([]*pbMsg.MsgDataToMQ, 0, 100)
|
||||
m = append(m, &msgFromMQ)
|
||||
UserAggregationMsgs[string(consumerMessages[i].Key)] = m
|
||||
}
|
||||
}
|
||||
log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs))
|
||||
for userID, v := range UserAggregationMsgs {
|
||||
if len(v) >= 0 {
|
||||
hashCode := getHashCode(userID)
|
||||
channelID := hashCode % ChannelNum
|
||||
log.Debug(triggerID, "generate channelID", hashCode, channelID, userID)
|
||||
//go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
|
||||
och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: v, triggerID: triggerID}}
|
||||
//}(channelID, userID, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
|
||||
msg := cMsg.Value
|
||||
now := time.Now()
|
||||
msgFromMQ := pbMsg.MsgDataToMQ{}
|
||||
err := proto.Unmarshal(msg, &msgFromMQ)
|
||||
@@ -95,7 +298,7 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey s
|
||||
} else {
|
||||
go sendMessageToPush(&msgFromMQ, msgKey)
|
||||
}
|
||||
log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
|
||||
log.NewDebug(operationID, "saveSingleMsg cost time ", time.Since(now))
|
||||
case constant.GroupChatType:
|
||||
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist)
|
||||
if isHistory {
|
||||
@@ -107,6 +310,8 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey s
|
||||
groupMsgCount++
|
||||
}
|
||||
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
|
||||
log.NewDebug(operationID, "saveGroupMsg cost time ", time.Since(now))
|
||||
|
||||
case constant.NotificationChatType:
|
||||
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist)
|
||||
if isHistory {
|
||||
@@ -126,26 +331,125 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey s
|
||||
log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
|
||||
return
|
||||
}
|
||||
sess.MarkMessage(cMsg, "")
|
||||
log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
|
||||
msg := cMsg.Value
|
||||
msgFromMQ := pbMsg.MsgDataToMQ{}
|
||||
err := proto.Unmarshal(msg, &msgFromMQ)
|
||||
if err != nil {
|
||||
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
|
||||
return
|
||||
}
|
||||
operationID := msgFromMQ.OperationID
|
||||
log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
|
||||
//Control whether to store offline messages (mongo)
|
||||
isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
|
||||
isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
|
||||
if isHistory {
|
||||
seq, err := db.DB.IncrUserSeq(msgKey)
|
||||
if err != nil {
|
||||
log.NewError(operationID, "data insert to redis err", err.Error(), string(msg))
|
||||
return
|
||||
}
|
||||
sess.MarkMessage(cMsg, "")
|
||||
msgFromMQ.MsgData.Seq = uint32(seq)
|
||||
log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
|
||||
//och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}}
|
||||
//err := saveUserChat(msgKey, &msgFromMQ)
|
||||
//if err != nil {
|
||||
// singleMsgFailedCount++
|
||||
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
|
||||
// return
|
||||
//}
|
||||
//singleMsgSuccessCountMutex.Lock()
|
||||
//singleMsgSuccessCount++
|
||||
//singleMsgSuccessCountMutex.Unlock()
|
||||
//log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
|
||||
} else {
|
||||
if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) {
|
||||
go sendMessageToPush(&msgFromMQ, msgKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (OnlineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
|
||||
//func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
// claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
|
||||
// log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
|
||||
// for msg := range claim.Messages() {
|
||||
// SetOnlineTopicStatus(OnlineTopicBusy)
|
||||
// //och.TriggerCmd(OnlineTopicBusy)
|
||||
// log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
|
||||
// och.msgHandle[msg.Topic](msg, string(msg.Key), sess)
|
||||
// if claim.HighWaterMarkOffset()-msg.Offset <= 1 {
|
||||
// log.Debug("", "online msg consume end", claim.HighWaterMarkOffset(), msg.Offset)
|
||||
// SetOnlineTopicStatus(OnlineTopicVacancy)
|
||||
// och.TriggerCmd(OnlineTopicVacancy)
|
||||
// }
|
||||
// }
|
||||
// return nil
|
||||
//}
|
||||
|
||||
func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
|
||||
log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
|
||||
for msg := range claim.Messages() {
|
||||
SetOnlineTopicStatus(OnlineTopicBusy)
|
||||
//och.TriggerCmd(OnlineTopicBusy)
|
||||
log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
|
||||
och.msgHandle[msg.Topic](msg.Value, string(msg.Key))
|
||||
sess.MarkMessage(msg, "")
|
||||
if claim.HighWaterMarkOffset()-msg.Offset <= 1 {
|
||||
log.Debug("", "online msg consume end", claim.HighWaterMarkOffset(), msg.Offset)
|
||||
SetOnlineTopicStatus(OnlineTopicVacancy)
|
||||
och.TriggerCmd(OnlineTopicVacancy)
|
||||
|
||||
for {
|
||||
if sess == nil {
|
||||
log.NewWarn("", " sess == nil, waiting ")
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
|
||||
cMsg := make([]*sarama.ConsumerMessage, 0, 1000)
|
||||
t := time.NewTicker(time.Duration(100) * time.Millisecond)
|
||||
var triggerID string
|
||||
for {
|
||||
//och.TriggerCmd(OnlineTopicBusy)
|
||||
select {
|
||||
case msg := <-claim.Messages():
|
||||
log.NewDebug("", "claim.Messages ", msg)
|
||||
cMsg = append(cMsg, msg)
|
||||
if len(cMsg) >= 1000 {
|
||||
ccMsg := make([]*sarama.ConsumerMessage, 0, 1000)
|
||||
for _, v := range cMsg {
|
||||
ccMsg = append(ccMsg, v)
|
||||
}
|
||||
triggerID = utils.OperationIDGenerator()
|
||||
log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg))
|
||||
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
|
||||
triggerID: triggerID, cmsgList: ccMsg}}
|
||||
sess.MarkMessage(msg, "")
|
||||
cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
|
||||
log.Debug(triggerID, "length trigger msg consumer end", len(cMsg))
|
||||
}
|
||||
|
||||
case <-t.C:
|
||||
if len(cMsg) > 0 {
|
||||
ccMsg := make([]*sarama.ConsumerMessage, 0, 1000)
|
||||
for _, v := range cMsg {
|
||||
ccMsg = append(ccMsg, v)
|
||||
}
|
||||
triggerID = utils.OperationIDGenerator()
|
||||
log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg))
|
||||
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
|
||||
triggerID: triggerID, cmsgList: ccMsg}}
|
||||
sess.MarkMessage(cMsg[len(cMsg)-1], "")
|
||||
cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
|
||||
log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg))
|
||||
}
|
||||
|
||||
}
|
||||
//log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
|
||||
@@ -174,3 +478,12 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// String hashes a string to a unique hashcode.
|
||||
//
|
||||
// crc32 returns a uint32, but for our use we need
|
||||
// and non negative integer. Here we cast to an integer
|
||||
// and invert it if the result is negative.
|
||||
func getHashCode(s string) uint32 {
|
||||
return crc32.ChecksumIEEE([]byte(s))
|
||||
}
|
||||
|
||||
@@ -32,7 +32,8 @@ func (pc *PersistentConsumerHandler) Init() {
|
||||
|
||||
}
|
||||
|
||||
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey string) {
|
||||
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
msg := cMsg.Value
|
||||
log.NewInfo("msg come here mysql!!!", "", "msg", string(msg))
|
||||
var tag bool
|
||||
msgFromMQ := pbMsg.MsgDataToMQ{}
|
||||
@@ -71,7 +72,7 @@ func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi
|
||||
claim sarama.ConsumerGroupClaim) error {
|
||||
for msg := range claim.Messages() {
|
||||
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
||||
pc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
|
||||
pc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
|
||||
sess.MarkMessage(msg, "")
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -57,6 +57,9 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
|
||||
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
|
||||
for _, v := range wsResult {
|
||||
if v.ResultCode == 0 {
|
||||
if utils.IsContainInt32(v.RecvPlatFormID, pushTerminal) {
|
||||
break
|
||||
}
|
||||
continue
|
||||
}
|
||||
if utils.IsContainInt32(v.RecvPlatFormID, pushTerminal) {
|
||||
|
||||
@@ -142,7 +142,8 @@ func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) {
|
||||
}
|
||||
func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
|
||||
replay := pbChat.SendMsgResp{}
|
||||
log.NewDebug(pb.OperationID, "rpc sendMsg come here", pb.String())
|
||||
newTime := db.GetCurrentTimestampByMill()
|
||||
log.NewWarn(pb.OperationID, "rpc sendMsg come here", pb.String(), pb.MsgData.ClientMsgID)
|
||||
flag, errCode, errMsg := userRelationshipVerification(pb)
|
||||
if !flag {
|
||||
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
|
||||
@@ -265,24 +266,34 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
|
||||
m[constant.OnlineStatus] = memberUserIDList
|
||||
}
|
||||
|
||||
log.Debug(pb.OperationID, "send msg cost time1 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID)
|
||||
newTime = db.GetCurrentTimestampByMill()
|
||||
|
||||
//split parallel send
|
||||
var wg sync.WaitGroup
|
||||
var sendTag bool
|
||||
var split = 50
|
||||
var split = 20
|
||||
for k, v := range m {
|
||||
remain := len(v) % split
|
||||
for i := 0; i < len(v)/split; i++ {
|
||||
wg.Add(1)
|
||||
go rpc.sendMsgToGroup(v[i*split:(i+1)*split], *pb, k, &sendTag, &wg)
|
||||
tmp := valueCopy(pb)
|
||||
// go rpc.sendMsgToGroup(v[i*split:(i+1)*split], *pb, k, &sendTag, &wg)
|
||||
go rpc.sendMsgToGroupOptimization(v[i*split:(i+1)*split], tmp, k, &sendTag, &wg)
|
||||
}
|
||||
if remain > 0 {
|
||||
wg.Add(1)
|
||||
go rpc.sendMsgToGroup(v[split*(len(v)/split):], *pb, k, &sendTag, &wg)
|
||||
tmp := valueCopy(pb)
|
||||
// go rpc.sendMsgToGroup(v[split*(len(v)/split):], *pb, k, &sendTag, &wg)
|
||||
go rpc.sendMsgToGroupOptimization(v[split*(len(v)/split):], tmp, k, &sendTag, &wg)
|
||||
}
|
||||
}
|
||||
log.Debug(pb.OperationID, "send msg cost time22 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID, "uidList : ", len(addUidList))
|
||||
wg.Add(1)
|
||||
go rpc.sendMsgToGroup(addUidList, *pb, constant.OnlineStatus, &sendTag, &wg)
|
||||
wg.Wait()
|
||||
log.Debug(pb.OperationID, "send msg cost time2 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID)
|
||||
newTime = db.GetCurrentTimestampByMill()
|
||||
// callback
|
||||
if err := callbackAfterSendGroupMsg(pb); err != nil {
|
||||
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg failed", err.Error())
|
||||
@@ -341,6 +352,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
|
||||
}
|
||||
}()
|
||||
}
|
||||
log.Debug(pb.OperationID, "send msg cost time3 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID)
|
||||
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
|
||||
|
||||
}
|
||||
@@ -360,9 +372,11 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
|
||||
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug(pb.OperationID, "send msg cost time ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID)
|
||||
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
|
||||
default:
|
||||
return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0)
|
||||
return returnMsg(&replay, pb, 203, "unknown sessionType", "", 0)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -372,10 +386,12 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status str
|
||||
pid, offset, err := rpc.onlineProducer.SendMessage(m, key)
|
||||
if err != nil {
|
||||
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)
|
||||
} else {
|
||||
// log.NewWarn(m.OperationID, "sendMsgToKafka client msgID ", m.MsgData.ClientMsgID)
|
||||
}
|
||||
return err
|
||||
case constant.OfflineStatus:
|
||||
pid, offset, err := rpc.offlineProducer.SendMessage(m, key)
|
||||
pid, offset, err := rpc.onlineProducer.SendMessage(m, key)
|
||||
if err != nil {
|
||||
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)
|
||||
}
|
||||
@@ -420,6 +436,29 @@ func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType i
|
||||
return true
|
||||
}
|
||||
|
||||
func modifyMessageByUserMessageReceiveOptoptimization(userID, sourceID string, sessionType int, operationID string, options *map[string]bool) bool {
|
||||
conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType)
|
||||
opt, err := db.DB.GetSingleConversationRecvMsgOpt(userID, conversationID)
|
||||
if err != nil && err != redis.ErrNil {
|
||||
log.NewError(operationID, "GetSingleConversationMsgOpt from redis err", userID, conversationID, err.Error())
|
||||
return true
|
||||
}
|
||||
|
||||
switch opt {
|
||||
case constant.ReceiveMessage:
|
||||
return true
|
||||
case constant.NotReceiveMessage:
|
||||
return false
|
||||
case constant.ReceiveNotNotifyMessage:
|
||||
if *options == nil {
|
||||
*options = make(map[string]bool, 10)
|
||||
}
|
||||
utils.SetSwitchFromOptions(*options, constant.IsOfflinePush, false)
|
||||
return true
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
type NotificationMsg struct {
|
||||
SendID string
|
||||
RecvID string
|
||||
@@ -737,6 +776,23 @@ func getOnlineAndOfflineUserIDList(memberList []string, m map[string][]string, o
|
||||
m[constant.OfflineStatus] = offlUserIDList
|
||||
}
|
||||
|
||||
func valueCopy(pb *pbChat.SendMsgReq) *pbChat.SendMsgReq {
|
||||
offlinePushInfo := sdk_ws.OfflinePushInfo{}
|
||||
if pb.MsgData.OfflinePushInfo != nil {
|
||||
offlinePushInfo = *pb.MsgData.OfflinePushInfo
|
||||
}
|
||||
msgData := sdk_ws.MsgData{}
|
||||
msgData = *pb.MsgData
|
||||
msgData.OfflinePushInfo = &offlinePushInfo
|
||||
|
||||
options := make(map[string]bool, 10)
|
||||
for key, value := range pb.MsgData.Options {
|
||||
options[key] = value
|
||||
}
|
||||
msgData.Options = options
|
||||
return &pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &msgData}
|
||||
}
|
||||
|
||||
func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) {
|
||||
// log.Debug(pb.OperationID, "split userID ", list)
|
||||
offlinePushInfo := sdk_ws.OfflinePushInfo{}
|
||||
@@ -772,3 +828,22 @@ func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status s
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
func (rpc *rpcChat) sendMsgToGroupOptimization(list []string, groupPB *pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) {
|
||||
msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData}
|
||||
for _, v := range list {
|
||||
groupPB.MsgData.RecvID = v
|
||||
isSend := modifyMessageByUserMessageReceiveOpt(v, groupPB.MsgData.GroupID, constant.GroupChatType, groupPB)
|
||||
if isSend {
|
||||
err := rpc.sendMsgToKafka(&msgToMQGroup, v, status)
|
||||
if err != nil {
|
||||
log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String())
|
||||
} else {
|
||||
*sendTag = true
|
||||
}
|
||||
} else {
|
||||
log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v)
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ func TagSendMessage(operationID string, user *db.User, recvID, content string, s
|
||||
msgData.SenderNickname = user.Nickname
|
||||
msgData.Options = map[string]bool{}
|
||||
msgData.Options[constant.IsSenderConversationUpdate] = false
|
||||
msgData.Options[constant.IsSenderNotificationPush] = false
|
||||
msgData.CreateTime = utils.GetCurrentTimestampByMill()
|
||||
msgData.ClientMsgID = utils.GetMsgID(user.UserID)
|
||||
msgData.SenderPlatformID = senderPlatformID
|
||||
|
||||
Reference in New Issue
Block a user