Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(event-handler): more appropriate variable naming #349

Merged
merged 6 commits into from
Oct 25, 2024
8 changes: 4 additions & 4 deletions event_reporter/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *eventReporterController) Run(ctx context.Context) {

// sendIfPermitted is a helper to send the application to the client's streaming channel if the
// caller has RBAC privileges permissions to view it
sendIfPermitted := func(ctx context.Context, a appv1.Application, eventType watch.EventType, ts string, ignoreResourceCache bool) error {
sendIfPermitted := func(ctx context.Context, a appv1.Application, eventType watch.EventType, eventProcessingStartedAt string, ignoreResourceCache bool) error {
if eventType == watch.Bookmark {
return nil // ignore this event
}
Expand All @@ -76,7 +76,7 @@ func (c *eventReporterController) Run(ctx context.Context) {
}
trackingMethod := argoutil.GetTrackingMethod(c.settingsMgr)

err = c.applicationEventReporter.StreamApplicationEvents(ctx, &a, ts, ignoreResourceCache, appInstanceLabelKey, trackingMethod)
err = c.applicationEventReporter.StreamApplicationEvents(ctx, &a, eventProcessingStartedAt, ignoreResourceCache, appInstanceLabelKey, trackingMethod)
if err != nil {
return err
}
Expand Down Expand Up @@ -105,9 +105,9 @@ func (c *eventReporterController) Run(ctx context.Context) {
c.metricsServer.IncCachedIgnoredEventsCounter(metrics.MetricAppEventType, event.Application.Name)
continue
}
ts := time.Now().Format("2006-01-02T15:04:05.000Z")
eventProcessingStartedAt := time.Now().Format("2006-01-02T15:04:05.000Z")
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
err := sendIfPermitted(ctx, event.Application, event.Type, ts, ignoreResourceCache)
err := sendIfPermitted(ctx, event.Application, event.Type, eventProcessingStartedAt, ignoreResourceCache)
if err != nil {
logCtx.WithError(err).Error("failed to stream application events")
if strings.Contains(err.Error(), "context deadline exceeded") {
Expand Down
19 changes: 19 additions & 0 deletions event_reporter/metrics/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package metrics_utils

import (
"time"
)

type MetricTimer struct {
startAt time.Time
}

func NewMetricTimer() *MetricTimer {
return &MetricTimer{
startAt: time.Now(),
}
}

func (m *MetricTimer) Duration() time.Duration {
return time.Since(m.startAt)
}
6 changes: 3 additions & 3 deletions event_reporter/reporter/app_revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ func (s *applicationEventReporter) getRevisionsDetails(ctx context.Context, a *v
return rms, nil
}

func (s *applicationEventReporter) getApplicationRevisionsMetadata(ctx context.Context, logCtx *log.Entry, a *v1alpha1.Application) (*utils.AppSyncRevisionsMetadata, error) {
func (s *applicationEventReporter) getApplicationRevisionsMetadata(ctx context.Context, logWithAppName *log.Entry, a *v1alpha1.Application) (*utils.AppSyncRevisionsMetadata, error) {
result := &utils.AppSyncRevisionsMetadata{}

if a.Status.Sync.Revision != "" || a.Status.Sync.Revisions != nil || (a.Status.History != nil && len(a.Status.History) > 0) {
// can be the latest revision of repository
operationSyncRevisionsMetadata, err := s.getRevisionsDetails(ctx, a, utils.GetOperationSyncRevisions(a))
if err != nil {
logCtx.WithError(err).Warnf("failed to get application(%s) sync revisions metadata, resuming", a.GetName())
logWithAppName.WithError(err).Warnf("failed to get application(%s) sync revisions metadata, resuming", a.GetName())
}

if err == nil && operationSyncRevisionsMetadata != nil {
Expand All @@ -79,7 +79,7 @@ func (s *applicationEventReporter) getApplicationRevisionsMetadata(ctx context.C
// latest revision of repository where changes to app resource were actually made; empty if no changeRevision(-s) present
operationChangeRevisionsMetadata, err := s.getRevisionsDetails(ctx, a, utils.GetOperationChangeRevisions(a))
if err != nil {
logCtx.WithError(err).Warnf("failed to get application(%s) change revisions metadata, resuming", a.GetName())
logWithAppName.WithError(err).Warnf("failed to get application(%s) change revisions metadata, resuming", a.GetName())
}

if err == nil && operationChangeRevisionsMetadata != nil && len(operationChangeRevisionsMetadata) > 0 {
Expand Down
73 changes: 36 additions & 37 deletions event_reporter/reporter/application_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
metrics_utils "github.com/argoproj/argo-cd/v2/event_reporter/metrics/utils"
"math"
"reflect"
"strings"
Expand Down Expand Up @@ -45,7 +46,7 @@ type ApplicationEventReporter interface {
StreamApplicationEvents(
ctx context.Context,
a *appv1.Application,
ts string,
eventProcessingStartedAt string,
ignoreResourceCache bool,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
Expand Down Expand Up @@ -110,15 +111,15 @@ func (r *applicationEventReporter) getDesiredManifests(ctx context.Context, a *a
func (s *applicationEventReporter) StreamApplicationEvents(
ctx context.Context,
a *appv1.Application,
ts string,
eventProcessingStartedAt string,
ignoreResourceCache bool,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
) error {
startTime := time.Now()
logCtx := log.WithField("app", a.Name)
metricTimer := metrics_utils.NewMetricTimer()

logCtx.WithField("ignoreResourceCache", ignoreResourceCache).Info("streaming application events")
logWithAppName := log.WithField("app", a.Name)
logWithAppName.WithField("ignoreResourceCache", ignoreResourceCache).Info("streaming application events")

project := a.Spec.GetProject()
appTree, err := s.applicationServiceClient.ResourceTree(ctx, &application.ResourcesQuery{
Expand All @@ -133,28 +134,28 @@ func (s *applicationEventReporter) StreamApplicationEvents(

// we still need process app even without tree, it is in case of app yaml originally contain error,
// we still want to show it the errors that related to it on codefresh ui
logCtx.WithError(err).Warn("failed to get application tree, resuming")
logWithAppName.WithError(err).Warn("failed to get application tree, resuming")
}

logCtx.Info("getting desired manifests")
logWithAppName.Info("getting desired manifests")

desiredManifests, manifestGenErr := s.getDesiredManifests(ctx, a, nil, logCtx)
desiredManifests, manifestGenErr := s.getDesiredManifests(ctx, a, nil, logWithAppName)

syncRevision := utils.GetOperationStateRevision(a)
var applicationVersions *apiclient.ApplicationVersions
if syncRevision != nil {
syncManifests, _ := s.getDesiredManifests(ctx, a, syncRevision, logCtx)
syncManifests, _ := s.getDesiredManifests(ctx, a, syncRevision, logWithAppName)
applicationVersions = syncManifests.GetApplicationVersions()
} else {
applicationVersions = nil
}

logCtx.Info("getting parent application name")
logWithAppName.Info("getting parent application name")

parentAppIdentity := utils.GetParentAppIdentity(a, appInstanceLabelKey, trackingMethod)

if utils.IsChildApp(parentAppIdentity) {
logCtx.Info("processing as child application")
logWithAppName.Info("processing as child application")
parentApplicationEntity, err := s.applicationServiceClient.Get(ctx, &application.ApplicationQuery{
Name: &parentAppIdentity.Name,
AppNamespace: &parentAppIdentity.Namespace,
Expand All @@ -165,27 +166,26 @@ func (s *applicationEventReporter) StreamApplicationEvents(

rs := utils.GetAppAsResource(a)

parentDesiredManifests, manifestGenErr := s.getDesiredManifests(ctx, parentApplicationEntity, nil, logCtx)
parentDesiredManifests, manifestGenErr := s.getDesiredManifests(ctx, parentApplicationEntity, nil, logWithAppName)

// helm app hasnt revision
// TODO: add check if it helm application
parentAppSyncRevisionsMetadata, err := s.getApplicationRevisionsMetadata(ctx, logCtx, parentApplicationEntity)
parentAppSyncRevisionsMetadata, err := s.getApplicationRevisionsMetadata(ctx, logWithAppName, parentApplicationEntity)
if err != nil {
logCtx.WithError(err).Warn("failed to get parent application's revision metadata, resuming")
logWithAppName.WithError(err).Warn("failed to get parent application's revision metadata, resuming")
}

utils.SetHealthStatusIfMissing(rs)
err = s.processResource(ctx, *rs, parentApplicationEntity, logCtx, ts, parentDesiredManifests, appTree, manifestGenErr, a, parentAppSyncRevisionsMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
err = s.processResource(ctx, *rs, parentApplicationEntity, logWithAppName, eventProcessingStartedAt, parentDesiredManifests, appTree, manifestGenErr, a, parentAppSyncRevisionsMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricChildAppEventType, metrics.MetricEventUnknownErrorType, a.Name)
return err
}
reconcileDuration := time.Since(startTime)
s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricChildAppEventType, reconcileDuration)
s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricChildAppEventType, metricTimer.Duration())
} else {
logCtx.Info("processing as root application")
logWithAppName.Info("processing as root application")
// will get here only for root applications (not managed as a resource by another application)
appEvent, err := s.getApplicationEventPayload(ctx, a, appTree, ts, appInstanceLabelKey, trackingMethod, applicationVersions)
appEvent, err := s.getApplicationEventPayload(ctx, a, appTree, eventProcessingStartedAt, appInstanceLabelKey, trackingMethod, applicationVersions)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventGetPayloadErrorType, a.Name)
return fmt.Errorf("failed to get application event: %w", err)
Expand All @@ -196,16 +196,15 @@ func (s *applicationEventReporter) StreamApplicationEvents(
return nil
}

utils.LogWithAppStatus(a, logCtx, ts).Info("sending root application event")
utils.LogWithAppStatus(a, logWithAppName, eventProcessingStartedAt).Info("sending root application event")
if err := s.codefreshClient.SendEvent(ctx, a.Name, appEvent); err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventDeliveryErrorType, a.Name)
return fmt.Errorf("failed to send event for root application %s/%s: %w", a.Namespace, a.Name, err)
}
reconcileDuration := time.Since(startTime)
s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricParentAppEventType, reconcileDuration)
s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricParentAppEventType, metricTimer.Duration())
}

revisionsMetadata, _ := s.getApplicationRevisionsMetadata(ctx, logCtx, a)
revisionsMetadata, _ := s.getApplicationRevisionsMetadata(ctx, logWithAppName, a)
// for each resource in the application get desired and actual state,
// then stream the event
for _, rs := range a.Status.Resources {
Expand All @@ -217,7 +216,7 @@ func (s *applicationEventReporter) StreamApplicationEvents(
s.metricsServer.IncCachedIgnoredEventsCounter(metrics.MetricResourceEventType, a.Name)
continue
}
err := s.processResource(ctx, rs, a, logCtx, ts, desiredManifests, appTree, manifestGenErr, nil, revisionsMetadata, appInstanceLabelKey, trackingMethod, nil)
err := s.processResource(ctx, rs, a, logWithAppName, eventProcessingStartedAt, desiredManifests, appTree, manifestGenErr, nil, revisionsMetadata, appInstanceLabelKey, trackingMethod, nil)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricResourceEventType, metrics.MetricEventUnknownErrorType, a.Name)
return err
Expand Down Expand Up @@ -254,8 +253,8 @@ func (s *applicationEventReporter) processResource(
ctx context.Context,
rs appv1.ResourceStatus,
parentApplication *appv1.Application,
logCtx *log.Entry,
ts string,
logWithAppName *log.Entry,
appEventProcessingStartedAt string,
desiredManifests *apiclient.ManifestResponse,
appTree *appv1.ApplicationTree,
manifestGenErr bool,
Expand All @@ -270,44 +269,44 @@ func (s *applicationEventReporter) processResource(
metricsEventType = metrics.MetricChildAppEventType
}

logCtx = logCtx.WithFields(log.Fields{
logWithAppAndResource := logWithAppName.WithFields(log.Fields{
"gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind),
"resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name),
})

// get resource desired state
desiredState := getResourceDesiredState(&rs, desiredManifests, logCtx)
desiredState := getResourceDesiredState(&rs, desiredManifests, logWithAppAndResource)

actualState, err := s.getResourceActualState(ctx, logCtx, metricsEventType, rs, parentApplication, originalApplication)
actualState, err := s.getResourceActualState(ctx, logWithAppAndResource, metricsEventType, rs, parentApplication, originalApplication)
if err != nil {
return err
}
if actualState == nil {
return nil
}

parentApplicationToReport, revisionMetadataToReport := s.getAppForResourceReporting(rs, ctx, logCtx, parentApplication, revisionsMetadata)
parentApplicationToReport, revisionMetadataToReport := s.getAppForResourceReporting(rs, ctx, logWithAppAndResource, parentApplication, revisionsMetadata)

var originalAppRevisionMetadata *utils.AppSyncRevisionsMetadata = nil

if originalApplication != nil {
originalAppRevisionMetadata, _ = s.getApplicationRevisionsMetadata(ctx, logCtx, originalApplication)
originalAppRevisionMetadata, _ = s.getApplicationRevisionsMetadata(ctx, logWithAppAndResource, originalApplication)
}

ev, err := getResourceEventPayload(parentApplicationToReport, &rs, actualState, desiredState, appTree, manifestGenErr, ts, originalApplication, revisionMetadataToReport, originalAppRevisionMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
ev, err := getResourceEventPayload(parentApplicationToReport, &rs, actualState, desiredState, appTree, manifestGenErr, appEventProcessingStartedAt, originalApplication, revisionMetadataToReport, originalAppRevisionMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventGetPayloadErrorType, parentApplication.Name)
logCtx.WithError(err).Warn("failed to get event payload, resuming")
logWithAppAndResource.WithError(err).Warn("failed to get event payload, resuming")
return nil
}

appRes := appv1.Application{}
appName := ""
if utils.IsApp(rs) && actualState.Manifest != nil && json.Unmarshal([]byte(*actualState.Manifest), &appRes) == nil {
utils.LogWithAppStatus(&appRes, logCtx, ts).Info("streaming resource event")
utils.LogWithAppStatus(&appRes, logWithAppAndResource, appEventProcessingStartedAt).Info("streaming resource event")
appName = appRes.Name
} else {
utils.LogWithResourceStatus(logCtx, rs).Info("streaming resource event")
utils.LogWithResourceStatus(logWithAppAndResource, rs).Info("streaming resource event")
appName = parentApplication.Name
}

Expand All @@ -317,12 +316,12 @@ func (s *applicationEventReporter) processResource(
}

s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventDeliveryErrorType, appName)
logCtx.WithError(err).Warn("failed to send resource event, resuming")
logWithAppAndResource.WithError(err).Warn("failed to send resource event, resuming")
return nil
}

if err := s.cache.SetLastResourceEvent(parentApplicationToReport, rs, resourceEventCacheExpiration, utils.GetApplicationLatestRevision(parentApplicationToReport)); err != nil {
logCtx.WithError(err).Warn("failed to cache resource event")
logWithAppAndResource.WithError(err).Warn("failed to cache resource event")
}

return nil
Expand Down
8 changes: 4 additions & 4 deletions event_reporter/reporter/event_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func getResourceEventPayload(
desiredState *apiclient.Manifest,
apptree *appv1.ApplicationTree,
manifestGenErr bool,
ts string,
appEventProcessingStartedAt string,
originalApplication *appv1.Application, // passed when rs is application
revisionsMetadata *utils.AppSyncRevisionsMetadata,
originalAppRevisionsMetadata *utils.AppSyncRevisionsMetadata, // passed when rs is application
Expand Down Expand Up @@ -194,7 +194,7 @@ func getResourceEventPayload(
}

payload := events.EventPayload{
Timestamp: ts,
Timestamp: appEventProcessingStartedAt,
Object: object,
Source: &source,
Errors: errors,
Expand All @@ -215,7 +215,7 @@ func (s *applicationEventReporter) getApplicationEventPayload(
ctx context.Context,
a *appv1.Application,
appTree *appv1.ApplicationTree,
ts string,
eventProcessingStartedAt string,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
applicationVersions *apiclient.ApplicationVersions,
Expand Down Expand Up @@ -295,7 +295,7 @@ func (s *applicationEventReporter) getApplicationEventPayload(
errors = append(errors, parseAggregativeHealthErrorsOfApplication(a, appTree)...)

payload := events.EventPayload{
Timestamp: ts,
Timestamp: eventProcessingStartedAt,
Object: object,
Source: source,
Errors: errors,
Expand Down
Loading