From e1c2b031ac236144365a7bcebc7d0a7d640f2e5f Mon Sep 17 00:00:00 2001 From: pasha-codefresh Date: Tue, 28 Nov 2023 11:18:36 +0200 Subject: [PATCH] feat: support new reporting component (#265) * move reporter to new place * move reporter to new place * move reporter to new place * move reporter to new place * move reporter to new place * move reporter to new place * move reporter to new place * move reporter to new place * take token from envs (#252) * feat: reporting v2 manifest generation (#254) * move reporter to new place * manifests generation * manifests generation * manifests generation and naming fixes (#255) * feat: codefresh client (#251) * codefresh client * event type, cmd arguments * fix * fix * update --------- Co-authored-by: pasha-codefresh * fetch argocd token from cluster (#256) * add parameters to manifest (#257) * update image * feat/reporter: metrics (#253) * reporter: metrics * reporter: default metrics port * reporter: renamed variable * reporter: fixed metrics server config * reporter: token * reporter: update metrics * cleanup * updated ports * change deployment to statefulset + change variables (#260) * fix bind adress (#259) * logs (#261) * adjust manifests (#263) * feat/reporting-v2-switching (#262) * reporter: fixed shard env variable * improve logs about skipped shard and fix metric * add application name to metrics * feat/reporting-v2-sharding-app-distribution (#264) * reporter: added 'app-distribution' query with conditional query param 'shardings' * reporter: moved request handlers to specific package * cleanup * integrate feature manager for report from v2 or v1 * reporter: fixed unit tests * resolve PR comments * resolve PR comments * resolve PR comments * resolve PR comments --------- Co-authored-by: Denis Melnik Co-authored-by: Andrii Shaforostov Co-authored-by: Oleksandr Saulyak Co-authored-by: Yaroslav Drachenko --- Dockerfile | 1 + cmd/event-reporter-server/commands/common.go | 6 + .../commands/event_reporter_server.go | 213 
+++++ cmd/main.go | 3 + common/common.go | 39 +- event_reporter/codefresh/client.go | 78 ++ event_reporter/codefresh/utils.go | 151 +++ event_reporter/controller/controller.go | 118 +++ event_reporter/handlers/handlers.go | 81 ++ event_reporter/metrics/metrics.go | 148 +++ .../reporter/application_errors_parser.go | 147 +++ .../reporter/application_event_reporter.go | 877 ++++++++++++++++++ .../application_event_reporter_test.go | 461 +++++++++ event_reporter/reporter/broadcaster.go | 140 +++ event_reporter/reporter/feature_manager.go | 40 + event_reporter/server.go | 304 ++++++ event_reporter/sharding/sharding.go | 101 ++ go.mod | 2 +- .../event-reporter-metrics.yaml | 16 + .../event-reporter/event-reporter-role.yaml | 43 + .../event-reporter-rolebinding.yaml | 15 + .../event-reporter/event-reporter-sa.yaml | 8 + .../event-reporter-service.yaml | 20 + .../event-reporter-statefulset.yaml | 168 ++++ .../base/event-reporter/kustomization.yaml | 10 + manifests/base/kustomization.yaml | 1 + manifests/install.yaml | 276 ++++++ manifests/namespace-install.yaml | 276 ++++++ server/application/application.go | 8 + util/settings/settings.go | 22 + 30 files changed, 3761 insertions(+), 12 deletions(-) create mode 100644 cmd/event-reporter-server/commands/common.go create mode 100644 cmd/event-reporter-server/commands/event_reporter_server.go create mode 100644 event_reporter/codefresh/client.go create mode 100644 event_reporter/codefresh/utils.go create mode 100644 event_reporter/controller/controller.go create mode 100644 event_reporter/handlers/handlers.go create mode 100644 event_reporter/metrics/metrics.go create mode 100644 event_reporter/reporter/application_errors_parser.go create mode 100644 event_reporter/reporter/application_event_reporter.go create mode 100644 event_reporter/reporter/application_event_reporter_test.go create mode 100644 event_reporter/reporter/broadcaster.go create mode 100644 event_reporter/reporter/feature_manager.go create mode 100644 
event_reporter/server.go create mode 100644 event_reporter/sharding/sharding.go create mode 100644 manifests/base/event-reporter/event-reporter-metrics.yaml create mode 100644 manifests/base/event-reporter/event-reporter-role.yaml create mode 100644 manifests/base/event-reporter/event-reporter-rolebinding.yaml create mode 100644 manifests/base/event-reporter/event-reporter-sa.yaml create mode 100644 manifests/base/event-reporter/event-reporter-service.yaml create mode 100644 manifests/base/event-reporter/event-reporter-statefulset.yaml create mode 100644 manifests/base/event-reporter/kustomization.yaml diff --git a/Dockerfile b/Dockerfile index 1b595d75623db..408bd348f54f9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -124,6 +124,7 @@ COPY --from=argocd-build /go/src/github.com/argoproj/argo-cd/dist/argocd* /usr/l USER root RUN ln -s /usr/local/bin/argocd /usr/local/bin/argocd-server && \ ln -s /usr/local/bin/argocd /usr/local/bin/argocd-repo-server && \ + ln -s /usr/local/bin/argocd /usr/local/bin/event-reporter-server && \ ln -s /usr/local/bin/argocd /usr/local/bin/argocd-cmp-server && \ ln -s /usr/local/bin/argocd /usr/local/bin/argocd-application-controller && \ ln -s /usr/local/bin/argocd /usr/local/bin/argocd-dex && \ diff --git a/cmd/event-reporter-server/commands/common.go b/cmd/event-reporter-server/commands/common.go new file mode 100644 index 0000000000000..7ad2c7069df88 --- /dev/null +++ b/cmd/event-reporter-server/commands/common.go @@ -0,0 +1,6 @@ +package commands + +const ( + // cliName is the name of the CLI + cliName = "event-reporter-server" +) diff --git a/cmd/event-reporter-server/commands/event_reporter_server.go b/cmd/event-reporter-server/commands/event_reporter_server.go new file mode 100644 index 0000000000000..700e6360351fc --- /dev/null +++ b/cmd/event-reporter-server/commands/event_reporter_server.go @@ -0,0 +1,213 @@ +package commands + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/argoproj/argo-cd/v2/event_reporter" + 
"github.com/argoproj/argo-cd/v2/event_reporter/codefresh" + "github.com/argoproj/argo-cd/v2/pkg/apiclient" + + "github.com/argoproj/pkg/stats" + "github.com/redis/go-redis/v9" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + + cmdutil "github.com/argoproj/argo-cd/v2/cmd/util" + "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" + repoapiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient" + servercache "github.com/argoproj/argo-cd/v2/server/cache" + "github.com/argoproj/argo-cd/v2/util/cli" + "github.com/argoproj/argo-cd/v2/util/env" + "github.com/argoproj/argo-cd/v2/util/errors" + "github.com/argoproj/argo-cd/v2/util/kube" + "github.com/argoproj/argo-cd/v2/util/tls" +) + +const ( + failureRetryCountEnv = "EVENT_REPORTER_K8S_RETRY_COUNT" + failureRetryPeriodMilliSecondsEnv = "EVENT_REPORTE_K8S_RETRY_DURATION_MILLISECONDS" +) + +var ( + failureRetryCount = 0 + failureRetryPeriodMilliSeconds = 100 +) + +func init() { + failureRetryCount = env.ParseNumFromEnv(failureRetryCountEnv, failureRetryCount, 0, 10) + failureRetryPeriodMilliSeconds = env.ParseNumFromEnv(failureRetryPeriodMilliSecondsEnv, failureRetryPeriodMilliSeconds, 0, 1000) +} + +// NewCommand returns a new instance of an event reporter command +func NewCommand() *cobra.Command { + var ( + redisClient *redis.Client + insecure bool + listenHost string + listenPort int + metricsHost string + metricsPort int + glogLevel int + clientConfig clientcmd.ClientConfig + repoServerTimeoutSeconds int + repoServerAddress string + applicationServerAddress string + cacheSrc func() (*servercache.Cache, error) + contentSecurityPolicy string + repoServerPlaintext bool + repoServerStrictTLS bool + applicationNamespaces []string + argocdToken string + codefreshUrl string + codefreshToken string + 
shardingAlgorithm string + ) + var command = &cobra.Command{ + Use: cliName, + Short: "Run the Event Reporter server", + Long: "The Event reporter is a server that listens to Kubernetes events and reports them to the Codefresh server.", + DisableAutoGenTag: true, + Run: func(c *cobra.Command, args []string) { + ctx := c.Context() + + vers := common.GetVersion() + namespace, _, err := clientConfig.Namespace() + errors.CheckError(err) + vers.LogStartupInfo( + "Event Reporter Server", + map[string]any{ + "namespace": namespace, + "port": listenPort, + }, + ) + + cli.SetLogFormat(cmdutil.LogFormat) + cli.SetLogLevel(cmdutil.LogLevel) + cli.SetGLogLevel(glogLevel) + + config, err := clientConfig.ClientConfig() + errors.CheckError(err) + errors.CheckError(v1alpha1.SetK8SConfigDefaults(config)) + + cache, err := cacheSrc() + errors.CheckError(err) + + kubeclientset := kubernetes.NewForConfigOrDie(config) + + appclientsetConfig, err := clientConfig.ClientConfig() + errors.CheckError(err) + errors.CheckError(v1alpha1.SetK8SConfigDefaults(appclientsetConfig)) + config.UserAgent = fmt.Sprintf("argocd-server/%s (%s)", vers.Version, vers.Platform) + + if failureRetryCount > 0 { + appclientsetConfig = kube.AddFailureRetryWrapper(appclientsetConfig, failureRetryCount, failureRetryPeriodMilliSeconds) + } + appClientSet := appclientset.NewForConfigOrDie(appclientsetConfig) + tlsConfig := repoapiclient.TLSConfiguration{ + DisableTLS: repoServerPlaintext, + StrictValidation: repoServerStrictTLS, + } + + // Load CA information to use for validating connections to the + // repository server, if strict TLS validation was requested. 
+ if !repoServerPlaintext && repoServerStrictTLS { + pool, err := tls.LoadX509CertPool( + fmt.Sprintf("%s/server/tls/tls.crt", env.StringFromEnv(common.EnvAppConfigPath, common.DefaultAppConfigPath)), + fmt.Sprintf("%s/server/tls/ca.crt", env.StringFromEnv(common.EnvAppConfigPath, common.DefaultAppConfigPath)), + ) + if err != nil { + log.Fatalf("%v", err) + } + tlsConfig.Certificates = pool + } + + repoclientset := repoapiclient.NewRepoServerClientset(repoServerAddress, repoServerTimeoutSeconds, tlsConfig) + + applicationClientSet, err := apiclient.NewClient(&apiclient.ClientOptions{ + ServerAddr: applicationServerAddress, + Insecure: true, + GRPCWeb: true, + PlainText: true, + AuthToken: argocdToken, + }) + + errors.CheckError(err) + + closer, applicationClient, err := applicationClientSet.NewApplicationClient() + + errors.CheckError(err) + + defer func() { + _ = closer.Close() + }() + + eventReporterServerOpts := event_reporter.EventReporterServerOpts{ + ListenPort: listenPort, + ListenHost: listenHost, + MetricsPort: metricsPort, + MetricsHost: metricsHost, + Namespace: namespace, + KubeClientset: kubeclientset, + AppClientset: appClientSet, + RepoClientset: repoclientset, + Cache: cache, + RedisClient: redisClient, + ApplicationNamespaces: applicationNamespaces, + ApplicationServiceClient: applicationClient, + CodefreshConfig: &codefresh.CodefreshConfig{ + BaseURL: codefreshUrl, + AuthToken: codefreshToken, + }, + } + + stats.RegisterStackDumper() + stats.StartStatsTicker(10 * time.Minute) + stats.RegisterHeapDumper("memprofile") + eventReporterServer := event_reporter.NewEventReporterServer(ctx, eventReporterServerOpts) + eventReporterServer.Init(ctx) + lns, err := eventReporterServer.Listen() + errors.CheckError(err) + for { + var closer func() + ctx, cancel := context.WithCancel(ctx) + eventReporterServer.Run(ctx, lns) + cancel() + if closer != nil { + closer() + } + } + }, + } + + clientConfig = cli.AddKubectlFlagsToCmd(command) + 
command.Flags().BoolVar(&insecure, "insecure", env.ParseBoolFromEnv("EVENT_REPORTER_INSECURE", false), "Run server without TLS") + command.Flags().StringVar(&cmdutil.LogFormat, "logformat", env.StringFromEnv("EVENT_REPORTER_LOGFORMAT", "text"), "Set the logging format. One of: text|json") + command.Flags().StringVar(&cmdutil.LogLevel, "loglevel", env.StringFromEnv("EVENT_REPORTER_LOG_LEVEL", "info"), "Set the logging level. One of: debug|info|warn|error") + command.Flags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level") + command.Flags().StringVar(&applicationServerAddress, "application-server", env.StringFromEnv("EVENT_REPORTER_APPLICATION_SERVER", common.DefaultApplicationServerAddr), "Application server address") + command.Flags().StringVar(&argocdToken, "argocd-token", env.StringFromEnv("ARGOCD_TOKEN", ""), "ArgoCD server JWT token") + command.Flags().StringVar(&repoServerAddress, "repo-server", env.StringFromEnv("EVENT_REPORTER_REPO_SERVER", common.DefaultRepoServerAddr), "Repo server address") + command.AddCommand(cli.NewVersionCmd(cliName)) + command.Flags().StringVar(&listenHost, "address", env.StringFromEnv("EVENT_REPORTER_LISTEN_ADDRESS", common.DefaultAddressEventReporterServer), "Listen on given address") + command.Flags().IntVar(&listenPort, "port", common.DefaultPortEventReporterServer, "Listen on given port") + command.Flags().StringVar(&metricsHost, "metrics-address", env.StringFromEnv("EVENT_REPORTER_METRICS_LISTEN_ADDRESS", common.DefaultAddressEventReporterServerMetrics), "Listen for metrics on given address") + command.Flags().IntVar(&metricsPort, "metrics-port", common.DefaultPortEventReporterServerMetrics, "Start metrics on given port") + command.Flags().IntVar(&repoServerTimeoutSeconds, "repo-server-timeout-seconds", env.ParseNumFromEnv("EVENT_REPORTER_REPO_SERVER_TIMEOUT_SECONDS", 60, 0, math.MaxInt64), "Repo server RPC call timeout seconds.") + command.Flags().StringVar(&contentSecurityPolicy, "content-security-policy",
env.StringFromEnv("EVENT_REPORTER_CONTENT_SECURITY_POLICY", "frame-ancestors 'self';"), "Set Content-Security-Policy header in HTTP responses to `value`. To disable, set to \"\".") + command.Flags().BoolVar(&repoServerPlaintext, "repo-server-plaintext", env.ParseBoolFromEnv("EVENT_REPORTER_REPO_SERVER_PLAINTEXT", false), "Use a plaintext client (non-TLS) to connect to repository server") + command.Flags().BoolVar(&repoServerStrictTLS, "repo-server-strict-tls", env.ParseBoolFromEnv("EVENT_REPORTER_REPO_SERVER_STRICT_TLS", false), "Perform strict validation of TLS certificates when connecting to repo server") + command.Flags().StringVar(&codefreshUrl, "codefresh-url", env.StringFromEnv("CODEFRESH_URL", "https://g.codefresh.io"), "Codefresh API url") + command.Flags().StringVar(&codefreshToken, "codefresh-token", env.StringFromEnv("CODEFRESH_TOKEN", ""), "Codefresh token") + command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvEventReporterShardingAlgorithm, common.DefaultEventReporterShardingAlgorithm), "Enables choice of sharding method. 
Supported sharding methods are : [legacy] ") + cacheSrc = servercache.AddCacheFlagsToCmd(command, func(client *redis.Client) { + redisClient = client + }) + return command +} diff --git a/cmd/main.go b/cmd/main.go index d972863992bce..af676e96ad0f8 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -16,6 +16,7 @@ import ( reposerver "github.com/argoproj/argo-cd/v2/cmd/argocd-repo-server/commands" apiserver "github.com/argoproj/argo-cd/v2/cmd/argocd-server/commands" cli "github.com/argoproj/argo-cd/v2/cmd/argocd/commands" + eventreporterserver "github.com/argoproj/argo-cd/v2/cmd/event-reporter-server/commands" ) const ( @@ -34,6 +35,8 @@ func main() { command = cli.NewCommand() case "argocd-server": command = apiserver.NewCommand() + case "event-reporter-server": + command = eventreporterserver.NewCommand() case "argocd-application-controller": command = appcontroller.NewCommand() case "argocd-repo-server": diff --git a/common/common.go b/common/common.go index e4e3da5c7fd8c..d438ed4ab551b 100644 --- a/common/common.go +++ b/common/common.go @@ -15,7 +15,8 @@ import ( // Default service addresses and URLS of Argo CD internal services const ( // DefaultRepoServerAddr is the gRPC address of the Argo CD repo server - DefaultRepoServerAddr = "argocd-repo-server:8081" + DefaultRepoServerAddr = "argocd-repo-server:8081" + DefaultApplicationServerAddr = "argo-cd-server:80" // DefaultDexServerAddr is the HTTP address of the Dex OIDC server, which we run a reverse proxy against DefaultDexServerAddr = "argocd-dex-server:5556" // DefaultRedisAddr is the default redis address @@ -44,20 +45,24 @@ const ( // Default listener ports for ArgoCD components const ( - DefaultPortAPIServer = 8080 - DefaultPortRepoServer = 8081 - DefaultPortArgoCDMetrics = 8082 - DefaultPortArgoCDAPIServerMetrics = 8083 - DefaultPortRepoServerMetrics = 8084 + DefaultPortAPIServer = 8080 + DefaultPortRepoServer = 8081 + DefaultPortArgoCDMetrics = 8082 + DefaultPortArgoCDAPIServerMetrics = 8083 + 
DefaultPortRepoServerMetrics = 8084 + DefaultPortEventReporterServerMetrics = 8087 + DefaultPortEventReporterServer = 8088 ) // DefaultAddressAPIServer for ArgoCD components const ( - DefaultAddressAdminDashboard = "localhost" - DefaultAddressAPIServer = "0.0.0.0" - DefaultAddressAPIServerMetrics = "0.0.0.0" - DefaultAddressRepoServer = "0.0.0.0" - DefaultAddressRepoServerMetrics = "0.0.0.0" + DefaultAddressAdminDashboard = "localhost" + DefaultAddressAPIServer = "0.0.0.0" + DefaultAddressAPIServerMetrics = "0.0.0.0" + DefaultAddressRepoServer = "0.0.0.0" + DefaultAddressRepoServerMetrics = "0.0.0.0" + DefaultAddressEventReporterServer = "0.0.0.0" + DefaultAddressEventReporterServerMetrics = "0.0.0.0" ) // Default paths on the pod's file system @@ -237,6 +242,12 @@ const ( EnvCMPWorkDir = "ARGOCD_CMP_WORKDIR" // EnvGPGDataPath overrides the location where GPG keyring for signature verification is stored EnvGPGDataPath = "ARGOCD_GPG_DATA_PATH" + // EnvEventReporterShardingAlgorithm is the distribution sharding algorithm to be used: legacy + EnvEventReporterShardingAlgorithm = "EVENT_REPORTER_SHARDING_ALGORITHM" + // EnvEventReporterReplicas is the number of EventReporter replicas + EnvEventReporterReplicas = "EVENT_REPORTER_REPLICAS" + // EnvEventReporterShard is the shard number that should be handled by reporter + EnvEventReporterShard = "EVENT_REPORTER_SHARD" ) // Config Management Plugin related constants @@ -345,3 +356,9 @@ const TokenVerificationError = "failed to verify the token" var TokenVerificationErr = errors.New(TokenVerificationError) var PermissionDeniedAPIError = status.Error(codes.PermissionDenied, "permission denied") + +// Event reporter constants +const ( + EventReporterLegacyShardingAlgorithm = "legacy" + DefaultEventReporterShardingAlgorithm = EventReporterLegacyShardingAlgorithm +) diff --git a/event_reporter/codefresh/client.go b/event_reporter/codefresh/client.go new file mode 100644 index 0000000000000..f6c7001a18e24 --- /dev/null +++ 
b/event_reporter/codefresh/client.go @@ -0,0 +1,78 @@ +package codefresh + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +type CodefreshConfig struct { + BaseURL string + AuthToken string +} + +type codefreshClient struct { + cfConfig *CodefreshConfig + httpClient *http.Client +} + +type CodefreshClient interface { + Send(ctx context.Context, appName string, event *events.Event) error +} + +func NewCodefreshClient(cfConfig *CodefreshConfig) CodefreshClient { + return &codefreshClient{ + cfConfig: cfConfig, + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + } +} + +func (cc *codefreshClient) Send(ctx context.Context, appName string, event *events.Event) error { + return WithRetry(&DefaultBackoff, func() error { + url := cc.cfConfig.BaseURL + "/2.0/api/events" + log.Infof("Sending application event for %s", appName) + + wrappedPayload := map[string]json.RawMessage{ + "data": event.Payload, + } + + newPayloadBytes, err := json.Marshal(wrappedPayload) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(newPayloadBytes)) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", cc.cfConfig.AuthToken) + + res, err := cc.httpClient.Do(req) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("failed reporting to Codefresh, event: %s", string(event.Payload))) + } + defer res.Body.Close() + + isStatusOK := res.StatusCode >= 200 && res.StatusCode < 300 + if !isStatusOK { + b, _ := io.ReadAll(res.Body) + return errors.Errorf("failed reporting to Codefresh, got response: status code %d and body %s, original request body: %s", + res.StatusCode, string(b), string(event.Payload)) + } + + log.Infof("Application event for %s successfully sent", appName) + return nil + }) +} diff 
--git a/event_reporter/codefresh/utils.go b/event_reporter/codefresh/utils.go new file mode 100644 index 0000000000000..7496ef5511012 --- /dev/null +++ b/event_reporter/codefresh/utils.go @@ -0,0 +1,151 @@ +package codefresh + +import ( + "fmt" + "strconv" + "time" + + "k8s.io/apimachinery/pkg/util/wait" +) + +// Backoff for an operation +type Backoff struct { + // The initial duration in nanoseconds or strings like "1s", "3m" + // +optional + Duration *Int64OrString `json:"duration,omitempty" protobuf:"bytes,1,opt,name=duration"` + // Duration is multiplied by factor each iteration + // +optional + Factor *Amount `json:"factor,omitempty" protobuf:"bytes,2,opt,name=factor"` + // The amount of jitter applied each iteration + // +optional + Jitter *Amount `json:"jitter,omitempty" protobuf:"bytes,3,opt,name=jitter"` + // Exit with error after this many steps + // +optional + Steps int32 `json:"steps,omitempty" protobuf:"varint,4,opt,name=steps"` +} + +// Amount represent a numeric amount. +type Amount struct { + Value []byte `json:"value" protobuf:"bytes,1,opt,name=value"` +} + +type Int64OrString struct { + Type Type `json:"type" protobuf:"varint,1,opt,name=type,casttype=Type"` + Int64Val int64 `json:"int64Val,omitempty" protobuf:"varint,2,opt,name=int64Val"` + StrVal string `json:"strVal,omitempty" protobuf:"bytes,3,opt,name=strVal"` +} + +// Type represents the stored type of Int64OrString. +type Type int64 + +const ( + Int64 Type = iota // The Int64OrString holds an int64. + String // The Int64OrString holds a string. +) + +func NewAmount(s string) Amount { + return Amount{Value: []byte(s)} +} + +// FromString creates an Int64OrString object with a string value. +func FromString(val string) Int64OrString { + return Int64OrString{Type: String, StrVal: val} +} + +// Int64Value returns the Int64Val if type Int64, or if +// it is a String, will attempt a conversion to int64, +// returning 0 if a parsing error occurs. 
+func (int64str *Int64OrString) Int64Value() int64 { + if int64str.Type == String { + i, _ := strconv.ParseInt(int64str.StrVal, 10, 64) + return i + } + return int64str.Int64Val +} + +var ( + defaultFactor = NewAmount("1.0") + defaultJitter = NewAmount("1") + defaultDuration = FromString("1s") + + DefaultBackoff = Backoff{ + Steps: 5, + Duration: &defaultDuration, + Factor: &defaultFactor, + Jitter: &defaultJitter, + } +) + +func (n *Amount) Float64() (float64, error) { + return strconv.ParseFloat(string(n.Value), 64) +} + +// Convert2WaitBackoff converts to a wait backoff option +func Convert2WaitBackoff(backoff *Backoff) (*wait.Backoff, error) { + result := wait.Backoff{} + + d := backoff.Duration + if d == nil { + d = &defaultDuration + } + if d.Type == Int64 { + result.Duration = time.Duration(d.Int64Value()) + } else { + parsedDuration, err := time.ParseDuration(d.StrVal) + if err != nil { + return nil, err + } + result.Duration = parsedDuration + } + + factor := backoff.Factor + if factor == nil { + factor = &defaultFactor + } + f, err := factor.Float64() + if err != nil { + return nil, fmt.Errorf("invalid factor, %w", err) + } + result.Factor = f + + jitter := backoff.Jitter + if jitter == nil { + jitter = &defaultJitter + } + j, err := jitter.Float64() + if err != nil { + return nil, fmt.Errorf("invalid jitter, %w", err) + } + result.Jitter = j + + if backoff.Steps > 0 { + result.Steps = backoff.GetSteps() + } else { + result.Steps = int(DefaultBackoff.Steps) + } + return &result, nil +} + +func (b Backoff) GetSteps() int { + return int(b.Steps) +} + +func WithRetry(backoff *Backoff, f func() error) error { + if backoff == nil { + backoff = &DefaultBackoff + } + b, err := Convert2WaitBackoff(backoff) + if err != nil { + return fmt.Errorf("invalid backoff configuration, %w", err) + } + _ = wait.ExponentialBackoff(*b, func() (bool, error) { + if err = f(); err != nil { + return false, nil + } + return true, nil + }) + if err != nil { + return 
fmt.Errorf("failed after retries: %w", err) + } + return nil +} diff --git a/event_reporter/controller/controller.go b/event_reporter/controller/controller.go new file mode 100644 index 0000000000000..564712e742758 --- /dev/null +++ b/event_reporter/controller/controller.go @@ -0,0 +1,118 @@ +package controller + +import ( + "context" + "math" + "strings" + "time" + + argocommon "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/event_reporter/codefresh" + "github.com/argoproj/argo-cd/v2/event_reporter/metrics" + "github.com/argoproj/argo-cd/v2/event_reporter/reporter" + applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" + appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" + servercache "github.com/argoproj/argo-cd/v2/server/cache" + argoutil "github.com/argoproj/argo-cd/v2/util/argo" + "github.com/argoproj/argo-cd/v2/util/env" + "github.com/argoproj/argo-cd/v2/util/settings" + log "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" +) + +var ( + watchAPIBufferSize = 1000 + applicationEventCacheExpiration = time.Minute * time.Duration(env.ParseNumFromEnv(argocommon.EnvApplicationEventCacheDuration, 20, 0, math.MaxInt32)) +) + +type EventReporterController interface { + Run(ctx context.Context) +} + +type eventReporterController struct { + settingsMgr *settings.SettingsManager + appBroadcaster reporter.Broadcaster + applicationEventReporter reporter.ApplicationEventReporter + cache *servercache.Cache + appLister applisters.ApplicationLister + applicationServiceClient applicationpkg.ApplicationServiceClient + metricsServer *metrics.MetricsServer +} + +func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient applicationpkg.ApplicationServiceClient, appLister 
applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager) EventReporterController { + appBroadcaster := reporter.NewBroadcaster(featureManager) + appInformer.AddEventHandler(appBroadcaster) + return &eventReporterController{ + appBroadcaster: appBroadcaster, + applicationEventReporter: reporter.NewApplicationEventReporter(cache, applicationServiceClient, appLister, codefreshConfig, metricsServer), + cache: cache, + settingsMgr: settingsMgr, + applicationServiceClient: applicationServiceClient, + appLister: appLister, + metricsServer: metricsServer, + } +} + +func (c *eventReporterController) Run(ctx context.Context) { + var ( + logCtx log.FieldLogger = log.StandardLogger() + ) + + // sendIfPermitted is a helper to send the application to the client's streaming channel if the + // caller has RBAC privileges permissions to view it + sendIfPermitted := func(ctx context.Context, a appv1.Application, eventType watch.EventType, ts string, ignoreResourceCache bool) error { + if eventType == watch.Bookmark { + return nil // ignore this event + } + + appInstanceLabelKey, err := c.settingsMgr.GetAppInstanceLabelKey() + if err != nil { + return err + } + trackingMethod := argoutil.GetTrackingMethod(c.settingsMgr) + + err = c.applicationEventReporter.StreamApplicationEvents(ctx, &a, ts, ignoreResourceCache, appInstanceLabelKey, trackingMethod) + if err != nil { + return err + } + + if err := c.cache.SetLastApplicationEvent(&a, applicationEventCacheExpiration); err != nil { + logCtx.WithError(err).Error("failed to cache last sent application event") + return err + } + return nil + } + + //TODO: move to abstraction + eventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize) + unsubscribe := c.appBroadcaster.Subscribe(eventsChannel) + defer unsubscribe() + for { + select { + case <-ctx.Done(): + return + case event := <-eventsChannel: + logCtx.Infof("channel size is 
%d", len(eventsChannel)) + c.metricsServer.SetQueueSizeCounter(len(eventsChannel)) + shouldProcess, ignoreResourceCache := c.applicationEventReporter.ShouldSendApplicationEvent(event) + if !shouldProcess { + logCtx.Infof("Skipping event %s/%s", event.Application.Name, event.Type) + c.metricsServer.IncCachedIgnoredEventsCounter(metrics.MetricAppEventType, event.Application.Name) + continue + } + ts := time.Now().Format("2006-01-02T15:04:05.000Z") + ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) + err := sendIfPermitted(ctx, event.Application, event.Type, ts, ignoreResourceCache) + if err != nil { + logCtx.WithError(err).Error("failed to stream application events") + if strings.Contains(err.Error(), "context deadline exceeded") { + logCtx.Info("Closing event-source connection") + cancel() + } + } + cancel() + } + } +} diff --git a/event_reporter/handlers/handlers.go b/event_reporter/handlers/handlers.go new file mode 100644 index 0000000000000..951af200a0ad8 --- /dev/null +++ b/event_reporter/handlers/handlers.go @@ -0,0 +1,81 @@ +package handlers + +import ( + "encoding/json" + "github.com/argoproj/argo-cd/v2/event_reporter/sharding" + applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" + "net/http" + "strconv" + "strings" +) + +type RequestHandlers struct { + ApplicationServiceClient applicationpkg.ApplicationServiceClient +} + +func GetRequestHandlers(applicationServiceClient applicationpkg.ApplicationServiceClient) *RequestHandlers { + return &RequestHandlers{ + ApplicationServiceClient: applicationServiceClient, + } +} + +// queryParams: []string{"shardings"} +// response JSON { "strategyName": { Distribution, //Apps } +func (rH *RequestHandlers) GetAppDistribution(w http.ResponseWriter, r *http.Request) { + type ShardingAlgorithmData struct { + Distribution map[string]int `json:"distribution"` + Apps map[string]int `json:"apps"` + } + response := map[string]ShardingAlgorithmData{} + + shardings := []string{""} + shardingsParam := 
r.URL.Query().Get("shardings") + if shardingsParam != "" { + shardings = strings.Split(shardingsParam, ",") + } + + apps, err := rH.ApplicationServiceClient.List(r.Context(), &applicationpkg.ApplicationQuery{}) + + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + shardingInstance := sharding.NewSharding() + + for _, shardingAlgorithm := range shardings { + distributionMap := make(map[string]int) + appsMap := make(map[string]int) + distributionFunction := shardingInstance.GetDistributionFunction(shardingAlgorithm) + + for _, app := range apps.Items { + expectedShard := distributionFunction(&app) + distributionMap[strconv.Itoa(expectedShard)] += 1 + appsMap[app.QualifiedName()] = expectedShard + } + + shardingAlgorithmDisplayName := shardingAlgorithm + if shardingAlgorithm == "" { + shardingAlgorithmDisplayName = "default" + } + + response[shardingAlgorithmDisplayName] = ShardingAlgorithmData{ + Distribution: distributionMap, + Apps: appsMap, + } + } + + jsonBytes, err := json.Marshal(response) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + _, err = w.Write(jsonBytes) + + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} diff --git a/event_reporter/metrics/metrics.go b/event_reporter/metrics/metrics.go new file mode 100644 index 0000000000000..08bdd855d3fde --- /dev/null +++ b/event_reporter/metrics/metrics.go @@ -0,0 +1,148 @@ +package metrics + +import ( + "fmt" + "github.com/argoproj/argo-cd/v2/event_reporter/sharding" + "net/http" + "strconv" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + + "github.com/argoproj/argo-cd/v2/util/profile" +) + +type MetricsServer struct { + *http.Server + shard string + redisRequestCounter *prometheus.CounterVec + redisRequestHistogram *prometheus.HistogramVec + 
queueSizeCounter *prometheus.GaugeVec + erroredEventsCounter *prometheus.CounterVec + cachedIgnoredEventsCounter *prometheus.CounterVec + eventProcessingDurationHistogram *prometheus.HistogramVec +} + +type MetricEventType string + +const ( + MetricAppEventType MetricEventType = "app" + MetricParentAppEventType MetricEventType = "parent_app" + MetricChildAppEventType MetricEventType = "child_app" + MetricResourceEventType MetricEventType = "resource" +) + +type MetricEventErrorType string + +const ( + MetricEventDeliveryErrorType MetricEventErrorType = "delivery" + MetricEventGetPayloadErrorType MetricEventErrorType = "get_payload" + MetricEventUnknownErrorType MetricEventErrorType = "unknown" +) + +var ( + redisRequestCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "argocd_redis_request_total", + Help: "Number of kubernetes requests executed during application reconciliation.", + }, + []string{"initiator", "failed"}, + ) + redisRequestHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "argocd_redis_request_duration", + Help: "Redis requests duration.", + Buckets: []float64{0.1, 0.25, .5, 1, 2}, + }, + []string{"initiator"}, + ) + queueSizeCounter = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cf_e_reporter_queue_size", + Help: "Size of application events queue of taked shard.", + }, + []string{"reporter_shard"}, + ) + erroredEventsCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "cf_e_reporter_errored_events", + Help: "Total amount of errored events.", + }, + []string{"reporter_shard", "metric_event_type", "error_type", "application"}, + ) + cachedIgnoredEventsCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "cf_e_reporter_cached_ignored_events", + Help: "Total number of ignored events because of cache.", + }, + []string{"reporter_shard", "metric_event_type", "application"}, + ) + eventProcessingDurationHistogram = prometheus.NewHistogramVec( + 
prometheus.HistogramOpts{ + Name: "cf_e_reporter_event_processing_duration", + Help: "Event processing duration.", + Buckets: []float64{0.1, 0.25, .5, 1, 2, 3, 4, 5, 7, 10, 15, 20}, + }, + []string{"reporter_shard", "metric_event_type"}, + ) +) + +// NewMetricsServer returns a new prometheus server which collects api server metrics +func NewMetricsServer(host string, port int) *MetricsServer { + mux := http.NewServeMux() + registry := prometheus.NewRegistry() + + mux.Handle("/metrics", promhttp.HandlerFor(prometheus.Gatherers{ + registry, + prometheus.DefaultGatherer, + }, promhttp.HandlerOpts{})) + profile.RegisterProfiler(mux) + + registry.MustRegister(redisRequestCounter) + registry.MustRegister(redisRequestHistogram) + + registry.MustRegister(queueSizeCounter) + registry.MustRegister(erroredEventsCounter) + registry.MustRegister(cachedIgnoredEventsCounter) + registry.MustRegister(eventProcessingDurationHistogram) + + shard := sharding.GetShardNumber() + + return &MetricsServer{ + Server: &http.Server{ + Addr: fmt.Sprintf("%s:%d", host, port), + Handler: mux, + }, + shard: strconv.FormatInt(int64(shard), 10), + queueSizeCounter: queueSizeCounter, + erroredEventsCounter: erroredEventsCounter, + cachedIgnoredEventsCounter: cachedIgnoredEventsCounter, + eventProcessingDurationHistogram: eventProcessingDurationHistogram, + } +} + +func (m *MetricsServer) IncRedisRequest(failed bool) { + m.redisRequestCounter.WithLabelValues("argocd-server", strconv.FormatBool(failed)).Inc() +} + +// ObserveRedisRequestDuration observes redis request duration +func (m *MetricsServer) ObserveRedisRequestDuration(duration time.Duration) { + m.redisRequestHistogram.WithLabelValues("argocd-server").Observe(duration.Seconds()) +} + +func (m *MetricsServer) SetQueueSizeCounter(size int) { + m.queueSizeCounter.WithLabelValues(m.shard).Set(float64(size)) +} + +func (m *MetricsServer) IncErroredEventsCounter(metricEventType MetricEventType, errorType MetricEventErrorType, application string) 
{ + m.erroredEventsCounter.WithLabelValues(m.shard, string(metricEventType), string(errorType), application).Inc() +} + +func (m *MetricsServer) IncCachedIgnoredEventsCounter(metricEventType MetricEventType, application string) { + m.cachedIgnoredEventsCounter.WithLabelValues(m.shard, string(metricEventType), application).Inc() +} + +func (m *MetricsServer) ObserveEventProcessingDurationHistogramDuration(metricEventType MetricEventType, duration time.Duration) { + m.eventProcessingDurationHistogram.WithLabelValues(m.shard, string(metricEventType)).Observe(duration.Seconds()) +} diff --git a/event_reporter/reporter/application_errors_parser.go b/event_reporter/reporter/application_errors_parser.go new file mode 100644 index 0000000000000..8a6dd2b30c731 --- /dev/null +++ b/event_reporter/reporter/application_errors_parser.go @@ -0,0 +1,147 @@ +package reporter + +import ( + "fmt" + "strings" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/argoproj/gitops-engine/pkg/health" + "github.com/argoproj/gitops-engine/pkg/sync/common" + + "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" + appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" +) + +func parseApplicationSyncResultErrors(os *appv1.OperationState) []*events.ObjectError { + var errors []*events.ObjectError + // mean that resource not found as sync result but application can contain error inside operation state itself, + // for example app created with invalid yaml + if os.Phase == common.OperationError || os.Phase == common.OperationFailed { + errors = append(errors, &events.ObjectError{ + Type: "sync", + Level: "error", + Message: os.Message, + LastSeen: os.StartedAt, + }) + } + return errors +} + +var syncTaskUnsuccessfullErrorMessage = "one or more synchronization tasks completed unsuccessfully" +var syncTaskNotValidErrorMessage = "one or more synchronization tasks are not valid" + +func parseApplicationSyncResultErrorsFromConditions(status appv1.ApplicationStatus) 
[]*events.ObjectError { + var errs []*events.ObjectError + if status.Conditions == nil { + return errs + } + for _, cnd := range status.Conditions { + if !strings.Contains(strings.ToLower(cnd.Type), "error") { + continue + } + + lastSeen := metav1.Now() + if cnd.LastTransitionTime != nil { + lastSeen = *cnd.LastTransitionTime + } + + if (strings.Contains(cnd.Message, syncTaskUnsuccessfullErrorMessage) || strings.Contains(cnd.Message, syncTaskNotValidErrorMessage)) && status.OperationState != nil && status.OperationState.SyncResult != nil && status.OperationState.SyncResult.Resources != nil { + resourcesSyncErrors := parseAggregativeResourcesSyncErrors(status.OperationState.SyncResult.Resources) + + errs = append(errs, resourcesSyncErrors...) + } else { + errs = append(errs, &events.ObjectError{ + Type: "sync", + Level: "error", + Message: cnd.Message, + LastSeen: lastSeen, + }) + } + } + return errs +} + +func parseResourceSyncResultErrors(rs *appv1.ResourceStatus, os *appv1.OperationState) []*events.ObjectError { + errors := []*events.ObjectError{} + if os.SyncResult == nil { + return errors + } + + _, sr := os.SyncResult.Resources.Find( + rs.Group, + rs.Kind, + rs.Namespace, + rs.Name, + common.SyncPhaseSync, + ) + + if sr == nil || !(sr.HookPhase == common.OperationFailed || sr.HookPhase == common.OperationError || sr.Status == common.ResultCodeSyncFailed) { + return errors + } + + errors = append(errors, &events.ObjectError{ + Type: "sync", + Level: "error", + Message: sr.Message, + LastSeen: os.StartedAt, + }) + + return errors +} + +func parseAggregativeHealthErrors(rs *appv1.ResourceStatus, apptree *appv1.ApplicationTree) []*events.ObjectError { + errs := make([]*events.ObjectError, 0) + + if apptree == nil { + return errs + } + + n := apptree.FindNode(rs.Group, rs.Kind, rs.Namespace, rs.Name) + if n == nil { + return errs + } + + childNodes := n.GetAllChildNodes(apptree, "") + + for _, cn := range childNodes { + if cn.Health != nil && cn.Health.Status == 
health.HealthStatusDegraded { + errs = append(errs, &events.ObjectError{ + Type: "health", + Level: "error", + Message: cn.Health.Message, + LastSeen: *cn.CreatedAt, + }) + } + } + + return errs +} + +func parseAggregativeResourcesSyncErrors(resourceResults appv1.ResourceResults) []*events.ObjectError { + var errs []*events.ObjectError + + if resourceResults == nil { + return errs + } + + for _, rr := range resourceResults { + if rr.Message != "" { + objectError := events.ObjectError{ + Type: "sync", + Level: "error", + LastSeen: metav1.Now(), + Message: fmt.Sprintf("Resource %s(%s): \n %s", rr.Kind, rr.Name, rr.Message), + } + if rr.Status == common.ResultCodeSyncFailed { + errs = append(errs, &objectError) + } + if rr.HookPhase == common.OperationFailed || rr.HookPhase == common.OperationError { + errs = append(errs, &objectError) + } + + } + } + + return errs +} diff --git a/event_reporter/reporter/application_event_reporter.go b/event_reporter/reporter/application_event_reporter.go new file mode 100644 index 0000000000000..106dca7ef1184 --- /dev/null +++ b/event_reporter/reporter/application_event_reporter.go @@ -0,0 +1,877 @@ +package reporter + +import ( + "context" + "encoding/json" + "fmt" + "math" + "reflect" + "strings" + "time" + + argocommon "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/event_reporter/codefresh" + "github.com/argoproj/argo-cd/v2/event_reporter/metrics" + applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" + applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" + servercache "github.com/argoproj/argo-cd/v2/server/cache" + "github.com/argoproj/argo-cd/v2/util/env" + + "github.com/argoproj/argo-cd/v2/util/argo" + + "github.com/argoproj/gitops-engine/pkg/health" + "github.com/argoproj/gitops-engine/pkg/utils/kube" + "github.com/argoproj/gitops-engine/pkg/utils/text" + log "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + 
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" + "sigs.k8s.io/yaml" + + "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" + "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" + appv1reg "github.com/argoproj/argo-cd/v2/pkg/apis/application" + appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + "github.com/argoproj/argo-cd/v2/reposerver/apiclient" +) + +var ( + resourceEventCacheExpiration = time.Minute * time.Duration(env.ParseNumFromEnv(argocommon.EnvResourceEventCacheDuration, 20, 0, math.MaxInt32)) +) + +type applicationEventReporter struct { + cache *servercache.Cache + codefreshClient codefresh.CodefreshClient + appLister applisters.ApplicationLister + applicationServiceClient applicationpkg.ApplicationServiceClient + metricsServer *metrics.MetricsServer +} + +type ApplicationEventReporter interface { + StreamApplicationEvents( + ctx context.Context, + a *appv1.Application, + ts string, + ignoreResourceCache bool, + appInstanceLabelKey string, + trackingMethod appv1.TrackingMethod, + ) error + ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool) +} + +func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient applicationpkg.ApplicationServiceClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer) ApplicationEventReporter { + return &applicationEventReporter{ + cache: cache, + applicationServiceClient: applicationServiceClient, + codefreshClient: codefresh.NewCodefreshClient(codefreshConfig), + appLister: appLister, + metricsServer: metricsServer, + } +} + +func (s *applicationEventReporter) shouldSendResourceEvent(a *appv1.Application, rs appv1.ResourceStatus) bool { + logCtx := logWithResourceStatus(log.WithFields(log.Fields{ + "app": a.Name, + "gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind), + "resource": fmt.Sprintf("%s/%s", 
rs.Namespace, rs.Name), + }), rs) + + cachedRes, err := s.cache.GetLastResourceEvent(a, rs, getApplicationLatestRevision(a)) + if err != nil { + logCtx.Debug("resource not in cache") + return true + } + + if reflect.DeepEqual(&cachedRes, &rs) { + logCtx.Debug("resource status not changed") + + // status not changed + return false + } + + logCtx.Info("resource status changed") + return true +} + +func getParentAppName(a *appv1.Application, appInstanceLabelKey string, trackingMethod appv1.TrackingMethod) string { + resourceTracking := argo.NewResourceTracking() + unApp := kube.MustToUnstructured(&a) + + return resourceTracking.GetAppName(unApp, appInstanceLabelKey, trackingMethod) +} + +func isChildApp(parentAppName string) bool { + return parentAppName != "" +} + +func getAppAsResource(a *appv1.Application) *appv1.ResourceStatus { + return &appv1.ResourceStatus{ + Name: a.Name, + Namespace: a.Namespace, + Version: "v1alpha1", + Kind: "Application", + Group: "argoproj.io", + Status: a.Status.Sync.Status, + Health: &a.Status.Health, + RequiresPruning: a.DeletionTimestamp != nil, + } +} + +func (r *applicationEventReporter) getDesiredManifests(ctx context.Context, a *appv1.Application, logCtx *log.Entry) (*apiclient.ManifestResponse, error, bool) { + // get the desired state manifests of the application + desiredManifests, err := r.applicationServiceClient.GetManifests(ctx, &application.ApplicationManifestQuery{ + Name: &a.Name, + Revision: &a.Status.Sync.Revision, + }) + if err != nil { + notManifestGenerationError := !strings.Contains(err.Error(), "Manifest generation error") + // when application deleted rbac also throws erorr with PermissionDenied + // we can ignore the error, as we check rbac access before reporting events + notPermissionDeniedError := !strings.Contains(err.Error(), "PermissionDenied") + + if notManifestGenerationError && notPermissionDeniedError { + return nil, fmt.Errorf("failed to get application desired state manifests: %w", err), false + } + 
// if it's manifest generation error we need to still report the actual state + // of the resources, but since we can't get the desired state, we will report + // each resource with empty desired state + logCtx.WithError(err).Warn("failed to get application desired state manifests, reporting actual state only") + desiredManifests = &apiclient.ManifestResponse{Manifests: []*apiclient.Manifest{}} + return desiredManifests, nil, true // will ignore requiresPruning=true to not delete resources with actual state + } + return desiredManifests, nil, false +} + +func (s *applicationEventReporter) StreamApplicationEvents( + ctx context.Context, + a *appv1.Application, + ts string, + ignoreResourceCache bool, + appInstanceLabelKey string, + trackingMethod appv1.TrackingMethod, +) error { + startTime := time.Now() + logCtx := log.WithField("app", a.Name) + + logCtx.WithField("ignoreResourceCache", ignoreResourceCache).Info("streaming application events") + + appTree, err := s.applicationServiceClient.ResourceTree(ctx, &application.ResourcesQuery{ + ApplicationName: &a.Name, + Project: &a.Spec.Project, + Namespace: &a.Namespace, + }) + if err != nil { + if strings.Contains(err.Error(), "context deadline exceeded") { + return fmt.Errorf("failed to get application tree: %w", err) + } + + // we still need process app even without tree, it is in case of app yaml originally contain error, + // we still want to show it the erorrs that related to it on codefresh ui + logCtx.WithError(err).Warn("failed to get application tree, resuming") + } + + logCtx.Info("getting desired manifests") + + desiredManifests, err, manifestGenErr := s.getDesiredManifests(ctx, a, logCtx) + if err != nil { + return err + } + + logCtx.Info("getting parent application name") + + parentAppName := getParentAppName(a, appInstanceLabelKey, trackingMethod) + + if isChildApp(parentAppName) { + logCtx.Info("processing as child application") + parentApplicationEntity, err := s.applicationServiceClient.Get(ctx, 
&application.ApplicationQuery{ + Name: &parentAppName, + }) + if err != nil { + return fmt.Errorf("failed to get parent application entity: %w", err) + } + + rs := getAppAsResource(a) + + parentDesiredManifests, err, manifestGenErr := s.getDesiredManifests(ctx, parentApplicationEntity, logCtx) + if err != nil { + logCtx.WithError(err).Warn("failed to get parent application's desired manifests, resuming") + } + + // helm app hasnt revision + // TODO: add check if it helm application + parentOperationRevision := getOperationRevision(parentApplicationEntity) + parentRevisionMetadata, err := s.getApplicationRevisionDetails(ctx, parentApplicationEntity, parentOperationRevision) + if err != nil { + logCtx.WithError(err).Warn("failed to get parent application's revision metadata, resuming") + } + + err = s.processResource(ctx, *rs, parentApplicationEntity, logCtx, ts, parentDesiredManifests, appTree, manifestGenErr, a, parentRevisionMetadata, true, appInstanceLabelKey, trackingMethod, desiredManifests.ApplicationVersions) + if err != nil { + s.metricsServer.IncErroredEventsCounter(metrics.MetricChildAppEventType, metrics.MetricEventUnknownErrorType, a.Name) + return err + } + reconcileDuration := time.Since(startTime) + s.metricsServer.ObserveEventProcessingDurationHistogramDuration(metrics.MetricChildAppEventType, reconcileDuration) + } else { + logCtx.Info("processing as root application") + // will get here only for root applications (not managed as a resource by another application) + appEvent, err := s.getApplicationEventPayload(ctx, a, ts, appInstanceLabelKey, trackingMethod, desiredManifests.ApplicationVersions) + if err != nil { + s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventGetPayloadErrorType, a.Name) + return fmt.Errorf("failed to get application event: %w", err) + } + + if appEvent == nil { + // event did not have an OperationState - skip all events + return nil + } + + logWithAppStatus(a, logCtx, 
ts).Info("sending root application event") + if err := s.codefreshClient.Send(ctx, a.Name, appEvent); err != nil { + s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventDeliveryErrorType, a.Name) + return fmt.Errorf("failed to send event for root application %s/%s: %w", a.Namespace, a.Name, err) + } + reconcileDuration := time.Since(startTime) + s.metricsServer.ObserveEventProcessingDurationHistogramDuration(metrics.MetricParentAppEventType, reconcileDuration) + } + + revisionMetadata, _ := s.getApplicationRevisionDetails(ctx, a, getOperationRevision(a)) + // for each resource in the application get desired and actual state, + // then stream the event + for _, rs := range a.Status.Resources { + if isApp(rs) { + continue + } + startTime := time.Now() + err := s.processResource(ctx, rs, a, logCtx, ts, desiredManifests, appTree, manifestGenErr, nil, revisionMetadata, ignoreResourceCache, appInstanceLabelKey, trackingMethod, nil) + if err != nil { + s.metricsServer.IncErroredEventsCounter(metrics.MetricResourceEventType, metrics.MetricEventUnknownErrorType, a.Name) + return err + } + reconcileDuration := time.Since(startTime) + s.metricsServer.ObserveEventProcessingDurationHistogramDuration(metrics.MetricResourceEventType, reconcileDuration) + } + return nil +} + +func (s *applicationEventReporter) getAppForResourceReporting( + rs appv1.ResourceStatus, + ctx context.Context, + a *appv1.Application, + revisionMetadata *appv1.RevisionMetadata, +) (*appv1.Application, *appv1.RevisionMetadata) { + if rs.Kind != "Rollout" { // for rollout it's crucial to report always correct operationSyncRevision + return a, revisionMetadata + } + + latestAppStatus, err := s.appLister.Applications(a.Namespace).Get(a.Name) + + if err != nil { + return a, revisionMetadata + } + + revisionMetadataToReport, err := s.getApplicationRevisionDetails(ctx, latestAppStatus, getOperationRevision(latestAppStatus)) + + if err != nil { + return a, revisionMetadata 
+ } + + return latestAppStatus, revisionMetadataToReport +} + +func (s *applicationEventReporter) processResource( + ctx context.Context, + rs appv1.ResourceStatus, + parentApplication *appv1.Application, + logCtx *log.Entry, + ts string, + desiredManifests *apiclient.ManifestResponse, + appTree *appv1.ApplicationTree, + manifestGenErr bool, + originalApplication *appv1.Application, + revisionMetadata *appv1.RevisionMetadata, + ignoreResourceCache bool, + appInstanceLabelKey string, + trackingMethod appv1.TrackingMethod, + applicationVersions *apiclient.ApplicationVersions, +) error { + metricsEventType := metrics.MetricResourceEventType + if isApp(rs) { + metricsEventType = metrics.MetricChildAppEventType + } + + logCtx = logCtx.WithFields(log.Fields{ + "gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind), + "resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name), + }) + + if rs.Health == nil && rs.Status == appv1.SyncStatusCodeSynced { + // for resources without health status we need to add 'Healthy' status + // when they are synced because we might have sent an event with 'Missing' + // status earlier and they would be stuck in it if we don't switch to 'Healthy' + rs.Health = &appv1.HealthStatus{ + Status: health.HealthStatusHealthy, + } + } + + if !ignoreResourceCache && !s.shouldSendResourceEvent(parentApplication, rs) { + s.metricsServer.IncCachedIgnoredEventsCounter(metricsEventType, parentApplication.Name) + return nil + } + + // get resource desired state + desiredState := getResourceDesiredState(&rs, desiredManifests, logCtx) + + // get resource actual state + actualState, err := s.applicationServiceClient.GetResource(ctx, &application.ApplicationResourceRequest{ + Name: &parentApplication.Name, + Namespace: &rs.Namespace, + ResourceName: &rs.Name, + Version: &rs.Version, + Group: &rs.Group, + Kind: &rs.Kind, + }) + if err != nil { + if !strings.Contains(err.Error(), "not found") { + // only return error if there is no point in trying to send the 
+ // next resource. For example if the shared context has exceeded + // its deadline + if strings.Contains(err.Error(), "context deadline exceeded") { + return fmt.Errorf("failed to get actual state: %w", err) + } + + s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventUnknownErrorType, parentApplication.Name) + logCtx.WithError(err).Warn("failed to get actual state, resuming") + return nil + } + + manifest := "" + // empty actual state + actualState = &application.ApplicationResourceResponse{Manifest: &manifest} + } + + parentApplicationToReport, revisionMetadataToReport := s.getAppForResourceReporting(rs, ctx, parentApplication, revisionMetadata) + + var originalAppRevisionMetadata *appv1.RevisionMetadata = nil + + if originalApplication != nil { + originalAppRevisionMetadata, _ = s.getApplicationRevisionDetails(ctx, originalApplication, getOperationRevision(originalApplication)) + } + + ev, err := getResourceEventPayload(parentApplicationToReport, &rs, actualState, desiredState, appTree, manifestGenErr, ts, originalApplication, revisionMetadataToReport, originalAppRevisionMetadata, appInstanceLabelKey, trackingMethod, applicationVersions) + if err != nil { + s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventGetPayloadErrorType, parentApplication.Name) + logCtx.WithError(err).Warn("failed to get event payload, resuming") + return nil + } + + appRes := appv1.Application{} + appName := "" + if isApp(rs) && actualState.Manifest != nil && json.Unmarshal([]byte(*actualState.Manifest), &appRes) == nil { + logWithAppStatus(&appRes, logCtx, ts).Info("streaming resource event") + appName = appRes.Name + } else { + logWithResourceStatus(logCtx, rs).Info("streaming resource event") + appName = rs.Name + } + + if err := s.codefreshClient.Send(ctx, appName, ev); err != nil { + if strings.Contains(err.Error(), "context deadline exceeded") { + return fmt.Errorf("failed to send resource event: %w", err) + } + + 
s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventDeliveryErrorType, appName) + logCtx.WithError(err).Warn("failed to send resource event, resuming") + return nil + } + + if err := s.cache.SetLastResourceEvent(parentApplicationToReport, rs, resourceEventCacheExpiration, getApplicationLatestRevision(parentApplicationToReport)); err != nil { + logCtx.WithError(err).Warn("failed to cache resource event") + } + + return nil +} + +func (s *applicationEventReporter) ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool) { + logCtx := log.WithField("app", ae.Application.Name) + + if ae.Type == watch.Deleted { + logCtx.Info("application deleted") + return true, false + } + + cachedApp, err := s.cache.GetLastApplicationEvent(&ae.Application) + if err != nil || cachedApp == nil { + return true, false + } + + cachedApp.Status.ReconciledAt = ae.Application.Status.ReconciledAt // ignore those in the diff + cachedApp.Spec.Project = ae.Application.Spec.Project // + for i := range cachedApp.Status.Conditions { + cachedApp.Status.Conditions[i].LastTransitionTime = nil + } + for i := range ae.Application.Status.Conditions { + ae.Application.Status.Conditions[i].LastTransitionTime = nil + } + + // check if application changed to healthy status + if ae.Application.Status.Health.Status == health.HealthStatusHealthy && cachedApp.Status.Health.Status != health.HealthStatusHealthy { + return true, true + } + + if !reflect.DeepEqual(ae.Application.Spec, cachedApp.Spec) { + logCtx.Info("application spec changed") + return true, false + } + + if !reflect.DeepEqual(ae.Application.Status, cachedApp.Status) { + logCtx.Info("application status changed") + return true, false + } + + if !reflect.DeepEqual(ae.Application.Operation, cachedApp.Operation) { + logCtx.Info("application operation changed") + return true, false + } + + return false, false +} + +func isApp(rs appv1.ResourceStatus) bool { + return 
rs.GroupVersionKind().String() == appv1.ApplicationSchemaGroupVersionKind.String() +} + +func logWithAppStatus(a *appv1.Application, logCtx *log.Entry, ts string) *log.Entry { + return logCtx.WithFields(log.Fields{ + "sync": a.Status.Sync.Status, + "health": a.Status.Health.Status, + "resourceVersion": a.ResourceVersion, + "ts": ts, + }) +} + +func logWithResourceStatus(logCtx *log.Entry, rs appv1.ResourceStatus) *log.Entry { + logCtx = logCtx.WithField("sync", rs.Status) + if rs.Health != nil { + logCtx = logCtx.WithField("health", rs.Health.Status) + } + + return logCtx +} + +func getLatestAppHistoryItem(a *appv1.Application) *appv1.RevisionHistory { + if a.Status.History != nil && len(a.Status.History) > 0 { + return &a.Status.History[len(a.Status.History)-1] + } + + return nil +} + +func getApplicationLatestRevision(a *appv1.Application) string { + revision := a.Status.Sync.Revision + lastHistory := getLatestAppHistoryItem(a) + + if lastHistory != nil { + revision = lastHistory.Revision + } + + return revision +} + +func getOperationRevision(a *appv1.Application) string { + var revision string + if a != nil { + // this value will be used in case if application hasnt resources , like gitsource + revision = a.Status.Sync.Revision + if a.Status.OperationState != nil && a.Status.OperationState.Operation.Sync != nil && a.Status.OperationState.Operation.Sync.Revision != "" { + revision = a.Status.OperationState.Operation.Sync.Revision + } else if a.Operation != nil && a.Operation.Sync != nil && a.Operation.Sync.Revision != "" { + revision = a.Operation.Sync.Revision + } + } + + return revision +} + +func (s *applicationEventReporter) getApplicationRevisionDetails(ctx context.Context, a *appv1.Application, revision string) (*appv1.RevisionMetadata, error) { + return s.applicationServiceClient.RevisionMetadata(ctx, &application.RevisionMetadataQuery{ + Name: &a.Name, + Revision: &revision, + }) +} + +func getLatestAppHistoryId(a *appv1.Application) int64 { + var id 
int64 + lastHistory := getLatestAppHistoryItem(a) + + if lastHistory != nil { + id = lastHistory.ID + } + + return id +} + +func getResourceEventPayload( + parentApplication *appv1.Application, + rs *appv1.ResourceStatus, + actualState *application.ApplicationResourceResponse, + desiredState *apiclient.Manifest, + apptree *appv1.ApplicationTree, + manifestGenErr bool, + ts string, + originalApplication *appv1.Application, // passed when rs is application + revisionMetadata *appv1.RevisionMetadata, + originalAppRevisionMetadata *appv1.RevisionMetadata, // passed when rs is application + appInstanceLabelKey string, + trackingMethod appv1.TrackingMethod, + applicationVersions *apiclient.ApplicationVersions, +) (*events.Event, error) { + var ( + err error + syncStarted = metav1.Now() + syncFinished *metav1.Time + errors = []*events.ObjectError{} + logCtx *log.Entry + ) + + if originalApplication != nil { + logCtx = log.WithField("application", originalApplication.Name) + } else { + logCtx = log.NewEntry(log.StandardLogger()) + } + + object := []byte(*actualState.Manifest) + + if originalAppRevisionMetadata != nil && len(object) != 0 { + actualObject, err := appv1.UnmarshalToUnstructured(*actualState.Manifest) + + if err == nil { + actualObject = addCommitDetailsToLabels(actualObject, originalAppRevisionMetadata) + object, err = actualObject.MarshalJSON() + if err != nil { + return nil, fmt.Errorf("failed to marshal unstructured object: %w", err) + } + } + } + if len(object) == 0 { + if len(desiredState.CompiledManifest) == 0 { + // no actual or desired state, don't send event + u := &unstructured.Unstructured{} + apiVersion := rs.Version + if rs.Group != "" { + apiVersion = rs.Group + "/" + rs.Version + } + + u.SetAPIVersion(apiVersion) + u.SetKind(rs.Kind) + u.SetName(rs.Name) + u.SetNamespace(rs.Namespace) + if originalAppRevisionMetadata != nil { + u = addCommitDetailsToLabels(u, originalAppRevisionMetadata) + } + + object, err = u.MarshalJSON() + if err != nil { + 
return nil, fmt.Errorf("failed to marshal unstructured object: %w", err) + } + } else { + // no actual state, use desired state as event object + unstructuredWithNamespace, err := addDestNamespaceToManifest([]byte(desiredState.CompiledManifest), rs) + if err != nil { + return nil, fmt.Errorf("failed to add destination namespace to manifest: %w", err) + } + if originalAppRevisionMetadata != nil { + unstructuredWithNamespace = addCommitDetailsToLabels(unstructuredWithNamespace, originalAppRevisionMetadata) + } + + object, _ = unstructuredWithNamespace.MarshalJSON() + } + } else if rs.RequiresPruning && !manifestGenErr { + // resource should be deleted + desiredState.CompiledManifest = "" + manifest := "" + actualState.Manifest = &manifest + } + + if (originalApplication != nil && originalApplication.DeletionTimestamp != nil) || parentApplication.ObjectMeta.DeletionTimestamp != nil { + // resource should be deleted in case if application in process of deletion + desiredState.CompiledManifest = "" + manifest := "" + actualState.Manifest = &manifest + } + + if parentApplication.Status.OperationState != nil { + syncStarted = parentApplication.Status.OperationState.StartedAt + syncFinished = parentApplication.Status.OperationState.FinishedAt + errors = append(errors, parseResourceSyncResultErrors(rs, parentApplication.Status.OperationState)...) + } + + // for primitive resources that are synced right away and don't require progression time (like configmap) + if rs.Status == appv1.SyncStatusCodeSynced && rs.Health != nil && rs.Health.Status == health.HealthStatusHealthy { + syncFinished = &syncStarted + } + + // parent application not include errors in application originally was created with broken state, for example in destination missed namespace + if originalApplication != nil && originalApplication.Status.OperationState != nil { + errors = append(errors, parseApplicationSyncResultErrors(originalApplication.Status.OperationState)...) 
+ } + + if originalApplication != nil && originalApplication.Status.Conditions != nil { + errors = append(errors, parseApplicationSyncResultErrorsFromConditions(originalApplication.Status)...) + } + + if len(desiredState.RawManifest) == 0 && len(desiredState.CompiledManifest) != 0 { + // for handling helm defined resources, etc... + y, err := yaml.JSONToYAML([]byte(desiredState.CompiledManifest)) + if err == nil { + desiredState.RawManifest = string(y) + } + } + + applicationVersionsEvents, err := repoAppVersionsToEvent(applicationVersions) + if err != nil { + logCtx.Errorf("failed to convert appVersions: %v", err) + } + + source := events.ObjectSource{ + DesiredManifest: desiredState.CompiledManifest, + ActualManifest: *actualState.Manifest, + GitManifest: desiredState.RawManifest, + RepoURL: parentApplication.Status.Sync.ComparedTo.Source.RepoURL, + Path: desiredState.Path, + Revision: getApplicationLatestRevision(parentApplication), + OperationSyncRevision: getOperationRevision(parentApplication), + HistoryId: getLatestAppHistoryId(parentApplication), + AppName: parentApplication.Name, + AppNamespace: parentApplication.Namespace, + AppUID: string(parentApplication.ObjectMeta.UID), + AppLabels: parentApplication.Labels, + SyncStatus: string(rs.Status), + SyncStartedAt: syncStarted, + SyncFinishedAt: syncFinished, + Cluster: parentApplication.Spec.Destination.Server, + AppInstanceLabelKey: appInstanceLabelKey, + TrackingMethod: string(trackingMethod), + } + + if revisionMetadata != nil { + source.CommitMessage = revisionMetadata.Message + source.CommitAuthor = revisionMetadata.Author + source.CommitDate = &revisionMetadata.Date + } + + if rs.Health != nil { + source.HealthStatus = (*string)(&rs.Health.Status) + source.HealthMessage = &rs.Health.Message + if rs.Health.Status != health.HealthStatusHealthy { + errors = append(errors, parseAggregativeHealthErrors(rs, apptree)...) 
+ } + } + + payload := events.EventPayload{ + Timestamp: ts, + Object: object, + Source: &source, + Errors: errors, + AppVersions: applicationVersionsEvents, + } + + logCtx.Infof("AppVersion before encoding: %v", safeString(payload.AppVersions.AppVersion)) + + payloadBytes, err := json.Marshal(&payload) + if err != nil { + return nil, fmt.Errorf("failed to marshal payload for resource %s/%s: %w", rs.Namespace, rs.Name, err) + } + + return &events.Event{Payload: payloadBytes}, nil +} + +func (s *applicationEventReporter) getApplicationEventPayload( + ctx context.Context, + a *appv1.Application, + ts string, + appInstanceLabelKey string, + trackingMethod appv1.TrackingMethod, + applicationVersions *apiclient.ApplicationVersions, +) (*events.Event, error) { + var ( + syncStarted = metav1.Now() + syncFinished *metav1.Time + logCtx = log.WithField("application", a.Name) + ) + + obj := appv1.Application{} + a.DeepCopyInto(&obj) + + // make sure there is type meta on object + obj.TypeMeta = metav1.TypeMeta{ + Kind: appv1reg.ApplicationKind, + APIVersion: appv1.SchemeGroupVersion.String(), + } + + if a.Status.OperationState != nil { + syncStarted = a.Status.OperationState.StartedAt + syncFinished = a.Status.OperationState.FinishedAt + } + + applicationSource := a.Spec.GetSource() + if !applicationSource.IsHelm() && (a.Status.Sync.Revision != "" || (a.Status.History != nil && len(a.Status.History) > 0)) { + revisionMetadata, err := s.getApplicationRevisionDetails(ctx, a, getOperationRevision(a)) + + if err != nil { + if !strings.Contains(err.Error(), "not found") { + return nil, fmt.Errorf("failed to get revision metadata: %w", err) + } + + logCtx.Warnf("failed to get revision metadata: %s, reporting application deletion event", err.Error()) + } else { + if obj.ObjectMeta.Labels == nil { + obj.ObjectMeta.Labels = map[string]string{} + } + + obj.ObjectMeta.Labels["app.meta.commit-date"] = revisionMetadata.Date.Format("2006-01-02T15:04:05.000Z") + 
obj.ObjectMeta.Labels["app.meta.commit-author"] = revisionMetadata.Author + obj.ObjectMeta.Labels["app.meta.commit-message"] = revisionMetadata.Message + } + } + + object, err := json.Marshal(&obj) + if err != nil { + return nil, fmt.Errorf("failed to marshal application event") + } + + actualManifest := string(object) + if a.DeletionTimestamp != nil { + actualManifest = "" // mark as deleted + logCtx.Info("reporting application deletion event") + } + + applicationVersionsEvents, err := repoAppVersionsToEvent(applicationVersions) + if err != nil { + logCtx.Errorf("failed to convert appVersions: %v", err) + } + + hs := string(a.Status.Health.Status) + source := &events.ObjectSource{ + DesiredManifest: "", + GitManifest: "", + ActualManifest: actualManifest, + RepoURL: a.Spec.GetSource().RepoURL, + CommitMessage: "", + CommitAuthor: "", + Path: "", + Revision: "", + OperationSyncRevision: "", + HistoryId: 0, + AppName: "", + AppUID: "", + AppLabels: map[string]string{}, + SyncStatus: string(a.Status.Sync.Status), + SyncStartedAt: syncStarted, + SyncFinishedAt: syncFinished, + HealthStatus: &hs, + HealthMessage: &a.Status.Health.Message, + Cluster: a.Spec.Destination.Server, + AppInstanceLabelKey: appInstanceLabelKey, + TrackingMethod: string(trackingMethod), + } + + payload := events.EventPayload{ + Timestamp: ts, + Object: object, + Source: source, + Errors: parseApplicationSyncResultErrorsFromConditions(a.Status), + AppVersions: applicationVersionsEvents, + } + + logCtx.Infof("AppVersion before encoding: %v", safeString(payload.AppVersions.AppVersion)) + + payloadBytes, err := json.Marshal(&payload) + if err != nil { + return nil, fmt.Errorf("failed to marshal payload for resource %s/%s: %w", a.Namespace, a.Name, err) + } + + return &events.Event{Payload: payloadBytes}, nil +} + +func getResourceDesiredState(rs *appv1.ResourceStatus, ds *apiclient.ManifestResponse, logger *log.Entry) *apiclient.Manifest { + if ds == nil { + return &apiclient.Manifest{} + } + for _, 
m := range ds.Manifests { + u, err := appv1.UnmarshalToUnstructured(m.CompiledManifest) + if err != nil { + logger.WithError(err).Warnf("failed to unmarshal compiled manifest") + continue + } + + if u == nil { + continue + } + + ns := text.FirstNonEmpty(u.GetNamespace(), rs.Namespace) + + if u.GroupVersionKind().String() == rs.GroupVersionKind().String() && + u.GetName() == rs.Name && + ns == rs.Namespace { + if rs.Kind == kube.SecretKind && rs.Version == "v1" { + m.RawManifest = m.CompiledManifest + } + + return m + } + } + + // no desired state for resource + // it's probably deleted from git + return &apiclient.Manifest{} +} + +func addDestNamespaceToManifest(resourceManifest []byte, rs *appv1.ResourceStatus) (*unstructured.Unstructured, error) { + u, err := appv1.UnmarshalToUnstructured(string(resourceManifest)) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal manifest: %w", err) + } + + if u.GetNamespace() == rs.Namespace { + return u, nil + } + + // need to change namespace + u.SetNamespace(rs.Namespace) + + return u, nil +} + +func addCommitDetailsToLabels(u *unstructured.Unstructured, revisionMetadata *appv1.RevisionMetadata) *unstructured.Unstructured { + if revisionMetadata == nil || u == nil { + return u + } + + if field, _, _ := unstructured.NestedFieldCopy(u.Object, "metadata", "labels"); field == nil { + _ = unstructured.SetNestedStringMap(u.Object, map[string]string{}, "metadata", "labels") + } + + _ = unstructured.SetNestedField(u.Object, revisionMetadata.Date.Format("2006-01-02T15:04:05.000Z"), "metadata", "labels", "app.meta.commit-date") + _ = unstructured.SetNestedField(u.Object, revisionMetadata.Author, "metadata", "labels", "app.meta.commit-author") + _ = unstructured.SetNestedField(u.Object, revisionMetadata.Message, "metadata", "labels", "app.meta.commit-message") + + return u +} + +func repoAppVersionsToEvent(applicationVersions *apiclient.ApplicationVersions) (*events.ApplicationVersions, error) { + applicationVersionsEvents 
// safeString dereferences s, yielding the empty string when s is nil.
func safeString(s *string) string {
	var out string
	if s != nil {
		out = *s
	}
	return out
}
"default" +) + +func TestGetResourceEventPayload(t *testing.T) { + t.Run("Deleting timestamp is empty", func(t *testing.T) { + + app := v1alpha1.Application{} + rs := v1alpha1.ResourceStatus{} + + man := "{ \"key\" : \"manifest\" }" + + actualState := application.ApplicationResourceResponse{ + Manifest: &man, + } + desiredState := repoApiclient.Manifest{ + CompiledManifest: "{ \"key\" : \"manifest\" }", + } + appTree := v1alpha1.ApplicationTree{} + revisionMetadata := v1alpha1.RevisionMetadata{ + Author: "demo usert", + Date: metav1.Time{}, + Message: "some message", + } + + event, err := getResourceEventPayload(&app, &rs, &actualState, &desiredState, &appTree, true, "", nil, &revisionMetadata, nil, common.LabelKeyAppInstance, argo.TrackingMethodLabel, &repoApiclient.ApplicationVersions{}) + assert.NoError(t, err) + + var eventPayload events.EventPayload + + err = json.Unmarshal(event.Payload, &eventPayload) + assert.NoError(t, err) + + assert.Equal(t, "{ \"key\" : \"manifest\" }", eventPayload.Source.DesiredManifest) + assert.Equal(t, "{ \"key\" : \"manifest\" }", eventPayload.Source.ActualManifest) + }) + + t.Run("Deleting timestamp is empty", func(t *testing.T) { + + app := v1alpha1.Application{ + ObjectMeta: metav1.ObjectMeta{ + DeletionTimestamp: &metav1.Time{}, + }, + Status: v1alpha1.ApplicationStatus{}, + } + rs := v1alpha1.ResourceStatus{} + man := "{ \"key\" : \"manifest\" }" + actualState := application.ApplicationResourceResponse{ + Manifest: &man, + } + desiredState := repoApiclient.Manifest{ + CompiledManifest: "{ \"key\" : \"manifest\" }", + } + appTree := v1alpha1.ApplicationTree{} + revisionMetadata := v1alpha1.RevisionMetadata{ + Author: "demo usert", + Date: metav1.Time{}, + Message: "some message", + } + + event, err := getResourceEventPayload(&app, &rs, &actualState, &desiredState, &appTree, true, "", nil, &revisionMetadata, nil, common.LabelKeyAppInstance, argo.TrackingMethodLabel, &repoApiclient.ApplicationVersions{}) + assert.NoError(t, err) 
+ + var eventPayload events.EventPayload + + err = json.Unmarshal(event.Payload, &eventPayload) + assert.NoError(t, err) + + assert.Equal(t, "", eventPayload.Source.DesiredManifest) + assert.Equal(t, "", eventPayload.Source.ActualManifest) + }) +} + +func TestGetApplicationLatestRevision(t *testing.T) { + appRevision := "a-revision" + history1Revision := "history-revision-1" + history2Revision := "history-revision-2" + + t.Run("resource revision should be taken from sync.revision", func(t *testing.T) { + noStatusHistoryAppMock := v1alpha1.Application{ + Status: v1alpha1.ApplicationStatus{ + Sync: v1alpha1.SyncStatus{ + Revision: appRevision, + }, + }, + } + + revisionResult := getApplicationLatestRevision(&noStatusHistoryAppMock) + assert.Equal(t, revisionResult, appRevision) + + emptyStatusHistoryAppMock := v1alpha1.Application{ + Status: v1alpha1.ApplicationStatus{ + Sync: v1alpha1.SyncStatus{ + Revision: appRevision, + }, + History: []v1alpha1.RevisionHistory{}, + }, + } + + revision2Result := getApplicationLatestRevision(&emptyStatusHistoryAppMock) + assert.Equal(t, revision2Result, appRevision) + }) + + t.Run("resource revision should be taken from latest history.revision", func(t *testing.T) { + appMock := v1alpha1.Application{ + Status: v1alpha1.ApplicationStatus{ + Sync: v1alpha1.SyncStatus{ + Revision: appRevision, + }, + History: []v1alpha1.RevisionHistory{ + v1alpha1.RevisionHistory{ + Revision: history1Revision, + }, + v1alpha1.RevisionHistory{ + Revision: history2Revision, + }, + }, + }, + } + + revisionResult := getApplicationLatestRevision(&appMock) + assert.Equal(t, revisionResult, history2Revision) + }) +} + +func TestGetLatestAppHistoryId(t *testing.T) { + history1Id := int64(1) + history2Id := int64(2) + + t.Run("resource revision should be 0", func(t *testing.T) { + noStatusHistoryAppMock := v1alpha1.Application{} + + idResult := getLatestAppHistoryId(&noStatusHistoryAppMock) + assert.Equal(t, idResult, int64(0)) + + emptyStatusHistoryAppMock := 
v1alpha1.Application{ + Status: v1alpha1.ApplicationStatus{ + History: []v1alpha1.RevisionHistory{}, + }, + } + + id2Result := getLatestAppHistoryId(&emptyStatusHistoryAppMock) + assert.Equal(t, id2Result, int64(0)) + }) + + t.Run("resource revision should be taken from latest history.Id", func(t *testing.T) { + appMock := v1alpha1.Application{ + Status: v1alpha1.ApplicationStatus{ + History: []v1alpha1.RevisionHistory{ + v1alpha1.RevisionHistory{ + ID: history1Id, + }, + v1alpha1.RevisionHistory{ + ID: history2Id, + }, + }, + }, + } + + revisionResult := getLatestAppHistoryId(&appMock) + assert.Equal(t, revisionResult, history2Id) + }) +} + +func newAppLister(objects ...runtime.Object) applisters.ApplicationLister { + fakeAppsClientset := fakeapps.NewSimpleClientset(objects...) + factory := appinformer.NewSharedInformerFactoryWithOptions(fakeAppsClientset, 0, appinformer.WithNamespace(""), appinformer.WithTweakListOptions(func(options *metav1.ListOptions) {})) + appsInformer := factory.Argoproj().V1alpha1().Applications() + for _, obj := range objects { + switch obj.(type) { + case *appsv1.Application: + _ = appsInformer.Informer().GetStore().Add(obj) + } + } + appLister := appsInformer.Lister() + return appLister +} + +type MockcodefreshClient interface { + Send(ctx context.Context, appName string, event *events.Event) error +} + +type MockCodefreshConfig struct { + BaseURL string + AuthToken string +} + +type MockCodefreshClient struct { + cfConfig *MockCodefreshConfig + httpClient *http.Client +} + +func (cc *MockCodefreshClient) Send(ctx context.Context, appName string, event *events.Event) error { + return nil +} + +func fakeReporter() *applicationEventReporter { + guestbookApp := &appsv1.Application{ + TypeMeta: metav1.TypeMeta{ + Kind: "Application", + APIVersion: "argoproj.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "guestbook", + Namespace: testNamespace, + }, + Spec: appsv1.ApplicationSpec{ + Project: "default", + Source: 
&appsv1.ApplicationSource{ + RepoURL: "https://test", + TargetRevision: "HEAD", + Helm: &appsv1.ApplicationSourceHelm{ + ValueFiles: []string{"values.yaml"}, + }, + }, + }, + Status: appsv1.ApplicationStatus{ + History: appsv1.RevisionHistories{ + { + Revision: "abcdef123567", + Source: appsv1.ApplicationSource{ + RepoURL: "https://test", + TargetRevision: "HEAD", + Helm: &appsv1.ApplicationSourceHelm{ + ValueFiles: []string{"values-old.yaml"}, + }, + }, + }, + }, + }, + } + + appLister := newAppLister(guestbookApp) + + cache := servercache.NewCache( + appstatecache.NewCache( + cacheutil.NewCache(cacheutil.NewInMemoryCache(1*time.Hour)), + 1*time.Minute, + ), + 1*time.Minute, + 1*time.Minute, + 1*time.Minute, + ) + + cfClient := &MockCodefreshClient{ + cfConfig: &MockCodefreshConfig{ + BaseURL: "", + AuthToken: "", + }, + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + } + + metricsServ := metrics.NewMetricsServer("", 8099) + closer, cdClient, _ := apiclient.NewClientOrDie(&apiclient.ClientOptions{ + ServerAddr: "site.com", + }).NewApplicationClient() + defer io.Close(closer) + + return &applicationEventReporter{ + cache, + cfClient, + appLister, + cdClient, + metricsServ, + } +} + +func TestShouldSendEvent(t *testing.T) { + eventReporter := fakeReporter() + t.Run("should send because cache is missing", func(t *testing.T) { + app := &v1alpha1.Application{} + rs := v1alpha1.ResourceStatus{} + + res := eventReporter.shouldSendResourceEvent(app, rs) + assert.True(t, res) + }) + + t.Run("should not send - same entities", func(t *testing.T) { + app := &v1alpha1.Application{} + rs := v1alpha1.ResourceStatus{} + + _ = eventReporter.cache.SetLastResourceEvent(app, rs, time.Minute, "") + + res := eventReporter.shouldSendResourceEvent(app, rs) + assert.False(t, res) + }) + + t.Run("should send - different entities", func(t *testing.T) { + app := &v1alpha1.Application{} + rs := v1alpha1.ResourceStatus{} + + _ = eventReporter.cache.SetLastResourceEvent(app, rs, 
time.Minute, "") + + rs.Status = v1alpha1.SyncStatusCodeOutOfSync + + res := eventReporter.shouldSendResourceEvent(app, rs) + assert.True(t, res) + }) + +} + +type MockEventing_StartEventSourceServer struct { + grpc.ServerStream +} + +var result func(*events.Event) error + +func (m *MockEventing_StartEventSourceServer) Send(event *events.Event) error { + return result(event) +} + +func TestStreamApplicationEvent(t *testing.T) { + eventReporter := fakeReporter() + t.Run("root application", func(t *testing.T) { + app := &v1alpha1.Application{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "argoproj.io/v1alpha1", + Kind: "Application", + }, + } + + result = func(event *events.Event) error { + var payload events.EventPayload + _ = json.Unmarshal(event.Payload, &payload) + + var actualApp v1alpha1.Application + _ = json.Unmarshal([]byte(payload.Source.ActualManifest), &actualApp) + assert.Equal(t, *app, actualApp) + return nil + } + _ = eventReporter.StreamApplicationEvents(context.Background(), app, "", false, common.LabelKeyAppInstance, argo.TrackingMethodLabel) + }) + +} + +func TestGetResourceEventPayloadWithoutRevision(t *testing.T) { + app := v1alpha1.Application{} + rs := v1alpha1.ResourceStatus{} + + mf := "{ \"key\" : \"manifest\" }" + + actualState := application.ApplicationResourceResponse{ + Manifest: &mf, + } + desiredState := repoApiclient.Manifest{ + CompiledManifest: "{ \"key\" : \"manifest\" }", + } + appTree := v1alpha1.ApplicationTree{} + + _, err := getResourceEventPayload(&app, &rs, &actualState, &desiredState, &appTree, true, "", nil, nil, nil, common.LabelKeyAppInstance, argo.TrackingMethodLabel, &repoApiclient.ApplicationVersions{}) + assert.NoError(t, err) + +} + +func StrToUnstructured(jsonStr string) *unstructured.Unstructured { + obj := make(map[string]interface{}) + err := yaml.Unmarshal([]byte(jsonStr), &obj) + if err != nil { + panic(err) + } + return &unstructured.Unstructured{Object: obj} +} + +func TestAddCommitDetailsToLabels(t *testing.T) 
{ + revisionMetadata := v1alpha1.RevisionMetadata{ + Author: "demo usert", + Date: metav1.Time{}, + Message: "some message", + } + + t.Run("set labels when lable object missing", func(t *testing.T) { + resource := StrToUnstructured(` + apiVersion: v1 + kind: Service + metadata: + name: helm-guestbook + namespace: default + resourceVersion: "123" + uid: "4" + spec: + selector: + app: guestbook + type: LoadBalancer + status: + loadBalancer: + ingress: + - hostname: localhost`, + ) + + result := addCommitDetailsToLabels(resource, &revisionMetadata) + labels := result.GetLabels() + assert.Equal(t, revisionMetadata.Author, labels["app.meta.commit-author"]) + assert.Equal(t, revisionMetadata.Message, labels["app.meta.commit-message"]) + }) + + t.Run("set labels when labels present", func(t *testing.T) { + resource := StrToUnstructured(` + apiVersion: v1 + kind: Service + metadata: + name: helm-guestbook + namespace: default + labels: + link: http://my-grafana.com/pre-generated-link + spec: + selector: + app: guestbook + type: LoadBalancer + status: + loadBalancer: + ingress: + - hostname: localhost`, + ) + + result := addCommitDetailsToLabels(resource, &revisionMetadata) + labels := result.GetLabels() + assert.Equal(t, revisionMetadata.Author, labels["app.meta.commit-author"]) + assert.Equal(t, revisionMetadata.Message, labels["app.meta.commit-message"]) + assert.Equal(t, "http://my-grafana.com/pre-generated-link", result.GetLabels()["link"]) + }) +} diff --git a/event_reporter/reporter/broadcaster.go b/event_reporter/reporter/broadcaster.go new file mode 100644 index 0000000000000..fbefbd9922da4 --- /dev/null +++ b/event_reporter/reporter/broadcaster.go @@ -0,0 +1,140 @@ +package reporter + +import ( + argocommon "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/event_reporter/sharding" + "github.com/argoproj/argo-cd/v2/util/env" + "math" + "sync" + + log "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/watch" + + appv1 
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" +) + +type subscriber struct { + ch chan *appv1.ApplicationWatchEvent + filters []func(*appv1.ApplicationWatchEvent) bool +} + +func (s *subscriber) matches(event *appv1.ApplicationWatchEvent) bool { + for i := range s.filters { + if !s.filters[i](event) { + return false + } + } + return true +} + +// Broadcaster is an interface for broadcasting application informer watch events to multiple subscribers. +type Broadcaster interface { + Subscribe(ch chan *appv1.ApplicationWatchEvent, filters ...func(event *appv1.ApplicationWatchEvent) bool) func() + OnAdd(interface{}) + OnUpdate(interface{}, interface{}) + OnDelete(interface{}) +} + +type broadcasterHandler struct { + lock sync.Mutex + subscribers []*subscriber + filter sharding.ApplicationFilterFunction + featureManager *FeatureManager +} + +func NewBroadcaster(featureManager *FeatureManager) Broadcaster { + // todo: pass real value here + filter := getApplicationFilter("") + return &broadcasterHandler{ + filter: filter, + featureManager: featureManager, + } +} + +func (b *broadcasterHandler) notify(event *appv1.ApplicationWatchEvent) { + // Make a local copy of b.subscribers, then send channel events outside the lock, + // to avoid data race on b.subscribers changes + subscribers := []*subscriber{} + b.lock.Lock() + subscribers = append(subscribers, b.subscribers...) 
+ b.lock.Unlock() + + if !b.featureManager.ShouldReporterRun() { + log.Infof("filtering application '%s', event reporting is turned off and old one is in use", event.Application.Name) + return + } + + if b.filter != nil { + result, expectedShard := b.filter(&event.Application) + if !result { + log.Infof("filtering application '%s', wrong shard, should be %d", event.Application.Name, expectedShard) + return + } + } + + for _, s := range subscribers { + if s.matches(event) { + select { + case s.ch <- event: + log.Infof("adding application '%s' to channel", event.Application.Name) + default: + // drop event if cannot send right away + log.WithField("application", event.Application.Name).Warn("unable to send event notification") + } + } + } +} + +// Subscribe forward application informer watch events to the provided channel. +// The watch events are dropped if no receives are reading events from the channel so the channel must have +// buffer if dropping events is not acceptable. +func (b *broadcasterHandler) Subscribe(ch chan *appv1.ApplicationWatchEvent, filters ...func(event *appv1.ApplicationWatchEvent) bool) func() { + b.lock.Lock() + defer b.lock.Unlock() + subscriber := &subscriber{ch, filters} + b.subscribers = append(b.subscribers, subscriber) + return func() { + b.lock.Lock() + defer b.lock.Unlock() + for i := range b.subscribers { + if b.subscribers[i] == subscriber { + b.subscribers = append(b.subscribers[:i], b.subscribers[i+1:]...) 
+ break + } + } + } +} + +func (b *broadcasterHandler) OnAdd(obj interface{}) { + if app, ok := obj.(*appv1.Application); ok { + b.notify(&appv1.ApplicationWatchEvent{Application: *app, Type: watch.Added}) + } +} + +func (b *broadcasterHandler) OnUpdate(_, newObj interface{}) { + if app, ok := newObj.(*appv1.Application); ok { + b.notify(&appv1.ApplicationWatchEvent{Application: *app, Type: watch.Modified}) + } +} + +func (b *broadcasterHandler) OnDelete(obj interface{}) { + if app, ok := obj.(*appv1.Application); ok { + b.notify(&appv1.ApplicationWatchEvent{Application: *app, Type: watch.Deleted}) + } +} + +func getApplicationFilter(shardingAlgorithm string) sharding.ApplicationFilterFunction { + shardingSvc := sharding.NewSharding() + replicas := env.ParseNumFromEnv(argocommon.EnvEventReporterReplicas, 0, 0, math.MaxInt32) + var applicationFilter func(app *appv1.Application) (bool, int) + if replicas > 1 { + shard := sharding.GetShardNumber() + log.Infof("Processing applications from shard %d", shard) + log.Infof("Using filter function: %s", shardingAlgorithm) + distributionFunction := shardingSvc.GetDistributionFunction(shardingAlgorithm) + applicationFilter = shardingSvc.GetApplicationFilter(distributionFunction, shard) + } else { + log.Info("Processing all application shards") + } + return applicationFilter +} diff --git a/event_reporter/reporter/feature_manager.go b/event_reporter/reporter/feature_manager.go new file mode 100644 index 0000000000000..c483ec9191b8e --- /dev/null +++ b/event_reporter/reporter/feature_manager.go @@ -0,0 +1,40 @@ +package reporter + +import ( + settings_util "github.com/argoproj/argo-cd/v2/util/settings" + log "github.com/sirupsen/logrus" + "time" +) + +type FeatureManager struct { + settingsMgr *settings_util.SettingsManager + shouldRun bool +} + +func NewFeatureManager(settingsMgr *settings_util.SettingsManager) *FeatureManager { + return &FeatureManager{settingsMgr: settingsMgr} +} + +func (f *FeatureManager) setShouldRun() { + 
reporterVersion, err := f.settingsMgr.GetCodefreshReporterVersion() + if err != nil { + log.Warnf("Failed to get reporter version: %v", err) + f.shouldRun = false + return + } + f.shouldRun = reporterVersion == string(settings_util.CodefreshV2ReporterVersion) +} + +func (f *FeatureManager) Watch() { + f.setShouldRun() + // nolint:staticcheck + tick := time.Tick(5 * time.Second) + for { + <-tick + f.setShouldRun() + } +} + +func (f *FeatureManager) ShouldReporterRun() bool { + return f.shouldRun +} diff --git a/event_reporter/server.go b/event_reporter/server.go new file mode 100644 index 0000000000000..9f76a3b7e4920 --- /dev/null +++ b/event_reporter/server.go @@ -0,0 +1,304 @@ +package event_reporter + +import ( + "context" + "crypto/tls" + "fmt" + "github.com/argoproj/argo-cd/v2/event_reporter/reporter" + "net" + "net/http" + "os" + "strings" + "time" + + "github.com/argoproj/argo-cd/v2/common" + codefresh "github.com/argoproj/argo-cd/v2/event_reporter/codefresh" + event_reporter "github.com/argoproj/argo-cd/v2/event_reporter/controller" + "github.com/argoproj/argo-cd/v2/event_reporter/handlers" + "github.com/argoproj/argo-cd/v2/event_reporter/metrics" + applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" + appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" + appinformer "github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions" + applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" + repoapiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient" + servercache "github.com/argoproj/argo-cd/v2/server/cache" + "github.com/argoproj/argo-cd/v2/server/rbacpolicy" + "github.com/argoproj/argo-cd/v2/server/repository" + "github.com/argoproj/argo-cd/v2/util/assets" + cacheutil "github.com/argoproj/argo-cd/v2/util/cache" + "github.com/argoproj/argo-cd/v2/util/db" + errorsutil "github.com/argoproj/argo-cd/v2/util/errors" + "github.com/argoproj/argo-cd/v2/util/healthz" + 
"github.com/argoproj/argo-cd/v2/util/io" + "github.com/argoproj/argo-cd/v2/util/rbac" + settings_util "github.com/argoproj/argo-cd/v2/util/settings" + "github.com/redis/go-redis/v9" + log "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +const ( + // catches corrupted informer state; see https://github.com/argoproj/argo-cd/issues/4960 for more information + notObjectErrMsg = "object does not implement the Object interfaces" +) + +var backoff = wait.Backoff{ + Steps: 5, + Duration: 500 * time.Millisecond, + Factor: 1.0, + Jitter: 0.1, +} + +type EventReporterServer struct { + EventReporterServerOpts + + settings *settings_util.ArgoCDSettings + log *log.Entry + settingsMgr *settings_util.SettingsManager + enf *rbac.Enforcer + projInformer cache.SharedIndexInformer + projLister applisters.AppProjectNamespaceLister + policyEnforcer *rbacpolicy.RBACPolicyEnforcer + appInformer cache.SharedIndexInformer + appLister applisters.ApplicationLister + db db.ArgoDB + + // stopCh is the channel which when closed, will shutdown the Event Reporter server + stopCh chan struct{} + serviceSet *EventReporterServerSet + featureManager *reporter.FeatureManager +} + +type EventReporterServerSet struct { + RepoService *repository.Server + MetricsServer *metrics.MetricsServer +} + +type EventReporterServerOpts struct { + ListenPort int + ListenHost string + MetricsPort int + MetricsHost string + Namespace string + KubeClientset kubernetes.Interface + AppClientset appclientset.Interface + RepoClientset repoapiclient.Clientset + ApplicationServiceClient applicationpkg.ApplicationServiceClient + Cache *servercache.Cache + RedisClient *redis.Client + ApplicationNamespaces []string + BaseHRef string + RootPath string + CodefreshConfig *codefresh.CodefreshConfig +} + +type handlerSwitcher struct { + handler http.Handler + urlToHandler map[string]http.Handler + 
// Listeners groups the network listeners the event reporter serves on.
type Listeners struct {
	// Main accepts connections for the HTTP server.
	Main net.Listener
	// Metrics accepts connections for the metrics server.
	Metrics net.Listener
}

// Close closes every listener that is still set. A listener that closes
// successfully is reset to nil; one that fails to close is left in place.
//
// Both listeners are always attempted, so a failure on Main no longer leaves
// Metrics open. The first error encountered is returned, or nil when
// everything closed cleanly (or nothing was open).
func (l *Listeners) Close() error {
	var firstErr error

	if l.Main != nil {
		if err := l.Main.Close(); err != nil {
			firstErr = err
		} else {
			l.Main = nil
		}
	}

	if l.Metrics != nil {
		if err := l.Metrics.Close(); err != nil {
			if firstErr == nil {
				firstErr = err
			}
		} else {
			l.Metrics = nil
		}
	}

	return firstErr
}
+func (a *EventReporterServer) newHTTPServer(ctx context.Context, port int) *http.Server { + endpoint := fmt.Sprintf("localhost:%d", port) + mux := http.NewServeMux() + httpS := http.Server{ + Addr: endpoint, + Handler: &handlerSwitcher{ + handler: mux, + }, + } + + healthz.ServeHealthCheck(mux, a.healthCheck) + + rH := handlers.GetRequestHandlers(a.ApplicationServiceClient) + mux.HandleFunc("/app-distribution", rH.GetAppDistribution) + + return &httpS +} + +func (a *EventReporterServer) checkServeErr(name string, err error) { + if err != nil { + if a.stopCh == nil { + // a nil stopCh indicates a graceful shutdown + log.Infof("graceful shutdown %s: %v", name, err) + } else { + log.Fatalf("%s: %v", name, err) + } + } else { + log.Infof("graceful shutdown %s", name) + } +} + +func startListener(host string, port int) (net.Listener, error) { + var conn net.Listener + var realErr error + _ = wait.ExponentialBackoff(backoff, func() (bool, error) { + conn, realErr = net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) + if realErr != nil { + return false, nil + } + return true, nil + }) + return conn, realErr +} + +func (a *EventReporterServer) Listen() (*Listeners, error) { + mainLn, err := startListener(a.ListenHost, a.ListenPort) + if err != nil { + return nil, err + } + metricsLn, err := startListener(a.MetricsHost, a.MetricsPort) + if err != nil { + io.Close(mainLn) + return nil, err + } + return &Listeners{Main: mainLn, Metrics: metricsLn}, nil +} + +// Run runs the API Server +// We use k8s.io/code-generator/cmd/go-to-protobuf to generate the .proto files from the API types. +// k8s.io/ go-to-protobuf uses protoc-gen-gogo, which comes from gogo/protobuf (a fork of +// golang/protobuf). 
// Run serves the event reporter until stopCh is signalled.
//
// It builds the HTTP server, serves it on lns.Main and the metrics server on
// lns.Metrics in background goroutines, starts the controller, waits for the
// project and application informer caches to sync (exiting via log.Fatal if
// that wait is abandoned), and then blocks on a freshly created stopCh.
func (a *EventReporterServer) Run(ctx context.Context, lns *Listeners) {
	var httpS = a.newHTTPServer(ctx, a.ListenPort)
	// NOTE(review): tlsConfig is populated but not handed to any server in this
	// function; the HTTP server below is served with plain Serve — confirm
	// whether TLS was intended.
	tlsConfig := tls.Config{}
	tlsConfig.GetCertificate = func(info *tls.ClientHelloInfo) (*tls.Certificate, error) {
		return a.settings.Certificate, nil
	}
	// NOTE(review): stopCh is only assigned at the end of this function, so a
	// Serve error reported before that point finds stopCh nil in checkServeErr.
	go func() { a.checkServeErr("httpS", httpS.Serve(lns.Main)) }()
	go func() { a.checkServeErr("metrics", a.serviceSet.MetricsServer.Serve(lns.Metrics)) }()
	go a.RunController(ctx)

	if !cache.WaitForCacheSync(ctx.Done(), a.projInformer.HasSynced, a.appInformer.HasSynced) {
		log.Fatal("Timed out waiting for project cache to sync")
	}

	// Block until something receives from / closes stopCh.
	a.stopCh = make(chan struct{})
	<-a.stopCh
}

// NewEventReporterServer returns a new instance of the Event Reporter server.
//
// It initializes the settings manager, creates the AppProject and Application
// informers and listers, configures the RBAC enforcer with the builtin policy
// (enforcement itself is switched off via EnableEnforce(false)), creates the
// db instance and the feature manager, and stores all of them on the returned
// server. No informer is started here.
func NewEventReporterServer(ctx context.Context, opts EventReporterServerOpts) *EventReporterServer {
	settingsMgr := settings_util.NewSettingsManager(ctx, opts.KubeClientset, opts.Namespace)
	settings, err := settingsMgr.InitializeSettings(true)
	errorsutil.CheckError(err)

	// With extra application namespaces configured the application informer is
	// created with an empty namespace (presumably "all namespaces" — confirm);
	// the project informer always uses opts.Namespace.
	appInformerNs := opts.Namespace
	if len(opts.ApplicationNamespaces) > 0 {
		appInformerNs = ""
	}
	projFactory := appinformer.NewSharedInformerFactoryWithOptions(opts.AppClientset, 0, appinformer.WithNamespace(opts.Namespace), appinformer.WithTweakListOptions(func(options *metav1.ListOptions) {}))
	appFactory := appinformer.NewSharedInformerFactoryWithOptions(opts.AppClientset, 0, appinformer.WithNamespace(appInformerNs), appinformer.WithTweakListOptions(func(options *metav1.ListOptions) {}))

	projInformer := projFactory.Argoproj().V1alpha1().AppProjects().Informer()
	projLister := projFactory.Argoproj().V1alpha1().AppProjects().Lister().AppProjects(opts.Namespace)

	appInformer := appFactory.Argoproj().V1alpha1().Applications().Informer()
	appLister := appFactory.Argoproj().V1alpha1().Applications().Lister()

	enf := rbac.NewEnforcer(opts.KubeClientset, opts.Namespace, common.ArgoCDRBACConfigMapName, nil)
	enf.EnableEnforce(false)
	err = enf.SetBuiltinPolicy(assets.BuiltinPolicyCSV)
	errorsutil.CheckError(err)
	// RBAC logging is enabled only when the debug environment variable is "1".
	enf.EnableLog(os.Getenv(common.EnvVarRBACDebug) == "1")

	policyEnf := rbacpolicy.NewRBACPolicyEnforcer(enf, projLister)
	enf.SetClaimsEnforcerFunc(policyEnf.EnforceClaims)

	dbInstance := db.NewDB(opts.Namespace, settingsMgr, opts.KubeClientset)

	server := &EventReporterServer{
		EventReporterServerOpts: opts,
		log:                     log.NewEntry(log.StandardLogger()),
		settings:                settings,
		settingsMgr:             settingsMgr,
		enf:                     enf,
		projInformer:            projInformer,
		projLister:              projLister,
		appInformer:             appInformer,
		appLister:               appLister,
		policyEnforcer:          policyEnf,
		db:                      dbInstance,
		featureManager:          reporter.NewFeatureManager(settingsMgr),
	}

	// NOTE(review): err here is still the SetBuiltinPolicy result that was
	// already passed to errorsutil.CheckError above, and the message talks about
	// in-cluster warnings — this looks like a leftover; confirm it can be removed.
	if err != nil {
		// Just log. It's not critical.
		log.Warnf("Failed to log in-cluster warnings: %v", err)
	}

	return server
}

// newEventReporterServiceSet builds the repository service and the metrics
// server for the given event reporter server. When a Redis client is
// configured its metrics are registered with the metrics server.
func newEventReporterServiceSet(a *EventReporterServer) *EventReporterServerSet {
	repoService := repository.NewServer(a.RepoClientset, a.db, a.enf, a.Cache, a.appLister, a.projInformer, a.Namespace, a.settingsMgr)
	metricsServer := metrics.NewMetricsServer(a.MetricsHost, a.MetricsPort)
	if a.RedisClient != nil {
		cacheutil.CollectMetrics(a.RedisClient, metricsServer)
	}

	return &EventReporterServerSet{
		RepoService:   repoService,
		MetricsServer: metricsServer,
	}
}
*v1alpha1.Application) (bool, int) + +type Sharding interface { + GetApplicationFilter(distributionFunction DistributionFunction, shard int) ApplicationFilterFunction + GetDistributionFunction(shardingAlgorithm string) DistributionFunction +} + +type sharding struct { +} + +func NewSharding() Sharding { + return &sharding{} +} + +func (s *sharding) GetApplicationFilter(distributionFunction DistributionFunction, shard int) ApplicationFilterFunction { + return func(app *v1alpha1.Application) (bool, int) { + expectedShard := distributionFunction(app) + // TODO: [reporter] provide ability define label with shard number + return expectedShard == shard, expectedShard + } +} + +// GetDistributionFunction returns which DistributionFunction should be used based on the passed algorithm and +// the current datas. +func (s *sharding) GetDistributionFunction(shardingAlgorithm string) DistributionFunction { + log.Infof("Using filter function: %s", shardingAlgorithm) + //TODO: implement switch case for multiple strategies + return s.LegacyDistributionFunction() +} + +func (s *sharding) LegacyDistributionFunction() DistributionFunction { + replicas := env.ParseNumFromEnv(argocommon.EnvEventReporterReplicas, 0, 0, math.MaxInt32) + return func(a *v1alpha1.Application) int { + if replicas == 0 { + return -1 + } + if a == nil { + return 0 + } + id := a.Name + if id == "" { + return 0 + } else { + h := fnv.New32a() + _, _ = h.Write([]byte(id)) + shard := int32(h.Sum32() % uint32(replicas)) + log.Debugf("Application with id=%s will be processed by shard %d", id, shard) + return int(shard) + } + } +} + +// InferShard extracts the shard index based on its hostname. 
+func InferShard() (int, error) { + hostname, err := osHostnameFunction() + if err != nil { + return 0, err + } + parts := strings.Split(hostname, "-") + if len(parts) == 0 { + return 0, fmt.Errorf("hostname should ends with shard number separated by '-' but got: %s", hostname) + } + shard, err := strconv.Atoi(parts[len(parts)-1]) + if err != nil { + return 0, fmt.Errorf("hostname should ends with shard number separated by '-' but got: %s", hostname) + } + return int(shard), nil +} + +func GetShardNumber() int { + shard := env.ParseNumFromEnv(argocommon.EnvEventReporterShard, -1, -math.MaxInt32, math.MaxInt32) + + if shard < 0 { + var err error + shard, err = InferShard() + if err != nil { + return -1 + } + } + + return shard +} diff --git a/go.mod b/go.mod index ec361d104dbc9..e21e75a883042 100644 --- a/go.mod +++ b/go.mod @@ -213,7 +213,7 @@ require ( github.com/opsgenie/opsgenie-go-sdk-v2 v1.0.5 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pjbgf/sha1cd v0.3.0 // indirect - github.com/pkg/errors v0.9.1 // indirect + github.com/pkg/errors v0.9.1 github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect diff --git a/manifests/base/event-reporter/event-reporter-metrics.yaml b/manifests/base/event-reporter/event-reporter-metrics.yaml new file mode 100644 index 0000000000000..bb9e261d8321e --- /dev/null +++ b/manifests/base/event-reporter/event-reporter-metrics.yaml @@ -0,0 +1,16 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + app.kubernetes.io/name: event-reporter-metrics + app.kubernetes.io/part-of: argocd + app.kubernetes.io/component: event-reporter + name: event-reporter-metrics +spec: + ports: + - name: metrics + protocol: TCP + port: 8087 + targetPort: 8087 + selector: + app.kubernetes.io/name: event-reporter diff --git a/manifests/base/event-reporter/event-reporter-role.yaml 
b/manifests/base/event-reporter/event-reporter-role.yaml new file mode 100644 index 0000000000000..ddb1508d89690 --- /dev/null +++ b/manifests/base/event-reporter/event-reporter-role.yaml @@ -0,0 +1,43 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + labels: + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + app.kubernetes.io/component: event-reporter + name: event-reporter +rules: +- apiGroups: + - "" + resources: + - secrets + - configmaps + verbs: + - create + - get + - list + - watch + - update + - patch + - delete +- apiGroups: + - argoproj.io + resources: + - applications + - appprojects + - applicationsets + verbs: + - create + - get + - list + - watch + - update + - delete + - patch +- apiGroups: + - "" + resources: + - events + verbs: + - create + - list diff --git a/manifests/base/event-reporter/event-reporter-rolebinding.yaml b/manifests/base/event-reporter/event-reporter-rolebinding.yaml new file mode 100644 index 0000000000000..1bbc0851f803d --- /dev/null +++ b/manifests/base/event-reporter/event-reporter-rolebinding.yaml @@ -0,0 +1,15 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + labels: + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + app.kubernetes.io/component: event-reporter + name: event-reporter +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: event-reporter +subjects: +- kind: ServiceAccount + name: event-reporter diff --git a/manifests/base/event-reporter/event-reporter-sa.yaml b/manifests/base/event-reporter/event-reporter-sa.yaml new file mode 100644 index 0000000000000..51d4e39fbb50e --- /dev/null +++ b/manifests/base/event-reporter/event-reporter-sa.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + labels: + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + app.kubernetes.io/component: event-reporter + name: event-reporter diff --git 
a/manifests/base/event-reporter/event-reporter-service.yaml b/manifests/base/event-reporter/event-reporter-service.yaml new file mode 100644 index 0000000000000..6a89a2f9d17db --- /dev/null +++ b/manifests/base/event-reporter/event-reporter-service.yaml @@ -0,0 +1,20 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + app.kubernetes.io/component: event-reporter + name: event-reporter +spec: + ports: + - name: http + protocol: TCP + port: 80 + targetPort: 8088 + - name: https + protocol: TCP + port: 443 + targetPort: 8088 + selector: + app.kubernetes.io/name: event-reporter diff --git a/manifests/base/event-reporter/event-reporter-statefulset.yaml b/manifests/base/event-reporter/event-reporter-statefulset.yaml new file mode 100644 index 0000000000000..92d336b0681ce --- /dev/null +++ b/manifests/base/event-reporter/event-reporter-statefulset.yaml @@ -0,0 +1,168 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + labels: + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + app.kubernetes.io/component: event-reporter + name: event-reporter +spec: + replicas: 5 + serviceName: event-reporter + selector: + matchLabels: + app.kubernetes.io/name: event-reporter + template: + metadata: + labels: + app.kubernetes.io/name: event-reporter + spec: + serviceAccountName: event-reporter + containers: + - name: event-reporter + image: quay.io/argoproj/argocd:latest + imagePullPolicy: Always + args: + - /usr/local/bin/event-reporter-server + env: + - name: EVENT_REPORTER_REPLICAS + value: "5" + - name: ARGOCD_TOKEN + valueFrom: + secretKeyRef: + key: token + name: argocd-token + - name: CODEFRESH_URL + valueFrom: + configMapKeyRef: + name: codefresh-cm + key: base-url + optional: true + - name: CODEFRESH_TOKEN + valueFrom: + secretKeyRef: + key: token + name: codefresh-token + - name: EVENT_REPORTER_INSECURE + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + 
key: event-reporter.insecure + optional: true + - name: EVENT_REPORTER_LOGFORMAT + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: event-reporter.log.format + optional: true + - name: EVENT_REPORTER_LOG_LEVEL + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: event-reporter.log.level + optional: true + - name: EVENT_REPORTER_REPO_SERVER + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: repo.server + optional: true + - name: EVENT_REPORTER_REPO_SERVER_TIMEOUT_SECONDS + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: event-reporter.repo.server.timeout.seconds + optional: true + - name: EVENT_REPORTER_REPO_SERVER_PLAINTEXT + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: event-reporter.repo.server.plaintext + optional: true + - name: REDIS_SERVER + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: redis.server + optional: true + - name: REDISDB + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: redis.db + optional: true + - name: EVENT_REPORTER_LISTEN_ADDRESS + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: event-reporter.listen.address + optional: true + - name: EVENT_REPORTER_METRICS_LISTEN_ADDRESS + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: event-reporter.metrics.listen.address + optional: true + volumeMounts: + - name: argocd-repo-server-tls + mountPath: /app/config/server/tls + - mountPath: /tmp + name: tmp + ports: + - containerPort: 8088 + name: health + - containerPort: 8087 + name: metrics + livenessProbe: + httpGet: + path: /healthz?full=true + port: health + initialDelaySeconds: 3 + periodSeconds: 30 + timeoutSeconds: 5 + readinessProbe: + httpGet: + path: /healthz + port: health + initialDelaySeconds: 3 + periodSeconds: 30 + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + runAsNonRoot: true + capabilities: + drop: + - ALL + seccompProfile: + type: 
RuntimeDefault + volumes: + - emptyDir: {} + name: plugins-home + - emptyDir: {} + name: tmp + - name: argocd-repo-server-tls + secret: + secretName: argocd-repo-server-tls + optional: true + items: + - key: tls.crt + path: tls.crt + - key: tls.key + path: tls.key + - key: ca.crt + path: ca.crt + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 100 + podAffinityTerm: + labelSelector: + matchLabels: + app.kubernetes.io/name: event-reporter + topologyKey: kubernetes.io/hostname + - weight: 5 + podAffinityTerm: + labelSelector: + matchLabels: + app.kubernetes.io/part-of: argocd + topologyKey: kubernetes.io/hostname diff --git a/manifests/base/event-reporter/kustomization.yaml b/manifests/base/event-reporter/kustomization.yaml new file mode 100644 index 0000000000000..eb7c1067f0008 --- /dev/null +++ b/manifests/base/event-reporter/kustomization.yaml @@ -0,0 +1,10 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +resources: +- event-reporter-statefulset.yaml +- event-reporter-role.yaml +- event-reporter-rolebinding.yaml +- event-reporter-sa.yaml +- event-reporter-service.yaml +- event-reporter-metrics.yaml \ No newline at end of file diff --git a/manifests/base/kustomization.yaml b/manifests/base/kustomization.yaml index 7be81fd547fcf..50254c28bc233 100644 --- a/manifests/base/kustomization.yaml +++ b/manifests/base/kustomization.yaml @@ -13,3 +13,4 @@ resources: - ./config - ./redis - ./applicationset-controller +- ./event-reporter diff --git a/manifests/install.yaml b/manifests/install.yaml index 3f1f872d07bc1..c0320c443b7a4 100644 --- a/manifests/install.yaml +++ b/manifests/install.yaml @@ -18859,6 +18859,15 @@ metadata: app.kubernetes.io/part-of: argocd name: argocd-server --- +apiVersion: v1 +kind: ServiceAccount +metadata: + labels: + app.kubernetes.io/component: event-reporter + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + name: event-reporter +--- apiVersion: 
rbac.authorization.k8s.io/v1 kind: Role metadata: @@ -19028,6 +19037,50 @@ rules: - list --- apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + labels: + app.kubernetes.io/component: event-reporter + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + name: event-reporter +rules: +- apiGroups: + - "" + resources: + - secrets + - configmaps + verbs: + - create + - get + - list + - watch + - update + - patch + - delete +- apiGroups: + - argoproj.io + resources: + - applications + - appprojects + - applicationsets + verbs: + - create + - get + - list + - watch + - update + - delete + - patch +- apiGroups: + - "" + resources: + - events + verbs: + - create + - list +--- +apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: labels: @@ -19151,6 +19204,22 @@ subjects: name: argocd-server --- apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + labels: + app.kubernetes.io/component: event-reporter + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + name: event-reporter +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: event-reporter +subjects: +- kind: ServiceAccount + name: event-reporter +--- +apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: labels: @@ -19396,6 +19465,44 @@ spec: selector: app.kubernetes.io/name: argocd-server --- +apiVersion: v1 +kind: Service +metadata: + labels: + app.kubernetes.io/component: event-reporter + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + name: event-reporter +spec: + ports: + - name: http + port: 80 + protocol: TCP + targetPort: 8088 + - name: https + port: 443 + protocol: TCP + targetPort: 8088 + selector: + app.kubernetes.io/name: event-reporter +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app.kubernetes.io/component: event-reporter + app.kubernetes.io/name: event-reporter-metrics + app.kubernetes.io/part-of: argocd + name: 
event-reporter-metrics +spec: + ports: + - name: metrics + port: 8087 + protocol: TCP + targetPort: 8087 + selector: + app.kubernetes.io/name: event-reporter +--- apiVersion: apps/v1 kind: Deployment metadata: @@ -20548,6 +20655,175 @@ spec: optional: true secretName: argocd-repo-server-tls --- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + labels: + app.kubernetes.io/component: event-reporter + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + name: event-reporter +spec: + replicas: 5 + selector: + matchLabels: + app.kubernetes.io/name: event-reporter + serviceName: event-reporter + template: + metadata: + labels: + app.kubernetes.io/name: event-reporter + spec: + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - podAffinityTerm: + labelSelector: + matchLabels: + app.kubernetes.io/name: event-reporter + topologyKey: kubernetes.io/hostname + weight: 100 + - podAffinityTerm: + labelSelector: + matchLabels: + app.kubernetes.io/part-of: argocd + topologyKey: kubernetes.io/hostname + weight: 5 + containers: + - args: + - /usr/local/bin/event-reporter-server + env: + - name: EVENT_REPORTER_REPLICAS + value: "5" + - name: ARGOCD_TOKEN + valueFrom: + secretKeyRef: + key: token + name: argocd-token + - name: CODEFRESH_URL + valueFrom: + configMapKeyRef: + key: base-url + name: codefresh-cm + optional: true + - name: CODEFRESH_TOKEN + valueFrom: + secretKeyRef: + key: token + name: codefresh-token + - name: EVENT_REPORTER_INSECURE + valueFrom: + configMapKeyRef: + key: event-reporter.insecure + name: argocd-cmd-params-cm + optional: true + - name: EVENT_REPORTER_LOGFORMAT + valueFrom: + configMapKeyRef: + key: event-reporter.log.format + name: argocd-cmd-params-cm + optional: true + - name: EVENT_REPORTER_LOG_LEVEL + valueFrom: + configMapKeyRef: + key: event-reporter.log.level + name: argocd-cmd-params-cm + optional: true + - name: EVENT_REPORTER_REPO_SERVER + valueFrom: + configMapKeyRef: + key: 
repo.server + name: argocd-cmd-params-cm + optional: true + - name: EVENT_REPORTER_REPO_SERVER_TIMEOUT_SECONDS + valueFrom: + configMapKeyRef: + key: event-reporter.repo.server.timeout.seconds + name: argocd-cmd-params-cm + optional: true + - name: EVENT_REPORTER_REPO_SERVER_PLAINTEXT + valueFrom: + configMapKeyRef: + key: event-reporter.repo.server.plaintext + name: argocd-cmd-params-cm + optional: true + - name: REDIS_SERVER + valueFrom: + configMapKeyRef: + key: redis.server + name: argocd-cmd-params-cm + optional: true + - name: REDISDB + valueFrom: + configMapKeyRef: + key: redis.db + name: argocd-cmd-params-cm + optional: true + - name: EVENT_REPORTER_LISTEN_ADDRESS + valueFrom: + configMapKeyRef: + key: event-reporter.listen.address + name: argocd-cmd-params-cm + optional: true + - name: EVENT_REPORTER_METRICS_LISTEN_ADDRESS + valueFrom: + configMapKeyRef: + key: event-reporter.metrics.listen.address + name: argocd-cmd-params-cm + optional: true + image: quay.io/codefresh/argocd:latest + imagePullPolicy: Always + livenessProbe: + httpGet: + path: /healthz?full=true + port: health + initialDelaySeconds: 3 + periodSeconds: 30 + timeoutSeconds: 5 + name: event-reporter + ports: + - containerPort: 8088 + name: health + - containerPort: 8087 + name: metrics + readinessProbe: + httpGet: + path: /healthz + port: health + initialDelaySeconds: 3 + periodSeconds: 30 + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + readOnlyRootFilesystem: true + runAsNonRoot: true + seccompProfile: + type: RuntimeDefault + volumeMounts: + - mountPath: /app/config/server/tls + name: argocd-repo-server-tls + - mountPath: /tmp + name: tmp + serviceAccountName: event-reporter + volumes: + - emptyDir: {} + name: plugins-home + - emptyDir: {} + name: tmp + - name: argocd-repo-server-tls + secret: + items: + - key: tls.crt + path: tls.crt + - key: tls.key + path: tls.key + - key: ca.crt + path: ca.crt + optional: true + secretName: argocd-repo-server-tls 
+--- apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: diff --git a/manifests/namespace-install.yaml b/manifests/namespace-install.yaml index a60485ed64a1a..d1060987a17e6 100644 --- a/manifests/namespace-install.yaml +++ b/manifests/namespace-install.yaml @@ -53,6 +53,15 @@ metadata: app.kubernetes.io/part-of: argocd name: argocd-server --- +apiVersion: v1 +kind: ServiceAccount +metadata: + labels: + app.kubernetes.io/component: event-reporter + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + name: event-reporter +--- apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: @@ -222,6 +231,50 @@ rules: - list --- apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + labels: + app.kubernetes.io/component: event-reporter + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + name: event-reporter +rules: +- apiGroups: + - "" + resources: + - secrets + - configmaps + verbs: + - create + - get + - list + - watch + - update + - patch + - delete +- apiGroups: + - argoproj.io + resources: + - applications + - appprojects + - applicationsets + verbs: + - create + - get + - list + - watch + - update + - delete + - patch +- apiGroups: + - "" + resources: + - events + verbs: + - create + - list +--- +apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: labels: @@ -285,6 +338,22 @@ subjects: - kind: ServiceAccount name: argocd-server --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + labels: + app.kubernetes.io/component: event-reporter + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + name: event-reporter +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: event-reporter +subjects: +- kind: ServiceAccount + name: event-reporter +--- apiVersion: v1 kind: ConfigMap metadata: @@ -497,6 +566,44 @@ spec: selector: app.kubernetes.io/name: argocd-server --- +apiVersion: v1 +kind: Service +metadata: + labels: + 
app.kubernetes.io/component: event-reporter + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + name: event-reporter +spec: + ports: + - name: http + port: 80 + protocol: TCP + targetPort: 8088 + - name: https + port: 443 + protocol: TCP + targetPort: 8088 + selector: + app.kubernetes.io/name: event-reporter +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app.kubernetes.io/component: event-reporter + app.kubernetes.io/name: event-reporter-metrics + app.kubernetes.io/part-of: argocd + name: event-reporter-metrics +spec: + ports: + - name: metrics + port: 8087 + protocol: TCP + targetPort: 8087 + selector: + app.kubernetes.io/name: event-reporter +--- apiVersion: apps/v1 kind: Deployment metadata: @@ -1649,6 +1756,175 @@ spec: optional: true secretName: argocd-repo-server-tls --- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + labels: + app.kubernetes.io/component: event-reporter + app.kubernetes.io/name: event-reporter + app.kubernetes.io/part-of: argocd + name: event-reporter +spec: + replicas: 5 + selector: + matchLabels: + app.kubernetes.io/name: event-reporter + serviceName: event-reporter + template: + metadata: + labels: + app.kubernetes.io/name: event-reporter + spec: + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - podAffinityTerm: + labelSelector: + matchLabels: + app.kubernetes.io/name: event-reporter + topologyKey: kubernetes.io/hostname + weight: 100 + - podAffinityTerm: + labelSelector: + matchLabels: + app.kubernetes.io/part-of: argocd + topologyKey: kubernetes.io/hostname + weight: 5 + containers: + - args: + - /usr/local/bin/event-reporter-server + env: + - name: EVENT_REPORTER_REPLICAS + value: "5" + - name: ARGOCD_TOKEN + valueFrom: + secretKeyRef: + key: token + name: argocd-token + - name: CODEFRESH_URL + valueFrom: + configMapKeyRef: + key: base-url + name: codefresh-cm + optional: true + - name: CODEFRESH_TOKEN + valueFrom: + secretKeyRef: + key: token + name: 
codefresh-token + - name: EVENT_REPORTER_INSECURE + valueFrom: + configMapKeyRef: + key: event-reporter.insecure + name: argocd-cmd-params-cm + optional: true + - name: EVENT_REPORTER_LOGFORMAT + valueFrom: + configMapKeyRef: + key: event-reporter.log.format + name: argocd-cmd-params-cm + optional: true + - name: EVENT_REPORTER_LOG_LEVEL + valueFrom: + configMapKeyRef: + key: event-reporter.log.level + name: argocd-cmd-params-cm + optional: true + - name: EVENT_REPORTER_REPO_SERVER + valueFrom: + configMapKeyRef: + key: repo.server + name: argocd-cmd-params-cm + optional: true + - name: EVENT_REPORTER_REPO_SERVER_TIMEOUT_SECONDS + valueFrom: + configMapKeyRef: + key: event-reporter.repo.server.timeout.seconds + name: argocd-cmd-params-cm + optional: true + - name: EVENT_REPORTER_REPO_SERVER_PLAINTEXT + valueFrom: + configMapKeyRef: + key: event-reporter.repo.server.plaintext + name: argocd-cmd-params-cm + optional: true + - name: REDIS_SERVER + valueFrom: + configMapKeyRef: + key: redis.server + name: argocd-cmd-params-cm + optional: true + - name: REDISDB + valueFrom: + configMapKeyRef: + key: redis.db + name: argocd-cmd-params-cm + optional: true + - name: EVENT_REPORTER_LISTEN_ADDRESS + valueFrom: + configMapKeyRef: + key: event-reporter.listen.address + name: argocd-cmd-params-cm + optional: true + - name: EVENT_REPORTER_METRICS_LISTEN_ADDRESS + valueFrom: + configMapKeyRef: + key: event-reporter.metrics.listen.address + name: argocd-cmd-params-cm + optional: true + image: quay.io/codefresh/argocd:latest + imagePullPolicy: Always + livenessProbe: + httpGet: + path: /healthz?full=true + port: health + initialDelaySeconds: 3 + periodSeconds: 30 + timeoutSeconds: 5 + name: event-reporter + ports: + - containerPort: 8088 + name: health + - containerPort: 8087 + name: metrics + readinessProbe: + httpGet: + path: /healthz + port: health + initialDelaySeconds: 3 + periodSeconds: 30 + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + 
readOnlyRootFilesystem: true + runAsNonRoot: true + seccompProfile: + type: RuntimeDefault + volumeMounts: + - mountPath: /app/config/server/tls + name: argocd-repo-server-tls + - mountPath: /tmp + name: tmp + serviceAccountName: event-reporter + volumes: + - emptyDir: {} + name: plugins-home + - emptyDir: {} + name: tmp + - name: argocd-repo-server-tls + secret: + items: + - key: tls.crt + path: tls.crt + - key: tls.key + path: tls.key + - key: ca.crt + path: ca.crt + optional: true + secretName: argocd-repo-server-tls +--- apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: diff --git a/server/application/application.go b/server/application/application.go index 011b09b9e6035..a4778c26025cd 100644 --- a/server/application/application.go +++ b/server/application/application.go @@ -1121,8 +1121,16 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing for { select { case event := <-eventsChannel: + log.Infof("event channel size is %d", len(eventsChannel)) + rVersion, _ := s.settingsMgr.GetCodefreshReporterVersion() + if rVersion == string(settings.CodefreshV2ReporterVersion) { + logCtx.Info("v1 reported disabled skipping event") + continue + } + shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event) if !shouldProcess { + log.Infof("ignore event for app %s", event.Application.Name) continue } ts := time.Now().Format("2006-01-02T15:04:05.000Z") diff --git a/util/settings/settings.go b/util/settings/settings.go index 7762224f7284d..668ed8799e440 100644 --- a/util/settings/settings.go +++ b/util/settings/settings.go @@ -202,6 +202,14 @@ type KustomizeSettings struct { Versions []KustomizeVersion } +// CodefreshReporterVersion includes all cf reporter versions +type CodefreshReporterVersion string + +const ( + CodefreshV1ReporterVersion CodefreshReporterVersion = "v1" + CodefreshV2ReporterVersion CodefreshReporterVersion = "v2" +) + var ( ByClusterURLIndexer = "byClusterURL" 
byClusterURLIndexerFunc = func(obj interface{}) ([]string, error) { @@ -416,6 +424,8 @@ const ( settingsWebhookGogsSecretKey = "webhook.gogs.secret" // settingsApplicationInstanceLabelKey is the key to configure injected app instance label key settingsApplicationInstanceLabelKey = "application.instanceLabelKey" + // settingsCodefreshReporterVersion is the key to configure injected app instance label key + settingsCodefreshReporterVersion = "codefresh.reporterVersion" // settingsResourceTrackingMethodKey is the key to configure tracking method for application resources settingsResourceTrackingMethodKey = "application.resourceTrackingMethod" // resourcesCustomizationsKey is the key to the map of resource overrides @@ -720,6 +730,18 @@ func (mgr *SettingsManager) GetAppInstanceLabelKey() (string, error) { return label, nil } +func (mgr *SettingsManager) GetCodefreshReporterVersion() (string, error) { + argoCDCM, err := mgr.getConfigMap() + if err != nil { + return "", err + } + label := argoCDCM.Data[settingsCodefreshReporterVersion] + if label == "" { + return string(CodefreshV1ReporterVersion), nil + } + return label, nil +} + func (mgr *SettingsManager) GetKustomizeSetNamespaceEnabled() bool { argoCDCM, err := mgr.getConfigMap() if err != nil {