This commit is contained in:
withchao
2025-02-08 17:31:06 +08:00
parent 48291faefb
commit 53316b8ad8
59 changed files with 1072 additions and 583 deletions
+25 -4
View File
@@ -18,6 +18,7 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/api"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
@@ -31,16 +32,36 @@ type ApiCmd struct {
}
func NewApiCmd() *ApiCmd {
var apiConfig api.Config
apiConfig := api.Config{AllConfig: &config.AllConfig{}}
ret := &ApiCmd{apiConfig: &apiConfig}
ret.configMap = map[string]any{
OpenIMAPICfgFileName: &apiConfig.API,
ShareFileName: &apiConfig.Share,
DiscoveryConfigFilename: &apiConfig.Discovery,
config.DiscoveryConfigFilename: &apiConfig.Discovery,
config.KafkaConfigFileName: &apiConfig.Kafka,
config.LocalCacheConfigFileName: &apiConfig.LocalCache,
config.LogConfigFileName: &apiConfig.Log,
config.MinioConfigFileName: &apiConfig.Minio,
config.MongodbConfigFileName: &apiConfig.Mongo,
config.NotificationFileName: &apiConfig.Notification,
config.OpenIMAPICfgFileName: &apiConfig.API,
config.OpenIMCronTaskCfgFileName: &apiConfig.CronTask,
config.OpenIMMsgGatewayCfgFileName: &apiConfig.MsgGateway,
config.OpenIMMsgTransferCfgFileName: &apiConfig.MsgTransfer,
config.OpenIMPushCfgFileName: &apiConfig.Push,
config.OpenIMRPCAuthCfgFileName: &apiConfig.Auth,
config.OpenIMRPCConversationCfgFileName: &apiConfig.Conversation,
config.OpenIMRPCFriendCfgFileName: &apiConfig.Friend,
config.OpenIMRPCGroupCfgFileName: &apiConfig.Group,
config.OpenIMRPCMsgCfgFileName: &apiConfig.Msg,
config.OpenIMRPCThirdCfgFileName: &apiConfig.Third,
config.OpenIMRPCUserCfgFileName: &apiConfig.User,
config.RedisConfigFileName: &apiConfig.Redis,
config.ShareFileName: &apiConfig.Share,
config.WebhooksConfigFileName: &apiConfig.Webhooks,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
apiConfig.ConfigPath = ret.configPath
return ret.runE()
}
return ret
+10 -6
View File
@@ -18,6 +18,7 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/auth"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
@@ -35,10 +36,10 @@ func NewAuthRpcCmd() *AuthRpcCmd {
var authConfig auth.Config
ret := &AuthRpcCmd{authConfig: &authConfig}
ret.configMap = map[string]any{
OpenIMRPCAuthCfgFileName: &authConfig.RpcConfig,
RedisConfigFileName: &authConfig.RedisConfig,
ShareFileName: &authConfig.Share,
DiscoveryConfigFilename: &authConfig.Discovery,
config.OpenIMRPCAuthCfgFileName: &authConfig.RpcConfig,
config.RedisConfigFileName: &authConfig.RedisConfig,
config.ShareFileName: &authConfig.Share,
config.DiscoveryConfigFilename: &authConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
@@ -56,9 +57,12 @@ func (a *AuthRpcCmd) Exec() error {
func (a *AuthRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports,
a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig,
a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig,
[]string{
a.authConfig.Share.RpcRegisterName.MessageGateway,
a.authConfig.RpcConfig.GetConfigFileName(),
a.authConfig.Share.GetConfigFileName(),
a.authConfig.RedisConfig.GetConfigFileName(),
a.authConfig.Discovery.GetConfigFileName(),
},
[]string{
a.authConfig.Discovery.RpcService.MessageGateway,
+8 -7
View File
@@ -18,6 +18,7 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/conversation"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
@@ -35,13 +36,13 @@ func NewConversationRpcCmd() *ConversationRpcCmd {
var conversationConfig conversation.Config
ret := &ConversationRpcCmd{conversationConfig: &conversationConfig}
ret.configMap = map[string]any{
OpenIMRPCConversationCfgFileName: &conversationConfig.RpcConfig,
RedisConfigFileName: &conversationConfig.RedisConfig,
MongodbConfigFileName: &conversationConfig.MongodbConfig,
ShareFileName: &conversationConfig.Share,
NotificationFileName: &conversationConfig.NotificationConfig,
LocalCacheConfigFileName: &conversationConfig.LocalCacheConfig,
DiscoveryConfigFilename: &conversationConfig.Discovery,
config.OpenIMRPCConversationCfgFileName: &conversationConfig.RpcConfig,
config.RedisConfigFileName: &conversationConfig.RedisConfig,
config.MongodbConfigFileName: &conversationConfig.MongodbConfig,
config.ShareFileName: &conversationConfig.Share,
config.NotificationFileName: &conversationConfig.NotificationConfig,
config.LocalCacheConfigFileName: &conversationConfig.LocalCacheConfig,
config.DiscoveryConfigFilename: &conversationConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
+4 -3
View File
@@ -18,6 +18,7 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/tools"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
@@ -34,9 +35,9 @@ func NewCronTaskCmd() *CronTaskCmd {
var cronTaskConfig tools.CronTaskConfig
ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig}
ret.configMap = map[string]any{
OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask,
ShareFileName: &cronTaskConfig.Share,
DiscoveryConfigFilename: &cronTaskConfig.Discovery,
config.OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask,
config.ShareFileName: &cronTaskConfig.Share,
config.DiscoveryConfigFilename: &cronTaskConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
+9 -8
View File
@@ -18,6 +18,7 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
@@ -35,14 +36,14 @@ func NewFriendRpcCmd() *FriendRpcCmd {
var relationConfig relation.Config
ret := &FriendRpcCmd{relationConfig: &relationConfig}
ret.configMap = map[string]any{
OpenIMRPCFriendCfgFileName: &relationConfig.RpcConfig,
RedisConfigFileName: &relationConfig.RedisConfig,
MongodbConfigFileName: &relationConfig.MongodbConfig,
ShareFileName: &relationConfig.Share,
NotificationFileName: &relationConfig.NotificationConfig,
WebhooksConfigFileName: &relationConfig.WebhooksConfig,
LocalCacheConfigFileName: &relationConfig.LocalCacheConfig,
DiscoveryConfigFilename: &relationConfig.Discovery,
config.OpenIMRPCFriendCfgFileName: &relationConfig.RpcConfig,
config.RedisConfigFileName: &relationConfig.RedisConfig,
config.MongodbConfigFileName: &relationConfig.MongodbConfig,
config.ShareFileName: &relationConfig.Share,
config.NotificationFileName: &relationConfig.NotificationConfig,
config.WebhooksConfigFileName: &relationConfig.WebhooksConfig,
config.LocalCacheConfigFileName: &relationConfig.LocalCacheConfig,
config.DiscoveryConfigFilename: &relationConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
+9 -8
View File
@@ -18,6 +18,7 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/group"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
"github.com/openimsdk/open-im-server/v3/version"
@@ -36,14 +37,14 @@ func NewGroupRpcCmd() *GroupRpcCmd {
var groupConfig group.Config
ret := &GroupRpcCmd{groupConfig: &groupConfig}
ret.configMap = map[string]any{
OpenIMRPCGroupCfgFileName: &groupConfig.RpcConfig,
RedisConfigFileName: &groupConfig.RedisConfig,
MongodbConfigFileName: &groupConfig.MongodbConfig,
ShareFileName: &groupConfig.Share,
NotificationFileName: &groupConfig.NotificationConfig,
WebhooksConfigFileName: &groupConfig.WebhooksConfig,
LocalCacheConfigFileName: &groupConfig.LocalCacheConfig,
DiscoveryConfigFilename: &groupConfig.Discovery,
config.OpenIMRPCGroupCfgFileName: &groupConfig.RpcConfig,
config.RedisConfigFileName: &groupConfig.RedisConfig,
config.MongodbConfigFileName: &groupConfig.MongodbConfig,
config.ShareFileName: &groupConfig.Share,
config.NotificationFileName: &groupConfig.NotificationConfig,
config.WebhooksConfigFileName: &groupConfig.WebhooksConfig,
config.LocalCacheConfigFileName: &groupConfig.LocalCacheConfig,
config.DiscoveryConfigFilename: &groupConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
+10 -9
View File
@@ -18,6 +18,7 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/msg"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
@@ -35,15 +36,15 @@ func NewMsgRpcCmd() *MsgRpcCmd {
var msgConfig msg.Config
ret := &MsgRpcCmd{msgConfig: &msgConfig}
ret.configMap = map[string]any{
OpenIMRPCMsgCfgFileName: &msgConfig.RpcConfig,
RedisConfigFileName: &msgConfig.RedisConfig,
MongodbConfigFileName: &msgConfig.MongodbConfig,
KafkaConfigFileName: &msgConfig.KafkaConfig,
ShareFileName: &msgConfig.Share,
NotificationFileName: &msgConfig.NotificationConfig,
WebhooksConfigFileName: &msgConfig.WebhooksConfig,
LocalCacheConfigFileName: &msgConfig.LocalCacheConfig,
DiscoveryConfigFilename: &msgConfig.Discovery,
config.OpenIMRPCMsgCfgFileName: &msgConfig.RpcConfig,
config.RedisConfigFileName: &msgConfig.RedisConfig,
config.MongodbConfigFileName: &msgConfig.MongodbConfig,
config.KafkaConfigFileName: &msgConfig.KafkaConfig,
config.ShareFileName: &msgConfig.Share,
config.NotificationFileName: &msgConfig.NotificationConfig,
config.WebhooksConfigFileName: &msgConfig.WebhooksConfig,
config.LocalCacheConfigFileName: &msgConfig.LocalCacheConfig,
config.DiscoveryConfigFilename: &msgConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
+6 -5
View File
@@ -18,6 +18,7 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/msggateway"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
@@ -35,11 +36,11 @@ func NewMsgGatewayCmd() *MsgGatewayCmd {
var msgGatewayConfig msggateway.Config
ret := &MsgGatewayCmd{msgGatewayConfig: &msgGatewayConfig}
ret.configMap = map[string]any{
OpenIMMsgGatewayCfgFileName: &msgGatewayConfig.MsgGateway,
ShareFileName: &msgGatewayConfig.Share,
RedisConfigFileName: &msgGatewayConfig.RedisConfig,
WebhooksConfigFileName: &msgGatewayConfig.WebhooksConfig,
DiscoveryConfigFilename: &msgGatewayConfig.Discovery,
config.OpenIMMsgGatewayCfgFileName: &msgGatewayConfig.MsgGateway,
config.ShareFileName: &msgGatewayConfig.Share,
config.RedisConfigFileName: &msgGatewayConfig.RedisConfig,
config.WebhooksConfigFileName: &msgGatewayConfig.WebhooksConfig,
config.DiscoveryConfigFilename: &msgGatewayConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
+8 -7
View File
@@ -18,6 +18,7 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/msgtransfer"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
@@ -34,13 +35,13 @@ func NewMsgTransferCmd() *MsgTransferCmd {
var msgTransferConfig msgtransfer.Config
ret := &MsgTransferCmd{msgTransferConfig: &msgTransferConfig}
ret.configMap = map[string]any{
OpenIMMsgTransferCfgFileName: &msgTransferConfig.MsgTransfer,
RedisConfigFileName: &msgTransferConfig.RedisConfig,
MongodbConfigFileName: &msgTransferConfig.MongodbConfig,
KafkaConfigFileName: &msgTransferConfig.KafkaConfig,
ShareFileName: &msgTransferConfig.Share,
WebhooksConfigFileName: &msgTransferConfig.WebhooksConfig,
DiscoveryConfigFilename: &msgTransferConfig.Discovery,
config.OpenIMMsgTransferCfgFileName: &msgTransferConfig.MsgTransfer,
config.RedisConfigFileName: &msgTransferConfig.RedisConfig,
config.MongodbConfigFileName: &msgTransferConfig.MongodbConfig,
config.KafkaConfigFileName: &msgTransferConfig.KafkaConfig,
config.ShareFileName: &msgTransferConfig.Share,
config.WebhooksConfigFileName: &msgTransferConfig.WebhooksConfig,
config.DiscoveryConfigFilename: &msgTransferConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
+3 -2
View File
@@ -15,6 +15,7 @@
package cmd
import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/spf13/cobra"
)
@@ -26,11 +27,11 @@ func (m *MsgUtilsCmd) AddUserIDFlag() {
m.Command.PersistentFlags().StringP("userID", "u", "", "openIM userID")
}
func (m *MsgUtilsCmd) AddIndexFlag() {
m.Command.PersistentFlags().IntP(FlagTransferIndex, "i", 0, "process startup sequence number")
m.Command.PersistentFlags().IntP(config.FlagTransferIndex, "i", 0, "process startup sequence number")
}
func (m *MsgUtilsCmd) AddConfigDirFlag() {
m.Command.PersistentFlags().StringP(FlagConf, "c", "", "path of config directory")
m.Command.PersistentFlags().StringP(config.FlagConf, "c", "", "path of config directory")
}
+18 -10
View File
@@ -18,6 +18,7 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/push"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
@@ -35,14 +36,14 @@ func NewPushRpcCmd() *PushRpcCmd {
var pushConfig push.Config
ret := &PushRpcCmd{pushConfig: &pushConfig}
ret.configMap = map[string]any{
OpenIMPushCfgFileName: &pushConfig.RpcConfig,
RedisConfigFileName: &pushConfig.RedisConfig,
KafkaConfigFileName: &pushConfig.KafkaConfig,
ShareFileName: &pushConfig.Share,
NotificationFileName: &pushConfig.NotificationConfig,
WebhooksConfigFileName: &pushConfig.WebhooksConfig,
LocalCacheConfigFileName: &pushConfig.LocalCacheConfig,
DiscoveryConfigFilename: &pushConfig.Discovery,
config.OpenIMPushCfgFileName: &pushConfig.RpcConfig,
config.RedisConfigFileName: &pushConfig.RedisConfig,
config.KafkaConfigFileName: &pushConfig.KafkaConfig,
config.ShareFileName: &pushConfig.Share,
config.NotificationFileName: &pushConfig.NotificationConfig,
config.WebhooksConfigFileName: &pushConfig.WebhooksConfig,
config.LocalCacheConfigFileName: &pushConfig.LocalCacheConfig,
config.DiscoveryConfigFilename: &pushConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
@@ -60,9 +61,16 @@ func (a *PushRpcCmd) Exec() error {
func (a *PushRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports,
a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig,
a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig,
[]string{
a.pushConfig.Share.RpcRegisterName.MessageGateway,
a.pushConfig.RpcConfig.GetConfigFileName(),
a.pushConfig.RedisConfig.GetConfigFileName(),
a.pushConfig.KafkaConfig.GetConfigFileName(),
a.pushConfig.NotificationConfig.GetConfigFileName(),
a.pushConfig.Share.GetConfigFileName(),
a.pushConfig.WebhooksConfig.GetConfigFileName(),
a.pushConfig.LocalCacheConfig.GetConfigFileName(),
a.pushConfig.Discovery.GetConfigFileName(),
},
[]string{
a.pushConfig.Discovery.RpcService.MessageGateway,
+22 -22
View File
@@ -1,28 +1,20 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/runtimeenv"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)
type RootCmd struct {
@@ -33,6 +25,7 @@ type RootCmd struct {
log config.Log
index int
configPath string
etcdClient *clientv3.Client
}
func (r *RootCmd) ConfigPath() string {
@@ -80,8 +73,8 @@ func NewRootCmd(processName string, opts ...func(*CmdOpts)) *RootCmd {
SilenceUsage: true,
SilenceErrors: false,
}
cmd.Flags().StringP(FlagConf, "c", "", "path of config directory")
cmd.Flags().IntP(FlagTransferIndex, "i", 0, "process startup sequence number")
cmd.Flags().StringP(config.FlagConf, "c", "", "path of config directory")
cmd.Flags().IntP(config.FlagTransferIndex, "i", 0, "process startup sequence number")
rootCmd.Command = cmd
return rootCmd
@@ -107,11 +100,16 @@ func (r *RootCmd) initEtcd() error {
}
func (r *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts)) error {
if err := r.initEtcd(); err != nil {
return err
}
cmdOpts := r.applyOptions(opts...)
if err := r.initializeConfiguration(cmd, cmdOpts); err != nil {
return err
}
if err := r.updateConfigFromEtcd(cmdOpts); err != nil {
return err
}
if err := r.initializeLogger(cmdOpts); err != nil {
return errs.WrapMsg(err, "failed to initialize logger")
}
@@ -126,11 +124,13 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err
if err != nil {
return err
}
runtimeEnv := runtimeenv.PrintRuntimeEnvironment()
// Load common configuration file
//opts.configMap[ShareFileName] = StructEnvPrefix{EnvPrefix: shareEnvPrefix, ConfigStruct: &r.share}
for configFileName, configStruct := range opts.configMap {
err := config.LoadConfig(filepath.Join(configDirectory, configFileName),
ConfigEnvPrefixMap[configFileName], configStruct)
err := config.Load(configDirectory, configFileName, config.EnvPrefixMap[configFileName], runtimeEnv, configStruct)
if err != nil {
return err
}
@@ -235,12 +235,12 @@ func defaultCmdOpts() *CmdOpts {
}
func (r *RootCmd) getFlag(cmd *cobra.Command) (string, int, error) {
configDirectory, err := cmd.Flags().GetString(FlagConf)
configDirectory, err := cmd.Flags().GetString(config.FlagConf)
if err != nil {
return "", 0, errs.Wrap(err)
}
r.configPath = configDirectory
index, err := cmd.Flags().GetInt(FlagTransferIndex)
index, err := cmd.Flags().GetInt(config.FlagTransferIndex)
if err != nil {
return "", 0, errs.Wrap(err)
}
+9 -8
View File
@@ -18,6 +18,7 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/third"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
@@ -35,14 +36,14 @@ func NewThirdRpcCmd() *ThirdRpcCmd {
var thirdConfig third.Config
ret := &ThirdRpcCmd{thirdConfig: &thirdConfig}
ret.configMap = map[string]any{
OpenIMRPCThirdCfgFileName: &thirdConfig.RpcConfig,
RedisConfigFileName: &thirdConfig.RedisConfig,
MongodbConfigFileName: &thirdConfig.MongodbConfig,
ShareFileName: &thirdConfig.Share,
NotificationFileName: &thirdConfig.NotificationConfig,
MinioConfigFileName: &thirdConfig.MinioConfig,
LocalCacheConfigFileName: &thirdConfig.LocalCacheConfig,
DiscoveryConfigFilename: &thirdConfig.Discovery,
config.OpenIMRPCThirdCfgFileName: &thirdConfig.RpcConfig,
config.RedisConfigFileName: &thirdConfig.RedisConfig,
config.MongodbConfigFileName: &thirdConfig.MongodbConfig,
config.ShareFileName: &thirdConfig.Share,
config.NotificationFileName: &thirdConfig.NotificationConfig,
config.MinioConfigFileName: &thirdConfig.MinioConfig,
config.LocalCacheConfigFileName: &thirdConfig.LocalCacheConfig,
config.DiscoveryConfigFilename: &thirdConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
+10 -9
View File
@@ -18,6 +18,7 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/user"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
@@ -35,15 +36,15 @@ func NewUserRpcCmd() *UserRpcCmd {
var userConfig user.Config
ret := &UserRpcCmd{userConfig: &userConfig}
ret.configMap = map[string]any{
OpenIMRPCUserCfgFileName: &userConfig.RpcConfig,
RedisConfigFileName: &userConfig.RedisConfig,
MongodbConfigFileName: &userConfig.MongodbConfig,
KafkaConfigFileName: &userConfig.KafkaConfig,
ShareFileName: &userConfig.Share,
NotificationFileName: &userConfig.NotificationConfig,
WebhooksConfigFileName: &userConfig.WebhooksConfig,
LocalCacheConfigFileName: &userConfig.LocalCacheConfig,
DiscoveryConfigFilename: &userConfig.Discovery,
config.OpenIMRPCUserCfgFileName: &userConfig.RpcConfig,
config.RedisConfigFileName: &userConfig.RedisConfig,
config.MongodbConfigFileName: &userConfig.MongodbConfig,
config.KafkaConfigFileName: &userConfig.KafkaConfig,
config.ShareFileName: &userConfig.Share,
config.NotificationFileName: &userConfig.NotificationConfig,
config.WebhooksConfigFileName: &userConfig.WebhooksConfig,
config.LocalCacheConfigFileName: &userConfig.LocalCacheConfig,
config.DiscoveryConfigFilename: &userConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
+208 -7
View File
@@ -380,10 +380,9 @@ type AfterConfig struct {
}
type Share struct {
Secret string `mapstructure:"secret"`
RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"`
IMAdminUserID []string `mapstructure:"imAdminUserID"`
MultiLogin MultiLogin `mapstructure:"multiLogin"`
Secret string `mapstructure:"secret"`
IMAdminUserID []string `mapstructure:"imAdminUserID"`
MultiLogin MultiLogin `mapstructure:"multiLogin"`
}
type MultiLogin struct {
@@ -478,11 +477,41 @@ type ZooKeeper struct {
}
type Discovery struct {
Enable string `mapstructure:"enable"`
Etcd Etcd `mapstructure:"etcd"`
ZooKeeper ZooKeeper `mapstructure:"zooKeeper"`
Enable string `mapstructure:"enable"`
Etcd Etcd `mapstructure:"etcd"`
Kubernetes Kubernetes `mapstructure:"kubernetes"`
RpcService RpcService `mapstructure:"rpcService"`
}
type RpcService struct {
User string `mapstructure:"user"`
Friend string `mapstructure:"friend"`
Msg string `mapstructure:"msg"`
Push string `mapstructure:"push"`
MessageGateway string `mapstructure:"messageGateway"`
Group string `mapstructure:"group"`
Auth string `mapstructure:"auth"`
Conversation string `mapstructure:"conversation"`
Third string `mapstructure:"third"`
}
func (r *RpcService) GetServiceNames() []string {
return []string{
r.User,
r.Friend,
r.Msg,
r.Push,
r.MessageGateway,
r.Group,
r.Auth,
r.Conversation,
r.Third,
}
}
type Kubernetes struct {
Namespace string `yaml:"namespace"`
}
type Etcd struct {
RootDirectory string `mapstructure:"rootDirectory"`
Address []string `mapstructure:"address"`
@@ -717,3 +746,175 @@ func (s *Share) GetConfigFileName() string {
func (w *Webhooks) GetConfigFileName() string {
return WebhooksConfigFileName
}
func InitNotification(notification *Notification) {
notification.GroupCreated.UnreadCount = false
notification.GroupCreated.ReliabilityLevel = 1
notification.GroupInfoSet.UnreadCount = false
notification.GroupInfoSet.ReliabilityLevel = 1
notification.JoinGroupApplication.UnreadCount = false
notification.JoinGroupApplication.ReliabilityLevel = 1
notification.MemberQuit.UnreadCount = false
notification.MemberQuit.ReliabilityLevel = 1
notification.GroupApplicationAccepted.UnreadCount = false
notification.GroupApplicationAccepted.ReliabilityLevel = 1
notification.GroupApplicationRejected.UnreadCount = false
notification.GroupApplicationRejected.ReliabilityLevel = 1
notification.GroupOwnerTransferred.UnreadCount = false
notification.GroupOwnerTransferred.ReliabilityLevel = 1
notification.MemberKicked.UnreadCount = false
notification.MemberKicked.ReliabilityLevel = 1
notification.MemberInvited.UnreadCount = false
notification.MemberInvited.ReliabilityLevel = 1
notification.MemberEnter.UnreadCount = false
notification.MemberEnter.ReliabilityLevel = 1
notification.GroupDismissed.UnreadCount = false
notification.GroupDismissed.ReliabilityLevel = 1
notification.GroupMuted.UnreadCount = false
notification.GroupMuted.ReliabilityLevel = 1
notification.GroupCancelMuted.UnreadCount = false
notification.GroupCancelMuted.ReliabilityLevel = 1
notification.GroupMemberMuted.UnreadCount = false
notification.GroupMemberMuted.ReliabilityLevel = 1
notification.GroupMemberCancelMuted.UnreadCount = false
notification.GroupMemberCancelMuted.ReliabilityLevel = 1
notification.GroupMemberInfoSet.UnreadCount = false
notification.GroupMemberInfoSet.ReliabilityLevel = 1
notification.GroupMemberSetToAdmin.UnreadCount = false
notification.GroupMemberSetToAdmin.ReliabilityLevel = 1
notification.GroupMemberSetToOrdinary.UnreadCount = false
notification.GroupMemberSetToOrdinary.ReliabilityLevel = 1
notification.GroupInfoSetAnnouncement.UnreadCount = false
notification.GroupInfoSetAnnouncement.ReliabilityLevel = 1
notification.GroupInfoSetName.UnreadCount = false
notification.GroupInfoSetName.ReliabilityLevel = 1
notification.FriendApplicationAdded.UnreadCount = false
notification.FriendApplicationAdded.ReliabilityLevel = 1
notification.FriendApplicationApproved.UnreadCount = false
notification.FriendApplicationApproved.ReliabilityLevel = 1
notification.FriendApplicationRejected.UnreadCount = false
notification.FriendApplicationRejected.ReliabilityLevel = 1
notification.FriendAdded.UnreadCount = false
notification.FriendAdded.ReliabilityLevel = 1
notification.FriendDeleted.UnreadCount = false
notification.FriendDeleted.ReliabilityLevel = 1
notification.FriendRemarkSet.UnreadCount = false
notification.FriendRemarkSet.ReliabilityLevel = 1
notification.BlackAdded.UnreadCount = false
notification.BlackAdded.ReliabilityLevel = 1
notification.BlackDeleted.UnreadCount = false
notification.BlackDeleted.ReliabilityLevel = 1
notification.FriendInfoUpdated.UnreadCount = false
notification.FriendInfoUpdated.ReliabilityLevel = 1
notification.UserInfoUpdated.UnreadCount = false
notification.UserInfoUpdated.ReliabilityLevel = 1
notification.UserStatusChanged.UnreadCount = false
notification.UserStatusChanged.ReliabilityLevel = 1
notification.ConversationChanged.UnreadCount = false
notification.ConversationChanged.ReliabilityLevel = 1
notification.ConversationSetPrivate.UnreadCount = false
notification.ConversationSetPrivate.ReliabilityLevel = 1
}
type AllConfig struct {
Discovery Discovery
Kafka Kafka
LocalCache LocalCache
Log Log
Minio Minio
Mongo Mongo
Notification Notification
API API
CronTask CronTask
MsgGateway MsgGateway
MsgTransfer MsgTransfer
Push Push
Auth Auth
Conversation Conversation
Friend Friend
Group Group
Msg Msg
Third Third
User User
Redis Redis
Share Share
Webhooks Webhooks
}
func (a *AllConfig) Name2Config(name string) any {
switch name {
case a.Discovery.GetConfigFileName():
return a.Discovery
case a.Kafka.GetConfigFileName():
return a.Kafka
case a.LocalCache.GetConfigFileName():
return a.LocalCache
case a.Log.GetConfigFileName():
return a.Log
case a.Minio.GetConfigFileName():
return a.Minio
case a.Mongo.GetConfigFileName():
return a.Mongo
case a.Notification.GetConfigFileName():
return a.Notification
case a.API.GetConfigFileName():
return a.API
case a.CronTask.GetConfigFileName():
return a.CronTask
case a.MsgGateway.GetConfigFileName():
return a.MsgGateway
case a.MsgTransfer.GetConfigFileName():
return a.MsgTransfer
case a.Push.GetConfigFileName():
return a.Push
case a.Auth.GetConfigFileName():
return a.Auth
case a.Conversation.GetConfigFileName():
return a.Conversation
case a.Friend.GetConfigFileName():
return a.Friend
case a.Group.GetConfigFileName():
return a.Group
case a.Msg.GetConfigFileName():
return a.Msg
case a.Third.GetConfigFileName():
return a.Third
case a.User.GetConfigFileName():
return a.User
case a.Redis.GetConfigFileName():
return a.Redis
case a.Share.GetConfigFileName():
return a.Share
case a.Webhooks.GetConfigFileName():
return a.Webhooks
default:
return nil
}
}
func (a *AllConfig) GetConfigNames() []string {
return []string{
a.Discovery.GetConfigFileName(),
a.Kafka.GetConfigFileName(),
a.LocalCache.GetConfigFileName(),
a.Log.GetConfigFileName(),
a.Minio.GetConfigFileName(),
a.Mongo.GetConfigFileName(),
a.Notification.GetConfigFileName(),
a.API.GetConfigFileName(),
a.CronTask.GetConfigFileName(),
a.MsgGateway.GetConfigFileName(),
a.MsgTransfer.GetConfigFileName(),
a.Push.GetConfigFileName(),
a.Auth.GetConfigFileName(),
a.Conversation.GetConfigFileName(),
a.Friend.GetConfigFileName(),
a.Group.GetConfigFileName(),
a.Msg.GetConfigFileName(),
a.Third.GetConfigFileName(),
a.User.GetConfigFileName(),
a.Redis.GetConfigFileName(),
a.Share.GetConfigFileName(),
a.Webhooks.GetConfigFileName(),
}
}
+6 -2
View File
@@ -14,9 +14,13 @@
package config
const ConfKey = "conf"
const (
ConfKey = "conf"
ETCD = "etcd"
MountConfigFilePath = "CONFIG_PATH"
DeploymentType = "DEPLOYMENT_TYPE"
KUBERNETES = "kubernetes"
ETCD = "etcd"
)
const (
+30
View File
@@ -0,0 +1,30 @@
package config
import "strings"
var EnvPrefixMap map[string]string
func init() {
EnvPrefixMap = make(map[string]string)
fileNames := []string{
FileName, NotificationFileName, ShareFileName, WebhooksConfigFileName,
KafkaConfigFileName, RedisConfigFileName,
MongodbConfigFileName, MinioConfigFileName, LogConfigFileName,
OpenIMAPICfgFileName, OpenIMCronTaskCfgFileName, OpenIMMsgGatewayCfgFileName,
OpenIMMsgTransferCfgFileName, OpenIMPushCfgFileName, OpenIMRPCAuthCfgFileName,
OpenIMRPCConversationCfgFileName, OpenIMRPCFriendCfgFileName, OpenIMRPCGroupCfgFileName,
OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName, DiscoveryConfigFilename,
}
for _, fileName := range fileNames {
envKey := strings.TrimSuffix(strings.TrimSuffix(fileName, ".yml"), ".yaml")
envKey = "IMENV_" + envKey
envKey = strings.ToUpper(strings.ReplaceAll(envKey, "-", "_"))
EnvPrefixMap[fileName] = envKey
}
}
const (
FlagConf = "config_folder_path"
FlagTransferIndex = "index"
)
+18 -2
View File
@@ -1,13 +1,29 @@
package config
import (
"os"
"path/filepath"
"strings"
"github.com/mitchellh/mapstructure"
"github.com/openimsdk/tools/errs"
"github.com/spf13/viper"
"strings"
)
func LoadConfig(path string, envPrefix string, config any) error {
func Load(configDirectory string, configFileName string, envPrefix string, runtimeEnv string, config any) error {
if runtimeEnv == KUBERNETES {
mountPath := os.Getenv(MountConfigFilePath)
if mountPath == "" {
return errs.ErrArgs.WrapMsg(MountConfigFilePath + " env is empty")
}
return loadConfig(filepath.Join(mountPath, configFileName), envPrefix, config)
}
return loadConfig(filepath.Join(configDirectory, configFileName), envPrefix, config)
}
func loadConfig(path string, envPrefix string, config any) error {
v := viper.New()
v.SetConfigFile(path)
v.SetEnvPrefix(envPrefix)
@@ -12,4 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package direct // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/direct"
package direct // import "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/direct"
@@ -12,14 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package discoveryregister
package discovery
import (
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
"github.com/openimsdk/tools/discovery"
"google.golang.org/grpc"
"github.com/openimsdk/tools/discovery/kubernetes"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/errs"
)
@@ -35,9 +38,7 @@ func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string, watchN
}
switch discovery.Enable {
case "k8s":
return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway)
case "etcd":
case config.ETCD:
return etcd.NewSvcDiscoveryRegistry(
discovery.Etcd.RootDirectory,
discovery.Etcd.Address,
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package discoveryregister
package discovery
import (
"os"
@@ -12,4 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package discoveryregister // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
package discovery // import "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
@@ -12,4 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/etcd"
package kubernetes // import "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/kubernetes"
@@ -0,0 +1,270 @@
package kubernetes
import (
"context"
"fmt"
"log"
"os"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
type KubernetesConnManager struct {
clientset *kubernetes.Clientset
namespace string
dialOptions []grpc.DialOption
rpcTargets map[string]string
selfTarget string
mu sync.RWMutex
connMap map[string][]*grpc.ClientConn
}
// NewKubernetesConnManager creates a new connection manager that uses Kubernetes services for service discovery.
func NewKubernetesConnManager(namespace string, options ...grpc.DialOption) (*KubernetesConnManager, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to create in-cluster config: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create clientset: %v", err)
}
k := &KubernetesConnManager{
clientset: clientset,
namespace: namespace,
dialOptions: options,
connMap: make(map[string][]*grpc.ClientConn),
}
go k.watchEndpoints()
return k, nil
}
func (k *KubernetesConnManager) initializeConns(serviceName string) error {
port, err := k.getServicePort(serviceName)
if err != nil {
return err
}
endpoints, err := k.clientset.CoreV1().Endpoints(k.namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get endpoints for service %s: %v", serviceName, err)
}
// fmt.Println("Endpoints:", endpoints, "endpoints.Subsets:", endpoints.Subsets)
var conns []*grpc.ClientConn
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
target := fmt.Sprintf("%s:%d", address.IP, port)
// fmt.Println("IP target:", target)
conn, err := grpc.Dial(target, append(k.dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
if err != nil {
return fmt.Errorf("failed to dial endpoint %s: %v", target, err)
}
conns = append(conns, conn)
}
}
k.mu.Lock()
k.connMap[serviceName] = conns
k.mu.Unlock()
return nil
}
// GetConns returns gRPC client connections for a given Kubernetes service name.
func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
k.mu.RLock()
conns, exists := k.connMap[serviceName]
k.mu.RUnlock()
if exists {
return conns, nil
}
k.mu.Lock()
// Check if another goroutine has already initialized the connections when we released the read lock
conns, exists = k.connMap[serviceName]
if exists {
return conns, nil
}
k.mu.Unlock()
if err := k.initializeConns(serviceName); err != nil {
fmt.Println("Failed to initialize connections:", err)
return nil, fmt.Errorf("failed to initialize connections for service %s: %v", serviceName, err)
}
return k.connMap[serviceName], nil
}
// GetConn returns a single gRPC client connection for a given Kubernetes service name.
func (k *KubernetesConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
var target string
if k.rpcTargets[serviceName] == "" {
var err error
svcPort, err := k.getServicePort(serviceName)
if err != nil {
return nil, err
}
target = fmt.Sprintf("%s.%s.svc.cluster.local:%d", serviceName, k.namespace, svcPort)
// fmt.Println("SVC target:", target)
} else {
target = k.rpcTargets[serviceName]
}
return grpc.DialContext(
ctx,
target,
append([]grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*10), grpc.MaxCallSendMsgSize(1024*1024*20)),
}, k.dialOptions...)...,
)
}
// GetSelfConnTarget returns the connection target for the current service.
func (k *KubernetesConnManager) GetSelfConnTarget() string {
if k.selfTarget == "" {
hostName := os.Getenv("HOSTNAME")
pod, err := k.clientset.CoreV1().Pods(k.namespace).Get(context.Background(), hostName, metav1.GetOptions{})
if err != nil {
log.Printf("failed to get pod %s: %v \n", hostName, err)
}
for pod.Status.PodIP == "" {
pod, err = k.clientset.CoreV1().Pods(k.namespace).Get(context.TODO(), hostName, metav1.GetOptions{})
if err != nil {
log.Printf("Error getting pod: %v \n", err)
}
time.Sleep(3 * time.Second)
}
var selfPort int32
for _, port := range pod.Spec.Containers[0].Ports {
if port.ContainerPort != 10001 {
selfPort = port.ContainerPort
break
}
}
k.selfTarget = fmt.Sprintf("%s:%d", pod.Status.PodIP, selfPort)
}
return k.selfTarget
}
// AddOption appends gRPC dial options to the existing options.
func (k *KubernetesConnManager) AddOption(opts ...grpc.DialOption) {
k.mu.Lock()
defer k.mu.Unlock()
k.dialOptions = append(k.dialOptions, opts...)
}
// CloseConn closes a given gRPC client connection.
func (k *KubernetesConnManager) CloseConn(conn *grpc.ClientConn) {
conn.Close()
}
// Close closes all gRPC connections managed by KubernetesConnManager.
func (k *KubernetesConnManager) Close() {
k.mu.Lock()
defer k.mu.Unlock()
for _, conns := range k.connMap {
for _, conn := range conns {
_ = conn.Close()
}
}
k.connMap = make(map[string][]*grpc.ClientConn)
}
func (k *KubernetesConnManager) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
return nil
}
func (k *KubernetesConnManager) UnRegister() error {
return nil
}
func (k *KubernetesConnManager) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
return "", nil
}
func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error) {
var svcPort int32
svc, err := k.clientset.CoreV1().Services(k.namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
if err != nil {
fmt.Print("namespace:", k.namespace)
return 0, fmt.Errorf("failed to get service %s: %v", serviceName, err)
}
if len(svc.Spec.Ports) == 0 {
return 0, fmt.Errorf("service %s has no ports defined", serviceName)
}
for _, port := range svc.Spec.Ports {
// fmt.Println(serviceName, " Now Get Port:", port.Port)
if port.Port != 10001 {
svcPort = port.Port
break
}
}
return svcPort, nil
}
// watchEndpoints listens for changes in Pod resources.
func (k *KubernetesConnManager) watchEndpoints() {
informerFactory := informers.NewSharedInformerFactory(k.clientset, time.Minute*10)
informer := informerFactory.Core().V1().Pods().Informer()
// Watch for Pod changes (add, update, delete)
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
k.handleEndpointChange(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
k.handleEndpointChange(newObj)
},
DeleteFunc: func(obj interface{}) {
k.handleEndpointChange(obj)
},
})
informerFactory.Start(context.Background().Done())
<-context.Background().Done() // Block forever
}
func (k *KubernetesConnManager) handleEndpointChange(obj interface{}) {
endpoint, ok := obj.(*v1.Endpoints)
if !ok {
return
}
serviceName := endpoint.Name
if err := k.initializeConns(serviceName); err != nil {
fmt.Printf("Error initializing connections for %s: %v\n", serviceName, err)
}
}
@@ -1,15 +0,0 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
@@ -1,199 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/log"
"github.com/stathat/consistent"
"google.golang.org/grpc"
)
// K8sDR represents the Kubernetes service discovery and registration client.
type K8sDR struct {
options []grpc.DialOption
rpcRegisterAddr string
gatewayHostConsistent *consistent.Consistent
gatewayName string
}
func NewK8sDiscoveryRegister(gatewayName string) (discovery.SvcDiscoveryRegistry, error) {
gatewayConsistent := consistent.New()
gatewayHosts := getMsgGatewayHost(context.Background(), gatewayName)
for _, v := range gatewayHosts {
gatewayConsistent.Add(v)
}
return &K8sDR{gatewayHostConsistent: gatewayConsistent}, nil
}
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
if serviceName != cli.gatewayName {
cli.rpcRegisterAddr = serviceName
} else {
cli.rpcRegisterAddr = getSelfHost(context.Background(), cli.gatewayName)
}
return nil
}
func (cli *K8sDR) UnRegister() error {
return nil
}
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
return nil
}
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
return nil
}
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
return nil, nil
}
func (cli *K8sDR) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
host, err := cli.gatewayHostConsistent.Get(userId)
if err != nil {
log.ZError(ctx, "GetUserIdHashGatewayHost error", err)
}
return host, err
}
func getSelfHost(ctx context.Context, gatewayName string) string {
port := 88
instance := "openimserver"
selfPodName := os.Getenv("MY_POD_NAME")
ns := os.Getenv("MY_POD_NAMESPACE")
statefuleIndex := 0
gatewayEnds := strings.Split(gatewayName, ":")
if len(gatewayEnds) != 2 {
log.ZError(ctx, "msggateway RpcRegisterName is error:config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error"))
} else {
port, _ = strconv.Atoi(gatewayEnds[1])
}
podInfo := strings.Split(selfPodName, "-")
instance = podInfo[0]
count := len(podInfo)
statefuleIndex, _ = strconv.Atoi(podInfo[count-1])
host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, statefuleIndex, instance, ns, port)
return host
}
// like openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88.
// Replica set in kubernetes environment
func getMsgGatewayHost(ctx context.Context, gatewayName string) []string {
port := 88
instance := "openimserver"
selfPodName := os.Getenv("MY_POD_NAME")
replicas := os.Getenv("MY_MSGGATEWAY_REPLICACOUNT")
ns := os.Getenv("MY_POD_NAMESPACE")
gatewayEnds := strings.Split(gatewayName, ":")
if len(gatewayEnds) != 2 {
log.ZError(ctx, "msggateway RpcRegisterName is error:config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error"))
} else {
port, _ = strconv.Atoi(gatewayEnds[1])
}
nReplicas, _ := strconv.Atoi(replicas)
podInfo := strings.Split(selfPodName, "-")
instance = podInfo[0]
var ret []string
for i := 0; i < nReplicas; i++ {
host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, i, instance, ns, port)
ret = append(ret, host)
}
log.ZDebug(ctx, "getMsgGatewayHost", "instance", instance, "selfPodName", selfPodName, "replicas", replicas, "ns", ns, "ret", ret)
return ret
}
// GetConns returns the gRPC client connections to the specified service.
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
// This conditional checks if the serviceName is not the OpenImMessageGatewayName.
// It seems to handle a special case for the OpenImMessageGateway.
if serviceName != cli.gatewayName {
// DialContext creates a client connection to the given target (serviceName) using the specified context.
// 'cli.options' are likely default or common options for all connections in this struct.
// 'opts...' allows for additional gRPC dial options to be passed and used.
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
// The function returns a slice of client connections with the new connection, or an error if occurred.
return []*grpc.ClientConn{conn}, err
} else {
// This block is executed if the serviceName is OpenImMessageGatewayName.
// 'ret' will accumulate the connections to return.
var ret []*grpc.ClientConn
// getMsgGatewayHost presumably retrieves hosts for the message gateway service.
// The context is passed, likely for cancellation and timeout control.
gatewayHosts := getMsgGatewayHost(ctx, cli.gatewayName)
// Iterating over the retrieved gateway hosts.
for _, host := range gatewayHosts {
// Establishes a connection to each host.
// Again, appending cli.options with any additional opts provided.
conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...)
// If there's an error while dialing any host, the function returns immediately with the error.
if err != nil {
return nil, err
} else {
// If the connection is successful, it is added to the 'ret' slice.
ret = append(ret, conn)
}
}
// After all hosts are processed, the slice of connections is returned.
return ret, nil
}
}
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
}
func (cli *K8sDR) GetSelfConnTarget() string {
return cli.rpcRegisterAddr
}
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
cli.options = append(cli.options, opts...)
}
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
conn.Close()
}
// do not use this method for call rpc.
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
log.ZError(context.Background(), "should not call this function!", nil)
return nil
}
func (cli *K8sDR) Close() {
}
@@ -1,15 +0,0 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package zookeeper // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper"
+6 -6
View File
@@ -43,25 +43,25 @@ func GetGrpcServerMetrics() *gp.ServerMetrics {
return grpcMetrics
}
func GetGrpcCusMetrics(registerName string, share *config.Share) []prometheus.Collector {
func GetGrpcCusMetrics(registerName string, discovery *config.Discovery) []prometheus.Collector {
switch registerName {
case share.RpcRegisterName.MessageGateway:
case discovery.RpcService.MessageGateway:
return []prometheus.Collector{OnlineUserGauge}
case share.RpcRegisterName.Msg:
case discovery.RpcService.Msg:
return []prometheus.Collector{
SingleChatMsgProcessSuccessCounter,
SingleChatMsgProcessFailedCounter,
GroupChatMsgProcessSuccessCounter,
GroupChatMsgProcessFailedCounter,
}
case share.RpcRegisterName.Push:
case discovery.RpcService.Push:
return []prometheus.Collector{
MsgOfflinePushFailedCounter,
MsgLoneTimePushCounter,
}
case share.RpcRegisterName.Auth:
case discovery.RpcService.Auth:
return []prometheus.Collector{UserLoginCounter}
case share.RpcRegisterName.User:
case discovery.RpcService.User:
return []prometheus.Collector{UserRegisterCounter}
default:
return nil
+17 -2
View File
@@ -27,12 +27,15 @@ import (
"time"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil"
"google.golang.org/grpc/status"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/tools/utils/runtimeenv"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
@@ -50,6 +53,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error,
options ...grpc.ServerOption) error {
watchConfigNames = append(watchConfigNames, conf.LogConfigFileName)
var (
rpcTcpAddr string
netDone = make(chan struct{}, 2)
@@ -57,11 +61,17 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
prometheusPort int
)
if notification != nil {
conf.InitNotification(notification)
}
registerIP, err := network.GetRpcRegisterIP(registerIP)
if err != nil {
return err
}
runTimeEnv := runtimeenv.PrintRuntimeEnvironment()
if !autoSetPorts {
rpcPort, err := datautil.GetElemByIndex(rpcPorts, index)
if err != nil {
@@ -132,7 +142,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr)
}
}
cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, discovery)
go func() {
if err := prommetrics.RpcInit(cs, listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort))
@@ -184,6 +194,11 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
}
}()
if discovery.Enable == conf.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), watchConfigNames)
cm.Watch(ctx)
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
select {