revoke message

This commit is contained in:
Gordon
2022-09-29 17:57:26 +08:00
parent cf57f999c5
commit 4d9ed1a6cb
6 changed files with 144 additions and 52 deletions
+65 -43
View File
@@ -36,6 +36,26 @@ var (
ExcludeContentType = []int{constant.HasReadReceipt, constant.GroupHasReadReceipt}
)
type Validator interface {
validate(pb *pbChat.SendMsgReq) (bool, int32, string)
}
//type MessageValidator struct {
//
//}
type MessageRevoked struct {
RevokerID string `json:"revokerID"`
RevokerRole int32 `json:"revokerRole"`
ClientMsgID string `json:"clientMsgID"`
RevokerNickname string `json:"revokerNickname"`
RevokeTime int64 `json:"revokeTime"`
SourceMessageSendTime int64 `json:"sourceMessageSendTime"`
SourceMessageSendID string `json:"sourceMessageSendID"`
SourceMessageSenderNickname string `json:"sourceMessageSenderNickname"`
SessionType int32 `json:"sessionType"`
Seq uint32 `json:"seq"`
}
type MsgCallBackReq struct {
SendID string `json:"sendID"`
RecvID string `json:"recvID"`
@@ -76,7 +96,7 @@ func isMessageHasReadEnabled(pb *pbChat.SendMsgReq) (bool, int32, string) {
return true, 0, ""
}
func messageVerification(data *pbChat.SendMsgReq) (bool, int32, string, []string) {
func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, string, []string) {
switch data.MsgData.SessionType {
case constant.SingleChatType:
if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) {
@@ -140,6 +160,31 @@ func messageVerification(data *pbChat.SendMsgReq) (bool, int32, string, []string
if err != nil {
return false, 201, err.Error(), nil
}
if data.MsgData.ContentType == constant.AdvancedRevoke {
revokeMesage := new(MessageRevoked)
err := utils.JsonStringToStruct(string(data.MsgData.Content), revokeMesage)
if err != nil {
log.Error(data.OperationID, "json unmarshal err:", err.Error())
return false, 201, err.Error(), nil
}
req := pbChat.GetSuperGroupMsgReq{OperationID: data.OperationID, Seq: revokeMesage.Seq, GroupID: data.MsgData.GroupID}
resp, err := rpc.GetSuperGroupMsg(context.Background(), &req)
if err != nil {
log.Error(data.OperationID, "GetSuperGroupMsgReq err:", err.Error())
} else if resp.ErrCode != 0 {
log.Error(data.OperationID, "GetSuperGroupMsgReq err:", err.Error())
} else {
if resp.MsgData != nil && resp.MsgData.ClientMsgID == revokeMesage.ClientMsgID && resp.MsgData.Seq == revokeMesage.Seq {
revokeMesage.SourceMessageSendTime = resp.MsgData.SendTime
revokeMesage.SourceMessageSenderNickname = resp.MsgData.SenderNickname
revokeMesage.SourceMessageSendID = resp.MsgData.SendID
data.MsgData.Content = []byte(utils.StructToJsonString(revokeMesage))
} else {
return false, 201, errors.New("msg err").Error(), nil
}
}
}
if groupInfo.GroupType == constant.SuperGroup {
return true, 0, "", nil
} else {
@@ -149,29 +194,6 @@ func messageVerification(data *pbChat.SendMsgReq) (bool, int32, string, []string
log.NewError(data.OperationID, errMsg)
return false, 201, errMsg, nil
}
//
//getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: data.OperationID, GroupID: data.MsgData.GroupID}
//etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, data.OperationID)
//if etcdConn == nil {
// errMsg := data.OperationID + "getcdv3.GetDefaultConn == nil"
// log.NewError(data.OperationID, errMsg)
// return false, 201, errMsg, nil
//}
//client := pbCache.NewCacheClient(etcdConn)
// cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq)
//
//
//if err != nil {
// log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error())
// //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache failed", "", 0)
// return false, 201, err.Error(), nil
//}
//if cacheResp.CommonResp.ErrCode != 0 {
// log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
// //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache logic failed", "", 0)
// return false, cacheResp.CommonResp.ErrCode, cacheResp.CommonResp.ErrMsg, nil
//}
if !token_verify.IsManagerUserID(data.MsgData.SendID) {
if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin {
return true, 0, "", userIDList
@@ -281,7 +303,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
}
t1 = time.Now()
flag, errCode, errMsg, _ = messageVerification(pb)
flag, errCode, errMsg, _ = rpc.messageVerification(pb)
log.Debug(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1))
if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
@@ -293,8 +315,8 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
msgToMQSingle.MsgData = pb.MsgData
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
t1 = time.Now()
err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus)
log.Info(pb.OperationID, "sendMsgToKafka ", " cost time: ", time.Since(t1))
err1 := rpc.sendMsgToWriter(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus)
log.Info(pb.OperationID, "sendMsgToWriter ", " cost time: ", time.Since(t1))
if err1 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error())
promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter)
@@ -303,8 +325,8 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
}
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
t1 = time.Now()
err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus)
log.Info(pb.OperationID, "sendMsgToKafka ", " cost time: ", time.Since(t1))
err2 := rpc.sendMsgToWriter(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus)
log.Info(pb.OperationID, "sendMsgToWriter ", " cost time: ", time.Since(t1))
if err2 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String())
promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter)
@@ -336,7 +358,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
}
var memberUserIDList []string
if flag, errCode, errMsg, memberUserIDList = messageVerification(pb); !flag {
if flag, errCode, errMsg, memberUserIDList = rpc.messageVerification(pb); !flag {
promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
}
@@ -473,14 +495,14 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
t1 = time.Now()
msgToMQSingle.MsgData = pb.MsgData
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus)
err1 := rpc.sendMsgToWriter(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus)
if err1 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String())
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus)
err2 := rpc.sendMsgToWriter(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus)
if err2 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String())
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
@@ -504,13 +526,13 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg result", "end rpc and return", callbackResp)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
}
if flag, errCode, errMsg, _ = messageVerification(pb); !flag {
if flag, errCode, errMsg, _ = rpc.messageVerification(pb); !flag {
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
}
msgToMQSingle.MsgData = pb.MsgData
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.GroupID, constant.OnlineStatus)
err1 := rpc.sendMsgToWriter(&msgToMQSingle, msgToMQSingle.MsgData.GroupID, constant.OnlineStatus)
if err1 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String())
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
@@ -529,7 +551,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
}
}
func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status string) error {
func (rpc *rpcChat) sendMsgToWriter(m *pbChat.MsgDataToMQ, key string, status string) error {
switch status {
case constant.OnlineStatus:
if m.MsgData.ContentType == constant.SignalingNotification {
@@ -548,15 +570,15 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status str
return nil
}
}
pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID)
pid, offset, err := rpc.messageWriter.SendMessage(m, key, m.OperationID)
if err != nil {
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)
} else {
// log.NewWarn(m.OperationID, "sendMsgToKafka client msgID ", m.MsgData.ClientMsgID)
// log.NewWarn(m.OperationID, "sendMsgToWriter client msgID ", m.MsgData.ClientMsgID)
}
return err
case constant.OfflineStatus:
pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID)
pid, offset, err := rpc.messageWriter.SendMessage(m, key, m.OperationID)
if err != nil {
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)
}
@@ -1014,15 +1036,15 @@ func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status s
isSend := modifyMessageByUserMessageReceiveOpt(v, msgData.GroupID, constant.GroupChatType, &groupPB)
if isSend {
msgToMQGroup.MsgData = groupPB.MsgData
// log.Debug(groupPB.OperationID, "sendMsgToKafka, ", v, groupID, msgToMQGroup.String())
err := rpc.sendMsgToKafka(&msgToMQGroup, v, status)
// log.Debug(groupPB.OperationID, "sendMsgToWriter, ", v, groupID, msgToMQGroup.String())
err := rpc.sendMsgToWriter(&msgToMQGroup, v, status)
if err != nil {
log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String())
} else {
*sendTag = true
}
} else {
log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v)
log.Debug(groupPB.OperationID, "not sendMsgToWriter, ", v)
}
}
wg.Done()
@@ -1047,14 +1069,14 @@ func (rpc *rpcChat) sendMsgToGroupOptimization(list []string, groupPB *pbChat.Se
log.Error(msgToMQGroup.OperationID, "sendMsgToGroupOptimization userID nil ", msgToMQGroup.String())
continue
}
err := rpc.sendMsgToKafka(&msgToMQGroup, v, status)
err := rpc.sendMsgToWriter(&msgToMQGroup, v, status)
if err != nil {
log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String())
} else {
*sendTag = true
}
} else {
log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v)
log.Debug(groupPB.OperationID, "not sendMsgToWriter, ", v)
}
}
wg.Done()