From b78576af8fc9cbc7dd8485fa5715d69a96c4d482 Mon Sep 17 00:00:00 2001 From: Karl Isenberg Date: Tue, 29 Oct 2024 13:19:11 -0700 Subject: [PATCH] chore: Extract Reconciler from Parser (#1461) - Split the Parser interface into Parser & SyncStatusClient. - Replace the root & namespace structs with seperate implementations of the Parser & SyncStatusClient interfaces. - Create a new common reconciler struct which composes the Parser & SyncStatusClient interfaces. The reconciler struct can now hold common methods, and the current ReconcilerState, which encompases spec, status, and cache for all reconciler phases, not just the parser. - Add a new Reconciler interface, for use by the EventHandler and DefaultRunFunc. We may want to move DefaultRunFunc to be a reconciler method in the future, but this would relocate all the run code, so we can do it in a seperate change. - Move the Mutex out of the Options and into the Reconciler. Options should generally not be mutated after construction. - Change interface methods to public. There's no reason to have private interface methods, even if the interface itself is private. - Add ReconcilerOptions, to contain options that the Parser doesn't use itself. ReconcilerOptions extends parse.Options to ensure the injected values are consistent and simplify usage. - Move SyncErrorCache into the ReconcilerState, since it's not just Parser or Updater errors. - Move DeclaredCRDs method from the Updater into declared.Resources. Then inject the resources into the Parser, so it doesn't need to depend on the whole Updater just to get the list of valid CRDs. The naming here is still confusing, since it's not just the resource/objects declared in the source, but actually the set of valid & unskipped/known-scoped source objects, after being parsed. Managing the declared.Resources should probably be moved from the Updater to the Reconciler in the future. --- pkg/declared/resources.go | 21 + pkg/parse/event_handler.go | 42 +- pkg/parse/opts.go | 76 +- pkg/parse/parser.go | 28 + pkg/parse/reconciler.go | 117 +++ .../{namespace.go => repo_sync_client.go} | 158 +--- pkg/parse/repo_sync_parser.go | 82 ++ .../{root_test.go => root_reconciler_test.go} | 877 +++++------------- pkg/parse/{root.go => root_sync_client.go} | 239 +---- pkg/parse/root_sync_client_test.go | 577 ++++++++++++ pkg/parse/root_sync_parser.go | 177 ++++ pkg/parse/run.go | 135 +-- pkg/parse/run_test.go | 64 +- pkg/parse/source_test.go | 47 +- pkg/parse/state.go | 33 +- pkg/parse/sync_status_client.go | 35 + pkg/parse/updater.go | 23 +- pkg/reconciler/reconciler.go | 55 +- 18 files changed, 1569 insertions(+), 1217 deletions(-) create mode 100644 pkg/parse/parser.go create mode 100644 pkg/parse/reconciler.go rename pkg/parse/{namespace.go => repo_sync_client.go} (67%) create mode 100644 pkg/parse/repo_sync_parser.go rename pkg/parse/{root_test.go => root_reconciler_test.go} (70%) rename pkg/parse/{root.go => root_sync_client.go} (70%) create mode 100644 pkg/parse/root_sync_client_test.go create mode 100644 pkg/parse/root_sync_parser.go create mode 100644 pkg/parse/sync_status_client.go diff --git a/pkg/declared/resources.go b/pkg/declared/resources.go index 9e93e368d9..43be3d8db8 100644 --- a/pkg/declared/resources.go +++ b/pkg/declared/resources.go @@ -19,12 +19,15 @@ import ( "sync" "github.com/elliotchance/orderedmap/v2" + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/klog/v2" + "kpt.dev/configsync/pkg/kinds" "kpt.dev/configsync/pkg/metrics" "kpt.dev/configsync/pkg/status" "kpt.dev/configsync/pkg/syncer/reconcile" + "kpt.dev/configsync/pkg/util/clusterconfig" "sigs.k8s.io/controller-runtime/pkg/client" "kpt.dev/configsync/pkg/core" @@ -150,3 +153,21 @@ func (r *Resources) DeclaredGVKs() (map[schema.GroupVersionKind]struct{}, string } return gvkSet, r.commit } + +// DeclaredCRDs returns the list of CRDs declared in the source. +func (r *Resources) DeclaredCRDs() ([]*v1beta1.CustomResourceDefinition, status.MultiError) { + // DeclaredUnstructureds handles the mutex, so this method doesn't need to lock. + var crds []*v1beta1.CustomResourceDefinition + declaredObjs, _ := r.DeclaredUnstructureds() + for _, obj := range declaredObjs { + if obj.GroupVersionKind().GroupKind() != kinds.CustomResourceDefinition() { + continue + } + crd, err := clusterconfig.AsCRD(obj) + if err != nil { + return nil, err + } + crds = append(crds, crd) + } + return crds, nil +} diff --git a/pkg/parse/event_handler.go b/pkg/parse/event_handler.go index 8fa038d94c..bce8fec196 100644 --- a/pkg/parse/event_handler.go +++ b/pkg/parse/event_handler.go @@ -27,18 +27,16 @@ import ( // triggers the RunFunc when appropriate. type EventHandler struct { Context context.Context - Parser Parser - ReconcilerState *reconcilerState + Reconciler Reconciler NSControllerState *namespacecontroller.State Run RunFunc } // NewEventHandler builds an EventHandler -func NewEventHandler(ctx context.Context, parser Parser, nsControllerState *namespacecontroller.State, runFn RunFunc) *EventHandler { +func NewEventHandler(ctx context.Context, r Reconciler, nsControllerState *namespacecontroller.State, runFn RunFunc) *EventHandler { return &EventHandler{ Context: ctx, - Parser: parser, - ReconcilerState: &reconcilerState{}, + Reconciler: r, NSControllerState: nsControllerState, Run: runFn, } @@ -54,13 +52,14 @@ func NewEventHandler(ctx context.Context, parser Parser, nsControllerState *name // - Reconciler requested a retry due to error // - Remediator requested a watch update func (s *EventHandler) Handle(event events.Event) events.Result { - opts := s.Parser.options() + opts := s.Reconciler.Options() + state := s.Reconciler.ReconcilerState() var eventResult events.Result // Wrap the RunFunc to set Result.RunAttempted. // This delays status update and sync events. - runFn := func(ctx context.Context, p Parser, trigger string, state *reconcilerState) RunResult { - result := s.Run(ctx, p, trigger, state) + runFn := func(ctx context.Context, r Reconciler, trigger string) RunResult { + result := s.Run(ctx, r, trigger) eventResult.RunAttempted = true return result } @@ -76,14 +75,14 @@ func (s *EventHandler) Handle(event events.Event) events.Result { // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. // The cached needToRetry will not be reset to avoid resetting the backoff retries. - s.ReconcilerState.resetPartialCache() - runResult = runFn(s.Context, s.Parser, triggerResync, s.ReconcilerState) + state.resetPartialCache() + runResult = runFn(s.Context, s.Reconciler, triggerResync) case events.SyncEventType: // Re-import declared resources from the filesystem (from *-sync). // If the reconciler is in the process of reconciling a given commit, the re-import won't // happen until the ongoing reconciliation is done. - runResult = runFn(s.Context, s.Parser, triggerReimport, s.ReconcilerState) + runResult = runFn(s.Context, s.Reconciler, triggerReimport) case events.StatusEventType: // Publish the sync status periodically to update remediator errors. @@ -91,8 +90,15 @@ func (s *EventHandler) Handle(event events.Event) events.Result { // This implies that this reconciler has successfully parsed, rendered, validated, and synced. if opts.Remediating() { klog.V(3).Info("Updating sync status (periodic while not syncing)") - // Don't update the sync spec or commit. - if err := setSyncStatus(s.Context, s.Parser, s.ReconcilerState, s.ReconcilerState.status.SyncStatus.Spec, false, s.ReconcilerState.status.SyncStatus.Commit, s.Parser.SyncErrors()); err != nil { + // Don't update the sync spec or commit, just the errors and status. + syncStatus := &SyncStatus{ + Spec: state.status.SyncStatus.Spec, + Syncing: false, + Commit: state.status.SyncStatus.Commit, + Errs: s.Reconciler.ReconcilerState().SyncErrors(), + LastUpdate: nowMeta(opts), + } + if err := s.Reconciler.SetSyncStatus(s.Context, syncStatus); err != nil { if errors.Is(err, context.Canceled) { klog.Infof("Sync status update skipped: %v", err) } else { @@ -113,8 +119,8 @@ func (s *EventHandler) Handle(event events.Event) events.Result { // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. // The cached needToRetry will not be reset to avoid resetting the backoff retries. - s.ReconcilerState.resetPartialCache() - runResult = runFn(s.Context, s.Parser, namespaceEvent, s.ReconcilerState) + state.resetPartialCache() + runResult = runFn(s.Context, s.Reconciler, namespaceEvent) case events.RetrySyncEventType: // Retry if there was an error, conflict, or any watches need to be updated. @@ -123,9 +129,9 @@ func (s *EventHandler) Handle(event events.Event) events.Result { // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. // The cached needToRetry will not be reset to avoid resetting the backoff retries. - s.ReconcilerState.resetPartialCache() + state.resetPartialCache() trigger = triggerManagementConflict - } else if s.ReconcilerState.cache.needToRetry { + } else if state.cache.needToRetry { trigger = triggerRetry } else if opts.needToUpdateWatch() { trigger = triggerWatchUpdate @@ -138,7 +144,7 @@ func (s *EventHandler) Handle(event events.Event) events.Result { // retryTimer will be reset to `Options.RetryPeriod`, and state.backoff is reset to `defaultBackoff()`. // In this case, `run` will try to sync the configs from the new commit instead of the old commit // being retried. - runResult = runFn(s.Context, s.Parser, trigger, s.ReconcilerState) + runResult = runFn(s.Context, s.Reconciler, trigger) default: klog.Fatalf("Invalid event received: %#v", event) diff --git a/pkg/parse/opts.go b/pkg/parse/opts.go index e3656dbbbf..4117045362 100644 --- a/pkg/parse/opts.go +++ b/pkg/parse/opts.go @@ -15,28 +15,28 @@ package parse import ( - "context" - "sync" "time" "k8s.io/utils/clock" "kpt.dev/configsync/pkg/declared" - "kpt.dev/configsync/pkg/importer/analyzer/ast" "kpt.dev/configsync/pkg/importer/filesystem" - "kpt.dev/configsync/pkg/status" "kpt.dev/configsync/pkg/util/discovery" "sigs.k8s.io/controller-runtime/pkg/client" ) -// Options holds configuration and core functionality required by all parsers. +// Options holds configuration and dependencies required by all parsers. type Options struct { + // Files lists Files in the source of truth. + // TODO: compose Files without extending. + Files + // Clock is used for time tracking, namely to simplify testing by allowing // a fake clock, instead of a RealClock. Clock clock.Clock - // Parser defines the minimum interface required for Reconciler to use a - // Parser to read configs from a filesystem. - Parser filesystem.ConfigParser + // ConfigParser defines the minimum interface required for Reconciler to use a + // ConfigParser to read configs from a filesystem. + ConfigParser filesystem.ConfigParser // ClusterName is the name of the cluster we're syncing configuration to. ClusterName string @@ -52,56 +52,40 @@ type Options struct { // SyncName is the name of the RootSync or RepoSync object. SyncName string - // StatusUpdatePeriod is how long the Parser waits between updates of the - // sync status, to account for management conflict errors from the Remediator. - StatusUpdatePeriod time.Duration + // Scope defines the scope of the reconciler, either root or namespaced. + Scope declared.Scope - // DiscoveryInterface is how the Parser learns what types are currently + // DiscoveryClient is how the Parser learns what types are currently // available on the cluster. - DiscoveryInterface discovery.ServerResourcer + DiscoveryClient discovery.ServerResourcer // Converter uses the DiscoveryInterface to encode the declared fields of // objects in Git. Converter *declared.ValueConverter - // mux prevents status update conflicts. - mux sync.Mutex - - // RenderingEnabled indicates whether the hydration-controller is currently - // running for this reconciler. - RenderingEnabled bool - // WebhookEnabled indicates whether the Webhook is currently enabled WebhookEnabled bool - // Files lists Files in the source of truth. - Files - // Updater mutates the most-recently-seen versions of objects stored in memory. - Updater + // DeclaredResources is the set of valid source objects, managed by the + // Updater and shared with the Parser & Remediator. + // This is used by the Parser to validate that CRDs can only be removed from + // the source when all of its CRs are removed as well. + DeclaredResources *declared.Resources } -// Parser represents a parser that can be pointed at and continuously parse a source. -type Parser interface { - parseSource(ctx context.Context, state *sourceState) ([]ast.FileObject, status.MultiError) - ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStatus, error) - setSourceStatus(ctx context.Context, newStatus *SourceStatus) error - setRenderingStatus(ctx context.Context, oldStatus, newStatus *RenderingStatus) error - SetSyncStatus(ctx context.Context, newStatus *SyncStatus) error - options() *Options - // SyncErrors returns all the sync errors, including remediator errors, - // validation errors, applier errors, and watch update errors. - SyncErrors() status.MultiError - // K8sClient returns the Kubernetes client that talks to the API server. - K8sClient() client.Client - // setRequiresRendering sets the requires-rendering annotation on the RSync - setRequiresRendering(ctx context.Context, renderingRequired bool) error - setSourceAnnotations(ctx context.Context, commit string) error -} +// ReconcilerOptions holds configuration for the reconciler. +type ReconcilerOptions struct { + // Extend parser options to ensure they're using the same dependencies. + *Options -func (o *Options) k8sClient() client.Client { - return o.Client -} + // Updater syncs the source from the parsed cache to the cluster. + *Updater -func (o *Options) discoveryClient() discovery.ServerResourcer { - return o.DiscoveryInterface + // StatusUpdatePeriod is how long the Parser waits between updates of the + // sync status, to account for management conflict errors from the Remediator. + StatusUpdatePeriod time.Duration + + // RenderingEnabled indicates whether the hydration-controller is currently + // running for this reconciler. + RenderingEnabled bool } diff --git a/pkg/parse/parser.go b/pkg/parse/parser.go new file mode 100644 index 0000000000..e1f0d8502e --- /dev/null +++ b/pkg/parse/parser.go @@ -0,0 +1,28 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "context" + + "kpt.dev/configsync/pkg/importer/analyzer/ast" + "kpt.dev/configsync/pkg/status" +) + +// Parser represents a parser that can be pointed at and continuously parse a source. +type Parser interface { + // ParseSource parses the source manifest files and returns the objects. + ParseSource(ctx context.Context, state *sourceState) ([]ast.FileObject, status.MultiError) +} diff --git a/pkg/parse/reconciler.go b/pkg/parse/reconciler.go new file mode 100644 index 0000000000..078926e951 --- /dev/null +++ b/pkg/parse/reconciler.go @@ -0,0 +1,117 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "context" + + "kpt.dev/configsync/pkg/status" +) + +// Reconciler represents a parser that can be pointed at and continuously parse a source. +// TODO: Move to reconciler package; requires unwinding dependency cycles +type Reconciler interface { + // Options returns the ReconcilerOptions used by this reconciler. + Options() *ReconcilerOptions + // SyncStatusClient returns the SyncStatusClient used by this reconciler. + SyncStatusClient() SyncStatusClient + // Parser returns the Parser used by this reconciler. + Parser() Parser + // ReconcilerState returns the current state of the parser/reconciler. + ReconcilerState() *ReconcilerState + + // Read source manifests from the shared source volume. + // Waits for rendering, if enabled. + // Updates the RSync status (source, rendering, and syncing condition). + // + // Read is exposed for use by DefaultRunFunc. + Read(ctx context.Context, trigger string, sourceState *sourceState) status.MultiError + + // ParseAndUpdate parses objects from the source manifests, validates them, + // and then syncs them to the cluster with the Updater. + // + // ParseAndUpdate is exposed for use by DefaultRunFunc. + ParseAndUpdate(ctx context.Context, trigger string) status.MultiError + + // SetSyncStatus updates `.status.sync` and the Syncing condition, if needed, + // as well as `state.syncStatus` and `state.syncingConditionLastUpdate` if + // the update is successful. + // + // SetSyncStatus is exposed for use by the EventHandler, for periodic status + // updates. + SetSyncStatus(context.Context, *SyncStatus) error +} + +// TODO: Move to reconciler package; requires unwinding dependency cycles +type reconciler struct { + options *ReconcilerOptions + syncStatusClient SyncStatusClient + parser Parser + reconcilerState *ReconcilerState +} + +var _ Reconciler = &reconciler{} + +// Options returns the ReconcilerOptions used by this reconciler. +func (p *reconciler) Options() *ReconcilerOptions { + return p.options +} + +// SyncStatusClient returns the SyncStatusClient used by this reconciler. +func (p *reconciler) SyncStatusClient() SyncStatusClient { + return p.syncStatusClient +} + +// ReconcilerState returns the current state of the reconciler. +func (p *reconciler) Parser() Parser { + return p.parser +} + +// ReconcilerState returns the current state of the reconciler. +func (p *reconciler) ReconcilerState() *ReconcilerState { + return p.reconcilerState +} + +// NewRootRunner creates a new runnable parser for parsing a Root repository. +func NewRootRunner(recOpts *ReconcilerOptions, parseOpts *RootOptions) Reconciler { + return &reconciler{ + options: recOpts, + reconcilerState: &ReconcilerState{ + syncErrorCache: recOpts.SyncErrorCache, + }, + syncStatusClient: &rootSyncStatusClient{ + options: parseOpts.Options, + }, + parser: &rootSyncParser{ + options: parseOpts, + }, + } +} + +// NewNamespaceRunner creates a new runnable parser for parsing a Namespace repo. +func NewNamespaceRunner(recOpts *ReconcilerOptions, parseOpts *Options) Reconciler { + return &reconciler{ + options: recOpts, + reconcilerState: &ReconcilerState{ + syncErrorCache: recOpts.SyncErrorCache, + }, + syncStatusClient: &repoSyncStatusClient{ + options: parseOpts, + }, + parser: &repoSyncParser{ + options: parseOpts, + }, + } +} diff --git a/pkg/parse/namespace.go b/pkg/parse/repo_sync_client.go similarity index 67% rename from pkg/parse/namespace.go rename to pkg/parse/repo_sync_client.go index 664b6904b9..b79c7d8aec 100644 --- a/pkg/parse/namespace.go +++ b/pkg/parse/repo_sync_client.go @@ -19,6 +19,7 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/google/go-cmp/cmp" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -29,109 +30,43 @@ import ( "kpt.dev/configsync/pkg/api/configsync" "kpt.dev/configsync/pkg/api/configsync/v1beta1" "kpt.dev/configsync/pkg/core" - "kpt.dev/configsync/pkg/importer/analyzer/ast" - "kpt.dev/configsync/pkg/importer/reader" "kpt.dev/configsync/pkg/metadata" "kpt.dev/configsync/pkg/metrics" "kpt.dev/configsync/pkg/reposync" "kpt.dev/configsync/pkg/status" "kpt.dev/configsync/pkg/util/compare" - utildiscovery "kpt.dev/configsync/pkg/util/discovery" - "kpt.dev/configsync/pkg/validate" "sigs.k8s.io/controller-runtime/pkg/client" ) -// NewNamespaceRunner creates a new runnable parser for parsing a Namespace repo. -func NewNamespaceRunner(opts *Options) Parser { - return &namespace{ - Options: opts, - } -} - -type namespace struct { - *Options -} - -var _ Parser = &namespace{} - -func (p *namespace) options() *Options { - return p.Options +type repoSyncStatusClient struct { + options *Options + // mux prevents status update conflicts. + mux sync.Mutex } -// parseSource implements the Parser interface -func (p *namespace) parseSource(ctx context.Context, state *sourceState) ([]ast.FileObject, status.MultiError) { - p.mux.Lock() - defer p.mux.Unlock() - - filePaths := reader.FilePaths{ - RootDir: state.syncDir, - PolicyDir: p.SyncDir, - Files: state.files, - } - crds, err := p.declaredCRDs() - if err != nil { - return nil, err - } - builder := utildiscovery.ScoperBuilder(p.DiscoveryInterface) - - klog.Infof("Parsing files from source dir: %s", state.syncDir.OSPath()) - objs, err := p.Parser.Parse(filePaths) - if err != nil { - return nil, err - } - - options := validate.Options{ - ClusterName: p.ClusterName, - SyncName: p.SyncName, - PolicyDir: p.SyncDir, - PreviousCRDs: crds, - BuildScoper: builder, - Converter: p.Converter, - // Namespaces and NamespaceSelectors should not be declared in a namespace repo. - // So disable the API call and dynamic mode of NamespaceSelector. - AllowAPICall: false, - DynamicNSSelectorEnabled: false, - WebhookEnabled: p.WebhookEnabled, - FieldManager: configsync.FieldManager, - } - options = OptionsForScope(options, p.Scope) - - objs, err = validate.Unstructured(ctx, p.Client, objs, options) - - if status.HasBlockingErrors(err) { - return nil, err - } - - // Duplicated with root.go. - e := addAnnotationsAndLabels(objs, p.Scope, p.SyncName, p.sourceContext(), state.commit) - if e != nil { - err = status.Append(err, status.InternalErrorf("unable to add annotations and labels: %v", e)) - return nil, err - } - return objs, err -} - -// setSourceStatus implements the Parser interface +// SetSourceStatus implements the Parser interface // -// setSourceStatus sets the source status with a given source state and set of errors. If errs is empty, all errors +// SetSourceStatus sets the source status with a given source state and set of errors. If errs is empty, all errors // will be removed from the status. -func (p *namespace) setSourceStatus(ctx context.Context, newStatus *SourceStatus) error { +func (p *repoSyncStatusClient) SetSourceStatus(ctx context.Context, newStatus *SourceStatus) error { p.mux.Lock() defer p.mux.Unlock() return p.setSourceStatusWithRetries(ctx, newStatus, defaultDenominator) } -func (p *namespace) setSourceStatusWithRetries(ctx context.Context, newStatus *SourceStatus, denominator int) error { +func (p *repoSyncStatusClient) setSourceStatusWithRetries(ctx context.Context, newStatus *SourceStatus, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } + opts := p.options + // The main idea here is an error-robust way of surfacing to the user that // we're having problems reading from our local clone of their source repository. // This can happen when Kubernetes does weird things with mounted filesystems, // or if an attacker tried to maliciously change the cluster's record of the // source of truth. var rs v1beta1.RepoSync - if err := p.Client.Get(ctx, reposync.ObjectKey(p.Scope, p.SyncName), &rs); err != nil { + if err := opts.Client.Get(ctx, reposync.ObjectKey(opts.Scope, opts.SyncName), &rs); err != nil { return status.APIServerError(err, "failed to get RepoSync for parser") } @@ -165,7 +100,7 @@ func (p *namespace) setSourceStatusWithRetries(ctx context.Context, newStatus *S cmp.Diff(currentRS.Status, rs.Status)) } - if err := p.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { + if err := opts.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { // If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RepoSync source status (total error count: %d, denominator: %d): %s.", rs.Status.Source.ErrorSummary.TotalCount, denominator, err) @@ -176,24 +111,25 @@ func (p *namespace) setSourceStatusWithRetries(ctx context.Context, newStatus *S return nil } -func (p *namespace) setSourceAnnotations(ctx context.Context, commit string) error { +func (p *repoSyncStatusClient) SetSourceAnnotations(ctx context.Context, commit string) error { + opts := p.options rs := &v1beta1.RepoSync{} - rs.Namespace = string(p.Scope) - rs.Name = p.SyncName + rs.Namespace = string(opts.Scope) + rs.Name = opts.SyncName var patch string - if p.Options.SourceType == configsync.OciSource || - (p.Options.SourceType == configsync.HelmSource && strings.HasPrefix(p.Options.SourceRepo, "oci://")) { + if opts.SourceType == configsync.OciSource || + (opts.SourceType == configsync.HelmSource && strings.HasPrefix(opts.SourceRepo, "oci://")) { patch = fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, metadata.ImageToSyncAnnotationKey, - fmt.Sprintf("%s@sha256:%s", p.Options.SourceRepo, commit), + fmt.Sprintf("%s@sha256:%s", opts.SourceRepo, commit), ) } else { patch = fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}}`, metadata.ImageToSyncAnnotationKey) } - err := p.Client.Patch(ctx, rs, + err := opts.Client.Patch(ctx, rs, client.RawPatch(types.MergePatchType, []byte(patch)), client.FieldOwner(configsync.FieldManager)) @@ -204,9 +140,10 @@ func (p *namespace) setSourceAnnotations(ctx context.Context, commit string) err return nil } -func (p *namespace) setRequiresRendering(ctx context.Context, renderingRequired bool) error { +func (p *repoSyncStatusClient) SetRequiresRendering(ctx context.Context, renderingRequired bool) error { + opts := p.options rs := &v1beta1.RepoSync{} - if err := p.Client.Get(ctx, reposync.ObjectKey(p.Scope, p.SyncName), rs); err != nil { + if err := opts.Client.Get(ctx, reposync.ObjectKey(opts.Scope, opts.SyncName), rs); err != nil { return status.APIServerError(err, "failed to get RepoSync for Parser") } newVal := strconv.FormatBool(renderingRequired) @@ -216,11 +153,11 @@ func (p *namespace) setRequiresRendering(ctx context.Context, renderingRequired } existing := rs.DeepCopy() core.SetAnnotation(rs, metadata.RequiresRenderingAnnotationKey, newVal) - return p.Client.Patch(ctx, rs, client.MergeFrom(existing), client.FieldOwner(configsync.FieldManager)) + return opts.Client.Patch(ctx, rs, client.MergeFrom(existing), client.FieldOwner(configsync.FieldManager)) } -// setRenderingStatus implements the Parser interface -func (p *namespace) setRenderingStatus(ctx context.Context, oldStatus, newStatus *RenderingStatus) error { +// SetRenderingStatus implements the Parser interface +func (p *repoSyncStatusClient) SetRenderingStatus(ctx context.Context, oldStatus, newStatus *RenderingStatus) error { if oldStatus.Equals(newStatus) { return nil } @@ -230,13 +167,14 @@ func (p *namespace) setRenderingStatus(ctx context.Context, oldStatus, newStatus return p.setRenderingStatusWithRetries(ctx, newStatus, defaultDenominator) } -func (p *namespace) setRenderingStatusWithRetries(ctx context.Context, newStatus *RenderingStatus, denominator int) error { +func (p *repoSyncStatusClient) setRenderingStatusWithRetries(ctx context.Context, newStatus *RenderingStatus, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } + opts := p.options var rs v1beta1.RepoSync - if err := p.Client.Get(ctx, reposync.ObjectKey(p.Scope, p.SyncName), &rs); err != nil { + if err := opts.Client.Get(ctx, reposync.ObjectKey(opts.Scope, opts.SyncName), &rs); err != nil { return status.APIServerError(err, "failed to get RepoSync for parser") } @@ -270,7 +208,7 @@ func (p *namespace) setRenderingStatusWithRetries(ctx context.Context, newStatus cmp.Diff(currentRS.Status, rs.Status)) } - if err := p.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { + if err := opts.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { // If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RepoSync rendering status (total error count: %d, denominator: %d): %s.", rs.Status.Rendering.ErrorSummary.TotalCount, denominator, err) @@ -282,13 +220,14 @@ func (p *namespace) setRenderingStatusWithRetries(ctx context.Context, newStatus } // ReconcilerStatusFromCluster gets the RepoSync sync status from the cluster. -func (p *namespace) ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStatus, error) { +func (p *repoSyncStatusClient) ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStatus, error) { + opts := p.options rs := &v1beta1.RepoSync{} - if err := p.Client.Get(ctx, reposync.ObjectKey(p.Scope, p.SyncName), rs); err != nil { + if err := opts.Client.Get(ctx, reposync.ObjectKey(opts.Scope, opts.SyncName), rs); err != nil { if apierrors.IsNotFound(err) || meta.IsNoMatchError(err) { return nil, nil } - return nil, status.APIServerError(err, fmt.Sprintf("failed to get the RepoSync object for the %v namespace", p.Scope)) + return nil, status.APIServerError(err, fmt.Sprintf("failed to get the RepoSync object for the %v namespace", opts.Scope)) } // Read Syncing condition @@ -304,26 +243,27 @@ func (p *namespace) ReconcilerStatusFromCluster(ctx context.Context) (*Reconcile } } - return reconcilerStatusFromRSyncStatus(rs.Status.Status, p.options().SourceType, syncing, syncingConditionLastUpdate), nil + return reconcilerStatusFromRSyncStatus(rs.Status.Status, opts.SourceType, syncing, syncingConditionLastUpdate), nil } // SetSyncStatus implements the Parser interface // SetSyncStatus sets the RepoSync sync status. // `errs` includes the errors encountered during the apply step; -func (p *namespace) SetSyncStatus(ctx context.Context, newStatus *SyncStatus) error { +func (p *repoSyncStatusClient) SetSyncStatus(ctx context.Context, newStatus *SyncStatus) error { p.mux.Lock() defer p.mux.Unlock() return p.setSyncStatusWithRetries(ctx, newStatus, defaultDenominator) } -func (p *namespace) setSyncStatusWithRetries(ctx context.Context, newStatus *SyncStatus, denominator int) error { +func (p *repoSyncStatusClient) setSyncStatusWithRetries(ctx context.Context, newStatus *SyncStatus, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } + opts := p.options rs := &v1beta1.RepoSync{} - if err := p.Client.Get(ctx, reposync.ObjectKey(p.Scope, p.SyncName), rs); err != nil { - return status.APIServerError(err, fmt.Sprintf("failed to get the RepoSync object for the %v namespace", p.Scope)) + if err := opts.Client.Get(ctx, reposync.ObjectKey(opts.Scope, opts.SyncName), rs); err != nil { + return status.APIServerError(err, fmt.Sprintf("failed to get the RepoSync object for the %v namespace", opts.Scope)) } currentRS := rs.DeepCopy() @@ -371,25 +311,13 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, newStatus *Syn cmp.Diff(currentRS.Status, rs.Status)) } - if err := p.Client.Status().Update(ctx, rs, client.FieldOwner(configsync.FieldManager)); err != nil { + if err := opts.Client.Status().Update(ctx, rs, client.FieldOwner(configsync.FieldManager)); err != nil { // If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RepoSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err) return p.setSyncStatusWithRetries(ctx, newStatus, denominator*2) } - return status.APIServerError(err, fmt.Sprintf("failed to update the RepoSync sync status for the %v namespace", p.Scope)) + return status.APIServerError(err, fmt.Sprintf("failed to update the RepoSync sync status for the %v namespace", opts.Scope)) } return nil } - -// SyncErrors returns all the sync errors, including remediator errors, -// validation errors, applier errors, and watch update errors. -// SyncErrors implements the Parser interface -func (p *namespace) SyncErrors() status.MultiError { - return p.SyncErrorCache.Errors() -} - -// K8sClient implements the Parser interface -func (p *namespace) K8sClient() client.Client { - return p.Client -} diff --git a/pkg/parse/repo_sync_parser.go b/pkg/parse/repo_sync_parser.go new file mode 100644 index 0000000000..d7fcb25c09 --- /dev/null +++ b/pkg/parse/repo_sync_parser.go @@ -0,0 +1,82 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "context" + + "k8s.io/klog/v2" + "kpt.dev/configsync/pkg/api/configsync" + "kpt.dev/configsync/pkg/importer/analyzer/ast" + "kpt.dev/configsync/pkg/importer/reader" + "kpt.dev/configsync/pkg/status" + utildiscovery "kpt.dev/configsync/pkg/util/discovery" + "kpt.dev/configsync/pkg/validate" +) + +type repoSyncParser struct { + options *Options +} + +// ParseSource implements the Parser interface +func (p *repoSyncParser) ParseSource(ctx context.Context, state *sourceState) ([]ast.FileObject, status.MultiError) { + opts := p.options + filePaths := reader.FilePaths{ + RootDir: state.syncDir, + PolicyDir: opts.SyncDir, + Files: state.files, + } + crds, err := opts.DeclaredResources.DeclaredCRDs() + if err != nil { + return nil, err + } + builder := utildiscovery.ScoperBuilder(opts.DiscoveryClient) + + klog.Infof("Parsing files from source dir: %s", state.syncDir.OSPath()) + objs, err := opts.ConfigParser.Parse(filePaths) + if err != nil { + return nil, err + } + + options := validate.Options{ + ClusterName: opts.ClusterName, + SyncName: opts.SyncName, + PolicyDir: opts.SyncDir, + PreviousCRDs: crds, + BuildScoper: builder, + Converter: opts.Converter, + // Namespaces and NamespaceSelectors should not be declared in a namespace repo. + // So disable the API call and dynamic mode of NamespaceSelector. + AllowAPICall: false, + DynamicNSSelectorEnabled: false, + WebhookEnabled: opts.WebhookEnabled, + FieldManager: configsync.FieldManager, + } + options = OptionsForScope(options, opts.Scope) + + objs, err = validate.Unstructured(ctx, opts.Client, objs, options) + + if status.HasBlockingErrors(err) { + return nil, err + } + + // Duplicated with root.go. + e := addAnnotationsAndLabels(objs, opts.Scope, opts.SyncName, opts.Files.sourceContext(), state.commit) + if e != nil { + err = status.Append(err, status.InternalErrorf("unable to add annotations and labels: %v", e)) + return nil, err + } + return objs, err +} diff --git a/pkg/parse/root_test.go b/pkg/parse/root_reconciler_test.go similarity index 70% rename from pkg/parse/root_test.go rename to pkg/parse/root_reconciler_test.go index c9a93340a2..e59e285f40 100644 --- a/pkg/parse/root_test.go +++ b/pkg/parse/root_reconciler_test.go @@ -20,7 +20,6 @@ import ( "fmt" "testing" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "go.opencensus.io/stats/view" "go.opencensus.io/tag" @@ -84,7 +83,7 @@ func gitSpec(repo string, auth configsync.AuthType) core.MetaMutator { } } -func TestRoot_Parse(t *testing.T) { +func TestRootReconciler_ParseAndUpdate(t *testing.T) { fakeMetaTime := metav1.Now().Rfc3339Copy() // truncate to second precision fakeTime := fakeMetaTime.Time fakeClock := fakeclock.NewFakeClock(fakeTime) @@ -677,33 +676,10 @@ func TestRoot_Parse(t *testing.T) { }, } tc.existingObjects = append(tc.existingObjects, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)) - parser := &root{ - Options: &Options{ - Clock: fakeClock, - Parser: fakeConfigParser, - SyncName: rootSyncName, - ReconcilerName: rootReconcilerName, - Client: fakeClient, - DiscoveryInterface: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), - Converter: converter, - Files: Files{FileSource: fileSource}, - Updater: Updater{ - Scope: declared.RootScope, - Resources: &declared.Resources{}, - Remediator: &remediatorfake.Remediator{}, - Applier: fakeApplier, - SyncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), - }, - }, - RootOptions: &RootOptions{ - SourceFormat: tc.format, - NamespaceStrategy: tc.namespaceStrategy, - }, - } files := []cmpath.Absolute{ "example.yaml", } - state := &reconcilerState{ + state := &ReconcilerState{ status: reconcilerStatus.DeepCopy(), cache: cacheForCommit{ source: &sourceState{ @@ -718,8 +694,48 @@ func TestRoot_Parse(t *testing.T) { files: files, }, }, + syncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), + } + opts := &Options{ + Clock: fakeClock, + ConfigParser: fakeConfigParser, + SyncName: rootSyncName, + Scope: declared.RootScope, + ReconcilerName: rootReconcilerName, + Client: fakeClient, + DiscoveryClient: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), + Converter: converter, + Files: Files{FileSource: fileSource}, + DeclaredResources: &declared.Resources{}, + } + rootOpts := &RootOptions{ + Options: opts, + SourceFormat: tc.format, + NamespaceStrategy: tc.namespaceStrategy, + } + recOpts := &ReconcilerOptions{ + Options: opts, + Updater: &Updater{ + Scope: opts.Scope, + Resources: opts.DeclaredResources, + Remediator: &remediatorfake.Remediator{}, + Applier: fakeApplier, + SyncErrorCache: state.syncErrorCache, + }, + StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod, + RenderingEnabled: false, + } + reconciler := &reconciler{ + options: recOpts, + syncStatusClient: &rootSyncStatusClient{ + options: opts, + }, + parser: &rootSyncParser{ + options: rootOpts, + }, + reconcilerState: state, } - if err := parseAndUpdate(context.Background(), parser, triggerReimport, state); err != nil { + if err := reconciler.ParseAndUpdate(context.Background(), triggerReimport); err != nil { t.Fatal(err) } @@ -754,7 +770,7 @@ func TestRoot_Parse(t *testing.T) { } } -func TestRoot_DeclaredFields(t *testing.T) { +func TestRootReconciler_DeclaredFields(t *testing.T) { applySetID := applyset.IDFromSync(rootSyncName, declared.RootScope) testCases := []struct { @@ -915,36 +931,53 @@ func TestRoot_DeclaredFields(t *testing.T) { }, } tc.existingObjects = append(tc.existingObjects, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)) - parser := &root{ - Options: &Options{ - Clock: clock.RealClock{}, // TODO: Test with fake clock - Parser: fakeConfigParser, - SyncName: rootSyncName, - ReconcilerName: rootReconcilerName, - Client: syncertest.NewClient(t, core.Scheme, tc.existingObjects...), - DiscoveryInterface: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), - Converter: converter, - WebhookEnabled: tc.webhookEnabled, - Updater: Updater{ - Scope: declared.RootScope, - Resources: &declared.Resources{}, - Remediator: &remediatorfake.Remediator{}, - Applier: fakeApplier, - SyncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), - }, - }, - RootOptions: &RootOptions{ - SourceFormat: configsync.SourceFormatUnstructured, - NamespaceStrategy: configsync.NamespaceStrategyExplicit, - }, - } - state := &reconcilerState{ + state := &ReconcilerState{ status: &ReconcilerStatus{}, cache: cacheForCommit{ source: &sourceState{}, }, + syncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), } - if err := parseAndUpdate(context.Background(), parser, triggerReimport, state); err != nil { + opts := &Options{ + Clock: clock.RealClock{}, // TODO: Test with fake clock + ConfigParser: fakeConfigParser, + SyncName: rootSyncName, + Scope: declared.RootScope, + ReconcilerName: rootReconcilerName, + Client: syncertest.NewClient(t, core.Scheme, tc.existingObjects...), + DiscoveryClient: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), + Converter: converter, + WebhookEnabled: tc.webhookEnabled, + DeclaredResources: &declared.Resources{}, + } + rootOpts := &RootOptions{ + Options: opts, + SourceFormat: configsync.SourceFormatUnstructured, + NamespaceStrategy: configsync.NamespaceStrategyExplicit, + } + recOpts := &ReconcilerOptions{ + Options: opts, + Updater: &Updater{ + Scope: opts.Scope, + Resources: opts.DeclaredResources, + Remediator: &remediatorfake.Remediator{}, + Applier: fakeApplier, + SyncErrorCache: state.syncErrorCache, + }, + StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod, + RenderingEnabled: false, + } + reconciler := &reconciler{ + options: recOpts, + syncStatusClient: &rootSyncStatusClient{ + options: opts, + }, + parser: &rootSyncParser{ + options: rootOpts, + }, + reconcilerState: state, + } + if err := reconciler.ParseAndUpdate(context.Background(), triggerReimport); err != nil { t.Fatal(err) } @@ -1017,7 +1050,7 @@ func fakeParseError(err error, gvks ...schema.GroupVersionKind) status.MultiErro return status.APIServerError(&discovery.ErrGroupDiscoveryFailed{Groups: groups}, "API discovery failed") } -func TestRoot_Parse_Discovery(t *testing.T) { +func TestRootReconciler_Parse_Discovery(t *testing.T) { applySetID := applyset.IDFromSync(rootSyncName, declared.RootScope) testCases := []struct { @@ -1183,35 +1216,52 @@ func TestRoot_Parse_Discovery(t *testing.T) { {}, // One Apply call }, } - parser := &root{ - Options: &Options{ - Clock: clock.RealClock{}, // TODO: Test with fake clock - Parser: fakeConfigParser, - SyncName: rootSyncName, - ReconcilerName: rootReconcilerName, - Client: syncertest.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)), - DiscoveryInterface: tc.discoveryClient, - Converter: converter, - Updater: Updater{ - Scope: declared.RootScope, - Resources: &declared.Resources{}, - Remediator: &remediatorfake.Remediator{}, - Applier: fakeApplier, - SyncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), - }, - }, - RootOptions: &RootOptions{ - SourceFormat: configsync.SourceFormatUnstructured, - NamespaceStrategy: configsync.NamespaceStrategyImplicit, - }, - } - state := &reconcilerState{ + state := &ReconcilerState{ status: &ReconcilerStatus{}, cache: cacheForCommit{ source: &sourceState{}, }, + syncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), + } + opts := &Options{ + Clock: clock.RealClock{}, // TODO: Test with fake clock + ConfigParser: fakeConfigParser, + SyncName: rootSyncName, + Scope: declared.RootScope, + ReconcilerName: rootReconcilerName, + Client: syncertest.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)), + DiscoveryClient: tc.discoveryClient, + Converter: converter, + DeclaredResources: &declared.Resources{}, + } + rootOpts := &RootOptions{ + Options: opts, + SourceFormat: configsync.SourceFormatUnstructured, + NamespaceStrategy: configsync.NamespaceStrategyImplicit, + } + recOpts := &ReconcilerOptions{ + Options: opts, + Updater: &Updater{ + Scope: opts.Scope, + Resources: opts.DeclaredResources, + Remediator: &remediatorfake.Remediator{}, + Applier: fakeApplier, + SyncErrorCache: state.syncErrorCache, + }, + StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod, + RenderingEnabled: false, } - err := parseAndUpdate(context.Background(), parser, triggerReimport, state) + reconciler := &reconciler{ + options: recOpts, + syncStatusClient: &rootSyncStatusClient{ + options: opts, + }, + parser: &rootSyncParser{ + options: rootOpts, + }, + reconcilerState: state, + } + err := reconciler.ParseAndUpdate(context.Background(), triggerReimport) testerrors.AssertEqual(t, tc.expectedError, err, "expected error to match") // After parsing, the objects set is processed and modified. @@ -1221,7 +1271,7 @@ func TestRoot_Parse_Discovery(t *testing.T) { } } -func TestRoot_SourceReconcilerErrorsMetricValidation(t *testing.T) { +func TestRootReconciler_SourceReconcilerErrorsMetricValidation(t *testing.T) { testCases := []struct { name string parseErrors status.MultiError @@ -1275,33 +1325,50 @@ func TestRoot_SourceReconcilerErrorsMetricValidation(t *testing.T) { {}, // One Apply call }, } - parser := &root{ - Options: &Options{ - Clock: clock.RealClock{}, // TODO: Test with fake clock - Parser: fakeConfigParser, - SyncName: rootSyncName, - ReconcilerName: rootReconcilerName, - Client: syncertest.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)), - DiscoveryInterface: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), - Updater: Updater{ - Scope: declared.RootScope, - Resources: &declared.Resources{}, - Remediator: &remediatorfake.Remediator{}, - Applier: fakeApplier, - SyncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), - }, - }, - RootOptions: &RootOptions{ - SourceFormat: configsync.SourceFormatUnstructured, - }, - } - state := &reconcilerState{ + state := &ReconcilerState{ status: &ReconcilerStatus{}, cache: cacheForCommit{ source: &sourceState{}, }, + syncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), + } + opts := &Options{ + Clock: clock.RealClock{}, // TODO: Test with fake clock + ConfigParser: fakeConfigParser, + SyncName: rootSyncName, + Scope: declared.RootScope, + ReconcilerName: rootReconcilerName, + Client: syncertest.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)), + DiscoveryClient: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), + DeclaredResources: &declared.Resources{}, + } + rootOpts := &RootOptions{ + Options: opts, + SourceFormat: configsync.SourceFormatUnstructured, + } + recOpts := &ReconcilerOptions{ + Options: opts, + Updater: &Updater{ + Scope: opts.Scope, + Resources: opts.DeclaredResources, + Remediator: &remediatorfake.Remediator{}, + Applier: fakeApplier, + SyncErrorCache: state.syncErrorCache, + }, + StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod, + RenderingEnabled: false, + } + reconciler := &reconciler{ + options: recOpts, + syncStatusClient: &rootSyncStatusClient{ + options: opts, + }, + parser: &rootSyncParser{ + options: rootOpts, + }, + reconcilerState: state, } - err := parseAndUpdate(context.Background(), parser, triggerReimport, state) + err := reconciler.ParseAndUpdate(context.Background(), triggerReimport) testerrors.AssertEqual(t, tc.expectedError, err, "expected error to match") if diff := m.ValidateMetrics(metrics.ReconcilerErrorsView, tc.expectedMetrics); diff != "" { @@ -1311,7 +1378,7 @@ func TestRoot_SourceReconcilerErrorsMetricValidation(t *testing.T) { } } -func TestRoot_SourceAndSyncReconcilerErrorsMetricValidation(t *testing.T) { +func TestRootReconciler_SourceAndSyncReconcilerErrorsMetricValidation(t *testing.T) { testCases := []struct { name string applyErrors []status.Error @@ -1367,583 +1434,55 @@ func TestRoot_SourceAndSyncReconcilerErrorsMetricValidation(t *testing.T) { {Errors: tc.applyErrors}, // One Apply call, optional errors }, } - parser := &root{ - Options: &Options{ - Clock: clock.RealClock{}, // TODO: Test with fake clock - Parser: fakeConfigParser, - Updater: Updater{ - Scope: declared.RootScope, - Resources: &declared.Resources{}, - Remediator: &remediatorfake.Remediator{}, - Applier: fakeApplier, - SyncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), - }, - SyncName: rootSyncName, - ReconcilerName: rootReconcilerName, - Client: syncertest.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)), - DiscoveryInterface: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), - }, - RootOptions: &RootOptions{ - SourceFormat: configsync.SourceFormatUnstructured, - }, - } - state := &reconcilerState{ + state := &ReconcilerState{ status: &ReconcilerStatus{}, cache: cacheForCommit{ source: &sourceState{}, }, + syncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), } - err := parseAndUpdate(context.Background(), parser, triggerReimport, state) - testerrors.AssertEqual(t, tc.expectedError, err, "expected error to match") - - if diff := m.ValidateMetrics(metrics.ReconcilerErrorsView, tc.expectedMetrics); diff != "" { - t.Error(diff) + opts := &Options{ + Clock: clock.RealClock{}, // TODO: Test with fake clock + ConfigParser: fakeConfigParser, + SyncName: rootSyncName, + Scope: declared.RootScope, + ReconcilerName: rootReconcilerName, + Client: syncertest.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)), + DiscoveryClient: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), + DeclaredResources: &declared.Resources{}, } - }) - } -} - -func TestSummarizeErrors(t *testing.T) { - testCases := []struct { - name string - sourceStatus v1beta1.SourceStatus - renderingStatus v1beta1.RenderingStatus - syncStatus v1beta1.SyncStatus - expectedErrorSources []v1beta1.ErrorSource - expectedErrorSummary *v1beta1.ErrorSummary - }{ - { - name: "both sourceStatus and syncStatus are empty", - sourceStatus: v1beta1.SourceStatus{}, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{}, - expectedErrorSources: nil, - expectedErrorSummary: &v1beta1.ErrorSummary{}, - }, - { - name: "sourceStatus is not empty (no trucation), syncStatus is empty", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{}, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - { - name: "sourceStatus is not empty and trucates errors, syncStatus is empty", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{}, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - { - name: "sourceStatus is empty, syncStatus is not empty (no trucation)", - sourceStatus: v1beta1.SourceStatus{}, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - { - name: "sourceStatus is empty, syncStatus is not empty and trucates errors", - sourceStatus: v1beta1.SourceStatus{}, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - { - name: "neither sourceStatus nor syncStatus is empty or trucates errors", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 4, - Truncated: false, - ErrorCountAfterTruncation: 4, - }, - }, - { - name: "neither sourceStatus nor syncStatus is empty, sourceStatus trucates errors", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 102, - Truncated: true, - ErrorCountAfterTruncation: 4, - }, - }, - { - name: "neither sourceStatus nor syncStatus is empty, syncStatus trucates errors", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 102, - Truncated: true, - ErrorCountAfterTruncation: 4, - }, - }, - { - name: "neither sourceStatus nor syncStatus is empty, both trucates errors", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 200, - Truncated: true, - ErrorCountAfterTruncation: 4, - }, - }, - { - name: "source, rendering, and sync errors", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1068", ErrorMessage: "1068-error-message"}, - {Code: "2015", ErrorMessage: "2015-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.RenderingError, v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 6, - Truncated: false, - ErrorCountAfterTruncation: 6, - }, - }, - { - name: "source, rendering, and sync errors, all truncated", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1068", ErrorMessage: "1068-error-message"}, - {Code: "2015", ErrorMessage: "2015-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.RenderingError, v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 300, - Truncated: true, - ErrorCountAfterTruncation: 6, - }, - }, - { - name: "source errors from newer commit", - sourceStatus: v1beta1.SourceStatus{ - Commit: "newer-commit", - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 1, - Truncated: false, - ErrorCountAfterTruncation: 1, - }, - }, - renderingStatus: v1beta1.RenderingStatus{ - Commit: "older-commit", - Errors: []v1beta1.ConfigSyncError{ - {Code: "1068", ErrorMessage: "1068-error-message"}, - {Code: "2015", ErrorMessage: "2015-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - syncStatus: v1beta1.SyncStatus{ - Commit: "older-commit", - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - {Code: "2009", ErrorMessage: "another error"}, - {Code: "2009", ErrorMessage: "yet-another error"}, - }, - - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 4, - Truncated: false, - ErrorCountAfterTruncation: 4, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.RenderingError, v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 6, - Truncated: false, - ErrorCountAfterTruncation: 6, - }, - }, - { - name: "rendering errors from newer commit", - sourceStatus: v1beta1.SourceStatus{ - Commit: "newer-commit", - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 1, - Truncated: false, - ErrorCountAfterTruncation: 1, - }, - }, - renderingStatus: v1beta1.RenderingStatus{ - Commit: "newer-commit", - Errors: []v1beta1.ConfigSyncError{ - {Code: "1068", ErrorMessage: "1068-error-message"}, - {Code: "2015", ErrorMessage: "2015-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - syncStatus: v1beta1.SyncStatus{ - Commit: "older-commit", - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - {Code: "2009", ErrorMessage: "another error"}, - {Code: "2009", ErrorMessage: "yet-another error"}, + rootOpts := &RootOptions{ + Options: opts, + SourceFormat: configsync.SourceFormatUnstructured, + } + recOpts := &ReconcilerOptions{ + Options: opts, + Updater: &Updater{ + Scope: opts.Scope, + Resources: opts.DeclaredResources, + Remediator: &remediatorfake.Remediator{}, + Applier: fakeApplier, + SyncErrorCache: state.syncErrorCache, + }, + StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod, + RenderingEnabled: false, + } + reconciler := &reconciler{ + options: recOpts, + syncStatusClient: &rootSyncStatusClient{ + options: opts, }, - - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 4, - Truncated: false, - ErrorCountAfterTruncation: 4, + parser: &rootSyncParser{ + options: rootOpts, }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 4, - Truncated: false, - ErrorCountAfterTruncation: 4, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - gotErrorSources, gotErrorSummary := summarizeErrorsForCommit(tc.sourceStatus, tc.renderingStatus, tc.syncStatus, tc.syncStatus.Commit) - if diff := cmp.Diff(tc.expectedErrorSources, gotErrorSources); diff != "" { - t.Errorf("summarizeErrors() got %v, expected %v", gotErrorSources, tc.expectedErrorSources) + reconcilerState: state, } - if diff := cmp.Diff(tc.expectedErrorSummary, gotErrorSummary); diff != "" { - t.Errorf("summarizeErrors() got %v, expected %v", gotErrorSummary, tc.expectedErrorSummary) - } - }) - } -} - -func TestPrependRootSyncRemediatorStatus(t *testing.T) { - const rootSyncName = "my-root-sync" - const thisManager = "this-manager" - const otherManager = "other-manager" - conflictingObject := k8sobjects.NamespaceObject("foo-ns", core.Annotation(metadata.ResourceManagerKey, otherManager)) - conflictAB := status.ManagementConflictErrorWrap(conflictingObject, thisManager) - invertedObject := k8sobjects.NamespaceObject("foo-ns", core.Annotation(metadata.ResourceManagerKey, thisManager)) - conflictBA := status.ManagementConflictErrorWrap(invertedObject, otherManager) - conflictABInverted := conflictAB.Invert() - // KptManagementConflictError is created with the desired object, not the current live object. - // So its manager annotation matches the reconciler doing the applying. - // This is the opposite of the objects passed to ManagementConflictErrorWrap. - kptConflictError := applier.KptManagementConflictError(invertedObject) - // Assert the value of each error message to make each value clear - const conflictABMessage = `KNV1060: The "this-manager" reconciler detected a management conflict with the "other-manager" reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler. - -metadata.name: foo-ns -group: -version: v1 -kind: Namespace - -For more information, see https://g.co/cloud/acm-errors#knv1060` - const conflictBAMessage = `KNV1060: The "other-manager" reconciler detected a management conflict with the "this-manager" reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler. - -metadata.name: foo-ns -group: -version: v1 -kind: Namespace - -For more information, see https://g.co/cloud/acm-errors#knv1060` - const kptConflictMessage = `KNV1060: The "this-manager" reconciler detected a management conflict with another reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler. - -metadata.name: foo-ns -group: -version: v1 -kind: Namespace + err := reconciler.ParseAndUpdate(context.Background(), triggerReimport) + testerrors.AssertEqual(t, tc.expectedError, err, "expected error to match") -For more information, see https://g.co/cloud/acm-errors#knv1060` - testutil.AssertEqual(t, conflictABMessage, conflictAB.ToCSE().ErrorMessage) - testutil.AssertEqual(t, conflictBAMessage, conflictBA.ToCSE().ErrorMessage) - testutil.AssertEqual(t, kptConflictMessage, kptConflictError.ToCSE().ErrorMessage) - testutil.AssertEqual(t, conflictBA, conflictABInverted) - testCases := map[string]struct { - thisSyncErrors []v1beta1.ConfigSyncError - expectedErrors []v1beta1.ConfigSyncError - }{ - "empty errors": { - thisSyncErrors: []v1beta1.ConfigSyncError{}, - expectedErrors: []v1beta1.ConfigSyncError{ - conflictAB.ToCSE(), - }, - }, - "unchanged conflict error": { - thisSyncErrors: []v1beta1.ConfigSyncError{ - conflictAB.ToCSE(), - }, - expectedErrors: []v1beta1.ConfigSyncError{ - conflictAB.ToCSE(), - }, - }, - "prepend conflict error": { - thisSyncErrors: []v1beta1.ConfigSyncError{ - {ErrorMessage: "foo"}, - }, - expectedErrors: []v1beta1.ConfigSyncError{ - conflictAB.ToCSE(), - {ErrorMessage: "foo"}, - }, - }, - "dedupe AB and BA errors": { - thisSyncErrors: []v1beta1.ConfigSyncError{ - conflictBA.ToCSE(), - }, - expectedErrors: []v1beta1.ConfigSyncError{ - conflictBA.ToCSE(), - }, - }, - "dedupe AB and AB inverted errors": { - thisSyncErrors: []v1beta1.ConfigSyncError{ - conflictABInverted.ToCSE(), - }, - expectedErrors: []v1beta1.ConfigSyncError{ - conflictABInverted.ToCSE(), - }, - }, - // TODO: De-dupe ManagementConflictErrorWrap & KptManagementConflictError - // These are currently de-duped locally by the conflict handler, - // but not remotely by prependRootSyncRemediatorStatus. - "prepend KptManagementConflictError": { - thisSyncErrors: []v1beta1.ConfigSyncError{ - kptConflictError.ToCSE(), - }, - expectedErrors: []v1beta1.ConfigSyncError{ - conflictAB.ToCSE(), - kptConflictError.ToCSE(), - }, - }, - } - for name, tc := range testCases { - t.Run(name, func(t *testing.T) { - rootSync := k8sobjects.RootSyncObjectV1Beta1(rootSyncName) - rootSync.Status.Sync.Errors = tc.thisSyncErrors - fakeClient := syncertest.NewClient(t, core.Scheme, rootSync) - ctx := context.Background() - err := prependRootSyncRemediatorStatus(ctx, fakeClient, rootSyncName, - []status.ManagementConflictError{conflictAB}, defaultDenominator) - require.NoError(t, err) - var updatedRootSync v1beta1.RootSync - err = fakeClient.Get(ctx, rootsync.ObjectKey(rootSyncName), &updatedRootSync) - require.NoError(t, err) - testutil.AssertEqual(t, tc.expectedErrors, updatedRootSync.Status.Sync.Errors) + if diff := m.ValidateMetrics(metrics.ReconcilerErrorsView, tc.expectedMetrics); diff != "" { + t.Error(diff) + } }) } } diff --git a/pkg/parse/root.go b/pkg/parse/root_sync_client.go similarity index 70% rename from pkg/parse/root.go rename to pkg/parse/root_sync_client.go index e6d518dabc..753dba9831 100644 --- a/pkg/parse/root.go +++ b/pkg/parse/root_sync_client.go @@ -19,48 +19,31 @@ import ( "fmt" "strconv" "strings" + "sync" - "github.com/elliotchance/orderedmap/v2" "github.com/google/go-cmp/cmp" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" "kpt.dev/configsync/pkg/api/configsync" "kpt.dev/configsync/pkg/api/configsync/v1beta1" "kpt.dev/configsync/pkg/core" - "kpt.dev/configsync/pkg/declared" - "kpt.dev/configsync/pkg/diff" - "kpt.dev/configsync/pkg/importer/analyzer/ast" - "kpt.dev/configsync/pkg/importer/filesystem" - "kpt.dev/configsync/pkg/importer/filesystem/cmpath" - "kpt.dev/configsync/pkg/importer/reader" - "kpt.dev/configsync/pkg/kinds" "kpt.dev/configsync/pkg/metadata" "kpt.dev/configsync/pkg/metrics" "kpt.dev/configsync/pkg/reconciler/namespacecontroller" "kpt.dev/configsync/pkg/rootsync" "kpt.dev/configsync/pkg/status" "kpt.dev/configsync/pkg/util/compare" - "kpt.dev/configsync/pkg/util/discovery" - "kpt.dev/configsync/pkg/validate" - "sigs.k8s.io/cli-utils/pkg/common" "sigs.k8s.io/controller-runtime/pkg/client" ) -// NewRootRunner creates a new runnable parser for parsing a Root repository. -func NewRootRunner(opts *Options, rootOpts *RootOptions) Parser { - return &root{ - Options: opts, - RootOptions: rootOpts, - } -} - // RootOptions includes options specific to RootSync objects. type RootOptions struct { + // Extend parse.Options + *Options + // SourceFormat defines the structure of the Root repository. Only the Root // repository may be SourceFormatHierarchy; all others are implicitly // SourceFormatUnstructured. @@ -85,96 +68,27 @@ type RootOptions struct { NSControllerState *namespacecontroller.State } -type root struct { - *Options - *RootOptions -} - -var _ Parser = &root{} - -func (p *root) options() *Options { - return p.Options -} - -// parseSource implements the Parser interface -func (p *root) parseSource(ctx context.Context, state *sourceState) ([]ast.FileObject, status.MultiError) { - wantFiles := state.files - if p.SourceFormat == configsync.SourceFormatHierarchy { - // We're using hierarchical mode for the root repository, so ignore files - // outside of the allowed directories. - wantFiles = filesystem.FilterHierarchyFiles(state.syncDir, wantFiles) - } - - filePaths := reader.FilePaths{ - RootDir: state.syncDir, - PolicyDir: p.SyncDir, - Files: wantFiles, - } - - crds, err := p.declaredCRDs() - if err != nil { - return nil, err - } - builder := discovery.ScoperBuilder(p.DiscoveryInterface) - - klog.Infof("Parsing files from source dir: %s", state.syncDir.OSPath()) - objs, err := p.Parser.Parse(filePaths) - if err != nil { - return nil, err - } - - options := validate.Options{ - ClusterName: p.ClusterName, - SyncName: p.SyncName, - PolicyDir: p.SyncDir, - PreviousCRDs: crds, - BuildScoper: builder, - Converter: p.Converter, - // Enable API call so NamespaceSelector can talk to k8s-api-server. - AllowAPICall: true, - DynamicNSSelectorEnabled: p.DynamicNSSelectorEnabled, - NSControllerState: p.NSControllerState, - WebhookEnabled: p.WebhookEnabled, - FieldManager: configsync.FieldManager, - } - options = OptionsForScope(options, p.Scope) - - if p.SourceFormat == configsync.SourceFormatUnstructured { - if p.NamespaceStrategy == configsync.NamespaceStrategyImplicit { - options.Visitors = append(options.Visitors, p.addImplicitNamespaces) - } - objs, err = validate.Unstructured(ctx, p.Client, objs, options) - } else { - objs, err = validate.Hierarchical(objs, options) - } - - if status.HasBlockingErrors(err) { - return nil, err - } - - // Duplicated with namespace.go. - e := addAnnotationsAndLabels(objs, declared.RootScope, p.SyncName, p.sourceContext(), state.commit) - if e != nil { - err = status.Append(err, status.InternalErrorf("unable to add annotations and labels: %v", e)) - return nil, err - } - return objs, err +type rootSyncStatusClient struct { + options *Options + // mux prevents status update conflicts. + mux sync.Mutex } -// setSourceStatus implements the Parser interface -func (p *root) setSourceStatus(ctx context.Context, newStatus *SourceStatus) error { +// SetSourceStatus implements the Parser interface +func (p *rootSyncStatusClient) SetSourceStatus(ctx context.Context, newStatus *SourceStatus) error { p.mux.Lock() defer p.mux.Unlock() return p.setSourceStatusWithRetries(ctx, newStatus, defaultDenominator) } -func (p *root) setSourceStatusWithRetries(ctx context.Context, newStatus *SourceStatus, denominator int) error { +func (p *rootSyncStatusClient) setSourceStatusWithRetries(ctx context.Context, newStatus *SourceStatus, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } + opts := p.options var rs v1beta1.RootSync - if err := p.Client.Get(ctx, rootsync.ObjectKey(p.SyncName), &rs); err != nil { + if err := opts.Client.Get(ctx, rootsync.ObjectKey(opts.SyncName), &rs); err != nil { return status.APIServerError(err, "failed to get RootSync for parser") } @@ -208,7 +122,7 @@ func (p *root) setSourceStatusWithRetries(ctx context.Context, newStatus *Source cmp.Diff(currentRS.Status, rs.Status)) } - if err := p.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { + if err := opts.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { // If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RootSync source status (total error count: %d, denominator: %d): %s.", rs.Status.Source.ErrorSummary.TotalCount, denominator, err) @@ -262,24 +176,25 @@ func setSourceStatusFields(source *v1beta1.SourceStatus, newStatus *SourceStatus source.LastUpdate = newStatus.LastUpdate } -func (p *root) setSourceAnnotations(ctx context.Context, commit string) error { +func (p *rootSyncStatusClient) SetSourceAnnotations(ctx context.Context, commit string) error { + opts := p.options rs := &v1beta1.RootSync{} rs.Namespace = configsync.ControllerNamespace - rs.Name = p.SyncName + rs.Name = opts.SyncName var patch string - if p.Options.SourceType == configsync.OciSource || - (p.Options.SourceType == configsync.HelmSource && strings.HasPrefix(p.Options.SourceRepo, "oci://")) { + if opts.SourceType == configsync.OciSource || + (opts.SourceType == configsync.HelmSource && strings.HasPrefix(opts.SourceRepo, "oci://")) { patch = fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, metadata.ImageToSyncAnnotationKey, - fmt.Sprintf("%s@sha256:%s", p.Options.SourceRepo, commit), + fmt.Sprintf("%s@sha256:%s", opts.SourceRepo, commit), ) } else { patch = fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}}`, metadata.ImageToSyncAnnotationKey) } - err := p.Client.Patch(ctx, rs, + err := opts.Client.Patch(ctx, rs, client.RawPatch(types.MergePatchType, []byte(patch)), client.FieldOwner(configsync.FieldManager)) @@ -290,9 +205,10 @@ func (p *root) setSourceAnnotations(ctx context.Context, commit string) error { return nil } -func (p *root) setRequiresRendering(ctx context.Context, renderingRequired bool) error { +func (p *rootSyncStatusClient) SetRequiresRendering(ctx context.Context, renderingRequired bool) error { + opts := p.options rs := &v1beta1.RootSync{} - if err := p.Client.Get(ctx, rootsync.ObjectKey(p.SyncName), rs); err != nil { + if err := opts.Client.Get(ctx, rootsync.ObjectKey(opts.SyncName), rs); err != nil { return status.APIServerError(err, "failed to get RootSync for parser") } newVal := strconv.FormatBool(renderingRequired) @@ -302,11 +218,11 @@ func (p *root) setRequiresRendering(ctx context.Context, renderingRequired bool) } existing := rs.DeepCopy() core.SetAnnotation(rs, metadata.RequiresRenderingAnnotationKey, newVal) - return p.Client.Patch(ctx, rs, client.MergeFrom(existing), client.FieldOwner(configsync.FieldManager)) + return opts.Client.Patch(ctx, rs, client.MergeFrom(existing), client.FieldOwner(configsync.FieldManager)) } -// setRenderingStatus implements the Parser interface -func (p *root) setRenderingStatus(ctx context.Context, oldStatus, newStatus *RenderingStatus) error { +// SetRenderingStatus implements the Parser interface +func (p *rootSyncStatusClient) SetRenderingStatus(ctx context.Context, oldStatus, newStatus *RenderingStatus) error { if oldStatus.Equals(newStatus) { return nil } @@ -316,13 +232,14 @@ func (p *root) setRenderingStatus(ctx context.Context, oldStatus, newStatus *Ren return p.setRenderingStatusWithRetries(ctx, newStatus, defaultDenominator) } -func (p *root) setRenderingStatusWithRetries(ctx context.Context, newStatus *RenderingStatus, denominator int) error { +func (p *rootSyncStatusClient) setRenderingStatusWithRetries(ctx context.Context, newStatus *RenderingStatus, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } + opts := p.options var rs v1beta1.RootSync - if err := p.Client.Get(ctx, rootsync.ObjectKey(p.SyncName), &rs); err != nil { + if err := opts.Client.Get(ctx, rootsync.ObjectKey(opts.SyncName), &rs); err != nil { return status.APIServerError(err, "failed to get RootSync for parser") } @@ -356,7 +273,7 @@ func (p *root) setRenderingStatusWithRetries(ctx context.Context, newStatus *Ren cmp.Diff(currentRS.Status, rs.Status)) } - if err := p.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { + if err := opts.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { // If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RootSync rendering status (total error count: %d, denominator: %d): %s.", rs.Status.Rendering.ErrorSummary.TotalCount, denominator, err) @@ -411,9 +328,10 @@ func setRenderingStatusFields(rendering *v1beta1.RenderingStatus, newStatus *Ren } // ReconcilerStatusFromCluster gets the RootSync sync status from the cluster. -func (p *root) ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStatus, error) { +func (p *rootSyncStatusClient) ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStatus, error) { + opts := p.options rs := &v1beta1.RootSync{} - if err := p.Client.Get(ctx, rootsync.ObjectKey(p.SyncName), rs); err != nil { + if err := opts.Client.Get(ctx, rootsync.ObjectKey(opts.SyncName), rs); err != nil { if apierrors.IsNotFound(err) || meta.IsNoMatchError(err) { return nil, nil } @@ -432,7 +350,7 @@ func (p *root) ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStat } } - return reconcilerStatusFromRSyncStatus(rs.Status.Status, p.options().SourceType, syncing, syncingConditionLastUpdate), nil + return reconcilerStatusFromRSyncStatus(rs.Status.Status, opts.SourceType, syncing, syncingConditionLastUpdate), nil } func reconcilerStatusFromRSyncStatus(rsyncStatus v1beta1.Status, sourceType configsync.SourceType, syncing bool, syncingConditionLastUpdate metav1.Time) *ReconcilerStatus { @@ -543,19 +461,20 @@ func reconcilerStatusFromRSyncStatus(rsyncStatus v1beta1.Status, sourceType conf // SetSyncStatus implements the Parser interface // SetSyncStatus sets the RootSync sync status. // `errs` includes the errors encountered during the apply step; -func (p *root) SetSyncStatus(ctx context.Context, newStatus *SyncStatus) error { +func (p *rootSyncStatusClient) SetSyncStatus(ctx context.Context, newStatus *SyncStatus) error { p.mux.Lock() defer p.mux.Unlock() return p.setSyncStatusWithRetries(ctx, newStatus, defaultDenominator) } -func (p *root) setSyncStatusWithRetries(ctx context.Context, newStatus *SyncStatus, denominator int) error { +func (p *rootSyncStatusClient) setSyncStatusWithRetries(ctx context.Context, newStatus *SyncStatus, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } + opts := p.options rs := &v1beta1.RootSync{} - if err := p.Client.Get(ctx, rootsync.ObjectKey(p.SyncName), rs); err != nil { + if err := opts.Client.Get(ctx, rootsync.ObjectKey(opts.SyncName), rs); err != nil { return status.APIServerError(err, "failed to get RootSync") } @@ -604,7 +523,7 @@ func (p *root) setSyncStatusWithRetries(ctx context.Context, newStatus *SyncStat cmp.Diff(currentRS.Status, rs.Status)) } - if err := p.Client.Status().Update(ctx, rs, client.FieldOwner(configsync.FieldManager)); err != nil { + if err := opts.Client.Status().Update(ctx, rs, client.FieldOwner(configsync.FieldManager)); err != nil { // If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RootSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err) @@ -669,84 +588,6 @@ func summarizeErrorsForCommit(sourceStatus v1beta1.SourceStatus, renderingStatus return errorSources, errorSummary } -// addImplicitNamespaces hydrates the given FileObjects by injecting implicit -// namespaces into the list before returning it. Implicit namespaces are those -// that are declared by an object's metadata namespace field but are not present -// in the list. The implicit namespace is only added if it doesn't exist. -func (p *root) addImplicitNamespaces(objs []ast.FileObject) ([]ast.FileObject, status.MultiError) { - var errs status.MultiError - // namespaces will track the set of Namespaces we expect to exist, and those - // which actually do. - namespaces := orderedmap.NewOrderedMap[string, bool]() - - for _, o := range objs { - if o.GetObjectKind().GroupVersionKind().GroupKind() == kinds.Namespace().GroupKind() { - namespaces.Set(o.GetName(), true) - } else if o.GetNamespace() != "" { - if _, found := namespaces.Get(o.GetNamespace()); !found { - // If unset, this ensures the key exists and is false. - // Otherwise it has no impact. - namespaces.Set(o.GetNamespace(), false) - } - } - } - - for e := namespaces.Front(); e != nil; e = e.Next() { - ns, isDeclared := e.Key, e.Value - // Do not treat config-management-system as an implicit namespace for multi-sync support. - // Otherwise, the namespace will become a managed resource, and will cause conflict among multiple RootSyncs. - if isDeclared || ns == configsync.ControllerNamespace { - continue - } - existingNs := &corev1.Namespace{} - err := p.Client.Get(context.Background(), types.NamespacedName{Name: ns}, existingNs) - if err != nil && !apierrors.IsNotFound(err) { - errs = status.Append(errs, fmt.Errorf("unable to check the existence of the implicit namespace %q: %w", ns, err)) - continue - } - - existingNs.SetGroupVersionKind(kinds.Namespace()) - // If the namespace already exists and not self-managed, do not add it as an implicit namespace. - // This is to avoid conflicts caused by multiple Root reconcilers managing the same implicit namespace. - if err == nil && !diff.IsManager(p.Scope, p.SyncName, existingNs) { - continue - } - - // Add the implicit namespace if it doesn't exist, or if it is managed by itself. - // If it is a self-managed namespace, still add it to the object list. Otherwise, - // it will be pruned because it is no longer in the inventory list. - u := &unstructured.Unstructured{} - u.SetGroupVersionKind(kinds.Namespace()) - u.SetName(ns) - // We do NOT want to delete theses implicit Namespaces when the resources - // inside them are removed from the repo. We don't know when it is safe to remove - // the implicit namespaces. An implicit namespace may already exist in the - // cluster. Deleting it will cause other unmanaged resources in that namespace - // being deleted. - // - // Adding the LifecycleDeleteAnnotation is to prevent the applier from deleting - // the implicit namespace when the namespaced config is removed from the repo. - // Note that if the user later declares the - // Namespace without this annotation, the annotation is removed as expected. - u.SetAnnotations(map[string]string{common.LifecycleDeleteAnnotation: common.PreventDeletion}) - objs = append(objs, ast.NewFileObject(u, cmpath.RelativeOS(""))) - } - - return objs, errs -} - -// SyncErrors returns all the sync errors, including remediator errors, -// validation errors, applier errors, and watch update errors. -// SyncErrors implements the Parser interface -func (p *root) SyncErrors() status.MultiError { - return p.SyncErrorCache.Errors() -} - -// K8sClient implements the Parser interface -func (p *root) K8sClient() client.Client { - return p.Client -} - // prependRootSyncRemediatorStatus adds the conflict error detected by the remediator to the front of the sync errors. func prependRootSyncRemediatorStatus(ctx context.Context, c client.Client, syncName string, conflictErrs []status.ManagementConflictError, denominator int) error { if denominator <= 0 { diff --git a/pkg/parse/root_sync_client_test.go b/pkg/parse/root_sync_client_test.go new file mode 100644 index 0000000000..6fc31ed9b6 --- /dev/null +++ b/pkg/parse/root_sync_client_test.go @@ -0,0 +1,577 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + "kpt.dev/configsync/pkg/api/configsync/v1beta1" + "kpt.dev/configsync/pkg/applier" + "kpt.dev/configsync/pkg/core" + "kpt.dev/configsync/pkg/core/k8sobjects" + "kpt.dev/configsync/pkg/metadata" + "kpt.dev/configsync/pkg/rootsync" + "kpt.dev/configsync/pkg/status" + syncertest "kpt.dev/configsync/pkg/syncer/syncertest/fake" + "sigs.k8s.io/cli-utils/pkg/testutil" +) + +func TestSummarizeErrors(t *testing.T) { + testCases := []struct { + name string + sourceStatus v1beta1.SourceStatus + renderingStatus v1beta1.RenderingStatus + syncStatus v1beta1.SyncStatus + expectedErrorSources []v1beta1.ErrorSource + expectedErrorSummary *v1beta1.ErrorSummary + }{ + { + name: "both sourceStatus and syncStatus are empty", + sourceStatus: v1beta1.SourceStatus{}, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{}, + expectedErrorSources: nil, + expectedErrorSummary: &v1beta1.ErrorSummary{}, + }, + { + name: "sourceStatus is not empty (no trucation), syncStatus is empty", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{}, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + { + name: "sourceStatus is not empty and trucates errors, syncStatus is empty", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{}, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + { + name: "sourceStatus is empty, syncStatus is not empty (no trucation)", + sourceStatus: v1beta1.SourceStatus{}, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + { + name: "sourceStatus is empty, syncStatus is not empty and trucates errors", + sourceStatus: v1beta1.SourceStatus{}, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + { + name: "neither sourceStatus nor syncStatus is empty or trucates errors", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 4, + Truncated: false, + ErrorCountAfterTruncation: 4, + }, + }, + { + name: "neither sourceStatus nor syncStatus is empty, sourceStatus trucates errors", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 102, + Truncated: true, + ErrorCountAfterTruncation: 4, + }, + }, + { + name: "neither sourceStatus nor syncStatus is empty, syncStatus trucates errors", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 102, + Truncated: true, + ErrorCountAfterTruncation: 4, + }, + }, + { + name: "neither sourceStatus nor syncStatus is empty, both trucates errors", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 200, + Truncated: true, + ErrorCountAfterTruncation: 4, + }, + }, + { + name: "source, rendering, and sync errors", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1068", ErrorMessage: "1068-error-message"}, + {Code: "2015", ErrorMessage: "2015-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.RenderingError, v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 6, + Truncated: false, + ErrorCountAfterTruncation: 6, + }, + }, + { + name: "source, rendering, and sync errors, all truncated", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1068", ErrorMessage: "1068-error-message"}, + {Code: "2015", ErrorMessage: "2015-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.RenderingError, v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 300, + Truncated: true, + ErrorCountAfterTruncation: 6, + }, + }, + { + name: "source errors from newer commit", + sourceStatus: v1beta1.SourceStatus{ + Commit: "newer-commit", + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 1, + Truncated: false, + ErrorCountAfterTruncation: 1, + }, + }, + renderingStatus: v1beta1.RenderingStatus{ + Commit: "older-commit", + Errors: []v1beta1.ConfigSyncError{ + {Code: "1068", ErrorMessage: "1068-error-message"}, + {Code: "2015", ErrorMessage: "2015-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + syncStatus: v1beta1.SyncStatus{ + Commit: "older-commit", + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + {Code: "2009", ErrorMessage: "another error"}, + {Code: "2009", ErrorMessage: "yet-another error"}, + }, + + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 4, + Truncated: false, + ErrorCountAfterTruncation: 4, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.RenderingError, v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 6, + Truncated: false, + ErrorCountAfterTruncation: 6, + }, + }, + { + name: "rendering errors from newer commit", + sourceStatus: v1beta1.SourceStatus{ + Commit: "newer-commit", + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 1, + Truncated: false, + ErrorCountAfterTruncation: 1, + }, + }, + renderingStatus: v1beta1.RenderingStatus{ + Commit: "newer-commit", + Errors: []v1beta1.ConfigSyncError{ + {Code: "1068", ErrorMessage: "1068-error-message"}, + {Code: "2015", ErrorMessage: "2015-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + syncStatus: v1beta1.SyncStatus{ + Commit: "older-commit", + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + {Code: "2009", ErrorMessage: "another error"}, + {Code: "2009", ErrorMessage: "yet-another error"}, + }, + + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 4, + Truncated: false, + ErrorCountAfterTruncation: 4, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 4, + Truncated: false, + ErrorCountAfterTruncation: 4, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + gotErrorSources, gotErrorSummary := summarizeErrorsForCommit(tc.sourceStatus, tc.renderingStatus, tc.syncStatus, tc.syncStatus.Commit) + if diff := cmp.Diff(tc.expectedErrorSources, gotErrorSources); diff != "" { + t.Errorf("summarizeErrors() got %v, expected %v", gotErrorSources, tc.expectedErrorSources) + } + if diff := cmp.Diff(tc.expectedErrorSummary, gotErrorSummary); diff != "" { + t.Errorf("summarizeErrors() got %v, expected %v", gotErrorSummary, tc.expectedErrorSummary) + } + }) + } +} + +func TestPrependRootSyncRemediatorStatus(t *testing.T) { + const rootSyncName = "my-root-sync" + const thisManager = "this-manager" + const otherManager = "other-manager" + conflictingObject := k8sobjects.NamespaceObject("foo-ns", core.Annotation(metadata.ResourceManagerKey, otherManager)) + conflictAB := status.ManagementConflictErrorWrap(conflictingObject, thisManager) + invertedObject := k8sobjects.NamespaceObject("foo-ns", core.Annotation(metadata.ResourceManagerKey, thisManager)) + conflictBA := status.ManagementConflictErrorWrap(invertedObject, otherManager) + conflictABInverted := conflictAB.Invert() + // KptManagementConflictError is created with the desired object, not the current live object. + // So its manager annotation matches the reconciler doing the applying. + // This is the opposite of the objects passed to ManagementConflictErrorWrap. + kptConflictError := applier.KptManagementConflictError(invertedObject) + // Assert the value of each error message to make each value clear + const conflictABMessage = `KNV1060: The "this-manager" reconciler detected a management conflict with the "other-manager" reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler. + +metadata.name: foo-ns +group: +version: v1 +kind: Namespace + +For more information, see https://g.co/cloud/acm-errors#knv1060` + const conflictBAMessage = `KNV1060: The "other-manager" reconciler detected a management conflict with the "this-manager" reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler. + +metadata.name: foo-ns +group: +version: v1 +kind: Namespace + +For more information, see https://g.co/cloud/acm-errors#knv1060` + const kptConflictMessage = `KNV1060: The "this-manager" reconciler detected a management conflict with another reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler. + +metadata.name: foo-ns +group: +version: v1 +kind: Namespace + +For more information, see https://g.co/cloud/acm-errors#knv1060` + testutil.AssertEqual(t, conflictABMessage, conflictAB.ToCSE().ErrorMessage) + testutil.AssertEqual(t, conflictBAMessage, conflictBA.ToCSE().ErrorMessage) + testutil.AssertEqual(t, kptConflictMessage, kptConflictError.ToCSE().ErrorMessage) + testutil.AssertEqual(t, conflictBA, conflictABInverted) + testCases := map[string]struct { + thisSyncErrors []v1beta1.ConfigSyncError + expectedErrors []v1beta1.ConfigSyncError + }{ + "empty errors": { + thisSyncErrors: []v1beta1.ConfigSyncError{}, + expectedErrors: []v1beta1.ConfigSyncError{ + conflictAB.ToCSE(), + }, + }, + "unchanged conflict error": { + thisSyncErrors: []v1beta1.ConfigSyncError{ + conflictAB.ToCSE(), + }, + expectedErrors: []v1beta1.ConfigSyncError{ + conflictAB.ToCSE(), + }, + }, + "prepend conflict error": { + thisSyncErrors: []v1beta1.ConfigSyncError{ + {ErrorMessage: "foo"}, + }, + expectedErrors: []v1beta1.ConfigSyncError{ + conflictAB.ToCSE(), + {ErrorMessage: "foo"}, + }, + }, + "dedupe AB and BA errors": { + thisSyncErrors: []v1beta1.ConfigSyncError{ + conflictBA.ToCSE(), + }, + expectedErrors: []v1beta1.ConfigSyncError{ + conflictBA.ToCSE(), + }, + }, + "dedupe AB and AB inverted errors": { + thisSyncErrors: []v1beta1.ConfigSyncError{ + conflictABInverted.ToCSE(), + }, + expectedErrors: []v1beta1.ConfigSyncError{ + conflictABInverted.ToCSE(), + }, + }, + // TODO: De-dupe ManagementConflictErrorWrap & KptManagementConflictError + // These are currently de-duped locally by the conflict handler, + // but not remotely by prependRootSyncRemediatorStatus. + "prepend KptManagementConflictError": { + thisSyncErrors: []v1beta1.ConfigSyncError{ + kptConflictError.ToCSE(), + }, + expectedErrors: []v1beta1.ConfigSyncError{ + conflictAB.ToCSE(), + kptConflictError.ToCSE(), + }, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + rootSync := k8sobjects.RootSyncObjectV1Beta1(rootSyncName) + rootSync.Status.Sync.Errors = tc.thisSyncErrors + fakeClient := syncertest.NewClient(t, core.Scheme, rootSync) + ctx := context.Background() + err := prependRootSyncRemediatorStatus(ctx, fakeClient, rootSyncName, + []status.ManagementConflictError{conflictAB}, defaultDenominator) + require.NoError(t, err) + var updatedRootSync v1beta1.RootSync + err = fakeClient.Get(ctx, rootsync.ObjectKey(rootSyncName), &updatedRootSync) + require.NoError(t, err) + testutil.AssertEqual(t, tc.expectedErrors, updatedRootSync.Status.Sync.Errors) + }) + } +} diff --git a/pkg/parse/root_sync_parser.go b/pkg/parse/root_sync_parser.go new file mode 100644 index 0000000000..6291b0a86b --- /dev/null +++ b/pkg/parse/root_sync_parser.go @@ -0,0 +1,177 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "context" + "fmt" + + "github.com/elliotchance/orderedmap/v2" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + "kpt.dev/configsync/pkg/api/configsync" + "kpt.dev/configsync/pkg/declared" + "kpt.dev/configsync/pkg/diff" + "kpt.dev/configsync/pkg/importer/analyzer/ast" + "kpt.dev/configsync/pkg/importer/filesystem" + "kpt.dev/configsync/pkg/importer/filesystem/cmpath" + "kpt.dev/configsync/pkg/importer/reader" + "kpt.dev/configsync/pkg/kinds" + "kpt.dev/configsync/pkg/status" + "kpt.dev/configsync/pkg/util/discovery" + "kpt.dev/configsync/pkg/validate" + "sigs.k8s.io/cli-utils/pkg/common" +) + +type rootSyncParser struct { + options *RootOptions +} + +// ParseSource implements the Parser interface +func (p *rootSyncParser) ParseSource(ctx context.Context, state *sourceState) ([]ast.FileObject, status.MultiError) { + opts := p.options + + wantFiles := state.files + if opts.SourceFormat == configsync.SourceFormatHierarchy { + // We're using hierarchical mode for the root repository, so ignore files + // outside of the allowed directories. + wantFiles = filesystem.FilterHierarchyFiles(state.syncDir, wantFiles) + } + + filePaths := reader.FilePaths{ + RootDir: state.syncDir, + PolicyDir: p.options.SyncDir, + Files: wantFiles, + } + + crds, err := opts.DeclaredResources.DeclaredCRDs() + if err != nil { + return nil, err + } + builder := discovery.ScoperBuilder(opts.DiscoveryClient) + + klog.Infof("Parsing files from source dir: %s", state.syncDir.OSPath()) + objs, err := opts.ConfigParser.Parse(filePaths) + if err != nil { + return nil, err + } + + options := validate.Options{ + ClusterName: opts.ClusterName, + SyncName: opts.SyncName, + PolicyDir: opts.SyncDir, + PreviousCRDs: crds, + BuildScoper: builder, + Converter: opts.Converter, + // Enable API call so NamespaceSelector can talk to k8s-api-server. + AllowAPICall: true, + DynamicNSSelectorEnabled: opts.DynamicNSSelectorEnabled, + NSControllerState: opts.NSControllerState, + WebhookEnabled: opts.WebhookEnabled, + FieldManager: configsync.FieldManager, + } + options = OptionsForScope(options, opts.Scope) + + if opts.SourceFormat == configsync.SourceFormatUnstructured { + if opts.NamespaceStrategy == configsync.NamespaceStrategyImplicit { + options.Visitors = append(options.Visitors, p.addImplicitNamespaces) + } + objs, err = validate.Unstructured(ctx, opts.Client, objs, options) + } else { + objs, err = validate.Hierarchical(objs, options) + } + + if status.HasBlockingErrors(err) { + return nil, err + } + + // Duplicated with namespace.go. + e := addAnnotationsAndLabels(objs, declared.RootScope, opts.SyncName, opts.Files.sourceContext(), state.commit) + if e != nil { + err = status.Append(err, status.InternalErrorf("unable to add annotations and labels: %v", e)) + return nil, err + } + return objs, err +} + +// addImplicitNamespaces hydrates the given FileObjects by injecting implicit +// namespaces into the list before returning it. Implicit namespaces are those +// that are declared by an object's metadata namespace field but are not present +// in the list. The implicit namespace is only added if it doesn't exist. +func (p *rootSyncParser) addImplicitNamespaces(objs []ast.FileObject) ([]ast.FileObject, status.MultiError) { + opts := p.options + var errs status.MultiError + // namespaces will track the set of Namespaces we expect to exist, and those + // which actually do. + namespaces := orderedmap.NewOrderedMap[string, bool]() + + for _, o := range objs { + if o.GetObjectKind().GroupVersionKind().GroupKind() == kinds.Namespace().GroupKind() { + namespaces.Set(o.GetName(), true) + } else if o.GetNamespace() != "" { + if _, found := namespaces.Get(o.GetNamespace()); !found { + // If unset, this ensures the key exists and is false. + // Otherwise it has no impact. + namespaces.Set(o.GetNamespace(), false) + } + } + } + + for e := namespaces.Front(); e != nil; e = e.Next() { + ns, isDeclared := e.Key, e.Value + // Do not treat config-management-system as an implicit namespace for multi-sync support. + // Otherwise, the namespace will become a managed resource, and will cause conflict among multiple RootSyncs. + if isDeclared || ns == configsync.ControllerNamespace { + continue + } + existingNs := &corev1.Namespace{} + err := opts.Client.Get(context.Background(), types.NamespacedName{Name: ns}, existingNs) + if err != nil && !apierrors.IsNotFound(err) { + errs = status.Append(errs, fmt.Errorf("unable to check the existence of the implicit namespace %q: %w", ns, err)) + continue + } + + existingNs.SetGroupVersionKind(kinds.Namespace()) + // If the namespace already exists and not self-managed, do not add it as an implicit namespace. + // This is to avoid conflicts caused by multiple Root reconcilers managing the same implicit namespace. + if err == nil && !diff.IsManager(opts.Scope, opts.SyncName, existingNs) { + continue + } + + // Add the implicit namespace if it doesn't exist, or if it is managed by itself. + // If it is a self-managed namespace, still add it to the object list. Otherwise, + // it will be pruned because it is no longer in the inventory list. + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(kinds.Namespace()) + u.SetName(ns) + // We do NOT want to delete theses implicit Namespaces when the resources + // inside them are removed from the repo. We don't know when it is safe to remove + // the implicit namespaces. An implicit namespace may already exist in the + // cluster. Deleting it will cause other unmanaged resources in that namespace + // being deleted. + // + // Adding the LifecycleDeleteAnnotation is to prevent the applier from deleting + // the implicit namespace when the namespaced config is removed from the repo. + // Note that if the user later declares the + // Namespace without this annotation, the annotation is removed as expected. + u.SetAnnotations(map[string]string{common.LifecycleDeleteAnnotation: common.PreventDeletion}) + objs = append(objs, ast.NewFileObject(u, cmpath.RelativeOS(""))) + } + + return objs, errs +} diff --git a/pkg/parse/run.go b/pkg/parse/run.go index 66cd3adf57..e29a449c0b 100644 --- a/pkg/parse/run.go +++ b/pkg/parse/run.go @@ -74,22 +74,23 @@ type RunResult struct { } // RunFunc is the function signature of the function that starts the parse-apply-watch loop -type RunFunc func(ctx context.Context, p Parser, trigger string, state *reconcilerState) RunResult +type RunFunc func(ctx context.Context, r Reconciler, trigger string) RunResult // DefaultRunFunc is the default implementation for RunOpts.RunFunc. -func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconcilerState) RunResult { +func DefaultRunFunc(ctx context.Context, r Reconciler, trigger string) RunResult { + opts := r.Options() result := RunResult{} + state := r.ReconcilerState() // Initialize status // TODO: Populate status from RSync status if state.status == nil { - reconcilerStatus, err := p.ReconcilerStatusFromCluster(ctx) + reconcilerStatus, err := r.SyncStatusClient().ReconcilerStatusFromCluster(ctx) if err != nil { state.invalidate(status.Append(nil, err)) return result } state.status = reconcilerStatus } - opts := p.options() var syncDir cmpath.Absolute gs := &SourceStatus{} // pull the source commit and directory with retries within 5 minutes. @@ -99,7 +100,7 @@ func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconc // If updating the object fails, it's likely due to a signature verification error // from the webhook. In this case, add the error as a source error. if gs.Errs == nil { - err := p.setSourceAnnotations(ctx, gs.Commit) + err := r.SyncStatusClient().SetSourceAnnotations(ctx, gs.Commit) if err != nil { gs.Errs = status.Append(gs.Errs, status.SourceError.Wrap(err).Build()) } @@ -112,12 +113,12 @@ func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconc // Otherwise, parsing errors may be overwritten. // TODO: Decouple fetch & parse stages to use different status fields if gs.Errs != nil || state.status.SourceStatus == nil || gs.Commit != state.status.SourceStatus.Commit { - gs.LastUpdate = metav1.Time{Time: opts.Clock.Now()} + gs.LastUpdate = nowMeta(opts) var setSourceStatusErr error // Only update the source status if it changed if state.status.needToSetSourceStatus(gs) { klog.V(3).Info("Updating source status (after fetch)") - setSourceStatusErr = p.setSourceStatus(ctx, gs) + setSourceStatusErr = r.SyncStatusClient().SetSourceStatus(ctx, gs) // If there were errors publishing the source status, stop, log them, and retry later if setSourceStatusErr != nil { // If there were fetch errors, log those too @@ -149,9 +150,9 @@ func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconc _, err := os.Stat(doneFilePath) if os.IsNotExist(err) || (err == nil && hydrate.DoneCommit(doneFilePath) != gs.Commit) { rs.Message = RenderingInProgress - rs.LastUpdate = metav1.Time{Time: opts.Clock.Now()} + rs.LastUpdate = nowMeta(opts) klog.V(3).Info("Updating rendering status (before parse)") - setRenderingStatusErr := p.setRenderingStatus(ctx, state.status.RenderingStatus, rs) + setRenderingStatusErr := r.SyncStatusClient().SetRenderingStatus(ctx, state.status.RenderingStatus, rs) if setRenderingStatusErr == nil { state.reset() state.status.RenderingStatus = rs @@ -164,10 +165,10 @@ func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconc } if err != nil { rs.Message = RenderingFailed - rs.LastUpdate = metav1.Time{Time: opts.Clock.Now()} + rs.LastUpdate = nowMeta(opts) rs.Errs = status.InternalHydrationError(err, "unable to read the done file: %s", doneFilePath) klog.V(3).Info("Updating rendering status (before parse)") - setRenderingStatusErr := p.setRenderingStatus(ctx, state.status.RenderingStatus, rs) + setRenderingStatusErr := r.SyncStatusClient().SetRenderingStatus(ctx, state.status.RenderingStatus, rs) if setRenderingStatusErr == nil { state.status.RenderingStatus = rs state.status.SyncingConditionLastUpdate = rs.LastUpdate @@ -190,7 +191,7 @@ func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconc commit: gs.Commit, syncDir: syncDir, } - if errs := read(ctx, p, trigger, state, ps); errs != nil { + if errs := r.Read(ctx, trigger, ps); errs != nil { state.invalidate(errs) return result } @@ -210,7 +211,7 @@ func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconc return result } - errs := parseAndUpdate(ctx, p, trigger, state) + errs := r.ParseAndUpdate(ctx, trigger) if errs != nil { state.invalidate(errs) return result @@ -222,24 +223,26 @@ func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconc return result } -// read reads config files from source if no rendering is needed, or from hydrated output if rendering is done. -// It also updates the .status.rendering and .status.source fields. -func read(ctx context.Context, p Parser, trigger string, state *reconcilerState, sourceState *sourceState) status.MultiError { - opts := p.options() - hydrationStatus, sourceStatus := readFromSource(ctx, p, trigger, state, sourceState) +// Read source manifests from the shared source volume. +// Waits for rendering, if enabled. +// Updates the RSync status (source, rendering, and syncing condition). +func (r *reconciler) Read(ctx context.Context, trigger string, sourceState *sourceState) status.MultiError { + opts := r.Options() + state := r.ReconcilerState() + hydrationStatus, sourceStatus := r.readFromSource(ctx, trigger, sourceState) if opts.RenderingEnabled != hydrationStatus.RequiresRendering { // the reconciler is misconfigured. set the annotation so that the reconciler-manager // will recreate this reconciler with the correct configuration. - if err := p.setRequiresRendering(ctx, hydrationStatus.RequiresRendering); err != nil { + if err := r.SyncStatusClient().SetRequiresRendering(ctx, hydrationStatus.RequiresRendering); err != nil { hydrationStatus.Errs = status.Append(hydrationStatus.Errs, status.InternalHydrationError(err, "error setting %s annotation", metadata.RequiresRenderingAnnotationKey)) } } - hydrationStatus.LastUpdate = metav1.Time{Time: opts.Clock.Now()} + hydrationStatus.LastUpdate = nowMeta(opts) // update the rendering status before source status because the parser needs to // read and parse the configs after rendering is done and there might have errors. klog.V(3).Info("Updating rendering status (after parse)") - setRenderingStatusErr := p.setRenderingStatus(ctx, state.status.RenderingStatus, hydrationStatus) + setRenderingStatusErr := r.SyncStatusClient().SetRenderingStatus(ctx, state.status.RenderingStatus, hydrationStatus) if setRenderingStatusErr == nil { state.status.RenderingStatus = hydrationStatus state.status.SyncingConditionLastUpdate = hydrationStatus.LastUpdate @@ -255,11 +258,11 @@ func read(ctx context.Context, p Parser, trigger string, state *reconcilerState, // Only call `setSourceStatus` if `readFromSource` fails. // If `readFromSource` succeeds, `parse` may still fail. - sourceStatus.LastUpdate = metav1.Time{Time: opts.Clock.Now()} + sourceStatus.LastUpdate = nowMeta(opts) var setSourceStatusErr error if state.status.needToSetSourceStatus(sourceStatus) { klog.V(3).Info("Updating source status (after parse)") - setSourceStatusErr := p.setSourceStatus(ctx, sourceStatus) + setSourceStatusErr := r.SyncStatusClient().SetSourceStatus(ctx, sourceStatus) if setSourceStatusErr == nil { state.status.SourceStatus = sourceStatus state.status.SyncingConditionLastUpdate = sourceStatus.LastUpdate @@ -272,8 +275,8 @@ func read(ctx context.Context, p Parser, trigger string, state *reconcilerState, // parseHydrationState reads from the file path which the hydration-controller // container writes to. It checks if the hydrated files are ready and returns // a renderingStatus. -func parseHydrationState(p Parser, srcState *sourceState, hydrationStatus *RenderingStatus) (*sourceState, *RenderingStatus) { - opts := p.options() +func (r *reconciler) parseHydrationState(srcState *sourceState, hydrationStatus *RenderingStatus) (*sourceState, *RenderingStatus) { + opts := r.Options() if !opts.RenderingEnabled { hydrationStatus.Message = RenderingSkipped return srcState, hydrationStatus @@ -315,8 +318,9 @@ func parseHydrationState(p Parser, srcState *sourceState, hydrationStatus *Rende // readFromSource reads the source or hydrated configs, checks whether the sourceState in // the cache is up-to-date. If the cache is not up-to-date, reads all the source or hydrated files. // readFromSource returns the rendering status and source status. -func readFromSource(ctx context.Context, p Parser, trigger string, recState *reconcilerState, srcState *sourceState) (*RenderingStatus, *SourceStatus) { - opts := p.options() +func (r *reconciler) readFromSource(ctx context.Context, trigger string, srcState *sourceState) (*RenderingStatus, *SourceStatus) { + opts := r.Options() + recState := r.ReconcilerState() start := opts.Clock.Now() hydrationStatus := &RenderingStatus{ @@ -329,7 +333,7 @@ func readFromSource(ctx context.Context, p Parser, trigger string, recState *rec Commit: srcState.commit, } - srcState, hydrationStatus = parseHydrationState(p, srcState, hydrationStatus) + srcState, hydrationStatus = r.parseHydrationState(srcState, hydrationStatus) if hydrationStatus.Errs != nil { return hydrationStatus, srcStatus } @@ -366,16 +370,17 @@ func readFromSource(ctx context.Context, p Parser, trigger string, recState *rec return hydrationStatus, srcStatus } -func parseSource(ctx context.Context, p Parser, trigger string, state *reconcilerState) status.MultiError { +func (r *reconciler) parseSource(ctx context.Context, trigger string) status.MultiError { + state := r.ReconcilerState() if state.cache.parserResultUpToDate() { return nil } - opts := p.options() + opts := r.Options() start := opts.Clock.Now() var sourceErrs status.MultiError - objs, errs := p.parseSource(ctx, state.cache.source) + objs, errs := r.Parser().ParseSource(ctx, state.cache.source) if !opts.WebhookEnabled { klog.V(3).Infof("Removing %s annotation as Admission Webhook is disabled", metadata.DeclaredFieldsKey) for _, obj := range objs { @@ -387,7 +392,7 @@ func parseSource(ctx context.Context, p Parser, trigger string, state *reconcile state.cache.setParserResult(objs, sourceErrs) if !status.HasBlockingErrors(sourceErrs) && opts.WebhookEnabled { - err := webhookconfiguration.Update(ctx, opts.k8sClient(), opts.discoveryClient(), objs, + err := webhookconfiguration.Update(ctx, opts.Client, opts.DiscoveryClient, objs, client.FieldOwner(configsync.FieldManager)) if err != nil { // Don't block if updating the admission webhook fails. @@ -404,21 +409,24 @@ func parseSource(ctx context.Context, p Parser, trigger string, state *reconcile return sourceErrs } -func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconcilerState) status.MultiError { - opts := p.options() +// ParseAndUpdate parses objects from the source manifests, validates them, and +// then syncs them to the cluster with the Updater. +func (r *reconciler) ParseAndUpdate(ctx context.Context, trigger string) status.MultiError { + opts := r.Options() + state := r.ReconcilerState() klog.V(3).Info("Parser starting...") - sourceErrs := parseSource(ctx, p, trigger, state) + sourceErrs := r.parseSource(ctx, trigger) klog.V(3).Info("Parser stopped") newSourceStatus := &SourceStatus{ Spec: state.cache.source.spec, Commit: state.cache.source.commit, Errs: sourceErrs, - LastUpdate: metav1.Time{Time: opts.Clock.Now()}, + LastUpdate: nowMeta(opts), } if state.status.needToSetSourceStatus(newSourceStatus) { klog.V(3).Info("Updating source status (after parse)") - if err := p.setSourceStatus(ctx, newSourceStatus); err != nil { - // If `p.setSourceStatus` fails, we terminate the reconciliation. + if err := r.SyncStatusClient().SetSourceStatus(ctx, newSourceStatus); err != nil { + // If `r.SetSourceStatus` fails, we terminate the reconciliation. // If we call `update` in this case and `update` succeeds, `Status.Source.Commit` would end up be older // than `Status.Sync.Commit`. return status.Append(sourceErrs, err) @@ -434,7 +442,7 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc // Create a new context with its cancellation function. ctxForUpdateSyncStatus, cancel := context.WithCancel(context.Background()) - go updateSyncStatusPeriodically(ctxForUpdateSyncStatus, p, state) + go r.updateSyncStatusPeriodically(ctxForUpdateSyncStatus) klog.V(3).Info("Updater starting...") start := opts.Clock.Now() @@ -448,8 +456,16 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc // SyncErrors include errors from both the Updater and Remediator klog.V(3).Info("Updating sync status (after sync)") - syncErrs := p.SyncErrors() - if err := setSyncStatus(ctx, p, state, state.status.SourceStatus.Spec, false, state.cache.source.commit, syncErrs); err != nil { + syncErrs := r.ReconcilerState().SyncErrors() + // Copy the spec and commit from the source status + syncStatus := &SyncStatus{ + Spec: state.status.SourceStatus.Spec, + Syncing: false, + Commit: state.cache.source.commit, + Errs: syncErrs, + LastUpdate: nowMeta(opts), + } + if err := r.SetSyncStatus(ctx, syncStatus); err != nil { syncErrs = status.Append(syncErrs, err) } @@ -457,21 +473,14 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc return status.Append(sourceErrs, syncErrs) } -// setSyncStatus updates `.status.sync` and the Syncing condition, if needed, +// SetSyncStatus updates `.status.sync` and the Syncing condition, if needed, // as well as `state.syncStatus` and `state.syncingConditionLastUpdate` if // the update is successful. -func setSyncStatus(ctx context.Context, p Parser, state *reconcilerState, spec SourceSpec, syncing bool, commit string, syncErrs status.MultiError) error { - options := p.options() +func (r *reconciler) SetSyncStatus(ctx context.Context, newSyncStatus *SyncStatus) error { + state := r.ReconcilerState() // Update the RSync status, if necessary - newSyncStatus := &SyncStatus{ - Spec: spec, - Syncing: syncing, - Commit: commit, - Errs: syncErrs, - LastUpdate: metav1.Time{Time: options.Clock.Now()}, - } if state.status.needToSetSyncStatus(newSyncStatus) { - if err := p.SetSyncStatus(ctx, newSyncStatus); err != nil { + if err := r.SyncStatusClient().SetSyncStatus(ctx, newSyncStatus); err != nil { return err } state.status.SyncStatus = newSyncStatus @@ -479,7 +488,8 @@ func setSyncStatus(ctx context.Context, p Parser, state *reconcilerState, spec S } // Report conflict errors to the remote manager, if it's a RootSync. - if err := reportRootSyncConflicts(ctx, p.K8sClient(), options.ManagementConflicts()); err != nil { + opts := r.Options() + if err := reportRootSyncConflicts(ctx, opts.Client, opts.ManagementConflicts()); err != nil { return fmt.Errorf("failed to report remote conflicts: %w", err) } return nil @@ -487,8 +497,9 @@ func setSyncStatus(ctx context.Context, p Parser, state *reconcilerState, spec S // updateSyncStatusPeriodically update the sync status periodically until the // cancellation function of the context is called. -func updateSyncStatusPeriodically(ctx context.Context, p Parser, state *reconcilerState) { - opts := p.options() +func (r *reconciler) updateSyncStatusPeriodically(ctx context.Context) { + opts := r.Options() + state := r.ReconcilerState() klog.V(3).Info("Periodic sync status updates starting...") updatePeriod := opts.StatusUpdatePeriod updateTimer := opts.Clock.NewTimer(updatePeriod) @@ -502,7 +513,15 @@ func updateSyncStatusPeriodically(ctx context.Context, p Parser, state *reconcil case <-updateTimer.C(): klog.V(3).Info("Updating sync status (periodic while syncing)") - if err := setSyncStatus(ctx, p, state, state.status.SourceStatus.Spec, true, state.cache.source.commit, p.SyncErrors()); err != nil { + // Copy the spec and commit from the source status + syncStatus := &SyncStatus{ + Spec: state.status.SourceStatus.Spec, + Syncing: true, + Commit: state.cache.source.commit, + Errs: r.ReconcilerState().SyncErrors(), + LastUpdate: nowMeta(opts), + } + if err := r.SetSyncStatus(ctx, syncStatus); err != nil { klog.Warningf("failed to update sync status: %v", err) } @@ -541,3 +560,7 @@ func reportRootSyncConflicts(ctx context.Context, k8sClient client.Client, confl } return nil } + +func nowMeta(opts *ReconcilerOptions) metav1.Time { + return metav1.Time{Time: opts.Clock.Now()} +} diff --git a/pkg/parse/run_test.go b/pkg/parse/run_test.go index ac6e8e9e76..6fc0cbf4fe 100644 --- a/pkg/parse/run_test.go +++ b/pkg/parse/run_test.go @@ -61,41 +61,56 @@ const ( symLink = "rev" ) -func newParser(t *testing.T, clock clock.Clock, fakeClient client.Client, fs FileSource, renderingEnabled bool) Parser { - parser := &root{} +func newRootReconciler(t *testing.T, clock clock.Clock, fakeClient client.Client, fs FileSource, renderingEnabled bool) *reconciler { converter, err := openapitest.ValueConverterForTest() if err != nil { t.Fatal(err) } - - parser.RootOptions = &RootOptions{ + state := &ReconcilerState{ + syncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), + } + opts := &Options{ + Clock: clock, + ConfigParser: filesystem.NewParser(&reader.File{}), + SyncName: rootSyncName, + Scope: declared.RootScope, + ReconcilerName: rootReconcilerName, + Client: fakeClient, + DiscoveryClient: syncerFake.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), + Converter: converter, + Files: Files{FileSource: fs}, + DeclaredResources: &declared.Resources{}, + } + rootOpts := &RootOptions{ + Options: opts, SourceFormat: configsync.SourceFormatUnstructured, } - parser.Options = &Options{ - Clock: clock, - Parser: filesystem.NewParser(&reader.File{}), - StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod, - SyncName: rootSyncName, - ReconcilerName: rootReconcilerName, - Client: fakeClient, - DiscoveryInterface: syncerFake.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), - Converter: converter, - Files: Files{FileSource: fs}, - Updater: Updater{ - Scope: declared.RootScope, - Resources: &declared.Resources{}, + recOpts := &ReconcilerOptions{ + Options: opts, + Updater: &Updater{ + Scope: opts.Scope, + Resources: opts.DeclaredResources, Remediator: &remediatorfake.Remediator{}, Applier: &applierfake.Applier{ ApplyOutputs: []applierfake.ApplierOutputs{ {}, // One Apply call, no errors }, }, - SyncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), + SyncErrorCache: state.syncErrorCache, }, - RenderingEnabled: renderingEnabled, + StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod, + RenderingEnabled: renderingEnabled, + } + return &reconciler{ + options: recOpts, + syncStatusClient: &rootSyncStatusClient{ + options: opts, + }, + parser: &rootSyncParser{ + options: rootOpts, + }, + reconcilerState: state, } - - return parser } func createRootDir(rootDir, commit string) error { @@ -752,13 +767,12 @@ func TestRun(t *testing.T) { SourceBranch: fileSource.SourceBranch, } fakeClient := syncerFake.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)) - parser := newParser(t, fakeClock, fakeClient, fs, tc.renderingEnabled) - state := &reconcilerState{} + reconciler := newRootReconciler(t, fakeClock, fakeClient, fs, tc.renderingEnabled) t.Logf("start running test at %v", time.Now()) - result := DefaultRunFunc(context.Background(), parser, triggerReimport, state) + result := DefaultRunFunc(context.Background(), reconciler, triggerReimport) assert.Equal(t, tc.expectedSourceChanged, result.SourceChanged) - assert.Equal(t, tc.needRetry, state.cache.needToRetry) + assert.Equal(t, tc.needRetry, reconciler.ReconcilerState().cache.needToRetry) rs := &v1beta1.RootSync{} err = fakeClient.Get(context.Background(), rootsync.ObjectKey(rootSyncName), rs) diff --git a/pkg/parse/source_test.go b/pkg/parse/source_test.go index badf45167f..40f124c460 100644 --- a/pkg/parse/source_test.go +++ b/pkg/parse/source_test.go @@ -24,17 +24,10 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/utils/clock" - "kpt.dev/configsync/pkg/api/configsync" - "kpt.dev/configsync/pkg/core" - "kpt.dev/configsync/pkg/core/k8sobjects" - "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/hydrate" "kpt.dev/configsync/pkg/importer/filesystem/cmpath" ft "kpt.dev/configsync/pkg/importer/filesystem/filesystemtest" - "kpt.dev/configsync/pkg/kinds" "kpt.dev/configsync/pkg/status" - syncertest "kpt.dev/configsync/pkg/syncer/syncertest/fake" ) var originCommit = "1234567890abcde" @@ -106,27 +99,12 @@ func TestReadConfigFiles(t *testing.T) { files: nil, } - parser := &root{ - Options: &Options{ - Clock: clock.RealClock{}, // TODO: Test with fake clock - SyncName: rootSyncName, - ReconcilerName: rootReconcilerName, - Client: syncertest.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)), - DiscoveryInterface: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), - Updater: Updater{ - Scope: declared.RootScope, - Resources: &declared.Resources{}, - }, - }, - RootOptions: &RootOptions{ - SourceFormat: configsync.SourceFormatUnstructured, - }, - } + files := &Files{} // set the necessary FileSource of parser - parser.SourceDir = symDir + files.SourceDir = symDir - err = parser.readConfigFiles(srcState) + err = files.readConfigFiles(srcState) assert.Equal(t, tc.wantedErr, err) }) } @@ -267,16 +245,11 @@ func TestReadHydratedDirWithRetry(t *testing.T) { commit: originCommit, } - parser := &root{ - Options: &Options{ - Clock: clock.RealClock{}, // TODO: Test with fake clock - Files: Files{ - FileSource: FileSource{ - HydratedRoot: hydratedRoot, - HydratedLink: symLink, - SyncDir: cmpath.RelativeOS(tc.syncDir), - }, - }, + files := Files{ + FileSource: FileSource{ + HydratedRoot: hydratedRoot, + HydratedLink: symLink, + SyncDir: cmpath.RelativeOS(tc.syncDir), }, } @@ -286,8 +259,8 @@ func TestReadHydratedDirWithRetry(t *testing.T) { } t.Logf("start calling readHydratedDirWithRetry at %v", time.Now()) - hydrationState, hydrationErr := parser.readHydratedDirWithRetry(backoff, - cmpath.Absolute(hydratedRoot), parser.ReconcilerName, srcState) + hydrationState, hydrationErr := files.readHydratedDirWithRetry(backoff, + cmpath.Absolute(hydratedRoot), "unused", srcState) if tc.expectedErrMsg == "" { assert.Nil(t, hydrationErr) diff --git a/pkg/parse/state.go b/pkg/parse/state.go index d355a13795..4f0aba427c 100644 --- a/pkg/parse/state.go +++ b/pkg/parse/state.go @@ -19,7 +19,20 @@ import ( "kpt.dev/configsync/pkg/status" ) -type reconcilerState struct { +// ReconcilerState is the current state of the Reconciler, including progress +// indicators and in-memory cache for each of the reconciler stages: +// - Fetch +// - Read +// - Render/Hydrate +// - Parse/Validate +// - Update +// +// ReconcilerState also includes a cache of the RSync spec and status +// (ReconcilerStatus). +// +// TODO: break up cacheForCommit into phase-based caches +// TODO: move sourceState into ReconcilerState so the RSync spec and status are next to each other +type ReconcilerState struct { // lastApplied keeps the state for the last successful-applied syncDir. lastApplied string @@ -28,9 +41,11 @@ type reconcilerState struct { // cache tracks the progress made by the reconciler for a source commit. cache cacheForCommit + + syncErrorCache *SyncErrorCache } -func (s *reconcilerState) checkpoint() { +func (s *ReconcilerState) checkpoint() { applied := s.cache.source.syncDir.OSPath() if applied == s.lastApplied { return @@ -42,7 +57,7 @@ func (s *reconcilerState) checkpoint() { // reset sets the reconciler to retry in the next second because the rendering // status is not available -func (s *reconcilerState) reset() { +func (s *ReconcilerState) reset() { klog.Infof("Resetting reconciler checkpoint because the rendering status is not available yet") s.resetCache() s.lastApplied = "" @@ -51,7 +66,7 @@ func (s *reconcilerState) reset() { // invalidate logs the errors, clears the state tracking information. // invalidate does not clean up the `s.cache`. -func (s *reconcilerState) invalidate(errs status.MultiError) { +func (s *ReconcilerState) invalidate(errs status.MultiError) { klog.Errorf("Invalidating reconciler checkpoint: %v", status.FormatSingleLine(errs)) // Invalidate state on error since this could be the result of switching // branches or some other operation where inverting the operation would @@ -63,7 +78,7 @@ func (s *reconcilerState) invalidate(errs status.MultiError) { // resetCache resets the whole cache. // // resetCache is called when a new source commit is detected. -func (s *reconcilerState) resetCache() { +func (s *ReconcilerState) resetCache() { s.cache = cacheForCommit{} } @@ -74,10 +89,16 @@ func (s *reconcilerState) resetCache() { // resetPartialCache is called when: // - a force-resync happens, or // - one of the watchers noticed a management conflict. -func (s *reconcilerState) resetPartialCache() { +func (s *ReconcilerState) resetPartialCache() { source := s.cache.source needToRetry := s.cache.needToRetry s.cache = cacheForCommit{} s.cache.source = source s.cache.needToRetry = needToRetry } + +// SyncErrors returns all the sync errors, including remediator errors, +// validation errors, applier errors, and watch update errors. +func (s *ReconcilerState) SyncErrors() status.MultiError { + return s.syncErrorCache.Errors() +} diff --git a/pkg/parse/sync_status_client.go b/pkg/parse/sync_status_client.go new file mode 100644 index 0000000000..48b5216d5b --- /dev/null +++ b/pkg/parse/sync_status_client.go @@ -0,0 +1,35 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "context" +) + +// SyncStatusClient provides methods to read and write RSync object status. +type SyncStatusClient interface { + // ReconcilerStatusFromCluster reads the status of the reconciler from the RSync status. + ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStatus, error) + // SetSourceStatus sets the source status and syncing condition on the RSync. + SetSourceStatus(ctx context.Context, newStatus *SourceStatus) error + // SetRenderingStatus sets the rendering status and syncing condition on the RSync. + SetRenderingStatus(ctx context.Context, oldStatus, newStatus *RenderingStatus) error + // SetSyncStatus sets the sync status and syncing condition on the RSync. + SetSyncStatus(ctx context.Context, newStatus *SyncStatus) error + // SetRequiresRendering sets the requires-rendering annotation on the RSync. + SetRequiresRendering(ctx context.Context, renderingRequired bool) error + // SetSourceAnnotations sets the source annotations on the RSync. + SetSourceAnnotations(ctx context.Context, commit string) error +} diff --git a/pkg/parse/updater.go b/pkg/parse/updater.go index ed97cf002d..f90ae7c990 100644 --- a/pkg/parse/updater.go +++ b/pkg/parse/updater.go @@ -19,18 +19,15 @@ import ( "sync" "time" - "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/klog/v2" "kpt.dev/configsync/pkg/applier" "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/importer/filesystem" - "kpt.dev/configsync/pkg/kinds" "kpt.dev/configsync/pkg/metrics" "kpt.dev/configsync/pkg/remediator" "kpt.dev/configsync/pkg/remediator/conflict" "kpt.dev/configsync/pkg/status" - "kpt.dev/configsync/pkg/util/clusterconfig" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -39,7 +36,7 @@ type Updater struct { // Scope defines the scope of the reconciler, either root or namespaced. Scope declared.Scope // Resources is a set of resources declared in the source of truth. - *declared.Resources + Resources *declared.Resources // Remediator is the interface Remediator implements that accepts a new set of // declared configuration. Remediator remediator.Interface @@ -75,24 +72,6 @@ func (u *Updater) Remediating() bool { return u.Remediator.Remediating() } -// declaredCRDs returns the list of CRDs which are present in the updater's -// declared resources. -func (u *Updater) declaredCRDs() ([]*v1beta1.CustomResourceDefinition, status.MultiError) { - var crds []*v1beta1.CustomResourceDefinition - declaredObjs, _ := u.Resources.DeclaredUnstructureds() - for _, obj := range declaredObjs { - if obj.GroupVersionKind().GroupKind() != kinds.CustomResourceDefinition() { - continue - } - crd, err := clusterconfig.AsCRD(obj) - if err != nil { - return nil, err - } - crds = append(crds, crd) - } - return crds, nil -} - // Update does the following: // 1. Pauses the remediator // 2. Validates and sterilizes the objects diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index a260800bd5..fb68678e01 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -233,7 +233,7 @@ func Run(opts Options) { } // Configure the Parser. - var parser parse.Parser + var reconciler parse.Reconciler fs := parse.FileSource{ SourceDir: opts.SourceRoot, RepoRoot: opts.RepoRoot, @@ -247,24 +247,17 @@ func Run(opts Options) { } parseOpts := &parse.Options{ - Clock: clock.RealClock{}, - Parser: filesystem.NewParser(&reader.File{}), - ClusterName: opts.ClusterName, - Client: cl, - ReconcilerName: opts.ReconcilerName, - SyncName: opts.SyncName, - StatusUpdatePeriod: opts.StatusUpdatePeriod, - DiscoveryInterface: discoveryClient, - RenderingEnabled: opts.RenderingEnabled, - Files: parse.Files{FileSource: fs}, - WebhookEnabled: opts.WebhookEnabled, - Updater: parse.Updater{ - Scope: opts.ReconcilerScope, - Resources: decls, - Applier: supervisor, - Remediator: rem, - SyncErrorCache: parse.NewSyncErrorCache(conflictHandler, fightHandler), - }, + Clock: clock.RealClock{}, + ConfigParser: filesystem.NewParser(&reader.File{}), + ClusterName: opts.ClusterName, + Client: cl, + ReconcilerName: opts.ReconcilerName, + SyncName: opts.SyncName, + Scope: opts.ReconcilerScope, + DiscoveryClient: discoveryClient, + Files: parse.Files{FileSource: fs}, + WebhookEnabled: opts.WebhookEnabled, + DeclaredResources: decls, } // Only instantiate the converter when the webhook is enabled because the // instantiation pulls fresh schemas from the openapi discovery endpoint. @@ -286,9 +279,23 @@ func Run(opts Options) { RetryBackoff: util.BackoffWithDurationAndStepLimit(0, 12), } + reconcilerOpts := &parse.ReconcilerOptions{ + Options: parseOpts, + Updater: &parse.Updater{ + Scope: opts.ReconcilerScope, + Resources: decls, + Applier: supervisor, + Remediator: rem, + SyncErrorCache: parse.NewSyncErrorCache(conflictHandler, fightHandler), + }, + StatusUpdatePeriod: opts.StatusUpdatePeriod, + RenderingEnabled: opts.RenderingEnabled, + } + var nsControllerState *namespacecontroller.State if opts.ReconcilerScope == declared.RootScope { - rootOpts := &parse.RootOptions{ + rootParseOpts := &parse.RootOptions{ + Options: parseOpts, SourceFormat: opts.SourceFormat, NamespaceStrategy: opts.NamespaceStrategy, DynamicNSSelectorEnabled: opts.DynamicNSSelectorEnabled, @@ -298,14 +305,14 @@ func Run(opts Options) { // enabled on RootSyncs. // RepoSync can't manage NamespaceSelectors. nsControllerState = namespacecontroller.NewState() - rootOpts.NSControllerState = nsControllerState + rootParseOpts.NSControllerState = nsControllerState // Enable namespace events (every second) // TODO: Trigger namespace events with a buffered channel from the NamespaceController pgBuilder.NamespaceControllerPeriod = time.Second } - parser = parse.NewRootRunner(parseOpts, rootOpts) + reconciler = parse.NewRootRunner(reconcilerOpts, rootParseOpts) } else { - parser = parse.NewNamespaceRunner(parseOpts) + reconciler = parse.NewNamespaceRunner(reconcilerOpts, parseOpts) } // Start listening to signals @@ -414,7 +421,7 @@ func Run(opts Options) { funnel := &events.Funnel{ Publishers: pgBuilder.Build(), // Wrap the parser with an event handler that triggers the RunFunc, as needed. - Subscriber: parse.NewEventHandler(ctx, parser, nsControllerState, parse.DefaultRunFunc), + Subscriber: parse.NewEventHandler(ctx, reconciler, nsControllerState, parse.DefaultRunFunc), } doneChForParser := funnel.Start(ctx)