feat: Add enable config center button && fix: grpc connection leakage (#3036)

* feat: add enable config center

* fix: config

* fix: config

* fix: config

* fix: config
This commit is contained in:
icey-yu
2025-01-04 20:21:21 +08:00
committed by OpenIM-Robot
parent 5c192d05ee
commit 3e12bf3d49
22 changed files with 668 additions and 353 deletions
+3
View File
@@ -60,5 +60,8 @@ func (a *AuthRpcCmd) runE() error {
[]string{
a.authConfig.Share.RpcRegisterName.MessageGateway,
},
[]string{
a.authConfig.Discovery.RpcService.MessageGateway,
},
auth.Start)
}
+10 -2
View File
@@ -58,7 +58,15 @@ func (a *ConversationRpcCmd) Exec() error {
func (a *ConversationRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports,
a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig,
nil,
a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig,
[]string{
a.conversationConfig.RpcConfig.GetConfigFileName(),
a.conversationConfig.RedisConfig.GetConfigFileName(),
a.conversationConfig.MongodbConfig.GetConfigFileName(),
a.conversationConfig.NotificationConfig.GetConfigFileName(),
a.conversationConfig.Share.GetConfigFileName(),
a.conversationConfig.LocalCacheConfig.GetConfigFileName(),
a.conversationConfig.Discovery.GetConfigFileName(),
}, nil,
conversation.Start)
}
+11 -2
View File
@@ -59,7 +59,16 @@ func (a *FriendRpcCmd) Exec() error {
func (a *FriendRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP,
a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports,
a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig,
nil,
a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig,
[]string{
a.relationConfig.RpcConfig.GetConfigFileName(),
a.relationConfig.RedisConfig.GetConfigFileName(),
a.relationConfig.MongodbConfig.GetConfigFileName(),
a.relationConfig.NotificationConfig.GetConfigFileName(),
a.relationConfig.Share.GetConfigFileName(),
a.relationConfig.WebhooksConfig.GetConfigFileName(),
a.relationConfig.LocalCacheConfig.GetConfigFileName(),
a.relationConfig.Discovery.GetConfigFileName(),
}, nil,
relation.Start)
}
+11 -2
View File
@@ -60,7 +60,16 @@ func (a *GroupRpcCmd) Exec() error {
func (a *GroupRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports,
a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig,
nil,
a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig,
[]string{
a.groupConfig.RpcConfig.GetConfigFileName(),
a.groupConfig.RedisConfig.GetConfigFileName(),
a.groupConfig.MongodbConfig.GetConfigFileName(),
a.groupConfig.NotificationConfig.GetConfigFileName(),
a.groupConfig.Share.GetConfigFileName(),
a.groupConfig.WebhooksConfig.GetConfigFileName(),
a.groupConfig.LocalCacheConfig.GetConfigFileName(),
a.groupConfig.Discovery.GetConfigFileName(),
}, nil,
group.Start, versionctx.EnableVersionCtx())
}
+12 -2
View File
@@ -60,7 +60,17 @@ func (a *MsgRpcCmd) Exec() error {
func (a *MsgRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports,
a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig,
nil,
a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig,
[]string{
a.msgConfig.RpcConfig.GetConfigFileName(),
a.msgConfig.RedisConfig.GetConfigFileName(),
a.msgConfig.MongodbConfig.GetConfigFileName(),
a.msgConfig.KafkaConfig.GetConfigFileName(),
a.msgConfig.NotificationConfig.GetConfigFileName(),
a.msgConfig.Share.GetConfigFileName(),
a.msgConfig.WebhooksConfig.GetConfigFileName(),
a.msgConfig.LocalCacheConfig.GetConfigFileName(),
a.msgConfig.Discovery.GetConfigFileName(),
}, nil,
msg.Start)
}
+3
View File
@@ -64,5 +64,8 @@ func (a *PushRpcCmd) runE() error {
[]string{
a.pushConfig.Share.RpcRegisterName.MessageGateway,
},
[]string{
a.pushConfig.Discovery.RpcService.MessageGateway,
},
push.Start)
}
+81 -3
View File
@@ -87,6 +87,25 @@ func NewRootCmd(processName string, opts ...func(*CmdOpts)) *RootCmd {
return rootCmd
}
func (r *RootCmd) initEtcd() error {
configDirectory, _, err := r.getFlag(&r.Command)
if err != nil {
return err
}
disConfig := config.Discovery{}
env := runtimeenv.PrintRuntimeEnvironment()
err = config.Load(configDirectory, config.DiscoveryConfigFilename, config.EnvPrefixMap[config.DiscoveryConfigFilename],
env, &disConfig)
if err != nil {
return err
}
if disConfig.Enable == config.ETCD {
discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env, nil)
r.etcdClient = discov.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
}
return nil
}
func (r *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts)) error {
cmdOpts := r.applyOptions(opts...)
if err := r.initializeConfiguration(cmd, cmdOpts); err != nil {
@@ -96,7 +115,9 @@ func (r *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts)) e
if err := r.initializeLogger(cmdOpts); err != nil {
return errs.WrapMsg(err, "failed to initialize logger")
}
if err := r.etcdClient.Close(); err != nil {
return errs.WrapMsg(err, "failed to close etcd client")
}
return nil
}
@@ -115,8 +136,65 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err
}
}
// Load common log configuration file
return config.LoadConfig(filepath.Join(configDirectory, LogConfigFileName),
ConfigEnvPrefixMap[LogConfigFileName], &r.log)
return config.Load(configDirectory, config.LogConfigFileName, config.EnvPrefixMap[config.LogConfigFileName], runtimeEnv, &r.log)
}
func (r *RootCmd) updateConfigFromEtcd(opts *CmdOpts) error {
if r.etcdClient == nil {
return nil
}
ctx := context.TODO()
res, err := r.etcdClient.Get(ctx, disetcd.BuildKey(disetcd.EnableConfigCenterKey))
if err != nil {
log.ZWarn(ctx, "root cmd updateConfigFromEtcd, etcd Get EnableConfigCenterKey err: %v", errs.Wrap(err))
return nil
}
if res.Count == 0 {
return nil
} else {
if string(res.Kvs[0].Value) == disetcd.Disable {
return nil
} else if string(res.Kvs[0].Value) != disetcd.Enable {
return errs.New("unknown EnableConfigCenter value").Wrap()
}
}
update := func(configFileName string, configStruct any) error {
key := disetcd.BuildKey(configFileName)
etcdRes, err := r.etcdClient.Get(ctx, key)
if err != nil {
log.ZWarn(ctx, "root cmd updateConfigFromEtcd, etcd Get err: %v", errs.Wrap(err))
return nil
}
if etcdRes.Count == 0 {
data, err := json.Marshal(configStruct)
if err != nil {
return errs.ErrArgs.WithDetail(err.Error()).Wrap()
}
_, err = r.etcdClient.Put(ctx, disetcd.BuildKey(configFileName), string(data))
if err != nil {
log.ZWarn(ctx, "root cmd updateConfigFromEtcd, etcd Put err: %v", errs.Wrap(err))
}
return nil
}
err = json.Unmarshal(etcdRes.Kvs[0].Value, configStruct)
if err != nil {
return errs.WrapMsg(err, "failed to unmarshal config from etcd")
}
return nil
}
for configFileName, configStruct := range opts.configMap {
if err := update(configFileName, configStruct); err != nil {
return err
}
}
if err := update(config.LogConfigFileName, &r.log); err != nil {
return err
}
// Load common log configuration file
return nil
}
func (r *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts {
+11 -2
View File
@@ -59,7 +59,16 @@ func (a *ThirdRpcCmd) Exec() error {
func (a *ThirdRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports,
a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig,
nil,
a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig,
[]string{
a.thirdConfig.RpcConfig.GetConfigFileName(),
a.thirdConfig.RedisConfig.GetConfigFileName(),
a.thirdConfig.MongodbConfig.GetConfigFileName(),
a.thirdConfig.NotificationConfig.GetConfigFileName(),
a.thirdConfig.Share.GetConfigFileName(),
a.thirdConfig.MinioConfig.GetConfigFileName(),
a.thirdConfig.LocalCacheConfig.GetConfigFileName(),
a.thirdConfig.Discovery.GetConfigFileName(),
}, nil,
third.Start)
}
+12 -2
View File
@@ -60,7 +60,17 @@ func (a *UserRpcCmd) Exec() error {
func (a *UserRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports,
a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig,
nil,
a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig,
[]string{
a.userConfig.RpcConfig.GetConfigFileName(),
a.userConfig.RedisConfig.GetConfigFileName(),
a.userConfig.MongodbConfig.GetConfigFileName(),
a.userConfig.KafkaConfig.GetConfigFileName(),
a.userConfig.NotificationConfig.GetConfigFileName(),
a.userConfig.Share.GetConfigFileName(),
a.userConfig.WebhooksConfig.GetConfigFileName(),
a.userConfig.LocalCacheConfig.GetConfigFileName(),
a.userConfig.Discovery.GetConfigFileName(),
}, nil,
user.Start)
}
+106
View File
@@ -0,0 +1,106 @@
package etcd
import (
"context"
"os"
"os/exec"
"runtime"
"sync"
"syscall"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
ShutDowns []func() error
)
func RegisterShutDown(shutDown ...func() error) {
ShutDowns = append(ShutDowns, shutDown...)
}
type ConfigManager struct {
client *clientv3.Client
watchConfigNames []string
lock sync.Mutex
}
func BuildKey(s string) string {
return ConfigKeyPrefix + s
}
func NewConfigManager(client *clientv3.Client, configNames []string) *ConfigManager {
return &ConfigManager{
client: client,
watchConfigNames: datautil.Batch(func(s string) string { return BuildKey(s) }, append(configNames, RestartKey))}
}
func (c *ConfigManager) Watch(ctx context.Context) {
chans := make([]clientv3.WatchChan, 0, len(c.watchConfigNames))
for _, name := range c.watchConfigNames {
chans = append(chans, c.client.Watch(ctx, name, clientv3.WithPrefix()))
}
doWatch := func(watchChan clientv3.WatchChan) {
for watchResp := range watchChan {
if watchResp.Err() != nil {
log.ZError(ctx, "watch err", errs.Wrap(watchResp.Err()))
continue
}
for _, event := range watchResp.Events {
if event.IsModify() {
if datautil.Contain(string(event.Kv.Key), c.watchConfigNames...) {
c.lock.Lock()
err := restartServer(ctx)
if err != nil {
log.ZError(ctx, "restart server err", err)
}
c.lock.Unlock()
}
}
}
}
}
for _, ch := range chans {
go doWatch(ch)
}
}
func restartServer(ctx context.Context) error {
exePath, err := os.Executable()
if err != nil {
return errs.New("get executable path fail").Wrap()
}
args := os.Args
env := os.Environ()
cmd := exec.Command(exePath, args[1:]...)
cmd.Env = env
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Stdin = os.Stdin
if runtime.GOOS != "windows" {
cmd.SysProcAttr = &syscall.SysProcAttr{}
}
log.ZInfo(ctx, "shutdown server")
for _, f := range ShutDowns {
if err = f(); err != nil {
log.ZError(ctx, "shutdown fail", err)
}
}
log.ZInfo(ctx, "restart server")
err = cmd.Start()
if err != nil {
return errs.New("restart server fail").Wrap()
}
log.ZInfo(ctx, "cmd start over")
os.Exit(0)
return nil
}
+9
View File
@@ -0,0 +1,9 @@
package etcd
const (
ConfigKeyPrefix = "/open-im/config/"
RestartKey = "restart"
EnableConfigCenterKey = "enable-config-center"
Enable = "enable"
Disable = "disable"
)
@@ -25,7 +25,15 @@ import (
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share, watchNames []string) (discovery.SvcDiscoveryRegistry, error) {
func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string, watchNames []string) (discovery.SvcDiscoveryRegistry, error) {
if runtimeEnv == config.KUBERNETES {
return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace,
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(1024*1024*20),
),
)
}
switch discovery.Enable {
case "k8s":
return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway)
+3 -3
View File
@@ -45,8 +45,8 @@ import (
// Start rpc server.
func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, share *conf.Share, config T,
watchServiceNames []string,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
watchConfigNames []string, watchServiceNames []string,
rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error,
options ...grpc.ServerOption) error {
@@ -85,7 +85,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
if autoSetPorts && discovery.Enable != conf.ETCD {
return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap()
}
client, err := kdisc.NewDiscoveryRegister(discovery, share, watchServiceNames)
client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv, watchServiceNames)
if err != nil {
return err
}