mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-04-28 14:29:19 +08:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a42a44e0a3 | |||
| d8838ee6b8 | |||
| f480f52e2d | |||
| a23cbf13cf | |||
| 498e26a942 | |||
| e1990c179e |
@@ -1,139 +0,0 @@
|
||||
# Copyright © 2023 OpenIM open source community. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
name: Build OpenIM Web Docker image
|
||||
|
||||
on:
|
||||
# schedule:
|
||||
# - cron: '30 3 * * *'
|
||||
push:
|
||||
branches:
|
||||
# - main
|
||||
- release-*
|
||||
tags:
|
||||
- v*
|
||||
workflow_dispatch:
|
||||
|
||||
env:
|
||||
# Common versions
|
||||
GO_VERSION: "1.20"
|
||||
|
||||
jobs:
|
||||
build-openim-web-dockerhub:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
# docker.io/openim/openim-web:latest
|
||||
- name: Extract metadata (tags, labels) for Docker
|
||||
id: meta
|
||||
uses: docker/metadata-action@v5.0.0
|
||||
with:
|
||||
images: openim/openim-web
|
||||
# generate Docker tags based on the following events/attributes
|
||||
tags: |
|
||||
type=schedule
|
||||
type=ref,event=branch
|
||||
type=ref,event=pr
|
||||
type=semver,pattern={{version}}
|
||||
type=semver,pattern={{major}}.{{minor}}
|
||||
type=semver,pattern={{major}}
|
||||
type=sha
|
||||
|
||||
- name: Log in to Docker Hub
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.DOCKER_USERNAME }}
|
||||
password: ${{ secrets.DOCKER_PASSWORD }}
|
||||
|
||||
- name: Build and push Docker image
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
file: ./build/images/openim-tools/openim-web/Dockerfile
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
tags: ${{ steps.meta.outputs.tags }}
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
|
||||
build-openim-web-aliyun:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
# registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-web:latest
|
||||
- name: Extract metadata (tags, labels) for Docker
|
||||
id: meta2
|
||||
uses: docker/metadata-action@v5.0.0
|
||||
with:
|
||||
images: registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-web
|
||||
|
||||
- name: Log in to AliYun Docker Hub
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: registry.cn-hangzhou.aliyuncs.com
|
||||
username: ${{ secrets.ALIREGISTRY_USERNAME }}
|
||||
password: ${{ secrets.ALIREGISTRY_TOKEN }}
|
||||
|
||||
- name: Build and push Docker image
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
file: ./build/images/openim-tools/openim-web/Dockerfile
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
tags: ${{ steps.meta2.outputs.tags }}
|
||||
labels: ${{ steps.meta2.outputs.labels }}
|
||||
|
||||
build-openim-web-ghcr:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
# ghcr.io/openimsdk/openim-web:latest
|
||||
- name: Extract metadata (tags, labels) for Docker
|
||||
id: meta2
|
||||
uses: docker/metadata-action@v5.0.0
|
||||
with:
|
||||
images: ghcr.io/openimsdk/openim-web
|
||||
|
||||
- name: Log in to GitHub Container Registry
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.repository_owner }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Build and push Docker image
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
file: ./build/images/openim-tools/openim-web/Dockerfile
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
tags: ${{ steps.meta2.outputs.tags }}
|
||||
labels: ${{ steps.meta2.outputs.labels }}
|
||||
@@ -37,13 +37,20 @@ zookeeper:
|
||||
|
||||
###################### Mongo ######################
|
||||
# MongoDB configuration
|
||||
# If uri is not empty, it will be used directly
|
||||
#
|
||||
# MongoDB address for standalone setup, Mongos address for sharded cluster setup
|
||||
# Default MongoDB database name
|
||||
# Maximum connection pool size
|
||||
|
||||
# If uri is not empty, it will be used directly for the MongoDB connection.
|
||||
# This is a complete MongoDB URI string.
|
||||
# Example: mongodb://user:password@host1:port1,host2:port2/dbname?options
|
||||
mongo:
|
||||
uri: ${MONGO_URI}
|
||||
|
||||
# List of MongoDB server addresses.
|
||||
# Used for constructing the MongoDB URI if 'uri' above is empty.
|
||||
# For a standalone setup, specify the address of the single server.
|
||||
# For a sharded cluster, specify the addresses of the Mongos servers.
|
||||
# Example: [ '172.28.0.1:37017', '172.28.0.2:37017' ]
|
||||
# Default MongoDB database name
|
||||
# Maximum connection pool size
|
||||
address: [ ${MONGO_ADDRESS}:${MONGO_PORT} ]
|
||||
database: ${MONGO_DATABASE}
|
||||
username: ${MONGO_USERNAME}
|
||||
|
||||
@@ -34,7 +34,6 @@ services:
|
||||
ipv4_address: ${MONGO_NETWORK_ADDRESS:-172.28.0.2}
|
||||
|
||||
redis:
|
||||
# image: redis:7.0.0
|
||||
image: redis:${REDIS_IMAGE_VERSION:-7.0.0}
|
||||
container_name: redis
|
||||
ports:
|
||||
@@ -53,7 +52,6 @@ services:
|
||||
ipv4_address: ${REDIS_NETWORK_ADDRESS:-172.28.0.3}
|
||||
|
||||
zookeeper:
|
||||
# image: bitnami/zookeeper:3.8
|
||||
image: bitnami/zookeeper:${ZOOKEEPER_IMAGE_VERSION:-3.8}
|
||||
container_name: zookeeper
|
||||
ports:
|
||||
@@ -69,7 +67,6 @@ services:
|
||||
ipv4_address: ${ZOOKEEPER_NETWORK_ADDRESS:-172.28.0.5}
|
||||
|
||||
kafka:
|
||||
# image: 'bitnami/kafka:3.5.1'
|
||||
image: 'bitnami/kafka:${KAFKA_IMAGE_VERSION:-3.5.1}'
|
||||
container_name: kafka
|
||||
restart: always
|
||||
@@ -95,7 +92,6 @@ services:
|
||||
ipv4_address: ${KAFKA_NETWORK_ADDRESS:-172.28.0.4}
|
||||
|
||||
minio:
|
||||
# image: minio/minio
|
||||
image: minio/minio:${MINIO_IMAGE_VERSION:-latest}
|
||||
ports:
|
||||
- "${MINIO_PORT:-10005}:9000"
|
||||
@@ -114,7 +110,6 @@ services:
|
||||
ipv4_address: ${MINIO_NETWORK_ADDRESS:-172.28.0.6}
|
||||
|
||||
openim-web:
|
||||
# image: ${IMAGE_REGISTRY:-ghcr.io/openimsdk}/openim-web:latest
|
||||
image: ${IMAGE_REGISTRY:-ghcr.io/openimsdk}/openim-web:${OPENIM_WEB_IMAGE_VERSION:-latest}
|
||||
container_name: openim-web
|
||||
environment:
|
||||
|
||||
Vendored
+19
-1
@@ -18,6 +18,8 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
@@ -43,6 +45,9 @@ func NewRedis() (redis.UniversalClient, error) {
|
||||
return redisClient, nil
|
||||
}
|
||||
|
||||
// Read configuration from environment variables
|
||||
overrideConfigFromEnv()
|
||||
|
||||
if len(config.Config.Redis.Address) == 0 {
|
||||
return nil, errors.New("redis address is empty")
|
||||
}
|
||||
@@ -60,7 +65,7 @@ func NewRedis() (redis.UniversalClient, error) {
|
||||
rdb = redis.NewClient(&redis.Options{
|
||||
Addr: config.Config.Redis.Address[0],
|
||||
Username: config.Config.Redis.Username,
|
||||
Password: config.Config.Redis.Password, // no password set
|
||||
Password: config.Config.Redis.Password,
|
||||
DB: 0, // use default DB
|
||||
PoolSize: 100, // connection pool size
|
||||
MaxRetries: maxRetry,
|
||||
@@ -78,3 +83,16 @@ func NewRedis() (redis.UniversalClient, error) {
|
||||
redisClient = rdb
|
||||
return rdb, err
|
||||
}
|
||||
|
||||
// overrideConfigFromEnv overrides configuration fields with environment variables if present.
|
||||
func overrideConfigFromEnv() {
|
||||
if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" {
|
||||
config.Config.Redis.Address = strings.Split(envAddr, ",") // Assuming addresses are comma-separated
|
||||
}
|
||||
if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" {
|
||||
config.Config.Redis.Username = envUser
|
||||
}
|
||||
if envPass := os.Getenv("REDIS_PASSWORD"); envPass != "" {
|
||||
config.Config.Redis.Password = envPass
|
||||
}
|
||||
}
|
||||
@@ -1,27 +1,13 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package unrelation
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
|
||||
@@ -34,7 +20,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
maxRetry = 10 // number of retries
|
||||
maxRetry = 10 // number of retries
|
||||
mongoConnTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
type Mongo struct {
|
||||
@@ -44,90 +31,123 @@ type Mongo struct {
|
||||
// NewMongo Initialize MongoDB connection.
|
||||
func NewMongo() (*Mongo, error) {
|
||||
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
|
||||
uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
|
||||
if config.Config.Mongo.Uri != "" {
|
||||
uri = config.Config.Mongo.Uri
|
||||
} else {
|
||||
mongodbHosts := ""
|
||||
for i, v := range config.Config.Mongo.Address {
|
||||
if i == len(config.Config.Mongo.Address)-1 {
|
||||
mongodbHosts += v
|
||||
} else {
|
||||
mongodbHosts += v + ","
|
||||
}
|
||||
}
|
||||
if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
|
||||
uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
|
||||
config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
|
||||
config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
|
||||
} else {
|
||||
uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
|
||||
mongodbHosts, config.Config.Mongo.Database,
|
||||
config.Config.Mongo.MaxPoolSize)
|
||||
}
|
||||
}
|
||||
uri := buildMongoURI()
|
||||
fmt.Println("mongo:", uri)
|
||||
|
||||
var mongoClient *mongo.Client
|
||||
var err error = nil
|
||||
var err error
|
||||
|
||||
// Retry connecting to MongoDB
|
||||
for i := 0; i <= maxRetry; i++ {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), mongoConnTimeout)
|
||||
defer cancel()
|
||||
mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri))
|
||||
if err == nil {
|
||||
return &Mongo{db: mongoClient}, nil
|
||||
}
|
||||
if cmdErr, ok := err.(mongo.CommandError); ok {
|
||||
if cmdErr.Code == 13 || cmdErr.Code == 18 {
|
||||
return nil, err
|
||||
} else {
|
||||
fmt.Printf("Failed to connect to MongoDB: %s\n", err)
|
||||
}
|
||||
if shouldRetry(err) {
|
||||
fmt.Printf("Failed to connect to MongoDB, retrying: %s\n", err)
|
||||
time.Sleep(time.Second) // exponential backoff could be implemented here
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func buildMongoURI() string {
|
||||
uri := os.Getenv("MONGO_URI")
|
||||
if uri != "" {
|
||||
return uri
|
||||
}
|
||||
|
||||
username := os.Getenv("MONGO_USERNAME")
|
||||
password := os.Getenv("MONGO_PASSWORD")
|
||||
address := os.Getenv("MONGO_ADDRESS")
|
||||
port := os.Getenv("MONGO_PORT")
|
||||
database := os.Getenv("MONGO_DATABASE")
|
||||
maxPoolSize := os.Getenv("MONGO_MAX_POOL_SIZE")
|
||||
|
||||
if username == "" {
|
||||
username = config.Config.Mongo.Username
|
||||
}
|
||||
if password == "" {
|
||||
password = config.Config.Mongo.Password
|
||||
}
|
||||
if address == "" {
|
||||
address = strings.Join(config.Config.Mongo.Address, ",")
|
||||
} else if port != "" {
|
||||
address = fmt.Sprintf("%s:%s", address, port)
|
||||
}
|
||||
if database == "" {
|
||||
database = config.Config.Mongo.Database
|
||||
}
|
||||
if maxPoolSize == "" {
|
||||
maxPoolSize = fmt.Sprint(config.Config.Mongo.MaxPoolSize)
|
||||
}
|
||||
|
||||
uriFormat := "mongodb://%s/%s?maxPoolSize=%s&authSource=admin"
|
||||
if username != "" && password != "" {
|
||||
uriFormat = "mongodb://%s:%s@%s/%s?maxPoolSize=%s&authSource=admin"
|
||||
return fmt.Sprintf(uriFormat, username, password, address, database, maxPoolSize)
|
||||
}
|
||||
return fmt.Sprintf(uriFormat, address, database, maxPoolSize)
|
||||
}
|
||||
|
||||
func shouldRetry(err error) bool {
|
||||
if cmdErr, ok := err.(mongo.CommandError); ok {
|
||||
return cmdErr.Code != 13 && cmdErr.Code != 18
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// GetClient returns the MongoDB client.
|
||||
func (m *Mongo) GetClient() *mongo.Client {
|
||||
return m.db
|
||||
}
|
||||
|
||||
// GetDatabase returns the specific database from MongoDB.
|
||||
func (m *Mongo) GetDatabase() *mongo.Database {
|
||||
return m.db.Database(config.Config.Mongo.Database)
|
||||
}
|
||||
|
||||
// CreateMsgIndex creates an index for messages in MongoDB.
|
||||
func (m *Mongo) CreateMsgIndex() error {
|
||||
return m.createMongoIndex(unrelation.Msg, true, "doc_id")
|
||||
}
|
||||
|
||||
// createMongoIndex creates an index in a MongoDB collection.
|
||||
func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error {
|
||||
db := m.db.Database(config.Config.Mongo.Database).Collection(collection)
|
||||
db := m.GetDatabase().Collection(collection)
|
||||
opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
|
||||
indexView := db.Indexes()
|
||||
keysDoc := bson.D{}
|
||||
// create composite indexes
|
||||
for _, key := range keys {
|
||||
if strings.HasPrefix(key, "-") {
|
||||
keysDoc = append(keysDoc, bson.E{Key: strings.TrimLeft(key, "-"), Value: -1})
|
||||
// keysDoc = keysDoc.Append(strings.TrimLeft(key, "-"), bsonx.Int32(-1))
|
||||
} else {
|
||||
keysDoc = append(keysDoc, bson.E{Key: key, Value: 1})
|
||||
// keysDoc = keysDoc.Append(key, bsonx.Int32(1))
|
||||
}
|
||||
}
|
||||
// create index
|
||||
|
||||
keysDoc := buildIndexKeys(keys)
|
||||
|
||||
index := mongo.IndexModel{
|
||||
Keys: keysDoc,
|
||||
}
|
||||
if isUnique {
|
||||
index.Options = options.Index().SetUnique(true)
|
||||
}
|
||||
result, err := indexView.CreateOne(
|
||||
context.Background(),
|
||||
index,
|
||||
opts,
|
||||
)
|
||||
|
||||
_, err := indexView.CreateOne(context.Background(), index, opts)
|
||||
if err != nil {
|
||||
return utils.Wrap(err, result)
|
||||
return utils.Wrap(err, "CreateIndex")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// buildIndexKeys builds the BSON document for index keys.
|
||||
func buildIndexKeys(keys []string) bson.D {
|
||||
keysDoc := bson.D{}
|
||||
for _, key := range keys {
|
||||
direction := 1 // default direction is ascending
|
||||
if strings.HasPrefix(key, "-") {
|
||||
direction = -1 // descending order for prefixed with "-"
|
||||
key = strings.TrimLeft(key, "-")
|
||||
}
|
||||
keysDoc = append(keysDoc, bson.E{Key: key, Value: direction})
|
||||
}
|
||||
return keysDoc
|
||||
}
|
||||
|
||||
@@ -1,93 +1,22 @@
|
||||
package discoveryregister
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper"
|
||||
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
)
|
||||
|
||||
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
|
||||
func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) {
|
||||
var client discoveryregistry.SvcDiscoveryRegistry
|
||||
var err error
|
||||
switch envType {
|
||||
case "zookeeper":
|
||||
client, err = openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
|
||||
openkeeper.WithFreq(time.Hour), openkeeper.WithUserNameAndPassword(
|
||||
config.Config.Zookeeper.Username,
|
||||
config.Config.Zookeeper.Password,
|
||||
), openkeeper.WithRoundRobin(), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))
|
||||
return zookeeper.NewZookeeperDiscoveryRegister()
|
||||
case "k8s":
|
||||
client, err = NewK8sDiscoveryRegister()
|
||||
return kubernetes.NewK8sDiscoveryRegister()
|
||||
default:
|
||||
client = nil
|
||||
err = errors.New("envType not correct")
|
||||
return nil, errors.New("envType not correct")
|
||||
}
|
||||
return client, err
|
||||
}
|
||||
|
||||
type K8sDR struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
|
||||
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
|
||||
return &K8sDR{}, nil
|
||||
}
|
||||
|
||||
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
|
||||
cli.rpcRegisterAddr = serviceName
|
||||
return nil
|
||||
}
|
||||
func (cli *K8sDR) UnRegister() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
|
||||
|
||||
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
||||
return []*grpc.ClientConn{conn}, err
|
||||
}
|
||||
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
||||
|
||||
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
||||
}
|
||||
func (cli *K8sDR) GetSelfConnTarget() string {
|
||||
|
||||
return cli.rpcRegisterAddr
|
||||
}
|
||||
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
|
||||
cli.options = append(cli.options, opts...)
|
||||
}
|
||||
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// do not use this method for call rpc
|
||||
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
|
||||
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
|
||||
return nil
|
||||
}
|
||||
func (cli *K8sDR) Close() {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1,407 +1,45 @@
|
||||
package discoveryregister
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
"google.golang.org/grpc"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func setupTestEnvironment() {
|
||||
os.Setenv("ZOOKEEPER_SCHEMA", "openim")
|
||||
os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1:12181")
|
||||
os.Setenv("ZOOKEEPER_USERNAME", "")
|
||||
os.Setenv("ZOOKEEPER_PASSWORD", "")
|
||||
}
|
||||
|
||||
func TestNewDiscoveryRegister(t *testing.T) {
|
||||
type args struct {
|
||||
envType string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want discoveryregistry.SvcDiscoveryRegistry
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := NewDiscoveryRegister(tt.args.envType)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("NewDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("NewDiscoveryRegister() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
setupTestEnvironment()
|
||||
|
||||
func TestNewK8sDiscoveryRegister(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
want discoveryregistry.SvcDiscoveryRegistry
|
||||
wantErr bool
|
||||
envType string
|
||||
expectedError bool
|
||||
expectedResult bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
{"zookeeper", false, true},
|
||||
{"k8s", false, true}, // 假设 k8s 配置也已正确设置
|
||||
{"invalid", true, false},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := NewK8sDiscoveryRegister()
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("NewK8sDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("NewK8sDiscoveryRegister() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_Register(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
serviceName string
|
||||
host string
|
||||
port int
|
||||
opts []grpc.DialOption
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
if err := cli.Register(tt.args.serviceName, tt.args.host, tt.args.port, tt.args.opts...); (err != nil) != tt.wantErr {
|
||||
t.Errorf("K8sDR.Register() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
for _, test := range tests {
|
||||
client, err := NewDiscoveryRegister(test.envType)
|
||||
|
||||
func TestK8sDR_UnRegister(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
if test.expectedError {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
if test.expectedResult {
|
||||
assert.Implements(t, (*discoveryregistry.SvcDiscoveryRegistry)(nil), client)
|
||||
} else {
|
||||
assert.Nil(t, client)
|
||||
}
|
||||
if err := cli.UnRegister(); (err != nil) != tt.wantErr {
|
||||
t.Errorf("K8sDR.UnRegister() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_CreateRpcRootNodes(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
serviceNames []string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
if err := cli.CreateRpcRootNodes(tt.args.serviceNames); (err != nil) != tt.wantErr {
|
||||
t.Errorf("K8sDR.CreateRpcRootNodes() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_RegisterConf2Registry(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
key string
|
||||
conf []byte
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
if err := cli.RegisterConf2Registry(tt.args.key, tt.args.conf); (err != nil) != tt.wantErr {
|
||||
t.Errorf("K8sDR.RegisterConf2Registry() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_GetConfFromRegistry(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
key string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want []byte
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
got, err := cli.GetConfFromRegistry(tt.args.key)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("K8sDR.GetConfFromRegistry() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("K8sDR.GetConfFromRegistry() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_GetConns(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
serviceName string
|
||||
opts []grpc.DialOption
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want []*grpc.ClientConn
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
got, err := cli.GetConns(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("K8sDR.GetConns() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("K8sDR.GetConns() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_GetConn(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
serviceName string
|
||||
opts []grpc.DialOption
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want *grpc.ClientConn
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
got, err := cli.GetConn(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("K8sDR.GetConn() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("K8sDR.GetConn() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_GetSelfConnTarget(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
want string
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
if got := cli.GetSelfConnTarget(); got != tt.want {
|
||||
t.Errorf("K8sDR.GetSelfConnTarget() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_AddOption(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
opts []grpc.DialOption
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
cli.AddOption(tt.args.opts...)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_CloseConn(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
conn *grpc.ClientConn
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
cli.CloseConn(tt.args.conn)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_GetClientLocalConns(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
want map[string][]*grpc.ClientConn
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
if got := cli.GetClientLocalConns(); !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("K8sDR.GetClientLocalConns() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_Close(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
cli.Close()
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,92 @@
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
)
|
||||
|
||||
// K8sDR represents the Kubernetes service discovery and registration client.
|
||||
type K8sDR struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
|
||||
// NewK8sDiscoveryRegister creates a new instance of K8sDR for Kubernetes service discovery and registration.
|
||||
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
|
||||
|
||||
return &K8sDR{}, nil
|
||||
}
|
||||
|
||||
// Register registers a service with Kubernetes.
|
||||
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
|
||||
cli.rpcRegisterAddr = serviceName
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnRegister removes a service registration from Kubernetes.
|
||||
func (cli *K8sDR) UnRegister() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateRpcRootNodes creates root nodes for RPC in Kubernetes.
|
||||
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RegisterConf2Registry registers a configuration to the registry.
|
||||
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetConfFromRegistry retrieves a configuration from the registry.
|
||||
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// GetConns returns a list of gRPC client connections for a given service.
|
||||
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
|
||||
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
||||
return []*grpc.ClientConn{conn}, err
|
||||
}
|
||||
|
||||
// GetConn returns a single gRPC client connection for a given service.
|
||||
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
||||
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
||||
}
|
||||
|
||||
// GetSelfConnTarget returns the connection target of the client itself.
|
||||
func (cli *K8sDR) GetSelfConnTarget() string {
|
||||
return cli.rpcRegisterAddr
|
||||
}
|
||||
|
||||
// AddOption adds gRPC dial options to the client.
|
||||
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
|
||||
cli.options = append(cli.options, opts...)
|
||||
}
|
||||
|
||||
// CloseConn closes a given gRPC client connection.
|
||||
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// do not use this method for call rpc
|
||||
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
|
||||
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the K8sDR client.
|
||||
func (cli *K8sDR) Close() {
|
||||
|
||||
// Close any open resources here (if applicable)
|
||||
return
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
package zookeeper
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
)
|
||||
|
||||
// NewZookeeperDiscoveryRegister creates a new instance of ZookeeperDR for Zookeeper service discovery and registration.
|
||||
func NewZookeeperDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
|
||||
schema := getEnv("ZOOKEEPER_SCHEMA", config.Config.Zookeeper.Schema)
|
||||
zkAddr := getZkAddrFromEnv(config.Config.Zookeeper.ZkAddr)
|
||||
username := getEnv("ZOOKEEPER_USERNAME", config.Config.Zookeeper.Username)
|
||||
password := getEnv("ZOOKEEPER_PASSWORD", config.Config.Zookeeper.Password)
|
||||
|
||||
return openkeeper.NewClient(
|
||||
zkAddr,
|
||||
schema,
|
||||
openkeeper.WithFreq(time.Hour),
|
||||
openkeeper.WithUserNameAndPassword(username, password),
|
||||
openkeeper.WithRoundRobin(),
|
||||
openkeeper.WithTimeout(10),
|
||||
openkeeper.WithLogger(log.NewZkLogger()),
|
||||
)
|
||||
}
|
||||
|
||||
// getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
|
||||
func getEnv(key, fallback string) string {
|
||||
if value, exists := os.LookupEnv(key); exists {
|
||||
return value
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
|
||||
// getZkAddrFromEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
|
||||
func getZkAddrFromEnv(fallback []string) []string {
|
||||
if value, exists := os.LookupEnv("ZOOKEEPER_ADDRESS"); exists {
|
||||
return strings.Split(value, ",")
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
@@ -1,17 +1,3 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package kafka
|
||||
|
||||
import (
|
||||
@@ -21,19 +7,17 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
log "github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/mcontext"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
const (
|
||||
maxRetry = 10 // number of retries
|
||||
maxRetry = 10 // Maximum number of retries for producer creation
|
||||
)
|
||||
|
||||
var errEmptyMsg = errors.New("binary msg is empty")
|
||||
@@ -45,62 +29,85 @@ type Producer struct {
|
||||
producer sarama.SyncProducer
|
||||
}
|
||||
|
||||
// NewKafkaProducer Initialize kafka producer.
|
||||
// NewKafkaProducer initializes a new Kafka producer.
|
||||
func NewKafkaProducer(addr []string, topic string) *Producer {
|
||||
p := Producer{}
|
||||
p.config = sarama.NewConfig() // Instantiate a sarama Config
|
||||
p.config.Producer.Return.Successes = true // Whether to enable the successes channel to be notified after the message is sent successfully
|
||||
p := Producer{
|
||||
addr: addr,
|
||||
topic: topic,
|
||||
config: sarama.NewConfig(),
|
||||
}
|
||||
|
||||
// Set producer return flags
|
||||
p.config.Producer.Return.Successes = true
|
||||
p.config.Producer.Return.Errors = true
|
||||
p.config.Producer.Partitioner = sarama.NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
|
||||
|
||||
var producerAck = sarama.WaitForAll // default: WaitForAll
|
||||
switch strings.ToLower(config.Config.Kafka.ProducerAck) {
|
||||
case "no_response":
|
||||
producerAck = sarama.NoResponse
|
||||
case "wait_for_local":
|
||||
producerAck = sarama.WaitForLocal
|
||||
case "wait_for_all":
|
||||
producerAck = sarama.WaitForAll
|
||||
}
|
||||
p.config.Producer.RequiredAcks = producerAck
|
||||
// Set partitioner strategy
|
||||
p.config.Producer.Partitioner = sarama.NewHashPartitioner
|
||||
|
||||
var compress = sarama.CompressionNone // default: no compress
|
||||
_ = compress.UnmarshalText(bytes.ToLower([]byte(config.Config.Kafka.CompressType)))
|
||||
p.config.Producer.Compression = compress
|
||||
// Configure producer acknowledgement level
|
||||
configureProducerAck(&p, config.Config.Kafka.ProducerAck)
|
||||
|
||||
if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
|
||||
// Configure message compression
|
||||
configureCompression(&p, config.Config.Kafka.CompressType)
|
||||
|
||||
// Get Kafka configuration from environment variables or fallback to config file
|
||||
kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username)
|
||||
kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password)
|
||||
kafkaAddr := getEnvOrConfig("KAFKA_ADDRESS", addr[0]) // Assuming addr[0] contains address from config
|
||||
|
||||
// Configure SASL authentication if credentials are provided
|
||||
if kafkaUsername != "" && kafkaPassword != "" {
|
||||
p.config.Net.SASL.Enable = true
|
||||
p.config.Net.SASL.User = config.Config.Kafka.Username
|
||||
p.config.Net.SASL.Password = config.Config.Kafka.Password
|
||||
p.config.Net.SASL.User = kafkaUsername
|
||||
p.config.Net.SASL.Password = kafkaPassword
|
||||
}
|
||||
p.addr = addr
|
||||
p.topic = topic
|
||||
|
||||
// Set the Kafka address
|
||||
p.addr = []string{kafkaAddr}
|
||||
|
||||
// Set up TLS configuration (if required)
|
||||
SetupTLSConfig(p.config)
|
||||
var producer sarama.SyncProducer
|
||||
|
||||
// Create the producer with retries
|
||||
var err error
|
||||
for i := 0; i <= maxRetry; i++ {
|
||||
producer, err = sarama.NewSyncProducer(p.addr, p.config) // Initialize the client
|
||||
p.producer, err = sarama.NewSyncProducer(p.addr, p.config)
|
||||
if err == nil {
|
||||
p.producer = producer
|
||||
return &p
|
||||
}
|
||||
//TODO If the password is wrong, exit directly
|
||||
//if packetErr, ok := err.(*sarama.PacketEncodingError); ok {
|
||||
//if _, ok := packetErr.Err.(sarama.AuthenticationError); ok {
|
||||
// fmt.Println("Kafka password is wrong.")
|
||||
//}
|
||||
//} else {
|
||||
// fmt.Printf("Failed to create Kafka producer: %v\n", err)
|
||||
//}
|
||||
time.Sleep(time.Duration(1) * time.Second)
|
||||
time.Sleep(1 * time.Second) // Wait before retrying
|
||||
}
|
||||
|
||||
// Panic if unable to create producer after retries
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
panic("Failed to create Kafka producer: " + err.Error())
|
||||
}
|
||||
p.producer = producer
|
||||
|
||||
return &p
|
||||
}
|
||||
|
||||
// configureProducerAck configures the producer's acknowledgement level.
|
||||
func configureProducerAck(p *Producer, ackConfig string) {
|
||||
switch strings.ToLower(ackConfig) {
|
||||
case "no_response":
|
||||
p.config.Producer.RequiredAcks = sarama.NoResponse
|
||||
case "wait_for_local":
|
||||
p.config.Producer.RequiredAcks = sarama.WaitForLocal
|
||||
case "wait_for_all":
|
||||
p.config.Producer.RequiredAcks = sarama.WaitForAll
|
||||
default:
|
||||
p.config.Producer.RequiredAcks = sarama.WaitForAll
|
||||
}
|
||||
}
|
||||
|
||||
// configureCompression configures the message compression type for the producer.
|
||||
func configureCompression(p *Producer, compressType string) {
|
||||
var compress sarama.CompressionCodec = sarama.CompressionNone
|
||||
compress.UnmarshalText(bytes.ToLower([]byte(compressType)))
|
||||
p.config.Producer.Compression = compress
|
||||
}
|
||||
|
||||
// GetMQHeaderWithContext extracts message queue headers from the context.
|
||||
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
|
||||
operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
|
||||
if err != nil {
|
||||
@@ -111,22 +118,23 @@ func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error)
|
||||
{Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
|
||||
{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
|
||||
{Key: []byte(constant.ConnID), Value: []byte(connID)},
|
||||
}, err
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetContextWithMQHeader creates a context from message queue headers.
|
||||
func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
|
||||
var values []string
|
||||
for _, recordHeader := range header {
|
||||
values = append(values, string(recordHeader.Value))
|
||||
}
|
||||
return mcontext.WithMustInfoCtx(values) // TODO
|
||||
return mcontext.WithMustInfoCtx(values) // Attach extracted values to context
|
||||
}
|
||||
|
||||
// SendMessage sends a message to the Kafka topic configured in the Producer.
|
||||
func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) {
|
||||
log.ZDebug(ctx, "SendMessage", "msg", msg, "topic", p.topic, "key", key)
|
||||
kMsg := &sarama.ProducerMessage{}
|
||||
kMsg.Topic = p.topic
|
||||
kMsg.Key = sarama.StringEncoder(key)
|
||||
|
||||
// Marshal the protobuf message
|
||||
bMsg, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
return 0, 0, utils.Wrap(err, "kafka proto Marshal err")
|
||||
@@ -134,20 +142,33 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag
|
||||
if len(bMsg) == 0 {
|
||||
return 0, 0, utils.Wrap(errEmptyMsg, "")
|
||||
}
|
||||
kMsg.Value = sarama.ByteEncoder(bMsg)
|
||||
|
||||
// Prepare Kafka message
|
||||
kMsg := &sarama.ProducerMessage{
|
||||
Topic: p.topic,
|
||||
Key: sarama.StringEncoder(key),
|
||||
Value: sarama.ByteEncoder(bMsg),
|
||||
}
|
||||
|
||||
// Validate message key and value
|
||||
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
|
||||
return 0, 0, utils.Wrap(errEmptyMsg, "")
|
||||
}
|
||||
kMsg.Metadata = ctx
|
||||
|
||||
// Attach context metadata as headers
|
||||
header, err := GetMQHeaderWithContext(ctx)
|
||||
if err != nil {
|
||||
return 0, 0, utils.Wrap(err, "")
|
||||
}
|
||||
kMsg.Headers = header
|
||||
|
||||
// Send the message
|
||||
partition, offset, err := p.producer.SendMessage(kMsg)
|
||||
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key ", kMsg.Key, "key length", kMsg.Value.Length())
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "p.producer.SendMessage error", err)
|
||||
return 0, 0, utils.Wrap(err, "")
|
||||
}
|
||||
return partition, offset, utils.Wrap(err, "")
|
||||
|
||||
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key", kMsg.Key, "key length", kMsg.Value.Length())
|
||||
return partition, offset, nil
|
||||
}
|
||||
|
||||
@@ -15,6 +15,8 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
@@ -33,3 +35,12 @@ func SetupTLSConfig(cfg *sarama.Config) {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// getEnvOrConfig returns the value of the environment variable if it exists,
|
||||
// otherwise, it returns the value from the configuration file.
|
||||
func getEnvOrConfig(envName string, configValue string) string {
|
||||
if value, exists := os.LookupEnv(envName); exists {
|
||||
return value
|
||||
}
|
||||
return configValue
|
||||
}
|
||||
|
||||
@@ -28,6 +28,8 @@ openim::log::info "\n# Use Docker to start all openim service"
|
||||
|
||||
trap 'openim::util::onCtrlC' INT
|
||||
|
||||
"${OPENIM_ROOT}"/scripts/init-config.sh --skip
|
||||
|
||||
"${OPENIM_ROOT}"/scripts/start-all.sh
|
||||
|
||||
sleep 5
|
||||
|
||||
+102
-69
@@ -1,17 +1,4 @@
|
||||
#!/usr/bin/env bash
|
||||
# Copyright © 2023 OpenIM. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# You may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# This script automatically initializes various configuration files and can generate example files.
|
||||
|
||||
@@ -48,7 +35,8 @@ declare -A EXAMPLES=(
|
||||
FORCE_OVERWRITE=false
|
||||
SKIP_EXISTING=false
|
||||
GENERATE_EXAMPLES=false
|
||||
CLEAN_ENV_EXAMPLES=false
|
||||
CLEAN_CONFIG=false
|
||||
CLEAN_EXAMPLES=false
|
||||
|
||||
# Function to display help information
|
||||
show_help() {
|
||||
@@ -58,59 +46,45 @@ show_help() {
|
||||
echo " --force Overwrite existing files without prompt"
|
||||
echo " --skip Skip generation if file exists"
|
||||
echo " --examples Generate example files"
|
||||
echo " --clean-env-examples Generate example files in a clean environment"
|
||||
echo " --clean-config Clean all configuration files"
|
||||
echo " --clean-examples Clean all example files"
|
||||
}
|
||||
|
||||
# Function to generate configuration files
|
||||
generate_config_files() {
|
||||
# Loop through each template in TEMPLATES
|
||||
for template in "${!TEMPLATES[@]}"; do
|
||||
# Read the corresponding output files for the template
|
||||
IFS=';' read -ra OUTPUT_FILES <<< "${TEMPLATES[$template]}"
|
||||
for output_file in "${OUTPUT_FILES[@]}"; do
|
||||
# Check if the output file already exists
|
||||
if [[ -f "${output_file}" ]]; then
|
||||
# Handle existing file based on command-line options
|
||||
if [[ "${FORCE_OVERWRITE}" == true ]]; then
|
||||
openim::log::info "Force overwriting ${output_file}."
|
||||
elif [[ "${SKIP_EXISTING}" == true ]]; then
|
||||
openim::log::info "Skipping generation of ${output_file} as it already exists."
|
||||
local output_file="${TEMPLATES[$template]}"
|
||||
if [[ -f "${output_file}" ]]; then
|
||||
if [[ "${FORCE_OVERWRITE}" == true ]]; then
|
||||
openim::log::info "Force overwriting ${output_file}."
|
||||
elif [[ "${SKIP_EXISTING}" == true ]]; then
|
||||
openim::log::info "Skipping generation of ${output_file} as it already exists."
|
||||
continue
|
||||
else
|
||||
echo -n "File ${output_file} already exists. Overwrite? (Y/N): "
|
||||
read -r -n 1 REPLY
|
||||
echo
|
||||
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
|
||||
openim::log::info "Skipping generation of ${output_file}."
|
||||
continue
|
||||
else
|
||||
# Ask user for confirmation to overwrite
|
||||
echo -n "File ${output_file} already exists. Overwrite? (Y/N): "
|
||||
read -r -n 1 REPLY
|
||||
echo
|
||||
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
|
||||
openim::log::info "Skipping generation of ${output_file}."
|
||||
continue
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
# Process the template file to generate the output file
|
||||
openim::log::info "⌚ Working with template file: ${template} to generate ${output_file}..."
|
||||
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
|
||||
openim::log::error "genconfig.sh script not found"
|
||||
exit 1
|
||||
else
|
||||
if [[ "${SKIP_EXISTING}" == true ]]; then
|
||||
openim::log::info "Generating ${output_file} as it does not exist."
|
||||
fi
|
||||
"${OPENIM_ROOT}/scripts/genconfig.sh" "${ENV_FILE}" "${template}" > "${output_file}" || {
|
||||
openim::log::error "Error processing template file ${template}"
|
||||
exit 1
|
||||
}
|
||||
sleep 0.5
|
||||
done
|
||||
done
|
||||
}
|
||||
|
||||
# Function to generate example files
|
||||
generate_example_files() {
|
||||
for template in "${!EXAMPLES[@]}"; do
|
||||
local example_file="${EXAMPLES[$template]}"
|
||||
if [[ ! -f "${example_file}" ]]; then
|
||||
openim::log::info "Generating example file: ${example_file} from ${template}..."
|
||||
cp "${template}" "${example_file}"
|
||||
fi
|
||||
|
||||
openim::log::info "⌚ Working with template file: ${template} to generate ${output_file}..."
|
||||
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
|
||||
openim::log::error "genconfig.sh script not found"
|
||||
exit 1
|
||||
fi
|
||||
"${OPENIM_ROOT}/scripts/genconfig.sh" "${ENV_FILE}" "${template}" > "${output_file}" || {
|
||||
openim::log::error "Error processing template file ${template}"
|
||||
exit 1
|
||||
}
|
||||
sleep 0.5
|
||||
done
|
||||
}
|
||||
|
||||
@@ -120,7 +94,8 @@ declare -A env_vars=(
|
||||
["LOG_STORAGE_LOCATION"]="../logs/"
|
||||
)
|
||||
|
||||
generate_clean_environment_examples() {
|
||||
# Function to generate example files
|
||||
generate_example_files() {
|
||||
env_cmd="env -i"
|
||||
for var in "${!env_vars[@]}"; do
|
||||
env_cmd+=" $var='${env_vars[$var]}'"
|
||||
@@ -128,12 +103,56 @@ generate_clean_environment_examples() {
|
||||
|
||||
for template in "${!EXAMPLES[@]}"; do
|
||||
local example_file="${EXAMPLES[$template]}"
|
||||
openim::log::info "Generating example file: ${example_file} from ${template}..."
|
||||
if [[ -f "${example_file}" ]]; then
|
||||
if [[ "${FORCE_OVERWRITE}" == true ]]; then
|
||||
openim::log::info "Force overwriting example file: ${example_file}."
|
||||
elif [[ "${SKIP_EXISTING}" == true ]]; then
|
||||
openim::log::info "Skipping generation of example file: ${example_file} as it already exists."
|
||||
continue
|
||||
else
|
||||
echo -n "Example file ${example_file} already exists. Overwrite? (Y/N): "
|
||||
read -r -n 1 REPLY
|
||||
echo
|
||||
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
|
||||
openim::log::info "Skipping generation of example file: ${example_file}."
|
||||
continue
|
||||
fi
|
||||
fi
|
||||
elif [[ "${SKIP_EXISTING}" == true ]]; then
|
||||
openim::log::info "Generating example file: ${example_file} as it does not exist."
|
||||
fi
|
||||
|
||||
openim::log::info "⌚ Working with template file: ${template} to generate example file: ${example_file}..."
|
||||
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
|
||||
openim::log::error "genconfig.sh script not found"
|
||||
exit 1
|
||||
fi
|
||||
eval "$env_cmd ${OPENIM_ROOT}/scripts/genconfig.sh '${ENV_FILE}' '${template}' > '${example_file}'" || {
|
||||
openim::log::error "Error processing template file ${template}"
|
||||
exit 1
|
||||
}
|
||||
sleep 0.5
|
||||
done
|
||||
}
|
||||
|
||||
|
||||
# Function to clean configuration files
|
||||
clean_config_files() {
|
||||
for output_file in "${TEMPLATES[@]}"; do
|
||||
if [[ -f "${output_file}" ]]; then
|
||||
rm -f "${output_file}"
|
||||
openim::log::info "Removed configuration file: ${output_file}"
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
# Function to clean example files
|
||||
clean_example_files() {
|
||||
for example_file in "${EXAMPLES[@]}"; do
|
||||
if [[ -f "${example_file}" ]]; then
|
||||
rm -f "${example_file}"
|
||||
openim::log::info "Removed example file: ${example_file}"
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
@@ -155,8 +174,12 @@ while [[ $# -gt 0 ]]; do
|
||||
GENERATE_EXAMPLES=true
|
||||
shift
|
||||
;;
|
||||
--clean-env-examples)
|
||||
CLEAN_ENV_EXAMPLES=true
|
||||
--clean-config)
|
||||
CLEAN_CONFIG=true
|
||||
shift
|
||||
;;
|
||||
--clean-examples)
|
||||
CLEAN_EXAMPLES=true
|
||||
shift
|
||||
;;
|
||||
*)
|
||||
@@ -167,19 +190,29 @@ while [[ $# -gt 0 ]]; do
|
||||
esac
|
||||
done
|
||||
|
||||
# Clean configuration files if --clean-config option is provided
|
||||
if [[ "${CLEAN_CONFIG}" == true ]]; then
|
||||
clean_config_files
|
||||
fi
|
||||
|
||||
# Clean example files if --clean-examples option is provided
|
||||
if [[ "${CLEAN_EXAMPLES}" == true ]]; then
|
||||
clean_example_files
|
||||
fi
|
||||
|
||||
# Generate configuration files if requested
|
||||
if [[ "${FORCE_OVERWRITE}" == true || "${SKIP_EXISTING}" == false ]]; then
|
||||
if [[ "${FORCE_OVERWRITE}" == true || "${SKIP_EXISTING}" == false ]] && [[ "${CLEAN_CONFIG}" == false ]]; then
|
||||
generate_config_files
|
||||
fi
|
||||
|
||||
# Generate configuration files if requested
|
||||
if [[ "${SKIP_EXISTING}" == true ]]; then
|
||||
generate_config_files
|
||||
fi
|
||||
|
||||
# Generate example files if --examples option is provided
|
||||
if [[ "${GENERATE_EXAMPLES}" == true ]]; then
|
||||
if [[ "${GENERATE_EXAMPLES}" == true ]] && [[ "${CLEAN_EXAMPLES}" == false ]]; then
|
||||
generate_example_files
|
||||
fi
|
||||
|
||||
# Generate example files in a clean environment if --clean-env-examples option is provided
|
||||
if [[ "${CLEAN_ENV_EXAMPLES}" == true ]]; then
|
||||
generate_clean_environment_examples
|
||||
fi
|
||||
|
||||
openim::log::success "Configuration and example files generation complete!"
|
||||
openim::log::success "Configuration and example files operation complete!"
|
||||
|
||||
Reference in New Issue
Block a user