merge: pre-release-v3.8.4 (#3623)

* merge: pre-release-v3.8.4

* merge: v3.8.4
This commit is contained in:
icey-yu
2025-11-25 16:23:22 +08:00
committed by GitHub
parent 07badb162f
commit b8c4b459fa
17 changed files with 1211 additions and 12 deletions
-1
View File
@@ -220,7 +220,6 @@ func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, p
if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil {
return errs.Wrap(err)
}
if c.localCache != nil {
c.removeLocalTokenCache(ctx, key)
}
+33
View File
@@ -0,0 +1,33 @@
// Copyright © 2024 OpenIM open source community. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
type TLSConfig struct {
EnableTLS bool `yaml:"enableTLS"`
CACrt string `yaml:"caCrt"`
ClientCrt string `yaml:"clientCrt"`
ClientKey string `yaml:"clientKey"`
ClientKeyPwd string `yaml:"clientKeyPwd"`
InsecureSkipVerify bool `yaml:"insecureSkipVerify"`
}
type Config struct {
Username string `yaml:"username"`
Password string `yaml:"password"`
ProducerAck string `yaml:"producerAck"`
CompressType string `yaml:"compressType"`
Addr []string `yaml:"addr"`
TLS TLSConfig `yaml:"tls"`
}
@@ -0,0 +1,68 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"context"
"errors"
"github.com/IBM/sarama"
"github.com/openimsdk/tools/log"
)
type MConsumerGroup struct {
sarama.ConsumerGroup
groupID string
topics []string
}
func NewMConsumerGroup(conf *Config, groupID string, topics []string, autoCommitEnable bool) (*MConsumerGroup, error) {
config, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, autoCommitEnable)
if err != nil {
return nil, err
}
group, err := NewConsumerGroup(config, conf.Addr, groupID)
if err != nil {
return nil, err
}
return &MConsumerGroup{
ConsumerGroup: group,
groupID: groupID,
topics: topics,
}, nil
}
func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context {
return GetContextWithMQHeader(cMsg.Headers)
}
func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) {
for {
err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
return
}
if errors.Is(err, context.Canceled) {
return
}
if err != nil {
log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID)
}
}
}
func (mc *MConsumerGroup) Close() error {
return mc.ConsumerGroup.Close()
}
+82
View File
@@ -0,0 +1,82 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"context"
"github.com/IBM/sarama"
"github.com/openimsdk/tools/errs"
"google.golang.org/protobuf/proto"
)
// Producer represents a Kafka producer.
type Producer struct {
addr []string
topic string
config *sarama.Config
producer sarama.SyncProducer
}
func NewKafkaProducer(config *sarama.Config, addr []string, topic string) (*Producer, error) {
producer, err := NewProducer(config, addr)
if err != nil {
return nil, err
}
return &Producer{
addr: addr,
topic: topic,
config: config,
producer: producer,
}, nil
}
// SendMessage sends a message to the Kafka topic configured in the Producer.
func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) {
// Marshal the protobuf message
bMsg, err := proto.Marshal(msg)
if err != nil {
return 0, 0, errs.WrapMsg(err, "kafka proto Marshal err")
}
if len(bMsg) == 0 {
return 0, 0, errs.WrapMsg(errEmptyMsg, "kafka proto Marshal err")
}
// Prepare Kafka message
kMsg := &sarama.ProducerMessage{
Topic: p.topic,
Key: sarama.StringEncoder(key),
Value: sarama.ByteEncoder(bMsg),
}
// Validate message key and value
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
return 0, 0, errs.Wrap(errEmptyMsg)
}
// Attach context metadata as headers
header, err := GetMQHeaderWithContext(ctx)
if err != nil {
return 0, 0, err
}
kMsg.Headers = header
// Send the message
partition, offset, err := p.producer.SendMessage(kMsg)
if err != nil {
return 0, 0, errs.WrapMsg(err, "p.producer.SendMessage error")
}
return partition, offset, nil
}
+85
View File
@@ -0,0 +1,85 @@
package kafka
import (
"bytes"
"strings"
"github.com/IBM/sarama"
"github.com/openimsdk/tools/errs"
)
func BuildConsumerGroupConfig(conf *Config, initial int64, autoCommitEnable bool) (*sarama.Config, error) {
kfk := sarama.NewConfig()
kfk.Version = sarama.V2_0_0_0
kfk.Consumer.Offsets.Initial = initial
kfk.Consumer.Offsets.AutoCommit.Enable = autoCommitEnable
kfk.Consumer.Return.Errors = false
if conf.Username != "" || conf.Password != "" {
kfk.Net.SASL.Enable = true
kfk.Net.SASL.User = conf.Username
kfk.Net.SASL.Password = conf.Password
}
if conf.TLS.EnableTLS {
tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify)
if err != nil {
return nil, err
}
kfk.Net.TLS.Config = tls
kfk.Net.TLS.Enable = true
}
return kfk, nil
}
func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error) {
cg, err := sarama.NewConsumerGroup(addr, groupID, conf)
if err != nil {
return nil, errs.WrapMsg(err, "NewConsumerGroup failed", "addr", addr, "groupID", groupID, "conf", *conf)
}
return cg, nil
}
func BuildProducerConfig(conf Config) (*sarama.Config, error) {
kfk := sarama.NewConfig()
kfk.Producer.Return.Successes = true
kfk.Producer.Return.Errors = true
kfk.Producer.Partitioner = sarama.NewHashPartitioner
if conf.Username != "" || conf.Password != "" {
kfk.Net.SASL.Enable = true
kfk.Net.SASL.User = conf.Username
kfk.Net.SASL.Password = conf.Password
}
switch strings.ToLower(conf.ProducerAck) {
case "no_response":
kfk.Producer.RequiredAcks = sarama.NoResponse
case "wait_for_local":
kfk.Producer.RequiredAcks = sarama.WaitForLocal
case "wait_for_all":
kfk.Producer.RequiredAcks = sarama.WaitForAll
default:
kfk.Producer.RequiredAcks = sarama.WaitForAll
}
if conf.CompressType == "" {
kfk.Producer.Compression = sarama.CompressionNone
} else {
if err := kfk.Producer.Compression.UnmarshalText(bytes.ToLower([]byte(conf.CompressType))); err != nil {
return nil, errs.WrapMsg(err, "UnmarshalText failed", "compressType", conf.CompressType)
}
}
if conf.TLS.EnableTLS {
tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify)
if err != nil {
return nil, err
}
kfk.Net.TLS.Config = tls
kfk.Net.TLS.Enable = true
}
return kfk, nil
}
func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error) {
producer, err := sarama.NewSyncProducer(addr, conf)
if err != nil {
return nil, errs.WrapMsg(err, "NewSyncProducer failed", "addr", addr, "conf", *conf)
}
return producer, nil
}
+83
View File
@@ -0,0 +1,83 @@
// Copyright © 2024 OpenIM open source community. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"os"
"github.com/openimsdk/tools/errs"
)
// decryptPEM decrypts a PEM block using a password.
func decryptPEM(data []byte, passphrase []byte) ([]byte, error) {
if len(passphrase) == 0 {
return data, nil
}
b, _ := pem.Decode(data)
d, err := x509.DecryptPEMBlock(b, passphrase)
if err != nil {
return nil, errs.WrapMsg(err, "DecryptPEMBlock failed")
}
return pem.EncodeToMemory(&pem.Block{
Type: b.Type,
Bytes: d,
}), nil
}
func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, errs.WrapMsg(err, "ReadFile failed", "path", path)
}
return decryptPEM(data, pwd)
}
// newTLSConfig setup the TLS config from general config file.
func newTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte, insecureSkipVerify bool) (*tls.Config, error) {
var tlsConfig tls.Config
if clientCertFile != "" && clientKeyFile != "" {
certPEMBlock, err := os.ReadFile(clientCertFile)
if err != nil {
return nil, errs.WrapMsg(err, "ReadFile failed", "clientCertFile", clientCertFile)
}
keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd)
if err != nil {
return nil, err
}
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
if err != nil {
return nil, errs.WrapMsg(err, "X509KeyPair failed")
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
if caCertFile != "" {
caCert, err := os.ReadFile(caCertFile)
if err != nil {
return nil, errs.WrapMsg(err, "ReadFile failed", "caCertFile", caCertFile)
}
caCertPool := x509.NewCertPool()
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
return nil, errs.New("AppendCertsFromPEM failed")
}
tlsConfig.RootCAs = caCertPool
}
tlsConfig.InsecureSkipVerify = insecureSkipVerify
return &tlsConfig, nil
}
+34
View File
@@ -0,0 +1,34 @@
package kafka
import (
"context"
"errors"
"github.com/IBM/sarama"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/mcontext"
)
var errEmptyMsg = errors.New("kafka binary msg is empty")
// GetMQHeaderWithContext extracts message queue headers from the context.
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
if err != nil {
return nil, err
}
return []sarama.RecordHeader{
{Key: []byte(constant.OperationID), Value: []byte(operationID)},
{Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
{Key: []byte(constant.ConnID), Value: []byte(connID)},
}, nil
}
// GetContextWithMQHeader creates a context from message queue headers.
func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
var values []string
for _, recordHeader := range header {
values = append(values, string(recordHeader.Value))
}
return mcontext.WithMustInfoCtx(values) // Attach extracted values to context
}
+79
View File
@@ -0,0 +1,79 @@
// Copyright © 2024 OpenIM open source community. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"context"
"fmt"
"github.com/IBM/sarama"
"github.com/openimsdk/tools/errs"
)
func CheckTopics(ctx context.Context, conf *Config, topics []string) error {
kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false)
if err != nil {
return err
}
cli, err := sarama.NewClient(conf.Addr, kfk)
if err != nil {
return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf))
}
defer cli.Close()
existingTopics, err := cli.Topics()
if err != nil {
return errs.WrapMsg(err, "Failed to list topics")
}
existingTopicsMap := make(map[string]bool)
for _, t := range existingTopics {
existingTopicsMap[t] = true
}
for _, topic := range topics {
if !existingTopicsMap[topic] {
return errs.New("topic not exist", "topic", topic).Wrap()
}
}
return nil
}
func CheckHealth(ctx context.Context, conf *Config) error {
kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false)
if err != nil {
return err
}
cli, err := sarama.NewClient(conf.Addr, kfk)
if err != nil {
return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf))
}
defer cli.Close()
// Get broker list
brokers := cli.Brokers()
if len(brokers) == 0 {
return errs.New("no brokers found").Wrap()
}
// Check if all brokers are reachable
for _, broker := range brokers {
if err := broker.Open(kfk); err != nil {
return errs.WrapMsg(err, "failed to open broker", "broker", broker.Addr())
}
}
return nil
}