diff --git a/server/application/application.go b/server/application/application.go index 19956ce587e69..2c23522c71655 100644 --- a/server/application/application.go +++ b/server/application/application.go @@ -1113,27 +1113,6 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing return nil } - processEvent := func(event *appv1.ApplicationWatchEvent) error { - shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event) - if !shouldProcess { - log.Infof("ignore event for app %s", event.Application.Name) - return nil - } - ts := time.Now().Format("2006-01-02T15:04:05.000Z") - ctx, cancel := context.WithTimeout(stream.Context(), 2*time.Minute) - err := sendIfPermitted(ctx, event.Application, event.Type, ts, ignoreResourceCache) - if err != nil { - logCtx.WithError(err).Error("failed to stream application events") - if strings.Contains(err.Error(), "context deadline exceeded") { - logCtx.Info("Closing event-source connection") - cancel() - return err - } - } - cancel() - return nil - } - priorityQueueEnabled := env.ParseBoolFromEnv("CODEFRESH_PRIORITY_QUEUE", false) allEventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize) @@ -1179,7 +1158,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing { logCtx.Infof("OnAdd channel size is %d", len(onAddEventsChannel)) logCtx.Infof("Received application \"%s\" added event", event.Application.Name) - err = processEvent(event) + err = s.processEvent(event, logCtx, stream, sendIfPermitted) if err != nil { return err } @@ -1188,7 +1167,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing { logCtx.Infof("OnDelete channel size is %d", len(onDeleteEventsChannel)) logCtx.Infof("Received application \"%s\" deleted event", event.Application.Name) - err = processEvent(event) + err = s.processEvent(event, logCtx, stream, sendIfPermitted) if err != nil { return err } @@ -1197,7 +1176,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing { logCtx.Infof("OnUpdate channel size is %d", len(onUpdateEventsChannel)) logCtx.Infof("Received application \"%s\" update event", event.Application.Name) - err = processEvent(event) + err = s.processEvent(event, logCtx, stream, sendIfPermitted) if err != nil { return err } @@ -1206,7 +1185,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing { logCtx.Infof("All events channel size is %d", len(allEventsChannel)) logCtx.Infof("Received application \"%s\" event", event.Application.Name) - err = processEvent(event) + err = s.processEvent(event, logCtx, stream, sendIfPermitted) if err != nil { return err } @@ -1232,6 +1211,32 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing } } +func (s *Server) processEvent( + event *appv1.ApplicationWatchEvent, + logCtx log.FieldLogger, + stream events.Eventing_StartEventSourceServer, + sendIfPermitted func(ctx context.Context, a appv1.Application, eventType watch.EventType, ts string, ignoreResourceCache bool) error, +) error { + shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event) + if !shouldProcess { + log.Infof("ignore event for app %s", event.Application.Name) + return nil + } + ts := time.Now().Format("2006-01-02T15:04:05.000Z") + ctx, cancel := context.WithTimeout(stream.Context(), 2*time.Minute) + err := sendIfPermitted(ctx, event.Application, event.Type, ts, ignoreResourceCache) + if err != nil { + logCtx.WithError(err).Error("failed to stream application events") + if strings.Contains(err.Error(), "context deadline exceeded") { + logCtx.Info("Closing event-source connection") + cancel() + return err + } + } + cancel() + return nil +} + func (s *Server) ValidateSrcAndDst(ctx context.Context, requset *application.ApplicationValidationRequest) (*application.ApplicationValidateResponse, error) { app := requset.Application proj, err := argo.GetAppProject(app, applisters.NewAppProjectLister(s.projInformer.GetIndexer()), s.ns, s.settingsMgr, s.db, ctx)