* fix: minor log typo

* fix: #2895 no need to specify listen port in config file, just use system random

* drop useless code
This commit is contained in:
Morya
2024-12-04 11:18:39 +08:00
committed by GitHub
parent 14477321fd
commit a69d174e86
22 changed files with 198 additions and 67 deletions
-1
View File
@@ -55,6 +55,5 @@ func (a *AuthRpcCmd) Exec() error {
func (a *AuthRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports,
a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start)
}
-1
View File
@@ -57,6 +57,5 @@ func (a *ConversationRpcCmd) Exec() error {
func (a *ConversationRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.Ports,
a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start)
}
-1
View File
@@ -58,6 +58,5 @@ func (a *FriendRpcCmd) Exec() error {
func (a *FriendRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP,
a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.Ports,
a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start)
}
-1
View File
@@ -59,6 +59,5 @@ func (a *GroupRpcCmd) Exec() error {
func (a *GroupRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports,
a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx())
}
-1
View File
@@ -59,6 +59,5 @@ func (a *MsgRpcCmd) Exec() error {
func (a *MsgRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports,
a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start)
}
-1
View File
@@ -59,6 +59,5 @@ func (a *PushRpcCmd) Exec() error {
func (a *PushRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports,
a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start)
}
-1
View File
@@ -58,6 +58,5 @@ func (a *ThirdRpcCmd) Exec() error {
func (a *ThirdRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports,
a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start)
}
-1
View File
@@ -59,6 +59,5 @@ func (a *UserRpcCmd) Exec() error {
func (a *UserRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports,
a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start)
}
-1
View File
@@ -197,7 +197,6 @@ type Push struct {
RPC struct {
RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
MaxConcurrentWorkers int `mapstructure:"maxConcurrentWorkers"`
+25 -28
View File
@@ -24,10 +24,10 @@ import (
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/openimsdk/open-im-server/v3/internal/tools/addr"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/discovery"
@@ -37,22 +37,15 @@ import (
"github.com/openimsdk/tools/utils/network"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"strconv"
)
// Start rpc server.
func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP,
registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context,
config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
rpcPort, err := datautil.GetElemByIndex(rpcPorts, index)
if err != nil {
return err
}
log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", rpcPort,
"prometheusPorts", prometheusConfig.Ports)
rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort))
func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP string,
index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context,
config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), "0")
listener, err := net.Listen(
"tcp",
rpcTcpAddr,
@@ -60,6 +53,14 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
if err != nil {
return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr)
}
h, portStr, _ := net.SplitHostPort(listener.Addr().String())
host, _ := addr.Extract(h)
port, _ := strconv.Atoi(portStr)
log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", portStr,
"prometheusPorts", prometheusConfig.Ports)
defer listener.Close()
client, err := kdisc.NewDiscoveryRegister(discovery, share)
if err != nil {
@@ -68,17 +69,13 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
defer client.Close()
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
registerIP, err = network.GetRpcRegisterIP(registerIP)
if err != nil {
return err
}
//var reg *prometheus.Registry
//var metric *grpcprometheus.ServerMetrics
// var reg *prometheus.Registry
// var metric *grpcprometheus.ServerMetrics
if prometheusConfig.Enable {
//cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
//reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
//options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
// cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
// reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
// options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
// grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
options = append(
options, mw.GrpcServer(),
@@ -98,8 +95,8 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
err = client.Register(
rpcRegisterName,
registerIP,
rpcPort,
host,
port,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
@@ -123,13 +120,13 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort))
netDone <- struct{}{}
}
//metric.InitializeMetrics(srv)
// metric.InitializeMetrics(srv)
// Create a HTTP server for prometheus.
//httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
//if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
// httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
// if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
// netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr)
// netDone <- struct{}{}
//}
// }
}()
}