Files
open-im-server/pkg/common/kafka/consumer_group.go
T

52 lines
1.2 KiB
Go
Raw Normal View History

2021-05-26 19:37:10 +08:00
/*
** description("").
** copyright('tuoyun,www.tuoyun.net').
** author("fg,Gordon@tuoyun.net").
** time(2021/5/11 9:36).
*/
package kafka
import (
"context"
2022-05-23 19:56:34 +08:00
"fmt"
2021-05-26 19:37:10 +08:00
"github.com/Shopify/sarama"
)
// MConsumerGroup bundles a sarama consumer group with the group ID and the
// topic list it was created for. sarama.ConsumerGroup is embedded, so its
// methods are callable directly on an MConsumerGroup.
type MConsumerGroup struct {
	sarama.ConsumerGroup
	// groupID is the consumer group ID that was passed to NewMConsumerGroup.
	groupID string
	// topics is the topic list handed to Consume by RegisterHandleAndConsumer.
	topics []string
}
// MConsumerGroupConfig holds the settings that NewMConsumerGroup copies into
// the sarama configuration it builds.
type MConsumerGroupConfig struct {
	// KafkaVersion is assigned to the sarama config's Version field.
	KafkaVersion sarama.KafkaVersion
	// OffsetsInitial is assigned to Consumer.Offsets.Initial.
	// NOTE(review): presumably sarama.OffsetNewest or sarama.OffsetOldest — confirm against callers.
	OffsetsInitial int64
	// IsReturnErr is assigned to Consumer.Return.Errors.
	IsReturnErr bool
}
2022-05-23 20:12:54 +08:00
func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup {
2021-05-26 19:37:10 +08:00
config := sarama.NewConfig()
config.Version = consumerConfig.KafkaVersion
config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
config.Consumer.Return.Errors = consumerConfig.IsReturnErr
2022-05-23 20:12:54 +08:00
fmt.Println("init address is ", addrs, "topics is ", topics)
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config)
2021-05-26 19:37:10 +08:00
if err != nil {
2021-11-26 14:20:50 +08:00
panic(err.Error())
2021-05-26 19:37:10 +08:00
}
return &MConsumerGroup{
consumerGroup,
groupID,
topics,
}
}
func (mc *MConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler) {
ctx := context.Background()
for {
err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
if err != nil {
2021-11-26 14:20:50 +08:00
panic(err.Error())
2021-05-26 19:37:10 +08:00
}
}
}