Merge branch 'v2.3.0release' of github.com:OpenIMSDK/Open-IM-Server into v2.3.0release

This commit is contained in:
wangchuxiao
2022-10-28 15:43:18 +08:00
10 changed files with 343 additions and 255 deletions
+9 -14
View File
@@ -138,12 +138,6 @@ func ManagementSendMsg(c *gin.Context) {
data = VideoElem{}
case constant.File:
data = FileElem{}
//case constant.AtText:
// data = AtElem{}
//case constant.Merger:
// data =
//case constant.Card:
//case constant.Location:
case constant.Custom:
data = CustomElem{}
case constant.Revoke:
@@ -161,16 +155,16 @@ func ManagementSendMsg(c *gin.Context) {
//case constant.Typing:
//case constant.Quote:
default:
c.JSON(http.StatusBadRequest, gin.H{"errCode": 404, "errMsg": "contentType err"})
c.JSON(http.StatusOK, gin.H{"errCode": 404, "errMsg": "contentType err"})
log.Error(c.PostForm("operationID"), "contentType err", c.PostForm("content"))
return
}
if err := mapstructure.WeakDecode(params.Content, &data); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 401, "errMsg": err.Error()})
c.JSON(http.StatusOK, gin.H{"errCode": 401, "errMsg": err.Error()})
log.Error(c.PostForm("operationID"), "content to Data struct err", err.Error())
return
} else if err := validate.Struct(data); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 403, "errMsg": err.Error()})
c.JSON(http.StatusOK, gin.H{"errCode": 403, "errMsg": err.Error()})
log.Error(c.PostForm("operationID"), "data args validate err", err.Error())
return
}
@@ -178,12 +172,13 @@ func ManagementSendMsg(c *gin.Context) {
token := c.Request.Header.Get("token")
claims, err := token_verify.ParseToken(token, params.OperationID)
if err != nil {
log.NewError(params.OperationID, "parse token failed", err.Error())
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "parse token failed", "sendTime": 0, "MsgID": ""})
log.NewError(params.OperationID, "parse token failed", err.Error(), token)
c.JSON(http.StatusOK, gin.H{"errCode": 400, "errMsg": "parse token failed", "sendTime": 0, "MsgID": ""})
return
}
if !utils.IsContain(claims.UID, config.Config.Manager.AppManagerUid) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "not authorized", "sendTime": 0, "MsgID": ""})
log.NewError(params.OperationID, "not authorized", token)
c.JSON(http.StatusOK, gin.H{"errCode": 400, "errMsg": "not authorized", "sendTime": 0, "MsgID": ""})
return
}
@@ -191,13 +186,13 @@ func ManagementSendMsg(c *gin.Context) {
case constant.SingleChatType:
if len(params.RecvID) == 0 {
log.NewError(params.OperationID, "recvID is a null string")
c.JSON(http.StatusBadRequest, gin.H{"errCode": 405, "errMsg": "recvID is a null string", "sendTime": 0, "MsgID": ""})
c.JSON(http.StatusOK, gin.H{"errCode": 405, "errMsg": "recvID is a null string", "sendTime": 0, "MsgID": ""})
return
}
case constant.GroupChatType, constant.SuperGroupChatType:
if len(params.GroupID) == 0 {
log.NewError(params.OperationID, "groupID is a null string")
c.JSON(http.StatusBadRequest, gin.H{"errCode": 405, "errMsg": "groupID is a null string", "sendTime": 0, "MsgID": ""})
c.JSON(http.StatusOK, gin.H{"errCode": 405, "errMsg": "groupID is a null string", "sendTime": 0, "MsgID": ""})
return
}
+36 -2
View File
@@ -8,6 +8,7 @@ import (
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbChat "Open_IM/pkg/proto/msg"
push "Open_IM/pkg/proto/push"
pbRtc "Open_IM/pkg/proto/rtc"
sdk_ws "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
@@ -58,7 +59,7 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
promePkg.PromeInc(promePkg.PullMsgBySeqListTotalCounter)
case constant.WsLogoutMsg:
log.NewInfo(m.OperationID, "conn.Close()", m.SendID, m.MsgIncr, m.ReqIdentifier)
// conn.Close()
ws.userLogoutReq(conn, &m)
default:
log.Error(m.OperationID, "ReqIdentifier failed ", m.SendID, m.MsgIncr, m.ReqIdentifier)
}
@@ -157,7 +158,6 @@ func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
ws.pullMsgBySeqListResp(conn, m, nReply)
}
}
func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullMessageBySeqListResp) {
log.NewInfo(m.OperationID, "pullMsgBySeqListResp come here ", pb.String())
c, _ := proto.Marshal(pb)
@@ -173,7 +173,40 @@ func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullM
len(mReply.Data))
ws.sendMsg(conn, mReply)
}
func (ws *WServer) userLogoutReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to userLogoutReq start", m.SendID, m.ReqIdentifier, m.MsgIncr, string(m.Data))
rpcReq := push.DelUserPushTokenReq{}
rpcReq.UserID = m.SendID
rpcReq.OperationID = m.OperationID
grpcConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, m.OperationID)
if grpcConn == nil {
errMsg := rpcReq.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(rpcReq.OperationID, errMsg)
ws.userLogoutResp(conn, m)
return
}
msgClient := push.NewPushMsgServiceClient(grpcConn)
reply, err := msgClient.DelUserPushToken(context.Background(), &rpcReq)
if err != nil {
log.NewError(rpcReq.OperationID, "DelUserPushToken err", err.Error())
ws.userLogoutResp(conn, m)
} else {
log.NewInfo(rpcReq.OperationID, "rpc call success to DelUserPushToken", reply.String())
ws.userLogoutResp(conn, m)
}
ws.userLogoutResp(conn, m)
}
func (ws *WServer) userLogoutResp(conn *UserConn, m *Req) {
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
OperationID: m.OperationID,
}
ws.sendMsg(conn, mReply)
_ = conn.Close()
}
func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
sendMsgAllCountLock.Lock()
sendMsgAllCount++
@@ -233,6 +266,7 @@ func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.SendMsgResp) {
Data: b,
}
ws.sendMsg(conn, mReply)
}
func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) {
+2 -1
View File
@@ -29,6 +29,7 @@ import (
type UserConn struct {
*websocket.Conn
w *sync.Mutex
platformID int32
PushedMaxSeq uint32
}
type WServer struct {
@@ -74,7 +75,7 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
log.Error(operationID, "upgrade http conn err", err.Error(), query)
return
} else {
newConn := &UserConn{conn, new(sync.Mutex), 0}
newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0}
userCount++
ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], operationID)
go ws.readMsg(newConn)
+13 -1
View File
@@ -3,6 +3,7 @@ package logic
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
@@ -10,7 +11,6 @@ import (
"Open_IM/pkg/utils"
"context"
"net"
"strconv"
"strings"
@@ -91,3 +91,15 @@ func (r *RPCServer) PushMsg(_ context.Context, pbData *pbPush.PushMsgReq) (*pbPu
}, nil
}
func (r *RPCServer) DelUserPushToken(c context.Context, req *pbPush.DelUserPushTokenReq) (*pbPush.DelUserPushTokenResp, error) {
var resp pbPush.DelUserPushTokenResp
err := db.DB.DelFcmToken(req.UserID, int(req.PlatformID))
if err != nil {
errMsg := req.OperationID + " " + "SetFcmToken failed " + err.Error()
log.NewError(req.OperationID, errMsg)
resp.ErrCode = 500
resp.ErrMsg = errMsg
}
return &resp, nil
}