Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support learning mode for event reporter #276

Merged
merged 3 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
### Event reporter v2.0.4
1. Support learning mode for event reporter.
15 changes: 9 additions & 6 deletions cmd/event-reporter-server/commands/event_reporter_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ func NewCommand() *cobra.Command {
rootpath string
useGrpc bool

rateLimiterEnabled bool
rateLimiterBucketSize int
rateLimiterDuration time.Duration
rateLimiterEnabled bool
rateLimiterBucketSize int
rateLimiterDuration time.Duration
rateLimiterLearningMode bool
)
var command = &cobra.Command{
Use: cliName,
Expand Down Expand Up @@ -178,9 +179,10 @@ func NewCommand() *cobra.Command {
AuthToken: codefreshToken,
},
RateLimiterOpts: &reporter.RateLimiterOpts{
Enabled: rateLimiterEnabled,
Rate: rateLimiterDuration,
Capacity: rateLimiterBucketSize,
Enabled: rateLimiterEnabled,
Rate: rateLimiterDuration,
Capacity: rateLimiterBucketSize,
LearningMode: rateLimiterLearningMode,
},
}

Expand Down Expand Up @@ -231,6 +233,7 @@ func NewCommand() *cobra.Command {
command.Flags().BoolVar(&rateLimiterEnabled, "rate-limiter-enabled", env.ParseBoolFromEnv("RATE_LIMITER_ENABLED", false), "Use rate limiter for prevent queue to be overflowed")
command.Flags().IntVar(&rateLimiterBucketSize, "rate-limiter-bucket-size", env.ParseNumFromEnv("RATE_LIMITER_BUCKET_SIZE", math.MaxInt, 0, math.MaxInt), "The maximum amount of requests allowed per window.")
command.Flags().DurationVar(&rateLimiterDuration, "rate-limiter-period", env.ParseDurationFromEnv("RATE_LIMITER_DURATION", 24*time.Hour, 0, math.MaxInt64), "The rate limit window size.")
command.Flags().BoolVar(&rateLimiterLearningMode, "rate-limiter-learning-mode", env.ParseBoolFromEnv("RATE_LIMITER_LEARNING_MODE_ENABLED", false), "The rate limit enabled in learning mode ( not blocking sending to queue but logging it )")
cacheSrc = servercache.AddCacheFlagsToCmd(command, func(client *redis.Client) {
redisClient = client
})
Expand Down
6 changes: 3 additions & 3 deletions event_reporter/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var (
Name: "cf_e_reporter_app_events_size",
Help: "Size of specific application events queue of taked shard.",
},
[]string{"reporter_shard", "application", "got_in_queue"},
[]string{"reporter_shard", "application", "got_in_queue", "error_in_learning_mode"},
)
erroredEventsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -145,8 +145,8 @@ func (m *MetricsServer) SetQueueSizeCounter(size int) {
m.queueSizeCounter.WithLabelValues(m.shard).Set(float64(size))
}

func (m *MetricsServer) IncAppEventsCounter(application string, gotToProcessingQueue bool) {
m.appEventsCounter.WithLabelValues(m.shard, application, strconv.FormatBool(gotToProcessingQueue)).Inc()
func (m *MetricsServer) IncAppEventsCounter(application string, gotToProcessingQueue bool, errorInLearningMode bool) {
m.appEventsCounter.WithLabelValues(m.shard, application, strconv.FormatBool(gotToProcessingQueue), strconv.FormatBool(errorInLearningMode)).Inc()
}

func (m *MetricsServer) IncErroredEventsCounter(metricEventType MetricEventType, errorType MetricEventErrorType, application string) {
Expand Down
16 changes: 10 additions & 6 deletions event_reporter/reporter/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,27 @@ func (b *broadcasterHandler) notify(event *appv1.ApplicationWatchEvent) {
for _, s := range subscribers {
if s.matches(event) {

duration, err := b.rateLimiter.Limit(event.Application.Name)
duration, err, learningMode := b.rateLimiter.Limit(event.Application.Name)
errorInLearningMode := learningMode && err != nil
if err != nil {
log.Infof("adding application '%s' to channel failed, due to rate limit, duration left %s", event.Application.Name, duration.String())
b.metricsServer.IncAppEventsCounter(event.Application.Name, false)
continue
log.Errorf("adding application '%s' to channel failed, due to rate limit, duration left %s, learningMode %t", event.Application.Name, duration.String(), learningMode)
// if learning mode is enabled, we will continue to send events
if !learningMode {
b.metricsServer.IncAppEventsCounter(event.Application.Name, false, false)
continue
}
}

select {
case s.ch <- event:
{
log.Infof("adding application '%s' to channel", event.Application.Name)
b.metricsServer.IncAppEventsCounter(event.Application.Name, true)
b.metricsServer.IncAppEventsCounter(event.Application.Name, true, errorInLearningMode)
}
default:
// drop event if cannot send right away
log.WithField("application", event.Application.Name).Warn("unable to send event notification")
b.metricsServer.IncAppEventsCounter(event.Application.Name, false)
b.metricsServer.IncAppEventsCounter(event.Application.Name, false, errorInLearningMode)
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions event_reporter/reporter/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
)

type RateLimiterOpts struct {
Enabled bool
Rate time.Duration
Capacity int
Enabled bool
Rate time.Duration
Capacity int
LearningMode bool
}

type RateLimiter struct {
Expand All @@ -21,9 +22,9 @@ func NewRateLimiter(opts *RateLimiterOpts) *RateLimiter {
return &RateLimiter{opts: opts, limiters: make(map[string]*limiters.FixedWindow)}
}

func (rl *RateLimiter) Limit(applicationName string) (time.Duration, error) {
func (rl *RateLimiter) Limit(applicationName string) (time.Duration, error, bool) {
if !rl.opts.Enabled {
return time.Duration(0), nil
return time.Duration(0), nil, rl.opts.LearningMode
}

limiter := rl.limiters[applicationName]
Expand All @@ -32,5 +33,6 @@ func (rl *RateLimiter) Limit(applicationName string) (time.Duration, error) {
rl.limiters[applicationName] = limiter
}

return limiter.Limit(context.Background())
duration, err := limiter.Limit(context.Background())
return duration, err, rl.opts.LearningMode
}
6 changes: 3 additions & 3 deletions event_reporter/reporter/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func TestRateLimiter(t *testing.T) {
rl := NewRateLimiter(&RateLimiterOpts{
Enabled: false,
})
d, err := rl.Limit("foo")
d, err, _ := rl.Limit("foo")
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
Expand All @@ -24,7 +24,7 @@ func TestRateLimiter(t *testing.T) {
Rate: time.Second,
Capacity: 1,
})
d, err := rl.Limit("foo")
d, err, _ := rl.Limit("foo")
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
Expand All @@ -38,7 +38,7 @@ func TestRateLimiter(t *testing.T) {
Rate: time.Second,
Capacity: 0,
})
_, err := rl.Limit("foo")
_, err, _ := rl.Limit("foo")
if err == nil {
t.Errorf("Expected error, got nil")
}
Expand Down
Loading