Files
open-im-server/pkg/common/kafka/consumer.go
T

43 lines
945 B
Go
Raw Normal View History

2023-06-29 22:35:31 +08:00
package kafka
import (
"sync"
2023-06-30 09:45:02 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/Shopify/sarama"
2023-06-29 22:35:31 +08:00
)
type Consumer struct {
addr []string
WG sync.WaitGroup
Topic string
PartitionList []int32
Consumer sarama.Consumer
}
func NewKafkaConsumer(addr []string, topic string) *Consumer {
p := Consumer{}
p.Topic = topic
p.addr = addr
2023-06-30 09:45:02 +08:00
consumerConfig := sarama.NewConfig()
if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
consumerConfig.Net.SASL.Enable = true
consumerConfig.Net.SASL.User = config.Config.Kafka.Username
consumerConfig.Net.SASL.Password = config.Config.Kafka.Password
}
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
2023-06-29 22:35:31 +08:00
if err != nil {
panic(err.Error())
}
p.Consumer = consumer
partitionList, err := consumer.Partitions(p.Topic)
if err != nil {
panic(err.Error())
}
p.PartitionList = partitionList
return &p
}