push msg specify receiver

This commit is contained in:
Gordon
2022-02-08 17:12:02 +08:00
parent ba5d178ba0
commit dc1ad8fb2d
5 changed files with 115 additions and 103 deletions
@@ -45,6 +45,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
//Control whether to store history messages (mysql)
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
switch msgFromMQ.MsgData.SessionType {
case constant.SingleChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist)
@@ -55,6 +56,8 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
return
}
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time)
} else if msgKey == msgFromMQ.MsgData.SendID {
err := saveUserChat(msgFromMQ.MsgData.SendID, &msgFromMQ)
@@ -62,15 +65,12 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
return
}
if isSenderSync {
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.SendID)
}
}
log.NewDebug(operationID, "saveUserChat cost time ", utils.GetCurrentTimestampByNano()-time)
}
if msgKey == msgFromMQ.MsgData.RecvID {
go sendMessageToPush(&msgFromMQ)
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time)
}
case constant.GroupChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist)
if isHistory {
@@ -80,7 +80,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
return
}
}
go sendMessageToPush(&msgFromMQ)
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
default:
log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
return
@@ -99,10 +99,10 @@ func (mc *HistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
}
return nil
}
func sendMessageToPush(message *pbMsg.MsgDataToMQ) {
func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
log.InfoByKv("msg_transfer send message to push", message.OperationID, "message", message.String())
rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData}
mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData}
rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}
mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName)
if grpcConn == nil {
log.ErrorByKv("rpc dial failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String())