Skip to content

Commit

Permalink
feat/on-add-priority-queue-2 (#268)
Browse files Browse the repository at this point in the history
  • Loading branch information
oleksandr-codefresh authored Dec 1, 2023
1 parent 0ab7190 commit a2539e3
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 26 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.8.1-cap-CR-21281-new-reporter
2.8.1-cap-CR-21281-priority-queue
55 changes: 30 additions & 25 deletions server/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -1113,27 +1113,6 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
return nil
}

processEvent := func(event *appv1.ApplicationWatchEvent) error {
shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event)
if !shouldProcess {
log.Infof("ignore event for app %s", event.Application.Name)
return nil
}
ts := time.Now().Format("2006-01-02T15:04:05.000Z")
ctx, cancel := context.WithTimeout(stream.Context(), 2*time.Minute)
err := sendIfPermitted(ctx, event.Application, event.Type, ts, ignoreResourceCache)
if err != nil {
logCtx.WithError(err).Error("failed to stream application events")
if strings.Contains(err.Error(), "context deadline exceeded") {
logCtx.Info("Closing event-source connection")
cancel()
return err
}
}
cancel()
return nil
}

priorityQueueEnabled := env.ParseBoolFromEnv("CODEFRESH_PRIORITY_QUEUE", false)

allEventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize)
Expand Down Expand Up @@ -1179,7 +1158,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
{
logCtx.Infof("OnAdd channel size is %d", len(onAddEventsChannel))
logCtx.Infof("Received application \"%s\" added event", event.Application.Name)
err = processEvent(event)
err = s.processEvent(event, logCtx, stream, sendIfPermitted)
if err != nil {
return err
}
Expand All @@ -1188,7 +1167,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
{
logCtx.Infof("OnDelete channel size is %d", len(onDeleteEventsChannel))
logCtx.Infof("Received application \"%s\" deleted event", event.Application.Name)
err = processEvent(event)
err = s.processEvent(event, logCtx, stream, sendIfPermitted)
if err != nil {
return err
}
Expand All @@ -1197,7 +1176,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
{
logCtx.Infof("OnUpdate channel size is %d", len(onUpdateEventsChannel))
logCtx.Infof("Received application \"%s\" update event", event.Application.Name)
err = processEvent(event)
err = s.processEvent(event, logCtx, stream, sendIfPermitted)
if err != nil {
return err
}
Expand All @@ -1206,7 +1185,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
{
logCtx.Infof("All events channel size is %d", len(allEventsChannel))
logCtx.Infof("Received application \"%s\" event", event.Application.Name)
err = processEvent(event)
err = s.processEvent(event, logCtx, stream, sendIfPermitted)
if err != nil {
return err
}
Expand All @@ -1232,6 +1211,32 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
}
}

func (s *Server) processEvent(
event *appv1.ApplicationWatchEvent,
logCtx log.FieldLogger,
stream events.Eventing_StartEventSourceServer,
sendIfPermitted func(ctx context.Context, a appv1.Application, eventType watch.EventType, ts string, ignoreResourceCache bool) error,
) error {
shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event)
if !shouldProcess {
log.Infof("ignore event for app %s", event.Application.Name)
return nil
}
ts := time.Now().Format("2006-01-02T15:04:05.000Z")
ctx, cancel := context.WithTimeout(stream.Context(), 2*time.Minute)
err := sendIfPermitted(ctx, event.Application, event.Type, ts, ignoreResourceCache)
if err != nil {
logCtx.WithError(err).Error("failed to stream application events")
if strings.Contains(err.Error(), "context deadline exceeded") {
logCtx.Info("Closing event-source connection")
cancel()
return err
}
}
cancel()
return nil
}

func (s *Server) ValidateSrcAndDst(ctx context.Context, requset *application.ApplicationValidationRequest) (*application.ApplicationValidateResponse, error) {
app := requset.Application
proj, err := argo.GetAppProject(app, applisters.NewAppProjectLister(s.projInformer.GetIndexer()), s.ns, s.settingsMgr, s.db, ctx)
Expand Down

0 comments on commit a2539e3

Please sign in to comment.