Files
open-im-server/pkg/common/discoveryregister/etcd/etcd.go
T

302 lines
8.7 KiB
Go
Raw Normal View History

2024-05-10 21:30:12 +08:00
package etcd
import (
"context"
"fmt"
2024-05-11 16:44:15 +08:00
"github.com/pkg/errors"
2024-05-10 21:30:12 +08:00
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"go.etcd.io/etcd/client/v3/naming/resolver"
"google.golang.org/grpc"
gresolver "google.golang.org/grpc/resolver"
2024-05-13 17:20:15 +08:00
"strings"
"sync"
2024-05-10 21:30:12 +08:00
"time"
)
2024-05-11 15:40:17 +08:00
// ZkOption modifies the clientv3.Config used to create an etcd client.
// NewSvcDiscoveryRegistry and Check apply the options in order after filling
// in their own base configuration, so an option overrides those base values.
// NOTE(review): the "Zk" prefix looks inherited from a ZooKeeper-based
// registry; here it configures an etcd client.
type ZkOption func(*clientv3.Config)
2024-05-10 21:30:12 +08:00
// SvcDiscoveryRegistryImpl implementation
type SvcDiscoveryRegistryImpl struct {
2024-05-11 18:06:54 +08:00
client *clientv3.Client
resolver gresolver.Builder
dialOptions []grpc.DialOption
serviceKey string
endpointMgr endpoints.Manager
leaseID clientv3.LeaseID
rpcRegisterTarget string
2024-05-11 15:40:17 +08:00
rootDirectory string
2024-05-13 17:20:15 +08:00
mu sync.RWMutex
connMap map[string][]*grpc.ClientConn
2024-05-10 21:30:12 +08:00
}
2024-05-11 15:40:17 +08:00
// NewSvcDiscoveryRegistry creates a new service discovery registry implementation
func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, options ...ZkOption) (*SvcDiscoveryRegistryImpl, error) {
2024-05-10 21:30:12 +08:00
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
2024-05-11 15:40:17 +08:00
// Increase keep-alive queue capacity and message size
PermitWithoutStream: true,
MaxCallSendMsgSize: 10 * 1024 * 1024, // 10 MB
}
// Apply provided options to the config
for _, opt := range options {
opt(&cfg)
2024-05-10 21:30:12 +08:00
}
2024-05-11 15:40:17 +08:00
2024-05-10 21:30:12 +08:00
client, err := clientv3.New(cfg)
if err != nil {
return nil, err
}
r, err := resolver.NewBuilder(client)
if err != nil {
return nil, err
}
2024-05-13 17:20:15 +08:00
s := &SvcDiscoveryRegistryImpl{
2024-05-11 15:40:17 +08:00
client: client,
resolver: r,
rootDirectory: rootDirectory,
2024-05-13 17:20:15 +08:00
connMap: make(map[string][]*grpc.ClientConn),
}
go s.watchServiceChanges()
return s, nil
2024-05-10 21:30:12 +08:00
}
2024-05-11 15:40:17 +08:00
// WithDialTimeout returns a ZkOption that replaces the etcd client's
// DialTimeout with timeout.
func WithDialTimeout(timeout time.Duration) ZkOption {
	apply := func(c *clientv3.Config) {
		c.DialTimeout = timeout
	}
	return apply
}
// WithMaxCallSendMsgSize returns a ZkOption that replaces the etcd client's
// MaxCallSendMsgSize (the largest request it may send) with size.
func WithMaxCallSendMsgSize(size int) ZkOption {
	apply := func(c *clientv3.Config) {
		c.MaxCallSendMsgSize = size
	}
	return apply
}
// WithUsernameAndPassword returns a ZkOption that sets the credentials the
// etcd client authenticates with.
func WithUsernameAndPassword(username, password string) ZkOption {
	return func(c *clientv3.Config) {
		c.Username, c.Password = username, password
	}
}
// GetUserIdHashGatewayHost returns the gateway host for a given user ID hash
2024-05-11 09:31:16 +08:00
func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
return "", nil
}
2024-05-11 15:40:17 +08:00
// GetConns returns gRPC client connections for a given service name
2024-05-10 21:30:12 +08:00
func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
2024-05-13 11:10:25 +08:00
fullServiceKey := fmt.Sprintf("%s/%s", r.rootDirectory, serviceName)
2024-05-13 17:20:15 +08:00
r.mu.RLock()
defer r.mu.RUnlock()
2024-05-13 17:50:40 +08:00
fmt.Printf("all conns ", serviceName, r.connMap[fullServiceKey])
2024-05-13 17:20:15 +08:00
return r.connMap[fullServiceKey], nil
2024-05-10 21:30:12 +08:00
}
2024-05-11 15:40:17 +08:00
// GetConn returns a single gRPC client connection for a given service name
2024-05-10 21:30:12 +08:00
func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
2024-05-11 17:39:15 +08:00
target := fmt.Sprintf("etcd:///%s/%s", r.rootDirectory, serviceName)
2024-05-11 10:30:47 +08:00
return grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...)
2024-05-10 21:30:12 +08:00
}
2024-05-11 15:40:17 +08:00
// GetSelfConnTarget returns the connection target for the current service
2024-05-10 21:30:12 +08:00
func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string {
2024-05-11 18:06:54 +08:00
return r.rpcRegisterTarget
2024-05-10 21:30:12 +08:00
}
2024-05-11 15:40:17 +08:00
// AddOption appends gRPC dial options to the existing options
2024-05-10 21:30:12 +08:00
func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) {
r.dialOptions = append(r.dialOptions, opts...)
}
2024-05-11 15:40:17 +08:00
// CloseConn closes a given gRPC client connection
2024-05-10 21:30:12 +08:00
func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) {
if err := conn.Close(); err != nil {
2024-05-11 15:40:17 +08:00
fmt.Printf("Failed to close connection: %v\n", err)
2024-05-10 21:30:12 +08:00
}
}
2024-05-11 15:40:17 +08:00
// Register registers a new service endpoint with etcd
2024-05-10 21:30:12 +08:00
func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
2024-05-13 15:22:17 +08:00
r.serviceKey = fmt.Sprintf("%s/%s/%s:%d", r.rootDirectory, serviceName, host, port)
2024-05-11 15:40:17 +08:00
em, err := endpoints.NewManager(r.client, r.rootDirectory+"/"+serviceName)
2024-05-10 21:30:12 +08:00
if err != nil {
return err
}
r.endpointMgr = em
2024-05-11 15:40:17 +08:00
leaseResp, err := r.client.Grant(context.Background(), 60) // Increase TTL time
2024-05-10 21:30:12 +08:00
if err != nil {
return err
}
r.leaseID = leaseResp.ID
2024-05-11 18:06:54 +08:00
r.rpcRegisterTarget = fmt.Sprintf("%s:%d", host, port)
endpoint := endpoints.Endpoint{Addr: r.rpcRegisterTarget}
2024-05-10 21:30:12 +08:00
err = em.AddEndpoint(context.TODO(), r.serviceKey, endpoint, clientv3.WithLease(leaseResp.ID))
2024-05-11 11:22:49 +08:00
if err != nil {
return err
}
2024-05-11 10:55:57 +08:00
2024-05-11 11:40:44 +08:00
go r.keepAliveLease(r.leaseID)
return nil
}
2024-05-11 15:40:17 +08:00
// keepAliveLease maintains the lease alive by sending keep-alive requests
2024-05-11 11:40:44 +08:00
func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) {
ch, err := r.client.KeepAlive(context.Background(), leaseID)
if err != nil {
2024-05-11 15:40:17 +08:00
fmt.Printf("Failed to keep lease alive: %v\n", err)
2024-05-11 11:40:44 +08:00
return
}
for ka := range ch {
if ka != nil {
} else {
2024-05-11 15:40:17 +08:00
fmt.Printf("Lease keep-alive response channel closed\n")
return
2024-05-11 11:40:44 +08:00
}
}
2024-05-10 21:30:12 +08:00
}
2024-05-13 17:20:15 +08:00
// watchServiceChanges watches every key under "<rootDirectory>/" and refreshes
// the cached connections of each service whose endpoints changed. It returns
// when the watch channel closes, e.g. after the etcd client is closed.
// NOTE(review): if etcd cancels the watch (for instance after compaction), the
// loop ends and the cache stops updating; consider re-establishing the watch.
func (r *SvcDiscoveryRegistryImpl) watchServiceChanges() {
	// The trailing slash limits the watch to service keys. Without it, the root
	// key itself and unrelated keys sharing the prefix (e.g. "<root>-x/...") would
	// trigger refreshes of meaningless prefixes.
	watchChan := r.client.Watch(context.Background(), r.rootDirectory+"/", clientv3.WithPrefix())
	for watchResp := range watchChan {
		if err := watchResp.Err(); err != nil {
			fmt.Printf("Watch error: %v\n", err)
		}
		// One response may carry several events for the same service; refresh
		// each affected service once, in the order it first appears.
		changed := make(map[string]struct{}, len(watchResp.Events))
		for _, event := range watchResp.Events {
			prefix, _ := r.splitEndpoint(string(event.Kv.Key))
			if _, seen := changed[prefix]; seen {
				continue
			}
			changed[prefix] = struct{}{}
			fmt.Printf("Change detected for prefix: %s\n", prefix)
			r.refreshConnMap(prefix)
		}
	}
}
// refreshConnMap fetches the latest endpoints and updates the local map
func (r *SvcDiscoveryRegistryImpl) refreshConnMap(prefix string) {
r.mu.Lock()
defer r.mu.Unlock()
fullPrefix := fmt.Sprintf("%s/", prefix)
resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix())
if err != nil {
fmt.Printf("Failed to get endpoints: %v\n", err)
return
}
2024-05-13 17:44:40 +08:00
r.connMap[prefix] = []*grpc.ClientConn{} // Update the connMap with new connections
2024-05-13 17:20:15 +08:00
for _, kv := range resp.Kvs {
_, addr := r.splitEndpoint(string(kv.Key))
conn, err := grpc.DialContext(context.Background(), addr, append(r.dialOptions, grpc.WithResolvers(r.resolver))...)
if err != nil {
fmt.Printf("Failed to dial new endpoint: %v\n", err)
continue
}
r.connMap[prefix] = append(r.connMap[prefix], conn)
}
}
// splitEndpoint splits input at its last '/' and returns the text before and
// after it. For a key "<root>/<service>/<host>:<port>" that yields the service
// prefix and the address. If input contains no '/', it returns (input, "").
func (r *SvcDiscoveryRegistryImpl) splitEndpoint(input string) (string, string) {
	cut := strings.LastIndexByte(input, '/')
	if cut < 0 {
		return input, ""
	}
	return input[:cut], input[cut+1:]
}
2024-05-11 15:40:17 +08:00
// UnRegister removes the service endpoint from etcd
2024-05-10 21:30:12 +08:00
func (r *SvcDiscoveryRegistryImpl) UnRegister() error {
if r.endpointMgr == nil {
return fmt.Errorf("endpoint manager is not initialized")
}
2024-05-13 17:20:15 +08:00
err := r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey)
if err != nil {
return err
}
return nil
2024-05-10 21:30:12 +08:00
}
2024-05-11 15:40:17 +08:00
// Close closes the etcd client connection
2024-05-10 21:30:12 +08:00
func (r *SvcDiscoveryRegistryImpl) Close() {
if r.client != nil {
_ = r.client.Close()
}
2024-05-13 17:20:15 +08:00
r.mu.Lock()
defer r.mu.Unlock()
2024-05-10 21:30:12 +08:00
}
2024-05-11 16:44:15 +08:00
2024-05-11 17:19:00 +08:00
// Check verifies if etcd is running by checking the existence of the root node and optionally creates it with a lease
2024-05-11 16:44:15 +08:00
func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfNotExist bool, options ...ZkOption) error {
cfg := clientv3.Config{
Endpoints: etcdServers,
}
for _, opt := range options {
opt(&cfg)
}
client, err := clientv3.New(cfg)
if err != nil {
return errors.Wrap(err, "failed to connect to etcd")
}
defer client.Close()
2024-05-11 17:19:00 +08:00
var opCtx context.Context
var cancel context.CancelFunc
2024-05-11 16:58:49 +08:00
if cfg.DialTimeout != 0 {
2024-05-11 17:19:00 +08:00
opCtx, cancel = context.WithTimeout(ctx, cfg.DialTimeout)
2024-05-11 16:58:49 +08:00
} else {
2024-05-11 17:19:00 +08:00
opCtx, cancel = context.WithTimeout(ctx, 10*time.Second)
2024-05-11 16:58:49 +08:00
}
2024-05-11 17:19:00 +08:00
defer cancel()
2024-05-11 16:44:15 +08:00
2024-05-11 17:19:00 +08:00
resp, err := client.Get(opCtx, etcdRoot)
2024-05-11 16:44:15 +08:00
if err != nil {
return errors.Wrap(err, "failed to get the root node from etcd")
}
if len(resp.Kvs) == 0 {
if createIfNotExist {
2024-05-11 17:19:00 +08:00
var leaseTTL int64 = 10
var leaseResp *clientv3.LeaseGrantResponse
if leaseTTL > 0 {
leaseResp, err = client.Grant(opCtx, leaseTTL)
if err != nil {
return errors.Wrap(err, "failed to create lease in etcd")
}
}
putOpts := []clientv3.OpOption{}
if leaseResp != nil {
putOpts = append(putOpts, clientv3.WithLease(leaseResp.ID))
}
_, err := client.Put(opCtx, etcdRoot, "", putOpts...)
2024-05-11 16:44:15 +08:00
if err != nil {
return errors.Wrap(err, "failed to create the root node in etcd")
}
2024-05-11 17:19:00 +08:00
fmt.Printf("Root node %s did not exist, but has been created.\n", etcdRoot)
2024-05-11 16:44:15 +08:00
} else {
return fmt.Errorf("root node %s does not exist in etcd", etcdRoot)
}
} else {
fmt.Printf("Etcd is running and the root node %s exists.\n", etcdRoot)
}
2024-05-11 17:19:00 +08:00
2024-05-11 16:44:15 +08:00
return nil
}