From 449495394b238d63ab4052be15efdd8e054d67de Mon Sep 17 00:00:00 2001 From: pashakostohrys Date: Wed, 17 Jan 2024 19:09:42 +0200 Subject: [PATCH 1/3] support learning mode for event reporter --- changelog/CHANGELOG.md | 2 ++ .../commands/event_reporter_server.go | 15 +++++++++------ event_reporter/metrics/metrics.go | 6 +++--- event_reporter/reporter/broadcaster.go | 16 ++++++++++------ event_reporter/reporter/rate_limiter.go | 14 ++++++++------ 5 files changed, 32 insertions(+), 21 deletions(-) create mode 100644 changelog/CHANGELOG.md diff --git a/changelog/CHANGELOG.md b/changelog/CHANGELOG.md new file mode 100644 index 0000000000000..ea16fc0726b6c --- /dev/null +++ b/changelog/CHANGELOG.md @@ -0,0 +1,2 @@ +### Event reporter v2.0.4 +1. Support learning mode for event reporter. \ No newline at end of file diff --git a/cmd/event-reporter-server/commands/event_reporter_server.go b/cmd/event-reporter-server/commands/event_reporter_server.go index e0e7e3af43356..92b7137cb9815 100644 --- a/cmd/event-reporter-server/commands/event_reporter_server.go +++ b/cmd/event-reporter-server/commands/event_reporter_server.go @@ -95,9 +95,10 @@ func NewCommand() *cobra.Command { rootpath string useGrpc bool - rateLimiterEnabled bool - rateLimiterBucketSize int - rateLimiterDuration time.Duration + rateLimiterEnabled bool + rateLimiterBucketSize int + rateLimiterDuration time.Duration + rateLimiterLearningMode bool ) var command = &cobra.Command{ Use: cliName, @@ -178,9 +179,10 @@ func NewCommand() *cobra.Command { AuthToken: codefreshToken, }, RateLimiterOpts: &reporter.RateLimiterOpts{ - Enabled: rateLimiterEnabled, - Rate: rateLimiterDuration, - Capacity: rateLimiterBucketSize, + Enabled: rateLimiterEnabled, + Rate: rateLimiterDuration, + Capacity: rateLimiterBucketSize, + LearningMode: rateLimiterLearningMode, }, } @@ -231,6 +233,7 @@ func NewCommand() *cobra.Command { command.Flags().BoolVar(&rateLimiterEnabled, "rate-limiter-enabled", env.ParseBoolFromEnv("RATE_LIMITER_ENABLED", false), "Use rate limiter for prevent queue to be overflowed") command.Flags().IntVar(&rateLimiterBucketSize, "rate-limiter-bucket-size", env.ParseNumFromEnv("RATE_LIMITER_BUCKET_SIZE", math.MaxInt, 0, math.MaxInt), "The maximum amount of requests allowed per window.") command.Flags().DurationVar(&rateLimiterDuration, "rate-limiter-period", env.ParseDurationFromEnv("RATE_LIMITER_DURATION", 24*time.Hour, 0, math.MaxInt64), "The rate limit window size.") + command.Flags().BoolVar(&rateLimiterLearningMode, "rate-limiter-learning-mode", env.ParseBoolFromEnv("RATE_LIMITER_LEARNING_MODE_ENABLED", false), "The rate limit enabled in learning mode ( not blocking sending to queue but logging it )") cacheSrc = servercache.AddCacheFlagsToCmd(command, func(client *redis.Client) { redisClient = client }) diff --git a/event_reporter/metrics/metrics.go b/event_reporter/metrics/metrics.go index 0c7861b28589f..b84e4c538e002 100644 --- a/event_reporter/metrics/metrics.go +++ b/event_reporter/metrics/metrics.go @@ -70,7 +70,7 @@ var ( Name: "cf_e_reporter_app_events_size", Help: "Size of specific application events queue of taked shard.", }, - []string{"reporter_shard", "application", "got_in_queue"}, + []string{"reporter_shard", "application", "got_in_queue", "error_in_learning_mode"}, ) erroredEventsCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -145,8 +145,8 @@ func (m *MetricsServer) SetQueueSizeCounter(size int) { m.queueSizeCounter.WithLabelValues(m.shard).Set(float64(size)) } -func (m *MetricsServer) IncAppEventsCounter(application string, gotToProcessingQueue bool) { - m.appEventsCounter.WithLabelValues(m.shard, application, strconv.FormatBool(gotToProcessingQueue)).Inc() +func (m *MetricsServer) IncAppEventsCounter(application string, gotToProcessingQueue bool, errorInLearningMode bool) { + m.appEventsCounter.WithLabelValues(m.shard, application, strconv.FormatBool(gotToProcessingQueue), strconv.FormatBool(errorInLearningMode)).Inc() } func (m *MetricsServer) IncErroredEventsCounter(metricEventType MetricEventType, errorType MetricEventErrorType, application string) { diff --git a/event_reporter/reporter/broadcaster.go b/event_reporter/reporter/broadcaster.go index 4d2b58f3beebf..10878066e9eb4 100644 --- a/event_reporter/reporter/broadcaster.go +++ b/event_reporter/reporter/broadcaster.go @@ -78,23 +78,27 @@ func (b *broadcasterHandler) notify(event *appv1.ApplicationWatchEvent) { for _, s := range subscribers { if s.matches(event) { - duration, err := b.rateLimiter.Limit(event.Application.Name) + duration, err, learningMode := b.rateLimiter.Limit(event.Application.Name) + errorInLearningMode := learningMode && err != nil if err != nil { - log.Infof("adding application '%s' to channel failed, due to rate limit, duration left %s", event.Application.Name, duration.String()) - b.metricsServer.IncAppEventsCounter(event.Application.Name, false) - continue + log.Errorf("adding application '%s' to channel failed, due to rate limit, duration left %s, learningMode %t", event.Application.Name, duration.String(), learningMode) + // if learning mode is enabled, we will continue to send events + if !learningMode { + b.metricsServer.IncAppEventsCounter(event.Application.Name, false, false) + continue + } } select { case s.ch <- event: { log.Infof("adding application '%s' to channel", event.Application.Name) - b.metricsServer.IncAppEventsCounter(event.Application.Name, true) + b.metricsServer.IncAppEventsCounter(event.Application.Name, true, errorInLearningMode) } default: // drop event if cannot send right away log.WithField("application", event.Application.Name).Warn("unable to send event notification") - b.metricsServer.IncAppEventsCounter(event.Application.Name, false) + b.metricsServer.IncAppEventsCounter(event.Application.Name, false, errorInLearningMode) } } } diff --git a/event_reporter/reporter/rate_limiter.go b/event_reporter/reporter/rate_limiter.go index 3b65f7db41da4..9aea1ca0000cc 100644 --- a/event_reporter/reporter/rate_limiter.go +++ b/event_reporter/reporter/rate_limiter.go @@ -7,9 +7,10 @@ import ( ) type RateLimiterOpts struct { - Enabled bool - Rate time.Duration - Capacity int + Enabled bool + Rate time.Duration + Capacity int + LearningMode bool } type RateLimiter struct { @@ -21,9 +22,9 @@ func NewRateLimiter(opts *RateLimiterOpts) *RateLimiter { return &RateLimiter{opts: opts, limiters: make(map[string]*limiters.FixedWindow)} } -func (rl *RateLimiter) Limit(applicationName string) (time.Duration, error) { +func (rl *RateLimiter) Limit(applicationName string) (time.Duration, error, bool) { if !rl.opts.Enabled { - return time.Duration(0), nil + return time.Duration(0), nil, false } limiter := rl.limiters[applicationName] @@ -32,5 +33,6 @@ func (rl *RateLimiter) Limit(applicationName string) (time.Duration, error) { rl.limiters[applicationName] = limiter } - return limiter.Limit(context.Background()) + duration, err := limiter.Limit(context.Background()) + return duration, err, rl.opts.LearningMode } From 3333892a52c3388550cdf6eef39e1d28e09008d3 Mon Sep 17 00:00:00 2001 From: pashakostohrys Date: Wed, 17 Jan 2024 19:11:38 +0200 Subject: [PATCH 2/3] support learning mode for event reporter --- event_reporter/reporter/rate_limiter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/event_reporter/reporter/rate_limiter.go b/event_reporter/reporter/rate_limiter.go index 9aea1ca0000cc..116f48b469792 100644 --- a/event_reporter/reporter/rate_limiter.go +++ b/event_reporter/reporter/rate_limiter.go @@ -24,7 +24,7 @@ func NewRateLimiter(opts *RateLimiterOpts) *RateLimiter { func (rl *RateLimiter) Limit(applicationName string) (time.Duration, error, bool) { if !rl.opts.Enabled { - return time.Duration(0), nil, false + return time.Duration(0), nil, rl.opts.LearningMode } limiter := rl.limiters[applicationName] From 319eb5863541211b807e767085bee8c1f65108ad Mon Sep 17 00:00:00 2001 From: pashakostohrys Date: Wed, 17 Jan 2024 19:21:15 +0200 Subject: [PATCH 3/3] support learning mode for event reporter --- event_reporter/reporter/rate_limiter_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/event_reporter/reporter/rate_limiter_test.go b/event_reporter/reporter/rate_limiter_test.go index bba89a9a39929..499374f3f4159 100644 --- a/event_reporter/reporter/rate_limiter_test.go +++ b/event_reporter/reporter/rate_limiter_test.go @@ -10,7 +10,7 @@ func TestRateLimiter(t *testing.T) { rl := NewRateLimiter(&RateLimiterOpts{ Enabled: false, }) - d, err := rl.Limit("foo") + d, err, _ := rl.Limit("foo") if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -24,7 +24,7 @@ func TestRateLimiter(t *testing.T) { Rate: time.Second, Capacity: 1, }) - d, err := rl.Limit("foo") + d, err, _ := rl.Limit("foo") if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -38,7 +38,7 @@ func TestRateLimiter(t *testing.T) { Rate: time.Second, Capacity: 0, }) - _, err := rl.Limit("foo") + _, err, _ := rl.Limit("foo") if err == nil { t.Errorf("Expected error, got nil") }