Files
open-im-server/internal/push/push_rpc_server.go
T

108 lines
3.3 KiB
Go
Raw Normal View History

2023-07-04 11:15:20 +08:00
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2023-06-30 09:45:02 +08:00
package push
import (
"context"
2024-02-19 10:19:32 +08:00
"github.com/OpenIMSDK/protocol/constant"
pbpush "github.com/OpenIMSDK/protocol/push"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/log"
2024-03-04 21:32:07 +08:00
"github.com/OpenIMSDK/tools/utils"
2024-03-06 15:58:05 +08:00
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
2024-03-10 10:24:20 +08:00
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
2024-03-05 10:51:55 +08:00
"google.golang.org/grpc"
2023-06-30 09:45:02 +08:00
)
type pushServer struct {
pusher *Pusher
config *config.GlobalConfig
2023-06-30 09:45:02 +08:00
}
func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(config)
2023-06-30 09:45:02 +08:00
if err != nil {
return err
}
cacheModel := cache.NewMsgCacheModel(rdb, config)
offlinePusher := NewOfflinePusher(config, cacheModel)
2023-06-30 09:45:02 +08:00
database := controller.NewPushDatabase(cacheModel)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config)
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config)
2023-07-03 16:29:22 +08:00
pusher := NewPusher(
config,
2023-07-03 16:29:22 +08:00
client,
offlinePusher,
database,
rpccache.NewGroupLocalCache(groupRpcClient, rdb),
rpccache.NewConversationLocalCache(conversationRpcClient, rdb),
2023-07-03 16:29:22 +08:00
&conversationRpcClient,
&groupRpcClient,
&msgRpcClient,
)
pbpush.RegisterPushMsgServiceServer(server, &pushServer{
pusher: pusher,
config: config,
})
consumer, err := NewConsumer(config, pusher)
2024-02-02 10:11:13 +08:00
if err != nil {
return err
}
consumer.Start()
2023-06-30 09:45:02 +08:00
return nil
}
func (r *pushServer) PushMsg(ctx context.Context, pbData *pbpush.PushMsgReq) (resp *pbpush.PushMsgResp, err error) {
2023-06-30 09:45:02 +08:00
switch pbData.MsgData.SessionType {
case constant.SuperGroupChatType:
err = r.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
default:
var pushUserIDList []string
isSenderSync := utils.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
if !isSenderSync {
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID)
} else {
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID)
}
err = r.pusher.Push2User(ctx, pushUserIDList, pbData.MsgData)
2023-06-30 09:45:02 +08:00
}
if err != nil {
if err != errNoOfflinePusher {
return nil, err
}
2024-03-18 10:34:48 +08:00
log.ZWarn(ctx, "offline push failed", err, "msg", pbData.String())
2023-06-30 09:45:02 +08:00
}
return &pbpush.PushMsgResp{}, nil
2023-06-30 09:45:02 +08:00
}
2023-07-03 16:29:22 +08:00
func (r *pushServer) DelUserPushToken(
ctx context.Context,
req *pbpush.DelUserPushTokenReq,
) (resp *pbpush.DelUserPushTokenResp, err error) {
2023-06-30 09:45:02 +08:00
if err = r.pusher.database.DelFcmToken(ctx, req.UserID, int(req.PlatformID)); err != nil {
return nil, err
}
return &pbpush.DelUserPushTokenResp{}, nil
2023-06-30 09:45:02 +08:00
}