Files
open-im-server/pkg/common/kafka/consumer_group.go
T

60 lines
1.4 KiB
Go
Raw Normal View History

2023-06-29 22:35:31 +08:00
/*
** description("").
** copyright('tuoyun,www.tuoyun.net').
** author("fg,Gordon@tuoyun.net").
** time(2021/5/11 9:36).
*/
package kafka
import (
	"context"
	"errors"

	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
	"github.com/Shopify/sarama"
)
// MConsumerGroup wraps a sarama.ConsumerGroup together with the group ID it
// was created with and the topics it consumes.
type MConsumerGroup struct {
// Embedded client; its methods are promoted onto MConsumerGroup.
sarama.ConsumerGroup
// groupID is the consumer group ID passed to sarama.NewConsumerGroup.
groupID string
// topics is the topic list handed to Consume by RegisterHandleAndConsumer.
topics []string
}
// MConsumerGroupConfig carries the subset of sarama settings that
// NewMConsumerGroup copies into a fresh sarama.Config.
type MConsumerGroupConfig struct {
// KafkaVersion is assigned to sarama's Config.Version.
KafkaVersion sarama.KafkaVersion
// OffsetsInitial is assigned to sarama's Config.Consumer.Offsets.Initial.
OffsetsInitial int64
// IsReturnErr is assigned to sarama's Config.Consumer.Return.Errors.
IsReturnErr bool
}
2022-05-23 20:12:54 +08:00
func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup {
2023-06-29 22:35:31 +08:00
config := sarama.NewConfig()
config.Version = consumerConfig.KafkaVersion
config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
config.Consumer.Return.Errors = consumerConfig.IsReturnErr
2022-05-23 20:12:54 +08:00
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config)
2023-06-29 22:35:31 +08:00
if err != nil {
panic(err.Error())
}
return &MConsumerGroup{
consumerGroup,
groupID,
topics,
}
}
2023-06-30 09:45:02 +08:00
// GetContextFromMsg returns the context that GetContextWithMQHeader builds
// from the headers of the consumed message cMsg.
func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context {
	headers := cMsg.Headers
	return GetContextWithMQHeader(headers)
}
2023-06-29 22:35:31 +08:00
func (mc *MConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler) {
2023-06-30 09:45:02 +08:00
log.ZDebug(context.Background(), "register consumer group", "groupID", mc.groupID)
2023-06-29 22:35:31 +08:00
ctx := context.Background()
for {
err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
if err != nil {
panic(err.Error())
}
}
}