mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-04-28 14:29:19 +08:00
feat: implement offline push using kafka (#2600)
* refactor: refactor workflows contents. * add tool workflows. * update field. * fix: remove chat error. * Fix err. * fix error. * remove cn comment. * update workflows files. * update infra config. * move workflows. * feat: update bot. * fix: solve uncorrect outdated msg get. * update get docIDs logic. * update * update skip logic. * fix * update. * fix: delay deleteObject func. * remove unused content. * update log type. * feat: implement request batch count limit. * update * update * feat: implement offline push. * feat: implement batch Push spilt * update go mod * feat: implement kafka producer and consumer. * update format, * add PushMQ log. * feat: update Handler logic. * update MQ logic. * update * update * fix: update OfflinePushConsumerHandler.
This commit is contained in:
@@ -1,33 +1,20 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package push
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
pbchat "github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/protocol/msggateway"
|
||||
pbpush "github.com/openimsdk/protocol/push"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
@@ -46,6 +33,7 @@ type ConsumerHandler struct {
|
||||
pushConsumerGroup *kafka.MConsumerGroup
|
||||
offlinePusher offlinepush.OfflinePusher
|
||||
onlinePusher OnlinePusher
|
||||
pushDatabase controller.PushDatabase
|
||||
onlineCache *rpccache.OnlineCache
|
||||
groupLocalCache *rpccache.GroupLocalCache
|
||||
conversationLocalCache *rpccache.ConversationLocalCache
|
||||
@@ -56,7 +44,7 @@ type ConsumerHandler struct {
|
||||
config *Config
|
||||
}
|
||||
|
||||
func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient,
|
||||
func NewConsumerHandler(config *Config, database controller.PushDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient,
|
||||
client discovery.SvcDiscoveryRegistry) (*ConsumerHandler, error) {
|
||||
var consumerHandler ConsumerHandler
|
||||
var err error
|
||||
@@ -65,6 +53,7 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
|
||||
consumerHandler.offlinePusher = offlinePusher
|
||||
consumerHandler.onlinePusher = NewOnlinePusher(client, config)
|
||||
@@ -75,43 +64,42 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher,
|
||||
consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb)
|
||||
consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
|
||||
consumerHandler.config = config
|
||||
consumerHandler.pushDatabase = database
|
||||
consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil)
|
||||
return &consumerHandler, nil
|
||||
}
|
||||
|
||||
func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
|
||||
msgFromMQ := pbchat.PushMsgDataToMQ{}
|
||||
msgFromMQ := pbpush.PushMsgReq{}
|
||||
if err := proto.Unmarshal(msg, &msgFromMQ); err != nil {
|
||||
log.ZError(ctx, "push Unmarshal msg err", err, "msg", string(msg))
|
||||
return
|
||||
}
|
||||
pbData := &pbpush.PushMsgReq{
|
||||
MsgData: msgFromMQ.MsgData,
|
||||
ConversationID: msgFromMQ.ConversationID,
|
||||
}
|
||||
|
||||
sec := msgFromMQ.MsgData.SendTime / 1000
|
||||
nowSec := timeutil.GetCurrentTimestampBySecond()
|
||||
|
||||
if nowSec-sec > 10 {
|
||||
prommetrics.MsgLoneTimePushCounter.Inc()
|
||||
log.ZWarn(ctx, "it’s been a while since the message was sent", nil, "msg", pbData.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec)
|
||||
log.ZWarn(ctx, "it’s been a while since the message was sent", nil, "msg", msgFromMQ.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec)
|
||||
}
|
||||
var err error
|
||||
|
||||
switch msgFromMQ.MsgData.SessionType {
|
||||
case constant.ReadGroupChatType:
|
||||
err = c.Push2Group(ctx, pbData.MsgData.GroupID, pbData.MsgData)
|
||||
err = c.Push2Group(ctx, msgFromMQ.MsgData.GroupID, msgFromMQ.MsgData)
|
||||
default:
|
||||
var pushUserIDList []string
|
||||
isSenderSync := datautil.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
|
||||
if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID {
|
||||
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID)
|
||||
isSenderSync := datautil.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
|
||||
if !isSenderSync || msgFromMQ.MsgData.SendID == msgFromMQ.MsgData.RecvID {
|
||||
pushUserIDList = append(pushUserIDList, msgFromMQ.MsgData.RecvID)
|
||||
} else {
|
||||
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID)
|
||||
pushUserIDList = append(pushUserIDList, msgFromMQ.MsgData.RecvID, msgFromMQ.MsgData.SendID)
|
||||
}
|
||||
err = c.Push2User(ctx, pushUserIDList, pbData.MsgData)
|
||||
err = c.Push2User(ctx, pushUserIDList, msgFromMQ.MsgData)
|
||||
}
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "push failed", err, "msg", pbData.String())
|
||||
log.ZWarn(ctx, "push failed", err, "msg", msgFromMQ.String())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -246,28 +234,34 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Use offline push messaging
|
||||
if len(needOfflinePushUserIDs) > 0 {
|
||||
var offlinePushUserIDs []string
|
||||
err = c.webhookBeforeOfflinePush(ctx, &c.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(offlinePushUserIDs) > 0 {
|
||||
needOfflinePushUserIDs = offlinePushUserIDs
|
||||
}
|
||||
|
||||
err = c.offlinePushMsg(ctx, msg, needOfflinePushUserIDs)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
|
||||
return nil
|
||||
}
|
||||
|
||||
c.asyncOfflinePush(ctx, needOfflinePushUserIDs, msg)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ConsumerHandler) asyncOfflinePush(ctx context.Context, needOfflinePushUserIDs []string, msg *sdkws.MsgData) {
|
||||
var offlinePushUserIDs []string
|
||||
err := c.webhookBeforeOfflinePush(ctx, &c.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "webhookBeforeOfflinePush failed", err, "msg", msg)
|
||||
return
|
||||
}
|
||||
|
||||
if len(offlinePushUserIDs) > 0 {
|
||||
needOfflinePushUserIDs = offlinePushUserIDs
|
||||
}
|
||||
if err := c.pushDatabase.MsgToOfflinePushMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(msg.SendID, msg.RecvID), needOfflinePushUserIDs, msg); err != nil {
|
||||
log.ZError(ctx, "Msg To OfflinePush MQ error", err, "needOfflinePushUserIDs",
|
||||
needOfflinePushUserIDs, "msg", msg)
|
||||
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID string, pushToUserIDs *[]string, msg *sdkws.MsgData) (err error) {
|
||||
if len(*pushToUserIDs) == 0 {
|
||||
*pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID)
|
||||
|
||||
Reference in New Issue
Block a user