Files
open-im-server/pkg/common/kafka/consumer_group.go
T

80 lines
2.5 KiB
Go
Raw Normal View History

2023-07-12 12:28:18 +08:00
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2023-06-29 22:35:31 +08:00
package kafka
import (
	"context"
	"errors"
	"strings"

	"github.com/OpenIMSDK/tools/errs"
	"github.com/OpenIMSDK/tools/log"

	"github.com/openimsdk/open-im-server/v3/pkg/common/config"

	"github.com/IBM/sarama"
)
// MConsumerGroup couples a sarama consumer group with the group ID and the
// topic list it consumes. Instances are created by NewMConsumerGroup.
type MConsumerGroup struct {
// Embedded, so the sarama.ConsumerGroup methods are promoted onto MConsumerGroup.
sarama.ConsumerGroup
// groupID is the consumer group ID passed to sarama.NewConsumerGroup; it is
// also attached to log entries.
groupID string
// topics is the set of topics handed to Consume on every loop iteration.
topics []string
}
// MConsumerGroupConfig carries the caller-tunable settings that
// NewMConsumerGroup copies into the sarama configuration.
type MConsumerGroupConfig struct {
// KafkaVersion is assigned to sarama's Config.Version.
KafkaVersion sarama.KafkaVersion
// OffsetsInitial is assigned to Config.Consumer.Offsets.Initial
// (presumably sarama.OffsetNewest or sarama.OffsetOldest — confirm with callers).
OffsetsInitial int64
// IsReturnErr is assigned to Config.Consumer.Return.Errors.
IsReturnErr bool
}
2024-02-02 10:11:13 +08:00
func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) (*MConsumerGroup, error) {
2023-08-25 21:00:53 +08:00
consumerGroupConfig := sarama.NewConfig()
consumerGroupConfig.Version = consumerConfig.KafkaVersion
consumerGroupConfig.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
consumerGroupConfig.Consumer.Return.Errors = consumerConfig.IsReturnErr
if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
consumerGroupConfig.Net.SASL.Enable = true
consumerGroupConfig.Net.SASL.User = config.Config.Kafka.Username
consumerGroupConfig.Net.SASL.Password = config.Config.Kafka.Password
}
SetupTLSConfig(consumerGroupConfig)
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig)
2023-06-29 22:35:31 +08:00
if err != nil {
return nil, errs.Wrap(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID, config.Config.Kafka.Username, config.Config.Kafka.Password)
2023-06-29 22:35:31 +08:00
}
return &MConsumerGroup{
consumerGroup,
groupID,
topics,
2024-02-02 10:11:13 +08:00
}, nil
2023-06-29 22:35:31 +08:00
}
2023-06-30 09:45:02 +08:00
// GetContextFromMsg derives a context.Context from the headers of a consumed
// Kafka message by delegating to GetContextWithMQHeader.
func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context {
	headers := cMsg.Headers
	return GetContextWithMQHeader(headers)
}
2024-02-02 10:11:13 +08:00
func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) {
2023-06-30 09:45:02 +08:00
log.ZDebug(context.Background(), "register consumer group", "groupID", mc.groupID)
2023-06-29 22:35:31 +08:00
for {
err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
if err != nil {
2024-02-02 10:11:13 +08:00
log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID)
}
if ctx.Err() != nil {
return
2023-06-29 22:35:31 +08:00
}
}
}