Files
open-im-server/pkg/common/kafka/consumer.go
T

44 lines
958 B
Go
Raw Normal View History

2021-05-26 19:37:10 +08:00
package kafka
import (
2022-09-16 11:57:01 +08:00
"Open_IM/pkg/common/config"
2021-05-26 19:37:10 +08:00
"sync"
2022-09-16 11:57:01 +08:00
"github.com/Shopify/sarama"
2021-05-26 19:37:10 +08:00
)
// Consumer bundles a sarama consumer with the topic it was created for and
// the partitions that topic exposes.
type Consumer struct {
	// addr holds the broker addresses handed to sarama.NewConsumer.
	addr []string
	// WG is a wait group exposed to users of the Consumer; nothing in this
	// file touches it (presumably callers use it to wait on per-partition
	// goroutines — confirm against call sites).
	WG sync.WaitGroup
	// Topic is the Kafka topic whose partitions were looked up.
	Topic string
	// PartitionList is the set of partition IDs reported for Topic.
	PartitionList []int32
	// Consumer is the underlying sarama consumer.
	Consumer sarama.Consumer
}
func NewKafkaConsumer(addr []string, topic string) *Consumer {
p := Consumer{}
p.Topic = topic
p.addr = addr
2022-09-16 11:57:01 +08:00
consumerConfig := sarama.NewConfig()
2022-09-16 16:51:12 +08:00
if config.Config.Kafka.SASLUserName != "" && config.Config.Kafka.SASLPassword != "" {
2022-09-16 11:57:01 +08:00
consumerConfig.Net.SASL.Enable = true
2022-09-16 16:51:12 +08:00
consumerConfig.Net.SASL.User = config.Config.Kafka.SASLUserName
consumerConfig.Net.SASL.Password = config.Config.Kafka.SASLPassword
2022-09-16 11:57:01 +08:00
}
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
2021-05-26 19:37:10 +08:00
if err != nil {
2021-11-26 14:20:50 +08:00
panic(err.Error())
2021-05-26 19:37:10 +08:00
return nil
}
p.Consumer = consumer
partitionList, err := consumer.Partitions(p.Topic)
if err != nil {
2021-11-26 14:20:50 +08:00
panic(err.Error())
2021-05-26 19:37:10 +08:00
return nil
}
p.PartitionList = partitionList
return &p
}