Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/on-add-priority-queue #267

Merged
merged 9 commits into from
Dec 1, 2023
112 changes: 89 additions & 23 deletions server/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -1113,38 +1113,104 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
return nil
}

eventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize)
unsubscribe := s.appBroadcaster.Subscribe(eventsChannel)
processEvent := func(event *appv1.ApplicationWatchEvent) error {
shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event)
if !shouldProcess {
log.Infof("ignore event for app %s", event.Application.Name)
return nil
}
ts := time.Now().Format("2006-01-02T15:04:05.000Z")
ctx, cancel := context.WithTimeout(stream.Context(), 2*time.Minute)
err := sendIfPermitted(ctx, event.Application, event.Type, ts, ignoreResourceCache)
if err != nil {
logCtx.WithError(err).Error("failed to stream application events")
if strings.Contains(err.Error(), "context deadline exceeded") {
logCtx.Info("Closing event-source connection")
cancel()
return err
}
}
cancel()
return nil
}

priorityQueueEnabled := env.ParseBoolFromEnv("CODEFRESH_PRIORITY_QUEUE", false)

allEventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize)
onUpdateEventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize)
onDeleteEventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize)
onAddEventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize)

v1ReporterEnabledFilter := func(event *appv1.ApplicationWatchEvent) bool {
rVersion, _ := s.settingsMgr.GetCodefreshReporterVersion()
if rVersion == string(settings.CodefreshV2ReporterVersion) {
logCtx.Info("v1 reporter disabled skipping event")
return false
}
return true
}

if priorityQueueEnabled {
unsubscribeOnUpdateChannel := s.appBroadcaster.Subscribe(onUpdateEventsChannel, func(event *appv1.ApplicationWatchEvent) bool {
return event.Type == watch.Modified
}, v1ReporterEnabledFilter)

unsubscribeOnDeleteChannel := s.appBroadcaster.Subscribe(onDeleteEventsChannel, func(event *appv1.ApplicationWatchEvent) bool {
return event.Type == watch.Deleted
}, v1ReporterEnabledFilter)

unsubscribeOnAddChannel := s.appBroadcaster.Subscribe(onAddEventsChannel, func(event *appv1.ApplicationWatchEvent) bool {
return event.Type == watch.Added
}, v1ReporterEnabledFilter)

defer unsubscribeOnUpdateChannel()
defer unsubscribeOnDeleteChannel()
defer unsubscribeOnAddChannel()
} else {
unsubscribeEventsChannel := s.appBroadcaster.Subscribe(allEventsChannel, v1ReporterEnabledFilter)
defer unsubscribeEventsChannel()
}

ticker := time.NewTicker(5 * time.Second)
defer unsubscribe()
defer ticker.Stop()
for {
select {
case event := <-eventsChannel:
log.Infof("event channel size is %d", len(eventsChannel))
rVersion, _ := s.settingsMgr.GetCodefreshReporterVersion()
if rVersion == string(settings.CodefreshV2ReporterVersion) {
logCtx.Info("v1 reported disabled skipping event")
continue
case event := <-onAddEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=true
{
logCtx.Infof("OnAdd channel size is %d", len(onAddEventsChannel))
logCtx.Infof("Received application \"%s\" added event", event.Application.Name)
err = processEvent(event)
if err != nil {
return err
}
}

shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event)
if !shouldProcess {
log.Infof("ignore event for app %s", event.Application.Name)
continue
case event := <-onDeleteEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=true
{
logCtx.Infof("OnDelete channel size is %d", len(onDeleteEventsChannel))
logCtx.Infof("Received application \"%s\" deleted event", event.Application.Name)
err = processEvent(event)
if err != nil {
return err
}
}
ts := time.Now().Format("2006-01-02T15:04:05.000Z")
ctx, cancel := context.WithTimeout(stream.Context(), 2*time.Minute)
err := sendIfPermitted(ctx, event.Application, event.Type, ts, ignoreResourceCache)
if err != nil {
logCtx.WithError(err).Error("failed to stream application events")
if strings.Contains(err.Error(), "context deadline exceeded") {
logCtx.Info("Closing event-source connection")
cancel()
case event := <-onUpdateEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=true
{
logCtx.Infof("OnUpdate channel size is %d", len(onUpdateEventsChannel))
logCtx.Infof("Received application \"%s\" update event", event.Application.Name)
err = processEvent(event)
if err != nil {
return err
}
}
case event := <-allEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=false
{
logCtx.Infof("All events channel size is %d", len(allEventsChannel))
logCtx.Infof("Received application \"%s\" event", event.Application.Name)
err = processEvent(event)
if err != nil {
return err
}
}
cancel()
case <-ticker.C:
var err error
ts := time.Now().Format("2006-01-02T15:04:05.000Z")
Expand Down
6 changes: 5 additions & 1 deletion server/application/application_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,15 @@ func (s *applicationEventReporter) getDesiredManifests(ctx context.Context, a *a
})
if err != nil {
notManifestGenerationError := !strings.Contains(err.Error(), "Manifest generation error")

// we can ignore the error
notAppPathDoesntExistsError := !strings.Contains(err.Error(), "app path does not exist")

// when application deleted rbac also throws erorr with PermissionDenied
// we can ignore the error, as we check rbac access before reporting events
notPermissionDeniedError := !strings.Contains(err.Error(), "PermissionDenied")

if notManifestGenerationError && notPermissionDeniedError {
if notManifestGenerationError && notPermissionDeniedError && notAppPathDoesntExistsError {
return nil, fmt.Errorf("failed to get application desired state manifests: %w", err), false
}
// if it's manifest generation error we need to still report the actual state
Expand Down
Loading