From 3514011024ec9623bc85946bf609a500e4a338f0 Mon Sep 17 00:00:00 2001 From: pashakostohrys Date: Tue, 28 Nov 2023 12:19:34 +0200 Subject: [PATCH] Revert "feat: support new reporting component (#265)" This reverts commit e1c2b031ac236144365a7bcebc7d0a7d640f2e5f. --- Dockerfile | 1 - cmd/event-reporter-server/commands/common.go | 6 - .../commands/event_reporter_server.go | 213 ----- cmd/main.go | 3 - common/common.go | 39 +- event_reporter/codefresh/client.go | 78 -- event_reporter/codefresh/utils.go | 151 --- event_reporter/controller/controller.go | 118 --- event_reporter/handlers/handlers.go | 81 -- event_reporter/metrics/metrics.go | 148 --- .../reporter/application_errors_parser.go | 147 --- .../reporter/application_event_reporter.go | 877 ------------------ .../application_event_reporter_test.go | 461 --------- event_reporter/reporter/broadcaster.go | 140 --- event_reporter/reporter/feature_manager.go | 40 - event_reporter/server.go | 304 ------ event_reporter/sharding/sharding.go | 101 -- go.mod | 2 +- .../event-reporter-metrics.yaml | 16 - .../event-reporter/event-reporter-role.yaml | 43 - .../event-reporter-rolebinding.yaml | 15 - .../event-reporter/event-reporter-sa.yaml | 8 - .../event-reporter-service.yaml | 20 - .../event-reporter-statefulset.yaml | 168 ---- .../base/event-reporter/kustomization.yaml | 10 - manifests/base/kustomization.yaml | 1 - manifests/install.yaml | 276 ------ manifests/namespace-install.yaml | 276 ------ server/application/application.go | 8 - util/settings/settings.go | 22 - 30 files changed, 12 insertions(+), 3761 deletions(-) delete mode 100644 cmd/event-reporter-server/commands/common.go delete mode 100644 cmd/event-reporter-server/commands/event_reporter_server.go delete mode 100644 event_reporter/codefresh/client.go delete mode 100644 event_reporter/codefresh/utils.go delete mode 100644 event_reporter/controller/controller.go delete mode 100644 event_reporter/handlers/handlers.go delete mode 100644 event_reporter/metrics/metrics.go delete mode 100644 event_reporter/reporter/application_errors_parser.go delete mode 100644 event_reporter/reporter/application_event_reporter.go delete mode 100644 event_reporter/reporter/application_event_reporter_test.go delete mode 100644 event_reporter/reporter/broadcaster.go delete mode 100644 event_reporter/reporter/feature_manager.go delete mode 100644 event_reporter/server.go delete mode 100644 event_reporter/sharding/sharding.go delete mode 100644 manifests/base/event-reporter/event-reporter-metrics.yaml delete mode 100644 manifests/base/event-reporter/event-reporter-role.yaml delete mode 100644 manifests/base/event-reporter/event-reporter-rolebinding.yaml delete mode 100644 manifests/base/event-reporter/event-reporter-sa.yaml delete mode 100644 manifests/base/event-reporter/event-reporter-service.yaml delete mode 100644 manifests/base/event-reporter/event-reporter-statefulset.yaml delete mode 100644 manifests/base/event-reporter/kustomization.yaml diff --git a/Dockerfile b/Dockerfile index 408bd348f54f9..1b595d75623db 100644 --- a/Dockerfile +++ b/Dockerfile @@ -124,7 +124,6 @@ COPY --from=argocd-build /go/src/github.com/argoproj/argo-cd/dist/argocd* /usr/l USER root RUN ln -s /usr/local/bin/argocd /usr/local/bin/argocd-server && \ ln -s /usr/local/bin/argocd /usr/local/bin/argocd-repo-server && \ - ln -s /usr/local/bin/argocd /usr/local/bin/event-reporter-server && \ ln -s /usr/local/bin/argocd /usr/local/bin/argocd-cmp-server && \ ln -s /usr/local/bin/argocd /usr/local/bin/argocd-application-controller && \ ln -s /usr/local/bin/argocd /usr/local/bin/argocd-dex && \ diff --git a/cmd/event-reporter-server/commands/common.go b/cmd/event-reporter-server/commands/common.go deleted file mode 100644 index 7ad2c7069df88..0000000000000 --- a/cmd/event-reporter-server/commands/common.go +++ /dev/null @@ -1,6 +0,0 @@ -package commands - -const ( - // cliName is the name of the CLI - cliName = "event-reporter-server" -) diff --git a/cmd/event-reporter-server/commands/event_reporter_server.go b/cmd/event-reporter-server/commands/event_reporter_server.go deleted file mode 100644 index 700e6360351fc..0000000000000 --- a/cmd/event-reporter-server/commands/event_reporter_server.go +++ /dev/null @@ -1,213 +0,0 @@ -package commands - -import ( - "context" - "fmt" - "math" - "time" - - "github.com/argoproj/argo-cd/v2/event_reporter" - "github.com/argoproj/argo-cd/v2/event_reporter/codefresh" - "github.com/argoproj/argo-cd/v2/pkg/apiclient" - - "github.com/argoproj/pkg/stats" - "github.com/redis/go-redis/v9" - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" - - cmdutil "github.com/argoproj/argo-cd/v2/cmd/util" - "github.com/argoproj/argo-cd/v2/common" - "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" - repoapiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient" - servercache "github.com/argoproj/argo-cd/v2/server/cache" - "github.com/argoproj/argo-cd/v2/util/cli" - "github.com/argoproj/argo-cd/v2/util/env" - "github.com/argoproj/argo-cd/v2/util/errors" - "github.com/argoproj/argo-cd/v2/util/kube" - "github.com/argoproj/argo-cd/v2/util/tls" -) - -const ( - failureRetryCountEnv = "EVENT_REPORTER_K8S_RETRY_COUNT" - failureRetryPeriodMilliSecondsEnv = "EVENT_REPORTE_K8S_RETRY_DURATION_MILLISECONDS" -) - -var ( - failureRetryCount = 0 - failureRetryPeriodMilliSeconds = 100 -) - -func init() { - failureRetryCount = env.ParseNumFromEnv(failureRetryCountEnv, failureRetryCount, 0, 10) - failureRetryPeriodMilliSeconds = env.ParseNumFromEnv(failureRetryPeriodMilliSecondsEnv, failureRetryPeriodMilliSeconds, 0, 1000) -} - -// NewCommand returns a new instance of an event reporter command -func NewCommand() *cobra.Command { - var ( - redisClient *redis.Client - insecure bool - listenHost string - listenPort int - metricsHost string - metricsPort int - glogLevel int - clientConfig clientcmd.ClientConfig - repoServerTimeoutSeconds int - repoServerAddress string - applicationServerAddress string - cacheSrc func() (*servercache.Cache, error) - contentSecurityPolicy string - repoServerPlaintext bool - repoServerStrictTLS bool - applicationNamespaces []string - argocdToken string - codefreshUrl string - codefreshToken string - shardingAlgorithm string - ) - var command = &cobra.Command{ - Use: cliName, - Short: "Run the Event Reporter server", - Long: "The Event reporter is a server that listens to Kubernetes events and reports them to the Codefresh server.", - DisableAutoGenTag: true, - Run: func(c *cobra.Command, args []string) { - ctx := c.Context() - - vers := common.GetVersion() - namespace, _, err := clientConfig.Namespace() - errors.CheckError(err) - vers.LogStartupInfo( - "Event Reporter Server", - map[string]any{ - "namespace": namespace, - "port": listenPort, - }, - ) - - cli.SetLogFormat(cmdutil.LogFormat) - cli.SetLogLevel(cmdutil.LogLevel) - cli.SetGLogLevel(glogLevel) - - config, err := clientConfig.ClientConfig() - errors.CheckError(err) - errors.CheckError(v1alpha1.SetK8SConfigDefaults(config)) - - cache, err := cacheSrc() - errors.CheckError(err) - - kubeclientset := kubernetes.NewForConfigOrDie(config) - - appclientsetConfig, err := clientConfig.ClientConfig() - errors.CheckError(err) - errors.CheckError(v1alpha1.SetK8SConfigDefaults(appclientsetConfig)) - config.UserAgent = fmt.Sprintf("argocd-server/%s (%s)", vers.Version, vers.Platform) - - if failureRetryCount > 0 { - appclientsetConfig = kube.AddFailureRetryWrapper(appclientsetConfig, failureRetryCount, failureRetryPeriodMilliSeconds) - } - appClientSet := appclientset.NewForConfigOrDie(appclientsetConfig) - tlsConfig := repoapiclient.TLSConfiguration{ - DisableTLS: repoServerPlaintext, - StrictValidation: repoServerStrictTLS, - } - - // Load CA information to use for validating connections to the - // repository server, if strict TLS validation was requested. - if !repoServerPlaintext && repoServerStrictTLS { - pool, err := tls.LoadX509CertPool( - fmt.Sprintf("%s/server/tls/tls.crt", env.StringFromEnv(common.EnvAppConfigPath, common.DefaultAppConfigPath)), - fmt.Sprintf("%s/server/tls/ca.crt", env.StringFromEnv(common.EnvAppConfigPath, common.DefaultAppConfigPath)), - ) - if err != nil { - log.Fatalf("%v", err) - } - tlsConfig.Certificates = pool - } - - repoclientset := repoapiclient.NewRepoServerClientset(repoServerAddress, repoServerTimeoutSeconds, tlsConfig) - - applicationClientSet, err := apiclient.NewClient(&apiclient.ClientOptions{ - ServerAddr: applicationServerAddress, - Insecure: true, - GRPCWeb: true, - PlainText: true, - AuthToken: argocdToken, - }) - - errors.CheckError(err) - - closer, applicationClient, err := applicationClientSet.NewApplicationClient() - - errors.CheckError(err) - - defer func() { - _ = closer.Close() - }() - - eventReporterServerOpts := event_reporter.EventReporterServerOpts{ - ListenPort: listenPort, - ListenHost: listenHost, - MetricsPort: metricsPort, - MetricsHost: metricsHost, - Namespace: namespace, - KubeClientset: kubeclientset, - AppClientset: appClientSet, - RepoClientset: repoclientset, - Cache: cache, - RedisClient: redisClient, - ApplicationNamespaces: applicationNamespaces, - ApplicationServiceClient: applicationClient, - CodefreshConfig: &codefresh.CodefreshConfig{ - BaseURL: codefreshUrl, - AuthToken: codefreshToken, - }, - } - - stats.RegisterStackDumper() - stats.StartStatsTicker(10 * time.Minute) - stats.RegisterHeapDumper("memprofile") - eventReporterServer := event_reporter.NewEventReporterServer(ctx, eventReporterServerOpts) - eventReporterServer.Init(ctx) - lns, err := eventReporterServer.Listen() - errors.CheckError(err) - for { - var closer func() - ctx, cancel := context.WithCancel(ctx) - eventReporterServer.Run(ctx, lns) - cancel() - if closer != nil { - closer() - } - } - }, - } - - clientConfig = cli.AddKubectlFlagsToCmd(command) - command.Flags().BoolVar(&insecure, "insecure", env.ParseBoolFromEnv("EVENT_REPORTER_INSECURE", false), "Run server without TLS") - command.Flags().StringVar(&cmdutil.LogFormat, "logformat", env.StringFromEnv("EVENT_REPORTER_LOGFORMAT", "text"), "Set the logging format. One of: text|json") - command.Flags().StringVar(&cmdutil.LogLevel, "loglevel", env.StringFromEnv("EVENT_REPORTER_LOG_LEVEL", "info"), "Set the logging level. One of: debug|info|warn|error") - command.Flags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level") - command.Flags().StringVar(&applicationServerAddress, "application-server", env.StringFromEnv("EVENT_REPORTER_APPLICATION_SERVER", common.DefaultApplicationServerAddr), "Application server address") - command.Flags().StringVar(&argocdToken, "argocd-token", env.StringFromEnv("ARGOCD_TOKEN", ""), "ArgoCD server JWT token") - command.Flags().StringVar(&repoServerAddress, "repo-server", env.StringFromEnv("EVENT_REPORTER_REPO_SERVER", common.DefaultRepoServerAddr), "Repo server address") - command.AddCommand(cli.NewVersionCmd(cliName)) - command.Flags().StringVar(&listenHost, "address", env.StringFromEnv("EVENT_REPORTER_LISTEN_ADDRESS", common.DefaultAddressEventReporterServer), "Listen on given address") - command.Flags().IntVar(&listenPort, "port", common.DefaultPortEventReporterServer, "Listen on given port") - command.Flags().StringVar(&metricsHost, env.StringFromEnv("EVENT_REPORTER_METRICS_LISTEN_ADDRESS", "metrics-address"), common.DefaultAddressEventReporterServerMetrics, "Listen for metrics on given address") - command.Flags().IntVar(&metricsPort, "metrics-port", common.DefaultPortEventReporterServerMetrics, "Start metrics on given port") - command.Flags().IntVar(&repoServerTimeoutSeconds, "repo-server-timeout-seconds", env.ParseNumFromEnv("EVENT_REPORTER_REPO_SERVER_TIMEOUT_SECONDS", 60, 0, math.MaxInt64), "Repo server RPC call timeout seconds.") - command.Flags().StringVar(&contentSecurityPolicy, "content-security-policy", env.StringFromEnv("EVENT_REPORTER_CONTENT_SECURITY_POLICY", "frame-ancestors 'self';"), "Set Content-Security-Policy header in HTTP responses to `value`. To disable, set to \"\".") - command.Flags().BoolVar(&repoServerPlaintext, "repo-server-plaintext", env.ParseBoolFromEnv("EVENT_REPORTER_REPO_SERVER_PLAINTEXT", false), "Use a plaintext client (non-TLS) to connect to repository server") - command.Flags().BoolVar(&repoServerStrictTLS, "repo-server-strict-tls", env.ParseBoolFromEnv("EVENT_REPORTER_REPO_SERVER_STRICT_TLS", false), "Perform strict validation of TLS certificates when connecting to repo server") - command.Flags().StringVar(&codefreshUrl, "codefresh-url", env.StringFromEnv("CODEFRESH_URL", "https://g.codefresh.io"), "Codefresh API url") - command.Flags().StringVar(&codefreshToken, "codefresh-token", env.StringFromEnv("CODEFRESH_TOKEN", ""), "Codefresh token") - command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvEventReporterShardingAlgorithm, common.DefaultEventReporterShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy] ") - cacheSrc = servercache.AddCacheFlagsToCmd(command, func(client *redis.Client) { - redisClient = client - }) - return command -} diff --git a/cmd/main.go b/cmd/main.go index af676e96ad0f8..d972863992bce 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -16,7 +16,6 @@ import ( reposerver "github.com/argoproj/argo-cd/v2/cmd/argocd-repo-server/commands" apiserver "github.com/argoproj/argo-cd/v2/cmd/argocd-server/commands" cli "github.com/argoproj/argo-cd/v2/cmd/argocd/commands" - eventreporterserver "github.com/argoproj/argo-cd/v2/cmd/event-reporter-server/commands" ) const ( @@ -35,8 +34,6 @@ func main() { command = cli.NewCommand() case "argocd-server": command = apiserver.NewCommand() - case "event-reporter-server": - command = eventreporterserver.NewCommand() case "argocd-application-controller": command = appcontroller.NewCommand() case "argocd-repo-server": diff --git a/common/common.go b/common/common.go index d438ed4ab551b..e4e3da5c7fd8c 100644 --- a/common/common.go +++ b/common/common.go @@ -15,8 +15,7 @@ import ( // Default service addresses and URLS of Argo CD internal services const ( // DefaultRepoServerAddr is the gRPC address of the Argo CD repo server - DefaultRepoServerAddr = "argocd-repo-server:8081" - DefaultApplicationServerAddr = "argo-cd-server:80" + DefaultRepoServerAddr = "argocd-repo-server:8081" // DefaultDexServerAddr is the HTTP address of the Dex OIDC server, which we run a reverse proxy against DefaultDexServerAddr = "argocd-dex-server:5556" // DefaultRedisAddr is the default redis address @@ -45,24 +44,20 @@ const ( // Default listener ports for ArgoCD components const ( - DefaultPortAPIServer = 8080 - DefaultPortRepoServer = 8081 - DefaultPortArgoCDMetrics = 8082 - DefaultPortArgoCDAPIServerMetrics = 8083 - DefaultPortRepoServerMetrics = 8084 - DefaultPortEventReporterServerMetrics = 8087 - DefaultPortEventReporterServer = 8088 + DefaultPortAPIServer = 8080 + DefaultPortRepoServer = 8081 + DefaultPortArgoCDMetrics = 8082 + DefaultPortArgoCDAPIServerMetrics = 8083 + DefaultPortRepoServerMetrics = 8084 ) // DefaultAddressAPIServer for ArgoCD components const ( - DefaultAddressAdminDashboard = "localhost" - DefaultAddressAPIServer = "0.0.0.0" - DefaultAddressAPIServerMetrics = "0.0.0.0" - DefaultAddressRepoServer = "0.0.0.0" - DefaultAddressRepoServerMetrics = "0.0.0.0" - DefaultAddressEventReporterServer = "0.0.0.0" - DefaultAddressEventReporterServerMetrics = "0.0.0.0" + DefaultAddressAdminDashboard = "localhost" + DefaultAddressAPIServer = "0.0.0.0" + DefaultAddressAPIServerMetrics = "0.0.0.0" + DefaultAddressRepoServer = "0.0.0.0" + DefaultAddressRepoServerMetrics = "0.0.0.0" ) // Default paths on the pod's file system @@ -242,12 +237,6 @@ const ( EnvCMPWorkDir = "ARGOCD_CMP_WORKDIR" // EnvGPGDataPath overrides the location where GPG keyring for signature verification is stored EnvGPGDataPath = "ARGOCD_GPG_DATA_PATH" - // EnvEventReporterShardingAlgorithm is the distribution sharding algorithm to be used: legacy - EnvEventReporterShardingAlgorithm = "EVENT_REPORTER_SHARDING_ALGORITHM" - // EnvEventReporterReplicas is the number of EventReporter replicas - EnvEventReporterReplicas = "EVENT_REPORTER_REPLICAS" - // EnvEventReporterShard is the shard number that should be handled by reporter - EnvEventReporterShard = "EVENT_REPORTER_SHARD" ) // Config Management Plugin related constants @@ -356,9 +345,3 @@ const TokenVerificationError = "failed to verify the token" var TokenVerificationErr = errors.New(TokenVerificationError) var PermissionDeniedAPIError = status.Error(codes.PermissionDenied, "permission denied") - -// Event reporter constants -const ( - EventReporterLegacyShardingAlgorithm = "legacy" - DefaultEventReporterShardingAlgorithm = EventReporterLegacyShardingAlgorithm -) diff --git a/event_reporter/codefresh/client.go b/event_reporter/codefresh/client.go deleted file mode 100644 index f6c7001a18e24..0000000000000 --- a/event_reporter/codefresh/client.go +++ /dev/null @@ -1,78 +0,0 @@ -package codefresh - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "time" - - "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" - "github.com/pkg/errors" - log "github.com/sirupsen/logrus" -) - -type CodefreshConfig struct { - BaseURL string - AuthToken string -} - -type codefreshClient struct { - cfConfig *CodefreshConfig - httpClient *http.Client -} - -type CodefreshClient interface { - Send(ctx context.Context, appName string, event *events.Event) error -} - -func NewCodefreshClient(cfConfig *CodefreshConfig) CodefreshClient { - return &codefreshClient{ - cfConfig: cfConfig, - httpClient: &http.Client{ - Timeout: 30 * time.Second, - }, - } -} - -func (cc *codefreshClient) Send(ctx context.Context, appName string, event *events.Event) error { - return WithRetry(&DefaultBackoff, func() error { - url := cc.cfConfig.BaseURL + "/2.0/api/events" - log.Infof("Sending application event for %s", appName) - - wrappedPayload := map[string]json.RawMessage{ - "data": event.Payload, - } - - newPayloadBytes, err := json.Marshal(wrappedPayload) - if err != nil { - return err - } - - req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(newPayloadBytes)) - if err != nil { - return err - } - - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", cc.cfConfig.AuthToken) - - res, err := cc.httpClient.Do(req) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("failed reporting to Codefresh, event: %s", string(event.Payload))) - } - defer res.Body.Close() - - isStatusOK := res.StatusCode >= 200 && res.StatusCode < 300 - if !isStatusOK { - b, _ := io.ReadAll(res.Body) - return errors.Errorf("failed reporting to Codefresh, got response: status code %d and body %s, original request body: %s", - res.StatusCode, string(b), string(event.Payload)) - } - - log.Infof("Application event for %s successfully sent", appName) - return nil - }) -} diff --git a/event_reporter/codefresh/utils.go b/event_reporter/codefresh/utils.go deleted file mode 100644 index 7496ef5511012..0000000000000 --- a/event_reporter/codefresh/utils.go +++ /dev/null @@ -1,151 +0,0 @@ -package codefresh - -import ( - "fmt" - "strconv" - "time" - - "k8s.io/apimachinery/pkg/util/wait" -) - -// Backoff for an operation -type Backoff struct { - // The initial duration in nanoseconds or strings like "1s", "3m" - // +optional - Duration *Int64OrString `json:"duration,omitempty" protobuf:"bytes,1,opt,name=duration"` - // Duration is multiplied by factor each iteration - // +optional - Factor *Amount `json:"factor,omitempty" protobuf:"bytes,2,opt,name=factor"` - // The amount of jitter applied each iteration - // +optional - Jitter *Amount `json:"jitter,omitempty" protobuf:"bytes,3,opt,name=jitter"` - // Exit with error after this many steps - // +optional - Steps int32 `json:"steps,omitempty" protobuf:"varint,4,opt,name=steps"` -} - -// Amount represent a numeric amount. -type Amount struct { - Value []byte `json:"value" protobuf:"bytes,1,opt,name=value"` -} - -type Int64OrString struct { - Type Type `json:"type" protobuf:"varint,1,opt,name=type,casttype=Type"` - Int64Val int64 `json:"int64Val,omitempty" protobuf:"varint,2,opt,name=int64Val"` - StrVal string `json:"strVal,omitempty" protobuf:"bytes,3,opt,name=strVal"` -} - -// Type represents the stored type of Int64OrString. -type Type int64 - -const ( - Int64 Type = iota // The Int64OrString holds an int64. - String // The Int64OrString holds a string. -) - -func NewAmount(s string) Amount { - return Amount{Value: []byte(s)} -} - -// FromString creates an Int64OrString object with a string value. -func FromString(val string) Int64OrString { - return Int64OrString{Type: String, StrVal: val} -} - -// Int64Value returns the Int64Val if type Int64, or if -// it is a String, will attempt a conversion to int64, -// returning 0 if a parsing error occurs. -func (int64str *Int64OrString) Int64Value() int64 { - if int64str.Type == String { - i, _ := strconv.ParseInt(int64str.StrVal, 10, 64) - return i - } - return int64str.Int64Val -} - -var ( - defaultFactor = NewAmount("1.0") - defaultJitter = NewAmount("1") - defaultDuration = FromString("1s") - - DefaultBackoff = Backoff{ - Steps: 5, - Duration: &defaultDuration, - Factor: &defaultFactor, - Jitter: &defaultJitter, - } -) - -func (n *Amount) Float64() (float64, error) { - return strconv.ParseFloat(string(n.Value), 64) -} - -// Convert2WaitBackoff converts to a wait backoff option -func Convert2WaitBackoff(backoff *Backoff) (*wait.Backoff, error) { - result := wait.Backoff{} - - d := backoff.Duration - if d == nil { - d = &defaultDuration - } - if d.Type == Int64 { - result.Duration = time.Duration(d.Int64Value()) - } else { - parsedDuration, err := time.ParseDuration(d.StrVal) - if err != nil { - return nil, err - } - result.Duration = parsedDuration - } - - factor := backoff.Factor - if factor == nil { - factor = &defaultFactor - } - f, err := factor.Float64() - if err != nil { - return nil, fmt.Errorf("invalid factor, %w", err) - } - result.Factor = f - - jitter := backoff.Jitter - if jitter == nil { - jitter = &defaultJitter - } - j, err := jitter.Float64() - if err != nil { - return nil, fmt.Errorf("invalid jitter, %w", err) - } - result.Jitter = j - - if backoff.Steps > 0 { - result.Steps = backoff.GetSteps() - } else { - result.Steps = int(DefaultBackoff.Steps) - } - return &result, nil -} - -func (b Backoff) GetSteps() int { - return int(b.Steps) -} - -func WithRetry(backoff *Backoff, f func() error) error { - if backoff == nil { - backoff = &DefaultBackoff - } - b, err := Convert2WaitBackoff(backoff) - if err != nil { - return fmt.Errorf("invalid backoff configuration, %w", err) - } - _ = wait.ExponentialBackoff(*b, func() (bool, error) { - if err = f(); err != nil { - return false, nil - } - return true, nil - }) - if err != nil { - return fmt.Errorf("failed after retries: %w", err) - } - return nil -} diff --git a/event_reporter/controller/controller.go b/event_reporter/controller/controller.go deleted file mode 100644 index 564712e742758..0000000000000 --- a/event_reporter/controller/controller.go +++ /dev/null @@ -1,118 +0,0 @@ -package controller - -import ( - "context" - "math" - "strings" - "time" - - argocommon "github.com/argoproj/argo-cd/v2/common" - "github.com/argoproj/argo-cd/v2/event_reporter/codefresh" - "github.com/argoproj/argo-cd/v2/event_reporter/metrics" - "github.com/argoproj/argo-cd/v2/event_reporter/reporter" - applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" - appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" - servercache "github.com/argoproj/argo-cd/v2/server/cache" - argoutil "github.com/argoproj/argo-cd/v2/util/argo" - "github.com/argoproj/argo-cd/v2/util/env" - "github.com/argoproj/argo-cd/v2/util/settings" - log "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" -) - -var ( - watchAPIBufferSize = 1000 - applicationEventCacheExpiration = time.Minute * time.Duration(env.ParseNumFromEnv(argocommon.EnvApplicationEventCacheDuration, 20, 0, math.MaxInt32)) -) - -type EventReporterController interface { - Run(ctx context.Context) -} - -type eventReporterController struct { - settingsMgr *settings.SettingsManager - appBroadcaster reporter.Broadcaster - applicationEventReporter reporter.ApplicationEventReporter - cache *servercache.Cache - appLister applisters.ApplicationLister - applicationServiceClient applicationpkg.ApplicationServiceClient - metricsServer *metrics.MetricsServer -} - -func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient applicationpkg.ApplicationServiceClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager) EventReporterController { - appBroadcaster := reporter.NewBroadcaster(featureManager) - appInformer.AddEventHandler(appBroadcaster) - return &eventReporterController{ - appBroadcaster: appBroadcaster, - applicationEventReporter: reporter.NewApplicationEventReporter(cache, applicationServiceClient, appLister, codefreshConfig, metricsServer), - cache: cache, - settingsMgr: settingsMgr, - applicationServiceClient: applicationServiceClient, - appLister: appLister, - metricsServer: metricsServer, - } -} - -func (c *eventReporterController) Run(ctx context.Context) { - var ( - logCtx log.FieldLogger = log.StandardLogger() - ) - - // sendIfPermitted is a helper to send the application to the client's streaming channel if the - // caller has RBAC privileges permissions to view it - sendIfPermitted := func(ctx context.Context, a appv1.Application, eventType watch.EventType, ts string, ignoreResourceCache bool) error { - if eventType == watch.Bookmark { - return nil // ignore this event - } - - appInstanceLabelKey, err := c.settingsMgr.GetAppInstanceLabelKey() - if err != nil { - return err - } - trackingMethod := argoutil.GetTrackingMethod(c.settingsMgr) - - err = c.applicationEventReporter.StreamApplicationEvents(ctx, &a, ts, ignoreResourceCache, appInstanceLabelKey, trackingMethod) - if err != nil { - return err - } - - if err := c.cache.SetLastApplicationEvent(&a, applicationEventCacheExpiration); err != nil { - logCtx.WithError(err).Error("failed to cache last sent application event") - return err - } - return nil - } - - //TODO: move to abstraction - eventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize) - unsubscribe := c.appBroadcaster.Subscribe(eventsChannel) - defer unsubscribe() - for { - select { - case <-ctx.Done(): - return - case event := <-eventsChannel: - logCtx.Infof("channel size is %d", len(eventsChannel)) - c.metricsServer.SetQueueSizeCounter(len(eventsChannel)) - shouldProcess, ignoreResourceCache := c.applicationEventReporter.ShouldSendApplicationEvent(event) - if !shouldProcess { - logCtx.Infof("Skipping event %s/%s", event.Application.Name, event.Type) - c.metricsServer.IncCachedIgnoredEventsCounter(metrics.MetricAppEventType, event.Application.Name) - continue - } - ts := time.Now().Format("2006-01-02T15:04:05.000Z") - ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) - err := sendIfPermitted(ctx, event.Application, event.Type, ts, ignoreResourceCache) - if err != nil { - logCtx.WithError(err).Error("failed to stream application events") - if strings.Contains(err.Error(), "context deadline exceeded") { - logCtx.Info("Closing event-source connection") - cancel() - } - } - cancel() - } - } -} diff --git a/event_reporter/handlers/handlers.go b/event_reporter/handlers/handlers.go deleted file mode 100644 index 951af200a0ad8..0000000000000 --- a/event_reporter/handlers/handlers.go +++ /dev/null @@ -1,81 +0,0 @@ -package handlers - -import ( - "encoding/json" - "github.com/argoproj/argo-cd/v2/event_reporter/sharding" - applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" - "net/http" - "strconv" - "strings" -) - -type RequestHandlers struct { - ApplicationServiceClient applicationpkg.ApplicationServiceClient -} - -func GetRequestHandlers(applicationServiceClient applicationpkg.ApplicationServiceClient) *RequestHandlers { - return &RequestHandlers{ - ApplicationServiceClient: applicationServiceClient, - } -} - -// queryParams: []string{"shardings"} -// response JSON { "strategyName": { Distribution, //Apps } -func (rH *RequestHandlers) GetAppDistribution(w http.ResponseWriter, r *http.Request) { - type ShardingAlgorithmData struct { - Distribution map[string]int `json:"distribution"` - Apps map[string]int `json:"apps"` - } - response := map[string]ShardingAlgorithmData{} - - shardings := []string{""} - shardingsParam := r.URL.Query().Get("shardings") - if shardingsParam != "" { - shardings = strings.Split(shardingsParam, ",") - } - - apps, err := rH.ApplicationServiceClient.List(r.Context(), &applicationpkg.ApplicationQuery{}) - - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - shardingInstance := sharding.NewSharding() - - for _, shardingAlgorithm := range shardings { - distributionMap := make(map[string]int) - appsMap := make(map[string]int) - distributionFunction := shardingInstance.GetDistributionFunction(shardingAlgorithm) - - for _, app := range apps.Items { - expectedShard := distributionFunction(&app) - distributionMap[strconv.Itoa(expectedShard)] += 1 - appsMap[app.QualifiedName()] = expectedShard - } - - shardingAlgorithmDisplayName := shardingAlgorithm - if shardingAlgorithm == "" { - shardingAlgorithmDisplayName = "default" - } - - response[shardingAlgorithmDisplayName] = ShardingAlgorithmData{ - Distribution: distributionMap, - Apps: appsMap, - } - } - - jsonBytes, err := json.Marshal(response) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - _, err = w.Write(jsonBytes) - - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } -} diff --git a/event_reporter/metrics/metrics.go b/event_reporter/metrics/metrics.go deleted file mode 100644 index 08bdd855d3fde..0000000000000 --- a/event_reporter/metrics/metrics.go +++ /dev/null @@ -1,148 +0,0 @@ -package metrics - -import ( - "fmt" - "github.com/argoproj/argo-cd/v2/event_reporter/sharding" - "net/http" - "strconv" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - - "github.com/argoproj/argo-cd/v2/util/profile" -) - -type MetricsServer struct { - *http.Server - shard string - redisRequestCounter *prometheus.CounterVec - redisRequestHistogram *prometheus.HistogramVec - queueSizeCounter *prometheus.GaugeVec - erroredEventsCounter *prometheus.CounterVec - cachedIgnoredEventsCounter *prometheus.CounterVec - eventProcessingDurationHistogram *prometheus.HistogramVec -} - -type MetricEventType string - -const ( - MetricAppEventType MetricEventType = "app" - MetricParentAppEventType MetricEventType = "parent_app" - MetricChildAppEventType MetricEventType = "child_app" - MetricResourceEventType MetricEventType = "resource" -) - -type MetricEventErrorType string - -const ( - MetricEventDeliveryErrorType MetricEventErrorType = "delivery" - MetricEventGetPayloadErrorType MetricEventErrorType = "get_payload" - MetricEventUnknownErrorType MetricEventErrorType = "unknown" -) - -var ( - redisRequestCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "argocd_redis_request_total", - Help: "Number of kubernetes requests executed during application reconciliation.", - }, - []string{"initiator", "failed"}, - ) - redisRequestHistogram = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "argocd_redis_request_duration", - Help: "Redis requests duration.", - Buckets: []float64{0.1, 0.25, .5, 1, 2}, - }, - []string{"initiator"}, - ) - queueSizeCounter = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "cf_e_reporter_queue_size", - Help: "Size of application events queue of taked shard.", - }, - []string{"reporter_shard"}, - ) - erroredEventsCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "cf_e_reporter_errored_events", - Help: "Total amount of errored events.", - }, - []string{"reporter_shard", "metric_event_type", "error_type", "application"}, - ) - cachedIgnoredEventsCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "cf_e_reporter_cached_ignored_events", - Help: "Total number of ignored events because of cache.", - }, - []string{"reporter_shard", "metric_event_type", "application"}, - ) - eventProcessingDurationHistogram = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "cf_e_reporter_event_processing_duration", - Help: "Event processing duration.", - Buckets: []float64{0.1, 0.25, .5, 1, 2, 3, 4, 5, 7, 10, 15, 20}, - }, - []string{"reporter_shard", "metric_event_type"}, - ) -) - -// NewMetricsServer returns a new prometheus server which collects api server metrics -func NewMetricsServer(host string, port int) *MetricsServer { - mux := http.NewServeMux() - registry := prometheus.NewRegistry() - - mux.Handle("/metrics", promhttp.HandlerFor(prometheus.Gatherers{ - registry, - prometheus.DefaultGatherer, - }, promhttp.HandlerOpts{})) - profile.RegisterProfiler(mux) - - registry.MustRegister(redisRequestCounter) - registry.MustRegister(redisRequestHistogram) - - registry.MustRegister(queueSizeCounter) - registry.MustRegister(erroredEventsCounter) - registry.MustRegister(cachedIgnoredEventsCounter) - registry.MustRegister(eventProcessingDurationHistogram) - - shard := sharding.GetShardNumber() - - return &MetricsServer{ - Server: &http.Server{ - Addr: fmt.Sprintf("%s:%d", host, port), - Handler: mux, - }, - shard: strconv.FormatInt(int64(shard), 10), - queueSizeCounter: queueSizeCounter, - erroredEventsCounter: erroredEventsCounter, - cachedIgnoredEventsCounter: cachedIgnoredEventsCounter, - eventProcessingDurationHistogram: eventProcessingDurationHistogram, - } -} - -func (m *MetricsServer) IncRedisRequest(failed bool) { - m.redisRequestCounter.WithLabelValues("argocd-server", strconv.FormatBool(failed)).Inc() -} - -// ObserveRedisRequestDuration observes redis request duration -func (m *MetricsServer) ObserveRedisRequestDuration(duration time.Duration) { - m.redisRequestHistogram.WithLabelValues("argocd-server").Observe(duration.Seconds()) -} - -func (m *MetricsServer) SetQueueSizeCounter(size int) { - m.queueSizeCounter.WithLabelValues(m.shard).Set(float64(size)) -} - -func (m *MetricsServer) IncErroredEventsCounter(metricEventType MetricEventType, errorType MetricEventErrorType, application string) { - m.erroredEventsCounter.WithLabelValues(m.shard, string(metricEventType), string(errorType), application).Inc() -} - -func (m *MetricsServer) IncCachedIgnoredEventsCounter(metricEventType MetricEventType, application string) { - m.cachedIgnoredEventsCounter.WithLabelValues(m.shard, string(metricEventType), application).Inc() -} - -func (m *MetricsServer) ObserveEventProcessingDurationHistogramDuration(metricEventType MetricEventType, duration time.Duration) { - m.eventProcessingDurationHistogram.WithLabelValues(m.shard, string(metricEventType)).Observe(duration.Seconds()) -} diff --git a/event_reporter/reporter/application_errors_parser.go b/event_reporter/reporter/application_errors_parser.go deleted file mode 100644 index 8a6dd2b30c731..0000000000000 --- a/event_reporter/reporter/application_errors_parser.go +++ /dev/null @@ -1,147 +0,0 @@ -package reporter - -import ( - "fmt" - "strings" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/argoproj/gitops-engine/pkg/health" - "github.com/argoproj/gitops-engine/pkg/sync/common" - - "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" - appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" -) - -func parseApplicationSyncResultErrors(os *appv1.OperationState) []*events.ObjectError { - var errors []*events.ObjectError - // mean that resource not found as sync result but application can contain error inside operation state itself, - // for example app created with invalid yaml - if os.Phase == common.OperationError || os.Phase == common.OperationFailed { - errors = append(errors, &events.ObjectError{ - Type: "sync", - Level: "error", - Message: os.Message, - LastSeen: os.StartedAt, - }) - } - return errors -} - -var syncTaskUnsuccessfullErrorMessage = "one or more synchronization tasks completed unsuccessfully" -var syncTaskNotValidErrorMessage = "one or more synchronization tasks are not valid" - -func parseApplicationSyncResultErrorsFromConditions(status appv1.ApplicationStatus) []*events.ObjectError { - var errs []*events.ObjectError - if status.Conditions == nil { - return errs - } - for _, cnd := range status.Conditions { - if !strings.Contains(strings.ToLower(cnd.Type), "error") { - continue - } - - lastSeen := metav1.Now() - if cnd.LastTransitionTime != nil { - lastSeen = *cnd.LastTransitionTime - } - - if (strings.Contains(cnd.Message, syncTaskUnsuccessfullErrorMessage) || strings.Contains(cnd.Message, syncTaskNotValidErrorMessage)) && status.OperationState != nil && status.OperationState.SyncResult != nil && status.OperationState.SyncResult.Resources != nil { - resourcesSyncErrors := parseAggregativeResourcesSyncErrors(status.OperationState.SyncResult.Resources) - - errs = append(errs, resourcesSyncErrors...) - } else { - errs = append(errs, &events.ObjectError{ - Type: "sync", - Level: "error", - Message: cnd.Message, - LastSeen: lastSeen, - }) - } - } - return errs -} - -func parseResourceSyncResultErrors(rs *appv1.ResourceStatus, os *appv1.OperationState) []*events.ObjectError { - errors := []*events.ObjectError{} - if os.SyncResult == nil { - return errors - } - - _, sr := os.SyncResult.Resources.Find( - rs.Group, - rs.Kind, - rs.Namespace, - rs.Name, - common.SyncPhaseSync, - ) - - if sr == nil || !(sr.HookPhase == common.OperationFailed || sr.HookPhase == common.OperationError || sr.Status == common.ResultCodeSyncFailed) { - return errors - } - - errors = append(errors, &events.ObjectError{ - Type: "sync", - Level: "error", - Message: sr.Message, - LastSeen: os.StartedAt, - }) - - return errors -} - -func parseAggregativeHealthErrors(rs *appv1.ResourceStatus, apptree *appv1.ApplicationTree) []*events.ObjectError { - errs := make([]*events.ObjectError, 0) - - if apptree == nil { - return errs - } - - n := apptree.FindNode(rs.Group, rs.Kind, rs.Namespace, rs.Name) - if n == nil { - return errs - } - - childNodes := n.GetAllChildNodes(apptree, "") - - for _, cn := range childNodes { - if cn.Health != nil && cn.Health.Status == health.HealthStatusDegraded { - errs = append(errs, &events.ObjectError{ - Type: "health", - Level: "error", - Message: cn.Health.Message, - LastSeen: *cn.CreatedAt, - }) - } - } - - return errs -} - -func parseAggregativeResourcesSyncErrors(resourceResults appv1.ResourceResults) []*events.ObjectError { - var errs []*events.ObjectError - - if resourceResults == nil { - return errs - } - - for _, rr := range resourceResults { - if rr.Message != "" { - objectError := events.ObjectError{ - Type: "sync", - Level: "error", - LastSeen: metav1.Now(), - Message: fmt.Sprintf("Resource %s(%s): \n %s", rr.Kind, rr.Name, rr.Message), - } - if rr.Status == common.ResultCodeSyncFailed { - errs = append(errs, &objectError) - } - if rr.HookPhase == common.OperationFailed || rr.HookPhase == common.OperationError { - errs = append(errs, &objectError) - } - - } - } - - return errs -} diff --git a/event_reporter/reporter/application_event_reporter.go b/event_reporter/reporter/application_event_reporter.go deleted file mode 100644 index 106dca7ef1184..0000000000000 --- a/event_reporter/reporter/application_event_reporter.go +++ /dev/null @@ -1,877 +0,0 @@ -package reporter - -import ( - "context" - "encoding/json" - "fmt" - "math" - "reflect" - "strings" - "time" - - argocommon "github.com/argoproj/argo-cd/v2/common" - "github.com/argoproj/argo-cd/v2/event_reporter/codefresh" - "github.com/argoproj/argo-cd/v2/event_reporter/metrics" - applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" - applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" - servercache "github.com/argoproj/argo-cd/v2/server/cache" - "github.com/argoproj/argo-cd/v2/util/env" - - "github.com/argoproj/argo-cd/v2/util/argo" - - "github.com/argoproj/gitops-engine/pkg/health" - "github.com/argoproj/gitops-engine/pkg/utils/kube" - "github.com/argoproj/gitops-engine/pkg/utils/text" - log "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/watch" - "sigs.k8s.io/yaml" - - "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" - "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" - appv1reg "github.com/argoproj/argo-cd/v2/pkg/apis/application" - appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - "github.com/argoproj/argo-cd/v2/reposerver/apiclient" -) - -var ( - resourceEventCacheExpiration = time.Minute * time.Duration(env.ParseNumFromEnv(argocommon.EnvResourceEventCacheDuration, 20, 0, math.MaxInt32)) -) - -type applicationEventReporter struct { - cache *servercache.Cache - codefreshClient codefresh.CodefreshClient - appLister applisters.ApplicationLister - applicationServiceClient applicationpkg.ApplicationServiceClient - metricsServer *metrics.MetricsServer -} - -type ApplicationEventReporter interface { - StreamApplicationEvents( - ctx context.Context, - a *appv1.Application, - ts string, - ignoreResourceCache bool, - appInstanceLabelKey string, - trackingMethod appv1.TrackingMethod, - ) error - ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool) -} - -func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient applicationpkg.ApplicationServiceClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer) ApplicationEventReporter { - return &applicationEventReporter{ - cache: cache, - applicationServiceClient: applicationServiceClient, - codefreshClient: codefresh.NewCodefreshClient(codefreshConfig), - appLister: appLister, - metricsServer: metricsServer, - } -} - -func (s *applicationEventReporter) shouldSendResourceEvent(a *appv1.Application, rs appv1.ResourceStatus) bool { - logCtx := logWithResourceStatus(log.WithFields(log.Fields{ - "app": a.Name, - "gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind), - "resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name), - }), rs) - - cachedRes, err := s.cache.GetLastResourceEvent(a, rs, getApplicationLatestRevision(a)) - if err != nil { - logCtx.Debug("resource not in cache") - return true - } - - if reflect.DeepEqual(&cachedRes, &rs) { - logCtx.Debug("resource status not changed") - - // status not changed - return false - } - - logCtx.Info("resource status changed") - return true -} - -func getParentAppName(a *appv1.Application, appInstanceLabelKey string, trackingMethod appv1.TrackingMethod) string { - resourceTracking := argo.NewResourceTracking() - unApp := kube.MustToUnstructured(&a) - - return resourceTracking.GetAppName(unApp, appInstanceLabelKey, trackingMethod) -} - -func isChildApp(parentAppName string) bool { - return parentAppName != "" -} - -func getAppAsResource(a *appv1.Application) *appv1.ResourceStatus { - return &appv1.ResourceStatus{ - Name: a.Name, - Namespace: a.Namespace, - Version: "v1alpha1", - Kind: "Application", - Group: "argoproj.io", - Status: a.Status.Sync.Status, - Health: &a.Status.Health, - RequiresPruning: a.DeletionTimestamp != nil, - } -} - -func (r *applicationEventReporter) getDesiredManifests(ctx context.Context, a *appv1.Application, logCtx *log.Entry) (*apiclient.ManifestResponse, error, bool) { - // get the desired state manifests of the application - desiredManifests, err := r.applicationServiceClient.GetManifests(ctx, &application.ApplicationManifestQuery{ - Name: &a.Name, - Revision: &a.Status.Sync.Revision, - }) - if err != nil { - notManifestGenerationError := !strings.Contains(err.Error(), "Manifest generation error") - // when application deleted rbac also throws erorr with PermissionDenied - // we can ignore the error, as we check rbac access before reporting events - notPermissionDeniedError := !strings.Contains(err.Error(), "PermissionDenied") - - if notManifestGenerationError && notPermissionDeniedError { - return nil, fmt.Errorf("failed to get application desired state manifests: %w", err), false - } - // if it's manifest generation error we need to still report the actual state - // of the resources, but since we can't get the desired state, we will report - // each resource with empty desired state - logCtx.WithError(err).Warn("failed to get application desired state manifests, reporting actual state only") - desiredManifests = &apiclient.ManifestResponse{Manifests: []*apiclient.Manifest{}} - return desiredManifests, nil, true // will ignore requiresPruning=true to not delete resources with actual state - } - return desiredManifests, nil, false -} - -func (s *applicationEventReporter) StreamApplicationEvents( - ctx context.Context, - a *appv1.Application, - ts string, - ignoreResourceCache bool, - appInstanceLabelKey string, - trackingMethod appv1.TrackingMethod, -) error { - startTime := time.Now() - logCtx := log.WithField("app", a.Name) - - logCtx.WithField("ignoreResourceCache", ignoreResourceCache).Info("streaming application events") - - appTree, err := s.applicationServiceClient.ResourceTree(ctx, &application.ResourcesQuery{ - ApplicationName: &a.Name, - Project: &a.Spec.Project, - Namespace: &a.Namespace, - }) - if err != nil { - if strings.Contains(err.Error(), "context deadline exceeded") { - return fmt.Errorf("failed to get application tree: %w", err) - } - - // we still need process app even without tree, it is in case of app yaml originally contain error, - // we still want to show it the erorrs that related to it on codefresh ui - logCtx.WithError(err).Warn("failed to get application tree, resuming") - } - - logCtx.Info("getting desired manifests") - - desiredManifests, err, manifestGenErr := s.getDesiredManifests(ctx, a, logCtx) - if err != nil { - return err - } - - logCtx.Info("getting parent application name") - - parentAppName := getParentAppName(a, appInstanceLabelKey, trackingMethod) - - if isChildApp(parentAppName) { - logCtx.Info("processing as child application") - parentApplicationEntity, err := s.applicationServiceClient.Get(ctx, &application.ApplicationQuery{ - Name: &parentAppName, - }) - if err != nil { - return fmt.Errorf("failed to get parent application entity: %w", err) - } - - rs := getAppAsResource(a) - - parentDesiredManifests, err, manifestGenErr := s.getDesiredManifests(ctx, parentApplicationEntity, logCtx) - if err != nil { - logCtx.WithError(err).Warn("failed to get parent application's desired manifests, resuming") - } - - // helm app hasnt revision - // TODO: add check if it helm application - parentOperationRevision := getOperationRevision(parentApplicationEntity) - parentRevisionMetadata, err := s.getApplicationRevisionDetails(ctx, parentApplicationEntity, parentOperationRevision) - if err != nil { - logCtx.WithError(err).Warn("failed to get parent application's revision metadata, resuming") - } - - err = s.processResource(ctx, *rs, parentApplicationEntity, logCtx, ts, parentDesiredManifests, appTree, manifestGenErr, a, parentRevisionMetadata, true, appInstanceLabelKey, trackingMethod, desiredManifests.ApplicationVersions) - if err != nil { - s.metricsServer.IncErroredEventsCounter(metrics.MetricChildAppEventType, metrics.MetricEventUnknownErrorType, a.Name) - return err - } - reconcileDuration := time.Since(startTime) - s.metricsServer.ObserveEventProcessingDurationHistogramDuration(metrics.MetricChildAppEventType, reconcileDuration) - } else { - logCtx.Info("processing as root application") - // will get here only for root applications (not managed as a resource by another application) - appEvent, err := s.getApplicationEventPayload(ctx, a, ts, appInstanceLabelKey, trackingMethod, desiredManifests.ApplicationVersions) - if err != nil { - s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventGetPayloadErrorType, a.Name) - return fmt.Errorf("failed to get application event: %w", err) - } - - if appEvent == nil { - // event did not have an OperationState - skip all events - return nil - } - - logWithAppStatus(a, logCtx, ts).Info("sending root application event") - if err := s.codefreshClient.Send(ctx, a.Name, appEvent); err != nil { - s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventDeliveryErrorType, a.Name) - return fmt.Errorf("failed to send event for root application %s/%s: %w", a.Namespace, a.Name, err) - } - reconcileDuration := time.Since(startTime) - s.metricsServer.ObserveEventProcessingDurationHistogramDuration(metrics.MetricParentAppEventType, reconcileDuration) - } - - revisionMetadata, _ := s.getApplicationRevisionDetails(ctx, a, getOperationRevision(a)) - // for each resource in the application get desired and actual state, - // then stream the event - for _, rs := range a.Status.Resources { - if isApp(rs) { - continue - } - startTime := time.Now() - err := s.processResource(ctx, rs, a, logCtx, ts, desiredManifests, appTree, manifestGenErr, nil, revisionMetadata, ignoreResourceCache, appInstanceLabelKey, trackingMethod, nil) - if err != nil { - s.metricsServer.IncErroredEventsCounter(metrics.MetricResourceEventType, metrics.MetricEventUnknownErrorType, a.Name) - return err - } - reconcileDuration := time.Since(startTime) - s.metricsServer.ObserveEventProcessingDurationHistogramDuration(metrics.MetricResourceEventType, reconcileDuration) - } - return nil -} - -func (s *applicationEventReporter) getAppForResourceReporting( - rs appv1.ResourceStatus, - ctx context.Context, - a *appv1.Application, - revisionMetadata *appv1.RevisionMetadata, -) (*appv1.Application, *appv1.RevisionMetadata) { - if rs.Kind != "Rollout" { // for rollout it's crucial to report always correct operationSyncRevision - return a, revisionMetadata - } - - latestAppStatus, err := s.appLister.Applications(a.Namespace).Get(a.Name) - - if err != nil { - return a, revisionMetadata - } - - revisionMetadataToReport, err := s.getApplicationRevisionDetails(ctx, latestAppStatus, getOperationRevision(latestAppStatus)) - - if err != nil { - return a, revisionMetadata - } - - return latestAppStatus, revisionMetadataToReport -} - -func (s *applicationEventReporter) processResource( - ctx context.Context, - rs appv1.ResourceStatus, - parentApplication *appv1.Application, - logCtx *log.Entry, - ts string, - desiredManifests *apiclient.ManifestResponse, - appTree *appv1.ApplicationTree, - manifestGenErr bool, - originalApplication *appv1.Application, - revisionMetadata *appv1.RevisionMetadata, - ignoreResourceCache bool, - appInstanceLabelKey string, - trackingMethod appv1.TrackingMethod, - applicationVersions *apiclient.ApplicationVersions, -) error { - metricsEventType := metrics.MetricResourceEventType - if isApp(rs) { - metricsEventType = metrics.MetricChildAppEventType - } - - logCtx = logCtx.WithFields(log.Fields{ - "gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind), - "resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name), - }) - - if rs.Health == nil && rs.Status == appv1.SyncStatusCodeSynced { - // for resources without health status we need to add 'Healthy' status - // when they are synced because we might have sent an event with 'Missing' - // status earlier and they would be stuck in it if we don't switch to 'Healthy' - rs.Health = &appv1.HealthStatus{ - Status: health.HealthStatusHealthy, - } - } - - if !ignoreResourceCache && !s.shouldSendResourceEvent(parentApplication, rs) { - s.metricsServer.IncCachedIgnoredEventsCounter(metricsEventType, parentApplication.Name) - return nil - } - - // get resource desired state - desiredState := getResourceDesiredState(&rs, desiredManifests, logCtx) - - // get resource actual state - actualState, err := s.applicationServiceClient.GetResource(ctx, &application.ApplicationResourceRequest{ - Name: &parentApplication.Name, - Namespace: &rs.Namespace, - ResourceName: &rs.Name, - Version: &rs.Version, - Group: &rs.Group, - Kind: &rs.Kind, - }) - if err != nil { - if !strings.Contains(err.Error(), "not found") { - // only return error if there is no point in trying to send the - // next resource. For example if the shared context has exceeded - // its deadline - if strings.Contains(err.Error(), "context deadline exceeded") { - return fmt.Errorf("failed to get actual state: %w", err) - } - - s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventUnknownErrorType, parentApplication.Name) - logCtx.WithError(err).Warn("failed to get actual state, resuming") - return nil - } - - manifest := "" - // empty actual state - actualState = &application.ApplicationResourceResponse{Manifest: &manifest} - } - - parentApplicationToReport, revisionMetadataToReport := s.getAppForResourceReporting(rs, ctx, parentApplication, revisionMetadata) - - var originalAppRevisionMetadata *appv1.RevisionMetadata = nil - - if originalApplication != nil { - originalAppRevisionMetadata, _ = s.getApplicationRevisionDetails(ctx, originalApplication, getOperationRevision(originalApplication)) - } - - ev, err := getResourceEventPayload(parentApplicationToReport, &rs, actualState, desiredState, appTree, manifestGenErr, ts, originalApplication, revisionMetadataToReport, originalAppRevisionMetadata, appInstanceLabelKey, trackingMethod, applicationVersions) - if err != nil { - s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventGetPayloadErrorType, parentApplication.Name) - logCtx.WithError(err).Warn("failed to get event payload, resuming") - return nil - } - - appRes := appv1.Application{} - appName := "" - if isApp(rs) && actualState.Manifest != nil && json.Unmarshal([]byte(*actualState.Manifest), &appRes) == nil { - logWithAppStatus(&appRes, logCtx, ts).Info("streaming resource event") - appName = appRes.Name - } else { - logWithResourceStatus(logCtx, rs).Info("streaming resource event") - appName = rs.Name - } - - if err := s.codefreshClient.Send(ctx, appName, ev); err != nil { - if strings.Contains(err.Error(), "context deadline exceeded") { - return fmt.Errorf("failed to send resource event: %w", err) - } - - s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventDeliveryErrorType, appName) - logCtx.WithError(err).Warn("failed to send resource event, resuming") - return nil - } - - if err := s.cache.SetLastResourceEvent(parentApplicationToReport, rs, resourceEventCacheExpiration, getApplicationLatestRevision(parentApplicationToReport)); err != nil { - logCtx.WithError(err).Warn("failed to cache resource event") - } - - return nil -} - -func (s *applicationEventReporter) ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool) { - logCtx := log.WithField("app", ae.Application.Name) - - if ae.Type == watch.Deleted { - logCtx.Info("application deleted") - return true, false - } - - cachedApp, err := s.cache.GetLastApplicationEvent(&ae.Application) - if err != nil || cachedApp == nil { - return true, false - } - - cachedApp.Status.ReconciledAt = ae.Application.Status.ReconciledAt // ignore those in the diff - cachedApp.Spec.Project = ae.Application.Spec.Project // - for i := range cachedApp.Status.Conditions { - cachedApp.Status.Conditions[i].LastTransitionTime = nil - } - for i := range ae.Application.Status.Conditions { - ae.Application.Status.Conditions[i].LastTransitionTime = nil - } - - // check if application changed to healthy status - if ae.Application.Status.Health.Status == health.HealthStatusHealthy && cachedApp.Status.Health.Status != health.HealthStatusHealthy { - return true, true - } - - if !reflect.DeepEqual(ae.Application.Spec, cachedApp.Spec) { - logCtx.Info("application spec changed") - return true, false - } - - if !reflect.DeepEqual(ae.Application.Status, cachedApp.Status) { - logCtx.Info("application status changed") - return true, false - } - - if !reflect.DeepEqual(ae.Application.Operation, cachedApp.Operation) { - logCtx.Info("application operation changed") - return true, false - } - - return false, false -} - -func isApp(rs appv1.ResourceStatus) bool { - return rs.GroupVersionKind().String() == appv1.ApplicationSchemaGroupVersionKind.String() -} - -func logWithAppStatus(a *appv1.Application, logCtx *log.Entry, ts string) *log.Entry { - return logCtx.WithFields(log.Fields{ - "sync": a.Status.Sync.Status, - "health": a.Status.Health.Status, - "resourceVersion": a.ResourceVersion, - "ts": ts, - }) -} - -func logWithResourceStatus(logCtx *log.Entry, rs appv1.ResourceStatus) *log.Entry { - logCtx = logCtx.WithField("sync", rs.Status) - if rs.Health != nil { - logCtx = logCtx.WithField("health", rs.Health.Status) - } - - return logCtx -} - -func getLatestAppHistoryItem(a *appv1.Application) *appv1.RevisionHistory { - if a.Status.History != nil && len(a.Status.History) > 0 { - return &a.Status.History[len(a.Status.History)-1] - } - - return nil -} - -func getApplicationLatestRevision(a *appv1.Application) string { - revision := a.Status.Sync.Revision - lastHistory := getLatestAppHistoryItem(a) - - if lastHistory != nil { - revision = lastHistory.Revision - } - - return revision -} - -func getOperationRevision(a *appv1.Application) string { - var revision string - if a != nil { - // this value will be used in case if application hasnt resources , like gitsource - revision = a.Status.Sync.Revision - if a.Status.OperationState != nil && a.Status.OperationState.Operation.Sync != nil && a.Status.OperationState.Operation.Sync.Revision != "" { - revision = a.Status.OperationState.Operation.Sync.Revision - } else if a.Operation != nil && a.Operation.Sync != nil && a.Operation.Sync.Revision != "" { - revision = a.Operation.Sync.Revision - } - } - - return revision -} - -func (s *applicationEventReporter) getApplicationRevisionDetails(ctx context.Context, a *appv1.Application, revision string) (*appv1.RevisionMetadata, error) { - return s.applicationServiceClient.RevisionMetadata(ctx, &application.RevisionMetadataQuery{ - Name: &a.Name, - Revision: &revision, - }) -} - -func getLatestAppHistoryId(a *appv1.Application) int64 { - var id int64 - lastHistory := getLatestAppHistoryItem(a) - - if lastHistory != nil { - id = lastHistory.ID - } - - return id -} - -func getResourceEventPayload( - parentApplication *appv1.Application, - rs *appv1.ResourceStatus, - actualState *application.ApplicationResourceResponse, - desiredState *apiclient.Manifest, - apptree *appv1.ApplicationTree, - manifestGenErr bool, - ts string, - originalApplication *appv1.Application, // passed when rs is application - revisionMetadata *appv1.RevisionMetadata, - originalAppRevisionMetadata *appv1.RevisionMetadata, // passed when rs is application - appInstanceLabelKey string, - trackingMethod appv1.TrackingMethod, - applicationVersions *apiclient.ApplicationVersions, -) (*events.Event, error) { - var ( - err error - syncStarted = metav1.Now() - syncFinished *metav1.Time - errors = []*events.ObjectError{} - logCtx *log.Entry - ) - - if originalApplication != nil { - logCtx = log.WithField("application", originalApplication.Name) - } else { - logCtx = log.NewEntry(log.StandardLogger()) - } - - object := []byte(*actualState.Manifest) - - if originalAppRevisionMetadata != nil && len(object) != 0 { - actualObject, err := appv1.UnmarshalToUnstructured(*actualState.Manifest) - - if err == nil { - actualObject = addCommitDetailsToLabels(actualObject, originalAppRevisionMetadata) - object, err = actualObject.MarshalJSON() - if err != nil { - return nil, fmt.Errorf("failed to marshal unstructured object: %w", err) - } - } - } - if len(object) == 0 { - if len(desiredState.CompiledManifest) == 0 { - // no actual or desired state, don't send event - u := &unstructured.Unstructured{} - apiVersion := rs.Version - if rs.Group != "" { - apiVersion = rs.Group + "/" + rs.Version - } - - u.SetAPIVersion(apiVersion) - u.SetKind(rs.Kind) - u.SetName(rs.Name) - u.SetNamespace(rs.Namespace) - if originalAppRevisionMetadata != nil { - u = addCommitDetailsToLabels(u, originalAppRevisionMetadata) - } - - object, err = u.MarshalJSON() - if err != nil { - return nil, fmt.Errorf("failed to marshal unstructured object: %w", err) - } - } else { - // no actual state, use desired state as event object - unstructuredWithNamespace, err := addDestNamespaceToManifest([]byte(desiredState.CompiledManifest), rs) - if err != nil { - return nil, fmt.Errorf("failed to add destination namespace to manifest: %w", err) - } - if originalAppRevisionMetadata != nil { - unstructuredWithNamespace = addCommitDetailsToLabels(unstructuredWithNamespace, originalAppRevisionMetadata) - } - - object, _ = unstructuredWithNamespace.MarshalJSON() - } - } else if rs.RequiresPruning && !manifestGenErr { - // resource should be deleted - desiredState.CompiledManifest = "" - manifest := "" - actualState.Manifest = &manifest - } - - if (originalApplication != nil && originalApplication.DeletionTimestamp != nil) || parentApplication.ObjectMeta.DeletionTimestamp != nil { - // resource should be deleted in case if application in process of deletion - desiredState.CompiledManifest = "" - manifest := "" - actualState.Manifest = &manifest - } - - if parentApplication.Status.OperationState != nil { - syncStarted = parentApplication.Status.OperationState.StartedAt - syncFinished = parentApplication.Status.OperationState.FinishedAt - errors = append(errors, parseResourceSyncResultErrors(rs, parentApplication.Status.OperationState)...) - } - - // for primitive resources that are synced right away and don't require progression time (like configmap) - if rs.Status == appv1.SyncStatusCodeSynced && rs.Health != nil && rs.Health.Status == health.HealthStatusHealthy { - syncFinished = &syncStarted - } - - // parent application not include errors in application originally was created with broken state, for example in destination missed namespace - if originalApplication != nil && originalApplication.Status.OperationState != nil { - errors = append(errors, parseApplicationSyncResultErrors(originalApplication.Status.OperationState)...) - } - - if originalApplication != nil && originalApplication.Status.Conditions != nil { - errors = append(errors, parseApplicationSyncResultErrorsFromConditions(originalApplication.Status)...) - } - - if len(desiredState.RawManifest) == 0 && len(desiredState.CompiledManifest) != 0 { - // for handling helm defined resources, etc... - y, err := yaml.JSONToYAML([]byte(desiredState.CompiledManifest)) - if err == nil { - desiredState.RawManifest = string(y) - } - } - - applicationVersionsEvents, err := repoAppVersionsToEvent(applicationVersions) - if err != nil { - logCtx.Errorf("failed to convert appVersions: %v", err) - } - - source := events.ObjectSource{ - DesiredManifest: desiredState.CompiledManifest, - ActualManifest: *actualState.Manifest, - GitManifest: desiredState.RawManifest, - RepoURL: parentApplication.Status.Sync.ComparedTo.Source.RepoURL, - Path: desiredState.Path, - Revision: getApplicationLatestRevision(parentApplication), - OperationSyncRevision: getOperationRevision(parentApplication), - HistoryId: getLatestAppHistoryId(parentApplication), - AppName: parentApplication.Name, - AppNamespace: parentApplication.Namespace, - AppUID: string(parentApplication.ObjectMeta.UID), - AppLabels: parentApplication.Labels, - SyncStatus: string(rs.Status), - SyncStartedAt: syncStarted, - SyncFinishedAt: syncFinished, - Cluster: parentApplication.Spec.Destination.Server, - AppInstanceLabelKey: appInstanceLabelKey, - TrackingMethod: string(trackingMethod), - } - - if revisionMetadata != nil { - source.CommitMessage = revisionMetadata.Message - source.CommitAuthor = revisionMetadata.Author - source.CommitDate = &revisionMetadata.Date - } - - if rs.Health != nil { - source.HealthStatus = (*string)(&rs.Health.Status) - source.HealthMessage = &rs.Health.Message - if rs.Health.Status != health.HealthStatusHealthy { - errors = append(errors, parseAggregativeHealthErrors(rs, apptree)...) - } - } - - payload := events.EventPayload{ - Timestamp: ts, - Object: object, - Source: &source, - Errors: errors, - AppVersions: applicationVersionsEvents, - } - - logCtx.Infof("AppVersion before encoding: %v", safeString(payload.AppVersions.AppVersion)) - - payloadBytes, err := json.Marshal(&payload) - if err != nil { - return nil, fmt.Errorf("failed to marshal payload for resource %s/%s: %w", rs.Namespace, rs.Name, err) - } - - return &events.Event{Payload: payloadBytes}, nil -} - -func (s *applicationEventReporter) getApplicationEventPayload( - ctx context.Context, - a *appv1.Application, - ts string, - appInstanceLabelKey string, - trackingMethod appv1.TrackingMethod, - applicationVersions *apiclient.ApplicationVersions, -) (*events.Event, error) { - var ( - syncStarted = metav1.Now() - syncFinished *metav1.Time - logCtx = log.WithField("application", a.Name) - ) - - obj := appv1.Application{} - a.DeepCopyInto(&obj) - - // make sure there is type meta on object - obj.TypeMeta = metav1.TypeMeta{ - Kind: appv1reg.ApplicationKind, - APIVersion: appv1.SchemeGroupVersion.String(), - } - - if a.Status.OperationState != nil { - syncStarted = a.Status.OperationState.StartedAt - syncFinished = a.Status.OperationState.FinishedAt - } - - applicationSource := a.Spec.GetSource() - if !applicationSource.IsHelm() && (a.Status.Sync.Revision != "" || (a.Status.History != nil && len(a.Status.History) > 0)) { - revisionMetadata, err := s.getApplicationRevisionDetails(ctx, a, getOperationRevision(a)) - - if err != nil { - if !strings.Contains(err.Error(), "not found") { - return nil, fmt.Errorf("failed to get revision metadata: %w", err) - } - - logCtx.Warnf("failed to get revision metadata: %s, reporting application deletion event", err.Error()) - } else { - if obj.ObjectMeta.Labels == nil { - obj.ObjectMeta.Labels = map[string]string{} - } - - obj.ObjectMeta.Labels["app.meta.commit-date"] = revisionMetadata.Date.Format("2006-01-02T15:04:05.000Z") - obj.ObjectMeta.Labels["app.meta.commit-author"] = revisionMetadata.Author - obj.ObjectMeta.Labels["app.meta.commit-message"] = revisionMetadata.Message - } - } - - object, err := json.Marshal(&obj) - if err != nil { - return nil, fmt.Errorf("failed to marshal application event") - } - - actualManifest := string(object) - if a.DeletionTimestamp != nil { - actualManifest = "" // mark as deleted - logCtx.Info("reporting application deletion event") - } - - applicationVersionsEvents, err := repoAppVersionsToEvent(applicationVersions) - if err != nil { - logCtx.Errorf("failed to convert appVersions: %v", err) - } - - hs := string(a.Status.Health.Status) - source := &events.ObjectSource{ - DesiredManifest: "", - GitManifest: "", - ActualManifest: actualManifest, - RepoURL: a.Spec.GetSource().RepoURL, - CommitMessage: "", - CommitAuthor: "", - Path: "", - Revision: "", - OperationSyncRevision: "", - HistoryId: 0, - AppName: "", - AppUID: "", - AppLabels: map[string]string{}, - SyncStatus: string(a.Status.Sync.Status), - SyncStartedAt: syncStarted, - SyncFinishedAt: syncFinished, - HealthStatus: &hs, - HealthMessage: &a.Status.Health.Message, - Cluster: a.Spec.Destination.Server, - AppInstanceLabelKey: appInstanceLabelKey, - TrackingMethod: string(trackingMethod), - } - - payload := events.EventPayload{ - Timestamp: ts, - Object: object, - Source: source, - Errors: parseApplicationSyncResultErrorsFromConditions(a.Status), - AppVersions: applicationVersionsEvents, - } - - logCtx.Infof("AppVersion before encoding: %v", safeString(payload.AppVersions.AppVersion)) - - payloadBytes, err := json.Marshal(&payload) - if err != nil { - return nil, fmt.Errorf("failed to marshal payload for resource %s/%s: %w", a.Namespace, a.Name, err) - } - - return &events.Event{Payload: payloadBytes}, nil -} - -func getResourceDesiredState(rs *appv1.ResourceStatus, ds *apiclient.ManifestResponse, logger *log.Entry) *apiclient.Manifest { - if ds == nil { - return &apiclient.Manifest{} - } - for _, m := range ds.Manifests { - u, err := appv1.UnmarshalToUnstructured(m.CompiledManifest) - if err != nil { - logger.WithError(err).Warnf("failed to unmarshal compiled manifest") - continue - } - - if u == nil { - continue - } - - ns := text.FirstNonEmpty(u.GetNamespace(), rs.Namespace) - - if u.GroupVersionKind().String() == rs.GroupVersionKind().String() && - u.GetName() == rs.Name && - ns == rs.Namespace { - if rs.Kind == kube.SecretKind && rs.Version == "v1" { - m.RawManifest = m.CompiledManifest - } - - return m - } - } - - // no desired state for resource - // it's probably deleted from git - return &apiclient.Manifest{} -} - -func addDestNamespaceToManifest(resourceManifest []byte, rs *appv1.ResourceStatus) (*unstructured.Unstructured, error) { - u, err := appv1.UnmarshalToUnstructured(string(resourceManifest)) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal manifest: %w", err) - } - - if u.GetNamespace() == rs.Namespace { - return u, nil - } - - // need to change namespace - u.SetNamespace(rs.Namespace) - - return u, nil -} - -func addCommitDetailsToLabels(u *unstructured.Unstructured, revisionMetadata *appv1.RevisionMetadata) *unstructured.Unstructured { - if revisionMetadata == nil || u == nil { - return u - } - - if field, _, _ := unstructured.NestedFieldCopy(u.Object, "metadata", "labels"); field == nil { - _ = unstructured.SetNestedStringMap(u.Object, map[string]string{}, "metadata", "labels") - } - - _ = unstructured.SetNestedField(u.Object, revisionMetadata.Date.Format("2006-01-02T15:04:05.000Z"), "metadata", "labels", "app.meta.commit-date") - _ = unstructured.SetNestedField(u.Object, revisionMetadata.Author, "metadata", "labels", "app.meta.commit-author") - _ = unstructured.SetNestedField(u.Object, revisionMetadata.Message, "metadata", "labels", "app.meta.commit-message") - - return u -} - -func repoAppVersionsToEvent(applicationVersions *apiclient.ApplicationVersions) (*events.ApplicationVersions, error) { - applicationVersionsEvents := &events.ApplicationVersions{} - applicationVersionsData, _ := json.Marshal(applicationVersions) - err := json.Unmarshal(applicationVersionsData, applicationVersionsEvents) - if err != nil { - return nil, err - } - return applicationVersionsEvents, nil -} - -func safeString(s *string) string { - if s == nil { - return "" - } - return *s -} diff --git a/event_reporter/reporter/application_event_reporter_test.go b/event_reporter/reporter/application_event_reporter_test.go deleted file mode 100644 index 099af0242a3f0..0000000000000 --- a/event_reporter/reporter/application_event_reporter_test.go +++ /dev/null @@ -1,461 +0,0 @@ -package reporter - -import ( - "context" - "encoding/json" - "github.com/argoproj/argo-cd/v2/event_reporter/metrics" - "github.com/argoproj/argo-cd/v2/util/io" - "net/http" - "testing" - "time" - - "github.com/ghodss/yaml" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - "google.golang.org/grpc" - "k8s.io/apimachinery/pkg/runtime" - - "github.com/argoproj/argo-cd/v2/pkg/apiclient" - appsv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - fakeapps "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned/fake" - appinformer "github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions" - applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" - - servercache "github.com/argoproj/argo-cd/v2/server/cache" - cacheutil "github.com/argoproj/argo-cd/v2/util/cache" - appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate" - - "github.com/stretchr/testify/assert" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/argoproj/argo-cd/v2/common" - "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" - "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" - "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - repoApiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient" - "github.com/argoproj/argo-cd/v2/util/argo" -) - -const ( - testNamespace = "default" -) - -func TestGetResourceEventPayload(t *testing.T) { - t.Run("Deleting timestamp is empty", func(t *testing.T) { - - app := v1alpha1.Application{} - rs := v1alpha1.ResourceStatus{} - - man := "{ \"key\" : \"manifest\" }" - - actualState := application.ApplicationResourceResponse{ - Manifest: &man, - } - desiredState := repoApiclient.Manifest{ - CompiledManifest: "{ \"key\" : \"manifest\" }", - } - appTree := v1alpha1.ApplicationTree{} - revisionMetadata := v1alpha1.RevisionMetadata{ - Author: "demo usert", - Date: metav1.Time{}, - Message: "some message", - } - - event, err := getResourceEventPayload(&app, &rs, &actualState, &desiredState, &appTree, true, "", nil, &revisionMetadata, nil, common.LabelKeyAppInstance, argo.TrackingMethodLabel, &repoApiclient.ApplicationVersions{}) - assert.NoError(t, err) - - var eventPayload events.EventPayload - - err = json.Unmarshal(event.Payload, &eventPayload) - assert.NoError(t, err) - - assert.Equal(t, "{ \"key\" : \"manifest\" }", eventPayload.Source.DesiredManifest) - assert.Equal(t, "{ \"key\" : \"manifest\" }", eventPayload.Source.ActualManifest) - }) - - t.Run("Deleting timestamp is empty", func(t *testing.T) { - - app := v1alpha1.Application{ - ObjectMeta: metav1.ObjectMeta{ - DeletionTimestamp: &metav1.Time{}, - }, - Status: v1alpha1.ApplicationStatus{}, - } - rs := v1alpha1.ResourceStatus{} - man := "{ \"key\" : \"manifest\" }" - actualState := application.ApplicationResourceResponse{ - Manifest: &man, - } - desiredState := repoApiclient.Manifest{ - CompiledManifest: "{ \"key\" : \"manifest\" }", - } - appTree := v1alpha1.ApplicationTree{} - revisionMetadata := v1alpha1.RevisionMetadata{ - Author: "demo usert", - Date: metav1.Time{}, - Message: "some message", - } - - event, err := getResourceEventPayload(&app, &rs, &actualState, &desiredState, &appTree, true, "", nil, &revisionMetadata, nil, common.LabelKeyAppInstance, argo.TrackingMethodLabel, &repoApiclient.ApplicationVersions{}) - assert.NoError(t, err) - - var eventPayload events.EventPayload - - err = json.Unmarshal(event.Payload, &eventPayload) - assert.NoError(t, err) - - assert.Equal(t, "", eventPayload.Source.DesiredManifest) - assert.Equal(t, "", eventPayload.Source.ActualManifest) - }) -} - -func TestGetApplicationLatestRevision(t *testing.T) { - appRevision := "a-revision" - history1Revision := "history-revision-1" - history2Revision := "history-revision-2" - - t.Run("resource revision should be taken from sync.revision", func(t *testing.T) { - noStatusHistoryAppMock := v1alpha1.Application{ - Status: v1alpha1.ApplicationStatus{ - Sync: v1alpha1.SyncStatus{ - Revision: appRevision, - }, - }, - } - - revisionResult := getApplicationLatestRevision(&noStatusHistoryAppMock) - assert.Equal(t, revisionResult, appRevision) - - emptyStatusHistoryAppMock := v1alpha1.Application{ - Status: v1alpha1.ApplicationStatus{ - Sync: v1alpha1.SyncStatus{ - Revision: appRevision, - }, - History: []v1alpha1.RevisionHistory{}, - }, - } - - revision2Result := getApplicationLatestRevision(&emptyStatusHistoryAppMock) - assert.Equal(t, revision2Result, appRevision) - }) - - t.Run("resource revision should be taken from latest history.revision", func(t *testing.T) { - appMock := v1alpha1.Application{ - Status: v1alpha1.ApplicationStatus{ - Sync: v1alpha1.SyncStatus{ - Revision: appRevision, - }, - History: []v1alpha1.RevisionHistory{ - v1alpha1.RevisionHistory{ - Revision: history1Revision, - }, - v1alpha1.RevisionHistory{ - Revision: history2Revision, - }, - }, - }, - } - - revisionResult := getApplicationLatestRevision(&appMock) - assert.Equal(t, revisionResult, history2Revision) - }) -} - -func TestGetLatestAppHistoryId(t *testing.T) { - history1Id := int64(1) - history2Id := int64(2) - - t.Run("resource revision should be 0", func(t *testing.T) { - noStatusHistoryAppMock := v1alpha1.Application{} - - idResult := getLatestAppHistoryId(&noStatusHistoryAppMock) - assert.Equal(t, idResult, int64(0)) - - emptyStatusHistoryAppMock := v1alpha1.Application{ - Status: v1alpha1.ApplicationStatus{ - History: []v1alpha1.RevisionHistory{}, - }, - } - - id2Result := getLatestAppHistoryId(&emptyStatusHistoryAppMock) - assert.Equal(t, id2Result, int64(0)) - }) - - t.Run("resource revision should be taken from latest history.Id", func(t *testing.T) { - appMock := v1alpha1.Application{ - Status: v1alpha1.ApplicationStatus{ - History: []v1alpha1.RevisionHistory{ - v1alpha1.RevisionHistory{ - ID: history1Id, - }, - v1alpha1.RevisionHistory{ - ID: history2Id, - }, - }, - }, - } - - revisionResult := getLatestAppHistoryId(&appMock) - assert.Equal(t, revisionResult, history2Id) - }) -} - -func newAppLister(objects ...runtime.Object) applisters.ApplicationLister { - fakeAppsClientset := fakeapps.NewSimpleClientset(objects...) - factory := appinformer.NewSharedInformerFactoryWithOptions(fakeAppsClientset, 0, appinformer.WithNamespace(""), appinformer.WithTweakListOptions(func(options *metav1.ListOptions) {})) - appsInformer := factory.Argoproj().V1alpha1().Applications() - for _, obj := range objects { - switch obj.(type) { - case *appsv1.Application: - _ = appsInformer.Informer().GetStore().Add(obj) - } - } - appLister := appsInformer.Lister() - return appLister -} - -type MockcodefreshClient interface { - Send(ctx context.Context, appName string, event *events.Event) error -} - -type MockCodefreshConfig struct { - BaseURL string - AuthToken string -} - -type MockCodefreshClient struct { - cfConfig *MockCodefreshConfig - httpClient *http.Client -} - -func (cc *MockCodefreshClient) Send(ctx context.Context, appName string, event *events.Event) error { - return nil -} - -func fakeReporter() *applicationEventReporter { - guestbookApp := &appsv1.Application{ - TypeMeta: metav1.TypeMeta{ - Kind: "Application", - APIVersion: "argoproj.io/v1alpha1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "guestbook", - Namespace: testNamespace, - }, - Spec: appsv1.ApplicationSpec{ - Project: "default", - Source: &appsv1.ApplicationSource{ - RepoURL: "https://test", - TargetRevision: "HEAD", - Helm: &appsv1.ApplicationSourceHelm{ - ValueFiles: []string{"values.yaml"}, - }, - }, - }, - Status: appsv1.ApplicationStatus{ - History: appsv1.RevisionHistories{ - { - Revision: "abcdef123567", - Source: appsv1.ApplicationSource{ - RepoURL: "https://test", - TargetRevision: "HEAD", - Helm: &appsv1.ApplicationSourceHelm{ - ValueFiles: []string{"values-old.yaml"}, - }, - }, - }, - }, - }, - } - - appLister := newAppLister(guestbookApp) - - cache := servercache.NewCache( - appstatecache.NewCache( - cacheutil.NewCache(cacheutil.NewInMemoryCache(1*time.Hour)), - 1*time.Minute, - ), - 1*time.Minute, - 1*time.Minute, - 1*time.Minute, - ) - - cfClient := &MockCodefreshClient{ - cfConfig: &MockCodefreshConfig{ - BaseURL: "", - AuthToken: "", - }, - httpClient: &http.Client{ - Timeout: 30 * time.Second, - }, - } - - metricsServ := metrics.NewMetricsServer("", 8099) - closer, cdClient, _ := apiclient.NewClientOrDie(&apiclient.ClientOptions{ - ServerAddr: "site.com", - }).NewApplicationClient() - defer io.Close(closer) - - return &applicationEventReporter{ - cache, - cfClient, - appLister, - cdClient, - metricsServ, - } -} - -func TestShouldSendEvent(t *testing.T) { - eventReporter := fakeReporter() - t.Run("should send because cache is missing", func(t *testing.T) { - app := &v1alpha1.Application{} - rs := v1alpha1.ResourceStatus{} - - res := eventReporter.shouldSendResourceEvent(app, rs) - assert.True(t, res) - }) - - t.Run("should not send - same entities", func(t *testing.T) { - app := &v1alpha1.Application{} - rs := v1alpha1.ResourceStatus{} - - _ = eventReporter.cache.SetLastResourceEvent(app, rs, time.Minute, "") - - res := eventReporter.shouldSendResourceEvent(app, rs) - assert.False(t, res) - }) - - t.Run("should send - different entities", func(t *testing.T) { - app := &v1alpha1.Application{} - rs := v1alpha1.ResourceStatus{} - - _ = eventReporter.cache.SetLastResourceEvent(app, rs, time.Minute, "") - - rs.Status = v1alpha1.SyncStatusCodeOutOfSync - - res := eventReporter.shouldSendResourceEvent(app, rs) - assert.True(t, res) - }) - -} - -type MockEventing_StartEventSourceServer struct { - grpc.ServerStream -} - -var result func(*events.Event) error - -func (m *MockEventing_StartEventSourceServer) Send(event *events.Event) error { - return result(event) -} - -func TestStreamApplicationEvent(t *testing.T) { - eventReporter := fakeReporter() - t.Run("root application", func(t *testing.T) { - app := &v1alpha1.Application{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "argoproj.io/v1alpha1", - Kind: "Application", - }, - } - - result = func(event *events.Event) error { - var payload events.EventPayload - _ = json.Unmarshal(event.Payload, &payload) - - var actualApp v1alpha1.Application - _ = json.Unmarshal([]byte(payload.Source.ActualManifest), &actualApp) - assert.Equal(t, *app, actualApp) - return nil - } - _ = eventReporter.StreamApplicationEvents(context.Background(), app, "", false, common.LabelKeyAppInstance, argo.TrackingMethodLabel) - }) - -} - -func TestGetResourceEventPayloadWithoutRevision(t *testing.T) { - app := v1alpha1.Application{} - rs := v1alpha1.ResourceStatus{} - - mf := "{ \"key\" : \"manifest\" }" - - actualState := application.ApplicationResourceResponse{ - Manifest: &mf, - } - desiredState := repoApiclient.Manifest{ - CompiledManifest: "{ \"key\" : \"manifest\" }", - } - appTree := v1alpha1.ApplicationTree{} - - _, err := getResourceEventPayload(&app, &rs, &actualState, &desiredState, &appTree, true, "", nil, nil, nil, common.LabelKeyAppInstance, argo.TrackingMethodLabel, &repoApiclient.ApplicationVersions{}) - assert.NoError(t, err) - -} - -func StrToUnstructured(jsonStr string) *unstructured.Unstructured { - obj := make(map[string]interface{}) - err := yaml.Unmarshal([]byte(jsonStr), &obj) - if err != nil { - panic(err) - } - return &unstructured.Unstructured{Object: obj} -} - -func TestAddCommitDetailsToLabels(t *testing.T) { - revisionMetadata := v1alpha1.RevisionMetadata{ - Author: "demo usert", - Date: metav1.Time{}, - Message: "some message", - } - - t.Run("set labels when lable object missing", func(t *testing.T) { - resource := StrToUnstructured(` - apiVersion: v1 - kind: Service - metadata: - name: helm-guestbook - namespace: default - resourceVersion: "123" - uid: "4" - spec: - selector: - app: guestbook - type: LoadBalancer - status: - loadBalancer: - ingress: - - hostname: localhost`, - ) - - result := addCommitDetailsToLabels(resource, &revisionMetadata) - labels := result.GetLabels() - assert.Equal(t, revisionMetadata.Author, labels["app.meta.commit-author"]) - assert.Equal(t, revisionMetadata.Message, labels["app.meta.commit-message"]) - }) - - t.Run("set labels when labels present", func(t *testing.T) { - resource := StrToUnstructured(` - apiVersion: v1 - kind: Service - metadata: - name: helm-guestbook - namespace: default - labels: - link: http://my-grafana.com/pre-generated-link - spec: - selector: - app: guestbook - type: LoadBalancer - status: - loadBalancer: - ingress: - - hostname: localhost`, - ) - - result := addCommitDetailsToLabels(resource, &revisionMetadata) - labels := result.GetLabels() - assert.Equal(t, revisionMetadata.Author, labels["app.meta.commit-author"]) - assert.Equal(t, revisionMetadata.Message, labels["app.meta.commit-message"]) - assert.Equal(t, "http://my-grafana.com/pre-generated-link", result.GetLabels()["link"]) - }) -} diff --git a/event_reporter/reporter/broadcaster.go b/event_reporter/reporter/broadcaster.go deleted file mode 100644 index fbefbd9922da4..0000000000000 --- a/event_reporter/reporter/broadcaster.go +++ /dev/null @@ -1,140 +0,0 @@ -package reporter - -import ( - argocommon "github.com/argoproj/argo-cd/v2/common" - "github.com/argoproj/argo-cd/v2/event_reporter/sharding" - "github.com/argoproj/argo-cd/v2/util/env" - "math" - "sync" - - log "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/watch" - - appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" -) - -type subscriber struct { - ch chan *appv1.ApplicationWatchEvent - filters []func(*appv1.ApplicationWatchEvent) bool -} - -func (s *subscriber) matches(event *appv1.ApplicationWatchEvent) bool { - for i := range s.filters { - if !s.filters[i](event) { - return false - } - } - return true -} - -// Broadcaster is an interface for broadcasting application informer watch events to multiple subscribers. -type Broadcaster interface { - Subscribe(ch chan *appv1.ApplicationWatchEvent, filters ...func(event *appv1.ApplicationWatchEvent) bool) func() - OnAdd(interface{}) - OnUpdate(interface{}, interface{}) - OnDelete(interface{}) -} - -type broadcasterHandler struct { - lock sync.Mutex - subscribers []*subscriber - filter sharding.ApplicationFilterFunction - featureManager *FeatureManager -} - -func NewBroadcaster(featureManager *FeatureManager) Broadcaster { - // todo: pass real value here - filter := getApplicationFilter("") - return &broadcasterHandler{ - filter: filter, - featureManager: featureManager, - } -} - -func (b *broadcasterHandler) notify(event *appv1.ApplicationWatchEvent) { - // Make a local copy of b.subscribers, then send channel events outside the lock, - // to avoid data race on b.subscribers changes - subscribers := []*subscriber{} - b.lock.Lock() - subscribers = append(subscribers, b.subscribers...) - b.lock.Unlock() - - if !b.featureManager.ShouldReporterRun() { - log.Infof("filtering application '%s', event reporting is turned off and old one is in use", event.Application.Name) - return - } - - if b.filter != nil { - result, expectedShard := b.filter(&event.Application) - if !result { - log.Infof("filtering application '%s', wrong shard, should be %d", event.Application.Name, expectedShard) - return - } - } - - for _, s := range subscribers { - if s.matches(event) { - select { - case s.ch <- event: - log.Infof("adding application '%s' to channel", event.Application.Name) - default: - // drop event if cannot send right away - log.WithField("application", event.Application.Name).Warn("unable to send event notification") - } - } - } -} - -// Subscribe forward application informer watch events to the provided channel. -// The watch events are dropped if no receives are reading events from the channel so the channel must have -// buffer if dropping events is not acceptable. -func (b *broadcasterHandler) Subscribe(ch chan *appv1.ApplicationWatchEvent, filters ...func(event *appv1.ApplicationWatchEvent) bool) func() { - b.lock.Lock() - defer b.lock.Unlock() - subscriber := &subscriber{ch, filters} - b.subscribers = append(b.subscribers, subscriber) - return func() { - b.lock.Lock() - defer b.lock.Unlock() - for i := range b.subscribers { - if b.subscribers[i] == subscriber { - b.subscribers = append(b.subscribers[:i], b.subscribers[i+1:]...) - break - } - } - } -} - -func (b *broadcasterHandler) OnAdd(obj interface{}) { - if app, ok := obj.(*appv1.Application); ok { - b.notify(&appv1.ApplicationWatchEvent{Application: *app, Type: watch.Added}) - } -} - -func (b *broadcasterHandler) OnUpdate(_, newObj interface{}) { - if app, ok := newObj.(*appv1.Application); ok { - b.notify(&appv1.ApplicationWatchEvent{Application: *app, Type: watch.Modified}) - } -} - -func (b *broadcasterHandler) OnDelete(obj interface{}) { - if app, ok := obj.(*appv1.Application); ok { - b.notify(&appv1.ApplicationWatchEvent{Application: *app, Type: watch.Deleted}) - } -} - -func getApplicationFilter(shardingAlgorithm string) sharding.ApplicationFilterFunction { - shardingSvc := sharding.NewSharding() - replicas := env.ParseNumFromEnv(argocommon.EnvEventReporterReplicas, 0, 0, math.MaxInt32) - var applicationFilter func(app *appv1.Application) (bool, int) - if replicas > 1 { - shard := sharding.GetShardNumber() - log.Infof("Processing applications from shard %d", shard) - log.Infof("Using filter function: %s", shardingAlgorithm) - distributionFunction := shardingSvc.GetDistributionFunction(shardingAlgorithm) - applicationFilter = shardingSvc.GetApplicationFilter(distributionFunction, shard) - } else { - log.Info("Processing all application shards") - } - return applicationFilter -} diff --git a/event_reporter/reporter/feature_manager.go b/event_reporter/reporter/feature_manager.go deleted file mode 100644 index c483ec9191b8e..0000000000000 --- a/event_reporter/reporter/feature_manager.go +++ /dev/null @@ -1,40 +0,0 @@ -package reporter - -import ( - settings_util "github.com/argoproj/argo-cd/v2/util/settings" - log "github.com/sirupsen/logrus" - "time" -) - -type FeatureManager struct { - settingsMgr *settings_util.SettingsManager - shouldRun bool -} - -func NewFeatureManager(settingsMgr *settings_util.SettingsManager) *FeatureManager { - return &FeatureManager{settingsMgr: settingsMgr} -} - -func (f *FeatureManager) setShouldRun() { - reporterVersion, err := f.settingsMgr.GetCodefreshReporterVersion() - if err != nil { - log.Warnf("Failed to get reporter version: %v", err) - f.shouldRun = false - return - } - f.shouldRun = reporterVersion == string(settings_util.CodefreshV2ReporterVersion) -} - -func (f *FeatureManager) Watch() { - f.setShouldRun() - // nolint:staticcheck - tick := time.Tick(5 * time.Second) - for { - <-tick - f.setShouldRun() - } -} - -func (f *FeatureManager) ShouldReporterRun() bool { - return f.shouldRun -} diff --git a/event_reporter/server.go b/event_reporter/server.go deleted file mode 100644 index 9f76a3b7e4920..0000000000000 --- a/event_reporter/server.go +++ /dev/null @@ -1,304 +0,0 @@ -package event_reporter - -import ( - "context" - "crypto/tls" - "fmt" - "github.com/argoproj/argo-cd/v2/event_reporter/reporter" - "net" - "net/http" - "os" - "strings" - "time" - - "github.com/argoproj/argo-cd/v2/common" - codefresh "github.com/argoproj/argo-cd/v2/event_reporter/codefresh" - event_reporter "github.com/argoproj/argo-cd/v2/event_reporter/controller" - "github.com/argoproj/argo-cd/v2/event_reporter/handlers" - "github.com/argoproj/argo-cd/v2/event_reporter/metrics" - applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" - appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" - appinformer "github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions" - applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" - repoapiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient" - servercache "github.com/argoproj/argo-cd/v2/server/cache" - "github.com/argoproj/argo-cd/v2/server/rbacpolicy" - "github.com/argoproj/argo-cd/v2/server/repository" - "github.com/argoproj/argo-cd/v2/util/assets" - cacheutil "github.com/argoproj/argo-cd/v2/util/cache" - "github.com/argoproj/argo-cd/v2/util/db" - errorsutil "github.com/argoproj/argo-cd/v2/util/errors" - "github.com/argoproj/argo-cd/v2/util/healthz" - "github.com/argoproj/argo-cd/v2/util/io" - "github.com/argoproj/argo-cd/v2/util/rbac" - settings_util "github.com/argoproj/argo-cd/v2/util/settings" - "github.com/redis/go-redis/v9" - log "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" -) - -const ( - // catches corrupted informer state; see https://github.com/argoproj/argo-cd/issues/4960 for more information - notObjectErrMsg = "object does not implement the Object interfaces" -) - -var backoff = wait.Backoff{ - Steps: 5, - Duration: 500 * time.Millisecond, - Factor: 1.0, - Jitter: 0.1, -} - -type EventReporterServer struct { - EventReporterServerOpts - - settings *settings_util.ArgoCDSettings - log *log.Entry - settingsMgr *settings_util.SettingsManager - enf *rbac.Enforcer - projInformer cache.SharedIndexInformer - projLister applisters.AppProjectNamespaceLister - policyEnforcer *rbacpolicy.RBACPolicyEnforcer - appInformer cache.SharedIndexInformer - appLister applisters.ApplicationLister - db db.ArgoDB - - // stopCh is the channel which when closed, will shutdown the Event Reporter server - stopCh chan struct{} - serviceSet *EventReporterServerSet - featureManager *reporter.FeatureManager -} - -type EventReporterServerSet struct { - RepoService *repository.Server - MetricsServer *metrics.MetricsServer -} - -type EventReporterServerOpts struct { - ListenPort int - ListenHost string - MetricsPort int - MetricsHost string - Namespace string - KubeClientset kubernetes.Interface - AppClientset appclientset.Interface - RepoClientset repoapiclient.Clientset - ApplicationServiceClient applicationpkg.ApplicationServiceClient - Cache *servercache.Cache - RedisClient *redis.Client - ApplicationNamespaces []string - BaseHRef string - RootPath string - CodefreshConfig *codefresh.CodefreshConfig -} - -type handlerSwitcher struct { - handler http.Handler - urlToHandler map[string]http.Handler - contentTypeToHandler map[string]http.Handler -} - -type Listeners struct { - Main net.Listener - Metrics net.Listener -} - -func (l *Listeners) Close() error { - if l.Main != nil { - if err := l.Main.Close(); err != nil { - return err - } - l.Main = nil - } - if l.Metrics != nil { - if err := l.Metrics.Close(); err != nil { - return err - } - l.Metrics = nil - } - return nil -} - -func (s *handlerSwitcher) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if urlHandler, ok := s.urlToHandler[r.URL.Path]; ok { - urlHandler.ServeHTTP(w, r) - } else if contentHandler, ok := s.contentTypeToHandler[r.Header.Get("content-type")]; ok { - contentHandler.ServeHTTP(w, r) - } else { - s.handler.ServeHTTP(w, r) - } -} - -func (a *EventReporterServer) healthCheck(r *http.Request) error { - if val, ok := r.URL.Query()["full"]; ok && len(val) > 0 && val[0] == "true" { - argoDB := db.NewDB(a.Namespace, a.settingsMgr, a.KubeClientset) - _, err := argoDB.ListClusters(r.Context()) - if err != nil && strings.Contains(err.Error(), notObjectErrMsg) { - return err - } - } - return nil -} - -// Init starts informers used by the API server -func (a *EventReporterServer) Init(ctx context.Context) { - go a.appInformer.Run(ctx.Done()) - go a.featureManager.Watch() - svcSet := newEventReporterServiceSet(a) - a.serviceSet = svcSet -} - -func (a *EventReporterServer) RunController(ctx context.Context) { - controller := event_reporter.NewEventReporterController(a.appInformer, a.Cache, a.settingsMgr, a.ApplicationServiceClient, a.appLister, a.CodefreshConfig, a.serviceSet.MetricsServer, a.featureManager) - go controller.Run(ctx) -} - -// newHTTPServer returns the HTTP server to serve HTTP/HTTPS requests. This is implemented -// using grpc-gateway as a proxy to the gRPC server. -func (a *EventReporterServer) newHTTPServer(ctx context.Context, port int) *http.Server { - endpoint := fmt.Sprintf("localhost:%d", port) - mux := http.NewServeMux() - httpS := http.Server{ - Addr: endpoint, - Handler: &handlerSwitcher{ - handler: mux, - }, - } - - healthz.ServeHealthCheck(mux, a.healthCheck) - - rH := handlers.GetRequestHandlers(a.ApplicationServiceClient) - mux.HandleFunc("/app-distribution", rH.GetAppDistribution) - - return &httpS -} - -func (a *EventReporterServer) checkServeErr(name string, err error) { - if err != nil { - if a.stopCh == nil { - // a nil stopCh indicates a graceful shutdown - log.Infof("graceful shutdown %s: %v", name, err) - } else { - log.Fatalf("%s: %v", name, err) - } - } else { - log.Infof("graceful shutdown %s", name) - } -} - -func startListener(host string, port int) (net.Listener, error) { - var conn net.Listener - var realErr error - _ = wait.ExponentialBackoff(backoff, func() (bool, error) { - conn, realErr = net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) - if realErr != nil { - return false, nil - } - return true, nil - }) - return conn, realErr -} - -func (a *EventReporterServer) Listen() (*Listeners, error) { - mainLn, err := startListener(a.ListenHost, a.ListenPort) - if err != nil { - return nil, err - } - metricsLn, err := startListener(a.MetricsHost, a.MetricsPort) - if err != nil { - io.Close(mainLn) - return nil, err - } - return &Listeners{Main: mainLn, Metrics: metricsLn}, nil -} - -// Run runs the API Server -// We use k8s.io/code-generator/cmd/go-to-protobuf to generate the .proto files from the API types. -// k8s.io/ go-to-protobuf uses protoc-gen-gogo, which comes from gogo/protobuf (a fork of -// golang/protobuf). -func (a *EventReporterServer) Run(ctx context.Context, lns *Listeners) { - var httpS = a.newHTTPServer(ctx, a.ListenPort) - tlsConfig := tls.Config{} - tlsConfig.GetCertificate = func(info *tls.ClientHelloInfo) (*tls.Certificate, error) { - return a.settings.Certificate, nil - } - go func() { a.checkServeErr("httpS", httpS.Serve(lns.Main)) }() - go func() { a.checkServeErr("metrics", a.serviceSet.MetricsServer.Serve(lns.Metrics)) }() - go a.RunController(ctx) - - if !cache.WaitForCacheSync(ctx.Done(), a.projInformer.HasSynced, a.appInformer.HasSynced) { - log.Fatal("Timed out waiting for project cache to sync") - } - - a.stopCh = make(chan struct{}) - <-a.stopCh -} - -// NewServer returns a new instance of the Event Reporter server -func NewEventReporterServer(ctx context.Context, opts EventReporterServerOpts) *EventReporterServer { - settingsMgr := settings_util.NewSettingsManager(ctx, opts.KubeClientset, opts.Namespace) - settings, err := settingsMgr.InitializeSettings(true) - errorsutil.CheckError(err) - - appInformerNs := opts.Namespace - if len(opts.ApplicationNamespaces) > 0 { - appInformerNs = "" - } - projFactory := appinformer.NewSharedInformerFactoryWithOptions(opts.AppClientset, 0, appinformer.WithNamespace(opts.Namespace), appinformer.WithTweakListOptions(func(options *metav1.ListOptions) {})) - appFactory := appinformer.NewSharedInformerFactoryWithOptions(opts.AppClientset, 0, appinformer.WithNamespace(appInformerNs), appinformer.WithTweakListOptions(func(options *metav1.ListOptions) {})) - - projInformer := projFactory.Argoproj().V1alpha1().AppProjects().Informer() - projLister := projFactory.Argoproj().V1alpha1().AppProjects().Lister().AppProjects(opts.Namespace) - - appInformer := appFactory.Argoproj().V1alpha1().Applications().Informer() - appLister := appFactory.Argoproj().V1alpha1().Applications().Lister() - - enf := rbac.NewEnforcer(opts.KubeClientset, opts.Namespace, common.ArgoCDRBACConfigMapName, nil) - enf.EnableEnforce(false) - err = enf.SetBuiltinPolicy(assets.BuiltinPolicyCSV) - errorsutil.CheckError(err) - enf.EnableLog(os.Getenv(common.EnvVarRBACDebug) == "1") - - policyEnf := rbacpolicy.NewRBACPolicyEnforcer(enf, projLister) - enf.SetClaimsEnforcerFunc(policyEnf.EnforceClaims) - - dbInstance := db.NewDB(opts.Namespace, settingsMgr, opts.KubeClientset) - - server := &EventReporterServer{ - EventReporterServerOpts: opts, - log: log.NewEntry(log.StandardLogger()), - settings: settings, - settingsMgr: settingsMgr, - enf: enf, - projInformer: projInformer, - projLister: projLister, - appInformer: appInformer, - appLister: appLister, - policyEnforcer: policyEnf, - db: dbInstance, - featureManager: reporter.NewFeatureManager(settingsMgr), - } - - if err != nil { - // Just log. It's not critical. - log.Warnf("Failed to log in-cluster warnings: %v", err) - } - - return server -} - -func newEventReporterServiceSet(a *EventReporterServer) *EventReporterServerSet { - repoService := repository.NewServer(a.RepoClientset, a.db, a.enf, a.Cache, a.appLister, a.projInformer, a.Namespace, a.settingsMgr) - metricsServer := metrics.NewMetricsServer(a.MetricsHost, a.MetricsPort) - if a.RedisClient != nil { - cacheutil.CollectMetrics(a.RedisClient, metricsServer) - } - - return &EventReporterServerSet{ - RepoService: repoService, - MetricsServer: metricsServer, - } -} diff --git a/event_reporter/sharding/sharding.go b/event_reporter/sharding/sharding.go deleted file mode 100644 index 0149cb71694ab..0000000000000 --- a/event_reporter/sharding/sharding.go +++ /dev/null @@ -1,101 +0,0 @@ -package sharding - -import ( - "fmt" - argocommon "github.com/argoproj/argo-cd/v2/common" - "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - "hash/fnv" - "math" - "os" - "strconv" - "strings" - - "github.com/argoproj/argo-cd/v2/util/env" - log "github.com/sirupsen/logrus" -) - -var osHostnameFunction = os.Hostname - -type DistributionFunction func(c *v1alpha1.Application) int -type ApplicationFilterFunction func(c *v1alpha1.Application) (bool, int) - -type Sharding interface { - GetApplicationFilter(distributionFunction DistributionFunction, shard int) ApplicationFilterFunction - GetDistributionFunction(shardingAlgorithm string) DistributionFunction -} - -type sharding struct { -} - -func NewSharding() Sharding { - return &sharding{} -} - -func (s *sharding) GetApplicationFilter(distributionFunction DistributionFunction, shard int) ApplicationFilterFunction { - return func(app *v1alpha1.Application) (bool, int) { - expectedShard := distributionFunction(app) - // TODO: [reporter] provide ability define label with shard number - return expectedShard == shard, expectedShard - } -} - -// GetDistributionFunction returns which DistributionFunction should be used based on the passed algorithm and -// the current datas. -func (s *sharding) GetDistributionFunction(shardingAlgorithm string) DistributionFunction { - log.Infof("Using filter function: %s", shardingAlgorithm) - //TODO: implement switch case for multiple strategies - return s.LegacyDistributionFunction() -} - -func (s *sharding) LegacyDistributionFunction() DistributionFunction { - replicas := env.ParseNumFromEnv(argocommon.EnvEventReporterReplicas, 0, 0, math.MaxInt32) - return func(a *v1alpha1.Application) int { - if replicas == 0 { - return -1 - } - if a == nil { - return 0 - } - id := a.Name - if id == "" { - return 0 - } else { - h := fnv.New32a() - _, _ = h.Write([]byte(id)) - shard := int32(h.Sum32() % uint32(replicas)) - log.Debugf("Application with id=%s will be processed by shard %d", id, shard) - return int(shard) - } - } -} - -// InferShard extracts the shard index based on its hostname. -func InferShard() (int, error) { - hostname, err := osHostnameFunction() - if err != nil { - return 0, err - } - parts := strings.Split(hostname, "-") - if len(parts) == 0 { - return 0, fmt.Errorf("hostname should ends with shard number separated by '-' but got: %s", hostname) - } - shard, err := strconv.Atoi(parts[len(parts)-1]) - if err != nil { - return 0, fmt.Errorf("hostname should ends with shard number separated by '-' but got: %s", hostname) - } - return int(shard), nil -} - -func GetShardNumber() int { - shard := env.ParseNumFromEnv(argocommon.EnvEventReporterShard, -1, -math.MaxInt32, math.MaxInt32) - - if shard < 0 { - var err error - shard, err = InferShard() - if err != nil { - return -1 - } - } - - return shard -} diff --git a/go.mod b/go.mod index e21e75a883042..ec361d104dbc9 100644 --- a/go.mod +++ b/go.mod @@ -213,7 +213,7 @@ require ( github.com/opsgenie/opsgenie-go-sdk-v2 v1.0.5 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pjbgf/sha1cd v0.3.0 // indirect - github.com/pkg/errors v0.9.1 + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect diff --git a/manifests/base/event-reporter/event-reporter-metrics.yaml b/manifests/base/event-reporter/event-reporter-metrics.yaml deleted file mode 100644 index bb9e261d8321e..0000000000000 --- a/manifests/base/event-reporter/event-reporter-metrics.yaml +++ /dev/null @@ -1,16 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - labels: - app.kubernetes.io/name: event-reporter-metrics - app.kubernetes.io/part-of: argocd - app.kubernetes.io/component: event-reporter - name: event-reporter-metrics -spec: - ports: - - name: metrics - protocol: TCP - port: 8087 - targetPort: 8087 - selector: - app.kubernetes.io/name: event-reporter diff --git a/manifests/base/event-reporter/event-reporter-role.yaml b/manifests/base/event-reporter/event-reporter-role.yaml deleted file mode 100644 index ddb1508d89690..0000000000000 --- a/manifests/base/event-reporter/event-reporter-role.yaml +++ /dev/null @@ -1,43 +0,0 @@ -apiVersion: rbac.authorization.k8s.io/v1 -kind: Role -metadata: - labels: - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - app.kubernetes.io/component: event-reporter - name: event-reporter -rules: -- apiGroups: - - "" - resources: - - secrets - - configmaps - verbs: - - create - - get - - list - - watch - - update - - patch - - delete -- apiGroups: - - argoproj.io - resources: - - applications - - appprojects - - applicationsets - verbs: - - create - - get - - list - - watch - - update - - delete - - patch -- apiGroups: - - "" - resources: - - events - verbs: - - create - - list diff --git a/manifests/base/event-reporter/event-reporter-rolebinding.yaml b/manifests/base/event-reporter/event-reporter-rolebinding.yaml deleted file mode 100644 index 1bbc0851f803d..0000000000000 --- a/manifests/base/event-reporter/event-reporter-rolebinding.yaml +++ /dev/null @@ -1,15 +0,0 @@ -apiVersion: rbac.authorization.k8s.io/v1 -kind: RoleBinding -metadata: - labels: - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - app.kubernetes.io/component: event-reporter - name: event-reporter -roleRef: - apiGroup: rbac.authorization.k8s.io - kind: Role - name: event-reporter -subjects: -- kind: ServiceAccount - name: event-reporter diff --git a/manifests/base/event-reporter/event-reporter-sa.yaml b/manifests/base/event-reporter/event-reporter-sa.yaml deleted file mode 100644 index 51d4e39fbb50e..0000000000000 --- a/manifests/base/event-reporter/event-reporter-sa.yaml +++ /dev/null @@ -1,8 +0,0 @@ -apiVersion: v1 -kind: ServiceAccount -metadata: - labels: - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - app.kubernetes.io/component: event-reporter - name: event-reporter diff --git a/manifests/base/event-reporter/event-reporter-service.yaml b/manifests/base/event-reporter/event-reporter-service.yaml deleted file mode 100644 index 6a89a2f9d17db..0000000000000 --- a/manifests/base/event-reporter/event-reporter-service.yaml +++ /dev/null @@ -1,20 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - labels: - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - app.kubernetes.io/component: event-reporter - name: event-reporter -spec: - ports: - - name: http - protocol: TCP - port: 80 - targetPort: 8088 - - name: https - protocol: TCP - port: 443 - targetPort: 8088 - selector: - app.kubernetes.io/name: event-reporter diff --git a/manifests/base/event-reporter/event-reporter-statefulset.yaml b/manifests/base/event-reporter/event-reporter-statefulset.yaml deleted file mode 100644 index 92d336b0681ce..0000000000000 --- a/manifests/base/event-reporter/event-reporter-statefulset.yaml +++ /dev/null @@ -1,168 +0,0 @@ -apiVersion: apps/v1 -kind: StatefulSet -metadata: - labels: - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - app.kubernetes.io/component: event-reporter - name: event-reporter -spec: - replicas: 5 - serviceName: event-reporter - selector: - matchLabels: - app.kubernetes.io/name: event-reporter - template: - metadata: - labels: - app.kubernetes.io/name: event-reporter - spec: - serviceAccountName: event-reporter - containers: - - name: event-reporter - image: quay.io/argoproj/argocd:latest - imagePullPolicy: Always - args: - - /usr/local/bin/event-reporter-server - env: - - name: EVENT_REPORTER_REPLICAS - value: "5" - - name: ARGOCD_TOKEN - valueFrom: - secretKeyRef: - key: token - name: argocd-token - - name: CODEFRESH_URL - valueFrom: - configMapKeyRef: - name: codefresh-cm - key: base-url - optional: true - - name: CODEFRESH_TOKEN - valueFrom: - secretKeyRef: - key: token - name: codefresh-token - - name: EVENT_REPORTER_INSECURE - valueFrom: - configMapKeyRef: - name: argocd-cmd-params-cm - key: event-reporter.insecure - optional: true - - name: EVENT_REPORTER_LOGFORMAT - valueFrom: - configMapKeyRef: - name: argocd-cmd-params-cm - key: event-reporter.log.format - optional: true - - name: EVENT_REPORTER_LOG_LEVEL - valueFrom: - configMapKeyRef: - name: argocd-cmd-params-cm - key: event-reporter.log.level - optional: true - - name: EVENT_REPORTER_REPO_SERVER - valueFrom: - configMapKeyRef: - name: argocd-cmd-params-cm - key: repo.server - optional: true - - name: EVENT_REPORTER_REPO_SERVER_TIMEOUT_SECONDS - valueFrom: - configMapKeyRef: - name: argocd-cmd-params-cm - key: event-reporter.repo.server.timeout.seconds - optional: true - - name: EVENT_REPORTER_REPO_SERVER_PLAINTEXT - valueFrom: - configMapKeyRef: - name: argocd-cmd-params-cm - key: event-reporter.repo.server.plaintext - optional: true - - name: REDIS_SERVER - valueFrom: - configMapKeyRef: - name: argocd-cmd-params-cm - key: redis.server - optional: true - - name: REDISDB - valueFrom: - configMapKeyRef: - name: argocd-cmd-params-cm - key: redis.db - optional: true - - name: EVENT_REPORTER_LISTEN_ADDRESS - valueFrom: - configMapKeyRef: - name: argocd-cmd-params-cm - key: event-reporter.listen.address - optional: true - - name: EVENT_REPORTER_METRICS_LISTEN_ADDRESS - valueFrom: - configMapKeyRef: - name: argocd-cmd-params-cm - key: event-reporter.metrics.listen.address - optional: true - volumeMounts: - - name: argocd-repo-server-tls - mountPath: /app/config/server/tls - - mountPath: /tmp - name: tmp - ports: - - containerPort: 8088 - name: health - - containerPort: 8087 - name: metrics - livenessProbe: - httpGet: - path: /healthz?full=true - port: health - initialDelaySeconds: 3 - periodSeconds: 30 - timeoutSeconds: 5 - readinessProbe: - httpGet: - path: /healthz - port: health - initialDelaySeconds: 3 - periodSeconds: 30 - securityContext: - allowPrivilegeEscalation: false - readOnlyRootFilesystem: true - runAsNonRoot: true - capabilities: - drop: - - ALL - seccompProfile: - type: RuntimeDefault - volumes: - - emptyDir: {} - name: plugins-home - - emptyDir: {} - name: tmp - - name: argocd-repo-server-tls - secret: - secretName: argocd-repo-server-tls - optional: true - items: - - key: tls.crt - path: tls.crt - - key: tls.key - path: tls.key - - key: ca.crt - path: ca.crt - affinity: - podAntiAffinity: - preferredDuringSchedulingIgnoredDuringExecution: - - weight: 100 - podAffinityTerm: - labelSelector: - matchLabels: - app.kubernetes.io/name: event-reporter - topologyKey: kubernetes.io/hostname - - weight: 5 - podAffinityTerm: - labelSelector: - matchLabels: - app.kubernetes.io/part-of: argocd - topologyKey: kubernetes.io/hostname diff --git a/manifests/base/event-reporter/kustomization.yaml b/manifests/base/event-reporter/kustomization.yaml deleted file mode 100644 index eb7c1067f0008..0000000000000 --- a/manifests/base/event-reporter/kustomization.yaml +++ /dev/null @@ -1,10 +0,0 @@ -apiVersion: kustomize.config.k8s.io/v1beta1 -kind: Kustomization - -resources: -- event-reporter-statefulset.yaml -- event-reporter-role.yaml -- event-reporter-rolebinding.yaml -- event-reporter-sa.yaml -- event-reporter-service.yaml -- event-reporter-metrics.yaml \ No newline at end of file diff --git a/manifests/base/kustomization.yaml b/manifests/base/kustomization.yaml index 50254c28bc233..7be81fd547fcf 100644 --- a/manifests/base/kustomization.yaml +++ b/manifests/base/kustomization.yaml @@ -13,4 +13,3 @@ resources: - ./config - ./redis - ./applicationset-controller -- ./event-reporter diff --git a/manifests/install.yaml b/manifests/install.yaml index c0320c443b7a4..3f1f872d07bc1 100644 --- a/manifests/install.yaml +++ b/manifests/install.yaml @@ -18859,15 +18859,6 @@ metadata: app.kubernetes.io/part-of: argocd name: argocd-server --- -apiVersion: v1 -kind: ServiceAccount -metadata: - labels: - app.kubernetes.io/component: event-reporter - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - name: event-reporter ---- apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: @@ -19037,50 +19028,6 @@ rules: - list --- apiVersion: rbac.authorization.k8s.io/v1 -kind: Role -metadata: - labels: - app.kubernetes.io/component: event-reporter - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - name: event-reporter -rules: -- apiGroups: - - "" - resources: - - secrets - - configmaps - verbs: - - create - - get - - list - - watch - - update - - patch - - delete -- apiGroups: - - argoproj.io - resources: - - applications - - appprojects - - applicationsets - verbs: - - create - - get - - list - - watch - - update - - delete - - patch -- apiGroups: - - "" - resources: - - events - verbs: - - create - - list ---- -apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: labels: @@ -19204,22 +19151,6 @@ subjects: name: argocd-server --- apiVersion: rbac.authorization.k8s.io/v1 -kind: RoleBinding -metadata: - labels: - app.kubernetes.io/component: event-reporter - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - name: event-reporter -roleRef: - apiGroup: rbac.authorization.k8s.io - kind: Role - name: event-reporter -subjects: -- kind: ServiceAccount - name: event-reporter ---- -apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: labels: @@ -19465,44 +19396,6 @@ spec: selector: app.kubernetes.io/name: argocd-server --- -apiVersion: v1 -kind: Service -metadata: - labels: - app.kubernetes.io/component: event-reporter - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - name: event-reporter -spec: - ports: - - name: http - port: 80 - protocol: TCP - targetPort: 8088 - - name: https - port: 443 - protocol: TCP - targetPort: 8088 - selector: - app.kubernetes.io/name: event-reporter ---- -apiVersion: v1 -kind: Service -metadata: - labels: - app.kubernetes.io/component: event-reporter - app.kubernetes.io/name: event-reporter-metrics - app.kubernetes.io/part-of: argocd - name: event-reporter-metrics -spec: - ports: - - name: metrics - port: 8087 - protocol: TCP - targetPort: 8087 - selector: - app.kubernetes.io/name: event-reporter ---- apiVersion: apps/v1 kind: Deployment metadata: @@ -20655,175 +20548,6 @@ spec: optional: true secretName: argocd-repo-server-tls --- -apiVersion: apps/v1 -kind: StatefulSet -metadata: - labels: - app.kubernetes.io/component: event-reporter - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - name: event-reporter -spec: - replicas: 5 - selector: - matchLabels: - app.kubernetes.io/name: event-reporter - serviceName: event-reporter - template: - metadata: - labels: - app.kubernetes.io/name: event-reporter - spec: - affinity: - podAntiAffinity: - preferredDuringSchedulingIgnoredDuringExecution: - - podAffinityTerm: - labelSelector: - matchLabels: - app.kubernetes.io/name: event-reporter - topologyKey: kubernetes.io/hostname - weight: 100 - - podAffinityTerm: - labelSelector: - matchLabels: - app.kubernetes.io/part-of: argocd - topologyKey: kubernetes.io/hostname - weight: 5 - containers: - - args: - - /usr/local/bin/event-reporter-server - env: - - name: EVENT_REPORTER_REPLICAS - value: "5" - - name: ARGOCD_TOKEN - valueFrom: - secretKeyRef: - key: token - name: argocd-token - - name: CODEFRESH_URL - valueFrom: - configMapKeyRef: - key: base-url - name: codefresh-cm - optional: true - - name: CODEFRESH_TOKEN - valueFrom: - secretKeyRef: - key: token - name: codefresh-token - - name: EVENT_REPORTER_INSECURE - valueFrom: - configMapKeyRef: - key: event-reporter.insecure - name: argocd-cmd-params-cm - optional: true - - name: EVENT_REPORTER_LOGFORMAT - valueFrom: - configMapKeyRef: - key: event-reporter.log.format - name: argocd-cmd-params-cm - optional: true - - name: EVENT_REPORTER_LOG_LEVEL - valueFrom: - configMapKeyRef: - key: event-reporter.log.level - name: argocd-cmd-params-cm - optional: true - - name: EVENT_REPORTER_REPO_SERVER - valueFrom: - configMapKeyRef: - key: repo.server - name: argocd-cmd-params-cm - optional: true - - name: EVENT_REPORTER_REPO_SERVER_TIMEOUT_SECONDS - valueFrom: - configMapKeyRef: - key: event-reporter.repo.server.timeout.seconds - name: argocd-cmd-params-cm - optional: true - - name: EVENT_REPORTER_REPO_SERVER_PLAINTEXT - valueFrom: - configMapKeyRef: - key: event-reporter.repo.server.plaintext - name: argocd-cmd-params-cm - optional: true - - name: REDIS_SERVER - valueFrom: - configMapKeyRef: - key: redis.server - name: argocd-cmd-params-cm - optional: true - - name: REDISDB - valueFrom: - configMapKeyRef: - key: redis.db - name: argocd-cmd-params-cm - optional: true - - name: EVENT_REPORTER_LISTEN_ADDRESS - valueFrom: - configMapKeyRef: - key: event-reporter.listen.address - name: argocd-cmd-params-cm - optional: true - - name: EVENT_REPORTER_METRICS_LISTEN_ADDRESS - valueFrom: - configMapKeyRef: - key: event-reporter.metrics.listen.address - name: argocd-cmd-params-cm - optional: true - image: quay.io/codefresh/argocd:latest - imagePullPolicy: Always - livenessProbe: - httpGet: - path: /healthz?full=true - port: health - initialDelaySeconds: 3 - periodSeconds: 30 - timeoutSeconds: 5 - name: event-reporter - ports: - - containerPort: 8088 - name: health - - containerPort: 8087 - name: metrics - readinessProbe: - httpGet: - path: /healthz - port: health - initialDelaySeconds: 3 - periodSeconds: 30 - securityContext: - allowPrivilegeEscalation: false - capabilities: - drop: - - ALL - readOnlyRootFilesystem: true - runAsNonRoot: true - seccompProfile: - type: RuntimeDefault - volumeMounts: - - mountPath: /app/config/server/tls - name: argocd-repo-server-tls - - mountPath: /tmp - name: tmp - serviceAccountName: event-reporter - volumes: - - emptyDir: {} - name: plugins-home - - emptyDir: {} - name: tmp - - name: argocd-repo-server-tls - secret: - items: - - key: tls.crt - path: tls.crt - - key: tls.key - path: tls.key - - key: ca.crt - path: ca.crt - optional: true - secretName: argocd-repo-server-tls ---- apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: diff --git a/manifests/namespace-install.yaml b/manifests/namespace-install.yaml index d1060987a17e6..a60485ed64a1a 100644 --- a/manifests/namespace-install.yaml +++ b/manifests/namespace-install.yaml @@ -53,15 +53,6 @@ metadata: app.kubernetes.io/part-of: argocd name: argocd-server --- -apiVersion: v1 -kind: ServiceAccount -metadata: - labels: - app.kubernetes.io/component: event-reporter - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - name: event-reporter ---- apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: @@ -231,50 +222,6 @@ rules: - list --- apiVersion: rbac.authorization.k8s.io/v1 -kind: Role -metadata: - labels: - app.kubernetes.io/component: event-reporter - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - name: event-reporter -rules: -- apiGroups: - - "" - resources: - - secrets - - configmaps - verbs: - - create - - get - - list - - watch - - update - - patch - - delete -- apiGroups: - - argoproj.io - resources: - - applications - - appprojects - - applicationsets - verbs: - - create - - get - - list - - watch - - update - - delete - - patch -- apiGroups: - - "" - resources: - - events - verbs: - - create - - list ---- -apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: labels: @@ -338,22 +285,6 @@ subjects: - kind: ServiceAccount name: argocd-server --- -apiVersion: rbac.authorization.k8s.io/v1 -kind: RoleBinding -metadata: - labels: - app.kubernetes.io/component: event-reporter - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - name: event-reporter -roleRef: - apiGroup: rbac.authorization.k8s.io - kind: Role - name: event-reporter -subjects: -- kind: ServiceAccount - name: event-reporter ---- apiVersion: v1 kind: ConfigMap metadata: @@ -566,44 +497,6 @@ spec: selector: app.kubernetes.io/name: argocd-server --- -apiVersion: v1 -kind: Service -metadata: - labels: - app.kubernetes.io/component: event-reporter - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - name: event-reporter -spec: - ports: - - name: http - port: 80 - protocol: TCP - targetPort: 8088 - - name: https - port: 443 - protocol: TCP - targetPort: 8088 - selector: - app.kubernetes.io/name: event-reporter ---- -apiVersion: v1 -kind: Service -metadata: - labels: - app.kubernetes.io/component: event-reporter - app.kubernetes.io/name: event-reporter-metrics - app.kubernetes.io/part-of: argocd - name: event-reporter-metrics -spec: - ports: - - name: metrics - port: 8087 - protocol: TCP - targetPort: 8087 - selector: - app.kubernetes.io/name: event-reporter ---- apiVersion: apps/v1 kind: Deployment metadata: @@ -1756,175 +1649,6 @@ spec: optional: true secretName: argocd-repo-server-tls --- -apiVersion: apps/v1 -kind: StatefulSet -metadata: - labels: - app.kubernetes.io/component: event-reporter - app.kubernetes.io/name: event-reporter - app.kubernetes.io/part-of: argocd - name: event-reporter -spec: - replicas: 5 - selector: - matchLabels: - app.kubernetes.io/name: event-reporter - serviceName: event-reporter - template: - metadata: - labels: - app.kubernetes.io/name: event-reporter - spec: - affinity: - podAntiAffinity: - preferredDuringSchedulingIgnoredDuringExecution: - - podAffinityTerm: - labelSelector: - matchLabels: - app.kubernetes.io/name: event-reporter - topologyKey: kubernetes.io/hostname - weight: 100 - - podAffinityTerm: - labelSelector: - matchLabels: - app.kubernetes.io/part-of: argocd - topologyKey: kubernetes.io/hostname - weight: 5 - containers: - - args: - - /usr/local/bin/event-reporter-server - env: - - name: EVENT_REPORTER_REPLICAS - value: "5" - - name: ARGOCD_TOKEN - valueFrom: - secretKeyRef: - key: token - name: argocd-token - - name: CODEFRESH_URL - valueFrom: - configMapKeyRef: - key: base-url - name: codefresh-cm - optional: true - - name: CODEFRESH_TOKEN - valueFrom: - secretKeyRef: - key: token - name: codefresh-token - - name: EVENT_REPORTER_INSECURE - valueFrom: - configMapKeyRef: - key: event-reporter.insecure - name: argocd-cmd-params-cm - optional: true - - name: EVENT_REPORTER_LOGFORMAT - valueFrom: - configMapKeyRef: - key: event-reporter.log.format - name: argocd-cmd-params-cm - optional: true - - name: EVENT_REPORTER_LOG_LEVEL - valueFrom: - configMapKeyRef: - key: event-reporter.log.level - name: argocd-cmd-params-cm - optional: true - - name: EVENT_REPORTER_REPO_SERVER - valueFrom: - configMapKeyRef: - key: repo.server - name: argocd-cmd-params-cm - optional: true - - name: EVENT_REPORTER_REPO_SERVER_TIMEOUT_SECONDS - valueFrom: - configMapKeyRef: - key: event-reporter.repo.server.timeout.seconds - name: argocd-cmd-params-cm - optional: true - - name: EVENT_REPORTER_REPO_SERVER_PLAINTEXT - valueFrom: - configMapKeyRef: - key: event-reporter.repo.server.plaintext - name: argocd-cmd-params-cm - optional: true - - name: REDIS_SERVER - valueFrom: - configMapKeyRef: - key: redis.server - name: argocd-cmd-params-cm - optional: true - - name: REDISDB - valueFrom: - configMapKeyRef: - key: redis.db - name: argocd-cmd-params-cm - optional: true - - name: EVENT_REPORTER_LISTEN_ADDRESS - valueFrom: - configMapKeyRef: - key: event-reporter.listen.address - name: argocd-cmd-params-cm - optional: true - - name: EVENT_REPORTER_METRICS_LISTEN_ADDRESS - valueFrom: - configMapKeyRef: - key: event-reporter.metrics.listen.address - name: argocd-cmd-params-cm - optional: true - image: quay.io/codefresh/argocd:latest - imagePullPolicy: Always - livenessProbe: - httpGet: - path: /healthz?full=true - port: health - initialDelaySeconds: 3 - periodSeconds: 30 - timeoutSeconds: 5 - name: event-reporter - ports: - - containerPort: 8088 - name: health - - containerPort: 8087 - name: metrics - readinessProbe: - httpGet: - path: /healthz - port: health - initialDelaySeconds: 3 - periodSeconds: 30 - securityContext: - allowPrivilegeEscalation: false - capabilities: - drop: - - ALL - readOnlyRootFilesystem: true - runAsNonRoot: true - seccompProfile: - type: RuntimeDefault - volumeMounts: - - mountPath: /app/config/server/tls - name: argocd-repo-server-tls - - mountPath: /tmp - name: tmp - serviceAccountName: event-reporter - volumes: - - emptyDir: {} - name: plugins-home - - emptyDir: {} - name: tmp - - name: argocd-repo-server-tls - secret: - items: - - key: tls.crt - path: tls.crt - - key: tls.key - path: tls.key - - key: ca.crt - path: ca.crt - optional: true - secretName: argocd-repo-server-tls ---- apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: diff --git a/server/application/application.go b/server/application/application.go index a4778c26025cd..011b09b9e6035 100644 --- a/server/application/application.go +++ b/server/application/application.go @@ -1121,16 +1121,8 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing for { select { case event := <-eventsChannel: - log.Infof("event channel size is %d", len(eventsChannel)) - rVersion, _ := s.settingsMgr.GetCodefreshReporterVersion() - if rVersion == string(settings.CodefreshV2ReporterVersion) { - logCtx.Info("v1 reported disabled skipping event") - continue - } - shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event) if !shouldProcess { - log.Infof("ignore event for app %s", event.Application.Name) continue } ts := time.Now().Format("2006-01-02T15:04:05.000Z") diff --git a/util/settings/settings.go b/util/settings/settings.go index 668ed8799e440..7762224f7284d 100644 --- a/util/settings/settings.go +++ b/util/settings/settings.go @@ -202,14 +202,6 @@ type KustomizeSettings struct { Versions []KustomizeVersion } -// CodefreshReporterVersion includes all cf reporter versions -type CodefreshReporterVersion string - -const ( - CodefreshV1ReporterVersion CodefreshReporterVersion = "v1" - CodefreshV2ReporterVersion CodefreshReporterVersion = "v2" -) - var ( ByClusterURLIndexer = "byClusterURL" byClusterURLIndexerFunc = func(obj interface{}) ([]string, error) { @@ -424,8 +416,6 @@ const ( settingsWebhookGogsSecretKey = "webhook.gogs.secret" // settingsApplicationInstanceLabelKey is the key to configure injected app instance label key settingsApplicationInstanceLabelKey = "application.instanceLabelKey" - // settingsCodefreshReporterVersion is the key to configure injected app instance label key - settingsCodefreshReporterVersion = "codefresh.reporterVersion" // settingsResourceTrackingMethodKey is the key to configure tracking method for application resources settingsResourceTrackingMethodKey = "application.resourceTrackingMethod" // resourcesCustomizationsKey is the key to the map of resource overrides @@ -730,18 +720,6 @@ func (mgr *SettingsManager) GetAppInstanceLabelKey() (string, error) { return label, nil } -func (mgr *SettingsManager) GetCodefreshReporterVersion() (string, error) { - argoCDCM, err := mgr.getConfigMap() - if err != nil { - return "", err - } - label := argoCDCM.Data[settingsCodefreshReporterVersion] - if label == "" { - return string(CodefreshV1ReporterVersion), nil - } - return label, nil -} - func (mgr *SettingsManager) GetKustomizeSetNamespaceEnabled() bool { argoCDCM, err := mgr.getConfigMap() if err != nil {