message add ex field to return

This commit is contained in:
Gordon
2023-04-04 15:55:57 +08:00
parent baece7eb3a
commit 7aefb7fa2b
8 changed files with 644 additions and 624 deletions
+2 -2
View File
@@ -115,12 +115,12 @@ func (s MsgFormats) Len() int {
return len(s)
}
//Implement the sort.Interface interface comparison element method
// Implement the sort.Interface interface comparison element method
func (s MsgFormats) Less(i, j int) bool {
return s[i].SendTime < s[j].SendTime
}
//Implement the sort.Interface interface exchange element method
// Implement the sort.Interface interface exchange element method
func (s MsgFormats) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
+22 -21
View File
@@ -30,7 +30,7 @@ import (
"github.com/golang/protobuf/proto"
)
//When the number of group members is greater than this valueOnline users will be sent firstGuaranteed service availability
// When the number of group members is greater than this valueOnline users will be sent firstGuaranteed service availability
const GroupMemberNum = 500
var (
@@ -348,7 +348,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
log.Info(pb.OperationID, "rpc sendMsg come here ", pb.String())
flag, errCode, errMsg := isMessageHasReadEnabled(pb)
if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
return returnMsg(&replay, pb, errCode, errMsg, "", 0, "")
}
t1 := time.Now()
rpc.encapsulateMsgData(pb.MsgData)
@@ -367,7 +367,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
callbackResp.ErrCode = 201
}
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackMsgModify result", "end rpc and return", pb.MsgData)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0, "")
}
switch pb.MsgData.SessionType {
case constant.SingleChatType:
@@ -385,13 +385,13 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
}
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp)
promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0, "")
}
t1 = time.Now()
flag, errCode, errMsg, _ = rpc.messageVerification(pb)
log.Debug(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1))
if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
return returnMsg(&replay, pb, errCode, errMsg, "", 0, "")
}
t1 = time.Now()
isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb)
@@ -405,7 +405,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
if err1 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error())
promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0, "")
}
}
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
@@ -415,7 +415,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
if err2 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String())
promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0, "")
}
}
// callback
@@ -426,7 +426,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg resp: ", callbackResp)
}
promePkg.PromeInc(promePkg.SingleChatMsgProcessSuccessCounter)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime, msgToMQSingle.MsgData.Ex)
case constant.GroupChatType:
// callback
promePkg.PromeInc(promePkg.GroupChatMsgRecvSuccessCounter)
@@ -440,12 +440,12 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
}
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp)
promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0, "")
}
var memberUserIDList []string
if flag, errCode, errMsg, memberUserIDList = rpc.messageVerification(pb); !flag {
promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
return returnMsg(&replay, pb, errCode, errMsg, "", 0, "")
}
log.Debug(pb.OperationID, "GetGroupAllMember userID list", memberUserIDList, "len: ", len(memberUserIDList))
var addUidList []string
@@ -509,7 +509,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
if !sendTag {
log.NewWarn(pb.OperationID, "send tag is ", sendTag)
promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0, "")
} else {
if pb.MsgData.ContentType == constant.AtText {
go func() {
@@ -574,7 +574,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
}
log.Debug(pb.OperationID, "send msg cost time3 ", time.Since(t1), pb.MsgData.ClientMsgID)
promePkg.PromeInc(promePkg.GroupChatMsgProcessSuccessCounter)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime, msgToMQSingle.MsgData.Ex)
}
case constant.NotificationChatType:
t1 = time.Now()
@@ -583,19 +583,19 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
err1 := rpc.sendMsgToWriter(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus)
if err1 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String())
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0, "")
}
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
err2 := rpc.sendMsgToWriter(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus)
if err2 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String())
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0, "")
}
}
log.Debug(pb.OperationID, "send msg cost time ", time.Since(t1), pb.MsgData.ClientMsgID)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime, msgToMQSingle.MsgData.Ex)
case constant.SuperGroupChatType:
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter)
// callback
@@ -609,11 +609,11 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
}
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg result", "end rpc and return", callbackResp)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0, "")
}
if flag, errCode, errMsg, _ = rpc.messageVerification(pb); !flag {
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
return returnMsg(&replay, pb, errCode, errMsg, "", 0, "")
}
msgToMQSingle.MsgData = pb.MsgData
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
@@ -621,7 +621,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
if err1 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String())
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0, "")
}
// callback
callbackResp = callbackAfterSendGroupMsg(pb)
@@ -629,10 +629,10 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSuperGroupMsg resp: ", callbackResp)
}
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime, msgToMQSingle.MsgData.Ex)
default:
return returnMsg(&replay, pb, 203, "unknown sessionType", "", 0)
return returnMsg(&replay, pb, 203, "unknown sessionType", "", 0, "")
}
}
@@ -676,12 +676,13 @@ func GetMsgID(sendID string) string {
return utils.Md5(t + "-" + sendID + "-" + strconv.Itoa(rand.Int()))
}
func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32, errMsg, serverMsgID string, sendTime int64) (*pbChat.SendMsgResp, error) {
func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32, errMsg, serverMsgID string, sendTime int64, ex string) (*pbChat.SendMsgResp, error) {
replay.ErrCode = errCode
replay.ErrMsg = errMsg
replay.ServerMsgID = serverMsgID
replay.ClientMsgID = pb.MsgData.ClientMsgID
replay.SendTime = sendTime
replay.Ex = ex
return replay, nil
}