mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-01 07:35:58 +08:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 856b54e371 | |||
| d61495b047 | |||
| 6271a61c36 | |||
| f875f99dd8 | |||
| af54170cf2 |
@@ -37,13 +37,20 @@ zookeeper:
|
|||||||
|
|
||||||
###################### Mongo ######################
|
###################### Mongo ######################
|
||||||
# MongoDB configuration
|
# MongoDB configuration
|
||||||
# If uri is not empty, it will be used directly
|
|
||||||
#
|
# If uri is not empty, it will be used directly for the MongoDB connection.
|
||||||
# MongoDB address for standalone setup, Mongos address for sharded cluster setup
|
# This is a complete MongoDB URI string.
|
||||||
# Default MongoDB database name
|
# Example: mongodb://user:password@host1:port1,host2:port2/dbname?options
|
||||||
# Maximum connection pool size
|
|
||||||
mongo:
|
mongo:
|
||||||
uri: ${MONGO_URI}
|
uri: ${MONGO_URI}
|
||||||
|
|
||||||
|
# List of MongoDB server addresses.
|
||||||
|
# Used for constructing the MongoDB URI if 'uri' above is empty.
|
||||||
|
# For a standalone setup, specify the address of the single server.
|
||||||
|
# For a sharded cluster, specify the addresses of the Mongos servers.
|
||||||
|
# Example: [ '172.28.0.1:37017', '172.28.0.2:37017' ]
|
||||||
|
# Default MongoDB database name
|
||||||
|
# Maximum connection pool size
|
||||||
address: [ ${MONGO_ADDRESS}:${MONGO_PORT} ]
|
address: [ ${MONGO_ADDRESS}:${MONGO_PORT} ]
|
||||||
database: ${MONGO_DATABASE}
|
database: ${MONGO_DATABASE}
|
||||||
username: ${MONGO_USERNAME}
|
username: ${MONGO_USERNAME}
|
||||||
|
|||||||
@@ -34,7 +34,6 @@ services:
|
|||||||
ipv4_address: ${MONGO_NETWORK_ADDRESS:-172.28.0.2}
|
ipv4_address: ${MONGO_NETWORK_ADDRESS:-172.28.0.2}
|
||||||
|
|
||||||
redis:
|
redis:
|
||||||
# image: redis:7.0.0
|
|
||||||
image: redis:${REDIS_IMAGE_VERSION:-7.0.0}
|
image: redis:${REDIS_IMAGE_VERSION:-7.0.0}
|
||||||
container_name: redis
|
container_name: redis
|
||||||
ports:
|
ports:
|
||||||
@@ -53,7 +52,6 @@ services:
|
|||||||
ipv4_address: ${REDIS_NETWORK_ADDRESS:-172.28.0.3}
|
ipv4_address: ${REDIS_NETWORK_ADDRESS:-172.28.0.3}
|
||||||
|
|
||||||
zookeeper:
|
zookeeper:
|
||||||
# image: bitnami/zookeeper:3.8
|
|
||||||
image: bitnami/zookeeper:${ZOOKEEPER_IMAGE_VERSION:-3.8}
|
image: bitnami/zookeeper:${ZOOKEEPER_IMAGE_VERSION:-3.8}
|
||||||
container_name: zookeeper
|
container_name: zookeeper
|
||||||
ports:
|
ports:
|
||||||
@@ -69,7 +67,6 @@ services:
|
|||||||
ipv4_address: ${ZOOKEEPER_NETWORK_ADDRESS:-172.28.0.5}
|
ipv4_address: ${ZOOKEEPER_NETWORK_ADDRESS:-172.28.0.5}
|
||||||
|
|
||||||
kafka:
|
kafka:
|
||||||
# image: 'bitnami/kafka:3.5.1'
|
|
||||||
image: 'bitnami/kafka:${KAFKA_IMAGE_VERSION:-3.5.1}'
|
image: 'bitnami/kafka:${KAFKA_IMAGE_VERSION:-3.5.1}'
|
||||||
container_name: kafka
|
container_name: kafka
|
||||||
restart: always
|
restart: always
|
||||||
@@ -95,7 +92,6 @@ services:
|
|||||||
ipv4_address: ${KAFKA_NETWORK_ADDRESS:-172.28.0.4}
|
ipv4_address: ${KAFKA_NETWORK_ADDRESS:-172.28.0.4}
|
||||||
|
|
||||||
minio:
|
minio:
|
||||||
# image: minio/minio
|
|
||||||
image: minio/minio:${MINIO_IMAGE_VERSION:-latest}
|
image: minio/minio:${MINIO_IMAGE_VERSION:-latest}
|
||||||
ports:
|
ports:
|
||||||
- "${MINIO_PORT:-10005}:9000"
|
- "${MINIO_PORT:-10005}:9000"
|
||||||
@@ -114,7 +110,6 @@ services:
|
|||||||
ipv4_address: ${MINIO_NETWORK_ADDRESS:-172.28.0.6}
|
ipv4_address: ${MINIO_NETWORK_ADDRESS:-172.28.0.6}
|
||||||
|
|
||||||
openim-web:
|
openim-web:
|
||||||
# image: ${IMAGE_REGISTRY:-ghcr.io/openimsdk}/openim-web:latest
|
|
||||||
image: ${IMAGE_REGISTRY:-ghcr.io/openimsdk}/openim-web:${OPENIM_WEB_IMAGE_VERSION:-latest}
|
image: ${IMAGE_REGISTRY:-ghcr.io/openimsdk}/openim-web:${OPENIM_WEB_IMAGE_VERSION:-latest}
|
||||||
container_name: openim-web
|
container_name: openim-web
|
||||||
environment:
|
environment:
|
||||||
|
|||||||
Vendored
+19
-1
@@ -18,6 +18,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
@@ -43,6 +45,9 @@ func NewRedis() (redis.UniversalClient, error) {
|
|||||||
return redisClient, nil
|
return redisClient, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Read configuration from environment variables
|
||||||
|
overrideConfigFromEnv()
|
||||||
|
|
||||||
if len(config.Config.Redis.Address) == 0 {
|
if len(config.Config.Redis.Address) == 0 {
|
||||||
return nil, errors.New("redis address is empty")
|
return nil, errors.New("redis address is empty")
|
||||||
}
|
}
|
||||||
@@ -60,7 +65,7 @@ func NewRedis() (redis.UniversalClient, error) {
|
|||||||
rdb = redis.NewClient(&redis.Options{
|
rdb = redis.NewClient(&redis.Options{
|
||||||
Addr: config.Config.Redis.Address[0],
|
Addr: config.Config.Redis.Address[0],
|
||||||
Username: config.Config.Redis.Username,
|
Username: config.Config.Redis.Username,
|
||||||
Password: config.Config.Redis.Password, // no password set
|
Password: config.Config.Redis.Password,
|
||||||
DB: 0, // use default DB
|
DB: 0, // use default DB
|
||||||
PoolSize: 100, // connection pool size
|
PoolSize: 100, // connection pool size
|
||||||
MaxRetries: maxRetry,
|
MaxRetries: maxRetry,
|
||||||
@@ -78,3 +83,16 @@ func NewRedis() (redis.UniversalClient, error) {
|
|||||||
redisClient = rdb
|
redisClient = rdb
|
||||||
return rdb, err
|
return rdb, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// overrideConfigFromEnv overrides configuration fields with environment variables if present.
|
||||||
|
func overrideConfigFromEnv() {
|
||||||
|
if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" {
|
||||||
|
config.Config.Redis.Address = strings.Split(envAddr, ",") // Assuming addresses are comma-separated
|
||||||
|
}
|
||||||
|
if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" {
|
||||||
|
config.Config.Redis.Username = envUser
|
||||||
|
}
|
||||||
|
if envPass := os.Getenv("REDIS_PASSWORD"); envPass != "" {
|
||||||
|
config.Config.Redis.Password = envPass
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,27 +1,13 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package unrelation
|
package unrelation
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
|
|
||||||
@@ -34,7 +20,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
maxRetry = 10 // number of retries
|
maxRetry = 10 // number of retries
|
||||||
|
mongoConnTimeout = 10 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
type Mongo struct {
|
type Mongo struct {
|
||||||
@@ -44,90 +31,123 @@ type Mongo struct {
|
|||||||
// NewMongo Initialize MongoDB connection.
|
// NewMongo Initialize MongoDB connection.
|
||||||
func NewMongo() (*Mongo, error) {
|
func NewMongo() (*Mongo, error) {
|
||||||
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
|
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
|
||||||
uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
|
uri := buildMongoURI()
|
||||||
if config.Config.Mongo.Uri != "" {
|
|
||||||
uri = config.Config.Mongo.Uri
|
|
||||||
} else {
|
|
||||||
mongodbHosts := ""
|
|
||||||
for i, v := range config.Config.Mongo.Address {
|
|
||||||
if i == len(config.Config.Mongo.Address)-1 {
|
|
||||||
mongodbHosts += v
|
|
||||||
} else {
|
|
||||||
mongodbHosts += v + ","
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
|
|
||||||
uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
|
|
||||||
config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
|
|
||||||
config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
|
|
||||||
} else {
|
|
||||||
uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
|
|
||||||
mongodbHosts, config.Config.Mongo.Database,
|
|
||||||
config.Config.Mongo.MaxPoolSize)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fmt.Println("mongo:", uri)
|
fmt.Println("mongo:", uri)
|
||||||
|
|
||||||
var mongoClient *mongo.Client
|
var mongoClient *mongo.Client
|
||||||
var err error = nil
|
var err error
|
||||||
|
|
||||||
|
// Retry connecting to MongoDB
|
||||||
for i := 0; i <= maxRetry; i++ {
|
for i := 0; i <= maxRetry; i++ {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
ctx, cancel := context.WithTimeout(context.Background(), mongoConnTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri))
|
mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return &Mongo{db: mongoClient}, nil
|
return &Mongo{db: mongoClient}, nil
|
||||||
}
|
}
|
||||||
if cmdErr, ok := err.(mongo.CommandError); ok {
|
if shouldRetry(err) {
|
||||||
if cmdErr.Code == 13 || cmdErr.Code == 18 {
|
fmt.Printf("Failed to connect to MongoDB, retrying: %s\n", err)
|
||||||
return nil, err
|
time.Sleep(time.Second) // exponential backoff could be implemented here
|
||||||
} else {
|
continue
|
||||||
fmt.Printf("Failed to connect to MongoDB: %s\n", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func buildMongoURI() string {
|
||||||
|
uri := os.Getenv("MONGO_URI")
|
||||||
|
if uri != "" {
|
||||||
|
return uri
|
||||||
|
}
|
||||||
|
|
||||||
|
username := os.Getenv("MONGO_USERNAME")
|
||||||
|
password := os.Getenv("MONGO_PASSWORD")
|
||||||
|
address := os.Getenv("MONGO_ADDRESS")
|
||||||
|
port := os.Getenv("MONGO_PORT")
|
||||||
|
database := os.Getenv("MONGO_DATABASE")
|
||||||
|
maxPoolSize := os.Getenv("MONGO_MAX_POOL_SIZE")
|
||||||
|
|
||||||
|
if username == "" {
|
||||||
|
username = config.Config.Mongo.Username
|
||||||
|
}
|
||||||
|
if password == "" {
|
||||||
|
password = config.Config.Mongo.Password
|
||||||
|
}
|
||||||
|
if address == "" {
|
||||||
|
address = strings.Join(config.Config.Mongo.Address, ",")
|
||||||
|
} else if port != "" {
|
||||||
|
address = fmt.Sprintf("%s:%s", address, port)
|
||||||
|
}
|
||||||
|
if database == "" {
|
||||||
|
database = config.Config.Mongo.Database
|
||||||
|
}
|
||||||
|
if maxPoolSize == "" {
|
||||||
|
maxPoolSize = fmt.Sprint(config.Config.Mongo.MaxPoolSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
uriFormat := "mongodb://%s/%s?maxPoolSize=%s&authSource=admin"
|
||||||
|
if username != "" && password != "" {
|
||||||
|
uriFormat = "mongodb://%s:%s@%s/%s?maxPoolSize=%s&authSource=admin"
|
||||||
|
return fmt.Sprintf(uriFormat, username, password, address, database, maxPoolSize)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf(uriFormat, address, database, maxPoolSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
func shouldRetry(err error) bool {
|
||||||
|
if cmdErr, ok := err.(mongo.CommandError); ok {
|
||||||
|
return cmdErr.Code != 13 && cmdErr.Code != 18
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetClient returns the MongoDB client.
|
||||||
func (m *Mongo) GetClient() *mongo.Client {
|
func (m *Mongo) GetClient() *mongo.Client {
|
||||||
return m.db
|
return m.db
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetDatabase returns the specific database from MongoDB.
|
||||||
func (m *Mongo) GetDatabase() *mongo.Database {
|
func (m *Mongo) GetDatabase() *mongo.Database {
|
||||||
return m.db.Database(config.Config.Mongo.Database)
|
return m.db.Database(config.Config.Mongo.Database)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CreateMsgIndex creates an index for messages in MongoDB.
|
||||||
func (m *Mongo) CreateMsgIndex() error {
|
func (m *Mongo) CreateMsgIndex() error {
|
||||||
return m.createMongoIndex(unrelation.Msg, true, "doc_id")
|
return m.createMongoIndex(unrelation.Msg, true, "doc_id")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// createMongoIndex creates an index in a MongoDB collection.
|
||||||
func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error {
|
func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error {
|
||||||
db := m.db.Database(config.Config.Mongo.Database).Collection(collection)
|
db := m.GetDatabase().Collection(collection)
|
||||||
opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
|
opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
|
||||||
indexView := db.Indexes()
|
indexView := db.Indexes()
|
||||||
keysDoc := bson.D{}
|
|
||||||
// create composite indexes
|
keysDoc := buildIndexKeys(keys)
|
||||||
for _, key := range keys {
|
|
||||||
if strings.HasPrefix(key, "-") {
|
|
||||||
keysDoc = append(keysDoc, bson.E{Key: strings.TrimLeft(key, "-"), Value: -1})
|
|
||||||
// keysDoc = keysDoc.Append(strings.TrimLeft(key, "-"), bsonx.Int32(-1))
|
|
||||||
} else {
|
|
||||||
keysDoc = append(keysDoc, bson.E{Key: key, Value: 1})
|
|
||||||
// keysDoc = keysDoc.Append(key, bsonx.Int32(1))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// create index
|
|
||||||
index := mongo.IndexModel{
|
index := mongo.IndexModel{
|
||||||
Keys: keysDoc,
|
Keys: keysDoc,
|
||||||
}
|
}
|
||||||
if isUnique {
|
if isUnique {
|
||||||
index.Options = options.Index().SetUnique(true)
|
index.Options = options.Index().SetUnique(true)
|
||||||
}
|
}
|
||||||
result, err := indexView.CreateOne(
|
|
||||||
context.Background(),
|
_, err := indexView.CreateOne(context.Background(), index, opts)
|
||||||
index,
|
|
||||||
opts,
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return utils.Wrap(err, result)
|
return utils.Wrap(err, "CreateIndex")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// buildIndexKeys builds the BSON document for index keys.
|
||||||
|
func buildIndexKeys(keys []string) bson.D {
|
||||||
|
keysDoc := bson.D{}
|
||||||
|
for _, key := range keys {
|
||||||
|
direction := 1 // default direction is ascending
|
||||||
|
if strings.HasPrefix(key, "-") {
|
||||||
|
direction = -1 // descending order for prefixed with "-"
|
||||||
|
key = strings.TrimLeft(key, "-")
|
||||||
|
}
|
||||||
|
keysDoc = append(keysDoc, bson.E{Key: key, Value: direction})
|
||||||
|
}
|
||||||
|
return keysDoc
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,93 +1,22 @@
|
|||||||
package discoveryregister
|
package discoveryregister
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"time"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||||
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
|
|
||||||
"github.com/OpenIMSDK/tools/log"
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
|
||||||
func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) {
|
func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) {
|
||||||
var client discoveryregistry.SvcDiscoveryRegistry
|
|
||||||
var err error
|
|
||||||
switch envType {
|
switch envType {
|
||||||
case "zookeeper":
|
case "zookeeper":
|
||||||
client, err = openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
|
return zookeeper.NewZookeeperDiscoveryRegister()
|
||||||
openkeeper.WithFreq(time.Hour), openkeeper.WithUserNameAndPassword(
|
|
||||||
config.Config.Zookeeper.Username,
|
|
||||||
config.Config.Zookeeper.Password,
|
|
||||||
), openkeeper.WithRoundRobin(), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))
|
|
||||||
case "k8s":
|
case "k8s":
|
||||||
client, err = NewK8sDiscoveryRegister()
|
return kubernetes.NewK8sDiscoveryRegister()
|
||||||
default:
|
default:
|
||||||
client = nil
|
return nil, errors.New("envType not correct")
|
||||||
err = errors.New("envType not correct")
|
|
||||||
}
|
}
|
||||||
return client, err
|
|
||||||
}
|
|
||||||
|
|
||||||
type K8sDR struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
|
|
||||||
return &K8sDR{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
|
|
||||||
cli.rpcRegisterAddr = serviceName
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) UnRegister() error {
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
|
|
||||||
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
|
|
||||||
|
|
||||||
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
|
||||||
return []*grpc.ClientConn{conn}, err
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
|
||||||
|
|
||||||
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) GetSelfConnTarget() string {
|
|
||||||
|
|
||||||
return cli.rpcRegisterAddr
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
|
|
||||||
cli.options = append(cli.options, opts...)
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
|
|
||||||
conn.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// do not use this method for call rpc
|
|
||||||
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
|
|
||||||
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) Close() {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,407 +1,45 @@
|
|||||||
package discoveryregister
|
package discoveryregister
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"os"
|
||||||
"reflect"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||||
"google.golang.org/grpc"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func setupTestEnvironment() {
|
||||||
|
os.Setenv("ZOOKEEPER_SCHEMA", "openim")
|
||||||
|
os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1:12181")
|
||||||
|
os.Setenv("ZOOKEEPER_USERNAME", "")
|
||||||
|
os.Setenv("ZOOKEEPER_PASSWORD", "")
|
||||||
|
}
|
||||||
|
|
||||||
func TestNewDiscoveryRegister(t *testing.T) {
|
func TestNewDiscoveryRegister(t *testing.T) {
|
||||||
type args struct {
|
setupTestEnvironment()
|
||||||
envType string
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
args args
|
|
||||||
want discoveryregistry.SvcDiscoveryRegistry
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
got, err := NewDiscoveryRegister(tt.args.envType)
|
|
||||||
if (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("NewDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(got, tt.want) {
|
|
||||||
t.Errorf("NewDiscoveryRegister() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNewK8sDiscoveryRegister(t *testing.T) {
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
envType string
|
||||||
want discoveryregistry.SvcDiscoveryRegistry
|
expectedError bool
|
||||||
wantErr bool
|
expectedResult bool
|
||||||
}{
|
}{
|
||||||
// TODO: Add test cases.
|
{"zookeeper", false, true},
|
||||||
|
{"k8s", false, true}, // 假设 k8s 配置也已正确设置
|
||||||
|
{"invalid", true, false},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
got, err := NewK8sDiscoveryRegister()
|
|
||||||
if (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("NewK8sDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(got, tt.want) {
|
|
||||||
t.Errorf("NewK8sDiscoveryRegister() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_Register(t *testing.T) {
|
for _, test := range tests {
|
||||||
type fields struct {
|
client, err := NewDiscoveryRegister(test.envType)
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
serviceName string
|
|
||||||
host string
|
|
||||||
port int
|
|
||||||
opts []grpc.DialOption
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
if err := cli.Register(tt.args.serviceName, tt.args.host, tt.args.port, tt.args.opts...); (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("K8sDR.Register() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_UnRegister(t *testing.T) {
|
if test.expectedError {
|
||||||
type fields struct {
|
assert.Error(t, err)
|
||||||
options []grpc.DialOption
|
} else {
|
||||||
rpcRegisterAddr string
|
assert.NoError(t, err)
|
||||||
}
|
if test.expectedResult {
|
||||||
tests := []struct {
|
assert.Implements(t, (*discoveryregistry.SvcDiscoveryRegistry)(nil), client)
|
||||||
name string
|
} else {
|
||||||
fields fields
|
assert.Nil(t, client)
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
}
|
||||||
if err := cli.UnRegister(); (err != nil) != tt.wantErr {
|
}
|
||||||
t.Errorf("K8sDR.UnRegister() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_CreateRpcRootNodes(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
serviceNames []string
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
if err := cli.CreateRpcRootNodes(tt.args.serviceNames); (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("K8sDR.CreateRpcRootNodes() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_RegisterConf2Registry(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
key string
|
|
||||||
conf []byte
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
if err := cli.RegisterConf2Registry(tt.args.key, tt.args.conf); (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("K8sDR.RegisterConf2Registry() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_GetConfFromRegistry(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
key string
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
want []byte
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
got, err := cli.GetConfFromRegistry(tt.args.key)
|
|
||||||
if (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("K8sDR.GetConfFromRegistry() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(got, tt.want) {
|
|
||||||
t.Errorf("K8sDR.GetConfFromRegistry() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_GetConns(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
ctx context.Context
|
|
||||||
serviceName string
|
|
||||||
opts []grpc.DialOption
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
want []*grpc.ClientConn
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
got, err := cli.GetConns(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
|
|
||||||
if (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("K8sDR.GetConns() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(got, tt.want) {
|
|
||||||
t.Errorf("K8sDR.GetConns() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_GetConn(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
ctx context.Context
|
|
||||||
serviceName string
|
|
||||||
opts []grpc.DialOption
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
want *grpc.ClientConn
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
got, err := cli.GetConn(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
|
|
||||||
if (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("K8sDR.GetConn() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(got, tt.want) {
|
|
||||||
t.Errorf("K8sDR.GetConn() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_GetSelfConnTarget(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
want string
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
if got := cli.GetSelfConnTarget(); got != tt.want {
|
|
||||||
t.Errorf("K8sDR.GetSelfConnTarget() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_AddOption(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
opts []grpc.DialOption
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
cli.AddOption(tt.args.opts...)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_CloseConn(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
conn *grpc.ClientConn
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
cli.CloseConn(tt.args.conn)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_GetClientLocalConns(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
want map[string][]*grpc.ClientConn
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
if got := cli.GetClientLocalConns(); !reflect.DeepEqual(got, tt.want) {
|
|
||||||
t.Errorf("K8sDR.GetClientLocalConns() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_Close(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
cli.Close()
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,90 @@
|
|||||||
|
package kubernetes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||||
|
)
|
||||||
|
|
||||||
|
// K8sDR represents the Kubernetes service discovery and registration client.
|
||||||
|
type K8sDR struct {
|
||||||
|
options []grpc.DialOption
|
||||||
|
rpcRegisterAddr string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewK8sDiscoveryRegister creates a new instance of K8sDR for Kubernetes service discovery and registration.
|
||||||
|
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
|
||||||
|
|
||||||
|
return &K8sDR{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register registers a service with Kubernetes.
|
||||||
|
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
|
||||||
|
cli.rpcRegisterAddr = serviceName
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnRegister removes a service registration from Kubernetes.
|
||||||
|
func (cli *K8sDR) UnRegister() error {
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateRpcRootNodes creates root nodes for RPC in Kubernetes.
|
||||||
|
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterConf2Registry registers a configuration to the registry.
|
||||||
|
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConfFromRegistry retrieves a configuration from the registry.
|
||||||
|
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConns returns a list of gRPC client connections for a given service.
|
||||||
|
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
|
||||||
|
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
||||||
|
return []*grpc.ClientConn{conn}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConn returns a single gRPC client connection for a given service.
|
||||||
|
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
||||||
|
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSelfConnTarget returns the connection target of the client itself.
|
||||||
|
func (cli *K8sDR) GetSelfConnTarget() string {
|
||||||
|
return cli.rpcRegisterAddr
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddOption adds gRPC dial options to the client.
|
||||||
|
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
|
||||||
|
cli.options = append(cli.options, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CloseConn closes a given gRPC client connection.
|
||||||
|
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// do not use this method for call rpc
|
||||||
|
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
|
||||||
|
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the K8sDR client.
|
||||||
|
func (cli *K8sDR) Close() {
|
||||||
|
// Close any open resources here (if applicable)
|
||||||
|
return
|
||||||
|
}
|
||||||
@@ -0,0 +1,46 @@
|
|||||||
|
package zookeeper
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||||
|
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
|
||||||
|
"github.com/OpenIMSDK/tools/log"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewZookeeperDiscoveryRegister creates a new instance of ZookeeperDR for Zookeeper service discovery and registration.
|
||||||
|
func NewZookeeperDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
|
||||||
|
schema := getEnv("ZOOKEEPER_SCHEMA", config.Config.Zookeeper.Schema)
|
||||||
|
zkAddr := getZkAddrFromEnv(config.Config.Zookeeper.ZkAddr)
|
||||||
|
username := getEnv("ZOOKEEPER_USERNAME", config.Config.Zookeeper.Username)
|
||||||
|
password := getEnv("ZOOKEEPER_PASSWORD", config.Config.Zookeeper.Password)
|
||||||
|
|
||||||
|
return openkeeper.NewClient(
|
||||||
|
zkAddr,
|
||||||
|
schema,
|
||||||
|
openkeeper.WithFreq(time.Hour),
|
||||||
|
openkeeper.WithUserNameAndPassword(username, password),
|
||||||
|
openkeeper.WithRoundRobin(),
|
||||||
|
openkeeper.WithTimeout(10),
|
||||||
|
openkeeper.WithLogger(log.NewZkLogger()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
|
||||||
|
func getEnv(key, fallback string) string {
|
||||||
|
if value, exists := os.LookupEnv(key); exists {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
|
||||||
|
// getZkAddrFromEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
|
||||||
|
func getZkAddrFromEnv(fallback []string) []string {
|
||||||
|
if value, exists := os.LookupEnv("ZOOKEEPER_ADDRESS"); exists {
|
||||||
|
return strings.Split(value, ",")
|
||||||
|
}
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
@@ -1,17 +1,3 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package kafka
|
package kafka
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -21,19 +7,17 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/IBM/sarama"
|
||||||
"github.com/OpenIMSDK/protocol/constant"
|
"github.com/OpenIMSDK/protocol/constant"
|
||||||
log "github.com/OpenIMSDK/tools/log"
|
"github.com/OpenIMSDK/tools/log"
|
||||||
"github.com/OpenIMSDK/tools/mcontext"
|
"github.com/OpenIMSDK/tools/mcontext"
|
||||||
"github.com/OpenIMSDK/tools/utils"
|
"github.com/OpenIMSDK/tools/utils"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
|
||||||
"github.com/IBM/sarama"
|
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
maxRetry = 10 // number of retries
|
maxRetry = 10 // Maximum number of retries for producer creation
|
||||||
)
|
)
|
||||||
|
|
||||||
var errEmptyMsg = errors.New("binary msg is empty")
|
var errEmptyMsg = errors.New("binary msg is empty")
|
||||||
@@ -45,62 +29,85 @@ type Producer struct {
|
|||||||
producer sarama.SyncProducer
|
producer sarama.SyncProducer
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewKafkaProducer Initialize kafka producer.
|
// NewKafkaProducer initializes a new Kafka producer.
|
||||||
func NewKafkaProducer(addr []string, topic string) *Producer {
|
func NewKafkaProducer(addr []string, topic string) *Producer {
|
||||||
p := Producer{}
|
p := Producer{
|
||||||
p.config = sarama.NewConfig() // Instantiate a sarama Config
|
addr: addr,
|
||||||
p.config.Producer.Return.Successes = true // Whether to enable the successes channel to be notified after the message is sent successfully
|
topic: topic,
|
||||||
|
config: sarama.NewConfig(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set producer return flags
|
||||||
|
p.config.Producer.Return.Successes = true
|
||||||
p.config.Producer.Return.Errors = true
|
p.config.Producer.Return.Errors = true
|
||||||
p.config.Producer.Partitioner = sarama.NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
|
|
||||||
|
|
||||||
var producerAck = sarama.WaitForAll // default: WaitForAll
|
// Set partitioner strategy
|
||||||
switch strings.ToLower(config.Config.Kafka.ProducerAck) {
|
p.config.Producer.Partitioner = sarama.NewHashPartitioner
|
||||||
case "no_response":
|
|
||||||
producerAck = sarama.NoResponse
|
|
||||||
case "wait_for_local":
|
|
||||||
producerAck = sarama.WaitForLocal
|
|
||||||
case "wait_for_all":
|
|
||||||
producerAck = sarama.WaitForAll
|
|
||||||
}
|
|
||||||
p.config.Producer.RequiredAcks = producerAck
|
|
||||||
|
|
||||||
var compress = sarama.CompressionNone // default: no compress
|
// Configure producer acknowledgement level
|
||||||
_ = compress.UnmarshalText(bytes.ToLower([]byte(config.Config.Kafka.CompressType)))
|
configureProducerAck(&p, config.Config.Kafka.ProducerAck)
|
||||||
p.config.Producer.Compression = compress
|
|
||||||
|
|
||||||
if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
|
// Configure message compression
|
||||||
|
configureCompression(&p, config.Config.Kafka.CompressType)
|
||||||
|
|
||||||
|
// Get Kafka configuration from environment variables or fallback to config file
|
||||||
|
kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username)
|
||||||
|
kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password)
|
||||||
|
kafkaAddr := getEnvOrConfig("KAFKA_ADDRESS", addr[0]) // Assuming addr[0] contains address from config
|
||||||
|
|
||||||
|
// Configure SASL authentication if credentials are provided
|
||||||
|
if kafkaUsername != "" && kafkaPassword != "" {
|
||||||
p.config.Net.SASL.Enable = true
|
p.config.Net.SASL.Enable = true
|
||||||
p.config.Net.SASL.User = config.Config.Kafka.Username
|
p.config.Net.SASL.User = kafkaUsername
|
||||||
p.config.Net.SASL.Password = config.Config.Kafka.Password
|
p.config.Net.SASL.Password = kafkaPassword
|
||||||
}
|
}
|
||||||
p.addr = addr
|
|
||||||
p.topic = topic
|
// Set the Kafka address
|
||||||
|
p.addr = []string{kafkaAddr}
|
||||||
|
|
||||||
|
// Set up TLS configuration (if required)
|
||||||
SetupTLSConfig(p.config)
|
SetupTLSConfig(p.config)
|
||||||
var producer sarama.SyncProducer
|
|
||||||
|
// Create the producer with retries
|
||||||
var err error
|
var err error
|
||||||
for i := 0; i <= maxRetry; i++ {
|
for i := 0; i <= maxRetry; i++ {
|
||||||
producer, err = sarama.NewSyncProducer(p.addr, p.config) // Initialize the client
|
p.producer, err = sarama.NewSyncProducer(p.addr, p.config)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
p.producer = producer
|
|
||||||
return &p
|
return &p
|
||||||
}
|
}
|
||||||
//TODO If the password is wrong, exit directly
|
time.Sleep(1 * time.Second) // Wait before retrying
|
||||||
//if packetErr, ok := err.(*sarama.PacketEncodingError); ok {
|
|
||||||
//if _, ok := packetErr.Err.(sarama.AuthenticationError); ok {
|
|
||||||
// fmt.Println("Kafka password is wrong.")
|
|
||||||
//}
|
|
||||||
//} else {
|
|
||||||
// fmt.Printf("Failed to create Kafka producer: %v\n", err)
|
|
||||||
//}
|
|
||||||
time.Sleep(time.Duration(1) * time.Second)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Panic if unable to create producer after retries
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err.Error())
|
panic("Failed to create Kafka producer: " + err.Error())
|
||||||
}
|
}
|
||||||
p.producer = producer
|
|
||||||
return &p
|
return &p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// configureProducerAck configures the producer's acknowledgement level.
|
||||||
|
func configureProducerAck(p *Producer, ackConfig string) {
|
||||||
|
switch strings.ToLower(ackConfig) {
|
||||||
|
case "no_response":
|
||||||
|
p.config.Producer.RequiredAcks = sarama.NoResponse
|
||||||
|
case "wait_for_local":
|
||||||
|
p.config.Producer.RequiredAcks = sarama.WaitForLocal
|
||||||
|
case "wait_for_all":
|
||||||
|
p.config.Producer.RequiredAcks = sarama.WaitForAll
|
||||||
|
default:
|
||||||
|
p.config.Producer.RequiredAcks = sarama.WaitForAll
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// configureCompression configures the message compression type for the producer.
|
||||||
|
func configureCompression(p *Producer, compressType string) {
|
||||||
|
var compress sarama.CompressionCodec = sarama.CompressionNone
|
||||||
|
compress.UnmarshalText(bytes.ToLower([]byte(compressType)))
|
||||||
|
p.config.Producer.Compression = compress
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetMQHeaderWithContext extracts message queue headers from the context.
|
||||||
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
|
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
|
||||||
operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
|
operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -111,22 +118,23 @@ func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error)
|
|||||||
{Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
|
{Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
|
||||||
{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
|
{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
|
||||||
{Key: []byte(constant.ConnID), Value: []byte(connID)},
|
{Key: []byte(constant.ConnID), Value: []byte(connID)},
|
||||||
}, err
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetContextWithMQHeader creates a context from message queue headers.
|
||||||
func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
|
func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
|
||||||
var values []string
|
var values []string
|
||||||
for _, recordHeader := range header {
|
for _, recordHeader := range header {
|
||||||
values = append(values, string(recordHeader.Value))
|
values = append(values, string(recordHeader.Value))
|
||||||
}
|
}
|
||||||
return mcontext.WithMustInfoCtx(values) // TODO
|
return mcontext.WithMustInfoCtx(values) // Attach extracted values to context
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SendMessage sends a message to the Kafka topic configured in the Producer.
|
||||||
func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) {
|
func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) {
|
||||||
log.ZDebug(ctx, "SendMessage", "msg", msg, "topic", p.topic, "key", key)
|
log.ZDebug(ctx, "SendMessage", "msg", msg, "topic", p.topic, "key", key)
|
||||||
kMsg := &sarama.ProducerMessage{}
|
|
||||||
kMsg.Topic = p.topic
|
// Marshal the protobuf message
|
||||||
kMsg.Key = sarama.StringEncoder(key)
|
|
||||||
bMsg, err := proto.Marshal(msg)
|
bMsg, err := proto.Marshal(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, utils.Wrap(err, "kafka proto Marshal err")
|
return 0, 0, utils.Wrap(err, "kafka proto Marshal err")
|
||||||
@@ -134,20 +142,33 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag
|
|||||||
if len(bMsg) == 0 {
|
if len(bMsg) == 0 {
|
||||||
return 0, 0, utils.Wrap(errEmptyMsg, "")
|
return 0, 0, utils.Wrap(errEmptyMsg, "")
|
||||||
}
|
}
|
||||||
kMsg.Value = sarama.ByteEncoder(bMsg)
|
|
||||||
|
// Prepare Kafka message
|
||||||
|
kMsg := &sarama.ProducerMessage{
|
||||||
|
Topic: p.topic,
|
||||||
|
Key: sarama.StringEncoder(key),
|
||||||
|
Value: sarama.ByteEncoder(bMsg),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate message key and value
|
||||||
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
|
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
|
||||||
return 0, 0, utils.Wrap(errEmptyMsg, "")
|
return 0, 0, utils.Wrap(errEmptyMsg, "")
|
||||||
}
|
}
|
||||||
kMsg.Metadata = ctx
|
|
||||||
|
// Attach context metadata as headers
|
||||||
header, err := GetMQHeaderWithContext(ctx)
|
header, err := GetMQHeaderWithContext(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, utils.Wrap(err, "")
|
return 0, 0, utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
kMsg.Headers = header
|
kMsg.Headers = header
|
||||||
|
|
||||||
|
// Send the message
|
||||||
partition, offset, err := p.producer.SendMessage(kMsg)
|
partition, offset, err := p.producer.SendMessage(kMsg)
|
||||||
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key ", kMsg.Key, "key length", kMsg.Value.Length())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZWarn(ctx, "p.producer.SendMessage error", err)
|
log.ZWarn(ctx, "p.producer.SendMessage error", err)
|
||||||
|
return 0, 0, utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
return partition, offset, utils.Wrap(err, "")
|
|
||||||
|
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key", kMsg.Key, "key length", kMsg.Value.Length())
|
||||||
|
return partition, offset, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,6 +15,8 @@
|
|||||||
package kafka
|
package kafka
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
|
|
||||||
"github.com/IBM/sarama"
|
"github.com/IBM/sarama"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
@@ -33,3 +35,12 @@ func SetupTLSConfig(cfg *sarama.Config) {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getEnvOrConfig returns the value of the environment variable if it exists,
|
||||||
|
// otherwise, it returns the value from the configuration file.
|
||||||
|
func getEnvOrConfig(envName string, configValue string) string {
|
||||||
|
if value, exists := os.LookupEnv(envName); exists {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
return configValue
|
||||||
|
}
|
||||||
|
|||||||
@@ -28,6 +28,8 @@ openim::log::info "\n# Use Docker to start all openim service"
|
|||||||
|
|
||||||
trap 'openim::util::onCtrlC' INT
|
trap 'openim::util::onCtrlC' INT
|
||||||
|
|
||||||
|
"${OPENIM_ROOT}"/scripts/init-config.sh --skip
|
||||||
|
|
||||||
"${OPENIM_ROOT}"/scripts/start-all.sh
|
"${OPENIM_ROOT}"/scripts/start-all.sh
|
||||||
|
|
||||||
sleep 5
|
sleep 5
|
||||||
|
|||||||
+102
-69
@@ -1,17 +1,4 @@
|
|||||||
#!/usr/bin/env bash
|
#!/usr/bin/env bash
|
||||||
# Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# You may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
# This script automatically initializes various configuration files and can generate example files.
|
# This script automatically initializes various configuration files and can generate example files.
|
||||||
|
|
||||||
@@ -48,7 +35,8 @@ declare -A EXAMPLES=(
|
|||||||
FORCE_OVERWRITE=false
|
FORCE_OVERWRITE=false
|
||||||
SKIP_EXISTING=false
|
SKIP_EXISTING=false
|
||||||
GENERATE_EXAMPLES=false
|
GENERATE_EXAMPLES=false
|
||||||
CLEAN_ENV_EXAMPLES=false
|
CLEAN_CONFIG=false
|
||||||
|
CLEAN_EXAMPLES=false
|
||||||
|
|
||||||
# Function to display help information
|
# Function to display help information
|
||||||
show_help() {
|
show_help() {
|
||||||
@@ -58,59 +46,45 @@ show_help() {
|
|||||||
echo " --force Overwrite existing files without prompt"
|
echo " --force Overwrite existing files without prompt"
|
||||||
echo " --skip Skip generation if file exists"
|
echo " --skip Skip generation if file exists"
|
||||||
echo " --examples Generate example files"
|
echo " --examples Generate example files"
|
||||||
echo " --clean-env-examples Generate example files in a clean environment"
|
echo " --clean-config Clean all configuration files"
|
||||||
|
echo " --clean-examples Clean all example files"
|
||||||
}
|
}
|
||||||
|
|
||||||
# Function to generate configuration files
|
# Function to generate configuration files
|
||||||
generate_config_files() {
|
generate_config_files() {
|
||||||
# Loop through each template in TEMPLATES
|
|
||||||
for template in "${!TEMPLATES[@]}"; do
|
for template in "${!TEMPLATES[@]}"; do
|
||||||
# Read the corresponding output files for the template
|
local output_file="${TEMPLATES[$template]}"
|
||||||
IFS=';' read -ra OUTPUT_FILES <<< "${TEMPLATES[$template]}"
|
if [[ -f "${output_file}" ]]; then
|
||||||
for output_file in "${OUTPUT_FILES[@]}"; do
|
if [[ "${FORCE_OVERWRITE}" == true ]]; then
|
||||||
# Check if the output file already exists
|
openim::log::info "Force overwriting ${output_file}."
|
||||||
if [[ -f "${output_file}" ]]; then
|
elif [[ "${SKIP_EXISTING}" == true ]]; then
|
||||||
# Handle existing file based on command-line options
|
openim::log::info "Skipping generation of ${output_file} as it already exists."
|
||||||
if [[ "${FORCE_OVERWRITE}" == true ]]; then
|
continue
|
||||||
openim::log::info "Force overwriting ${output_file}."
|
else
|
||||||
elif [[ "${SKIP_EXISTING}" == true ]]; then
|
echo -n "File ${output_file} already exists. Overwrite? (Y/N): "
|
||||||
openim::log::info "Skipping generation of ${output_file} as it already exists."
|
read -r -n 1 REPLY
|
||||||
|
echo
|
||||||
|
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
|
||||||
|
openim::log::info "Skipping generation of ${output_file}."
|
||||||
continue
|
continue
|
||||||
else
|
|
||||||
# Ask user for confirmation to overwrite
|
|
||||||
echo -n "File ${output_file} already exists. Overwrite? (Y/N): "
|
|
||||||
read -r -n 1 REPLY
|
|
||||||
echo
|
|
||||||
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
|
|
||||||
openim::log::info "Skipping generation of ${output_file}."
|
|
||||||
continue
|
|
||||||
fi
|
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
else
|
||||||
# Process the template file to generate the output file
|
if [[ "${SKIP_EXISTING}" == true ]]; then
|
||||||
openim::log::info "⌚ Working with template file: ${template} to generate ${output_file}..."
|
openim::log::info "Generating ${output_file} as it does not exist."
|
||||||
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
|
|
||||||
openim::log::error "genconfig.sh script not found"
|
|
||||||
exit 1
|
|
||||||
fi
|
fi
|
||||||
"${OPENIM_ROOT}/scripts/genconfig.sh" "${ENV_FILE}" "${template}" > "${output_file}" || {
|
|
||||||
openim::log::error "Error processing template file ${template}"
|
|
||||||
exit 1
|
|
||||||
}
|
|
||||||
sleep 0.5
|
|
||||||
done
|
|
||||||
done
|
|
||||||
}
|
|
||||||
|
|
||||||
# Function to generate example files
|
|
||||||
generate_example_files() {
|
|
||||||
for template in "${!EXAMPLES[@]}"; do
|
|
||||||
local example_file="${EXAMPLES[$template]}"
|
|
||||||
if [[ ! -f "${example_file}" ]]; then
|
|
||||||
openim::log::info "Generating example file: ${example_file} from ${template}..."
|
|
||||||
cp "${template}" "${example_file}"
|
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
openim::log::info "⌚ Working with template file: ${template} to generate ${output_file}..."
|
||||||
|
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
|
||||||
|
openim::log::error "genconfig.sh script not found"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
"${OPENIM_ROOT}/scripts/genconfig.sh" "${ENV_FILE}" "${template}" > "${output_file}" || {
|
||||||
|
openim::log::error "Error processing template file ${template}"
|
||||||
|
exit 1
|
||||||
|
}
|
||||||
|
sleep 0.5
|
||||||
done
|
done
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -120,7 +94,8 @@ declare -A env_vars=(
|
|||||||
["LOG_STORAGE_LOCATION"]="../logs/"
|
["LOG_STORAGE_LOCATION"]="../logs/"
|
||||||
)
|
)
|
||||||
|
|
||||||
generate_clean_environment_examples() {
|
# Function to generate example files
|
||||||
|
generate_example_files() {
|
||||||
env_cmd="env -i"
|
env_cmd="env -i"
|
||||||
for var in "${!env_vars[@]}"; do
|
for var in "${!env_vars[@]}"; do
|
||||||
env_cmd+=" $var='${env_vars[$var]}'"
|
env_cmd+=" $var='${env_vars[$var]}'"
|
||||||
@@ -128,12 +103,56 @@ generate_clean_environment_examples() {
|
|||||||
|
|
||||||
for template in "${!EXAMPLES[@]}"; do
|
for template in "${!EXAMPLES[@]}"; do
|
||||||
local example_file="${EXAMPLES[$template]}"
|
local example_file="${EXAMPLES[$template]}"
|
||||||
openim::log::info "Generating example file: ${example_file} from ${template}..."
|
if [[ -f "${example_file}" ]]; then
|
||||||
|
if [[ "${FORCE_OVERWRITE}" == true ]]; then
|
||||||
|
openim::log::info "Force overwriting example file: ${example_file}."
|
||||||
|
elif [[ "${SKIP_EXISTING}" == true ]]; then
|
||||||
|
openim::log::info "Skipping generation of example file: ${example_file} as it already exists."
|
||||||
|
continue
|
||||||
|
else
|
||||||
|
echo -n "Example file ${example_file} already exists. Overwrite? (Y/N): "
|
||||||
|
read -r -n 1 REPLY
|
||||||
|
echo
|
||||||
|
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
|
||||||
|
openim::log::info "Skipping generation of example file: ${example_file}."
|
||||||
|
continue
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
elif [[ "${SKIP_EXISTING}" == true ]]; then
|
||||||
|
openim::log::info "Generating example file: ${example_file} as it does not exist."
|
||||||
|
fi
|
||||||
|
|
||||||
|
openim::log::info "⌚ Working with template file: ${template} to generate example file: ${example_file}..."
|
||||||
|
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
|
||||||
|
openim::log::error "genconfig.sh script not found"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
eval "$env_cmd ${OPENIM_ROOT}/scripts/genconfig.sh '${ENV_FILE}' '${template}' > '${example_file}'" || {
|
eval "$env_cmd ${OPENIM_ROOT}/scripts/genconfig.sh '${ENV_FILE}' '${template}' > '${example_file}'" || {
|
||||||
openim::log::error "Error processing template file ${template}"
|
openim::log::error "Error processing template file ${template}"
|
||||||
exit 1
|
exit 1
|
||||||
}
|
}
|
||||||
|
sleep 0.5
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# Function to clean configuration files
|
||||||
|
clean_config_files() {
|
||||||
|
for output_file in "${TEMPLATES[@]}"; do
|
||||||
|
if [[ -f "${output_file}" ]]; then
|
||||||
|
rm -f "${output_file}"
|
||||||
|
openim::log::info "Removed configuration file: ${output_file}"
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
# Function to clean example files
|
||||||
|
clean_example_files() {
|
||||||
|
for example_file in "${EXAMPLES[@]}"; do
|
||||||
|
if [[ -f "${example_file}" ]]; then
|
||||||
|
rm -f "${example_file}"
|
||||||
|
openim::log::info "Removed example file: ${example_file}"
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -155,8 +174,12 @@ while [[ $# -gt 0 ]]; do
|
|||||||
GENERATE_EXAMPLES=true
|
GENERATE_EXAMPLES=true
|
||||||
shift
|
shift
|
||||||
;;
|
;;
|
||||||
--clean-env-examples)
|
--clean-config)
|
||||||
CLEAN_ENV_EXAMPLES=true
|
CLEAN_CONFIG=true
|
||||||
|
shift
|
||||||
|
;;
|
||||||
|
--clean-examples)
|
||||||
|
CLEAN_EXAMPLES=true
|
||||||
shift
|
shift
|
||||||
;;
|
;;
|
||||||
*)
|
*)
|
||||||
@@ -167,19 +190,29 @@ while [[ $# -gt 0 ]]; do
|
|||||||
esac
|
esac
|
||||||
done
|
done
|
||||||
|
|
||||||
|
# Clean configuration files if --clean-config option is provided
|
||||||
|
if [[ "${CLEAN_CONFIG}" == true ]]; then
|
||||||
|
clean_config_files
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Clean example files if --clean-examples option is provided
|
||||||
|
if [[ "${CLEAN_EXAMPLES}" == true ]]; then
|
||||||
|
clean_example_files
|
||||||
|
fi
|
||||||
|
|
||||||
# Generate configuration files if requested
|
# Generate configuration files if requested
|
||||||
if [[ "${FORCE_OVERWRITE}" == true || "${SKIP_EXISTING}" == false ]]; then
|
if [[ "${FORCE_OVERWRITE}" == true || "${SKIP_EXISTING}" == false ]] && [[ "${CLEAN_CONFIG}" == false ]]; then
|
||||||
|
generate_config_files
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Generate configuration files if requested
|
||||||
|
if [[ "${SKIP_EXISTING}" == true ]]; then
|
||||||
generate_config_files
|
generate_config_files
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# Generate example files if --examples option is provided
|
# Generate example files if --examples option is provided
|
||||||
if [[ "${GENERATE_EXAMPLES}" == true ]]; then
|
if [[ "${GENERATE_EXAMPLES}" == true ]] && [[ "${CLEAN_EXAMPLES}" == false ]]; then
|
||||||
generate_example_files
|
generate_example_files
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# Generate example files in a clean environment if --clean-env-examples option is provided
|
openim::log::success "Configuration and example files operation complete!"
|
||||||
if [[ "${CLEAN_ENV_EXAMPLES}" == true ]]; then
|
|
||||||
generate_clean_environment_examples
|
|
||||||
fi
|
|
||||||
|
|
||||||
openim::log::success "Configuration and example files generation complete!"
|
|
||||||
|
|||||||
Reference in New Issue
Block a user