merge code

This commit is contained in:
wangchuxiao
2022-02-28 17:57:03 +08:00
parent 7d8d314962
commit 0bed7ee669
20 changed files with 341 additions and 57 deletions
+94
View File
@@ -0,0 +1,94 @@
package msg
import (
cbApi "Open_IM/pkg/call_back_struct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/http"
"Open_IM/pkg/common/log"
pbChat "Open_IM/pkg/proto/chat"
"Open_IM/pkg/utils"
)
func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) (canSend bool, err error) {
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
if !config.Config.Callback.CallbackbeforeSendSingleMsg.Enable || msg.MsgData.ContentType != constant.Text {
return true, nil
}
req := cbApi.CallbackBeforeSendSingleMsgReq{CommonCallbackReq:cbApi.CommonCallbackReq{
}}
resp := &cbApi.CallbackBeforeSendSingleMsgResp{CommonCallbackResp:cbApi.CommonCallbackResp{
}}
utils.CopyStructFields(req, msg.MsgData)
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackbeforeSendSingleMsg.CallbackTimeOut); err != nil && !config.Config.Callback.CallbackbeforeSendSingleMsg.CallbackFailedContinue{
return false, err
}
if resp.ActionCode == constant.ActionForbidden {
return false, nil
}
return true, nil
}
func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) error {
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable || msg.MsgData.ContentType != constant.Text {
return nil
}
req := cbApi.CallbackAfterSendSingleMsgReq{CommonCallbackReq: cbApi.CommonCallbackReq{}}
resp := &cbApi.CallbackAfterSendSingleMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}}
utils.CopyStructFields(req, msg.MsgData)
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendSingleMsg.CallbackTimeOut); err != nil && config.Config.Callback.CallbackAfterSendSingleMsg.CallbackFailedContinue{
return err
}
return nil
}
func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) (canSend bool, err error) {
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
if !config.Config.Callback.CallbackBeforeSendGroupMsg.Enable || msg.MsgData.ContentType != constant.Text {
return true, nil
}
req := cbApi.CallbackBeforeSendSingleMsgReq{CommonCallbackReq: cbApi.CommonCallbackReq{}}
resp := &cbApi.CallbackBeforeSendGroupMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}}
utils.CopyStructFields(req, msg.MsgData)
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackTimeOut); err != nil && !config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackFailedContinue{
return false, nil
}
if resp.ActionCode == constant.ActionForbidden {
return false, nil
}
return true, nil
}
func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) error {
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable || msg.MsgData.ContentType != constant.Text {
return nil
}
return nil
}
func callBackWordFilter(msg *pbChat.SendMsgReq) (canSend bool, err error) {
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
if !config.Config.Callback.CallbackWordFilter.Enable || msg.MsgData.ContentType != constant.Text {
return true, nil
}
req := cbApi.CallBackWordFilterReq{
CommonCallbackReq: cbApi.CommonCallbackReq{},
}
resp := cbApi.CallBackWordFilterResp{CommonCallbackResp: cbApi.CommonCallbackResp{}}
utils.CopyStructFields(&req, msg.MsgData)
if err := http.PostReturn(msg.OperationID, req, &resp, config.Config.Callback.CallbackWordFilter.CallbackTimeOut); err != nil {
return false, err
}
msg.MsgData.Content = resp.Content
return true, nil
}
+31 -19
View File
@@ -4,7 +4,6 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
http2 "Open_IM/pkg/common/http"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbChat "Open_IM/pkg/proto/chat"
@@ -12,11 +11,9 @@ import (
sdk_ws "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
"encoding/json"
"github.com/garyburd/redigo/redis"
"github.com/golang/protobuf/proto"
"math/rand"
"net/http"
"strconv"
"strings"
"time"
@@ -127,25 +124,26 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
if !isHistory {
mReq.IsOnlineOnly = true
}
mResp := MsgCallBackResp{}
if config.Config.MessageCallBack.CallbackSwitch {
bMsg, err := http2.Post(config.Config.MessageCallBack.CallbackUrl, mReq, config.Config.MessageCallBack.CallBackTimeOut)
if err != nil {
log.ErrorByKv("callback to Business server err", pb.OperationID, "args", pb.String(), "err", err.Error())
return returnMsg(&replay, pb, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), "", 0)
} else if err = json.Unmarshal(bMsg, &mResp); err != nil {
log.ErrorByKv("ws json Unmarshal err", pb.OperationID, "args", pb.String(), "err", err.Error())
return returnMsg(&replay, pb, 200, err.Error(), "", 0)
} else {
if mResp.ErrCode != 0 {
return returnMsg(&replay, pb, mResp.ResponseErrCode, mResp.ErrMsg, "", 0)
} else {
pb.MsgData.Content = []byte(mResp.ResponseResult.ModifiedMsg)
}
}
// callback
canSend, err := callBackWordFilter(pb)
if err != nil {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallbackBeforeSendMsg failed", err.Error(), pb.MsgData)
}
if !canSend {
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callback result", canSend, "end rpc and return")
return returnMsg(&replay, pb, 201, "callback result stop rpc and return", "", 0)
}
switch pb.MsgData.SessionType {
case constant.SingleChatType:
canSend, err := callbackBeforeSendSingleMsg(pb)
if err != nil {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallBackAfterSendMsg failed", err.Error())
}
if !canSend {
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callback result", canSend, "end rpc and return")
return returnMsg(&replay, pb, 201, "callback result stop rpc and return", "", 0)
}
isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb)
if isSend {
msgToMQ.MsgData = pb.MsgData
@@ -163,8 +161,19 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}
if err := callbackAfterSendSingleMsg(pb); err != nil {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallBackAfterSendMsg failed", err.Error())
}
return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime)
case constant.GroupChatType:
canSend, err := callbackBeforeSendGroupMsg(pb)
if err != nil {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallBackAfterSendMsg failed", err.Error())
}
if !canSend {
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callback result", canSend, "end rpc and return")
return returnMsg(&replay, pb, 201, "callback result stop rpc and return", "", 0)
}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName)
client := pbGroup.NewGroupClient(etcdConn)
req := &pbGroup.GetGroupAllMemberReq{
@@ -230,6 +239,9 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
}
}
if err := callbackAfterSendGroupMsg(pb); err != nil {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallBackAfterSendMsg failed", err.Error())
}
return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime)
default:
return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0)