Files
open-im-server/internal/push/push_handler.go
T

74 lines
2.3 KiB
Go
Raw Normal View History

2021-05-26 19:22:11 +08:00
/*
** description("").
2023-02-23 19:15:30 +08:00
** copyright('OpenIM,www.OpenIM.io').
2021-05-26 19:22:11 +08:00
** author("fg,Gordon@tuoyun.net").
** time(2021/5/13 10:33).
*/
2023-02-20 10:13:29 +08:00
package push
2021-05-26 19:22:11 +08:00
import (
2023-02-23 19:15:30 +08:00
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant"
kfk "OpenIM/pkg/common/kafka"
"OpenIM/pkg/common/log"
"OpenIM/pkg/common/tracelog"
pbChat "OpenIM/pkg/proto/msg"
pbPush "OpenIM/pkg/proto/push"
"OpenIM/pkg/utils"
2023-02-23 17:28:57 +08:00
"context"
2021-05-26 19:22:11 +08:00
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
)
2023-02-20 10:13:29 +08:00
type ConsumerHandler struct {
2021-05-26 19:22:11 +08:00
pushConsumerGroup *kfk.MConsumerGroup
2023-02-23 17:28:57 +08:00
pusher Pusher
2021-05-26 19:22:11 +08:00
}
2023-02-20 10:13:29 +08:00
func (c *ConsumerHandler) Init() {
c.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
2021-05-26 19:22:11 +08:00
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr,
config.Config.Kafka.ConsumerGroupID.MsgToPush)
}
2023-02-23 17:28:57 +08:00
2023-02-20 10:13:29 +08:00
func (c *ConsumerHandler) handleMs2PsChat(msg []byte) {
2022-05-10 09:09:37 +08:00
log.NewDebug("", "msg come from kafka And push!!!", "msg", string(msg))
2021-12-23 17:34:32 +08:00
msgFromMQ := pbChat.PushMsgDataToMQ{}
if err := proto.Unmarshal(msg, &msgFromMQ); err != nil {
2022-05-10 10:52:32 +08:00
log.Error("", "push Unmarshal msg err", "msg", string(msg), "err", err.Error())
2021-05-26 19:22:11 +08:00
return
}
2022-07-29 19:08:56 +08:00
pbData := &pbPush.PushMsgReq{
2023-02-22 19:51:14 +08:00
MsgData: msgFromMQ.MsgData,
SourceID: msgFromMQ.PushToUserID,
2022-07-29 19:08:56 +08:00
}
2022-08-12 20:05:01 +08:00
sec := msgFromMQ.MsgData.SendTime / 1000
nowSec := utils.GetCurrentTimestampBySecond()
if nowSec-sec > 10 {
return
}
2023-02-23 17:28:57 +08:00
ctx := context.Background()
tracelog.SetOperationID(ctx, "")
var err error
2022-07-29 19:08:56 +08:00
switch msgFromMQ.MsgData.SessionType {
case constant.SuperGroupChatType:
2023-02-23 17:28:57 +08:00
err = c.pusher.MsgToSuperGroupUser(ctx, pbData.SourceID, pbData.MsgData)
2022-07-29 19:08:56 +08:00
default:
2023-02-23 17:28:57 +08:00
err = c.pusher.MsgToUser(ctx, pbData.SourceID, pbData.MsgData)
}
if err != nil {
log.NewError("", "push failed", *pbData)
2022-07-29 19:08:56 +08:00
}
2021-05-26 19:22:11 +08:00
}
2023-02-20 10:13:29 +08:00
// Setup is the sarama consumer-group hook of the same name. This handler has
// nothing to prepare, so it always returns nil.
func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is the sarama consumer-group hook of the same name. This handler
// holds nothing to release, so it always returns nil.
func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
2021-05-26 19:22:11 +08:00
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
2022-05-10 09:09:37 +08:00
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
2023-02-20 10:13:29 +08:00
c.handleMs2PsChat(msg.Value)
2022-08-12 19:24:30 +08:00
sess.MarkMessage(msg, "")
2021-05-26 19:22:11 +08:00
}
return nil
}