Files
open-im-server/internal/push/push_handler.go
T

72 lines
2.4 KiB
Go
Raw Normal View History

2023-02-20 10:13:29 +08:00
package push
2021-05-26 19:22:11 +08:00
import (
2023-02-23 17:28:57 +08:00
"context"
2023-03-23 19:02:20 +08:00
2023-03-16 10:46:06 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
pbChat "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
2021-05-26 19:22:11 +08:00
"github.com/Shopify/sarama"
2023-04-28 18:33:33 +08:00
"google.golang.org/protobuf/proto"
2021-05-26 19:22:11 +08:00
)
2023-02-20 10:13:29 +08:00
type ConsumerHandler struct {
2021-05-26 19:22:11 +08:00
pushConsumerGroup *kfk.MConsumerGroup
2023-03-03 17:42:26 +08:00
pusher *Pusher
2021-05-26 19:22:11 +08:00
}
2023-03-03 17:42:26 +08:00
func NewConsumerHandler(pusher *Pusher) *ConsumerHandler {
var consumerHandler ConsumerHandler
consumerHandler.pusher = pusher
consumerHandler.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
2021-05-26 19:22:11 +08:00
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr,
config.Config.Kafka.ConsumerGroupID.MsgToPush)
2023-03-03 17:42:26 +08:00
return &consumerHandler
2021-05-26 19:22:11 +08:00
}
2023-02-23 17:28:57 +08:00
2023-03-03 17:42:26 +08:00
func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
2021-12-23 17:34:32 +08:00
msgFromMQ := pbChat.PushMsgDataToMQ{}
if err := proto.Unmarshal(msg, &msgFromMQ); err != nil {
2023-03-22 18:35:21 +08:00
log.ZError(ctx, "push Unmarshal msg err", err, "msg", string(msg))
2021-05-26 19:22:11 +08:00
return
}
2022-07-29 19:08:56 +08:00
pbData := &pbPush.PushMsgReq{
2023-05-04 15:06:23 +08:00
MsgData: msgFromMQ.MsgData,
ConversationID: msgFromMQ.ConversationID,
2022-07-29 19:08:56 +08:00
}
2022-08-12 20:05:01 +08:00
sec := msgFromMQ.MsgData.SendTime / 1000
nowSec := utils.GetCurrentTimestampBySecond()
if nowSec-sec > 10 {
return
}
2023-02-23 17:28:57 +08:00
var err error
2022-07-29 19:08:56 +08:00
switch msgFromMQ.MsgData.SessionType {
case constant.SuperGroupChatType:
2023-05-15 17:28:37 +08:00
err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
2022-07-29 19:08:56 +08:00
default:
2023-05-15 17:28:37 +08:00
err = c.pusher.Push2User(ctx, []string{pbData.MsgData.SendID, pbData.MsgData.RecvID}, pbData.MsgData)
2023-02-23 17:28:57 +08:00
}
if err != nil {
2023-06-04 17:57:21 +08:00
if err == errNoOfflinePusher {
log.ZWarn(ctx, "offline push failed", err, "msg", pbData.String())
} else {
log.ZError(ctx, "push failed", err, "msg", pbData.String())
}
2022-07-29 19:08:56 +08:00
}
2021-05-26 19:22:11 +08:00
}
2023-02-20 10:13:29 +08:00
func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
2021-05-26 19:22:11 +08:00
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
2023-03-22 19:24:38 +08:00
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
2023-03-03 17:42:26 +08:00
c.handleMs2PsChat(ctx, msg.Value)
2022-08-12 19:24:30 +08:00
sess.MarkMessage(msg, "")
2021-05-26 19:22:11 +08:00
}
return nil
}