feat: Prometheus can auto set port (#2943)

* feat: config

* feat: prometheus auto set port
This commit is contained in:
icey-yu
2024-12-10 11:43:19 +08:00
committed by GitHub
parent 54be837765
commit 6a7ae690ee
29 changed files with 521 additions and 164 deletions
+10 -6
View File
@@ -18,11 +18,10 @@ import (
"strings"
"time"
"github.com/openimsdk/tools/s3/aws"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/s3/aws"
"github.com/openimsdk/tools/s3/cos"
"github.com/openimsdk/tools/s3/kodo"
"github.com/openimsdk/tools/s3/minio"
@@ -108,9 +107,10 @@ type API struct {
CompressionLevel int `mapstructure:"compressionLevel"`
} `mapstructure:"api"`
Prometheus struct {
Enable bool `mapstructure:"enable"`
Ports []int `mapstructure:"ports"`
GrafanaURL string `mapstructure:"grafanaURL"`
Enable bool `mapstructure:"enable"`
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
GrafanaURL string `mapstructure:"grafanaURL"`
} `mapstructure:"prometheus"`
}
@@ -193,7 +193,11 @@ type MsgGateway struct {
}
type MsgTransfer struct {
Prometheus Prometheus `mapstructure:"prometheus"`
Prometheus struct {
Enable bool `mapstructure:"enable"`
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"prometheus"`
}
type Push struct {
+8
View File
@@ -83,3 +83,11 @@ func TestLoadOpenIMThirdConfig(t *testing.T) {
// Environment: IMENV_OPENIM_RPC_THIRD_OBJECT_ENABLE=enabled;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_ENDPOINT=https://oss-cn-chengdu.aliyuncs.com;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_BUCKET=my_bucket_name;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_BUCKETURL=https://my_bucket_name.oss-cn-chengdu.aliyuncs.com;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_ACCESSKEYID=AKID1234567890;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_ACCESSKEYSECRET=abc123xyz789;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_SESSIONTOKEN=session_token_value;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_PUBLICREAD=true
}
func TestTransferConfig(t *testing.T) {
var tran MsgTransfer
err := LoadConfig("../../../config/openim-msgtransfer.yml", "IMENV_OPENIM-MSGTRANSFER", &tran)
assert.Nil(t, err)
assert.Equal(t, true, tran.Prometheus.Enable)
assert.Equal(t, true, tran.Prometheus.AutoSetPorts)
}
@@ -26,6 +26,10 @@ import (
"github.com/openimsdk/tools/errs"
)
const (
Etcd = "etcd"
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string) (discovery.SvcDiscoveryRegistry, error) {
if runtimeEnv == "kubernetes" {
@@ -35,7 +39,7 @@ func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string) (disco
switch discovery.Enable {
case "kubernetes":
return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace)
case "etcd":
case Etcd:
return etcd.NewSvcDiscoveryRegistry(
discovery.Etcd.RootDirectory,
discovery.Etcd.Address,
+3 -2
View File
@@ -3,6 +3,7 @@ package prommetrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net"
"strconv"
)
@@ -23,14 +24,14 @@ var (
)
)
func ApiInit(prometheusPort int) error {
func ApiInit(listener net.Listener) error {
apiRegistry := prometheus.NewRegistry()
cs := append(
baseCollector,
apiCounter,
httpCounter,
)
return Init(apiRegistry, prometheusPort, commonPath, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}), cs...)
return Init(apiRegistry, listener, commonPath, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}), cs...)
}
func APICall(path string, method string, apiCode int) {
+31
View File
@@ -0,0 +1,31 @@
package prommetrics
import "fmt"
const (
APIKeyName = "api"
MessageTransferKeyName = "message-transfer"
)
type Target struct {
Target string `json:"target"`
Labels map[string]string `json:"labels"`
}
type RespTarget struct {
Targets []string `json:"targets"`
Labels map[string]string `json:"labels"`
}
func BuildDiscoveryKey(name string) string {
return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name)
}
func BuildDefaultTarget(host string, ip int) Target {
return Target{
Target: fmt.Sprintf("%s:%d", host, ip),
Labels: map[string]string{
"namespace": "default",
},
}
}
+3 -3
View File
@@ -15,9 +15,9 @@
package prommetrics
import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"net"
"net/http"
)
@@ -30,9 +30,9 @@ var (
}
)
func Init(registry *prometheus.Registry, prometheusPort int, path string, handler http.Handler, cs ...prometheus.Collector) error {
func Init(registry *prometheus.Registry, listener net.Listener, path string, handler http.Handler, cs ...prometheus.Collector) error {
registry.MustRegister(cs...)
srv := http.NewServeMux()
srv.Handle(path, handler)
return http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), srv)
return http.Serve(listener, srv)
}
+3 -2
View File
@@ -5,6 +5,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net"
"strconv"
)
@@ -21,13 +22,13 @@ var (
)
)
func RpcInit(cs []prometheus.Collector, prometheusPort int) error {
func RpcInit(cs []prometheus.Collector, listener net.Listener) error {
reg := prometheus.NewRegistry()
cs = append(append(
baseCollector,
rpcCounter,
), cs...)
return Init(reg, prometheusPort, rpcPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...)
return Init(reg, listener, rpcPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...)
}
func RPCCall(name string, path string, code int) {
+3 -2
View File
@@ -17,6 +17,7 @@ package prommetrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net"
)
var (
@@ -42,7 +43,7 @@ var (
})
)
func TransferInit(prometheusPort int) error {
func TransferInit(listener net.Listener) error {
reg := prometheus.NewRegistry()
cs := append(
baseCollector,
@@ -52,5 +53,5 @@ func TransferInit(prometheusPort int) error {
MsgInsertMongoFailedCounter,
SeqSetFailedCounter,
)
return Init(reg, prometheusPort, commonPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...)
return Init(reg, listener, commonPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...)
}
+66 -29
View File
@@ -22,15 +22,17 @@ import (
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/runtimeenv"
"github.com/openimsdk/tools/utils/jsonutil"
"google.golang.org/grpc/status"
"strconv"
"github.com/openimsdk/tools/utils/runtimeenv"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
@@ -49,11 +51,17 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
var (
rpcTcpAddr string
netDone = make(chan struct{}, 2)
netErr error
rpcTcpAddr string
netDone = make(chan struct{}, 2)
netErr error
prometheusPort int
)
registerIP, err := network.GetRpcRegisterIP(registerIP)
if err != nil {
return err
}
runTimeEnv := runtimeenv.PrintRuntimeEnvironment()
if !autoSetPorts {
@@ -66,6 +74,27 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), "0")
}
getAutoPort := func() (net.Listener, int, error) {
listener, err := net.Listen("tcp", rpcTcpAddr)
if err != nil {
return nil, 0, errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr)
}
_, portStr, _ := net.SplitHostPort(listener.Addr().String())
port, _ := strconv.Atoi(portStr)
return listener, port, nil
}
if autoSetPorts && discovery.Enable != kdisc.Etcd {
return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap()
}
client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv)
if err != nil {
return err
}
defer client.Close()
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
// var reg *prometheus.Registry
// var metric *grpcprometheus.ServerMetrics
if prometheusConfig.Enable {
@@ -78,17 +107,40 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
prommetricsUnaryInterceptor(rpcRegisterName),
prommetricsStreamInterceptor(rpcRegisterName),
)
prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index)
if err != nil {
return err
var (
listener net.Listener
)
if autoSetPorts {
listener, prometheusPort, err = getAutoPort()
if err != nil {
return err
}
etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
_, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)))
if err != nil {
return errs.WrapMsg(err, "etcd put err")
}
} else {
prometheusPort, err = datautil.GetElemByIndex(prometheusConfig.Ports, index)
if err != nil {
return err
}
listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort))
if err != nil {
return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr)
}
}
cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, discovery)
go func() {
if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) {
if err := prommetrics.RpcInit(cs, listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort))
netDone <- struct{}{}
}
// metric.InitializeMetrics(srv)
//metric.InitializeMetrics(srv)
// Create a HTTP server for prometheus.
// httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
// if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
@@ -100,30 +152,15 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
options = append(options, mw.GrpcServer())
}
listener, err := net.Listen(
"tcp",
rpcTcpAddr,
)
if err != nil {
return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr)
}
_, portStr, _ := net.SplitHostPort(listener.Addr().String())
registerIP = network.GetListenIP(registerIP)
port, _ := strconv.Atoi(portStr)
log.CInfo(ctx, "RPC server is initializing", "runTimeEnv", runTimeEnv, "rpcRegisterName", rpcRegisterName, "rpcPort", portStr,
"prometheusPorts", prometheusConfig.Ports)
defer listener.Close()
client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv)
listener, port, err := getAutoPort()
if err != nil {
return err
}
defer client.Close()
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", port,
"prometheusPort", prometheusPort)
defer listener.Close()
srv := grpc.NewServer(options...)
err = rpcFn(ctx, config, client, srv)