Skip to content

Commit

Permalink
feat: rate limiter support
Browse files Browse the repository at this point in the history
  • Loading branch information
pasha-codefresh committed Jan 16, 2024
1 parent d2880af commit 7c4c089
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 22 deletions.
15 changes: 14 additions & 1 deletion cmd/event-reporter-server/commands/event_reporter_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package commands
import (
"context"
"fmt"
"github.com/argoproj/argo-cd/v2/event_reporter/reporter"
"math"
"time"

Expand Down Expand Up @@ -93,6 +94,10 @@ func NewCommand() *cobra.Command {
shardingAlgorithm string
rootpath string
useGrpc bool

rateLimiterEnabled bool
rateLimiterBucketSize int
rateLimiterDuration time.Duration
)
var command = &cobra.Command{
Use: cliName,
Expand Down Expand Up @@ -172,6 +177,11 @@ func NewCommand() *cobra.Command {
BaseURL: codefreshUrl,
AuthToken: codefreshToken,
},
RateLimiterOpts: &reporter.RateLimiterOpts{
Enabled: rateLimiterEnabled,
Rate: rateLimiterDuration,
Capacity: rateLimiterBucketSize,
},
}

log.Infof("Starting event reporter server with grpc transport %v", useGrpc)
Expand Down Expand Up @@ -217,7 +227,10 @@ func NewCommand() *cobra.Command {
command.Flags().StringVar(&codefreshToken, "codefresh-token", env.StringFromEnv("CODEFRESH_TOKEN", ""), "Codefresh token")
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvEventReporterShardingAlgorithm, common.DefaultEventReporterShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy] ")
command.Flags().StringSliceVar(&applicationNamespaces, "application-namespaces", env.StringsFromEnv("ARGOCD_APPLICATION_NAMESPACES", []string{}, ","), "List of additional namespaces where application resources can be managed in")
command.Flags().BoolVar(&useGrpc, "grpc", env.ParseBoolFromEnv("USE_GRPC", true), "Use grpc for interact with argocd server")
command.Flags().BoolVar(&useGrpc, "grpc", env.ParseBoolFromEnv("USE_GRPC", false), "Use grpc for interact with argocd server")
command.Flags().BoolVar(&rateLimiterEnabled, "rate-limiter-enabled", env.ParseBoolFromEnv("RATE_LIMITER_ENABLED", false), "Use rate limiter for prevent queue to be overflowed")
command.Flags().IntVar(&rateLimiterBucketSize, "rate-limiter-bucket-size", env.ParseNumFromEnv("RATE_LIMITER_BUCKET_SIZE", math.MaxInt, 0, math.MaxInt), "The maximum amount of requests allowed per window.")
command.Flags().DurationVar(&rateLimiterDuration, "rate-limiter-period", env.ParseDurationFromEnv("RATE_LIMITER_DURATION", 24*time.Hour, 0, math.MaxInt64), "The rate limit window size.")
cacheSrc = servercache.AddCacheFlagsToCmd(command, func(client *redis.Client) {
redisClient = client
})
Expand Down
2 changes: 2 additions & 0 deletions controller/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,8 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e
cacheSettings := c.cacheSettings
c.lock.RUnlock()

appName := newRes.Info.(*ResourceInfo).AppName
fmt.Printf("appName: %s\n", appName)
if cacheSettings.ignoreResourceUpdatesEnabled && oldRes != nil && newRes != nil && skipResourceUpdate(resInfo(oldRes), resInfo(newRes)) {
// Additional check for debug level so we don't need to evaluate the
// format string in case of non-debug scenarios
Expand Down
4 changes: 2 additions & 2 deletions event_reporter/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type eventReporterController struct {
metricsServer *metrics.MetricsServer
}

func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager) EventReporterController {
appBroadcaster := reporter.NewBroadcaster(featureManager, metricsServer)
func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager, rateLimiterOpts *reporter.RateLimiterOpts) EventReporterController {
appBroadcaster := reporter.NewBroadcaster(featureManager, metricsServer, rateLimiterOpts)
appInformer.AddEventHandler(appBroadcaster)
return &eventReporterController{
appBroadcaster: appBroadcaster,
Expand Down
20 changes: 14 additions & 6 deletions event_reporter/reporter/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import (
argocommon "github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/event_reporter/metrics"
"github.com/argoproj/argo-cd/v2/event_reporter/sharding"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/util/env"
"math"
"sync"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/watch"

appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"math"
"sync"
)

type subscriber struct {
Expand Down Expand Up @@ -42,15 +40,17 @@ type broadcasterHandler struct {
filter sharding.ApplicationFilterFunction
featureManager *FeatureManager
metricsServer *metrics.MetricsServer
rateLimiter *RateLimiter
}

func NewBroadcaster(featureManager *FeatureManager, metricsServer *metrics.MetricsServer) Broadcaster {
func NewBroadcaster(featureManager *FeatureManager, metricsServer *metrics.MetricsServer, rateLimiterOpts *RateLimiterOpts) Broadcaster {
// todo: pass real value here
filter := getApplicationFilter("")
return &broadcasterHandler{
filter: filter,
featureManager: featureManager,
metricsServer: metricsServer,
rateLimiter: NewRateLimiter(rateLimiterOpts),
}
}

Expand All @@ -77,6 +77,14 @@ func (b *broadcasterHandler) notify(event *appv1.ApplicationWatchEvent) {

for _, s := range subscribers {
if s.matches(event) {

duration, err := b.rateLimiter.Limit(event.Application.Name)
if err != nil {
log.Infof("adding application '%s' to channel failed, due to rate limit, duration left %s", event.Application.Name, duration.String())
b.metricsServer.IncAppEventsCounter(event.Application.Name, false)
continue
}

select {
case s.ch <- event:
{
Expand Down
36 changes: 36 additions & 0 deletions event_reporter/reporter/rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package reporter

import (
"context"
"github.com/mennanov/limiters"
"time"
)

type RateLimiterOpts struct {
Enabled bool
Rate time.Duration
Capacity int
}

type RateLimiter struct {
opts *RateLimiterOpts
limiters map[string]*limiters.FixedWindow
}

func NewRateLimiter(opts *RateLimiterOpts) *RateLimiter {
return &RateLimiter{opts: opts, limiters: make(map[string]*limiters.FixedWindow)}
}

func (rl *RateLimiter) Limit(applicationName string) (time.Duration, error) {
if !rl.opts.Enabled {
return time.Duration(0), nil
}

limiter := rl.limiters[applicationName]
if limiter == nil {
limiter = limiters.NewFixedWindow(int64(rl.opts.Capacity), rl.opts.Rate, limiters.NewFixedWindowInMemory(), limiters.NewSystemClock())
rl.limiters[applicationName] = limiter
}

return limiter.Limit(context.Background())
}
3 changes: 2 additions & 1 deletion event_reporter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type EventReporterServerOpts struct {
BaseHRef string
RootPath string
CodefreshConfig *codefresh.CodefreshConfig
RateLimiterOpts *reporter.RateLimiterOpts
}

type handlerSwitcher struct {
Expand Down Expand Up @@ -152,7 +153,7 @@ func (a *EventReporterServer) Init(ctx context.Context) {
}

func (a *EventReporterServer) RunController(ctx context.Context) {
controller := event_reporter.NewEventReporterController(a.appInformer, a.Cache, a.settingsMgr, a.ApplicationServiceClient, a.appLister, a.CodefreshConfig, a.serviceSet.MetricsServer, a.featureManager)
controller := event_reporter.NewEventReporterController(a.appInformer, a.Cache, a.settingsMgr, a.ApplicationServiceClient, a.appLister, a.CodefreshConfig, a.serviceSet.MetricsServer, a.featureManager, a.RateLimiterOpts)
go controller.Run(ctx)
}

Expand Down
52 changes: 40 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ require (
golang.org/x/sync v0.3.0
golang.org/x/term v0.13.0
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc
google.golang.org/grpc v1.56.2
google.golang.org/grpc v1.56.3
google.golang.org/protobuf v1.31.0
gopkg.in/square/go-jose.v2 v2.6.0
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -111,25 +111,53 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.2 // indirect
github.com/aws/aws-sdk-go-v2 v1.17.3 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.8 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.8 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.21 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/aws/aws-sdk-go-v2 v1.17.6 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.17 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.17 // indirect
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.18 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.30 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.24 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.31 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.19.1 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.14.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.24 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.24 // indirect
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.0 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.18.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.18.6 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/go-redsync/redsync/v4 v4.8.1 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/hashicorp/consul/api v1.18.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-hclog v1.4.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/serf v0.10.1 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mennanov/limiters v1.2.3 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 // indirect
github.com/tidwall/gjson v1.14.4 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
go.etcd.io/etcd/api/v3 v3.5.7 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.7 // indirect
go.etcd.io/etcd/client/v3 v3.5.7 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
gopkg.in/retry.v1 v1.0.3 // indirect
Expand Down
Loading

0 comments on commit 7c4c089

Please sign in to comment.