This commit is contained in:
wangchuxiao
2022-05-26 18:02:00 +08:00
parent d1651b54ec
commit b4da35a0ab
13 changed files with 359 additions and 126 deletions
+56 -53
View File
@@ -8,6 +8,7 @@ import (
"Open_IM/pkg/common/log"
pbChat "Open_IM/pkg/proto/chat"
"Open_IM/pkg/utils"
http2 "net/http"
)
func copyCallbackCommonReqStruct(msg *pbChat.SendMsgReq) cbApi.CommonCallbackReq {
@@ -27,11 +28,11 @@ func copyCallbackCommonReqStruct(msg *pbChat.SendMsgReq) cbApi.CommonCallbackReq
}
}
func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) (canSend bool, err error) {
func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID}
if !config.Config.Callback.CallbackBeforeSendSingleMsg.Enable {
return true, nil
return callbackResp
}
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
commonCallbackReq := copyCallbackCommonReqStruct(msg)
commonCallbackReq.CallbackCommand = constant.CallbackBeforeSendSingleMsgCommand
@@ -40,27 +41,28 @@ func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) (canSend bool, err erro
RecvID: msg.MsgData.RecvID,
}
resp := &cbApi.CallbackBeforeSendSingleMsgResp{
CommonCallbackResp: cbApi.CommonCallbackResp{},
CommonCallbackResp: callbackResp,
}
//utils.CopyStructFields(req, msg.MsgData)
defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp)
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg.CallbackTimeOut); err != nil {
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
if !config.Config.Callback.CallbackBeforeSendSingleMsg.CallbackFailedContinue {
return false, err
callbackResp.ActionCode = constant.ActionForbidden
return callbackResp
} else {
return true, err
}
} else {
if resp.ActionCode == constant.ActionForbidden && resp.ErrCode == constant.CallbackHandleSuccess {
return false, nil
callbackResp.ActionCode = constant.ActionAllow
return callbackResp
}
}
return true, err
return callbackResp
}
func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) error {
func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID}
if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable {
return nil
return callbackResp
}
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
commonCallbackReq := copyCallbackCommonReqStruct(msg)
@@ -69,18 +71,20 @@ func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) error {
CommonCallbackReq: commonCallbackReq,
RecvID: msg.MsgData.RecvID,
}
resp := &cbApi.CallbackAfterSendSingleMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}}
//utils.CopyStructFields(req, msg.MsgData)
resp := &cbApi.CallbackAfterSendSingleMsgResp{CommonCallbackResp: callbackResp}
defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp)
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendSingleMsg.CallbackTimeOut); err != nil {
return err
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
return callbackResp
}
return nil
return callbackResp
}
func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) (canSend bool, err error) {
func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID}
if !config.Config.Callback.CallbackBeforeSendGroupMsg.Enable {
return true, nil
return callbackResp
}
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
commonCallbackReq := copyCallbackCommonReqStruct(msg)
@@ -89,26 +93,26 @@ func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) (canSend bool, err error
CommonCallbackReq: commonCallbackReq,
GroupID: msg.MsgData.GroupID,
}
resp := &cbApi.CallbackBeforeSendGroupMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}}
//utils.CopyStructFields(req, msg.MsgData)
resp := &cbApi.CallbackBeforeSendGroupMsgResp{CommonCallbackResp: callbackResp}
defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp)
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackTimeOut); err != nil {
if !config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackFailedContinue {
return false, err
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
if !config.Config.Callback.CallbackBeforeSendSingleMsg.CallbackFailedContinue {
callbackResp.ActionCode = constant.ActionForbidden
return callbackResp
} else {
return true, err
}
} else {
if resp.ActionCode == constant.ActionForbidden && resp.ErrCode == constant.CallbackHandleSuccess {
return false, nil
callbackResp.ActionCode = constant.ActionAllow
return callbackResp
}
}
return true, err
return callbackResp
}
func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) error {
func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID}
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
return nil
return callbackResp
}
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
commonCallbackReq := copyCallbackCommonReqStruct(msg)
@@ -117,19 +121,20 @@ func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) error {
CommonCallbackReq: commonCallbackReq,
GroupID: msg.MsgData.GroupID,
}
resp := &cbApi.CallbackAfterSendGroupMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}}
//utils.CopyStructFields(req, msg.MsgData)
resp := &cbApi.CallbackAfterSendGroupMsgResp{CommonCallbackResp: callbackResp}
defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp)
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
return err
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
return callbackResp
}
return nil
return callbackResp
}
func callbackWordFilter(msg *pbChat.SendMsgReq) (canSend bool, err error) {
func callbackWordFilter(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID}
if !config.Config.Callback.CallbackWordFilter.Enable || msg.MsgData.ContentType != constant.Text {
return true, nil
return callbackResp
}
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
commonCallbackReq := copyCallbackCommonReqStruct(msg)
@@ -137,24 +142,22 @@ func callbackWordFilter(msg *pbChat.SendMsgReq) (canSend bool, err error) {
req := cbApi.CallbackWordFilterReq{
CommonCallbackReq: commonCallbackReq,
}
resp := &cbApi.CallbackWordFilterResp{CommonCallbackResp: cbApi.CommonCallbackResp{}}
//utils.CopyStructFields(&req., msg.MsgData)
resp := &cbApi.CallbackWordFilterResp{CommonCallbackResp: callbackResp}
defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp)
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackWordFilter.CallbackTimeOut); err != nil {
if !config.Config.Callback.CallbackWordFilter.CallbackFailedContinue {
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), "callback failed and config disable, stop this operation")
return false, err
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
if !config.Config.Callback.CallbackBeforeSendSingleMsg.CallbackFailedContinue {
callbackResp.ActionCode = constant.ActionForbidden
return callbackResp
} else {
return true, err
callbackResp.ActionCode = constant.ActionAllow
return callbackResp
}
} else {
if resp.ActionCode == constant.ActionForbidden && resp.ErrCode == constant.CallbackHandleSuccess {
return false, nil
}
if resp.ErrCode == constant.CallbackHandleSuccess {
msg.MsgData.Content = []byte(resp.Content)
}
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), string(msg.MsgData.Content))
}
return true, err
if resp.ErrCode == constant.CallbackHandleSuccess && resp.ActionCode == constant.ActionAllow {
msg.MsgData.Content = []byte(resp.Content)
}
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), string(msg.MsgData.Content))
return callbackResp
}
+24 -41
View File
@@ -148,47 +148,29 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
}
//if !utils.VerifyToken(pb.Token, pb.SendID) {
// return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0)
rpc.encapsulateMsgData(pb.MsgData)
log.Info("", "this is a test MsgData ", pb.MsgData)
msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData}
//options := utils.JsonStringToMap(pbData.Options)
isHistory := utils.GetSwitchFromOptions(pb.MsgData.Options, constant.IsHistory)
mReq := MsgCallBackReq{
SendID: pb.MsgData.SendID,
RecvID: pb.MsgData.RecvID,
Content: string(pb.MsgData.Content),
SendTime: pb.MsgData.SendTime,
MsgFrom: pb.MsgData.MsgFrom,
ContentType: pb.MsgData.ContentType,
SessionType: pb.MsgData.SessionType,
PlatformID: pb.MsgData.SenderPlatformID,
MsgID: pb.MsgData.ClientMsgID,
}
if !isHistory {
mReq.IsOnlineOnly = true
}
// callback
canSend, err := callbackWordFilter(pb)
if err != nil {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter failed", err.Error(), pb.MsgData)
callbackResp := callbackWordFilter(pb)
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp)
}
if !canSend {
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter result", canSend, "end rpc and return", pb.MsgData)
return returnMsg(&replay, pb, 201, "callbackWordFilter result stop rpc and return", "", 0)
if callbackResp.ActionCode != constant.ActionAllow {
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter result", "end rpc and return", pb.MsgData)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
}
switch pb.MsgData.SessionType {
case constant.SingleChatType:
// callback
canSend, err := callbackBeforeSendSingleMsg(pb)
if err != nil {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg failed", err.Error())
callbackResp := callbackBeforeSendSingleMsg(pb)
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg resp: ", callbackResp)
}
if !canSend {
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", canSend, "end rpc and return")
return returnMsg(&replay, pb, 201, "callbackBeforeSendSingleMsg result stop rpc and return", "", 0)
if callbackResp.ActionCode != constant.ActionAllow {
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp)
return returnMsg(&replay, pb, int32(callbackResp.ActionCode), callbackResp.ErrMsg, "", 0)
}
isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb)
if isSend {
@@ -208,19 +190,20 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
}
}
// callback
if err := callbackAfterSendSingleMsg(pb); err != nil {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg failed", err.Error())
callbackResp = callbackAfterSendSingleMsg(pb)
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg resp: ", callbackResp)
}
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
case constant.GroupChatType:
// callback
canSend, err := callbackBeforeSendGroupMsg(pb)
if err != nil {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg failed", err.Error())
callbackResp := callbackBeforeSendGroupMsg(pb)
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg resp:", callbackResp)
}
if !canSend {
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg result", canSend, "end rpc and return")
return returnMsg(&replay, pb, 201, "callbackBeforeSendGroupMsg result stop rpc and return", "", 0)
if callbackResp.ActionCode != constant.ActionAllow {
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp)
return returnMsg(&replay, pb, int32(callbackResp.ActionCode), callbackResp.ErrMsg, "", 0)
}
getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: pb.OperationID, GroupID: pb.MsgData.GroupID}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName)
@@ -295,8 +278,9 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
log.Debug(pb.OperationID, "send msg cost time2 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID)
newTime = db.GetCurrentTimestampByMill()
// callback
if err := callbackAfterSendGroupMsg(pb); err != nil {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg failed", err.Error())
callbackResp = callbackAfterSendGroupMsg(pb)
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg resp: ", callbackResp)
}
if !sendTag {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
@@ -354,7 +338,6 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
}
log.Debug(pb.OperationID, "send msg cost time3 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
}
case constant.NotificationChatType:
msgToMQSingle.MsgData = pb.MsgData