This commit is contained in:
wangchuxiao
2023-02-22 19:51:14 +08:00
parent 2b38381296
commit 94d50a6c71
31 changed files with 500 additions and 602 deletions
+2 -2
View File
@@ -36,11 +36,11 @@ func Init(rpcPort, wsPort int) {
initPrometheus()
}
func Run(promethuesPort int) {
func Run(prometheusPort int) {
go ws.run()
go rpcSvr.run()
go func() {
err := prome.StartPromeSrv(promethuesPort)
err := prome.StartPromeSrv(prometheusPort)
if err != nil {
panic(err)
}
+2 -2
View File
@@ -287,7 +287,7 @@ func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) {
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendSignalMsg, m.OperationID)
if isPass {
signalResp := pbRtc.SignalResp{}
etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImRealTimeCommName, m.OperationID)
etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImRtcName, m.OperationID)
if etcdConn == nil {
errMsg := m.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(m.OperationID, errMsg)
@@ -301,7 +301,7 @@ func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) {
}
respPb, err := rtcClient.SignalMessageAssemble(context.Background(), req)
if err != nil {
log.NewError(m.OperationID, utils.GetSelfFuncName(), "SignalMessageAssemble", err.Error(), config.Config.RpcRegisterName.OpenImRealTimeCommName)
log.NewError(m.OperationID, utils.GetSelfFuncName(), "SignalMessageAssemble", err.Error(), config.Config.RpcRegisterName.OpenImRtcName)
ws.sendSignalMsgResp(conn, 204, "grpc SignalMessageAssemble failed: "+err.Error(), m, &signalResp)
return
}
+20 -20
View File
@@ -6,7 +6,7 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/http"
"Open_IM/pkg/common/tracelog"
common "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
"context"
)
@@ -15,7 +15,7 @@ func url() string {
return config.Config.Callback.CallbackUrl
}
func callbackOfflinePush(ctx context.Context, userIDList []string, msg *common.MsgData, offlinePushUserIDList *[]string) error {
func callbackOfflinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error {
if !config.Config.Callback.CallbackOfflinePush.Enable {
return nil
}
@@ -27,7 +27,7 @@ func callbackOfflinePush(ctx context.Context, userIDList []string, msg *common.M
PlatformID: int(msg.SenderPlatformID),
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
},
UserIDList: userIDList,
UserIDList: userIDs,
},
OfflinePushInfo: msg.OfflinePushInfo,
ClientMsgID: msg.ClientMsgID,
@@ -43,8 +43,8 @@ func callbackOfflinePush(ctx context.Context, userIDList []string, msg *common.M
if err != nil {
return err
}
if len(resp.UserIDList) != 0 {
*offlinePushUserIDList = resp.UserIDList
if len(resp.UserIDs) != 0 {
*offlinePushUserIDs = resp.UserIDs
}
if resp.OfflinePushInfo != nil {
msg.OfflinePushInfo = resp.OfflinePushInfo
@@ -52,19 +52,19 @@ func callbackOfflinePush(ctx context.Context, userIDList []string, msg *common.M
return nil
}
func callbackOnlinePush(operationID string, userIDList []string, msg *common.MsgData) error {
if !config.Config.Callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDList...) {
func callbackOnlinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error {
if !config.Config.Callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDs...) {
return nil
}
req := callbackstruct.CallbackBeforePushReq{
UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{
UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{
CallbackCommand: constant.CallbackOnlinePushCommand,
OperationID: operationID,
OperationID: tracelog.GetOperationID(ctx),
PlatformID: int(msg.SenderPlatformID),
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
},
UserIDList: userIDList,
UserIDList: userIDs,
},
ClientMsgID: msg.ClientMsgID,
SendID: msg.SendID,
@@ -78,7 +78,7 @@ func callbackOnlinePush(operationID string, userIDList []string, msg *common.Msg
return http.CallBackPostReturn(url(), req, resp, config.Config.Callback.CallbackOnlinePush)
}
func callbackBeforeSuperGroupOnlinePush(ctx context.Context, groupID string, msg *common.MsgData, pushToUserList *[]string) error {
func callbackBeforeSuperGroupOnlinePush(ctx context.Context, groupID string, msg *sdkws.MsgData, pushToUserIDs *[]string) error {
if !config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable {
return nil
}
@@ -89,21 +89,21 @@ func callbackBeforeSuperGroupOnlinePush(ctx context.Context, groupID string, msg
PlatformID: int(msg.SenderPlatformID),
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
},
ClientMsgID: msg.ClientMsgID,
SendID: msg.SendID,
GroupID: groupID,
ContentType: msg.ContentType,
SessionType: msg.SessionType,
AtUserIDList: msg.AtUserIDList,
Content: utils.GetContent(msg),
Seq: msg.Seq,
ClientMsgID: msg.ClientMsgID,
SendID: msg.SendID,
GroupID: groupID,
ContentType: msg.ContentType,
SessionType: msg.SessionType,
AtUserIDs: msg.AtUserIDList,
Content: utils.GetContent(msg),
Seq: msg.Seq,
}
resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{}
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil {
return err
}
if len(resp.UserIDList) != 0 {
*pushToUserList = resp.UserIDList
if len(resp.UserIDs) != 0 {
*pushToUserIDs = resp.UserIDs
}
return nil
}
+2 -3
View File
@@ -22,7 +22,7 @@ type Fcm struct {
cache cache.Cache
}
func newFcmClient(cache cache.Cache) *Fcm {
func NewClient(cache cache.Cache) *Fcm {
opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount))
fcmApp, err := firebase.NewApp(context.Background(), nil, opt)
if err != nil {
@@ -42,7 +42,7 @@ func newFcmClient(cache cache.Cache) *Fcm {
return &Fcm{fcmMsgCli: fcmMsgClient}
}
func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, opts push.Opts) error {
func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, opts *push.Opts) error {
// accounts->registrationToken
allTokens := make(map[string][]string, 0)
for _, account := range userIDs {
@@ -105,7 +105,6 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string,
}
messages = append(messages, temp)
}
}
messageCount := len(messages)
if messageCount > 0 {
+5 -4
View File
@@ -2,14 +2,15 @@ package fcm
import (
"Open_IM/internal/push"
"fmt"
"Open_IM/pkg/common/db/cache"
"context"
"github.com/stretchr/testify/assert"
"testing"
)
func Test_Push(t *testing.T) {
offlinePusher := NewFcm()
resp, err := offlinePusher.Push([]string{"test_uid"}, "test", "test", "12321", push.PushOpts{})
var redis cache.Cache
offlinePusher := NewClient(redis)
err := offlinePusher.Push(context.Background(), []string{"userID1"}, "test", "test", &push.Opts{})
assert.Nil(t, err)
fmt.Println(resp)
}
+30 -2
View File
@@ -1,13 +1,32 @@
package getui
import "Open_IM/pkg/common/config"
import (
"Open_IM/pkg/common/config"
"fmt"
)
type CommonResp struct {
type Resp struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data interface{} `json:"data"`
}
func (r *Resp) parseError() (err error) {
switch r.Code {
case tokenExpireCode:
err = TokenExpireError
case 0:
err = nil
default:
err = fmt.Errorf("code %d, msg %s", r.Code, r.Msg)
}
return err
}
type RespI interface {
parseError() error
}
type AuthReq struct {
Sign string `json:"sign"`
Timestamp string `json:"timestamp"`
@@ -95,6 +114,10 @@ type Options struct {
} `json:"VV"`
}
type Payload struct {
IsSignal bool `json:"isSignal"`
}
func newPushReq(title, content string) PushReq {
pushReq := PushReq{PushMessage: &PushMessage{Notification: &Notification{
Title: title,
@@ -106,6 +129,11 @@ func newPushReq(title, content string) PushReq {
return pushReq
}
func newBatchPushReq(userIDs []string, taskID string) PushReq {
var IsAsync = true
return PushReq{Audience: &Audience{Alias: userIDs}, IsAsync: &IsAsync, TaskID: &taskID}
}
func (pushReq *PushReq) setPushChannel(title string, body string) {
pushReq.PushChannel = &PushChannel{}
// autoBadge := "+1"
+66 -77
View File
@@ -4,23 +4,23 @@ import (
"Open_IM/internal/push"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/db/cache"
//http2 "Open_IM/pkg/common/http"
"Open_IM/pkg/common/log"
http2 "Open_IM/pkg/common/http"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils/splitter"
"github.com/go-redis/redis/v8"
"Open_IM/pkg/utils"
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"strconv"
"time"
)
var (
TokenExpireError = errors.New("token expire")
UserIDEmptyError = errors.New("userIDs is empty")
)
const (
@@ -29,62 +29,56 @@ const (
taskURL = "/push/list/message"
batchPushURL = "/push/list/alias"
tokenExpire = 10001
ttl = 0
// codes
tokenExpireCode = 10001
tokenExpireTime = 60 * 60 * 23
taskIDTTL = 1000 * 60 * 60 * 24
)
type Client struct {
cache cache.Cache
cache cache.Cache
tokenExpireTime int64
taskIDTTL int64
}
func newClient(cache cache.Cache) *Client {
return &Client{cache: cache}
func NewClient(cache cache.Cache) *Client {
return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL}
}
func (g *Client) Push(ctx context.Context, userIDs []string, title, content, operationID string, opts *push.Opts) error {
func (g *Client) Push(ctx context.Context, userIDs []string, title, content string, opts *push.Opts) error {
token, err := g.cache.GetGetuiToken(ctx)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "GetGetuiToken failed", err.Error())
}
if token == "" || err != nil {
token, err = g.getTokenAndSave2Redis(ctx)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "getTokenAndSave2Redis failed", err.Error())
return utils.Wrap(err, "")
if err == redis.Nil {
token, err = g.getTokenAndSave2Redis(ctx)
if err != nil {
return err
}
} else {
return err
}
}
pushReq := newPushReq(title, content)
pushReq.setPushChannel(title, content)
pushResp := struct{}{}
if len(userIDs) > 1 {
taskID, err := g.GetTaskID(ctx, token, pushReq)
if err != nil {
return utils.Wrap(err, "GetTaskIDAndSave2Redis failed")
maxNum := 999
if len(userIDs) > maxNum {
s := splitter.NewSplitter(maxNum, userIDs)
for _, v := range s.GetSplitResult() {
err = g.batchPush(ctx, token, v.Item, pushReq)
}
} else {
err = g.batchPush(ctx, token, userIDs, pushReq)
}
pushReq = PushReq{Audience: &Audience{Alias: userIDs}}
var IsAsync = true
pushReq.IsAsync = &IsAsync
pushReq.TaskID = &taskID
err = g.request(ctx, batchPushURL, pushReq, token, &pushResp)
} else if len(userIDs) == 1 {
err = g.singlePush(ctx, token, userIDs[0], pushReq)
} else {
reqID := utils.OperationIDGenerator()
pushReq.RequestID = &reqID
pushReq.Audience = &Audience{Alias: []string{userIDs[0]}}
err = g.request(ctx, pushURL, pushReq, token, &pushResp)
return UserIDEmptyError
}
switch err {
case TokenExpireError:
token, err = g.getTokenAndSave2Redis(ctx)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "getTokenAndSave2Redis failed, ", err.Error())
} else {
log.NewInfo(operationID, utils.GetSelfFuncName(), "getTokenAndSave2Redis: ", token)
}
}
if err != nil {
return utils.Wrap(err, "push failed")
}
return utils.Wrap(err, "")
return err
}
func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expireTime int64, err error) {
@@ -101,7 +95,6 @@ func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expir
if err != nil {
return "", 0, err
}
//log.NewInfo(operationID, utils.GetSelfFuncName(), "result: ", respAuth)
expire, err := strconv.Atoi(respAuth.ExpireTime)
return respAuth.Token, int64(expire), err
}
@@ -117,63 +110,59 @@ func (g *Client) GetTaskID(ctx context.Context, token string, pushReq PushReq) (
return respTask.TaskID, nil
}
func (g *Client) request(ctx context.Context, url string, content interface{}, token string, output interface{}) error {
con, err := json.Marshal(content)
// max num is 999
func (g *Client) batchPush(ctx context.Context, token string, userIDs []string, pushReq PushReq) error {
taskID, err := g.GetTaskID(ctx, token, pushReq)
if err != nil {
return err
}
client := &http.Client{}
req, err := http.NewRequest("POST", config.Config.Push.Getui.PushUrl+url, bytes.NewBuffer(con))
pushReq = newBatchPushReq(userIDs, taskID)
return g.request(ctx, batchPushURL, pushReq, token, nil)
}
func (g *Client) singlePush(ctx context.Context, token, userID string, pushReq PushReq) error {
operationID := tracelog.GetOperationID(ctx)
pushReq.RequestID = &operationID
pushReq.Audience = &Audience{Alias: []string{userID}}
return g.request(ctx, pushURL, pushReq, token, nil)
}
func (g *Client) request(ctx context.Context, url string, input interface{}, token string, output interface{}) error {
header := map[string]string{"token": token}
resp := &Resp{}
resp.Data = output
return g.postReturn(config.Config.Push.Getui.PushUrl+url, header, input, resp, 3)
}
func (g *Client) postReturn(url string, header map[string]string, input interface{}, output RespI, timeout int) error {
err := http2.PostReturn(url, header, input, output, timeout)
if err != nil {
return err
}
if token != "" {
req.Header.Set("token", token)
}
req.Header.Set("content-type", "application/json")
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
result, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
//log.NewDebug(operationID, "getui", utils.GetSelfFuncName(), "resp, ", string(result))
commonResp := CommonResp{}
commonResp.Data = output
if err := json.Unmarshal(result, &commonResp); err != nil {
return err
}
if commonResp.Code == tokenExpire {
return TokenExpireError
}
return nil
return output.parseError()
}
func (g *Client) getTokenAndSave2Redis(ctx context.Context) (token string, err error) {
token, _, err = g.Auth(ctx, time.Now().UnixNano()/1e6)
if err != nil {
return "", utils.Wrap(err, "Auth failed")
return
}
err = g.cache.SetGetuiTaskID(ctx, token, 60*60*23)
err = g.cache.SetGetuiToken(ctx, token, 60*60*23)
if err != nil {
return "", utils.Wrap(err, "Auth failed")
return
}
return token, nil
}
func (g *Client) GetTaskIDAndSave2Redis(ctx context.Context, token string, pushReq PushReq) (taskID string, err error) {
ttl := int64(1000 * 60 * 60 * 24)
pushReq.Settings = &Settings{TTL: &ttl}
pushReq.Settings = &Settings{TTL: &g.taskIDTTL}
taskID, err = g.GetTaskID(ctx, token, pushReq)
if err != nil {
return "", utils.Wrap(err, "GetTaskIDAndSave2Redis failed")
return
}
err = g.cache.SetGetuiTaskID(ctx, taskID, 60*60*23)
err = g.cache.SetGetuiTaskID(ctx, taskID, g.tokenExpireTime)
if err != nil {
return "", utils.Wrap(err, "Auth failed")
return
}
return token, nil
}
+17 -18
View File
@@ -12,47 +12,46 @@ import (
jpush "Open_IM/internal/push/jpush"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/prome"
"Open_IM/pkg/statistics"
"fmt"
)
var (
type Push struct {
rpcServer RPCServer
pushCh ConsumerHandler
offlinePusher OfflinePusher
successCount uint64
)
func Init(rpcPort int) {
rpcServer.Init(rpcPort)
pushCh.Init()
}
func init() {
statistics.NewStatistics(&successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
func (p *Push) Init(rpcPort int) {
var cacheInterface cache.Cache
p.rpcServer.Init(rpcPort, cacheInterface)
p.pushCh.Init()
statistics.NewStatistics(&p.successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
if *config.Config.Push.Getui.Enable {
offlinePusher = getui.GetuiClient
p.offlinePusher = getui.NewClient(cacheInterface)
}
if config.Config.Push.Jpns.Enable {
offlinePusher = jpush.JPushClient
p.offlinePusher = jpush.NewClient()
}
if config.Config.Push.Fcm.Enable {
offlinePusher = fcm.NewFcm()
p.offlinePusher = fcm.NewClient(cacheInterface)
}
}
func initPrometheus() {
func (p *Push) initPrometheus() {
prome.NewMsgOfflinePushSuccessCounter()
prome.NewMsgOfflinePushFailedCounter()
}
func Run(promethuesPort int) {
go rpcServer.run()
go pushCh.ConsumerGroup.RegisterHandleAndConsumer(&pushCh)
func (p *Push) Run(prometheusPort int) {
go p.rpcServer.run()
go p.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&p.pushCh)
go func() {
err := prome.StartPromeSrv(promethuesPort)
err := prome.StartPromeSrv(prometheusPort)
if err != nil {
panic(err)
}
+8 -10
View File
@@ -1,13 +1,11 @@
package body
const (
TAG = "tag"
TAG_AND = "tag_and"
TAG_NOT = "tag_not"
ALIAS = "alias"
REGISTRATION_ID = "registration_id"
SEGMENT = "segment"
ABTEST = "abtest"
TAG = "tag"
TAGAND = "tag_and"
TAGNOT = "tag_not"
ALIAS = "alias"
REGISTRATIONID = "registration_id"
)
type Audience struct {
@@ -32,11 +30,11 @@ func (a *Audience) SetTag(tags []string) {
}
func (a *Audience) SetTagAnd(tags []string) {
a.set(TAG_AND, tags)
a.set(TAGAND, tags)
}
func (a *Audience) SetTagNot(tags []string) {
a.set(TAG_NOT, tags)
a.set(TAGNOT, tags)
}
func (a *Audience) SetAlias(alias []string) {
@@ -44,7 +42,7 @@ func (a *Audience) SetAlias(alias []string) {
}
func (a *Audience) SetRegistrationId(ids []string) {
a.set(REGISTRATION_ID, ids)
a.set(REGISTRATIONID, ids)
}
func (a *Audience) SetAll() {
+23 -49
View File
@@ -3,27 +3,16 @@ package push
import (
"Open_IM/internal/push"
"Open_IM/internal/push/jpush/body"
"Open_IM/internal/push/jpush/common"
"Open_IM/pkg/common/config"
"bytes"
http2 "Open_IM/pkg/common/http"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
)
var (
JPushClient *JPush
)
func init() {
JPushClient = newJPushClient()
}
type JPush struct{}
func newJPushClient() *JPush {
func NewClient() *JPush {
return &JPush{}
}
@@ -35,18 +24,18 @@ func (j *JPush) SetAlias(cid, alias string) (resp string, err error) {
return resp, nil
}
func (j *JPush) getAuthorization(Appkey string, MasterSecret string) string {
str := fmt.Sprintf("%s:%s", Appkey, MasterSecret)
func (j *JPush) getAuthorization(appKey string, masterSecret string) string {
str := fmt.Sprintf("%s:%s", appKey, masterSecret)
buf := []byte(str)
Authorization := fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString(buf))
return Authorization
}
func (j *JPush) Push(accounts []string, title, detailContent, operationID string, opts push.PushOpts) (string, error) {
func (j *JPush) Push(ctx context.Context, userIDs []string, title, content string, opts *push.Opts) error {
var pf body.Platform
pf.SetAll()
var au body.Audience
au.SetAlias(accounts)
au.SetAlias(userIDs)
var no body.Notification
var extras body.Extras
if opts.Signal.ClientMsgID != "" {
@@ -55,35 +44,20 @@ func (j *JPush) Push(accounts []string, title, detailContent, operationID string
no.IOSEnableMutableContent()
no.SetExtras(extras)
no.SetAlert(title)
var me body.Message
me.SetMsgContent(detailContent)
var o body.Options
o.SetApnsProduction(config.Config.IOSPush.Production)
var po body.PushObj
po.SetPlatform(&pf)
po.SetAudience(&au)
po.SetNotification(&no)
po.SetMessage(&me)
po.SetOptions(&o)
con, err := json.Marshal(po)
if err != nil {
return "", err
}
client := &http.Client{}
req, err := http.NewRequest("POST", config.Config.Push.Jpns.PushUrl, bytes.NewBuffer(con))
if err != nil {
return "", err
}
req.Header.Set("Authorization", j.getAuthorization(config.Config.Push.Jpns.AppKey, config.Config.Push.Jpns.MasterSecret))
resp, err := client.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
result, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(result), nil
var msg body.Message
msg.SetMsgContent(content)
var opt body.Options
opt.SetApnsProduction(config.Config.IOSPush.Production)
var pushObj body.PushObj
pushObj.SetPlatform(&pf)
pushObj.SetAudience(&au)
pushObj.SetNotification(&no)
pushObj.SetMessage(&msg)
pushObj.SetOptions(&opt)
var resp interface{}
return j.request(pushObj, resp, 5)
}
func (j *JPush) request(po body.PushObj, resp interface{}, timeout int) error {
return http2.PostReturn(config.Config.Push.Jpns.PushUrl, map[string]string{"Authorization": j.getAuthorization(config.Config.Push.Jpns.AppKey, config.Config.Push.Jpns.MasterSecret)}, po, resp, timeout)
}
+2 -3
View File
@@ -35,9 +35,8 @@ func (c *ConsumerHandler) handleMs2PsChat(msg []byte) {
return
}
pbData := &pbPush.PushMsgReq{
OperationID: msgFromMQ.OperationID,
MsgData: msgFromMQ.MsgData,
PushToUserID: msgFromMQ.PushToUserID,
MsgData: msgFromMQ.MsgData,
SourceID: msgFromMQ.PushToUserID,
}
sec := msgFromMQ.MsgData.SendTime / 1000
nowSec := utils.GetCurrentTimestampBySecond()
+1 -1
View File
@@ -3,7 +3,7 @@ package push
import "context"
type OfflinePusher interface {
Push(ctx context.Context, userIDs []string, title, content, opts *Opts) error
Push(ctx context.Context, userIDs []string, title, content string, opts *Opts) error
}
type Opts struct {
+11 -20
View File
@@ -3,9 +3,10 @@ package push
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/log"
prome "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/prome"
pbPush "Open_IM/pkg/proto/push"
"Open_IM/pkg/utils"
"context"
@@ -22,14 +23,16 @@ type RPCServer struct {
rpcRegisterName string
etcdSchema string
etcdAddr []string
push controller.PushInterface
}
func (r *RPCServer) Init(rpcPort int) {
func (r *RPCServer) Init(rpcPort int, cache cache.Cache) {
r.rpcPort = rpcPort
r.rpcRegisterName = config.Config.RpcRegisterName.OpenImPushName
r.etcdSchema = config.Config.Etcd.EtcdSchema
r.etcdAddr = config.Config.Etcd.EtcdAddr
}
func (r *RPCServer) run() {
listenIP := ""
if config.Config.ListenIP == "" {
@@ -77,29 +80,17 @@ func (r *RPCServer) run() {
return
}
}
func (r *RPCServer) PushMsg(_ context.Context, pbData *pbPush.PushMsgReq) (*pbPush.PushMsgResp, error) {
//Call push module to send message to the user
func (r *RPCServer) PushMsg(ctx context.Context, pbData *pbPush.PushMsgReq) (resp *pbPush.PushMsgResp, err error) {
switch pbData.MsgData.SessionType {
case constant.SuperGroupChatType:
MsgToSuperGroupUser(pbData)
default:
MsgToUser(pbData)
}
return &pbPush.PushMsgResp{
ResultCode: 0,
}, nil
return &pbPush.PushMsgResp{}, nil
}
func (r *RPCServer) DelUserPushToken(c context.Context, req *pbPush.DelUserPushTokenReq) (*pbPush.DelUserPushTokenResp, error) {
log.Debug(req.OperationID, utils.GetSelfFuncName(), "req", req.String())
var resp pbPush.DelUserPushTokenResp
err := db.DB.DelFcmToken(req.UserID, int(req.PlatformID))
if err != nil {
errMsg := req.OperationID + " " + "DelFcmToken failed " + err.Error()
log.NewError(req.OperationID, errMsg)
resp.ErrCode = 500
resp.ErrMsg = errMsg
}
return &resp, nil
func (r *RPCServer) DelUserPushToken(ctx context.Context, req *pbPush.DelUserPushTokenReq) (resp *pbPush.DelUserPushTokenResp, err error) {
return &pbPush.DelUserPushTokenResp{}, r.push.DelFcmToken(ctx, req.UserID, int(req.PlatformID))
}
+202 -224
View File
@@ -9,280 +9,258 @@ package push
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/localcache"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/prome"
pbPush "Open_IM/pkg/proto/push"
pbRelay "Open_IM/pkg/proto/relay"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/discoveryregistry"
msggateway "Open_IM/pkg/proto/msggateway"
pbRtc "Open_IM/pkg/proto/rtc"
"Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
"context"
"errors"
"github.com/golang/protobuf/proto"
"strings"
)
type AtContent struct {
Text string `json:"text"`
AtUserList []string `json:"atUserList"`
IsAtSelf bool `json:"isAtSelf"`
type Pusher struct {
cache cache.Cache
client discoveryregistry.SvcDiscoveryRegistry
offlinePusher OfflinePusher
groupLocalCache localcache.GroupLocalCache
conversationLocalCache localcache.ConversationLocalCache
successCount int
}
func MsgToUser(pushMsg *pbPush.PushMsgReq) {
var wsResult []*pbRelay.SingelMsgToUserResultList
isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
log.Debug(pushMsg.OperationID, "Get msg from msg_transfer And push msg", pushMsg.String())
grpcCons := rpc.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), pushMsg.OperationID)
var UIDList = []string{pushMsg.PushToUserID}
callbackResp := callbackOnlinePush(pushMsg.OperationID, UIDList, pushMsg.MsgData)
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "OnlinePush callback Resp")
if callbackResp.ErrCode != 0 {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOnlinePush result: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "OnlinePush stop")
return
func NewPusher(cache cache.Cache, client discoveryregistry.SvcDiscoveryRegistry, offlinePusher OfflinePusher) *Pusher {
return &Pusher{
cache: cache,
client: client,
offlinePusher: offlinePusher,
}
}
//Online push message
log.Debug(pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String())
for _, v := range grpcCons {
msgClient := pbRelay.NewRelayClient(v)
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: []string{pushMsg.PushToUserID}})
if err != nil {
log.NewError("SuperGroupOnlineBatchPushOneMsg push data to client rpc err", pushMsg.OperationID, "err", err)
continue
}
if reply != nil && reply.SinglePushResult != nil {
wsResult = append(wsResult, reply.SinglePushResult...)
}
func (p *Pusher) MsgToUser(ctx context.Context, userID string, msg *sdkws.MsgData) error {
operationID := tracelog.GetOperationID(ctx)
var userIDs = []string{userID}
log.Debug(operationID, "Get msg from msg_transfer And push msg", msg.String(), userID)
// callback
if err := callbackOnlinePush(ctx, userIDs, msg); err != nil {
return err
}
log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData, "isOfflinePush", isOfflinePush)
successCount++
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
// push
wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, userIDs)
if err != nil {
return err
}
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
log.NewInfo(operationID, "push_result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush)
p.successCount++
if isOfflinePush && userID != msg.SendID {
// save invitation info for offline push
for _, v := range wsResult {
for _, v := range wsResults {
if v.OnlinePush {
return
return nil
}
}
if pushMsg.MsgData.ContentType == constant.SignalingNotification {
isSend, err := db.DB.HandleSignalInfo(pushMsg.OperationID, pushMsg.MsgData, pushMsg.PushToUserID)
if msg.ContentType == constant.SignalingNotification {
isSend, err := p.cache.HandleSignalInfo(ctx, msg, userID)
if err != nil {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), err.Error(), pushMsg.MsgData)
return
return err
}
if !isSend {
return
return nil
}
}
var title, detailContent string
callbackResp := callbackOfflinePush(pushMsg.OperationID, UIDList, pushMsg.MsgData, &[]string{})
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp")
if callbackResp.ErrCode != 0 {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offlinePush stop")
return
}
if pushMsg.MsgData.OfflinePushInfo != nil {
title = pushMsg.MsgData.OfflinePushInfo.Title
detailContent = pushMsg.MsgData.OfflinePushInfo.Desc
}
if offlinePusher == nil {
return
if err := callbackOfflinePush(ctx, userIDs, msg, &[]string{}); err != nil {
return err
}
opts, err := GetOfflinePushOpts(pushMsg)
err = p.OfflinePushMsg(ctx, userID, msg, userIDs)
if err != nil {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "GetOfflinePushOpts failed", pushMsg, err.Error())
}
log.NewInfo(pushMsg.OperationID, utils.GetSelfFuncName(), UIDList, title, detailContent, "opts:", opts)
if title == "" {
switch pushMsg.MsgData.ContentType {
case constant.Text:
fallthrough
case constant.Picture:
fallthrough
case constant.Voice:
fallthrough
case constant.Video:
fallthrough
case constant.File:
title = constant.ContentType2PushContent[int64(pushMsg.MsgData.ContentType)]
case constant.AtText:
a := AtContent{}
_ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a)
if utils.IsContain(pushMsg.PushToUserID, a.AtUserList) {
title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
} else {
title = constant.ContentType2PushContent[constant.GroupMsg]
}
case constant.SignalingNotification:
title = constant.ContentType2PushContent[constant.SignalMsg]
default:
title = constant.ContentType2PushContent[constant.Common]
}
// detailContent = title
}
if detailContent == "" {
detailContent = title
}
pushResult, err := offlinePusher.Push(UIDList, title, detailContent, pushMsg.OperationID, opts)
if err != nil {
prome.PromeInc(prome.MsgOfflinePushFailedCounter)
log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error())
} else {
prome.PromeInc(prome.MsgOfflinePushSuccessCounter)
log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData)
log.NewError(operationID, "OfflinePushMsg failed", userID)
return err
}
}
return nil
}
func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) {
var wsResult []*pbRelay.SingelMsgToUserResultList
isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
log.Debug(pushMsg.OperationID, "Get super group msg from msg_transfer And push msg", pushMsg.String(), config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable)
var pushToUserIDList []string
if config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable {
callbackResp := callbackBeforeSuperGroupOnlinePush(pushMsg.OperationID, pushMsg.PushToUserID, pushMsg.MsgData, &pushToUserIDList)
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp")
if callbackResp.ErrCode != 0 {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "onlinePush stop")
return
}
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "callback userIDList Resp", pushToUserIDList)
func (p *Pusher) MsgToSuperGroupUser(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
operationID := tracelog.GetOperationID(ctx)
log.Debug(operationID, "Get super group msg from msg_transfer And push msg", msg.String(), groupID)
var pushToUserIDs []string
if err := callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil {
return err
}
if len(pushToUserIDList) == 0 {
userIDList, err := utils.GetGroupMemberUserIDList(context.Background(), pushMsg.MsgData.GroupID, pushMsg.OperationID)
if len(pushToUserIDs) == 0 {
pushToUserIDs, err = p.groupLocalCache.GetGroupMemberIDs(ctx, groupID)
if err != nil {
log.Error(pushMsg.OperationID, "GetGroupMemberUserIDList failed ", err.Error(), pushMsg.MsgData.GroupID)
return
return err
}
pushToUserIDList = userIDList
}
wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
if err != nil {
return err
}
log.Debug(operationID, "push_result", wsResults, "sendData", msg)
p.successCount++
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
if isOfflinePush {
var onlineSuccessUserIDs []string
var WebAndPcBackgroundUserIDs []string
onlineSuccessUserIDs = append(onlineSuccessUserIDs, msg.SendID)
for _, v := range wsResults {
if v.OnlinePush && v.UserID != msg.SendID {
onlineSuccessUserIDs = append(onlineSuccessUserIDs, v.UserID)
}
if !v.OnlinePush {
if len(v.Resp) != 0 {
for _, singleResult := range v.Resp {
if singleResult.ResultCode == -2 {
if constant.PlatformIDToName(int(singleResult.RecvPlatFormID)) == constant.TerminalPC ||
singleResult.RecvPlatFormID == constant.WebPlatformID {
WebAndPcBackgroundUserIDs = append(WebAndPcBackgroundUserIDs, v.UserID)
}
}
}
}
}
}
needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs)
if msg.ContentType != constant.SignalingNotification {
notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID)
if err != nil {
log.Error(operationID, utils.GetSelfFuncName(), "GetRecvMsgNotNotifyUserIDs failed", groupID)
return err
}
needOfflinePushUserIDs = utils.DifferenceString(notNotificationUserIDs, needOfflinePushUserIDs)
}
//Use offline push messaging
if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string
err = callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
if err != nil {
return err
}
if len(offlinePushUserIDs) > 0 {
needOfflinePushUserIDs = offlinePushUserIDs
}
err = p.OfflinePushMsg(ctx, groupID, msg, offlinePushUserIDs)
if err != nil {
log.NewError(operationID, "OfflinePushMsg failed", groupID)
return err
}
_, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs))
if err != nil {
log.NewError(operationID, "OfflinePushMsg failed", groupID)
return err
}
}
}
return nil
}
grpcCons := rpc.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), pushMsg.OperationID)
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingelMsgToUserResultList, err error) {
conns, err := p.client.GetConns(config.Config.RpcRegisterName.OpenImMessageGatewayName)
if err != nil {
return nil, err
}
//Online push message
log.Debug(pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String())
for _, v := range grpcCons {
msgClient := pbRelay.NewRelayClient(v)
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: pushToUserIDList})
for _, v := range conns {
msgClient := msggateway.NewRelayClient(v)
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{OperationID: tracelog.GetOperationID(ctx), MsgData: msg, PushToUserIDList: pushToUserIDs})
if err != nil {
log.NewError("push data to client rpc err", pushMsg.OperationID, "err", err)
log.NewError(tracelog.GetOperationID(ctx), msg, len(pushToUserIDs), "err", err)
continue
}
if reply != nil && reply.SinglePushResult != nil {
wsResult = append(wsResult, reply.SinglePushResult...)
}
}
log.Debug(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData)
successCount++
if isOfflinePush {
var onlineSuccessUserIDList []string
onlineSuccessUserIDList = append(onlineSuccessUserIDList, pushMsg.MsgData.SendID)
for _, v := range wsResult {
if v.OnlinePush && v.UserID != pushMsg.MsgData.SendID {
onlineSuccessUserIDList = append(onlineSuccessUserIDList, v.UserID)
}
}
onlineFailedUserIDList := utils.DifferenceString(onlineSuccessUserIDList, pushToUserIDList)
//Use offline push messaging
var title, detailContent string
if len(onlineFailedUserIDList) > 0 {
var offlinePushUserIDList []string
var needOfflinePushUserIDList []string
callbackResp := callbackOfflinePush(pushMsg.OperationID, onlineFailedUserIDList, pushMsg.MsgData, &offlinePushUserIDList)
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp")
if callbackResp.ErrCode != 0 {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offlinePush stop")
return
}
if pushMsg.MsgData.OfflinePushInfo != nil {
title = pushMsg.MsgData.OfflinePushInfo.Title
detailContent = pushMsg.MsgData.OfflinePushInfo.Desc
}
if len(offlinePushUserIDList) > 0 {
needOfflinePushUserIDList = offlinePushUserIDList
} else {
needOfflinePushUserIDList = onlineFailedUserIDList
}
if offlinePusher == nil {
return
}
opts, err := GetOfflinePushOpts(pushMsg)
if err != nil {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "GetOfflinePushOpts failed", pushMsg, err.Error())
}
log.NewInfo(pushMsg.OperationID, utils.GetSelfFuncName(), needOfflinePushUserIDList, title, detailContent, "opts:", opts)
if title == "" {
switch pushMsg.MsgData.ContentType {
case constant.Text:
fallthrough
case constant.Picture:
fallthrough
case constant.Voice:
fallthrough
case constant.Video:
fallthrough
case constant.File:
title = constant.ContentType2PushContent[int64(pushMsg.MsgData.ContentType)]
case constant.AtText:
a := AtContent{}
_ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a)
if utils.IsContain(pushMsg.PushToUserID, a.AtUserList) {
title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
} else {
title = constant.ContentType2PushContent[constant.GroupMsg]
}
case constant.SignalingNotification:
title = constant.ContentType2PushContent[constant.SignalMsg]
default:
title = constant.ContentType2PushContent[constant.Common]
}
detailContent = title
}
pushResult, err := offlinePusher.Push(needOfflinePushUserIDList, title, detailContent, pushMsg.OperationID, opts)
if err != nil {
prome.PromeInc(prome.MsgOfflinePushFailedCounter)
log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error())
} else {
prome.PromeInc(prome.MsgOfflinePushSuccessCounter)
log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData)
}
wsResults = append(wsResults, reply.SinglePushResult...)
}
}
return wsResults, nil
}
func GetOfflinePushOpts(pushMsg *pbPush.PushMsgReq) (opts *Opts, err error) {
if pushMsg.MsgData.ContentType > constant.SignalingNotificationBegin && pushMsg.MsgData.ContentType < constant.SignalingNotificationEnd {
func (p *Pusher) OfflinePushMsg(ctx context.Context, sourceID string, msg *sdkws.MsgData, offlinePushUserIDs []string) error {
title, content, opts, err := p.GetOfflinePushInfos(sourceID, msg)
if err != nil {
return err
}
err = p.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
if err != nil {
prome.PromeInc(prome.MsgOfflinePushFailedCounter)
return err
}
prome.PromeInc(prome.MsgOfflinePushSuccessCounter)
return nil
}
func (p *Pusher) GetOfflinePushOpts(msg *sdkws.MsgData) (opts *Opts, err error) {
opts = &Opts{}
if msg.ContentType > constant.SignalingNotificationBegin && msg.ContentType < constant.SignalingNotificationEnd {
req := &pbRtc.SignalReq{}
if err := proto.Unmarshal(pushMsg.MsgData.Content, req); err != nil {
if err := proto.Unmarshal(msg.Content, req); err != nil {
return nil, utils.Wrap(err, "")
}
opts = &Opts{}
switch req.Payload.(type) {
case *pbRtc.SignalReq_Invite, *pbRtc.SignalReq_InviteInGroup:
opts.Signal.ClientMsgID = pushMsg.MsgData.ClientMsgID
log.NewDebug(pushMsg.OperationID, opts)
opts.Signal = &Signal{ClientMsgID: msg.ClientMsgID}
}
}
if pushMsg.MsgData.OfflinePushInfo != nil {
opts = &Opts{}
opts.IOSBadgeCount = pushMsg.MsgData.OfflinePushInfo.IOSBadgeCount
opts.IOSPushSound = pushMsg.MsgData.OfflinePushInfo.IOSPushSound
opts.Ex = pushMsg.MsgData.OfflinePushInfo.Ex
if msg.OfflinePushInfo != nil {
opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount
opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound
opts.Ex = msg.OfflinePushInfo.Ex
}
return opts, nil
}
func (p *Pusher) GetOfflinePushInfos(sourceID string, msg *sdkws.MsgData) (title, content string, opts *Opts, err error) {
if p.offlinePusher == nil {
err = errors.New("no offlinePusher is configured")
return
}
type AtContent struct {
Text string `json:"text"`
AtUserList []string `json:"atUserList"`
IsAtSelf bool `json:"isAtSelf"`
}
opts, err = p.GetOfflinePushOpts(msg)
if err != nil {
return
}
if msg.OfflinePushInfo != nil {
title = msg.OfflinePushInfo.Title
content = msg.OfflinePushInfo.Desc
}
if title == "" {
switch msg.ContentType {
case constant.Text:
fallthrough
case constant.Picture:
fallthrough
case constant.Voice:
fallthrough
case constant.Video:
fallthrough
case constant.File:
title = constant.ContentType2PushContent[int64(msg.ContentType)]
case constant.AtText:
a := AtContent{}
_ = utils.JsonStringToStruct(string(msg.Content), &a)
if utils.IsContain(sourceID, a.AtUserList) {
title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
} else {
title = constant.ContentType2PushContent[constant.GroupMsg]
}
case constant.SignalingNotification:
title = constant.ContentType2PushContent[constant.SignalMsg]
default:
title = constant.ContentType2PushContent[constant.Common]
}
}
if content == "" {
content = title
}
return
}