build: update kubernetes deployment Run. (#2919)

* build: k8s improve.

* refactor: update docker image contents.

* rename seq file.

* build: update k8s origin deploys.

* update check logic.

* update magefile

* update image name.

* update readme

* update Kubernetes Discovery.

* revert pkg.

* update create in k8s

* update service image release CI.

* update deployment image source.

* update mage contents.

* update pkg source.

* update go get pkg.

* fix test file.

* update discovery register.

* update

* update deploy yaml.

* update replica.

* update deployment.

* remove notfication config.

* remove notification and zookeeper.

* update discovery in kubernetes.

* build: improve kubernetes deployment.

* update config field in discovery.

* update ReadMe in deployments.

* update go mod.

* update const quote.

* fix test fields.

* remove unused method.

* remove unused contents.
This commit is contained in:
Monet Lee
2024-12-13 11:02:45 +08:00
committed by GitHub
parent 1eaae5f980
commit 7c7a99f801
192 changed files with 2520 additions and 7897 deletions
+1
View File
@@ -20,6 +20,7 @@ const (
MountConfigFilePath = "CONFIG_PATH"
DeploymentType = "DEPLOYMENT_TYPE"
KUBERNETES = "kubernetes"
ETCD = "etcd"
)
const (
+1 -1
View File
@@ -86,7 +86,7 @@ func TestLoadOpenIMThirdConfig(t *testing.T) {
func TestTransferConfig(t *testing.T) {
var tran MsgTransfer
err := LoadConfig("../../../config/openim-msgtransfer.yml", "IMENV_OPENIM-MSGTRANSFER", &tran)
err := Load("../../../config/openim-msgtransfer.yml", "IMENV_OPENIM-MSGTRANSFER", "", "source", &tran)
assert.Nil(t, err)
assert.Equal(t, true, tran.Prometheus.Enable)
assert.Equal(t, true, tran.Prometheus.AutoSetPorts)
@@ -19,6 +19,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/discovery"
"google.golang.org/grpc"
"github.com/openimsdk/tools/discovery/kubernetes"
@@ -26,20 +27,18 @@ import (
"github.com/openimsdk/tools/errs"
)
const (
Etcd = "etcd"
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string) (discovery.SvcDiscoveryRegistry, error) {
if runtimeEnv == "kubernetes" {
discovery.Enable = "kubernetes"
if runtimeEnv == config.KUBERNETES {
return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace,
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(1024*1024*20),
),
)
}
switch discovery.Enable {
case "kubernetes":
return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace)
case Etcd:
case config.ETCD:
return etcd.NewSvcDiscoveryRegistry(
discovery.Etcd.RootDirectory,
discovery.Etcd.Address,
@@ -1,22 +1,10 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"context"
"fmt"
"log"
"os"
"sync"
"time"
@@ -35,6 +23,7 @@ type KubernetesConnManager struct {
namespace string
dialOptions []grpc.DialOption
rpcTargets map[string]string
selfTarget string
mu sync.RWMutex
@@ -76,11 +65,14 @@ func (k *KubernetesConnManager) initializeConns(serviceName string) error {
return fmt.Errorf("failed to get endpoints for service %s: %v", serviceName, err)
}
// fmt.Println("Endpoints:", endpoints, "endpoints.Subsets:", endpoints.Subsets)
var conns []*grpc.ClientConn
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
target := fmt.Sprintf("%s:%d", address.IP, port)
conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
// fmt.Println("IP target:", target)
conn, err := grpc.Dial(target, append(k.dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
if err != nil {
return fmt.Errorf("failed to dial endpoint %s: %v", target, err)
}
@@ -89,10 +81,8 @@ func (k *KubernetesConnManager) initializeConns(serviceName string) error {
}
k.mu.Lock()
defer k.mu.Unlock()
k.connMap[serviceName] = conns
// go k.watchEndpoints(serviceName)
k.mu.Unlock()
return nil
}
@@ -100,23 +90,23 @@ func (k *KubernetesConnManager) initializeConns(serviceName string) error {
// GetConns returns gRPC client connections for a given Kubernetes service name.
func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
k.mu.RLock()
conns, exists := k.connMap[serviceName]
defer k.mu.RUnlock()
conns, exists := k.connMap[serviceName]
k.mu.RUnlock()
if exists {
return conns, nil
}
k.mu.Lock()
defer k.mu.Unlock()
// Check if another goroutine has already initialized the connections when we released the read lock
conns, exists = k.connMap[serviceName]
if exists {
return conns, nil
}
k.mu.Unlock()
if err := k.initializeConns(serviceName); err != nil {
fmt.Println("Failed to initialize connections:", err)
return nil, fmt.Errorf("failed to initialize connections for service %s: %v", serviceName, err)
}
@@ -125,26 +115,64 @@ func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string
// GetConn returns a single gRPC client connection for a given Kubernetes service name.
func (k *KubernetesConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
port, err := k.getServicePort(serviceName)
if err != nil {
return nil, err
var target string
if k.rpcTargets[serviceName] == "" {
var err error
svcPort, err := k.getServicePort(serviceName)
if err != nil {
return nil, err
}
target = fmt.Sprintf("%s.%s.svc.cluster.local:%d", serviceName, k.namespace, svcPort)
// fmt.Println("SVC target:", target)
} else {
target = k.rpcTargets[serviceName]
}
fmt.Println("SVC port:", port)
target := fmt.Sprintf("%s.%s.svc.cluster.local:%d", serviceName, k.namespace, port)
fmt.Println("SVC target:", target)
return grpc.DialContext(
ctx,
target,
append([]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, k.dialOptions...)...,
append([]grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*10), grpc.MaxCallSendMsgSize(1024*1024*20)),
}, k.dialOptions...)...,
)
}
// GetSelfConnTarget returns the connection target for the current service.
func (k *KubernetesConnManager) GetSelfConnTarget() string {
if k.selfTarget == "" {
hostName := os.Getenv("HOSTNAME")
pod, err := k.clientset.CoreV1().Pods(k.namespace).Get(context.Background(), hostName, metav1.GetOptions{})
if err != nil {
log.Printf("failed to get pod %s: %v \n", hostName, err)
}
for pod.Status.PodIP == "" {
pod, err = k.clientset.CoreV1().Pods(k.namespace).Get(context.TODO(), hostName, metav1.GetOptions{})
if err != nil {
log.Printf("Error getting pod: %v \n", err)
}
time.Sleep(3 * time.Second)
}
var selfPort int32
for _, port := range pod.Spec.Containers[0].Ports {
if port.ContainerPort != 10001 {
selfPort = port.ContainerPort
break
}
}
k.selfTarget = fmt.Sprintf("%s:%d", pod.Status.PodIP, selfPort)
}
return k.selfTarget
}
@@ -175,6 +203,7 @@ func (k *KubernetesConnManager) Close() {
func (k *KubernetesConnManager) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
return nil
}
func (k *KubernetesConnManager) UnRegister() error {
return nil
}
@@ -184,6 +213,8 @@ func (k *KubernetesConnManager) GetUserIdHashGatewayHost(ctx context.Context, us
}
func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error) {
var svcPort int32
svc, err := k.clientset.CoreV1().Services(k.namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
if err != nil {
fmt.Print("namespace:", k.namespace)
@@ -194,7 +225,15 @@ func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error
return 0, fmt.Errorf("service %s has no ports defined", serviceName)
}
return svc.Spec.Ports[0].Port, nil
for _, port := range svc.Spec.Ports {
// fmt.Println(serviceName, " Now Get Port:", port.Port)
if port.Port != 10001 {
svcPort = port.Port
break
}
}
return svcPort, nil
}
// watchEndpoints listens for changes in Pod resources.
@@ -229,68 +268,3 @@ func (k *KubernetesConnManager) handleEndpointChange(obj interface{}) {
fmt.Printf("Error initializing connections for %s: %v\n", serviceName, err)
}
}
// =================
// initEndpoints initializes connections by fetching all available endpoints in the specified namespace.
// func (k *KubernetesConnManager) initEndpoints() error {
// k.mu.Lock()
// defer k.mu.Unlock()
// pods, err := k.clientset.CoreV1().Pods(k.namespace).List(context.TODO(), metav1.ListOptions{})
// if err != nil {
// return fmt.Errorf("failed to list pods: %v", err)
// }
// for _, pod := range pods.Items {
// if pod.Status.Phase == v1.PodRunning {
// target := fmt.Sprintf("%s:%d", address.IP, port)
// conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
// conn, err := k.createGRPCConnection(pod)
// if err != nil {
// return fmt.Errorf("failed to create GRPC connection for pod %s: %v", pod.Name, err)
// }
// k.connMap[pod.Name] = append(k.connMap[pod.Name], conn)
// }
// }
// return nil
// }
// -----
// func (k *KubernetesConnManager) watchEndpoints1(serviceName string) {
// // watch for changes to the service's endpoints
// informerFactory := informers.NewSharedInformerFactory(k.clientset, time.Minute)
// endpointsInformer := informerFactory.Core().V1().Endpoints().Informer()
// endpointsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// AddFunc: func(obj interface{}) {
// eps := obj.(*v1.Endpoints)
// if eps.Name == serviceName {
// k.initializeConns(serviceName)
// }
// },
// UpdateFunc: func(oldObj, newObj interface{}) {
// eps := newObj.(*v1.Endpoints)
// if eps.Name == serviceName {
// k.initializeConns(serviceName)
// }
// },
// DeleteFunc: func(obj interface{}) {
// eps := obj.(*v1.Endpoints)
// if eps.Name == serviceName {
// k.mu.Lock()
// defer k.mu.Unlock()
// for _, conn := range k.connMap[serviceName] {
// _ = conn.Close()
// }
// delete(k.connMap, serviceName)
// }
// },
// })
// informerFactory.Start(wait.NeverStop)
// informerFactory.WaitForCacheSync(wait.NeverStop)
// }
@@ -1,15 +0,0 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package zookeeper // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper"
+4 -4
View File
@@ -26,7 +26,7 @@ import (
"syscall"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil"
@@ -46,8 +46,8 @@ import (
)
// Start rpc server.
func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context,
func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, share *conf.Share, config T, rpcFn func(ctx context.Context,
config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
var (
@@ -84,7 +84,7 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
return listener, port, nil
}
if autoSetPorts && discovery.Enable != kdisc.Etcd {
if autoSetPorts && discovery.Enable != conf.ETCD {
return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap()
}
client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv)