From a10945adde975a98cf1fd04e530c0317e3867e39 Mon Sep 17 00:00:00 2001 From: pashakostohrys Date: Mon, 19 Feb 2024 18:55:18 +0200 Subject: [PATCH] Revert "revert 2.9-2024.2.4-fc84c8a9c" This reverts commit 6011e4c4e32e6544ba98b081cf170c964e3b4894. --- .../reporter/application_event_reporter.go | 14 +++-- .../application_event_reporter_test.go | 8 +-- server/application/application.go | 53 +++++++++++++------ .../application/application_event_reporter.go | 21 +++++--- .../application_event_reporter_test.go | 7 ++- 5 files changed, 67 insertions(+), 36 deletions(-) diff --git a/event_reporter/reporter/application_event_reporter.go b/event_reporter/reporter/application_event_reporter.go index f72ba71d06a3c..29189d8a483d2 100644 --- a/event_reporter/reporter/application_event_reporter.go +++ b/event_reporter/reporter/application_event_reporter.go @@ -153,11 +153,12 @@ func getAppAsResource(a *appv1.Application) *appv1.ResourceStatus { func (r *applicationEventReporter) getDesiredManifests(ctx context.Context, a *appv1.Application, logCtx *log.Entry) (*apiclient.ManifestResponse, error, bool) { // get the desired state manifests of the application + project := a.Spec.GetProject() desiredManifests, err := r.applicationServiceClient.GetManifests(ctx, &application.ApplicationManifestQuery{ Name: &a.Name, AppNamespace: &a.Namespace, Revision: &a.Status.Sync.Revision, - Project: &a.Spec.Project, + Project: &project, }) if err != nil { // if it's manifest generation error we need to still report the actual state @@ -183,9 +184,10 @@ func (s *applicationEventReporter) StreamApplicationEvents( logCtx.WithField("ignoreResourceCache", ignoreResourceCache).Info("streaming application events") + project := a.Spec.GetProject() appTree, err := s.applicationServiceClient.ResourceTree(ctx, &application.ResourcesQuery{ ApplicationName: &a.Name, - Project: &a.Spec.Project, + Project: &project, AppNamespace: &a.Namespace, }) if err != nil { @@ -351,6 +353,7 @@ func (s *applicationEventReporter) processResource( desiredState := getResourceDesiredState(&rs, desiredManifests, logCtx) // get resource actual state + project := parentApplication.Spec.GetProject() actualState, err := s.applicationServiceClient.GetResource(ctx, &application.ApplicationResourceRequest{ Name: &parentApplication.Name, AppNamespace: &parentApplication.Namespace, @@ -359,7 +362,7 @@ func (s *applicationEventReporter) processResource( Version: &rs.Version, Group: &rs.Group, Kind: &rs.Kind, - Project: &parentApplication.Spec.Project, + Project: &project, }) if err != nil { if !strings.Contains(err.Error(), "not found") { @@ -436,7 +439,7 @@ func (s *applicationEventReporter) ShouldSendApplicationEvent(ae *appv1.Applicat } cachedApp.Status.ReconciledAt = ae.Application.Status.ReconciledAt // ignore those in the diff - cachedApp.Spec.Project = ae.Application.Spec.Project // + cachedApp.Spec.Project = ae.Application.Spec.Project // not useing GetProject() so that the comparison will be with the real field values for i := range cachedApp.Status.Conditions { cachedApp.Status.Conditions[i].LastTransitionTime = nil } @@ -524,11 +527,12 @@ func getOperationRevision(a *appv1.Application) string { } func (s *applicationEventReporter) getApplicationRevisionDetails(ctx context.Context, a *appv1.Application, revision string) (*appv1.RevisionMetadata, error) { + project := a.Spec.GetProject() return s.applicationServiceClient.RevisionMetadata(ctx, &application.RevisionMetadataQuery{ Name: &a.Name, AppNamespace: &a.Namespace, Revision: &revision, - Project: &a.Spec.Project, + Project: &project, }) } diff --git a/event_reporter/reporter/application_event_reporter_test.go b/event_reporter/reporter/application_event_reporter_test.go index 74e3f377dd1f1..31568b9056f7d 100644 --- a/event_reporter/reporter/application_event_reporter_test.go +++ b/event_reporter/reporter/application_event_reporter_test.go @@ -149,10 +149,10 @@ func TestGetApplicationLatestRevision(t *testing.T) { Revision: appRevision, }, History: []v1alpha1.RevisionHistory{ - v1alpha1.RevisionHistory{ + { Revision: history1Revision, }, - v1alpha1.RevisionHistory{ + { Revision: history2Revision, }, }, @@ -188,10 +188,10 @@ func TestGetLatestAppHistoryId(t *testing.T) { appMock := v1alpha1.Application{ Status: v1alpha1.ApplicationStatus{ History: []v1alpha1.RevisionHistory{ - v1alpha1.RevisionHistory{ + { ID: history1Id, }, - v1alpha1.RevisionHistory{ + { ID: history2Id, }, }, diff --git a/server/application/application.go b/server/application/application.go index 2065d4d8291ed..16dac21c6db6f 100644 --- a/server/application/application.go +++ b/server/application/application.go @@ -1069,11 +1069,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing // sendIfPermitted is a helper to send the application to the client's streaming channel if the // caller has RBAC privileges permissions to view it - sendIfPermitted := func(ctx context.Context, a appv1.Application, eventType watch.EventType, ts string, ignoreResourceCache bool) error { - if eventType == watch.Bookmark { - return nil // ignore this event - } - + sendIfPermitted := func(ctx context.Context, logCtx log.FieldLogger, a appv1.Application, ts string, ignoreResourceCache bool) error { if appVersion, err := strconv.Atoi(a.ResourceVersion); err == nil && appVersion < minVersion { return nil } @@ -1100,7 +1096,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing } trackingMethod := argoutil.GetTrackingMethod(s.settingsMgr) - err = s.applicationEventReporter.streamApplicationEvents(ctx, &a, es, stream, ts, ignoreResourceCache, appInstanceLabelKey, trackingMethod) + err = s.applicationEventReporter.streamApplicationEvents(ctx, logCtx, &a, es, stream, ts, ignoreResourceCache, appInstanceLabelKey, trackingMethod) if err != nil { return err } @@ -1109,6 +1105,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing logCtx.WithError(err).Error("failed to cache last sent application event") return err } + return nil } @@ -1120,6 +1117,10 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing onAddEventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize) v1ReporterEnabledFilter := func(event *appv1.ApplicationWatchEvent) bool { + if event.Type == watch.Bookmark { + return false // ignore this event + } + rVersion, _ := s.settingsMgr.GetCodefreshReporterVersion() if rVersion == string(settings.CodefreshV2ReporterVersion) { logCtx.Info("v1 reporter disabled skipping event") @@ -1156,8 +1157,12 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing case event := <-onAddEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=true { logCtx.Infof("OnAdd channel size is %d", len(onAddEventsChannel)) - logCtx.Infof("Received application \"%s\" added event", event.Application.Name) - err = s.processEvent(event, logCtx, stream, sendIfPermitted) + logAppEvent := logCtx.WithFields(log.Fields{ + "app": event.Application.Name, + "type": event.Type, + }) + logAppEvent.Infof("Received application added event") + err = s.processEvent(event, logAppEvent, stream, sendIfPermitted) if err != nil { return err } @@ -1165,8 +1170,12 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing case event := <-onDeleteEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=true { logCtx.Infof("OnDelete channel size is %d", len(onDeleteEventsChannel)) - logCtx.Infof("Received application \"%s\" deleted event", event.Application.Name) - err = s.processEvent(event, logCtx, stream, sendIfPermitted) + logAppEvent := logCtx.WithFields(log.Fields{ + "app": event.Application.Name, + "type": event.Type, + }) + logAppEvent.Infof("Received application deleted event") + err = s.processEvent(event, logAppEvent, stream, sendIfPermitted) if err != nil { return err } @@ -1174,8 +1183,12 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing case event := <-onUpdateEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=true { logCtx.Infof("OnUpdate channel size is %d", len(onUpdateEventsChannel)) - logCtx.Infof("Received application \"%s\" update event", event.Application.Name) - err = s.processEvent(event, logCtx, stream, sendIfPermitted) + logAppEvent := logCtx.WithFields(log.Fields{ + "app": event.Application.Name, + "type": event.Type, + }) + logAppEvent.Infof("Received application update event") + err = s.processEvent(event, logAppEvent, stream, sendIfPermitted) if err != nil { return err } @@ -1183,8 +1196,12 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing case event := <-allEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=false { logCtx.Infof("All events channel size is %d", len(allEventsChannel)) - logCtx.Infof("Received application \"%s\" event", event.Application.Name) - err = s.processEvent(event, logCtx, stream, sendIfPermitted) + logAppEvent := logCtx.WithFields(log.Fields{ + "app": event.Application.Name, + "type": event.Type, + }) + logAppEvent.Infof("Received application event") + err = s.processEvent(event, logAppEvent, stream, sendIfPermitted) if err != nil { return err } @@ -1214,16 +1231,17 @@ func (s *Server) processEvent( event *appv1.ApplicationWatchEvent, logCtx log.FieldLogger, stream events.Eventing_StartEventSourceServer, - sendIfPermitted func(ctx context.Context, a appv1.Application, eventType watch.EventType, ts string, ignoreResourceCache bool) error, + sendIfPermitted func(ctx context.Context, logCtx log.FieldLogger, a appv1.Application, ts string, ignoreResourceCache bool) error, ) error { shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event) if !shouldProcess { - log.Infof("ignore event for app %s", event.Application.Name) + logCtx.Infof("ignore event") return nil } + ts := time.Now().Format("2006-01-02T15:04:05.000Z") ctx, cancel := context.WithTimeout(stream.Context(), 2*time.Minute) - err := sendIfPermitted(ctx, event.Application, event.Type, ts, ignoreResourceCache) + err := sendIfPermitted(ctx, logCtx, event.Application, ts, ignoreResourceCache) if err != nil { logCtx.WithError(err).Error("failed to stream application events") if strings.Contains(err.Error(), "context deadline exceeded") { @@ -1232,6 +1250,7 @@ func (s *Server) processEvent( return err } } + cancel() return nil } diff --git a/server/application/application_event_reporter.go b/server/application/application_event_reporter.go index 2786b0c4b9478..9371316b45f1d 100644 --- a/server/application/application_event_reporter.go +++ b/server/application/application_event_reporter.go @@ -81,7 +81,7 @@ func getAppAsResource(a *appv1.Application) *appv1.ResourceStatus { } } -func (s *applicationEventReporter) getDesiredManifests(ctx context.Context, a *appv1.Application, logCtx *log.Entry) (*apiclient.ManifestResponse, error, bool) { +func (s *applicationEventReporter) getDesiredManifests(ctx context.Context, a *appv1.Application, logCtx log.FieldLogger) (*apiclient.ManifestResponse, error, bool) { // get the desired state manifests of the application desiredManifests, err := s.server.GetManifests(ctx, &application.ApplicationManifestQuery{ Name: &a.Name, @@ -100,6 +100,7 @@ func (s *applicationEventReporter) getDesiredManifests(ctx context.Context, a *a func (s *applicationEventReporter) streamApplicationEvents( ctx context.Context, + logCtx log.FieldLogger, a *appv1.Application, es *events.EventSource, stream events.Eventing_StartEventSourceServer, @@ -108,7 +109,6 @@ func (s *applicationEventReporter) streamApplicationEvents( appInstanceLabelKey string, trackingMethod appv1.TrackingMethod, ) error { - logCtx := log.WithField("app", a.Name) logCtx.WithField("ignoreResourceCache", ignoreResourceCache).Info("streaming application events") @@ -220,7 +220,7 @@ func (s *applicationEventReporter) processResource( ctx context.Context, rs appv1.ResourceStatus, parentApplication *appv1.Application, - logCtx *log.Entry, + logCtx log.FieldLogger, ts string, desiredManifests *apiclient.ManifestResponse, stream events.Eventing_StartEventSourceServer, @@ -368,7 +368,7 @@ func isApp(rs appv1.ResourceStatus) bool { return rs.GroupVersionKind().String() == appv1.ApplicationSchemaGroupVersionKind.String() } -func logWithAppStatus(a *appv1.Application, logCtx *log.Entry, ts string) *log.Entry { +func logWithAppStatus(a *appv1.Application, logCtx log.FieldLogger, ts string) *log.Entry { return logCtx.WithFields(log.Fields{ "sync": a.Status.Sync.Status, "health": a.Status.Health.Status, @@ -377,7 +377,7 @@ func logWithAppStatus(a *appv1.Application, logCtx *log.Entry, ts string) *log.E }) } -func logWithResourceStatus(logCtx *log.Entry, rs appv1.ResourceStatus) *log.Entry { +func logWithResourceStatus(logCtx log.FieldLogger, rs appv1.ResourceStatus) log.FieldLogger { logCtx = logCtx.WithField("sync", rs.Status) if rs.Health != nil { logCtx = logCtx.WithField("health", rs.Health.Status) @@ -421,9 +421,14 @@ func getOperationRevision(a *appv1.Application) string { } func (s *applicationEventReporter) getApplicationRevisionDetails(ctx context.Context, a *appv1.Application, revision string) (*appv1.RevisionMetadata, error) { + name := a.GetName() + namespace := a.GetNamespace() + project := a.Spec.GetProject() return s.server.RevisionMetadata(ctx, &application.RevisionMetadataQuery{ - Name: &a.Name, - Revision: &revision, + Name: &name, + AppNamespace: &namespace, + Project: &project, + Revision: &revision, }) } @@ -723,7 +728,7 @@ func (s *applicationEventReporter) getApplicationEventPayload( return &events.Event{Payload: payloadBytes, Name: es.Name}, nil } -func getResourceDesiredState(rs *appv1.ResourceStatus, ds *apiclient.ManifestResponse, logger *log.Entry) *apiclient.Manifest { +func getResourceDesiredState(rs *appv1.ResourceStatus, ds *apiclient.ManifestResponse, logger log.FieldLogger) *apiclient.Manifest { if ds == nil { return &apiclient.Manifest{} } diff --git a/server/application/application_event_reporter_test.go b/server/application/application_event_reporter_test.go index 5703e1249b51b..756b1d350f1a9 100644 --- a/server/application/application_event_reporter_test.go +++ b/server/application/application_event_reporter_test.go @@ -9,6 +9,7 @@ import ( "github.com/ghodss/yaml" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + log "github.com/sirupsen/logrus" "google.golang.org/grpc" "k8s.io/apimachinery/pkg/runtime" @@ -26,6 +27,7 @@ import ( "github.com/argoproj/argo-cd/v2/test" cacheutil "github.com/argoproj/argo-cd/v2/util/cache" appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate" + "github.com/argoproj/argo-cd/v2/util/rbac" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -270,7 +272,8 @@ func fakeServer() *Server { 1*time.Minute, ) - server, _ := NewServer(test.FakeArgoCDNamespace, kubeclientset, appClientSet, appLister, appInformer, nil, nil, cache, nil, nil, nil, nil, nil, nil, nil) + enf := rbac.NewEnforcer(kubeclientset, testNamespace, common.ArgoCDRBACConfigMapName, nil) + server, _ := NewServer(test.FakeArgoCDNamespace, kubeclientset, appClientSet, appLister, appInformer, nil, nil, cache, nil, nil, enf, nil, nil, nil, nil) return server.(*Server) } @@ -355,7 +358,7 @@ func TestStreamApplicationEvent(t *testing.T) { return nil } - _ = eventReporter.streamApplicationEvents(context.Background(), app, &events.EventSource{Name: &name}, &MockEventing_StartEventSourceServer{}, "", false, common.LabelKeyAppInstance, argo.TrackingMethodLabel) + _ = eventReporter.streamApplicationEvents(context.Background(), log.New(), app, &events.EventSource{Name: &name}, &MockEventing_StartEventSourceServer{}, "", false, common.LabelKeyAppInstance, argo.TrackingMethodLabel) }) }