Files
open-im-server/pkg/common/kafka/consumer_group.go
T

57 lines
1.2 KiB
Go
Raw Normal View History

2021-05-26 19:37:10 +08:00
/*
** description("").
** copyright('tuoyun,www.tuoyun.net').
** author("fg,Gordon@tuoyun.net").
** time(2021/5/11 9:36).
*/
package kafka
import (
"context"
2023-04-21 17:51:28 +08:00
2021-05-26 19:37:10 +08:00
"github.com/Shopify/sarama"
)
type MConsumerGroup struct {
sarama.ConsumerGroup
groupID string
topics []string
}
type MConsumerGroupConfig struct {
KafkaVersion sarama.KafkaVersion
OffsetsInitial int64
IsReturnErr bool
}
2022-05-23 20:12:54 +08:00
func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup {
2021-05-26 19:37:10 +08:00
config := sarama.NewConfig()
config.Version = consumerConfig.KafkaVersion
config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
config.Consumer.Return.Errors = consumerConfig.IsReturnErr
2022-05-23 20:12:54 +08:00
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config)
2021-05-26 19:37:10 +08:00
if err != nil {
2021-11-26 14:20:50 +08:00
panic(err.Error())
2021-05-26 19:37:10 +08:00
}
return &MConsumerGroup{
consumerGroup,
groupID,
topics,
}
}
2023-03-03 17:42:26 +08:00
2023-03-22 19:24:38 +08:00
func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context {
return GetContextWithMQHeader(cMsg.Headers)
2023-03-03 17:42:26 +08:00
}
2021-05-26 19:37:10 +08:00
func (mc *MConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler) {
ctx := context.Background()
for {
err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
if err != nil {
2021-11-26 14:20:50 +08:00
panic(err.Error())
2021-05-26 19:37:10 +08:00
}
}
}