message update

This commit is contained in:
Gordon
2021-08-06 14:56:41 +08:00
parent bfc28cab8a
commit b36c041d35
6 changed files with 43 additions and 19 deletions
+22 -9
View File
@@ -41,6 +41,7 @@ type MsgCallBackResp struct {
}
func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) {
log.InfoByKv("sendMsg", pb.OperationID, "args", pb.String())
serverMsgID := GetMsgID(pb.SendID)
pbData := pbChat.WSToMsgSvrChatMsg{}
pbData.MsgFrom = pb.MsgFrom
@@ -59,7 +60,7 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*
pbData.MsgID = serverMsgID
pbData.OperationID = pb.OperationID
pbData.Token = pb.Token
pbData.SendTime = utils.GetCurrentTimestampBySecond()
pbData.SendTime = utils.GetCurrentTimestampByNano()
replay := pbChat.UserSendMsgResp{}
m := MsgCallBackResp{}
if config.Config.MessageCallBack.CallbackSwitch {
@@ -84,16 +85,22 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*
return returnMsg(&replay, pb, m.ResponseErrCode, m.ErrMsg, "", 0)
} else {
pbData.Content = m.ResponseResult.ModifiedMsg
rpc.sendMsgToKafka(&pbData, pbData.RecvID)
rpc.sendMsgToKafka(&pbData, pbData.SendID)
err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID)
err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID)
if err1 != nil || err2 != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
}
}
} else {
switch pbData.SessionType {
case constant.SingleChatType:
rpc.sendMsgToKafka(&pbData, pbData.RecvID)
rpc.sendMsgToKafka(&pbData, pbData.SendID)
err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID)
err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID)
if err1 != nil || err2 != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
case constant.GroupChatType:
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName)
@@ -138,11 +145,17 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*
groupID := pbData.RecvID
for i, v := range reply.MemberList {
pbData.RecvID = v.UserId + " " + groupID
rpc.sendMsgToKafka(&pbData, utils.IntToString(i))
err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i))
if err != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}
for i, v := range addUidList {
pbData.RecvID = v + " " + groupID
rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1))
err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1))
if err != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
default:
@@ -153,12 +166,12 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*
return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0)
}
func (rpc *rpcChat) sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string) {
func (rpc *rpcChat) sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string) error {
pid, offset, err := rpc.producer.SendMessage(m, key)
if err != nil {
log.ErrorByKv("kafka send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key)
}
return err
}
func GetMsgID(sendID string) string {
t := time.Now().Format("2006-01-02 15:04:05")