fix: add autoPort && prometheus port discovery

This commit is contained in:
icey-yu
2024-12-10 11:43:19 +08:00
parent 7912ac0623
commit 1e68f99d11
38 changed files with 670 additions and 253 deletions
+75 -16
View File
@@ -18,11 +18,17 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/tools/utils/network"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
@@ -30,8 +36,9 @@ import (
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
@@ -53,16 +60,17 @@ type MsgTransfer struct {
}
type Config struct {
MsgTransfer config.MsgTransfer
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
Share config.Share
WebhooksConfig config.Webhooks
Discovery config.Discovery
MsgTransfer conf.MsgTransfer
RedisConfig conf.Redis
MongodbConfig conf.Mongo
KafkaConfig conf.Kafka
Share conf.Share
WebhooksConfig conf.Webhooks
Discovery conf.Discovery
}
func Start(ctx context.Context, index int, config *Config) error {
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPorts",
config.MsgTransfer.Prometheus.Ports, "index", index)
@@ -116,7 +124,7 @@ func Start(ctx context.Context, index int, config *Config) error {
return msgTransfer.Start(index, config)
}
func (m *MsgTransfer) Start(index int, config *Config) error {
func (m *MsgTransfer) Start(index int, cfg *Config) error {
m.ctx, m.cancel = context.WithCancel(context.Background())
var (
netDone = make(chan struct{}, 1)
@@ -131,16 +139,67 @@ func (m *MsgTransfer) Start(index int, config *Config) error {
return err
}
if config.MsgTransfer.Prometheus.Enable {
go func() {
prometheusPort, err := datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index)
client, err := kdisc.NewDiscoveryRegister(&cfg.Discovery, &cfg.Share, nil)
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}
registerIP, err := network.GetRpcRegisterIP("")
if err != nil {
return err
}
getAutoPort := func() (net.Listener, int, error) {
registerAddr := net.JoinHostPort(registerIP, "0")
listener, err := net.Listen("tcp", registerAddr)
if err != nil {
return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr)
}
_, portStr, _ := net.SplitHostPort(listener.Addr().String())
port, _ := strconv.Atoi(portStr)
return listener, port, nil
}
if cfg.MsgTransfer.Prometheus.AutoSetPorts && cfg.Discovery.Enable != conf.ETCD {
return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap()
}
if cfg.MsgTransfer.Prometheus.Enable {
var (
listener net.Listener
prometheusPort int
)
if cfg.MsgTransfer.Prometheus.AutoSetPorts {
listener, prometheusPort, err = getAutoPort()
if err != nil {
netErr = err
netDone <- struct{}{}
return
return err
}
if err := prommetrics.TransferInit(prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) {
etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
_, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)))
if err != nil {
return errs.WrapMsg(err, "etcd put err")
}
} else {
prometheusPort, err = datautil.GetElemByIndex(cfg.MsgTransfer.Prometheus.Ports, index)
if err != nil {
return err
}
listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort))
if err != nil {
return errs.WrapMsg(err, "listen err", "addr", fmt.Sprintf(":%d", prometheusPort))
}
}
go func() {
defer func() {
if r := recover(); r != nil {
log.ZPanic(m.ctx, "MsgTransfer Start Panic", errs.ErrPanic(r))
}
}()
if err := prommetrics.TransferInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, "prometheus start error", "prometheusPort", prometheusPort)
netDone <- struct{}{}
}