Files
open-im-server/internal/push/logic/push_to_client.go
T

373 lines
15 KiB
Go
Raw Normal View History

2021-05-26 19:22:11 +08:00
/*
** description("").
** copyright('open-im,www.open-im.io').
** author("fg,Gordon@open-im.io").
** time(2021/3/5 14:31).
*/
package logic
import (
2022-06-02 19:14:11 +08:00
"Open_IM/internal/push"
2022-08-30 22:04:13 +08:00
utils2 "Open_IM/internal/utils"
"Open_IM/pkg/common/config"
2022-03-04 16:11:04 +08:00
"Open_IM/pkg/common/constant"
2022-06-10 17:31:30 +08:00
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
2021-12-23 17:34:32 +08:00
pbPush "Open_IM/pkg/proto/push"
2021-10-11 22:00:38 +08:00
pbRelay "Open_IM/pkg/proto/relay"
2022-06-02 18:17:11 +08:00
pbRtc "Open_IM/pkg/proto/rtc"
2022-08-19 13:04:38 +08:00
commonPb "Open_IM/pkg/proto/sdk_ws"
2022-03-04 16:11:04 +08:00
"Open_IM/pkg/utils"
2021-05-26 19:22:11 +08:00
"context"
2022-03-04 16:11:04 +08:00
"encoding/json"
2022-08-16 23:08:16 +08:00
"github.com/golang/protobuf/proto"
2022-08-30 21:22:14 +08:00
"strings"
2021-05-26 19:22:11 +08:00
)
// OpenIMContent is the custom payload attached to an offline push
// notification. The push functions in this file JSON-encode it and pass it to
// the offline pusher so the client can locate the conversation and message.
type OpenIMContent struct {
	SessionType int    `json:"sessionType"` // copied from MsgData.SessionType
	From        string `json:"from"`        // sender ID (MsgData.SendID)
	To          string `json:"to"`          // receiver ID (MsgData.RecvID)
	Seq         uint32 `json:"seq"`         // message sequence number (MsgData.Seq)
}
// AtContent is the JSON body of an @-mention text message. The push functions
// decode MsgData.Content into it when the content type is constant.AtText.
type AtContent struct {
	// Text is the textual part of the message.
	Text string `json:"text"`
	// AtUserList lists the IDs of the mentioned users; it is checked for the
	// push target to pick the notification wording.
	AtUserList []string `json:"atUserList"`
	// IsAtSelf is not read in this file.
	// NOTE(review): presumably marks that the receiver is mentioned — confirm.
	IsAtSelf bool `json:"isAtSelf"`
}
2021-05-26 19:22:11 +08:00
2022-08-04 14:04:29 +08:00
//var grpcCons []*grpc.ClientConn
2022-05-25 17:14:09 +08:00
2021-12-23 17:34:32 +08:00
func MsgToUser(pushMsg *pbPush.PushMsgReq) {
2022-06-02 17:37:00 +08:00
var wsResult []*pbRelay.SingelMsgToUserResultList
2022-03-04 16:11:04 +08:00
isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
2022-04-28 16:47:46 +08:00
log.Debug(pushMsg.OperationID, "Get msg from msg_transfer And push msg", pushMsg.String())
2022-08-17 14:57:57 +08:00
grpcCons := getcdv3.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), pushMsg.OperationID)
2022-07-29 14:36:07 +08:00
var UIDList = []string{pushMsg.PushToUserID}
callbackResp := callbackOnlinePush(pushMsg.OperationID, UIDList, pushMsg.MsgData)
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "OnlinePush callback Resp")
if callbackResp.ErrCode != 0 {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOnlinePush result: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "OnlinePush stop")
return
}
2021-05-26 19:22:11 +08:00
//Online push message
2022-06-11 09:23:51 +08:00
log.Debug(pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String())
2021-05-26 19:22:11 +08:00
for _, v := range grpcCons {
2022-07-20 21:36:10 +08:00
msgClient := pbRelay.NewRelayClient(v)
2022-06-15 11:31:24 +08:00
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: []string{pushMsg.PushToUserID}})
2021-06-28 19:21:23 +08:00
if err != nil {
2022-06-13 14:56:24 +08:00
log.NewError("SuperGroupOnlineBatchPushOneMsg push data to client rpc err", pushMsg.OperationID, "err", err)
2021-11-29 16:26:57 +08:00
continue
2021-06-28 19:21:23 +08:00
}
2022-06-02 17:37:00 +08:00
if reply != nil && reply.SinglePushResult != nil {
wsResult = append(wsResult, reply.SinglePushResult...)
2021-05-26 19:22:11 +08:00
}
}
2022-04-28 16:47:46 +08:00
log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData)
2022-05-09 18:23:06 +08:00
successCount++
2022-03-04 16:11:04 +08:00
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
2022-06-14 16:05:45 +08:00
// save invitation info for offline push
2022-05-30 17:46:06 +08:00
for _, v := range wsResult {
2022-06-02 17:37:00 +08:00
if v.OnlinePush {
return
2022-05-30 17:46:06 +08:00
}
2022-06-02 17:37:00 +08:00
}
2022-06-17 13:14:38 +08:00
if pushMsg.MsgData.ContentType == constant.SignalingNotification {
if err := db.DB.HandleSignalInfo(pushMsg.OperationID, pushMsg.MsgData); err != nil {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), err.Error(), pushMsg.MsgData)
return
}
2022-06-14 16:05:45 +08:00
}
2022-06-02 17:37:00 +08:00
customContent := OpenIMContent{
SessionType: int(pushMsg.MsgData.SessionType),
From: pushMsg.MsgData.SendID,
To: pushMsg.MsgData.RecvID,
Seq: pushMsg.MsgData.Seq,
}
bCustomContent, _ := json.Marshal(customContent)
jsonCustomContent := string(bCustomContent)
var content string
if pushMsg.MsgData.OfflinePushInfo != nil {
content = pushMsg.MsgData.OfflinePushInfo.Title
jsonCustomContent = pushMsg.MsgData.OfflinePushInfo.Desc
2022-08-16 23:08:16 +08:00
}
if content == "" {
2022-06-02 17:37:00 +08:00
switch pushMsg.MsgData.ContentType {
case constant.Text:
content = constant.ContentType2PushContent[constant.Text]
case constant.Picture:
content = constant.ContentType2PushContent[constant.Picture]
case constant.Voice:
content = constant.ContentType2PushContent[constant.Voice]
case constant.Video:
content = constant.ContentType2PushContent[constant.Video]
case constant.File:
content = constant.ContentType2PushContent[constant.File]
case constant.AtText:
a := AtContent{}
_ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a)
if utils.IsContain(pushMsg.PushToUserID, a.AtUserList) {
content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
2022-04-13 18:33:22 +08:00
} else {
2022-06-02 17:37:00 +08:00
content = constant.ContentType2PushContent[constant.GroupMsg]
2022-06-02 18:17:11 +08:00
}
2022-06-07 17:42:24 +08:00
case constant.SignalingNotification:
content = constant.ContentType2PushContent[constant.SignalMsg]
2022-06-02 17:37:00 +08:00
default:
content = constant.ContentType2PushContent[constant.Common]
2022-06-14 16:05:45 +08:00
2022-05-30 17:46:06 +08:00
}
2022-06-02 17:37:00 +08:00
}
2022-08-19 13:04:38 +08:00
var offlineInfo commonPb.OfflinePushInfo
callbackResp := callbackOfflinePush(pushMsg.OperationID, UIDList, pushMsg.MsgData, &[]string{}, &offlineInfo)
2022-06-02 17:37:00 +08:00
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp")
if callbackResp.ErrCode != 0 {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offlinePush stop")
return
}
2022-08-19 13:04:38 +08:00
if offlineInfo.Title != "" {
content = offlineInfo.Title
}
if offlineInfo.Desc != "" {
jsonCustomContent = offlineInfo.Desc
}
2022-06-02 17:37:00 +08:00
if offlinePusher == nil {
2022-06-07 17:42:24 +08:00
return
2022-06-02 17:37:00 +08:00
}
2022-06-07 17:42:24 +08:00
opts, err := GetOfflinePushOpts(pushMsg)
if err != nil {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "GetOfflinePushOpts failed", pushMsg, err.Error())
}
log.NewInfo(pushMsg.OperationID, utils.GetSelfFuncName(), UIDList, content, jsonCustomContent, "opts:", opts)
pushResult, err := offlinePusher.Push(UIDList, content, jsonCustomContent, pushMsg.OperationID, opts)
2022-06-02 17:37:00 +08:00
if err != nil {
log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error())
} else {
log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData)
2022-05-30 17:46:06 +08:00
}
}
}
2022-05-26 18:02:00 +08:00
2022-06-02 16:48:56 +08:00
func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) {
2022-06-07 18:39:43 +08:00
var wsResult []*pbRelay.SingelMsgToUserResultList
isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
2022-08-01 17:02:56 +08:00
log.Debug(pushMsg.OperationID, "Get super group msg from msg_transfer And push msg", pushMsg.String(), config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable)
2022-07-29 14:36:07 +08:00
var pushToUserIDList []string
if config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable {
callbackResp := callbackBeforeSuperGroupOnlinePush(pushMsg.OperationID, pushMsg.PushToUserID, pushMsg.MsgData, &pushToUserIDList)
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp")
if callbackResp.ErrCode != 0 {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "onlinePush stop")
return
}
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "callback userIDList Resp", pushToUserIDList)
2022-07-29 15:56:16 +08:00
}
if len(pushToUserIDList) == 0 {
2022-08-30 22:04:13 +08:00
userIDList, err := utils2.GetGroupMemberUserIDList(pushMsg.MsgData.GroupID, pushMsg.OperationID)
2022-07-29 14:36:07 +08:00
if err != nil {
2022-08-30 21:31:53 +08:00
log.Error(pushMsg.OperationID, "GetGroupMemberUserIDList failed ", err.Error(), pushMsg.MsgData.GroupID)
2022-07-29 14:36:07 +08:00
return
}
2022-08-30 21:31:53 +08:00
pushToUserIDList = userIDList
//getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: pushMsg.OperationID, GroupID: pushMsg.MsgData.GroupID}
//etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, pushMsg.OperationID)
//if etcdConn == nil {
// errMsg := pushMsg.OperationID + "getcdv3.GetDefaultConn == nil"
// log.NewError(pushMsg.OperationID, errMsg)
// return
//}
//client := pbCache.NewCacheClient(etcdConn)
//cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq)
//if err != nil {
// log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error())
// return
//}
//if cacheResp.CommonResp.ErrCode != 0 {
// log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
// return
//}
//pushToUserIDList = cacheResp.UserIDList
2022-06-07 18:39:43 +08:00
}
2022-07-29 14:36:07 +08:00
2022-08-17 14:57:57 +08:00
grpcCons := getcdv3.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), pushMsg.OperationID)
2022-07-29 14:36:07 +08:00
2022-06-07 18:39:43 +08:00
//Online push message
2022-08-04 14:04:29 +08:00
log.Debug(pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String())
2022-06-07 18:39:43 +08:00
for _, v := range grpcCons {
2022-07-20 21:36:10 +08:00
msgClient := pbRelay.NewRelayClient(v)
2022-07-29 14:36:07 +08:00
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: pushToUserIDList})
2022-06-07 18:39:43 +08:00
if err != nil {
log.NewError("push data to client rpc err", pushMsg.OperationID, "err", err)
continue
}
if reply != nil && reply.SinglePushResult != nil {
wsResult = append(wsResult, reply.SinglePushResult...)
}
}
log.Debug(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData)
successCount++
if isOfflinePush {
var onlineSuccessUserIDList []string
onlineSuccessUserIDList = append(onlineSuccessUserIDList, pushMsg.MsgData.SendID)
for _, v := range wsResult {
if v.OnlinePush && v.UserID != pushMsg.MsgData.SendID {
onlineSuccessUserIDList = append(onlineSuccessUserIDList, v.UserID)
}
}
2022-07-29 14:36:07 +08:00
onlineFailedUserIDList := utils.DifferenceString(onlineSuccessUserIDList, pushToUserIDList)
2022-06-07 18:39:43 +08:00
//Use offline push messaging
customContent := OpenIMContent{
SessionType: int(pushMsg.MsgData.SessionType),
From: pushMsg.MsgData.SendID,
To: pushMsg.MsgData.RecvID,
Seq: pushMsg.MsgData.Seq,
}
bCustomContent, _ := json.Marshal(customContent)
jsonCustomContent := string(bCustomContent)
var content string
if pushMsg.MsgData.OfflinePushInfo != nil {
content = pushMsg.MsgData.OfflinePushInfo.Title
jsonCustomContent = pushMsg.MsgData.OfflinePushInfo.Desc
2022-06-07 18:39:43 +08:00
} else {
switch pushMsg.MsgData.ContentType {
case constant.Text:
content = constant.ContentType2PushContent[constant.Text]
case constant.Picture:
content = constant.ContentType2PushContent[constant.Picture]
case constant.Voice:
content = constant.ContentType2PushContent[constant.Voice]
case constant.Video:
content = constant.ContentType2PushContent[constant.Video]
case constant.File:
content = constant.ContentType2PushContent[constant.File]
case constant.AtText:
a := AtContent{}
_ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a)
if utils.IsContain(pushMsg.PushToUserID, a.AtUserList) {
content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
2022-04-13 18:33:22 +08:00
} else {
2022-06-07 18:39:43 +08:00
content = constant.ContentType2PushContent[constant.GroupMsg]
2022-04-13 18:33:22 +08:00
}
2022-06-07 18:39:43 +08:00
case constant.SignalingNotification:
content = constant.ContentType2PushContent[constant.SignalMsg]
default:
content = constant.ContentType2PushContent[constant.Common]
2022-04-13 16:20:30 +08:00
2022-06-07 18:39:43 +08:00
}
2022-03-04 16:11:04 +08:00
}
2022-07-06 16:14:31 +08:00
if len(onlineFailedUserIDList) > 0 {
2022-07-29 17:09:21 +08:00
var offlinePushUserIDList []string
var needOfflinePushUserIDList []string
2022-08-19 13:04:38 +08:00
var offlineInfo commonPb.OfflinePushInfo
callbackResp := callbackOfflinePush(pushMsg.OperationID, onlineFailedUserIDList, pushMsg.MsgData, &offlinePushUserIDList, &offlineInfo)
2022-07-06 16:14:31 +08:00
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp")
if callbackResp.ErrCode != 0 {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offlinePush stop")
return
}
2022-07-29 17:09:21 +08:00
if len(offlinePushUserIDList) > 0 {
needOfflinePushUserIDList = offlinePushUserIDList
} else {
needOfflinePushUserIDList = onlineFailedUserIDList
}
2022-08-19 13:04:38 +08:00
if offlineInfo.Title != "" {
content = offlineInfo.Title
}
if offlineInfo.Desc != "" {
jsonCustomContent = offlineInfo.Desc
}
2022-07-06 16:14:31 +08:00
if offlinePusher == nil {
return
}
opts, err := GetOfflinePushOpts(pushMsg)
if err != nil {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "GetOfflinePushOpts failed", pushMsg, err.Error())
}
log.NewInfo(pushMsg.OperationID, utils.GetSelfFuncName(), onlineFailedUserIDList, content, jsonCustomContent, "opts:", opts)
2022-07-29 17:09:21 +08:00
pushResult, err := offlinePusher.Push(needOfflinePushUserIDList, content, jsonCustomContent, pushMsg.OperationID, opts)
2022-07-06 16:14:31 +08:00
if err != nil {
log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error())
} else {
log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData)
}
2022-06-07 18:39:43 +08:00
}
2022-07-06 16:14:31 +08:00
2022-03-04 16:11:04 +08:00
}
2021-06-28 19:21:23 +08:00
}
2021-12-07 14:28:07 +08:00
2022-06-02 19:14:11 +08:00
func GetOfflinePushOpts(pushMsg *pbPush.PushMsgReq) (opts push.PushOpts, err error) {
2022-06-02 20:19:24 +08:00
if pushMsg.MsgData.ContentType < constant.SignalingNotificationEnd && pushMsg.MsgData.ContentType > constant.SignalingNotificationBegin {
2022-06-02 19:52:29 +08:00
req := &pbRtc.SignalReq{}
2022-06-02 18:17:11 +08:00
if err := proto.Unmarshal(pushMsg.MsgData.Content, req); err != nil {
2022-06-07 12:08:53 +08:00
return opts, utils.Wrap(err, "")
2022-06-02 18:17:11 +08:00
}
2022-06-02 20:19:24 +08:00
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "SignalReq: ", req.String())
2022-06-02 19:52:29 +08:00
switch req.Payload.(type) {
2022-06-02 18:17:11 +08:00
case *pbRtc.SignalReq_Invite, *pbRtc.SignalReq_InviteInGroup:
opts.Signal.ClientMsgID = pushMsg.MsgData.ClientMsgID
2022-06-02 20:19:24 +08:00
log.NewDebug(pushMsg.OperationID, opts)
2022-06-02 18:17:11 +08:00
}
2022-08-31 11:55:55 +08:00
opts.IOSBadgeCount = pushMsg.MsgData.OfflinePushInfo.IOSBadgeCount
opts.IOSPushSound = pushMsg.MsgData.OfflinePushInfo.IOSPushSound
2022-06-02 18:17:11 +08:00
}
return opts, nil
}
2021-12-23 17:34:32 +08:00
//func SendMsgByWS(m *pbChat.WSToMsgSvrChatMsg) {
// m.MsgID = rpcChat.GetMsgID(m.SendID)
// m.ClientMsgID = m.MsgID
// switch m.SessionType {
// case constant.SingleChatType:
// sendMsgToKafka(m, m.SendID, "msgKey--sendID")
// sendMsgToKafka(m, m.RecvID, "msgKey--recvID")
// case constant.GroupChatType:
2022-08-17 12:12:54 +08:00
// etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName)
2021-12-23 17:34:32 +08:00
// client := pbGroup.NewGroupClient(etcdConn)
2022-01-25 19:18:04 +08:00
// req := &pbGroup.Req{
2021-12-23 17:34:32 +08:00
// GroupID: m.RecvID,
// Token: config.Config.Secret,
// OperationID: m.OperationID,
// }
2022-01-25 19:18:04 +08:00
// reply, err := client.(context.Background(), req)
2021-12-23 17:34:32 +08:00
// if err != nil {
// log.Error(m.Token, m.OperationID, "rpc getGroupInfo failed, err = %s", err.Error())
// return
// }
// if reply.ErrorCode != 0 {
// log.Error(m.Token, m.OperationID, "rpc getGroupInfo failed, err = %s", reply.ErrorMsg)
// return
// }
// groupID := m.RecvID
// for i, v := range reply.MemberList {
// m.RecvID = v.UserId + " " + groupID
// sendMsgToKafka(m, utils.IntToString(i), "msgKey--recvID+\" \"+groupID")
// }
// default:
//
// }
//}
//
//func sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string, flag string) {
// pid, offset, err := producer.SendMessage(m, key)
// if err != nil {
// log.ErrorByKv("kafka send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), flag, key)
// }
//
//}