​Build: Implement rate limiting and circuit breaker for API and RPC services.​​ (#3572)

* feat: implement ratelimit and circuitbreaker in middleware.

* ​Build: Implement rate limiting and circuit breaker for API and RPC services.​​

* revert change.

* update ratelimiter and circuitbreaker config.

* update tools to openimsdk tools
This commit is contained in:
Monet Lee
2025-11-05 15:12:06 +08:00
committed by GitHub
parent 3a1c8df3b9
commit 07badb162f
31 changed files with 574 additions and 24 deletions
+3
View File
@@ -81,6 +81,9 @@ func (a *ApiCmd) runE() error {
}
return startrpc.Start(
a.ctx, &a.apiConfig.Discovery,
nil,
nil,
// &a.apiConfig.API.RateLimiter,
&prometheus,
a.apiConfig.API.Api.ListenIP, "",
a.apiConfig.API.Prometheus.AutoSetPorts,
+1 -1
View File
@@ -57,7 +57,7 @@ func (a *AuthRpcCmd) Exec() error {
}
func (a *AuthRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.CircuitBreaker, &a.authConfig.RpcConfig.RateLimiter, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports,
a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig,
[]string{
+1 -1
View File
@@ -58,7 +58,7 @@ func (a *ConversationRpcCmd) Exec() error {
}
func (a *ConversationRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.CircuitBreaker, &a.conversationConfig.RpcConfig.RateLimiter, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports,
a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig,
[]string{
+2
View File
@@ -56,6 +56,8 @@ func (a *CronTaskCmd) runE() error {
var prometheus config.Prometheus
return startrpc.Start(
a.ctx, &a.cronTaskConfig.Discovery,
nil,
nil,
&prometheus,
"", "",
true,
+1 -1
View File
@@ -58,7 +58,7 @@ func (a *FriendRpcCmd) Exec() error {
}
func (a *FriendRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.CircuitBreaker, &a.relationConfig.RpcConfig.RateLimiter, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP,
a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports,
a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig,
[]string{
+1 -1
View File
@@ -59,7 +59,7 @@ func (a *GroupRpcCmd) Exec() error {
}
func (a *GroupRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.CircuitBreaker, &a.groupConfig.RpcConfig.RateLimiter, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports,
a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig,
[]string{
+1 -1
View File
@@ -59,7 +59,7 @@ func (a *MsgRpcCmd) Exec() error {
}
func (a *MsgRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.CircuitBreaker, &a.msgConfig.RpcConfig.RateLimiter, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports,
a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig,
[]string{
+2
View File
@@ -61,6 +61,8 @@ func (m *MsgGatewayCmd) runE() error {
var prometheus config.Prometheus
return startrpc.Start(
m.ctx, &m.msgGatewayConfig.Discovery,
&m.msgGatewayConfig.MsgGateway.CircuitBreaker,
&m.msgGatewayConfig.MsgGateway.RateLimiter,
&prometheus,
rpc.ListenIP, rpc.RegisterIP,
rpc.AutoSetPorts,
+2
View File
@@ -62,6 +62,8 @@ func (m *MsgTransferCmd) runE() error {
var prometheus config.Prometheus
return startrpc.Start(
m.ctx, &m.msgTransferConfig.Discovery,
&m.msgTransferConfig.MsgTransfer.CircuitBreaker,
&m.msgTransferConfig.MsgTransfer.RateLimiter,
&prometheus,
"", "",
true,
+1 -1
View File
@@ -60,7 +60,7 @@ func (a *PushRpcCmd) Exec() error {
}
func (a *PushRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.CircuitBreaker, &a.pushConfig.RpcConfig.RateLimiter, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports,
a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig,
[]string{
+1 -1
View File
@@ -58,7 +58,7 @@ func (a *ThirdRpcCmd) Exec() error {
}
func (a *ThirdRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.CircuitBreaker, &a.thirdConfig.RpcConfig.RateLimiter, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports,
a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig,
[]string{
+1 -1
View File
@@ -59,7 +59,7 @@ func (a *UserRpcCmd) Exec() error {
}
func (a *UserRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.CircuitBreaker, &a.userConfig.RpcConfig.RateLimiter, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports,
a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig,
[]string{
+50 -13
View File
@@ -143,6 +143,23 @@ type API struct {
Ports []int `yaml:"ports"`
GrafanaURL string `yaml:"grafanaURL"`
} `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
}
type RateLimiter struct {
Enable bool `yaml:"enable"`
Window time.Duration `yaml:"window"`
Bucket int `yaml:"bucket"`
CPUThreshold int64 `yaml:"cpuThreshold"`
}
type CircuitBreaker struct {
Enable bool `yaml:"enable"`
Window time.Duration `yaml:"window"`
Bucket int `yaml:"bucket"`
Success float64 `yaml:"success"`
Request int64 `yaml:"request"`
}
type CronTask struct {
@@ -217,6 +234,8 @@ type MsgGateway struct {
WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"`
WebsocketTimeout int `yaml:"websocketTimeout"`
} `yaml:"longConnSvr"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type MsgTransfer struct {
@@ -225,6 +244,8 @@ type MsgTransfer struct {
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
} `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Push struct {
@@ -255,7 +276,9 @@ type Push struct {
BadgeCount bool `yaml:"badgeCount"`
Production bool `yaml:"production"`
} `yaml:"iosPush"`
FullUserCache bool `yaml:"fullUserCache"`
FullUserCache bool `yaml:"fullUserCache"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Auth struct {
@@ -264,28 +287,38 @@ type Auth struct {
TokenPolicy struct {
Expire int64 `yaml:"expire"`
} `yaml:"tokenPolicy"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Conversation struct {
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Friend struct {
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Group struct {
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Msg struct {
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
FriendVerify bool `yaml:"friendVerify"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
FriendVerify bool `yaml:"friendVerify"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Third struct {
@@ -298,6 +331,8 @@ type Third struct {
Kodo Kodo `yaml:"kodo"`
Aws Aws `yaml:"aws"`
} `yaml:"object"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Cos struct {
BucketURL string `yaml:"bucketURL"`
@@ -336,8 +371,10 @@ type Aws struct {
}
type User struct {
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type RPC struct {
+107
View File
@@ -0,0 +1,107 @@
package startrpc
import (
"context"
"time"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/stability/circuitbreaker"
"github.com/openimsdk/tools/stability/circuitbreaker/sre"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type CircuitBreaker struct {
Enable bool `yaml:"enable"`
Success float64 `yaml:"success"` // success rate threshold (0.0-1.0)
Request int64 `yaml:"request"` // request threshold
Bucket int `yaml:"bucket"` // number of buckets
Window time.Duration `yaml:"window"` // time window for statistics
}
func NewCircuitBreaker(config *CircuitBreaker) circuitbreaker.CircuitBreaker {
if !config.Enable {
return nil
}
return sre.NewSREBraker(
sre.WithWindow(config.Window),
sre.WithBucket(config.Bucket),
sre.WithSuccess(config.Success),
sre.WithRequest(config.Request),
)
}
func UnaryCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.ServerOption {
if breaker == nil {
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
return handler(ctx, req)
})
}
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
if err := breaker.Allow(); err != nil {
log.ZWarn(ctx, "rpc circuit breaker open", err, "method", info.FullMethod)
return nil, status.Error(codes.Unavailable, "service unavailable due to circuit breaker")
}
resp, err = handler(ctx, req)
if err != nil {
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.OK:
breaker.MarkSuccess()
case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied:
breaker.MarkSuccess()
default:
breaker.MarkFailed()
}
} else {
breaker.MarkFailed()
}
} else {
breaker.MarkSuccess()
}
return resp, err
})
}
func StreamCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.ServerOption {
if breaker == nil {
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, ss)
})
}
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if err := breaker.Allow(); err != nil {
log.ZWarn(ss.Context(), "rpc circuit breaker open", err, "method", info.FullMethod)
return status.Error(codes.Unavailable, "service unavailable due to circuit breaker")
}
err := handler(srv, ss)
if err != nil {
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.OK:
breaker.MarkSuccess()
case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied:
breaker.MarkSuccess()
default:
breaker.MarkFailed()
}
} else {
breaker.MarkFailed()
}
} else {
breaker.MarkSuccess()
}
return err
})
}
+70
View File
@@ -0,0 +1,70 @@
package startrpc
import (
"context"
"time"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/stability/ratelimit"
"github.com/openimsdk/tools/stability/ratelimit/bbr"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type RateLimiter struct {
Enable bool
Window time.Duration
Bucket int
CPUThreshold int64
}
func NewRateLimiter(config *RateLimiter) ratelimit.Limiter {
if !config.Enable {
return nil
}
return bbr.NewBBRLimiter(
bbr.WithWindow(config.Window),
bbr.WithBucket(config.Bucket),
bbr.WithCPUThreshold(config.CPUThreshold),
)
}
func UnaryRateLimitInterceptor(limiter ratelimit.Limiter) grpc.ServerOption {
if limiter == nil {
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
return handler(ctx, req)
})
}
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
done, err := limiter.Allow()
if err != nil {
log.ZWarn(ctx, "rpc rate limited", err, "method", info.FullMethod)
return nil, status.Errorf(codes.ResourceExhausted, "rpc request rate limit exceeded: %v, please try again later", err)
}
defer done(ratelimit.DoneInfo{})
return handler(ctx, req)
})
}
func StreamRateLimitInterceptor(limiter ratelimit.Limiter) grpc.ServerOption {
if limiter == nil {
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, ss)
})
}
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
done, err := limiter.Allow()
if err != nil {
log.ZWarn(ss.Context(), "rpc rate limited", err, "method", info.FullMethod)
return status.Errorf(codes.ResourceExhausted, "rpc request rate limit exceeded: %v, please try again later", err)
}
defer done(ratelimit.DoneInfo{})
return handler(srv, ss)
})
}
+41 -2
View File
@@ -47,7 +47,7 @@ func init() {
prommetrics.RegistryAll()
}
func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
func Start[T any](ctx context.Context, disc *conf.Discovery, circuitBreakerConfig *conf.CircuitBreaker, rateLimiterConfig *conf.RateLimiter, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
watchConfigNames []string, watchServiceNames []string,
rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error,
@@ -84,6 +84,45 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
}
}
if circuitBreakerConfig != nil && circuitBreakerConfig.Enable {
cb := &CircuitBreaker{
Enable: circuitBreakerConfig.Enable,
Success: circuitBreakerConfig.Success,
Request: circuitBreakerConfig.Request,
Bucket: circuitBreakerConfig.Bucket,
Window: circuitBreakerConfig.Window,
}
breaker := NewCircuitBreaker(cb)
options = append(options,
UnaryCircuitBreakerInterceptor(breaker),
StreamCircuitBreakerInterceptor(breaker),
)
log.ZInfo(ctx, "RPC circuit breaker enabled",
"service", rpcRegisterName,
"window", circuitBreakerConfig.Window,
"bucket", circuitBreakerConfig.Bucket,
"success", circuitBreakerConfig.Success,
"requestThreshold", circuitBreakerConfig.Request)
}
if rateLimiterConfig != nil && rateLimiterConfig.Enable {
limiter := NewRateLimiter((*RateLimiter)(rateLimiterConfig))
options = append(options,
UnaryRateLimitInterceptor(limiter),
StreamRateLimitInterceptor(limiter),
)
log.ZInfo(ctx, "RPC rate limiter enabled",
"service", rpcRegisterName,
"window", rateLimiterConfig.Window,
"bucket", rateLimiterConfig.Bucket,
"cpuThreshold", rateLimiterConfig.CPUThreshold)
}
registerIP, err := network.GetRpcRegisterIP(registerIP)
if err != nil {
return err
@@ -123,7 +162,7 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
select {
case <-ctx.Done():
return