Files
open-im-server/internal/push/push_handler.go
T

77 lines
2.5 KiB
Go
Raw Normal View History

2021-05-26 19:22:11 +08:00
/*
** description("").
2023-02-23 19:15:30 +08:00
** copyright('OpenIM,www.OpenIM.io').
2021-05-26 19:22:11 +08:00
** author("fg,Gordon@tuoyun.net").
** time(2021/5/13 10:33).
*/
2023-02-20 10:13:29 +08:00
package push
2021-05-26 19:22:11 +08:00
import (
2023-02-23 19:15:30 +08:00
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant"
kfk "OpenIM/pkg/common/kafka"
"OpenIM/pkg/common/log"
"OpenIM/pkg/common/tracelog"
pbChat "OpenIM/pkg/proto/msg"
pbPush "OpenIM/pkg/proto/push"
"OpenIM/pkg/utils"
2023-02-23 17:28:57 +08:00
"context"
2021-05-26 19:22:11 +08:00
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
)
2023-02-20 10:13:29 +08:00
type ConsumerHandler struct {
2021-05-26 19:22:11 +08:00
pushConsumerGroup *kfk.MConsumerGroup
2023-03-03 17:42:26 +08:00
pusher *Pusher
2021-05-26 19:22:11 +08:00
}
2023-03-03 17:42:26 +08:00
func NewConsumerHandler(pusher *Pusher) *ConsumerHandler {
var consumerHandler ConsumerHandler
consumerHandler.pusher = pusher
consumerHandler.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
2021-05-26 19:22:11 +08:00
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr,
config.Config.Kafka.ConsumerGroupID.MsgToPush)
2023-03-03 17:42:26 +08:00
return &consumerHandler
2021-05-26 19:22:11 +08:00
}
2023-02-23 17:28:57 +08:00
2023-03-03 17:42:26 +08:00
func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
2022-05-10 09:09:37 +08:00
log.NewDebug("", "msg come from kafka And push!!!", "msg", string(msg))
2021-12-23 17:34:32 +08:00
msgFromMQ := pbChat.PushMsgDataToMQ{}
if err := proto.Unmarshal(msg, &msgFromMQ); err != nil {
2022-05-10 10:52:32 +08:00
log.Error("", "push Unmarshal msg err", "msg", string(msg), "err", err.Error())
2021-05-26 19:22:11 +08:00
return
}
2022-07-29 19:08:56 +08:00
pbData := &pbPush.PushMsgReq{
2023-02-22 19:51:14 +08:00
MsgData: msgFromMQ.MsgData,
2023-03-03 17:42:26 +08:00
SourceID: msgFromMQ.SourceID,
2022-07-29 19:08:56 +08:00
}
2022-08-12 20:05:01 +08:00
sec := msgFromMQ.MsgData.SendTime / 1000
nowSec := utils.GetCurrentTimestampBySecond()
if nowSec-sec > 10 {
return
}
2023-02-23 17:28:57 +08:00
tracelog.SetOperationID(ctx, "")
var err error
2022-07-29 19:08:56 +08:00
switch msgFromMQ.MsgData.SessionType {
case constant.SuperGroupChatType:
2023-02-23 17:28:57 +08:00
err = c.pusher.MsgToSuperGroupUser(ctx, pbData.SourceID, pbData.MsgData)
2022-07-29 19:08:56 +08:00
default:
2023-02-23 17:28:57 +08:00
err = c.pusher.MsgToUser(ctx, pbData.SourceID, pbData.MsgData)
}
if err != nil {
log.NewError("", "push failed", *pbData)
2022-07-29 19:08:56 +08:00
}
2021-05-26 19:22:11 +08:00
}
2023-02-20 10:13:29 +08:00
func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
2021-05-26 19:22:11 +08:00
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
2022-05-10 09:09:37 +08:00
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
2023-03-09 16:36:47 +08:00
ctx := c.pushConsumerGroup.GetContextFromMsg(msg, "push consumer")
2023-03-03 17:42:26 +08:00
c.handleMs2PsChat(ctx, msg.Value)
2022-08-12 19:24:30 +08:00
sess.MarkMessage(msg, "")
2021-05-26 19:22:11 +08:00
}
return nil
}