feat: Add OpenIM server, environment support for Docker Compose, and Kubernetes deployment. (#1559)

* feat: add openim server code

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim env

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim mongo and redis env

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add zk and redis mongo env

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add kafka and redis mongo env

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim docker

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim docker

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim docker

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim copyright

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: docker compose

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: remove openim chat config file

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim config set

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim config set

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: fix Security vulnerability

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: fix Security vulnerability

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: docker compose

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* Update kubernetes.go

* Update discoveryregister.go

* fix: copyright-add

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

---------

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
This commit is contained in:
Xinwei Xiong
2023-12-18 10:24:12 +08:00
committed by GitHub
parent c5c5b2fd8e
commit f1c9686ada
211 changed files with 3989 additions and 1239 deletions
+26 -143
View File
@@ -1,159 +1,42 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discoveryregister
import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper"
"github.com/OpenIMSDK/tools/discoveryregistry"
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/tools/log"
"google.golang.org/grpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) {
var client discoveryregistry.SvcDiscoveryRegistry
var err error
if os.Getenv("ENVS_DISCOVERY") != "" {
envType = os.Getenv("ENVS_DISCOVERY")
}
switch envType {
case "zookeeper":
client, err = openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
openkeeper.WithFreq(time.Hour), openkeeper.WithUserNameAndPassword(
config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password,
), openkeeper.WithRoundRobin(), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))
return zookeeper.NewZookeeperDiscoveryRegister()
case "k8s":
client, err = NewK8sDiscoveryRegister()
return kubernetes.NewK8sDiscoveryRegister()
default:
client = nil
err = errors.New("envType not correct")
}
return client, err
}
type K8sDR struct {
options []grpc.DialOption
rpcRegisterAddr string
}
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
return &K8sDR{}, nil
}
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName {
cli.rpcRegisterAddr = serviceName
} else {
cli.rpcRegisterAddr = cli.getSelfHost(context.Background())
}
return nil
}
func (cli *K8sDR) UnRegister() error {
return nil
}
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
return nil
}
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
return nil
}
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
return nil, nil
}
func (cli *K8sDR) getSelfHost(ctx context.Context) string {
port := 88
instance := "openimserver"
selfPodName := os.Getenv("MY_POD_NAME")
ns := os.Getenv("MY_POD_NAMESPACE")
statefuleIndex := 0
gatewayEnds := strings.Split(config.Config.RpcRegisterName.OpenImMessageGatewayName, ":")
if len(gatewayEnds) != 2 {
log.ZError(ctx, "msggateway RpcRegisterName is error:config.Config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error"))
} else {
port, _ = strconv.Atoi(gatewayEnds[1])
}
podInfo := strings.Split(selfPodName, "-")
instance = podInfo[0]
count := len(podInfo)
statefuleIndex, _ = strconv.Atoi(podInfo[count-1])
host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, statefuleIndex, instance, ns, port)
return host
}
// like openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88
func (cli *K8sDR) getMsgGatewayHost(ctx context.Context) []string {
port := 88
instance := "openimserver"
selfPodName := os.Getenv("MY_POD_NAME")
replicas := os.Getenv("MY_MSGGATEWAY_REPLICACOUNT")
ns := os.Getenv("MY_POD_NAMESPACE")
gatewayEnds := strings.Split(config.Config.RpcRegisterName.OpenImMessageGatewayName, ":")
if len(gatewayEnds) != 2 {
log.ZError(ctx, "msggateway RpcRegisterName is error:config.Config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error"))
} else {
port, _ = strconv.Atoi(gatewayEnds[1])
}
nReplicas, _ := strconv.Atoi(replicas)
podInfo := strings.Split(selfPodName, "-")
instance = podInfo[0]
var ret []string
for i := 0; i < nReplicas; i++ {
host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, i, instance, ns, port)
ret = append(ret, host)
}
log.ZInfo(ctx, "getMsgGatewayHost", "instance", instance, "selfPodName", selfPodName, "replicas", replicas, "ns", ns, "ret", ret)
return ret
}
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName {
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
return []*grpc.ClientConn{conn}, err
} else {
var ret []*grpc.ClientConn
gatewayHosts := cli.getMsgGatewayHost(ctx)
for _, host := range gatewayHosts {
conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...)
if err != nil {
return nil, err
} else {
ret = append(ret, conn)
}
}
return ret, nil
return nil, errors.New("envType not correct")
}
}
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
}
func (cli *K8sDR) GetSelfConnTarget() string {
return cli.rpcRegisterAddr
}
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
cli.options = append(cli.options, opts...)
}
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
conn.Close()
}
// do not use this method for call rpc
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
return nil
}
func (cli *K8sDR) Close() {
return
}
@@ -1,407 +1,59 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discoveryregister
import (
"context"
"reflect"
"os"
"testing"
"github.com/OpenIMSDK/tools/discoveryregistry"
"google.golang.org/grpc"
"github.com/stretchr/testify/assert"
)
func setupTestEnvironment() {
os.Setenv("ZOOKEEPER_SCHEMA", "openim")
os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1:12181")
os.Setenv("ZOOKEEPER_USERNAME", "")
os.Setenv("ZOOKEEPER_PASSWORD", "")
}
func TestNewDiscoveryRegister(t *testing.T) {
type args struct {
envType string
}
tests := []struct {
name string
args args
want discoveryregistry.SvcDiscoveryRegistry
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewDiscoveryRegister(tt.args.envType)
if (err != nil) != tt.wantErr {
t.Errorf("NewDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewDiscoveryRegister() = %v, want %v", got, tt.want)
}
})
}
}
setupTestEnvironment()
func TestNewK8sDiscoveryRegister(t *testing.T) {
tests := []struct {
name string
want discoveryregistry.SvcDiscoveryRegistry
wantErr bool
envType string
expectedError bool
expectedResult bool
}{
// TODO: Add test cases.
{"zookeeper", false, true},
{"k8s", false, true}, // 假设 k8s 配置也已正确设置
{"invalid", true, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewK8sDiscoveryRegister()
if (err != nil) != tt.wantErr {
t.Errorf("NewK8sDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewK8sDiscoveryRegister() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_Register(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
serviceName string
host string
port int
opts []grpc.DialOption
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if err := cli.Register(tt.args.serviceName, tt.args.host, tt.args.port, tt.args.opts...); (err != nil) != tt.wantErr {
t.Errorf("K8sDR.Register() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
for _, test := range tests {
client, err := NewDiscoveryRegister(test.envType)
func TestK8sDR_UnRegister(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
tests := []struct {
name string
fields fields
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
if test.expectedError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
if test.expectedResult {
assert.Implements(t, (*discoveryregistry.SvcDiscoveryRegistry)(nil), client)
} else {
assert.Nil(t, client)
}
if err := cli.UnRegister(); (err != nil) != tt.wantErr {
t.Errorf("K8sDR.UnRegister() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestK8sDR_CreateRpcRootNodes(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
serviceNames []string
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if err := cli.CreateRpcRootNodes(tt.args.serviceNames); (err != nil) != tt.wantErr {
t.Errorf("K8sDR.CreateRpcRootNodes() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestK8sDR_RegisterConf2Registry(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
key string
conf []byte
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if err := cli.RegisterConf2Registry(tt.args.key, tt.args.conf); (err != nil) != tt.wantErr {
t.Errorf("K8sDR.RegisterConf2Registry() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestK8sDR_GetConfFromRegistry(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
key string
}
tests := []struct {
name string
fields fields
args args
want []byte
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
got, err := cli.GetConfFromRegistry(tt.args.key)
if (err != nil) != tt.wantErr {
t.Errorf("K8sDR.GetConfFromRegistry() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("K8sDR.GetConfFromRegistry() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_GetConns(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
ctx context.Context
serviceName string
opts []grpc.DialOption
}
tests := []struct {
name string
fields fields
args args
want []*grpc.ClientConn
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
got, err := cli.GetConns(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
if (err != nil) != tt.wantErr {
t.Errorf("K8sDR.GetConns() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("K8sDR.GetConns() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_GetConn(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
ctx context.Context
serviceName string
opts []grpc.DialOption
}
tests := []struct {
name string
fields fields
args args
want *grpc.ClientConn
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
got, err := cli.GetConn(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
if (err != nil) != tt.wantErr {
t.Errorf("K8sDR.GetConn() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("K8sDR.GetConn() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_GetSelfConnTarget(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
tests := []struct {
name string
fields fields
want string
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if got := cli.GetSelfConnTarget(); got != tt.want {
t.Errorf("K8sDR.GetSelfConnTarget() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_AddOption(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
opts []grpc.DialOption
}
tests := []struct {
name string
fields fields
args args
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
cli.AddOption(tt.args.opts...)
})
}
}
func TestK8sDR_CloseConn(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
conn *grpc.ClientConn
}
tests := []struct {
name string
fields fields
args args
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
cli.CloseConn(tt.args.conn)
})
}
}
func TestK8sDR_GetClientLocalConns(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
tests := []struct {
name string
fields fields
want map[string][]*grpc.ClientConn
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if got := cli.GetClientLocalConns(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("K8sDR.GetClientLocalConns() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_Close(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
tests := []struct {
name string
fields fields
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
cli.Close()
})
}
}
}
@@ -0,0 +1,174 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"google.golang.org/grpc"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
// K8sDR represents the Kubernetes service discovery and registration client.
type K8sDR struct {
options []grpc.DialOption
rpcRegisterAddr string
}
// NewK8sDiscoveryRegister creates a new instance of K8sDR for Kubernetes service discovery and registration.
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
return &K8sDR{}, nil
}
// Register registers a service with Kubernetes.
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName {
cli.rpcRegisterAddr = serviceName
} else {
cli.rpcRegisterAddr = cli.getSelfHost(context.Background())
}
return nil
}
// UnRegister removes a service registration from Kubernetes.
func (cli *K8sDR) UnRegister() error {
return nil
}
// CreateRpcRootNodes creates root nodes for RPC in Kubernetes.
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
return nil
}
// RegisterConf2Registry registers a configuration to the registry.
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
return nil
}
// GetConfFromRegistry retrieves a configuration from the registry.
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
return nil, nil
}
func (cli *K8sDR) getSelfHost(ctx context.Context) string {
port := 88
instance := "openimserver"
selfPodName := os.Getenv("MY_POD_NAME")
ns := os.Getenv("MY_POD_NAMESPACE")
statefuleIndex := 0
gatewayEnds := strings.Split(config.Config.RpcRegisterName.OpenImMessageGatewayName, ":")
if len(gatewayEnds) != 2 {
log.ZError(ctx, "msggateway RpcRegisterName is error:config.Config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error"))
} else {
port, _ = strconv.Atoi(gatewayEnds[1])
}
podInfo := strings.Split(selfPodName, "-")
instance = podInfo[0]
count := len(podInfo)
statefuleIndex, _ = strconv.Atoi(podInfo[count-1])
host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, statefuleIndex, instance, ns, port)
return host
}
// GetConns returns a list of gRPC client connections for a given service.
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName {
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
return []*grpc.ClientConn{conn}, err
}
var ret []*grpc.ClientConn
gatewayHosts := cli.getMsgGatewayHost(ctx)
for _, host := range gatewayHosts {
conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...)
if err != nil {
return nil, err
}
ret = append(ret, conn)
}
return ret, nil
}
// like openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88
func (cli *K8sDR) getMsgGatewayHost(ctx context.Context) []string {
port := 88
instance := "openimserver"
selfPodName := os.Getenv("MY_POD_NAME")
replicas := os.Getenv("MY_MSGGATEWAY_REPLICACOUNT")
ns := os.Getenv("MY_POD_NAMESPACE")
gatewayEnds := strings.Split(config.Config.RpcRegisterName.OpenImMessageGatewayName, ":")
if len(gatewayEnds) != 2 {
log.ZError(ctx, "msggateway RpcRegisterName is error:config.Config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error"))
} else {
port, _ = strconv.Atoi(gatewayEnds[1])
}
nReplicas, _ := strconv.Atoi(replicas)
podInfo := strings.Split(selfPodName, "-")
instance = podInfo[0]
var ret []string
for i := 0; i < nReplicas; i++ {
host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, i, instance, ns, port)
ret = append(ret, host)
}
log.ZInfo(ctx, "getMsgGatewayHost", "instance", instance, "selfPodName", selfPodName, "replicas", replicas, "ns", ns, "ret", ret)
return ret
}
// GetConn returns a single gRPC client connection for a given service.
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
}
// GetSelfConnTarget returns the connection target of the client itself.
func (cli *K8sDR) GetSelfConnTarget() string {
return cli.rpcRegisterAddr
}
// AddOption adds gRPC dial options to the client.
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
cli.options = append(cli.options, opts...)
}
// CloseConn closes a given gRPC client connection.
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
conn.Close()
}
// do not use this method for call rpc.
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
return nil
}
// Close closes the K8sDR client.
func (cli *K8sDR) Close() {
// Close any open resources here (if applicable)
return
}
@@ -0,0 +1,61 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package zookeeper
import (
"os"
"strings"
"time"
"github.com/OpenIMSDK/tools/discoveryregistry"
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
// NewZookeeperDiscoveryRegister creates a new instance of ZookeeperDR for Zookeeper service discovery and registration.
func NewZookeeperDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
schema := getEnv("ZOOKEEPER_SCHEMA", config.Config.Zookeeper.Schema)
zkAddr := getZkAddrFromEnv(config.Config.Zookeeper.ZkAddr)
username := getEnv("ZOOKEEPER_USERNAME", config.Config.Zookeeper.Username)
password := getEnv("ZOOKEEPER_PASSWORD", config.Config.Zookeeper.Password)
return openkeeper.NewClient(
zkAddr,
schema,
openkeeper.WithFreq(time.Hour),
openkeeper.WithUserNameAndPassword(username, password),
openkeeper.WithRoundRobin(),
openkeeper.WithTimeout(10),
openkeeper.WithLogger(log.NewZkLogger()),
)
}
// getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
func getEnv(key, fallback string) string {
if value, exists := os.LookupEnv(key); exists {
return value
}
return fallback
}
// getZkAddrFromEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
func getZkAddrFromEnv(fallback []string) []string {
if value, exists := os.LookupEnv("ZOOKEEPER_ADDRESS"); exists {
return strings.Split(value, ",")
}
return fallback
}