Skip to content

Commit

Permalink
Revert "revert 2.9-2024.2.4-fc84c8a9c"
Browse files Browse the repository at this point in the history
This reverts commit 6011e4c.
  • Loading branch information
pasha-codefresh committed Feb 19, 2024
1 parent c0ec1fa commit a10945a
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 36 deletions.
14 changes: 9 additions & 5 deletions event_reporter/reporter/application_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,12 @@ func getAppAsResource(a *appv1.Application) *appv1.ResourceStatus {

func (r *applicationEventReporter) getDesiredManifests(ctx context.Context, a *appv1.Application, logCtx *log.Entry) (*apiclient.ManifestResponse, error, bool) {
// get the desired state manifests of the application
project := a.Spec.GetProject()
desiredManifests, err := r.applicationServiceClient.GetManifests(ctx, &application.ApplicationManifestQuery{
Name: &a.Name,
AppNamespace: &a.Namespace,
Revision: &a.Status.Sync.Revision,
Project: &a.Spec.Project,
Project: &project,
})
if err != nil {
// if it's manifest generation error we need to still report the actual state
Expand All @@ -183,9 +184,10 @@ func (s *applicationEventReporter) StreamApplicationEvents(

logCtx.WithField("ignoreResourceCache", ignoreResourceCache).Info("streaming application events")

project := a.Spec.GetProject()
appTree, err := s.applicationServiceClient.ResourceTree(ctx, &application.ResourcesQuery{
ApplicationName: &a.Name,
Project: &a.Spec.Project,
Project: &project,
AppNamespace: &a.Namespace,
})
if err != nil {
Expand Down Expand Up @@ -351,6 +353,7 @@ func (s *applicationEventReporter) processResource(
desiredState := getResourceDesiredState(&rs, desiredManifests, logCtx)

// get resource actual state
project := parentApplication.Spec.GetProject()
actualState, err := s.applicationServiceClient.GetResource(ctx, &application.ApplicationResourceRequest{
Name: &parentApplication.Name,
AppNamespace: &parentApplication.Namespace,
Expand All @@ -359,7 +362,7 @@ func (s *applicationEventReporter) processResource(
Version: &rs.Version,
Group: &rs.Group,
Kind: &rs.Kind,
Project: &parentApplication.Spec.Project,
Project: &project,
})
if err != nil {
if !strings.Contains(err.Error(), "not found") {
Expand Down Expand Up @@ -436,7 +439,7 @@ func (s *applicationEventReporter) ShouldSendApplicationEvent(ae *appv1.Applicat
}

cachedApp.Status.ReconciledAt = ae.Application.Status.ReconciledAt // ignore those in the diff
cachedApp.Spec.Project = ae.Application.Spec.Project //
cachedApp.Spec.Project = ae.Application.Spec.Project // not useing GetProject() so that the comparison will be with the real field values
for i := range cachedApp.Status.Conditions {
cachedApp.Status.Conditions[i].LastTransitionTime = nil
}
Expand Down Expand Up @@ -524,11 +527,12 @@ func getOperationRevision(a *appv1.Application) string {
}

func (s *applicationEventReporter) getApplicationRevisionDetails(ctx context.Context, a *appv1.Application, revision string) (*appv1.RevisionMetadata, error) {
project := a.Spec.GetProject()
return s.applicationServiceClient.RevisionMetadata(ctx, &application.RevisionMetadataQuery{
Name: &a.Name,
AppNamespace: &a.Namespace,
Revision: &revision,
Project: &a.Spec.Project,
Project: &project,
})
}

Expand Down
8 changes: 4 additions & 4 deletions event_reporter/reporter/application_event_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ func TestGetApplicationLatestRevision(t *testing.T) {
Revision: appRevision,
},
History: []v1alpha1.RevisionHistory{
v1alpha1.RevisionHistory{
{
Revision: history1Revision,
},
v1alpha1.RevisionHistory{
{
Revision: history2Revision,
},
},
Expand Down Expand Up @@ -188,10 +188,10 @@ func TestGetLatestAppHistoryId(t *testing.T) {
appMock := v1alpha1.Application{
Status: v1alpha1.ApplicationStatus{
History: []v1alpha1.RevisionHistory{
v1alpha1.RevisionHistory{
{
ID: history1Id,
},
v1alpha1.RevisionHistory{
{
ID: history2Id,
},
},
Expand Down
53 changes: 36 additions & 17 deletions server/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,11 +1069,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing

// sendIfPermitted is a helper to send the application to the client's streaming channel if the
// caller has RBAC privileges permissions to view it
sendIfPermitted := func(ctx context.Context, a appv1.Application, eventType watch.EventType, ts string, ignoreResourceCache bool) error {
if eventType == watch.Bookmark {
return nil // ignore this event
}

sendIfPermitted := func(ctx context.Context, logCtx log.FieldLogger, a appv1.Application, ts string, ignoreResourceCache bool) error {
if appVersion, err := strconv.Atoi(a.ResourceVersion); err == nil && appVersion < minVersion {
return nil
}
Expand All @@ -1100,7 +1096,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
}
trackingMethod := argoutil.GetTrackingMethod(s.settingsMgr)

err = s.applicationEventReporter.streamApplicationEvents(ctx, &a, es, stream, ts, ignoreResourceCache, appInstanceLabelKey, trackingMethod)
err = s.applicationEventReporter.streamApplicationEvents(ctx, logCtx, &a, es, stream, ts, ignoreResourceCache, appInstanceLabelKey, trackingMethod)
if err != nil {
return err
}
Expand All @@ -1109,6 +1105,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
logCtx.WithError(err).Error("failed to cache last sent application event")
return err
}

return nil
}

Expand All @@ -1120,6 +1117,10 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
onAddEventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize)

v1ReporterEnabledFilter := func(event *appv1.ApplicationWatchEvent) bool {
if event.Type == watch.Bookmark {
return false // ignore this event
}

rVersion, _ := s.settingsMgr.GetCodefreshReporterVersion()
if rVersion == string(settings.CodefreshV2ReporterVersion) {
logCtx.Info("v1 reporter disabled skipping event")
Expand Down Expand Up @@ -1156,35 +1157,51 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
case event := <-onAddEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=true
{
logCtx.Infof("OnAdd channel size is %d", len(onAddEventsChannel))
logCtx.Infof("Received application \"%s\" added event", event.Application.Name)
err = s.processEvent(event, logCtx, stream, sendIfPermitted)
logAppEvent := logCtx.WithFields(log.Fields{
"app": event.Application.Name,
"type": event.Type,
})
logAppEvent.Infof("Received application added event")
err = s.processEvent(event, logAppEvent, stream, sendIfPermitted)
if err != nil {
return err
}
}
case event := <-onDeleteEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=true
{
logCtx.Infof("OnDelete channel size is %d", len(onDeleteEventsChannel))
logCtx.Infof("Received application \"%s\" deleted event", event.Application.Name)
err = s.processEvent(event, logCtx, stream, sendIfPermitted)
logAppEvent := logCtx.WithFields(log.Fields{
"app": event.Application.Name,
"type": event.Type,
})
logAppEvent.Infof("Received application deleted event")
err = s.processEvent(event, logAppEvent, stream, sendIfPermitted)
if err != nil {
return err
}
}
case event := <-onUpdateEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=true
{
logCtx.Infof("OnUpdate channel size is %d", len(onUpdateEventsChannel))
logCtx.Infof("Received application \"%s\" update event", event.Application.Name)
err = s.processEvent(event, logCtx, stream, sendIfPermitted)
logAppEvent := logCtx.WithFields(log.Fields{
"app": event.Application.Name,
"type": event.Type,
})
logAppEvent.Infof("Received application update event")
err = s.processEvent(event, logAppEvent, stream, sendIfPermitted)
if err != nil {
return err
}
}
case event := <-allEventsChannel: // active only when CODEFRESH_PRIORITY_QUEUE=false
{
logCtx.Infof("All events channel size is %d", len(allEventsChannel))
logCtx.Infof("Received application \"%s\" event", event.Application.Name)
err = s.processEvent(event, logCtx, stream, sendIfPermitted)
logAppEvent := logCtx.WithFields(log.Fields{
"app": event.Application.Name,
"type": event.Type,
})
logAppEvent.Infof("Received application event")
err = s.processEvent(event, logAppEvent, stream, sendIfPermitted)
if err != nil {
return err
}
Expand Down Expand Up @@ -1214,16 +1231,17 @@ func (s *Server) processEvent(
event *appv1.ApplicationWatchEvent,
logCtx log.FieldLogger,
stream events.Eventing_StartEventSourceServer,
sendIfPermitted func(ctx context.Context, a appv1.Application, eventType watch.EventType, ts string, ignoreResourceCache bool) error,
sendIfPermitted func(ctx context.Context, logCtx log.FieldLogger, a appv1.Application, ts string, ignoreResourceCache bool) error,
) error {
shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event)
if !shouldProcess {
log.Infof("ignore event for app %s", event.Application.Name)
logCtx.Infof("ignore event")
return nil
}

ts := time.Now().Format("2006-01-02T15:04:05.000Z")
ctx, cancel := context.WithTimeout(stream.Context(), 2*time.Minute)
err := sendIfPermitted(ctx, event.Application, event.Type, ts, ignoreResourceCache)
err := sendIfPermitted(ctx, logCtx, event.Application, ts, ignoreResourceCache)
if err != nil {
logCtx.WithError(err).Error("failed to stream application events")
if strings.Contains(err.Error(), "context deadline exceeded") {
Expand All @@ -1232,6 +1250,7 @@ func (s *Server) processEvent(
return err
}
}

cancel()
return nil
}
Expand Down
21 changes: 13 additions & 8 deletions server/application/application_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func getAppAsResource(a *appv1.Application) *appv1.ResourceStatus {
}
}

func (s *applicationEventReporter) getDesiredManifests(ctx context.Context, a *appv1.Application, logCtx *log.Entry) (*apiclient.ManifestResponse, error, bool) {
func (s *applicationEventReporter) getDesiredManifests(ctx context.Context, a *appv1.Application, logCtx log.FieldLogger) (*apiclient.ManifestResponse, error, bool) {
// get the desired state manifests of the application
desiredManifests, err := s.server.GetManifests(ctx, &application.ApplicationManifestQuery{
Name: &a.Name,
Expand All @@ -100,6 +100,7 @@ func (s *applicationEventReporter) getDesiredManifests(ctx context.Context, a *a

func (s *applicationEventReporter) streamApplicationEvents(
ctx context.Context,
logCtx log.FieldLogger,
a *appv1.Application,
es *events.EventSource,
stream events.Eventing_StartEventSourceServer,
Expand All @@ -108,7 +109,6 @@ func (s *applicationEventReporter) streamApplicationEvents(
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
) error {
logCtx := log.WithField("app", a.Name)

logCtx.WithField("ignoreResourceCache", ignoreResourceCache).Info("streaming application events")

Expand Down Expand Up @@ -220,7 +220,7 @@ func (s *applicationEventReporter) processResource(
ctx context.Context,
rs appv1.ResourceStatus,
parentApplication *appv1.Application,
logCtx *log.Entry,
logCtx log.FieldLogger,
ts string,
desiredManifests *apiclient.ManifestResponse,
stream events.Eventing_StartEventSourceServer,
Expand Down Expand Up @@ -368,7 +368,7 @@ func isApp(rs appv1.ResourceStatus) bool {
return rs.GroupVersionKind().String() == appv1.ApplicationSchemaGroupVersionKind.String()
}

func logWithAppStatus(a *appv1.Application, logCtx *log.Entry, ts string) *log.Entry {
func logWithAppStatus(a *appv1.Application, logCtx log.FieldLogger, ts string) *log.Entry {
return logCtx.WithFields(log.Fields{
"sync": a.Status.Sync.Status,
"health": a.Status.Health.Status,
Expand All @@ -377,7 +377,7 @@ func logWithAppStatus(a *appv1.Application, logCtx *log.Entry, ts string) *log.E
})
}

func logWithResourceStatus(logCtx *log.Entry, rs appv1.ResourceStatus) *log.Entry {
func logWithResourceStatus(logCtx log.FieldLogger, rs appv1.ResourceStatus) log.FieldLogger {
logCtx = logCtx.WithField("sync", rs.Status)
if rs.Health != nil {
logCtx = logCtx.WithField("health", rs.Health.Status)
Expand Down Expand Up @@ -421,9 +421,14 @@ func getOperationRevision(a *appv1.Application) string {
}

func (s *applicationEventReporter) getApplicationRevisionDetails(ctx context.Context, a *appv1.Application, revision string) (*appv1.RevisionMetadata, error) {
name := a.GetName()
namespace := a.GetNamespace()
project := a.Spec.GetProject()
return s.server.RevisionMetadata(ctx, &application.RevisionMetadataQuery{
Name: &a.Name,
Revision: &revision,
Name: &name,
AppNamespace: &namespace,
Project: &project,
Revision: &revision,
})
}

Expand Down Expand Up @@ -723,7 +728,7 @@ func (s *applicationEventReporter) getApplicationEventPayload(
return &events.Event{Payload: payloadBytes, Name: es.Name}, nil
}

func getResourceDesiredState(rs *appv1.ResourceStatus, ds *apiclient.ManifestResponse, logger *log.Entry) *apiclient.Manifest {
func getResourceDesiredState(rs *appv1.ResourceStatus, ds *apiclient.ManifestResponse, logger log.FieldLogger) *apiclient.Manifest {
if ds == nil {
return &apiclient.Manifest{}
}
Expand Down
7 changes: 5 additions & 2 deletions server/application/application_event_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ghodss/yaml"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/runtime"

Expand All @@ -26,6 +27,7 @@ import (
"github.com/argoproj/argo-cd/v2/test"
cacheutil "github.com/argoproj/argo-cd/v2/util/cache"
appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/rbac"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -270,7 +272,8 @@ func fakeServer() *Server {
1*time.Minute,
)

server, _ := NewServer(test.FakeArgoCDNamespace, kubeclientset, appClientSet, appLister, appInformer, nil, nil, cache, nil, nil, nil, nil, nil, nil, nil)
enf := rbac.NewEnforcer(kubeclientset, testNamespace, common.ArgoCDRBACConfigMapName, nil)
server, _ := NewServer(test.FakeArgoCDNamespace, kubeclientset, appClientSet, appLister, appInformer, nil, nil, cache, nil, nil, enf, nil, nil, nil, nil)
return server.(*Server)
}

Expand Down Expand Up @@ -355,7 +358,7 @@ func TestStreamApplicationEvent(t *testing.T) {
return nil
}

_ = eventReporter.streamApplicationEvents(context.Background(), app, &events.EventSource{Name: &name}, &MockEventing_StartEventSourceServer{}, "", false, common.LabelKeyAppInstance, argo.TrackingMethodLabel)
_ = eventReporter.streamApplicationEvents(context.Background(), log.New(), app, &events.EventSource{Name: &name}, &MockEventing_StartEventSourceServer{}, "", false, common.LabelKeyAppInstance, argo.TrackingMethodLabel)
})

}
Expand Down

0 comments on commit a10945a

Please sign in to comment.