Compare commits

...

5 Commits

Author SHA1 Message Date
Xinwei Xiong (cubxxw) 856b54e371 feat: add kafka and redis mongo env
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-13 23:20:58 +08:00
Xinwei Xiong (cubxxw) d61495b047 feat: add zk and redis mongo env
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-13 20:14:45 +08:00
Xinwei Xiong (cubxxw) 6271a61c36 feat: add openim mongo and redis env
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-13 18:21:19 +08:00
Xinwei Xiong (cubxxw) f875f99dd8 feat: add openim env
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-13 15:59:14 +08:00
Xinwei Xiong (cubxxw) af54170cf2 feat: add openim server code
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-13 10:53:24 +08:00
12 changed files with 487 additions and 677 deletions
+12 -5
View File
@@ -37,13 +37,20 @@ zookeeper:
###################### Mongo ###################### ###################### Mongo ######################
# MongoDB configuration # MongoDB configuration
# If uri is not empty, it will be used directly
# # If uri is not empty, it will be used directly for the MongoDB connection.
# MongoDB address for standalone setup, Mongos address for sharded cluster setup # This is a complete MongoDB URI string.
# Default MongoDB database name # Example: mongodb://user:password@host1:port1,host2:port2/dbname?options
# Maximum connection pool size
mongo: mongo:
uri: ${MONGO_URI} uri: ${MONGO_URI}
# List of MongoDB server addresses.
# Used for constructing the MongoDB URI if 'uri' above is empty.
# For a standalone setup, specify the address of the single server.
# For a sharded cluster, specify the addresses of the Mongos servers.
# Example: [ '172.28.0.1:37017', '172.28.0.2:37017' ]
# Default MongoDB database name
# Maximum connection pool size
address: [ ${MONGO_ADDRESS}:${MONGO_PORT} ] address: [ ${MONGO_ADDRESS}:${MONGO_PORT} ]
database: ${MONGO_DATABASE} database: ${MONGO_DATABASE}
username: ${MONGO_USERNAME} username: ${MONGO_USERNAME}
-5
View File
@@ -34,7 +34,6 @@ services:
ipv4_address: ${MONGO_NETWORK_ADDRESS:-172.28.0.2} ipv4_address: ${MONGO_NETWORK_ADDRESS:-172.28.0.2}
redis: redis:
# image: redis:7.0.0
image: redis:${REDIS_IMAGE_VERSION:-7.0.0} image: redis:${REDIS_IMAGE_VERSION:-7.0.0}
container_name: redis container_name: redis
ports: ports:
@@ -53,7 +52,6 @@ services:
ipv4_address: ${REDIS_NETWORK_ADDRESS:-172.28.0.3} ipv4_address: ${REDIS_NETWORK_ADDRESS:-172.28.0.3}
zookeeper: zookeeper:
# image: bitnami/zookeeper:3.8
image: bitnami/zookeeper:${ZOOKEEPER_IMAGE_VERSION:-3.8} image: bitnami/zookeeper:${ZOOKEEPER_IMAGE_VERSION:-3.8}
container_name: zookeeper container_name: zookeeper
ports: ports:
@@ -69,7 +67,6 @@ services:
ipv4_address: ${ZOOKEEPER_NETWORK_ADDRESS:-172.28.0.5} ipv4_address: ${ZOOKEEPER_NETWORK_ADDRESS:-172.28.0.5}
kafka: kafka:
# image: 'bitnami/kafka:3.5.1'
image: 'bitnami/kafka:${KAFKA_IMAGE_VERSION:-3.5.1}' image: 'bitnami/kafka:${KAFKA_IMAGE_VERSION:-3.5.1}'
container_name: kafka container_name: kafka
restart: always restart: always
@@ -95,7 +92,6 @@ services:
ipv4_address: ${KAFKA_NETWORK_ADDRESS:-172.28.0.4} ipv4_address: ${KAFKA_NETWORK_ADDRESS:-172.28.0.4}
minio: minio:
# image: minio/minio
image: minio/minio:${MINIO_IMAGE_VERSION:-latest} image: minio/minio:${MINIO_IMAGE_VERSION:-latest}
ports: ports:
- "${MINIO_PORT:-10005}:9000" - "${MINIO_PORT:-10005}:9000"
@@ -114,7 +110,6 @@ services:
ipv4_address: ${MINIO_NETWORK_ADDRESS:-172.28.0.6} ipv4_address: ${MINIO_NETWORK_ADDRESS:-172.28.0.6}
openim-web: openim-web:
# image: ${IMAGE_REGISTRY:-ghcr.io/openimsdk}/openim-web:latest
image: ${IMAGE_REGISTRY:-ghcr.io/openimsdk}/openim-web:${OPENIM_WEB_IMAGE_VERSION:-latest} image: ${IMAGE_REGISTRY:-ghcr.io/openimsdk}/openim-web:${OPENIM_WEB_IMAGE_VERSION:-latest}
container_name: openim-web container_name: openim-web
environment: environment:
+19 -1
View File
@@ -18,6 +18,8 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"os"
"strings"
"time" "time"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
@@ -43,6 +45,9 @@ func NewRedis() (redis.UniversalClient, error) {
return redisClient, nil return redisClient, nil
} }
// Read configuration from environment variables
overrideConfigFromEnv()
if len(config.Config.Redis.Address) == 0 { if len(config.Config.Redis.Address) == 0 {
return nil, errors.New("redis address is empty") return nil, errors.New("redis address is empty")
} }
@@ -60,7 +65,7 @@ func NewRedis() (redis.UniversalClient, error) {
rdb = redis.NewClient(&redis.Options{ rdb = redis.NewClient(&redis.Options{
Addr: config.Config.Redis.Address[0], Addr: config.Config.Redis.Address[0],
Username: config.Config.Redis.Username, Username: config.Config.Redis.Username,
Password: config.Config.Redis.Password, // no password set Password: config.Config.Redis.Password,
DB: 0, // use default DB DB: 0, // use default DB
PoolSize: 100, // connection pool size PoolSize: 100, // connection pool size
MaxRetries: maxRetry, MaxRetries: maxRetry,
@@ -78,3 +83,16 @@ func NewRedis() (redis.UniversalClient, error) {
redisClient = rdb redisClient = rdb
return rdb, err return rdb, err
} }
// overrideConfigFromEnv overrides configuration fields with environment variables if present.
func overrideConfigFromEnv() {
if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" {
config.Config.Redis.Address = strings.Split(envAddr, ",") // Assuming addresses are comma-separated
}
if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" {
config.Config.Redis.Username = envUser
}
if envPass := os.Getenv("REDIS_PASSWORD"); envPass != "" {
config.Config.Redis.Password = envPass
}
}
+85 -65
View File
@@ -1,27 +1,13 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package unrelation package unrelation
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"strings" "strings"
"time" "time"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
@@ -34,7 +20,8 @@ import (
) )
const ( const (
maxRetry = 10 // number of retries maxRetry = 10 // number of retries
mongoConnTimeout = 10 * time.Second
) )
type Mongo struct { type Mongo struct {
@@ -44,90 +31,123 @@ type Mongo struct {
// NewMongo Initialize MongoDB connection. // NewMongo Initialize MongoDB connection.
func NewMongo() (*Mongo, error) { func NewMongo() (*Mongo, error) {
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound) specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority" uri := buildMongoURI()
if config.Config.Mongo.Uri != "" {
uri = config.Config.Mongo.Uri
} else {
mongodbHosts := ""
for i, v := range config.Config.Mongo.Address {
if i == len(config.Config.Mongo.Address)-1 {
mongodbHosts += v
} else {
mongodbHosts += v + ","
}
}
if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
} else {
uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
mongodbHosts, config.Config.Mongo.Database,
config.Config.Mongo.MaxPoolSize)
}
}
fmt.Println("mongo:", uri) fmt.Println("mongo:", uri)
var mongoClient *mongo.Client var mongoClient *mongo.Client
var err error = nil var err error
// Retry connecting to MongoDB
for i := 0; i <= maxRetry; i++ { for i := 0; i <= maxRetry; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), mongoConnTimeout)
defer cancel() defer cancel()
mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri)) mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err == nil { if err == nil {
return &Mongo{db: mongoClient}, nil return &Mongo{db: mongoClient}, nil
} }
if cmdErr, ok := err.(mongo.CommandError); ok { if shouldRetry(err) {
if cmdErr.Code == 13 || cmdErr.Code == 18 { fmt.Printf("Failed to connect to MongoDB, retrying: %s\n", err)
return nil, err time.Sleep(time.Second) // exponential backoff could be implemented here
} else { continue
fmt.Printf("Failed to connect to MongoDB: %s\n", err)
}
} }
return nil, err
} }
return nil, err return nil, err
} }
func buildMongoURI() string {
uri := os.Getenv("MONGO_URI")
if uri != "" {
return uri
}
username := os.Getenv("MONGO_USERNAME")
password := os.Getenv("MONGO_PASSWORD")
address := os.Getenv("MONGO_ADDRESS")
port := os.Getenv("MONGO_PORT")
database := os.Getenv("MONGO_DATABASE")
maxPoolSize := os.Getenv("MONGO_MAX_POOL_SIZE")
if username == "" {
username = config.Config.Mongo.Username
}
if password == "" {
password = config.Config.Mongo.Password
}
if address == "" {
address = strings.Join(config.Config.Mongo.Address, ",")
} else if port != "" {
address = fmt.Sprintf("%s:%s", address, port)
}
if database == "" {
database = config.Config.Mongo.Database
}
if maxPoolSize == "" {
maxPoolSize = fmt.Sprint(config.Config.Mongo.MaxPoolSize)
}
uriFormat := "mongodb://%s/%s?maxPoolSize=%s&authSource=admin"
if username != "" && password != "" {
uriFormat = "mongodb://%s:%s@%s/%s?maxPoolSize=%s&authSource=admin"
return fmt.Sprintf(uriFormat, username, password, address, database, maxPoolSize)
}
return fmt.Sprintf(uriFormat, address, database, maxPoolSize)
}
func shouldRetry(err error) bool {
if cmdErr, ok := err.(mongo.CommandError); ok {
return cmdErr.Code != 13 && cmdErr.Code != 18
}
return true
}
// GetClient returns the MongoDB client.
func (m *Mongo) GetClient() *mongo.Client { func (m *Mongo) GetClient() *mongo.Client {
return m.db return m.db
} }
// GetDatabase returns the specific database from MongoDB.
func (m *Mongo) GetDatabase() *mongo.Database { func (m *Mongo) GetDatabase() *mongo.Database {
return m.db.Database(config.Config.Mongo.Database) return m.db.Database(config.Config.Mongo.Database)
} }
// CreateMsgIndex creates an index for messages in MongoDB.
func (m *Mongo) CreateMsgIndex() error { func (m *Mongo) CreateMsgIndex() error {
return m.createMongoIndex(unrelation.Msg, true, "doc_id") return m.createMongoIndex(unrelation.Msg, true, "doc_id")
} }
// createMongoIndex creates an index in a MongoDB collection.
func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error { func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error {
db := m.db.Database(config.Config.Mongo.Database).Collection(collection) db := m.GetDatabase().Collection(collection)
opts := options.CreateIndexes().SetMaxTime(10 * time.Second) opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
indexView := db.Indexes() indexView := db.Indexes()
keysDoc := bson.D{}
// create composite indexes keysDoc := buildIndexKeys(keys)
for _, key := range keys {
if strings.HasPrefix(key, "-") {
keysDoc = append(keysDoc, bson.E{Key: strings.TrimLeft(key, "-"), Value: -1})
// keysDoc = keysDoc.Append(strings.TrimLeft(key, "-"), bsonx.Int32(-1))
} else {
keysDoc = append(keysDoc, bson.E{Key: key, Value: 1})
// keysDoc = keysDoc.Append(key, bsonx.Int32(1))
}
}
// create index
index := mongo.IndexModel{ index := mongo.IndexModel{
Keys: keysDoc, Keys: keysDoc,
} }
if isUnique { if isUnique {
index.Options = options.Index().SetUnique(true) index.Options = options.Index().SetUnique(true)
} }
result, err := indexView.CreateOne(
context.Background(), _, err := indexView.CreateOne(context.Background(), index, opts)
index,
opts,
)
if err != nil { if err != nil {
return utils.Wrap(err, result) return utils.Wrap(err, "CreateIndex")
} }
return nil return nil
} }
// buildIndexKeys builds the BSON document for index keys.
func buildIndexKeys(keys []string) bson.D {
keysDoc := bson.D{}
for _, key := range keys {
direction := 1 // default direction is ascending
if strings.HasPrefix(key, "-") {
direction = -1 // descending order for prefixed with "-"
key = strings.TrimLeft(key, "-")
}
keysDoc = append(keysDoc, bson.E{Key: key, Value: direction})
}
return keysDoc
}
@@ -1,93 +1,22 @@
package discoveryregister package discoveryregister
import ( import (
"context"
"errors" "errors"
"fmt"
"time" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper"
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/tools/log"
"google.golang.org/grpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
) )
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) { func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) {
var client discoveryregistry.SvcDiscoveryRegistry
var err error
switch envType { switch envType {
case "zookeeper": case "zookeeper":
client, err = openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, return zookeeper.NewZookeeperDiscoveryRegister()
openkeeper.WithFreq(time.Hour), openkeeper.WithUserNameAndPassword(
config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password,
), openkeeper.WithRoundRobin(), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))
case "k8s": case "k8s":
client, err = NewK8sDiscoveryRegister() return kubernetes.NewK8sDiscoveryRegister()
default: default:
client = nil return nil, errors.New("envType not correct")
err = errors.New("envType not correct")
} }
return client, err
}
type K8sDR struct {
options []grpc.DialOption
rpcRegisterAddr string
}
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
return &K8sDR{}, nil
}
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
cli.rpcRegisterAddr = serviceName
return nil
}
func (cli *K8sDR) UnRegister() error {
return nil
}
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
return nil
}
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
return nil
}
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
return nil, nil
}
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
return []*grpc.ClientConn{conn}, err
}
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
}
func (cli *K8sDR) GetSelfConnTarget() string {
return cli.rpcRegisterAddr
}
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
cli.options = append(cli.options, opts...)
}
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
conn.Close()
}
// do not use this method for call rpc
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
return nil
}
func (cli *K8sDR) Close() {
return
} }
@@ -1,407 +1,45 @@
package discoveryregister package discoveryregister
import ( import (
"context" "os"
"reflect"
"testing" "testing"
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
"google.golang.org/grpc" "github.com/stretchr/testify/assert"
) )
func setupTestEnvironment() {
os.Setenv("ZOOKEEPER_SCHEMA", "openim")
os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1:12181")
os.Setenv("ZOOKEEPER_USERNAME", "")
os.Setenv("ZOOKEEPER_PASSWORD", "")
}
func TestNewDiscoveryRegister(t *testing.T) { func TestNewDiscoveryRegister(t *testing.T) {
type args struct { setupTestEnvironment()
envType string
}
tests := []struct {
name string
args args
want discoveryregistry.SvcDiscoveryRegistry
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewDiscoveryRegister(tt.args.envType)
if (err != nil) != tt.wantErr {
t.Errorf("NewDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewDiscoveryRegister() = %v, want %v", got, tt.want)
}
})
}
}
func TestNewK8sDiscoveryRegister(t *testing.T) {
tests := []struct { tests := []struct {
name string envType string
want discoveryregistry.SvcDiscoveryRegistry expectedError bool
wantErr bool expectedResult bool
}{ }{
// TODO: Add test cases. {"zookeeper", false, true},
{"k8s", false, true}, // assumes the k8s configuration is also set up correctly
{"invalid", true, false},
} }
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewK8sDiscoveryRegister()
if (err != nil) != tt.wantErr {
t.Errorf("NewK8sDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewK8sDiscoveryRegister() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_Register(t *testing.T) { for _, test := range tests {
type fields struct { client, err := NewDiscoveryRegister(test.envType)
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
serviceName string
host string
port int
opts []grpc.DialOption
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if err := cli.Register(tt.args.serviceName, tt.args.host, tt.args.port, tt.args.opts...); (err != nil) != tt.wantErr {
t.Errorf("K8sDR.Register() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestK8sDR_UnRegister(t *testing.T) { if test.expectedError {
type fields struct { assert.Error(t, err)
options []grpc.DialOption } else {
rpcRegisterAddr string assert.NoError(t, err)
} if test.expectedResult {
tests := []struct { assert.Implements(t, (*discoveryregistry.SvcDiscoveryRegistry)(nil), client)
name string } else {
fields fields assert.Nil(t, client)
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
} }
if err := cli.UnRegister(); (err != nil) != tt.wantErr { }
t.Errorf("K8sDR.UnRegister() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestK8sDR_CreateRpcRootNodes(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
serviceNames []string
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if err := cli.CreateRpcRootNodes(tt.args.serviceNames); (err != nil) != tt.wantErr {
t.Errorf("K8sDR.CreateRpcRootNodes() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestK8sDR_RegisterConf2Registry(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
key string
conf []byte
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if err := cli.RegisterConf2Registry(tt.args.key, tt.args.conf); (err != nil) != tt.wantErr {
t.Errorf("K8sDR.RegisterConf2Registry() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestK8sDR_GetConfFromRegistry(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
key string
}
tests := []struct {
name string
fields fields
args args
want []byte
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
got, err := cli.GetConfFromRegistry(tt.args.key)
if (err != nil) != tt.wantErr {
t.Errorf("K8sDR.GetConfFromRegistry() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("K8sDR.GetConfFromRegistry() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_GetConns(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
ctx context.Context
serviceName string
opts []grpc.DialOption
}
tests := []struct {
name string
fields fields
args args
want []*grpc.ClientConn
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
got, err := cli.GetConns(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
if (err != nil) != tt.wantErr {
t.Errorf("K8sDR.GetConns() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("K8sDR.GetConns() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_GetConn(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
ctx context.Context
serviceName string
opts []grpc.DialOption
}
tests := []struct {
name string
fields fields
args args
want *grpc.ClientConn
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
got, err := cli.GetConn(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
if (err != nil) != tt.wantErr {
t.Errorf("K8sDR.GetConn() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("K8sDR.GetConn() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_GetSelfConnTarget(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
tests := []struct {
name string
fields fields
want string
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if got := cli.GetSelfConnTarget(); got != tt.want {
t.Errorf("K8sDR.GetSelfConnTarget() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_AddOption(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
opts []grpc.DialOption
}
tests := []struct {
name string
fields fields
args args
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
cli.AddOption(tt.args.opts...)
})
}
}
func TestK8sDR_CloseConn(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
conn *grpc.ClientConn
}
tests := []struct {
name string
fields fields
args args
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
cli.CloseConn(tt.args.conn)
})
}
}
func TestK8sDR_GetClientLocalConns(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
tests := []struct {
name string
fields fields
want map[string][]*grpc.ClientConn
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if got := cli.GetClientLocalConns(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("K8sDR.GetClientLocalConns() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_Close(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
tests := []struct {
name string
fields fields
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
cli.Close()
})
} }
} }
@@ -0,0 +1,90 @@
package kubernetes
import (
"context"
"fmt"
"google.golang.org/grpc"
"github.com/OpenIMSDK/tools/discoveryregistry"
)
// K8sDR represents the Kubernetes service discovery and registration client.
type K8sDR struct {
options []grpc.DialOption
rpcRegisterAddr string
}
// NewK8sDiscoveryRegister creates a new instance of K8sDR for Kubernetes service discovery and registration.
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
return &K8sDR{}, nil
}
// Register registers a service with Kubernetes.
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
cli.rpcRegisterAddr = serviceName
return nil
}
// UnRegister removes a service registration from Kubernetes.
func (cli *K8sDR) UnRegister() error {
return nil
}
// CreateRpcRootNodes creates root nodes for RPC in Kubernetes.
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
return nil
}
// RegisterConf2Registry registers a configuration to the registry.
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
return nil
}
// GetConfFromRegistry retrieves a configuration from the registry.
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
return nil, nil
}
// GetConns returns a list of gRPC client connections for a given service.
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
return []*grpc.ClientConn{conn}, err
}
// GetConn returns a single gRPC client connection for a given service.
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
}
// GetSelfConnTarget returns the connection target of the client itself.
func (cli *K8sDR) GetSelfConnTarget() string {
return cli.rpcRegisterAddr
}
// AddOption adds gRPC dial options to the client.
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
cli.options = append(cli.options, opts...)
}
// CloseConn closes a given gRPC client connection.
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
conn.Close()
}
// do not use this method for call rpc
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
return nil
}
// Close closes the K8sDR client.
func (cli *K8sDR) Close() {
// Close any open resources here (if applicable)
return
}
@@ -0,0 +1,46 @@
package zookeeper
import (
"os"
"strings"
"time"
"github.com/OpenIMSDK/tools/discoveryregistry"
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
// NewZookeeperDiscoveryRegister creates a new instance of ZookeeperDR for Zookeeper service discovery and registration.
func NewZookeeperDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
schema := getEnv("ZOOKEEPER_SCHEMA", config.Config.Zookeeper.Schema)
zkAddr := getZkAddrFromEnv(config.Config.Zookeeper.ZkAddr)
username := getEnv("ZOOKEEPER_USERNAME", config.Config.Zookeeper.Username)
password := getEnv("ZOOKEEPER_PASSWORD", config.Config.Zookeeper.Password)
return openkeeper.NewClient(
zkAddr,
schema,
openkeeper.WithFreq(time.Hour),
openkeeper.WithUserNameAndPassword(username, password),
openkeeper.WithRoundRobin(),
openkeeper.WithTimeout(10),
openkeeper.WithLogger(log.NewZkLogger()),
)
}
// getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
func getEnv(key, fallback string) string {
if value, exists := os.LookupEnv(key); exists {
return value
}
return fallback
}
// getZkAddrFromEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
func getZkAddrFromEnv(fallback []string) []string {
if value, exists := os.LookupEnv("ZOOKEEPER_ADDRESS"); exists {
return strings.Split(value, ",")
}
return fallback
}
+86 -65
View File
@@ -1,17 +1,3 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka package kafka
import ( import (
@@ -21,19 +7,17 @@ import (
"strings" "strings"
"time" "time"
"github.com/IBM/sarama"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
log "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/IBM/sarama"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
const ( const (
maxRetry = 10 // number of retries maxRetry = 10 // Maximum number of retries for producer creation
) )
var errEmptyMsg = errors.New("binary msg is empty") var errEmptyMsg = errors.New("binary msg is empty")
@@ -45,62 +29,85 @@ type Producer struct {
producer sarama.SyncProducer producer sarama.SyncProducer
} }
// NewKafkaProducer Initialize kafka producer. // NewKafkaProducer initializes a new Kafka producer.
func NewKafkaProducer(addr []string, topic string) *Producer { func NewKafkaProducer(addr []string, topic string) *Producer {
p := Producer{} p := Producer{
p.config = sarama.NewConfig() // Instantiate a sarama Config addr: addr,
p.config.Producer.Return.Successes = true // Whether to enable the successes channel to be notified after the message is sent successfully topic: topic,
config: sarama.NewConfig(),
}
// Set producer return flags
p.config.Producer.Return.Successes = true
p.config.Producer.Return.Errors = true p.config.Producer.Return.Errors = true
p.config.Producer.Partitioner = sarama.NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
var producerAck = sarama.WaitForAll // default: WaitForAll // Set partitioner strategy
switch strings.ToLower(config.Config.Kafka.ProducerAck) { p.config.Producer.Partitioner = sarama.NewHashPartitioner
case "no_response":
producerAck = sarama.NoResponse
case "wait_for_local":
producerAck = sarama.WaitForLocal
case "wait_for_all":
producerAck = sarama.WaitForAll
}
p.config.Producer.RequiredAcks = producerAck
var compress = sarama.CompressionNone // default: no compress // Configure producer acknowledgement level
_ = compress.UnmarshalText(bytes.ToLower([]byte(config.Config.Kafka.CompressType))) configureProducerAck(&p, config.Config.Kafka.ProducerAck)
p.config.Producer.Compression = compress
if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" { // Configure message compression
configureCompression(&p, config.Config.Kafka.CompressType)
// Get Kafka configuration from environment variables or fallback to config file
kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username)
kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password)
kafkaAddr := getEnvOrConfig("KAFKA_ADDRESS", addr[0]) // Assuming addr[0] contains address from config
// Configure SASL authentication if credentials are provided
if kafkaUsername != "" && kafkaPassword != "" {
p.config.Net.SASL.Enable = true p.config.Net.SASL.Enable = true
p.config.Net.SASL.User = config.Config.Kafka.Username p.config.Net.SASL.User = kafkaUsername
p.config.Net.SASL.Password = config.Config.Kafka.Password p.config.Net.SASL.Password = kafkaPassword
} }
p.addr = addr
p.topic = topic // Set the Kafka address
p.addr = []string{kafkaAddr}
// Set up TLS configuration (if required)
SetupTLSConfig(p.config) SetupTLSConfig(p.config)
var producer sarama.SyncProducer
// Create the producer with retries
var err error var err error
for i := 0; i <= maxRetry; i++ { for i := 0; i <= maxRetry; i++ {
producer, err = sarama.NewSyncProducer(p.addr, p.config) // Initialize the client p.producer, err = sarama.NewSyncProducer(p.addr, p.config)
if err == nil { if err == nil {
p.producer = producer
return &p return &p
} }
//TODO If the password is wrong, exit directly time.Sleep(1 * time.Second) // Wait before retrying
//if packetErr, ok := err.(*sarama.PacketEncodingError); ok {
//if _, ok := packetErr.Err.(sarama.AuthenticationError); ok {
// fmt.Println("Kafka password is wrong.")
//}
//} else {
// fmt.Printf("Failed to create Kafka producer: %v\n", err)
//}
time.Sleep(time.Duration(1) * time.Second)
} }
// Panic if unable to create producer after retries
if err != nil { if err != nil {
panic(err.Error()) panic("Failed to create Kafka producer: " + err.Error())
} }
p.producer = producer
return &p return &p
} }
// configureProducerAck configures the producer's acknowledgement level.
func configureProducerAck(p *Producer, ackConfig string) {
switch strings.ToLower(ackConfig) {
case "no_response":
p.config.Producer.RequiredAcks = sarama.NoResponse
case "wait_for_local":
p.config.Producer.RequiredAcks = sarama.WaitForLocal
case "wait_for_all":
p.config.Producer.RequiredAcks = sarama.WaitForAll
default:
p.config.Producer.RequiredAcks = sarama.WaitForAll
}
}
// configureCompression configures the message compression type for the producer.
func configureCompression(p *Producer, compressType string) {
var compress sarama.CompressionCodec = sarama.CompressionNone
compress.UnmarshalText(bytes.ToLower([]byte(compressType)))
p.config.Producer.Compression = compress
}
// GetMQHeaderWithContext extracts message queue headers from the context.
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) { func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx) operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
if err != nil { if err != nil {
@@ -111,22 +118,23 @@ func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error)
{Key: []byte(constant.OpUserID), Value: []byte(opUserID)}, {Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)}, {Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
{Key: []byte(constant.ConnID), Value: []byte(connID)}, {Key: []byte(constant.ConnID), Value: []byte(connID)},
}, err }, nil
} }
// GetContextWithMQHeader creates a context from message queue headers.
func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context { func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
var values []string var values []string
for _, recordHeader := range header { for _, recordHeader := range header {
values = append(values, string(recordHeader.Value)) values = append(values, string(recordHeader.Value))
} }
return mcontext.WithMustInfoCtx(values) // TODO return mcontext.WithMustInfoCtx(values) // Attach extracted values to context
} }
// SendMessage sends a message to the Kafka topic configured in the Producer.
func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) { func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) {
log.ZDebug(ctx, "SendMessage", "msg", msg, "topic", p.topic, "key", key) log.ZDebug(ctx, "SendMessage", "msg", msg, "topic", p.topic, "key", key)
kMsg := &sarama.ProducerMessage{}
kMsg.Topic = p.topic // Marshal the protobuf message
kMsg.Key = sarama.StringEncoder(key)
bMsg, err := proto.Marshal(msg) bMsg, err := proto.Marshal(msg)
if err != nil { if err != nil {
return 0, 0, utils.Wrap(err, "kafka proto Marshal err") return 0, 0, utils.Wrap(err, "kafka proto Marshal err")
@@ -134,20 +142,33 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag
if len(bMsg) == 0 { if len(bMsg) == 0 {
return 0, 0, utils.Wrap(errEmptyMsg, "") return 0, 0, utils.Wrap(errEmptyMsg, "")
} }
kMsg.Value = sarama.ByteEncoder(bMsg)
// Prepare Kafka message
kMsg := &sarama.ProducerMessage{
Topic: p.topic,
Key: sarama.StringEncoder(key),
Value: sarama.ByteEncoder(bMsg),
}
// Validate message key and value
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
return 0, 0, utils.Wrap(errEmptyMsg, "") return 0, 0, utils.Wrap(errEmptyMsg, "")
} }
kMsg.Metadata = ctx
// Attach context metadata as headers
header, err := GetMQHeaderWithContext(ctx) header, err := GetMQHeaderWithContext(ctx)
if err != nil { if err != nil {
return 0, 0, utils.Wrap(err, "") return 0, 0, utils.Wrap(err, "")
} }
kMsg.Headers = header kMsg.Headers = header
// Send the message
partition, offset, err := p.producer.SendMessage(kMsg) partition, offset, err := p.producer.SendMessage(kMsg)
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key ", kMsg.Key, "key length", kMsg.Value.Length())
if err != nil { if err != nil {
log.ZWarn(ctx, "p.producer.SendMessage error", err) log.ZWarn(ctx, "p.producer.SendMessage error", err)
return 0, 0, utils.Wrap(err, "")
} }
return partition, offset, utils.Wrap(err, "")
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key", kMsg.Key, "key length", kMsg.Value.Length())
return partition, offset, nil
} }
+11
View File
@@ -15,6 +15,8 @@
package kafka package kafka
import ( import (
"os"
"github.com/IBM/sarama" "github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
@@ -33,3 +35,12 @@ func SetupTLSConfig(cfg *sarama.Config) {
) )
} }
} }
// getEnvOrConfig returns the value of the environment variable if it exists,
// otherwise, it returns the value from the configuration file.
func getEnvOrConfig(envName string, configValue string) string {
if value, exists := os.LookupEnv(envName); exists {
return value
}
return configValue
}
+2
View File
@@ -28,6 +28,8 @@ openim::log::info "\n# Use Docker to start all openim service"
trap 'openim::util::onCtrlC' INT trap 'openim::util::onCtrlC' INT
"${OPENIM_ROOT}"/scripts/init-config.sh --skip
"${OPENIM_ROOT}"/scripts/start-all.sh "${OPENIM_ROOT}"/scripts/start-all.sh
sleep 5 sleep 5
+102 -69
View File
@@ -1,17 +1,4 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Copyright © 2023 OpenIM. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This script automatically initializes various configuration files and can generate example files. # This script automatically initializes various configuration files and can generate example files.
@@ -48,7 +35,8 @@ declare -A EXAMPLES=(
FORCE_OVERWRITE=false FORCE_OVERWRITE=false
SKIP_EXISTING=false SKIP_EXISTING=false
GENERATE_EXAMPLES=false GENERATE_EXAMPLES=false
CLEAN_ENV_EXAMPLES=false CLEAN_CONFIG=false
CLEAN_EXAMPLES=false
# Function to display help information # Function to display help information
show_help() { show_help() {
@@ -58,59 +46,45 @@ show_help() {
echo " --force Overwrite existing files without prompt" echo " --force Overwrite existing files without prompt"
echo " --skip Skip generation if file exists" echo " --skip Skip generation if file exists"
echo " --examples Generate example files" echo " --examples Generate example files"
echo " --clean-env-examples Generate example files in a clean environment" echo " --clean-config Clean all configuration files"
echo " --clean-examples Clean all example files"
} }
# Function to generate configuration files # Function to generate configuration files
generate_config_files() { generate_config_files() {
# Loop through each template in TEMPLATES
for template in "${!TEMPLATES[@]}"; do for template in "${!TEMPLATES[@]}"; do
# Read the corresponding output files for the template local output_file="${TEMPLATES[$template]}"
IFS=';' read -ra OUTPUT_FILES <<< "${TEMPLATES[$template]}" if [[ -f "${output_file}" ]]; then
for output_file in "${OUTPUT_FILES[@]}"; do if [[ "${FORCE_OVERWRITE}" == true ]]; then
# Check if the output file already exists openim::log::info "Force overwriting ${output_file}."
if [[ -f "${output_file}" ]]; then elif [[ "${SKIP_EXISTING}" == true ]]; then
# Handle existing file based on command-line options openim::log::info "Skipping generation of ${output_file} as it already exists."
if [[ "${FORCE_OVERWRITE}" == true ]]; then continue
openim::log::info "Force overwriting ${output_file}." else
elif [[ "${SKIP_EXISTING}" == true ]]; then echo -n "File ${output_file} already exists. Overwrite? (Y/N): "
openim::log::info "Skipping generation of ${output_file} as it already exists." read -r -n 1 REPLY
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
openim::log::info "Skipping generation of ${output_file}."
continue continue
else
# Ask user for confirmation to overwrite
echo -n "File ${output_file} already exists. Overwrite? (Y/N): "
read -r -n 1 REPLY
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
openim::log::info "Skipping generation of ${output_file}."
continue
fi
fi fi
fi fi
else
# Process the template file to generate the output file if [[ "${SKIP_EXISTING}" == true ]]; then
openim::log::info "⌚ Working with template file: ${template} to generate ${output_file}..." openim::log::info "Generating ${output_file} as it does not exist."
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
openim::log::error "genconfig.sh script not found"
exit 1
fi fi
"${OPENIM_ROOT}/scripts/genconfig.sh" "${ENV_FILE}" "${template}" > "${output_file}" || {
openim::log::error "Error processing template file ${template}"
exit 1
}
sleep 0.5
done
done
}
# Function to generate example files
generate_example_files() {
for template in "${!EXAMPLES[@]}"; do
local example_file="${EXAMPLES[$template]}"
if [[ ! -f "${example_file}" ]]; then
openim::log::info "Generating example file: ${example_file} from ${template}..."
cp "${template}" "${example_file}"
fi fi
openim::log::info "⌚ Working with template file: ${template} to generate ${output_file}..."
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
openim::log::error "genconfig.sh script not found"
exit 1
fi
"${OPENIM_ROOT}/scripts/genconfig.sh" "${ENV_FILE}" "${template}" > "${output_file}" || {
openim::log::error "Error processing template file ${template}"
exit 1
}
sleep 0.5
done done
} }
@@ -120,7 +94,8 @@ declare -A env_vars=(
["LOG_STORAGE_LOCATION"]="../logs/" ["LOG_STORAGE_LOCATION"]="../logs/"
) )
generate_clean_environment_examples() { # Function to generate example files
generate_example_files() {
env_cmd="env -i" env_cmd="env -i"
for var in "${!env_vars[@]}"; do for var in "${!env_vars[@]}"; do
env_cmd+=" $var='${env_vars[$var]}'" env_cmd+=" $var='${env_vars[$var]}'"
@@ -128,12 +103,56 @@ generate_clean_environment_examples() {
for template in "${!EXAMPLES[@]}"; do for template in "${!EXAMPLES[@]}"; do
local example_file="${EXAMPLES[$template]}" local example_file="${EXAMPLES[$template]}"
openim::log::info "Generating example file: ${example_file} from ${template}..." if [[ -f "${example_file}" ]]; then
if [[ "${FORCE_OVERWRITE}" == true ]]; then
openim::log::info "Force overwriting example file: ${example_file}."
elif [[ "${SKIP_EXISTING}" == true ]]; then
openim::log::info "Skipping generation of example file: ${example_file} as it already exists."
continue
else
echo -n "Example file ${example_file} already exists. Overwrite? (Y/N): "
read -r -n 1 REPLY
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
openim::log::info "Skipping generation of example file: ${example_file}."
continue
fi
fi
elif [[ "${SKIP_EXISTING}" == true ]]; then
openim::log::info "Generating example file: ${example_file} as it does not exist."
fi
openim::log::info "⌚ Working with template file: ${template} to generate example file: ${example_file}..."
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
openim::log::error "genconfig.sh script not found"
exit 1
fi
eval "$env_cmd ${OPENIM_ROOT}/scripts/genconfig.sh '${ENV_FILE}' '${template}' > '${example_file}'" || { eval "$env_cmd ${OPENIM_ROOT}/scripts/genconfig.sh '${ENV_FILE}' '${template}' > '${example_file}'" || {
openim::log::error "Error processing template file ${template}" openim::log::error "Error processing template file ${template}"
exit 1 exit 1
} }
sleep 0.5
done
}
# Function to clean configuration files
clean_config_files() {
for output_file in "${TEMPLATES[@]}"; do
if [[ -f "${output_file}" ]]; then
rm -f "${output_file}"
openim::log::info "Removed configuration file: ${output_file}"
fi
done
}
# Function to clean example files
clean_example_files() {
for example_file in "${EXAMPLES[@]}"; do
if [[ -f "${example_file}" ]]; then
rm -f "${example_file}"
openim::log::info "Removed example file: ${example_file}"
fi
done done
} }
@@ -155,8 +174,12 @@ while [[ $# -gt 0 ]]; do
GENERATE_EXAMPLES=true GENERATE_EXAMPLES=true
shift shift
;; ;;
--clean-env-examples) --clean-config)
CLEAN_ENV_EXAMPLES=true CLEAN_CONFIG=true
shift
;;
--clean-examples)
CLEAN_EXAMPLES=true
shift shift
;; ;;
*) *)
@@ -167,19 +190,29 @@ while [[ $# -gt 0 ]]; do
esac esac
done done
# Clean configuration files if --clean-config option is provided
if [[ "${CLEAN_CONFIG}" == true ]]; then
clean_config_files
fi
# Clean example files if --clean-examples option is provided
if [[ "${CLEAN_EXAMPLES}" == true ]]; then
clean_example_files
fi
# Generate configuration files if requested # Generate configuration files if requested
if [[ "${FORCE_OVERWRITE}" == true || "${SKIP_EXISTING}" == false ]]; then if [[ "${FORCE_OVERWRITE}" == true || "${SKIP_EXISTING}" == false ]] && [[ "${CLEAN_CONFIG}" == false ]]; then
generate_config_files
fi
# Generate configuration files if requested
if [[ "${SKIP_EXISTING}" == true ]]; then
generate_config_files generate_config_files
fi fi
# Generate example files if --examples option is provided # Generate example files if --examples option is provided
if [[ "${GENERATE_EXAMPLES}" == true ]]; then if [[ "${GENERATE_EXAMPLES}" == true ]] && [[ "${CLEAN_EXAMPLES}" == false ]]; then
generate_example_files generate_example_files
fi fi
# Generate example files in a clean environment if --clean-env-examples option is provided openim::log::success "Configuration and example files operation complete!"
if [[ "${CLEAN_ENV_EXAMPLES}" == true ]]; then
generate_clean_environment_examples
fi
openim::log::success "Configuration and example files generation complete!"