ws and push update

This commit is contained in:
Gordon
2022-06-02 17:37:00 +08:00
parent 1aacd3cb23
commit 0a260a67d5
4 changed files with 133 additions and 149 deletions
+7 -26
View File
@@ -159,11 +159,16 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online
}
for _, v := range req.PushToUserIDList {
var resp []*pbRelay.SingleMsgToUserPlatform
tempT := &pbRelay.SingelMsgToUserResultList{
UserID: v,
}
userConnMap := ws.getUserAllCons(v)
for platform, userConn := range userConnMap {
if userConn != nil {
resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { //仅仅记录推送成功的平台端
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: resultCode,
RecvID: v,
@@ -174,31 +179,7 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online
}
}
//for _, x := range r.platformList {
// if conn := ws.getUserConn(v, x); conn != nil {
// resultCode := sendMsgBatchToUser(conn, replyBytes.Bytes(), req, x, v)
// temp := &pbRelay.SingleMsgToUserPlatform{
// ResultCode: resultCode,
// RecvID: v,
// RecvPlatFormID: constant.PlatformNameToID(x),
// }
// resp = append(resp, temp)
// } else {
// if utils.IsContain(x,r.pushTerminal) {
// temp := &pbRelay.SingleMsgToUserPlatform{
// ResultCode: -1,
// RecvID: v,
// RecvPlatFormID: constant.PlatformNameToID(x),
// }
// resp = append(resp, temp)
// }
//
// }
//}
tempT := &pbRelay.SingelMsgToUserResultList{
UserID: v,
Resp: resp,
}
tempT.Resp = resp
singleUserResult = append(singleUserResult, tempT)
}
+61 -68
View File
@@ -36,7 +36,7 @@ type AtContent struct {
var grpcCons []*grpc.ClientConn
func MsgToUser(pushMsg *pbPush.PushMsgReq) {
var wsResult []*pbRelay.SingleMsgToUserPlatform
var wsResult []*pbRelay.SingelMsgToUserResultList
isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
log.Debug(pushMsg.OperationID, "Get msg from msg_transfer And push msg", pushMsg.String())
if len(grpcCons) == 0 {
@@ -47,90 +47,83 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String())
for _, v := range grpcCons {
msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v)
reply, err := msgClient.OnlinePushMsg(context.Background(), &pbRelay.OnlinePushMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserID: pushMsg.PushToUserID})
reply, err := msgClient.OnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: []string{pushMsg.PushToUserID}})
if err != nil {
log.NewError("push data to client rpc err", pushMsg.OperationID, "err", err)
continue
}
if reply != nil && reply.Resp != nil {
wsResult = append(wsResult, reply.Resp...)
if reply != nil && reply.SinglePushResult != nil {
wsResult = append(wsResult, reply.SinglePushResult...)
}
}
log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData)
successCount++
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
for _, v := range wsResult {
if v.ResultCode == 0 {
if utils.IsContainInt32(v.RecvPlatFormID, pushTerminal) {
break
}
continue
if v.OnlinePush {
return
}
if utils.IsContainInt32(v.RecvPlatFormID, pushTerminal) {
//Use offline push messaging
var UIDList []string
UIDList = append(UIDList, v.RecvID)
customContent := OpenIMContent{
SessionType: int(pushMsg.MsgData.SessionType),
From: pushMsg.MsgData.SendID,
To: pushMsg.MsgData.RecvID,
Seq: pushMsg.MsgData.Seq,
}
bCustomContent, _ := json.Marshal(customContent)
jsonCustomContent := string(bCustomContent)
var content string
if pushMsg.MsgData.OfflinePushInfo != nil {
content = pushMsg.MsgData.OfflinePushInfo.Title
}
//Use offline push messaging
var UIDList []string
UIDList = append(UIDList, pushMsg.PushToUserID)
customContent := OpenIMContent{
SessionType: int(pushMsg.MsgData.SessionType),
From: pushMsg.MsgData.SendID,
To: pushMsg.MsgData.RecvID,
Seq: pushMsg.MsgData.Seq,
}
bCustomContent, _ := json.Marshal(customContent)
jsonCustomContent := string(bCustomContent)
var content string
if pushMsg.MsgData.OfflinePushInfo != nil {
content = pushMsg.MsgData.OfflinePushInfo.Title
} else {
switch pushMsg.MsgData.ContentType {
case constant.Text:
content = constant.ContentType2PushContent[constant.Text]
case constant.Picture:
content = constant.ContentType2PushContent[constant.Picture]
case constant.Voice:
content = constant.ContentType2PushContent[constant.Voice]
case constant.Video:
content = constant.ContentType2PushContent[constant.Video]
case constant.File:
content = constant.ContentType2PushContent[constant.File]
case constant.AtText:
a := AtContent{}
_ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a)
if utils.IsContain(pushMsg.PushToUserID, a.AtUserList) {
content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
} else {
switch pushMsg.MsgData.ContentType {
case constant.Text:
content = constant.ContentType2PushContent[constant.Text]
case constant.Picture:
content = constant.ContentType2PushContent[constant.Picture]
case constant.Voice:
content = constant.ContentType2PushContent[constant.Voice]
case constant.Video:
content = constant.ContentType2PushContent[constant.Video]
case constant.File:
content = constant.ContentType2PushContent[constant.File]
case constant.AtText:
a := AtContent{}
_ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a)
if utils.IsContain(v.RecvID, a.AtUserList) {
content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
} else {
content = constant.ContentType2PushContent[constant.GroupMsg]
}
default:
content = constant.ContentType2PushContent[constant.Common]
}
content = constant.ContentType2PushContent[constant.GroupMsg]
}
callbackResp := callbackOfflinePush(pushMsg.OperationID, UIDList[0], pushMsg.MsgData.OfflinePushInfo, v.RecvPlatFormID)
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp")
if callbackResp.ErrCode != 0 {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offlinePush stop")
break
}
if offlinePusher == nil {
offlinePusher = jpush.JPushClient
}
pushResult, err := offlinePusher.Push(UIDList, content, jsonCustomContent, pushMsg.OperationID)
if err != nil {
log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error())
} else {
log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData)
}
break
default:
content = constant.ContentType2PushContent[constant.Common]
}
}
callbackResp := callbackOfflinePush(pushMsg.OperationID, UIDList[0], pushMsg.MsgData.OfflinePushInfo, constant.AndroidPlatformID)
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp")
if callbackResp.ErrCode != 0 {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offlinePush stop")
return
}
if offlinePusher == nil {
offlinePusher = jpush.JPushClient
}
pushResult, err := offlinePusher.Push(UIDList, content, jsonCustomContent, pushMsg.OperationID)
if err != nil {
log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error())
} else {
log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData)
}
}
}
func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) {
@@ -155,7 +148,7 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) {
// wsResult = append(wsResult, reply.SinglePushResult...)
// }
//}
//log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData)
//log.Debug(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData)
//successCount++
//if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
// for _, v := range wsResult {