diff --git a/pkg/declared/resources.go b/pkg/declared/resources.go index 9e93e368d..43be3d8db 100644 --- a/pkg/declared/resources.go +++ b/pkg/declared/resources.go @@ -19,12 +19,15 @@ import ( "sync" "github.com/elliotchance/orderedmap/v2" + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/klog/v2" + "kpt.dev/configsync/pkg/kinds" "kpt.dev/configsync/pkg/metrics" "kpt.dev/configsync/pkg/status" "kpt.dev/configsync/pkg/syncer/reconcile" + "kpt.dev/configsync/pkg/util/clusterconfig" "sigs.k8s.io/controller-runtime/pkg/client" "kpt.dev/configsync/pkg/core" @@ -150,3 +153,21 @@ func (r *Resources) DeclaredGVKs() (map[schema.GroupVersionKind]struct{}, string } return gvkSet, r.commit } + +// DeclaredCRDs returns the list of CRDs declared in the source. +func (r *Resources) DeclaredCRDs() ([]*v1beta1.CustomResourceDefinition, status.MultiError) { + // DeclaredUnstructureds handles the mutex, so this method doesn't need to lock. + var crds []*v1beta1.CustomResourceDefinition + declaredObjs, _ := r.DeclaredUnstructureds() + for _, obj := range declaredObjs { + if obj.GroupVersionKind().GroupKind() != kinds.CustomResourceDefinition() { + continue + } + crd, err := clusterconfig.AsCRD(obj) + if err != nil { + return nil, err + } + crds = append(crds, crd) + } + return crds, nil +} diff --git a/pkg/parse/event_handler.go b/pkg/parse/event_handler.go index 8fa038d94..bce8fec19 100644 --- a/pkg/parse/event_handler.go +++ b/pkg/parse/event_handler.go @@ -27,18 +27,16 @@ import ( // triggers the RunFunc when appropriate. type EventHandler struct { Context context.Context - Parser Parser - ReconcilerState *reconcilerState + Reconciler Reconciler NSControllerState *namespacecontroller.State Run RunFunc } // NewEventHandler builds an EventHandler -func NewEventHandler(ctx context.Context, parser Parser, nsControllerState *namespacecontroller.State, runFn RunFunc) *EventHandler { +func NewEventHandler(ctx context.Context, r Reconciler, nsControllerState *namespacecontroller.State, runFn RunFunc) *EventHandler { return &EventHandler{ Context: ctx, - Parser: parser, - ReconcilerState: &reconcilerState{}, + Reconciler: r, NSControllerState: nsControllerState, Run: runFn, } @@ -54,13 +52,14 @@ func NewEventHandler(ctx context.Context, parser Parser, nsControllerState *name // - Reconciler requested a retry due to error // - Remediator requested a watch update func (s *EventHandler) Handle(event events.Event) events.Result { - opts := s.Parser.options() + opts := s.Reconciler.Options() + state := s.Reconciler.ReconcilerState() var eventResult events.Result // Wrap the RunFunc to set Result.RunAttempted. // This delays status update and sync events. - runFn := func(ctx context.Context, p Parser, trigger string, state *reconcilerState) RunResult { - result := s.Run(ctx, p, trigger, state) + runFn := func(ctx context.Context, r Reconciler, trigger string) RunResult { + result := s.Run(ctx, r, trigger) eventResult.RunAttempted = true return result } @@ -76,14 +75,14 @@ func (s *EventHandler) Handle(event events.Event) events.Result { // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. // The cached needToRetry will not be reset to avoid resetting the backoff retries. - s.ReconcilerState.resetPartialCache() - runResult = runFn(s.Context, s.Parser, triggerResync, s.ReconcilerState) + state.resetPartialCache() + runResult = runFn(s.Context, s.Reconciler, triggerResync) case events.SyncEventType: // Re-import declared resources from the filesystem (from *-sync). // If the reconciler is in the process of reconciling a given commit, the re-import won't // happen until the ongoing reconciliation is done. - runResult = runFn(s.Context, s.Parser, triggerReimport, s.ReconcilerState) + runResult = runFn(s.Context, s.Reconciler, triggerReimport) case events.StatusEventType: // Publish the sync status periodically to update remediator errors. @@ -91,8 +90,15 @@ func (s *EventHandler) Handle(event events.Event) events.Result { // This implies that this reconciler has successfully parsed, rendered, validated, and synced. if opts.Remediating() { klog.V(3).Info("Updating sync status (periodic while not syncing)") - // Don't update the sync spec or commit. - if err := setSyncStatus(s.Context, s.Parser, s.ReconcilerState, s.ReconcilerState.status.SyncStatus.Spec, false, s.ReconcilerState.status.SyncStatus.Commit, s.Parser.SyncErrors()); err != nil { + // Don't update the sync spec or commit, just the errors and status. + syncStatus := &SyncStatus{ + Spec: state.status.SyncStatus.Spec, + Syncing: false, + Commit: state.status.SyncStatus.Commit, + Errs: s.Reconciler.ReconcilerState().SyncErrors(), + LastUpdate: nowMeta(opts), + } + if err := s.Reconciler.SetSyncStatus(s.Context, syncStatus); err != nil { if errors.Is(err, context.Canceled) { klog.Infof("Sync status update skipped: %v", err) } else { @@ -113,8 +119,8 @@ func (s *EventHandler) Handle(event events.Event) events.Result { // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. // The cached needToRetry will not be reset to avoid resetting the backoff retries. - s.ReconcilerState.resetPartialCache() - runResult = runFn(s.Context, s.Parser, namespaceEvent, s.ReconcilerState) + state.resetPartialCache() + runResult = runFn(s.Context, s.Reconciler, namespaceEvent) case events.RetrySyncEventType: // Retry if there was an error, conflict, or any watches need to be updated. @@ -123,9 +129,9 @@ func (s *EventHandler) Handle(event events.Event) events.Result { // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. // The cached needToRetry will not be reset to avoid resetting the backoff retries. - s.ReconcilerState.resetPartialCache() + state.resetPartialCache() trigger = triggerManagementConflict - } else if s.ReconcilerState.cache.needToRetry { + } else if state.cache.needToRetry { trigger = triggerRetry } else if opts.needToUpdateWatch() { trigger = triggerWatchUpdate @@ -138,7 +144,7 @@ func (s *EventHandler) Handle(event events.Event) events.Result { // retryTimer will be reset to `Options.RetryPeriod`, and state.backoff is reset to `defaultBackoff()`. // In this case, `run` will try to sync the configs from the new commit instead of the old commit // being retried. - runResult = runFn(s.Context, s.Parser, trigger, s.ReconcilerState) + runResult = runFn(s.Context, s.Reconciler, trigger) default: klog.Fatalf("Invalid event received: %#v", event) diff --git a/pkg/parse/opts.go b/pkg/parse/opts.go index e3656dbbb..411704536 100644 --- a/pkg/parse/opts.go +++ b/pkg/parse/opts.go @@ -15,28 +15,28 @@ package parse import ( - "context" - "sync" "time" "k8s.io/utils/clock" "kpt.dev/configsync/pkg/declared" - "kpt.dev/configsync/pkg/importer/analyzer/ast" "kpt.dev/configsync/pkg/importer/filesystem" - "kpt.dev/configsync/pkg/status" "kpt.dev/configsync/pkg/util/discovery" "sigs.k8s.io/controller-runtime/pkg/client" ) -// Options holds configuration and core functionality required by all parsers. +// Options holds configuration and dependencies required by all parsers. type Options struct { + // Files lists Files in the source of truth. + // TODO: compose Files without extending. + Files + // Clock is used for time tracking, namely to simplify testing by allowing // a fake clock, instead of a RealClock. Clock clock.Clock - // Parser defines the minimum interface required for Reconciler to use a - // Parser to read configs from a filesystem. - Parser filesystem.ConfigParser + // ConfigParser defines the minimum interface required for Reconciler to use a + // ConfigParser to read configs from a filesystem. + ConfigParser filesystem.ConfigParser // ClusterName is the name of the cluster we're syncing configuration to. ClusterName string @@ -52,56 +52,40 @@ type Options struct { // SyncName is the name of the RootSync or RepoSync object. SyncName string - // StatusUpdatePeriod is how long the Parser waits between updates of the - // sync status, to account for management conflict errors from the Remediator. - StatusUpdatePeriod time.Duration + // Scope defines the scope of the reconciler, either root or namespaced. + Scope declared.Scope - // DiscoveryInterface is how the Parser learns what types are currently + // DiscoveryClient is how the Parser learns what types are currently // available on the cluster. - DiscoveryInterface discovery.ServerResourcer + DiscoveryClient discovery.ServerResourcer // Converter uses the DiscoveryInterface to encode the declared fields of // objects in Git. Converter *declared.ValueConverter - // mux prevents status update conflicts. - mux sync.Mutex - - // RenderingEnabled indicates whether the hydration-controller is currently - // running for this reconciler. - RenderingEnabled bool - // WebhookEnabled indicates whether the Webhook is currently enabled WebhookEnabled bool - // Files lists Files in the source of truth. - Files - // Updater mutates the most-recently-seen versions of objects stored in memory. - Updater + // DeclaredResources is the set of valid source objects, managed by the + // Updater and shared with the Parser & Remediator. + // This is used by the Parser to validate that CRDs can only be removed from + // the source when all of its CRs are removed as well. + DeclaredResources *declared.Resources } -// Parser represents a parser that can be pointed at and continuously parse a source. -type Parser interface { - parseSource(ctx context.Context, state *sourceState) ([]ast.FileObject, status.MultiError) - ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStatus, error) - setSourceStatus(ctx context.Context, newStatus *SourceStatus) error - setRenderingStatus(ctx context.Context, oldStatus, newStatus *RenderingStatus) error - SetSyncStatus(ctx context.Context, newStatus *SyncStatus) error - options() *Options - // SyncErrors returns all the sync errors, including remediator errors, - // validation errors, applier errors, and watch update errors. - SyncErrors() status.MultiError - // K8sClient returns the Kubernetes client that talks to the API server. - K8sClient() client.Client - // setRequiresRendering sets the requires-rendering annotation on the RSync - setRequiresRendering(ctx context.Context, renderingRequired bool) error - setSourceAnnotations(ctx context.Context, commit string) error -} +// ReconcilerOptions holds configuration for the reconciler. +type ReconcilerOptions struct { + // Extend parser options to ensure they're using the same dependencies. + *Options -func (o *Options) k8sClient() client.Client { - return o.Client -} + // Updater syncs the source from the parsed cache to the cluster. + *Updater -func (o *Options) discoveryClient() discovery.ServerResourcer { - return o.DiscoveryInterface + // StatusUpdatePeriod is how long the Parser waits between updates of the + // sync status, to account for management conflict errors from the Remediator. + StatusUpdatePeriod time.Duration + + // RenderingEnabled indicates whether the hydration-controller is currently + // running for this reconciler. + RenderingEnabled bool } diff --git a/pkg/parse/parser.go b/pkg/parse/parser.go new file mode 100644 index 000000000..e1f0d8502 --- /dev/null +++ b/pkg/parse/parser.go @@ -0,0 +1,28 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "context" + + "kpt.dev/configsync/pkg/importer/analyzer/ast" + "kpt.dev/configsync/pkg/status" +) + +// Parser represents a parser that can be pointed at and continuously parse a source. +type Parser interface { + // ParseSource parses the source manifest files and returns the objects. + ParseSource(ctx context.Context, state *sourceState) ([]ast.FileObject, status.MultiError) +} diff --git a/pkg/parse/reconciler.go b/pkg/parse/reconciler.go new file mode 100644 index 000000000..078926e95 --- /dev/null +++ b/pkg/parse/reconciler.go @@ -0,0 +1,117 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "context" + + "kpt.dev/configsync/pkg/status" +) + +// Reconciler represents a parser that can be pointed at and continuously parse a source. +// TODO: Move to reconciler package; requires unwinding dependency cycles +type Reconciler interface { + // Options returns the ReconcilerOptions used by this reconciler. + Options() *ReconcilerOptions + // SyncStatusClient returns the SyncStatusClient used by this reconciler. + SyncStatusClient() SyncStatusClient + // Parser returns the Parser used by this reconciler. + Parser() Parser + // ReconcilerState returns the current state of the parser/reconciler. + ReconcilerState() *ReconcilerState + + // Read source manifests from the shared source volume. + // Waits for rendering, if enabled. + // Updates the RSync status (source, rendering, and syncing condition). + // + // Read is exposed for use by DefaultRunFunc. + Read(ctx context.Context, trigger string, sourceState *sourceState) status.MultiError + + // ParseAndUpdate parses objects from the source manifests, validates them, + // and then syncs them to the cluster with the Updater. + // + // ParseAndUpdate is exposed for use by DefaultRunFunc. + ParseAndUpdate(ctx context.Context, trigger string) status.MultiError + + // SetSyncStatus updates `.status.sync` and the Syncing condition, if needed, + // as well as `state.syncStatus` and `state.syncingConditionLastUpdate` if + // the update is successful. + // + // SetSyncStatus is exposed for use by the EventHandler, for periodic status + // updates. + SetSyncStatus(context.Context, *SyncStatus) error +} + +// TODO: Move to reconciler package; requires unwinding dependency cycles +type reconciler struct { + options *ReconcilerOptions + syncStatusClient SyncStatusClient + parser Parser + reconcilerState *ReconcilerState +} + +var _ Reconciler = &reconciler{} + +// Options returns the ReconcilerOptions used by this reconciler. +func (p *reconciler) Options() *ReconcilerOptions { + return p.options +} + +// SyncStatusClient returns the SyncStatusClient used by this reconciler. +func (p *reconciler) SyncStatusClient() SyncStatusClient { + return p.syncStatusClient +} + +// ReconcilerState returns the current state of the reconciler. +func (p *reconciler) Parser() Parser { + return p.parser +} + +// ReconcilerState returns the current state of the reconciler. +func (p *reconciler) ReconcilerState() *ReconcilerState { + return p.reconcilerState +} + +// NewRootRunner creates a new runnable parser for parsing a Root repository. +func NewRootRunner(recOpts *ReconcilerOptions, parseOpts *RootOptions) Reconciler { + return &reconciler{ + options: recOpts, + reconcilerState: &ReconcilerState{ + syncErrorCache: recOpts.SyncErrorCache, + }, + syncStatusClient: &rootSyncStatusClient{ + options: parseOpts.Options, + }, + parser: &rootSyncParser{ + options: parseOpts, + }, + } +} + +// NewNamespaceRunner creates a new runnable parser for parsing a Namespace repo. +func NewNamespaceRunner(recOpts *ReconcilerOptions, parseOpts *Options) Reconciler { + return &reconciler{ + options: recOpts, + reconcilerState: &ReconcilerState{ + syncErrorCache: recOpts.SyncErrorCache, + }, + syncStatusClient: &repoSyncStatusClient{ + options: parseOpts, + }, + parser: &repoSyncParser{ + options: parseOpts, + }, + } +} diff --git a/pkg/parse/namespace.go b/pkg/parse/repo_sync_client.go similarity index 67% rename from pkg/parse/namespace.go rename to pkg/parse/repo_sync_client.go index 664b6904b..b79c7d8ae 100644 --- a/pkg/parse/namespace.go +++ b/pkg/parse/repo_sync_client.go @@ -19,6 +19,7 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/google/go-cmp/cmp" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -29,109 +30,43 @@ import ( "kpt.dev/configsync/pkg/api/configsync" "kpt.dev/configsync/pkg/api/configsync/v1beta1" "kpt.dev/configsync/pkg/core" - "kpt.dev/configsync/pkg/importer/analyzer/ast" - "kpt.dev/configsync/pkg/importer/reader" "kpt.dev/configsync/pkg/metadata" "kpt.dev/configsync/pkg/metrics" "kpt.dev/configsync/pkg/reposync" "kpt.dev/configsync/pkg/status" "kpt.dev/configsync/pkg/util/compare" - utildiscovery "kpt.dev/configsync/pkg/util/discovery" - "kpt.dev/configsync/pkg/validate" "sigs.k8s.io/controller-runtime/pkg/client" ) -// NewNamespaceRunner creates a new runnable parser for parsing a Namespace repo. -func NewNamespaceRunner(opts *Options) Parser { - return &namespace{ - Options: opts, - } -} - -type namespace struct { - *Options -} - -var _ Parser = &namespace{} - -func (p *namespace) options() *Options { - return p.Options +type repoSyncStatusClient struct { + options *Options + // mux prevents status update conflicts. + mux sync.Mutex } -// parseSource implements the Parser interface -func (p *namespace) parseSource(ctx context.Context, state *sourceState) ([]ast.FileObject, status.MultiError) { - p.mux.Lock() - defer p.mux.Unlock() - - filePaths := reader.FilePaths{ - RootDir: state.syncDir, - PolicyDir: p.SyncDir, - Files: state.files, - } - crds, err := p.declaredCRDs() - if err != nil { - return nil, err - } - builder := utildiscovery.ScoperBuilder(p.DiscoveryInterface) - - klog.Infof("Parsing files from source dir: %s", state.syncDir.OSPath()) - objs, err := p.Parser.Parse(filePaths) - if err != nil { - return nil, err - } - - options := validate.Options{ - ClusterName: p.ClusterName, - SyncName: p.SyncName, - PolicyDir: p.SyncDir, - PreviousCRDs: crds, - BuildScoper: builder, - Converter: p.Converter, - // Namespaces and NamespaceSelectors should not be declared in a namespace repo. - // So disable the API call and dynamic mode of NamespaceSelector. - AllowAPICall: false, - DynamicNSSelectorEnabled: false, - WebhookEnabled: p.WebhookEnabled, - FieldManager: configsync.FieldManager, - } - options = OptionsForScope(options, p.Scope) - - objs, err = validate.Unstructured(ctx, p.Client, objs, options) - - if status.HasBlockingErrors(err) { - return nil, err - } - - // Duplicated with root.go. - e := addAnnotationsAndLabels(objs, p.Scope, p.SyncName, p.sourceContext(), state.commit) - if e != nil { - err = status.Append(err, status.InternalErrorf("unable to add annotations and labels: %v", e)) - return nil, err - } - return objs, err -} - -// setSourceStatus implements the Parser interface +// SetSourceStatus implements the Parser interface // -// setSourceStatus sets the source status with a given source state and set of errors. If errs is empty, all errors +// SetSourceStatus sets the source status with a given source state and set of errors. If errs is empty, all errors // will be removed from the status. -func (p *namespace) setSourceStatus(ctx context.Context, newStatus *SourceStatus) error { +func (p *repoSyncStatusClient) SetSourceStatus(ctx context.Context, newStatus *SourceStatus) error { p.mux.Lock() defer p.mux.Unlock() return p.setSourceStatusWithRetries(ctx, newStatus, defaultDenominator) } -func (p *namespace) setSourceStatusWithRetries(ctx context.Context, newStatus *SourceStatus, denominator int) error { +func (p *repoSyncStatusClient) setSourceStatusWithRetries(ctx context.Context, newStatus *SourceStatus, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } + opts := p.options + // The main idea here is an error-robust way of surfacing to the user that // we're having problems reading from our local clone of their source repository. // This can happen when Kubernetes does weird things with mounted filesystems, // or if an attacker tried to maliciously change the cluster's record of the // source of truth. var rs v1beta1.RepoSync - if err := p.Client.Get(ctx, reposync.ObjectKey(p.Scope, p.SyncName), &rs); err != nil { + if err := opts.Client.Get(ctx, reposync.ObjectKey(opts.Scope, opts.SyncName), &rs); err != nil { return status.APIServerError(err, "failed to get RepoSync for parser") } @@ -165,7 +100,7 @@ func (p *namespace) setSourceStatusWithRetries(ctx context.Context, newStatus *S cmp.Diff(currentRS.Status, rs.Status)) } - if err := p.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { + if err := opts.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { // If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RepoSync source status (total error count: %d, denominator: %d): %s.", rs.Status.Source.ErrorSummary.TotalCount, denominator, err) @@ -176,24 +111,25 @@ func (p *namespace) setSourceStatusWithRetries(ctx context.Context, newStatus *S return nil } -func (p *namespace) setSourceAnnotations(ctx context.Context, commit string) error { +func (p *repoSyncStatusClient) SetSourceAnnotations(ctx context.Context, commit string) error { + opts := p.options rs := &v1beta1.RepoSync{} - rs.Namespace = string(p.Scope) - rs.Name = p.SyncName + rs.Namespace = string(opts.Scope) + rs.Name = opts.SyncName var patch string - if p.Options.SourceType == configsync.OciSource || - (p.Options.SourceType == configsync.HelmSource && strings.HasPrefix(p.Options.SourceRepo, "oci://")) { + if opts.SourceType == configsync.OciSource || + (opts.SourceType == configsync.HelmSource && strings.HasPrefix(opts.SourceRepo, "oci://")) { patch = fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, metadata.ImageToSyncAnnotationKey, - fmt.Sprintf("%s@sha256:%s", p.Options.SourceRepo, commit), + fmt.Sprintf("%s@sha256:%s", opts.SourceRepo, commit), ) } else { patch = fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}}`, metadata.ImageToSyncAnnotationKey) } - err := p.Client.Patch(ctx, rs, + err := opts.Client.Patch(ctx, rs, client.RawPatch(types.MergePatchType, []byte(patch)), client.FieldOwner(configsync.FieldManager)) @@ -204,9 +140,10 @@ func (p *namespace) setSourceAnnotations(ctx context.Context, commit string) err return nil } -func (p *namespace) setRequiresRendering(ctx context.Context, renderingRequired bool) error { +func (p *repoSyncStatusClient) SetRequiresRendering(ctx context.Context, renderingRequired bool) error { + opts := p.options rs := &v1beta1.RepoSync{} - if err := p.Client.Get(ctx, reposync.ObjectKey(p.Scope, p.SyncName), rs); err != nil { + if err := opts.Client.Get(ctx, reposync.ObjectKey(opts.Scope, opts.SyncName), rs); err != nil { return status.APIServerError(err, "failed to get RepoSync for Parser") } newVal := strconv.FormatBool(renderingRequired) @@ -216,11 +153,11 @@ func (p *namespace) setRequiresRendering(ctx context.Context, renderingRequired } existing := rs.DeepCopy() core.SetAnnotation(rs, metadata.RequiresRenderingAnnotationKey, newVal) - return p.Client.Patch(ctx, rs, client.MergeFrom(existing), client.FieldOwner(configsync.FieldManager)) + return opts.Client.Patch(ctx, rs, client.MergeFrom(existing), client.FieldOwner(configsync.FieldManager)) } -// setRenderingStatus implements the Parser interface -func (p *namespace) setRenderingStatus(ctx context.Context, oldStatus, newStatus *RenderingStatus) error { +// SetRenderingStatus implements the Parser interface +func (p *repoSyncStatusClient) SetRenderingStatus(ctx context.Context, oldStatus, newStatus *RenderingStatus) error { if oldStatus.Equals(newStatus) { return nil } @@ -230,13 +167,14 @@ func (p *namespace) setRenderingStatus(ctx context.Context, oldStatus, newStatus return p.setRenderingStatusWithRetries(ctx, newStatus, defaultDenominator) } -func (p *namespace) setRenderingStatusWithRetries(ctx context.Context, newStatus *RenderingStatus, denominator int) error { +func (p *repoSyncStatusClient) setRenderingStatusWithRetries(ctx context.Context, newStatus *RenderingStatus, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } + opts := p.options var rs v1beta1.RepoSync - if err := p.Client.Get(ctx, reposync.ObjectKey(p.Scope, p.SyncName), &rs); err != nil { + if err := opts.Client.Get(ctx, reposync.ObjectKey(opts.Scope, opts.SyncName), &rs); err != nil { return status.APIServerError(err, "failed to get RepoSync for parser") } @@ -270,7 +208,7 @@ func (p *namespace) setRenderingStatusWithRetries(ctx context.Context, newStatus cmp.Diff(currentRS.Status, rs.Status)) } - if err := p.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { + if err := opts.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { // If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RepoSync rendering status (total error count: %d, denominator: %d): %s.", rs.Status.Rendering.ErrorSummary.TotalCount, denominator, err) @@ -282,13 +220,14 @@ func (p *namespace) setRenderingStatusWithRetries(ctx context.Context, newStatus } // ReconcilerStatusFromCluster gets the RepoSync sync status from the cluster. -func (p *namespace) ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStatus, error) { +func (p *repoSyncStatusClient) ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStatus, error) { + opts := p.options rs := &v1beta1.RepoSync{} - if err := p.Client.Get(ctx, reposync.ObjectKey(p.Scope, p.SyncName), rs); err != nil { + if err := opts.Client.Get(ctx, reposync.ObjectKey(opts.Scope, opts.SyncName), rs); err != nil { if apierrors.IsNotFound(err) || meta.IsNoMatchError(err) { return nil, nil } - return nil, status.APIServerError(err, fmt.Sprintf("failed to get the RepoSync object for the %v namespace", p.Scope)) + return nil, status.APIServerError(err, fmt.Sprintf("failed to get the RepoSync object for the %v namespace", opts.Scope)) } // Read Syncing condition @@ -304,26 +243,27 @@ func (p *namespace) ReconcilerStatusFromCluster(ctx context.Context) (*Reconcile } } - return reconcilerStatusFromRSyncStatus(rs.Status.Status, p.options().SourceType, syncing, syncingConditionLastUpdate), nil + return reconcilerStatusFromRSyncStatus(rs.Status.Status, opts.SourceType, syncing, syncingConditionLastUpdate), nil } // SetSyncStatus implements the Parser interface // SetSyncStatus sets the RepoSync sync status. // `errs` includes the errors encountered during the apply step; -func (p *namespace) SetSyncStatus(ctx context.Context, newStatus *SyncStatus) error { +func (p *repoSyncStatusClient) SetSyncStatus(ctx context.Context, newStatus *SyncStatus) error { p.mux.Lock() defer p.mux.Unlock() return p.setSyncStatusWithRetries(ctx, newStatus, defaultDenominator) } -func (p *namespace) setSyncStatusWithRetries(ctx context.Context, newStatus *SyncStatus, denominator int) error { +func (p *repoSyncStatusClient) setSyncStatusWithRetries(ctx context.Context, newStatus *SyncStatus, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } + opts := p.options rs := &v1beta1.RepoSync{} - if err := p.Client.Get(ctx, reposync.ObjectKey(p.Scope, p.SyncName), rs); err != nil { - return status.APIServerError(err, fmt.Sprintf("failed to get the RepoSync object for the %v namespace", p.Scope)) + if err := opts.Client.Get(ctx, reposync.ObjectKey(opts.Scope, opts.SyncName), rs); err != nil { + return status.APIServerError(err, fmt.Sprintf("failed to get the RepoSync object for the %v namespace", opts.Scope)) } currentRS := rs.DeepCopy() @@ -371,25 +311,13 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, newStatus *Syn cmp.Diff(currentRS.Status, rs.Status)) } - if err := p.Client.Status().Update(ctx, rs, client.FieldOwner(configsync.FieldManager)); err != nil { + if err := opts.Client.Status().Update(ctx, rs, client.FieldOwner(configsync.FieldManager)); err != nil { // If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RepoSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err) return p.setSyncStatusWithRetries(ctx, newStatus, denominator*2) } - return status.APIServerError(err, fmt.Sprintf("failed to update the RepoSync sync status for the %v namespace", p.Scope)) + return status.APIServerError(err, fmt.Sprintf("failed to update the RepoSync sync status for the %v namespace", opts.Scope)) } return nil } - -// SyncErrors returns all the sync errors, including remediator errors, -// validation errors, applier errors, and watch update errors. -// SyncErrors implements the Parser interface -func (p *namespace) SyncErrors() status.MultiError { - return p.SyncErrorCache.Errors() -} - -// K8sClient implements the Parser interface -func (p *namespace) K8sClient() client.Client { - return p.Client -} diff --git a/pkg/parse/repo_sync_parser.go b/pkg/parse/repo_sync_parser.go new file mode 100644 index 000000000..d7fcb25c0 --- /dev/null +++ b/pkg/parse/repo_sync_parser.go @@ -0,0 +1,82 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "context" + + "k8s.io/klog/v2" + "kpt.dev/configsync/pkg/api/configsync" + "kpt.dev/configsync/pkg/importer/analyzer/ast" + "kpt.dev/configsync/pkg/importer/reader" + "kpt.dev/configsync/pkg/status" + utildiscovery "kpt.dev/configsync/pkg/util/discovery" + "kpt.dev/configsync/pkg/validate" +) + +type repoSyncParser struct { + options *Options +} + +// ParseSource implements the Parser interface +func (p *repoSyncParser) ParseSource(ctx context.Context, state *sourceState) ([]ast.FileObject, status.MultiError) { + opts := p.options + filePaths := reader.FilePaths{ + RootDir: state.syncDir, + PolicyDir: opts.SyncDir, + Files: state.files, + } + crds, err := opts.DeclaredResources.DeclaredCRDs() + if err != nil { + return nil, err + } + builder := utildiscovery.ScoperBuilder(opts.DiscoveryClient) + + klog.Infof("Parsing files from source dir: %s", state.syncDir.OSPath()) + objs, err := opts.ConfigParser.Parse(filePaths) + if err != nil { + return nil, err + } + + options := validate.Options{ + ClusterName: opts.ClusterName, + SyncName: opts.SyncName, + PolicyDir: opts.SyncDir, + PreviousCRDs: crds, + BuildScoper: builder, + Converter: opts.Converter, + // Namespaces and NamespaceSelectors should not be declared in a namespace repo. + // So disable the API call and dynamic mode of NamespaceSelector. + AllowAPICall: false, + DynamicNSSelectorEnabled: false, + WebhookEnabled: opts.WebhookEnabled, + FieldManager: configsync.FieldManager, + } + options = OptionsForScope(options, opts.Scope) + + objs, err = validate.Unstructured(ctx, opts.Client, objs, options) + + if status.HasBlockingErrors(err) { + return nil, err + } + + // Duplicated with root.go. + e := addAnnotationsAndLabels(objs, opts.Scope, opts.SyncName, opts.Files.sourceContext(), state.commit) + if e != nil { + err = status.Append(err, status.InternalErrorf("unable to add annotations and labels: %v", e)) + return nil, err + } + return objs, err +} diff --git a/pkg/parse/root_test.go b/pkg/parse/root_reconciler_test.go similarity index 70% rename from pkg/parse/root_test.go rename to pkg/parse/root_reconciler_test.go index c9a93340a..e59e285f4 100644 --- a/pkg/parse/root_test.go +++ b/pkg/parse/root_reconciler_test.go @@ -20,7 +20,6 @@ import ( "fmt" "testing" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "go.opencensus.io/stats/view" "go.opencensus.io/tag" @@ -84,7 +83,7 @@ func gitSpec(repo string, auth configsync.AuthType) core.MetaMutator { } } -func TestRoot_Parse(t *testing.T) { +func TestRootReconciler_ParseAndUpdate(t *testing.T) { fakeMetaTime := metav1.Now().Rfc3339Copy() // truncate to second precision fakeTime := fakeMetaTime.Time fakeClock := fakeclock.NewFakeClock(fakeTime) @@ -677,33 +676,10 @@ func TestRoot_Parse(t *testing.T) { }, } tc.existingObjects = append(tc.existingObjects, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)) - parser := &root{ - Options: &Options{ - Clock: fakeClock, - Parser: fakeConfigParser, - SyncName: rootSyncName, - ReconcilerName: rootReconcilerName, - Client: fakeClient, - DiscoveryInterface: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), - Converter: converter, - Files: Files{FileSource: fileSource}, - Updater: Updater{ - Scope: declared.RootScope, - Resources: &declared.Resources{}, - Remediator: &remediatorfake.Remediator{}, - Applier: fakeApplier, - SyncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), - }, - }, - RootOptions: &RootOptions{ - SourceFormat: tc.format, - NamespaceStrategy: tc.namespaceStrategy, - }, - } files := []cmpath.Absolute{ "example.yaml", } - state := &reconcilerState{ + state := &ReconcilerState{ status: reconcilerStatus.DeepCopy(), cache: cacheForCommit{ source: &sourceState{ @@ -718,8 +694,48 @@ func TestRoot_Parse(t *testing.T) { files: files, }, }, + syncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), + } + opts := &Options{ + Clock: fakeClock, + ConfigParser: fakeConfigParser, + SyncName: rootSyncName, + Scope: declared.RootScope, + ReconcilerName: rootReconcilerName, + Client: fakeClient, + DiscoveryClient: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), + Converter: converter, + Files: Files{FileSource: fileSource}, + DeclaredResources: &declared.Resources{}, + } + rootOpts := &RootOptions{ + Options: opts, + SourceFormat: tc.format, + NamespaceStrategy: tc.namespaceStrategy, + } + recOpts := &ReconcilerOptions{ + Options: opts, + Updater: &Updater{ + Scope: opts.Scope, + Resources: opts.DeclaredResources, + Remediator: &remediatorfake.Remediator{}, + Applier: fakeApplier, + SyncErrorCache: state.syncErrorCache, + }, + StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod, + RenderingEnabled: false, + } + reconciler := &reconciler{ + options: recOpts, + syncStatusClient: &rootSyncStatusClient{ + options: opts, + }, + parser: &rootSyncParser{ + options: rootOpts, + }, + reconcilerState: state, } - if err := parseAndUpdate(context.Background(), parser, triggerReimport, state); err != nil { + if err := reconciler.ParseAndUpdate(context.Background(), triggerReimport); err != nil { t.Fatal(err) } @@ -754,7 +770,7 @@ func TestRoot_Parse(t *testing.T) { } } -func TestRoot_DeclaredFields(t *testing.T) { +func TestRootReconciler_DeclaredFields(t *testing.T) { applySetID := applyset.IDFromSync(rootSyncName, declared.RootScope) testCases := []struct { @@ -915,36 +931,53 @@ func TestRoot_DeclaredFields(t *testing.T) { }, } tc.existingObjects = append(tc.existingObjects, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)) - parser := &root{ - Options: &Options{ - Clock: clock.RealClock{}, // TODO: Test with fake clock - Parser: fakeConfigParser, - SyncName: rootSyncName, - ReconcilerName: rootReconcilerName, - Client: syncertest.NewClient(t, core.Scheme, tc.existingObjects...), - DiscoveryInterface: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), - Converter: converter, - WebhookEnabled: tc.webhookEnabled, - Updater: Updater{ - Scope: declared.RootScope, - Resources: &declared.Resources{}, - Remediator: &remediatorfake.Remediator{}, - Applier: fakeApplier, - SyncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), - }, - }, - RootOptions: &RootOptions{ - SourceFormat: configsync.SourceFormatUnstructured, - NamespaceStrategy: configsync.NamespaceStrategyExplicit, - }, - } - state := &reconcilerState{ + state := &ReconcilerState{ status: &ReconcilerStatus{}, cache: cacheForCommit{ source: &sourceState{}, }, + syncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), } - if err := parseAndUpdate(context.Background(), parser, triggerReimport, state); err != nil { + opts := &Options{ + Clock: clock.RealClock{}, // TODO: Test with fake clock + ConfigParser: fakeConfigParser, + SyncName: rootSyncName, + Scope: declared.RootScope, + ReconcilerName: rootReconcilerName, + Client: syncertest.NewClient(t, core.Scheme, tc.existingObjects...), + DiscoveryClient: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), + Converter: converter, + WebhookEnabled: tc.webhookEnabled, + DeclaredResources: &declared.Resources{}, + } + rootOpts := &RootOptions{ + Options: opts, + SourceFormat: configsync.SourceFormatUnstructured, + NamespaceStrategy: configsync.NamespaceStrategyExplicit, + } + recOpts := &ReconcilerOptions{ + Options: opts, + Updater: &Updater{ + Scope: opts.Scope, + Resources: opts.DeclaredResources, + Remediator: &remediatorfake.Remediator{}, + Applier: fakeApplier, + SyncErrorCache: state.syncErrorCache, + }, + StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod, + RenderingEnabled: false, + } + reconciler := &reconciler{ + options: recOpts, + syncStatusClient: &rootSyncStatusClient{ + options: opts, + }, + parser: &rootSyncParser{ + options: rootOpts, + }, + reconcilerState: state, + } + if err := reconciler.ParseAndUpdate(context.Background(), triggerReimport); err != nil { t.Fatal(err) } @@ -1017,7 +1050,7 @@ func fakeParseError(err error, gvks ...schema.GroupVersionKind) status.MultiErro return status.APIServerError(&discovery.ErrGroupDiscoveryFailed{Groups: groups}, "API discovery failed") } -func TestRoot_Parse_Discovery(t *testing.T) { +func TestRootReconciler_Parse_Discovery(t *testing.T) { applySetID := applyset.IDFromSync(rootSyncName, declared.RootScope) testCases := []struct { @@ -1183,35 +1216,52 @@ func TestRoot_Parse_Discovery(t *testing.T) { {}, // One Apply call }, } - parser := &root{ - Options: &Options{ - Clock: clock.RealClock{}, // TODO: Test with fake clock - Parser: fakeConfigParser, - SyncName: rootSyncName, - ReconcilerName: rootReconcilerName, - Client: syncertest.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)), - DiscoveryInterface: tc.discoveryClient, - Converter: converter, - Updater: Updater{ - Scope: declared.RootScope, - Resources: &declared.Resources{}, - Remediator: &remediatorfake.Remediator{}, - Applier: fakeApplier, - SyncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), - }, - }, - RootOptions: &RootOptions{ - SourceFormat: configsync.SourceFormatUnstructured, - NamespaceStrategy: configsync.NamespaceStrategyImplicit, - }, - } - state := &reconcilerState{ + state := &ReconcilerState{ status: &ReconcilerStatus{}, cache: cacheForCommit{ source: &sourceState{}, }, + syncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), + } + opts := &Options{ + Clock: clock.RealClock{}, // TODO: Test with fake clock + ConfigParser: fakeConfigParser, + SyncName: rootSyncName, + Scope: declared.RootScope, + ReconcilerName: rootReconcilerName, + Client: syncertest.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)), + DiscoveryClient: tc.discoveryClient, + Converter: converter, + DeclaredResources: &declared.Resources{}, + } + rootOpts := &RootOptions{ + Options: opts, + SourceFormat: configsync.SourceFormatUnstructured, + NamespaceStrategy: configsync.NamespaceStrategyImplicit, + } + recOpts := &ReconcilerOptions{ + Options: opts, + Updater: &Updater{ + Scope: opts.Scope, + Resources: opts.DeclaredResources, + Remediator: &remediatorfake.Remediator{}, + Applier: fakeApplier, + SyncErrorCache: state.syncErrorCache, + }, + StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod, + RenderingEnabled: false, } - err := parseAndUpdate(context.Background(), parser, triggerReimport, state) + reconciler := &reconciler{ + options: recOpts, + syncStatusClient: &rootSyncStatusClient{ + options: opts, + }, + parser: &rootSyncParser{ + options: rootOpts, + }, + reconcilerState: state, + } + err := reconciler.ParseAndUpdate(context.Background(), triggerReimport) testerrors.AssertEqual(t, tc.expectedError, err, "expected error to match") // After parsing, the objects set is processed and modified. @@ -1221,7 +1271,7 @@ func TestRoot_Parse_Discovery(t *testing.T) { } } -func TestRoot_SourceReconcilerErrorsMetricValidation(t *testing.T) { +func TestRootReconciler_SourceReconcilerErrorsMetricValidation(t *testing.T) { testCases := []struct { name string parseErrors status.MultiError @@ -1275,33 +1325,50 @@ func TestRoot_SourceReconcilerErrorsMetricValidation(t *testing.T) { {}, // One Apply call }, } - parser := &root{ - Options: &Options{ - Clock: clock.RealClock{}, // TODO: Test with fake clock - Parser: fakeConfigParser, - SyncName: rootSyncName, - ReconcilerName: rootReconcilerName, - Client: syncertest.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)), - DiscoveryInterface: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), - Updater: Updater{ - Scope: declared.RootScope, - Resources: &declared.Resources{}, - Remediator: &remediatorfake.Remediator{}, - Applier: fakeApplier, - SyncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), - }, - }, - RootOptions: &RootOptions{ - SourceFormat: configsync.SourceFormatUnstructured, - }, - } - state := &reconcilerState{ + state := &ReconcilerState{ status: &ReconcilerStatus{}, cache: cacheForCommit{ source: &sourceState{}, }, + syncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), + } + opts := &Options{ + Clock: clock.RealClock{}, // TODO: Test with fake clock + ConfigParser: fakeConfigParser, + SyncName: rootSyncName, + Scope: declared.RootScope, + ReconcilerName: rootReconcilerName, + Client: syncertest.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)), + DiscoveryClient: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), + DeclaredResources: &declared.Resources{}, + } + rootOpts := &RootOptions{ + Options: opts, + SourceFormat: configsync.SourceFormatUnstructured, + } + recOpts := &ReconcilerOptions{ + Options: opts, + Updater: &Updater{ + Scope: opts.Scope, + Resources: opts.DeclaredResources, + Remediator: &remediatorfake.Remediator{}, + Applier: fakeApplier, + SyncErrorCache: state.syncErrorCache, + }, + StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod, + RenderingEnabled: false, + } + reconciler := &reconciler{ + options: recOpts, + syncStatusClient: &rootSyncStatusClient{ + options: opts, + }, + parser: &rootSyncParser{ + options: rootOpts, + }, + reconcilerState: state, } - err := parseAndUpdate(context.Background(), parser, triggerReimport, state) + err := reconciler.ParseAndUpdate(context.Background(), triggerReimport) testerrors.AssertEqual(t, tc.expectedError, err, "expected error to match") if diff := m.ValidateMetrics(metrics.ReconcilerErrorsView, tc.expectedMetrics); diff != "" { @@ -1311,7 +1378,7 @@ func TestRoot_SourceReconcilerErrorsMetricValidation(t *testing.T) { } } -func TestRoot_SourceAndSyncReconcilerErrorsMetricValidation(t *testing.T) { +func TestRootReconciler_SourceAndSyncReconcilerErrorsMetricValidation(t *testing.T) { testCases := []struct { name string applyErrors []status.Error @@ -1367,583 +1434,55 @@ func TestRoot_SourceAndSyncReconcilerErrorsMetricValidation(t *testing.T) { {Errors: tc.applyErrors}, // One Apply call, optional errors }, } - parser := &root{ - Options: &Options{ - Clock: clock.RealClock{}, // TODO: Test with fake clock - Parser: fakeConfigParser, - Updater: Updater{ - Scope: declared.RootScope, - Resources: &declared.Resources{}, - Remediator: &remediatorfake.Remediator{}, - Applier: fakeApplier, - SyncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), - }, - SyncName: rootSyncName, - ReconcilerName: rootReconcilerName, - Client: syncertest.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)), - DiscoveryInterface: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), - }, - RootOptions: &RootOptions{ - SourceFormat: configsync.SourceFormatUnstructured, - }, - } - state := &reconcilerState{ + state := &ReconcilerState{ status: &ReconcilerStatus{}, cache: cacheForCommit{ source: &sourceState{}, }, + syncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), } - err := parseAndUpdate(context.Background(), parser, triggerReimport, state) - testerrors.AssertEqual(t, tc.expectedError, err, "expected error to match") - - if diff := m.ValidateMetrics(metrics.ReconcilerErrorsView, tc.expectedMetrics); diff != "" { - t.Error(diff) + opts := &Options{ + Clock: clock.RealClock{}, // TODO: Test with fake clock + ConfigParser: fakeConfigParser, + SyncName: rootSyncName, + Scope: declared.RootScope, + ReconcilerName: rootReconcilerName, + Client: syncertest.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)), + DiscoveryClient: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), + DeclaredResources: &declared.Resources{}, } - }) - } -} - -func TestSummarizeErrors(t *testing.T) { - testCases := []struct { - name string - sourceStatus v1beta1.SourceStatus - renderingStatus v1beta1.RenderingStatus - syncStatus v1beta1.SyncStatus - expectedErrorSources []v1beta1.ErrorSource - expectedErrorSummary *v1beta1.ErrorSummary - }{ - { - name: "both sourceStatus and syncStatus are empty", - sourceStatus: v1beta1.SourceStatus{}, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{}, - expectedErrorSources: nil, - expectedErrorSummary: &v1beta1.ErrorSummary{}, - }, - { - name: "sourceStatus is not empty (no trucation), syncStatus is empty", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{}, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - { - name: "sourceStatus is not empty and trucates errors, syncStatus is empty", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{}, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - { - name: "sourceStatus is empty, syncStatus is not empty (no trucation)", - sourceStatus: v1beta1.SourceStatus{}, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - { - name: "sourceStatus is empty, syncStatus is not empty and trucates errors", - sourceStatus: v1beta1.SourceStatus{}, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - { - name: "neither sourceStatus nor syncStatus is empty or trucates errors", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 4, - Truncated: false, - ErrorCountAfterTruncation: 4, - }, - }, - { - name: "neither sourceStatus nor syncStatus is empty, sourceStatus trucates errors", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 102, - Truncated: true, - ErrorCountAfterTruncation: 4, - }, - }, - { - name: "neither sourceStatus nor syncStatus is empty, syncStatus trucates errors", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 102, - Truncated: true, - ErrorCountAfterTruncation: 4, - }, - }, - { - name: "neither sourceStatus nor syncStatus is empty, both trucates errors", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{}, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 200, - Truncated: true, - ErrorCountAfterTruncation: 4, - }, - }, - { - name: "source, rendering, and sync errors", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1068", ErrorMessage: "1068-error-message"}, - {Code: "2015", ErrorMessage: "2015-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.RenderingError, v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 6, - Truncated: false, - ErrorCountAfterTruncation: 6, - }, - }, - { - name: "source, rendering, and sync errors, all truncated", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - renderingStatus: v1beta1.RenderingStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1068", ErrorMessage: "1068-error-message"}, - {Code: "2015", ErrorMessage: "2015-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.RenderingError, v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 300, - Truncated: true, - ErrorCountAfterTruncation: 6, - }, - }, - { - name: "source errors from newer commit", - sourceStatus: v1beta1.SourceStatus{ - Commit: "newer-commit", - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 1, - Truncated: false, - ErrorCountAfterTruncation: 1, - }, - }, - renderingStatus: v1beta1.RenderingStatus{ - Commit: "older-commit", - Errors: []v1beta1.ConfigSyncError{ - {Code: "1068", ErrorMessage: "1068-error-message"}, - {Code: "2015", ErrorMessage: "2015-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - syncStatus: v1beta1.SyncStatus{ - Commit: "older-commit", - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - {Code: "2009", ErrorMessage: "another error"}, - {Code: "2009", ErrorMessage: "yet-another error"}, - }, - - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 4, - Truncated: false, - ErrorCountAfterTruncation: 4, - }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.RenderingError, v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 6, - Truncated: false, - ErrorCountAfterTruncation: 6, - }, - }, - { - name: "rendering errors from newer commit", - sourceStatus: v1beta1.SourceStatus{ - Commit: "newer-commit", - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 1, - Truncated: false, - ErrorCountAfterTruncation: 1, - }, - }, - renderingStatus: v1beta1.RenderingStatus{ - Commit: "newer-commit", - Errors: []v1beta1.ConfigSyncError{ - {Code: "1068", ErrorMessage: "1068-error-message"}, - {Code: "2015", ErrorMessage: "2015-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - syncStatus: v1beta1.SyncStatus{ - Commit: "older-commit", - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - {Code: "2009", ErrorMessage: "another error"}, - {Code: "2009", ErrorMessage: "yet-another error"}, + rootOpts := &RootOptions{ + Options: opts, + SourceFormat: configsync.SourceFormatUnstructured, + } + recOpts := &ReconcilerOptions{ + Options: opts, + Updater: &Updater{ + Scope: opts.Scope, + Resources: opts.DeclaredResources, + Remediator: &remediatorfake.Remediator{}, + Applier: fakeApplier, + SyncErrorCache: state.syncErrorCache, + }, + StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod, + RenderingEnabled: false, + } + reconciler := &reconciler{ + options: recOpts, + syncStatusClient: &rootSyncStatusClient{ + options: opts, }, - - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 4, - Truncated: false, - ErrorCountAfterTruncation: 4, + parser: &rootSyncParser{ + options: rootOpts, }, - }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SyncError}, - expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 4, - Truncated: false, - ErrorCountAfterTruncation: 4, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - gotErrorSources, gotErrorSummary := summarizeErrorsForCommit(tc.sourceStatus, tc.renderingStatus, tc.syncStatus, tc.syncStatus.Commit) - if diff := cmp.Diff(tc.expectedErrorSources, gotErrorSources); diff != "" { - t.Errorf("summarizeErrors() got %v, expected %v", gotErrorSources, tc.expectedErrorSources) + reconcilerState: state, } - if diff := cmp.Diff(tc.expectedErrorSummary, gotErrorSummary); diff != "" { - t.Errorf("summarizeErrors() got %v, expected %v", gotErrorSummary, tc.expectedErrorSummary) - } - }) - } -} - -func TestPrependRootSyncRemediatorStatus(t *testing.T) { - const rootSyncName = "my-root-sync" - const thisManager = "this-manager" - const otherManager = "other-manager" - conflictingObject := k8sobjects.NamespaceObject("foo-ns", core.Annotation(metadata.ResourceManagerKey, otherManager)) - conflictAB := status.ManagementConflictErrorWrap(conflictingObject, thisManager) - invertedObject := k8sobjects.NamespaceObject("foo-ns", core.Annotation(metadata.ResourceManagerKey, thisManager)) - conflictBA := status.ManagementConflictErrorWrap(invertedObject, otherManager) - conflictABInverted := conflictAB.Invert() - // KptManagementConflictError is created with the desired object, not the current live object. - // So its manager annotation matches the reconciler doing the applying. - // This is the opposite of the objects passed to ManagementConflictErrorWrap. - kptConflictError := applier.KptManagementConflictError(invertedObject) - // Assert the value of each error message to make each value clear - const conflictABMessage = `KNV1060: The "this-manager" reconciler detected a management conflict with the "other-manager" reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler. - -metadata.name: foo-ns -group: -version: v1 -kind: Namespace - -For more information, see https://g.co/cloud/acm-errors#knv1060` - const conflictBAMessage = `KNV1060: The "other-manager" reconciler detected a management conflict with the "this-manager" reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler. - -metadata.name: foo-ns -group: -version: v1 -kind: Namespace - -For more information, see https://g.co/cloud/acm-errors#knv1060` - const kptConflictMessage = `KNV1060: The "this-manager" reconciler detected a management conflict with another reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler. - -metadata.name: foo-ns -group: -version: v1 -kind: Namespace + err := reconciler.ParseAndUpdate(context.Background(), triggerReimport) + testerrors.AssertEqual(t, tc.expectedError, err, "expected error to match") -For more information, see https://g.co/cloud/acm-errors#knv1060` - testutil.AssertEqual(t, conflictABMessage, conflictAB.ToCSE().ErrorMessage) - testutil.AssertEqual(t, conflictBAMessage, conflictBA.ToCSE().ErrorMessage) - testutil.AssertEqual(t, kptConflictMessage, kptConflictError.ToCSE().ErrorMessage) - testutil.AssertEqual(t, conflictBA, conflictABInverted) - testCases := map[string]struct { - thisSyncErrors []v1beta1.ConfigSyncError - expectedErrors []v1beta1.ConfigSyncError - }{ - "empty errors": { - thisSyncErrors: []v1beta1.ConfigSyncError{}, - expectedErrors: []v1beta1.ConfigSyncError{ - conflictAB.ToCSE(), - }, - }, - "unchanged conflict error": { - thisSyncErrors: []v1beta1.ConfigSyncError{ - conflictAB.ToCSE(), - }, - expectedErrors: []v1beta1.ConfigSyncError{ - conflictAB.ToCSE(), - }, - }, - "prepend conflict error": { - thisSyncErrors: []v1beta1.ConfigSyncError{ - {ErrorMessage: "foo"}, - }, - expectedErrors: []v1beta1.ConfigSyncError{ - conflictAB.ToCSE(), - {ErrorMessage: "foo"}, - }, - }, - "dedupe AB and BA errors": { - thisSyncErrors: []v1beta1.ConfigSyncError{ - conflictBA.ToCSE(), - }, - expectedErrors: []v1beta1.ConfigSyncError{ - conflictBA.ToCSE(), - }, - }, - "dedupe AB and AB inverted errors": { - thisSyncErrors: []v1beta1.ConfigSyncError{ - conflictABInverted.ToCSE(), - }, - expectedErrors: []v1beta1.ConfigSyncError{ - conflictABInverted.ToCSE(), - }, - }, - // TODO: De-dupe ManagementConflictErrorWrap & KptManagementConflictError - // These are currently de-duped locally by the conflict handler, - // but not remotely by prependRootSyncRemediatorStatus. - "prepend KptManagementConflictError": { - thisSyncErrors: []v1beta1.ConfigSyncError{ - kptConflictError.ToCSE(), - }, - expectedErrors: []v1beta1.ConfigSyncError{ - conflictAB.ToCSE(), - kptConflictError.ToCSE(), - }, - }, - } - for name, tc := range testCases { - t.Run(name, func(t *testing.T) { - rootSync := k8sobjects.RootSyncObjectV1Beta1(rootSyncName) - rootSync.Status.Sync.Errors = tc.thisSyncErrors - fakeClient := syncertest.NewClient(t, core.Scheme, rootSync) - ctx := context.Background() - err := prependRootSyncRemediatorStatus(ctx, fakeClient, rootSyncName, - []status.ManagementConflictError{conflictAB}, defaultDenominator) - require.NoError(t, err) - var updatedRootSync v1beta1.RootSync - err = fakeClient.Get(ctx, rootsync.ObjectKey(rootSyncName), &updatedRootSync) - require.NoError(t, err) - testutil.AssertEqual(t, tc.expectedErrors, updatedRootSync.Status.Sync.Errors) + if diff := m.ValidateMetrics(metrics.ReconcilerErrorsView, tc.expectedMetrics); diff != "" { + t.Error(diff) + } }) } } diff --git a/pkg/parse/root.go b/pkg/parse/root_sync_client.go similarity index 70% rename from pkg/parse/root.go rename to pkg/parse/root_sync_client.go index e6d518dab..753dba983 100644 --- a/pkg/parse/root.go +++ b/pkg/parse/root_sync_client.go @@ -19,48 +19,31 @@ import ( "fmt" "strconv" "strings" + "sync" - "github.com/elliotchance/orderedmap/v2" "github.com/google/go-cmp/cmp" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" "kpt.dev/configsync/pkg/api/configsync" "kpt.dev/configsync/pkg/api/configsync/v1beta1" "kpt.dev/configsync/pkg/core" - "kpt.dev/configsync/pkg/declared" - "kpt.dev/configsync/pkg/diff" - "kpt.dev/configsync/pkg/importer/analyzer/ast" - "kpt.dev/configsync/pkg/importer/filesystem" - "kpt.dev/configsync/pkg/importer/filesystem/cmpath" - "kpt.dev/configsync/pkg/importer/reader" - "kpt.dev/configsync/pkg/kinds" "kpt.dev/configsync/pkg/metadata" "kpt.dev/configsync/pkg/metrics" "kpt.dev/configsync/pkg/reconciler/namespacecontroller" "kpt.dev/configsync/pkg/rootsync" "kpt.dev/configsync/pkg/status" "kpt.dev/configsync/pkg/util/compare" - "kpt.dev/configsync/pkg/util/discovery" - "kpt.dev/configsync/pkg/validate" - "sigs.k8s.io/cli-utils/pkg/common" "sigs.k8s.io/controller-runtime/pkg/client" ) -// NewRootRunner creates a new runnable parser for parsing a Root repository. -func NewRootRunner(opts *Options, rootOpts *RootOptions) Parser { - return &root{ - Options: opts, - RootOptions: rootOpts, - } -} - // RootOptions includes options specific to RootSync objects. type RootOptions struct { + // Extend parse.Options + *Options + // SourceFormat defines the structure of the Root repository. Only the Root // repository may be SourceFormatHierarchy; all others are implicitly // SourceFormatUnstructured. @@ -85,96 +68,27 @@ type RootOptions struct { NSControllerState *namespacecontroller.State } -type root struct { - *Options - *RootOptions -} - -var _ Parser = &root{} - -func (p *root) options() *Options { - return p.Options -} - -// parseSource implements the Parser interface -func (p *root) parseSource(ctx context.Context, state *sourceState) ([]ast.FileObject, status.MultiError) { - wantFiles := state.files - if p.SourceFormat == configsync.SourceFormatHierarchy { - // We're using hierarchical mode for the root repository, so ignore files - // outside of the allowed directories. - wantFiles = filesystem.FilterHierarchyFiles(state.syncDir, wantFiles) - } - - filePaths := reader.FilePaths{ - RootDir: state.syncDir, - PolicyDir: p.SyncDir, - Files: wantFiles, - } - - crds, err := p.declaredCRDs() - if err != nil { - return nil, err - } - builder := discovery.ScoperBuilder(p.DiscoveryInterface) - - klog.Infof("Parsing files from source dir: %s", state.syncDir.OSPath()) - objs, err := p.Parser.Parse(filePaths) - if err != nil { - return nil, err - } - - options := validate.Options{ - ClusterName: p.ClusterName, - SyncName: p.SyncName, - PolicyDir: p.SyncDir, - PreviousCRDs: crds, - BuildScoper: builder, - Converter: p.Converter, - // Enable API call so NamespaceSelector can talk to k8s-api-server. - AllowAPICall: true, - DynamicNSSelectorEnabled: p.DynamicNSSelectorEnabled, - NSControllerState: p.NSControllerState, - WebhookEnabled: p.WebhookEnabled, - FieldManager: configsync.FieldManager, - } - options = OptionsForScope(options, p.Scope) - - if p.SourceFormat == configsync.SourceFormatUnstructured { - if p.NamespaceStrategy == configsync.NamespaceStrategyImplicit { - options.Visitors = append(options.Visitors, p.addImplicitNamespaces) - } - objs, err = validate.Unstructured(ctx, p.Client, objs, options) - } else { - objs, err = validate.Hierarchical(objs, options) - } - - if status.HasBlockingErrors(err) { - return nil, err - } - - // Duplicated with namespace.go. - e := addAnnotationsAndLabels(objs, declared.RootScope, p.SyncName, p.sourceContext(), state.commit) - if e != nil { - err = status.Append(err, status.InternalErrorf("unable to add annotations and labels: %v", e)) - return nil, err - } - return objs, err +type rootSyncStatusClient struct { + options *Options + // mux prevents status update conflicts. + mux sync.Mutex } -// setSourceStatus implements the Parser interface -func (p *root) setSourceStatus(ctx context.Context, newStatus *SourceStatus) error { +// SetSourceStatus implements the Parser interface +func (p *rootSyncStatusClient) SetSourceStatus(ctx context.Context, newStatus *SourceStatus) error { p.mux.Lock() defer p.mux.Unlock() return p.setSourceStatusWithRetries(ctx, newStatus, defaultDenominator) } -func (p *root) setSourceStatusWithRetries(ctx context.Context, newStatus *SourceStatus, denominator int) error { +func (p *rootSyncStatusClient) setSourceStatusWithRetries(ctx context.Context, newStatus *SourceStatus, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } + opts := p.options var rs v1beta1.RootSync - if err := p.Client.Get(ctx, rootsync.ObjectKey(p.SyncName), &rs); err != nil { + if err := opts.Client.Get(ctx, rootsync.ObjectKey(opts.SyncName), &rs); err != nil { return status.APIServerError(err, "failed to get RootSync for parser") } @@ -208,7 +122,7 @@ func (p *root) setSourceStatusWithRetries(ctx context.Context, newStatus *Source cmp.Diff(currentRS.Status, rs.Status)) } - if err := p.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { + if err := opts.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { // If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RootSync source status (total error count: %d, denominator: %d): %s.", rs.Status.Source.ErrorSummary.TotalCount, denominator, err) @@ -262,24 +176,25 @@ func setSourceStatusFields(source *v1beta1.SourceStatus, newStatus *SourceStatus source.LastUpdate = newStatus.LastUpdate } -func (p *root) setSourceAnnotations(ctx context.Context, commit string) error { +func (p *rootSyncStatusClient) SetSourceAnnotations(ctx context.Context, commit string) error { + opts := p.options rs := &v1beta1.RootSync{} rs.Namespace = configsync.ControllerNamespace - rs.Name = p.SyncName + rs.Name = opts.SyncName var patch string - if p.Options.SourceType == configsync.OciSource || - (p.Options.SourceType == configsync.HelmSource && strings.HasPrefix(p.Options.SourceRepo, "oci://")) { + if opts.SourceType == configsync.OciSource || + (opts.SourceType == configsync.HelmSource && strings.HasPrefix(opts.SourceRepo, "oci://")) { patch = fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, metadata.ImageToSyncAnnotationKey, - fmt.Sprintf("%s@sha256:%s", p.Options.SourceRepo, commit), + fmt.Sprintf("%s@sha256:%s", opts.SourceRepo, commit), ) } else { patch = fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}}`, metadata.ImageToSyncAnnotationKey) } - err := p.Client.Patch(ctx, rs, + err := opts.Client.Patch(ctx, rs, client.RawPatch(types.MergePatchType, []byte(patch)), client.FieldOwner(configsync.FieldManager)) @@ -290,9 +205,10 @@ func (p *root) setSourceAnnotations(ctx context.Context, commit string) error { return nil } -func (p *root) setRequiresRendering(ctx context.Context, renderingRequired bool) error { +func (p *rootSyncStatusClient) SetRequiresRendering(ctx context.Context, renderingRequired bool) error { + opts := p.options rs := &v1beta1.RootSync{} - if err := p.Client.Get(ctx, rootsync.ObjectKey(p.SyncName), rs); err != nil { + if err := opts.Client.Get(ctx, rootsync.ObjectKey(opts.SyncName), rs); err != nil { return status.APIServerError(err, "failed to get RootSync for parser") } newVal := strconv.FormatBool(renderingRequired) @@ -302,11 +218,11 @@ func (p *root) setRequiresRendering(ctx context.Context, renderingRequired bool) } existing := rs.DeepCopy() core.SetAnnotation(rs, metadata.RequiresRenderingAnnotationKey, newVal) - return p.Client.Patch(ctx, rs, client.MergeFrom(existing), client.FieldOwner(configsync.FieldManager)) + return opts.Client.Patch(ctx, rs, client.MergeFrom(existing), client.FieldOwner(configsync.FieldManager)) } -// setRenderingStatus implements the Parser interface -func (p *root) setRenderingStatus(ctx context.Context, oldStatus, newStatus *RenderingStatus) error { +// SetRenderingStatus implements the Parser interface +func (p *rootSyncStatusClient) SetRenderingStatus(ctx context.Context, oldStatus, newStatus *RenderingStatus) error { if oldStatus.Equals(newStatus) { return nil } @@ -316,13 +232,14 @@ func (p *root) setRenderingStatus(ctx context.Context, oldStatus, newStatus *Ren return p.setRenderingStatusWithRetries(ctx, newStatus, defaultDenominator) } -func (p *root) setRenderingStatusWithRetries(ctx context.Context, newStatus *RenderingStatus, denominator int) error { +func (p *rootSyncStatusClient) setRenderingStatusWithRetries(ctx context.Context, newStatus *RenderingStatus, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } + opts := p.options var rs v1beta1.RootSync - if err := p.Client.Get(ctx, rootsync.ObjectKey(p.SyncName), &rs); err != nil { + if err := opts.Client.Get(ctx, rootsync.ObjectKey(opts.SyncName), &rs); err != nil { return status.APIServerError(err, "failed to get RootSync for parser") } @@ -356,7 +273,7 @@ func (p *root) setRenderingStatusWithRetries(ctx context.Context, newStatus *Ren cmp.Diff(currentRS.Status, rs.Status)) } - if err := p.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { + if err := opts.Client.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil { // If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RootSync rendering status (total error count: %d, denominator: %d): %s.", rs.Status.Rendering.ErrorSummary.TotalCount, denominator, err) @@ -411,9 +328,10 @@ func setRenderingStatusFields(rendering *v1beta1.RenderingStatus, newStatus *Ren } // ReconcilerStatusFromCluster gets the RootSync sync status from the cluster. -func (p *root) ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStatus, error) { +func (p *rootSyncStatusClient) ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStatus, error) { + opts := p.options rs := &v1beta1.RootSync{} - if err := p.Client.Get(ctx, rootsync.ObjectKey(p.SyncName), rs); err != nil { + if err := opts.Client.Get(ctx, rootsync.ObjectKey(opts.SyncName), rs); err != nil { if apierrors.IsNotFound(err) || meta.IsNoMatchError(err) { return nil, nil } @@ -432,7 +350,7 @@ func (p *root) ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStat } } - return reconcilerStatusFromRSyncStatus(rs.Status.Status, p.options().SourceType, syncing, syncingConditionLastUpdate), nil + return reconcilerStatusFromRSyncStatus(rs.Status.Status, opts.SourceType, syncing, syncingConditionLastUpdate), nil } func reconcilerStatusFromRSyncStatus(rsyncStatus v1beta1.Status, sourceType configsync.SourceType, syncing bool, syncingConditionLastUpdate metav1.Time) *ReconcilerStatus { @@ -543,19 +461,20 @@ func reconcilerStatusFromRSyncStatus(rsyncStatus v1beta1.Status, sourceType conf // SetSyncStatus implements the Parser interface // SetSyncStatus sets the RootSync sync status. // `errs` includes the errors encountered during the apply step; -func (p *root) SetSyncStatus(ctx context.Context, newStatus *SyncStatus) error { +func (p *rootSyncStatusClient) SetSyncStatus(ctx context.Context, newStatus *SyncStatus) error { p.mux.Lock() defer p.mux.Unlock() return p.setSyncStatusWithRetries(ctx, newStatus, defaultDenominator) } -func (p *root) setSyncStatusWithRetries(ctx context.Context, newStatus *SyncStatus, denominator int) error { +func (p *rootSyncStatusClient) setSyncStatusWithRetries(ctx context.Context, newStatus *SyncStatus, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } + opts := p.options rs := &v1beta1.RootSync{} - if err := p.Client.Get(ctx, rootsync.ObjectKey(p.SyncName), rs); err != nil { + if err := opts.Client.Get(ctx, rootsync.ObjectKey(opts.SyncName), rs); err != nil { return status.APIServerError(err, "failed to get RootSync") } @@ -604,7 +523,7 @@ func (p *root) setSyncStatusWithRetries(ctx context.Context, newStatus *SyncStat cmp.Diff(currentRS.Status, rs.Status)) } - if err := p.Client.Status().Update(ctx, rs, client.FieldOwner(configsync.FieldManager)); err != nil { + if err := opts.Client.Status().Update(ctx, rs, client.FieldOwner(configsync.FieldManager)); err != nil { // If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RootSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err) @@ -669,84 +588,6 @@ func summarizeErrorsForCommit(sourceStatus v1beta1.SourceStatus, renderingStatus return errorSources, errorSummary } -// addImplicitNamespaces hydrates the given FileObjects by injecting implicit -// namespaces into the list before returning it. Implicit namespaces are those -// that are declared by an object's metadata namespace field but are not present -// in the list. The implicit namespace is only added if it doesn't exist. -func (p *root) addImplicitNamespaces(objs []ast.FileObject) ([]ast.FileObject, status.MultiError) { - var errs status.MultiError - // namespaces will track the set of Namespaces we expect to exist, and those - // which actually do. - namespaces := orderedmap.NewOrderedMap[string, bool]() - - for _, o := range objs { - if o.GetObjectKind().GroupVersionKind().GroupKind() == kinds.Namespace().GroupKind() { - namespaces.Set(o.GetName(), true) - } else if o.GetNamespace() != "" { - if _, found := namespaces.Get(o.GetNamespace()); !found { - // If unset, this ensures the key exists and is false. - // Otherwise it has no impact. - namespaces.Set(o.GetNamespace(), false) - } - } - } - - for e := namespaces.Front(); e != nil; e = e.Next() { - ns, isDeclared := e.Key, e.Value - // Do not treat config-management-system as an implicit namespace for multi-sync support. - // Otherwise, the namespace will become a managed resource, and will cause conflict among multiple RootSyncs. - if isDeclared || ns == configsync.ControllerNamespace { - continue - } - existingNs := &corev1.Namespace{} - err := p.Client.Get(context.Background(), types.NamespacedName{Name: ns}, existingNs) - if err != nil && !apierrors.IsNotFound(err) { - errs = status.Append(errs, fmt.Errorf("unable to check the existence of the implicit namespace %q: %w", ns, err)) - continue - } - - existingNs.SetGroupVersionKind(kinds.Namespace()) - // If the namespace already exists and not self-managed, do not add it as an implicit namespace. - // This is to avoid conflicts caused by multiple Root reconcilers managing the same implicit namespace. - if err == nil && !diff.IsManager(p.Scope, p.SyncName, existingNs) { - continue - } - - // Add the implicit namespace if it doesn't exist, or if it is managed by itself. - // If it is a self-managed namespace, still add it to the object list. Otherwise, - // it will be pruned because it is no longer in the inventory list. - u := &unstructured.Unstructured{} - u.SetGroupVersionKind(kinds.Namespace()) - u.SetName(ns) - // We do NOT want to delete theses implicit Namespaces when the resources - // inside them are removed from the repo. We don't know when it is safe to remove - // the implicit namespaces. An implicit namespace may already exist in the - // cluster. Deleting it will cause other unmanaged resources in that namespace - // being deleted. - // - // Adding the LifecycleDeleteAnnotation is to prevent the applier from deleting - // the implicit namespace when the namespaced config is removed from the repo. - // Note that if the user later declares the - // Namespace without this annotation, the annotation is removed as expected. - u.SetAnnotations(map[string]string{common.LifecycleDeleteAnnotation: common.PreventDeletion}) - objs = append(objs, ast.NewFileObject(u, cmpath.RelativeOS(""))) - } - - return objs, errs -} - -// SyncErrors returns all the sync errors, including remediator errors, -// validation errors, applier errors, and watch update errors. -// SyncErrors implements the Parser interface -func (p *root) SyncErrors() status.MultiError { - return p.SyncErrorCache.Errors() -} - -// K8sClient implements the Parser interface -func (p *root) K8sClient() client.Client { - return p.Client -} - // prependRootSyncRemediatorStatus adds the conflict error detected by the remediator to the front of the sync errors. func prependRootSyncRemediatorStatus(ctx context.Context, c client.Client, syncName string, conflictErrs []status.ManagementConflictError, denominator int) error { if denominator <= 0 { diff --git a/pkg/parse/root_sync_client_test.go b/pkg/parse/root_sync_client_test.go new file mode 100644 index 000000000..6fc31ed9b --- /dev/null +++ b/pkg/parse/root_sync_client_test.go @@ -0,0 +1,577 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + "kpt.dev/configsync/pkg/api/configsync/v1beta1" + "kpt.dev/configsync/pkg/applier" + "kpt.dev/configsync/pkg/core" + "kpt.dev/configsync/pkg/core/k8sobjects" + "kpt.dev/configsync/pkg/metadata" + "kpt.dev/configsync/pkg/rootsync" + "kpt.dev/configsync/pkg/status" + syncertest "kpt.dev/configsync/pkg/syncer/syncertest/fake" + "sigs.k8s.io/cli-utils/pkg/testutil" +) + +func TestSummarizeErrors(t *testing.T) { + testCases := []struct { + name string + sourceStatus v1beta1.SourceStatus + renderingStatus v1beta1.RenderingStatus + syncStatus v1beta1.SyncStatus + expectedErrorSources []v1beta1.ErrorSource + expectedErrorSummary *v1beta1.ErrorSummary + }{ + { + name: "both sourceStatus and syncStatus are empty", + sourceStatus: v1beta1.SourceStatus{}, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{}, + expectedErrorSources: nil, + expectedErrorSummary: &v1beta1.ErrorSummary{}, + }, + { + name: "sourceStatus is not empty (no trucation), syncStatus is empty", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{}, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + { + name: "sourceStatus is not empty and trucates errors, syncStatus is empty", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{}, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + { + name: "sourceStatus is empty, syncStatus is not empty (no trucation)", + sourceStatus: v1beta1.SourceStatus{}, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + { + name: "sourceStatus is empty, syncStatus is not empty and trucates errors", + sourceStatus: v1beta1.SourceStatus{}, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + { + name: "neither sourceStatus nor syncStatus is empty or trucates errors", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 4, + Truncated: false, + ErrorCountAfterTruncation: 4, + }, + }, + { + name: "neither sourceStatus nor syncStatus is empty, sourceStatus trucates errors", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 102, + Truncated: true, + ErrorCountAfterTruncation: 4, + }, + }, + { + name: "neither sourceStatus nor syncStatus is empty, syncStatus trucates errors", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 102, + Truncated: true, + ErrorCountAfterTruncation: 4, + }, + }, + { + name: "neither sourceStatus nor syncStatus is empty, both trucates errors", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{}, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 200, + Truncated: true, + ErrorCountAfterTruncation: 4, + }, + }, + { + name: "source, rendering, and sync errors", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1068", ErrorMessage: "1068-error-message"}, + {Code: "2015", ErrorMessage: "2015-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.RenderingError, v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 6, + Truncated: false, + ErrorCountAfterTruncation: 6, + }, + }, + { + name: "source, rendering, and sync errors, all truncated", + sourceStatus: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + renderingStatus: v1beta1.RenderingStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1068", ErrorMessage: "1068-error-message"}, + {Code: "2015", ErrorMessage: "2015-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + syncStatus: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.RenderingError, v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 300, + Truncated: true, + ErrorCountAfterTruncation: 6, + }, + }, + { + name: "source errors from newer commit", + sourceStatus: v1beta1.SourceStatus{ + Commit: "newer-commit", + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 1, + Truncated: false, + ErrorCountAfterTruncation: 1, + }, + }, + renderingStatus: v1beta1.RenderingStatus{ + Commit: "older-commit", + Errors: []v1beta1.ConfigSyncError{ + {Code: "1068", ErrorMessage: "1068-error-message"}, + {Code: "2015", ErrorMessage: "2015-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + syncStatus: v1beta1.SyncStatus{ + Commit: "older-commit", + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + {Code: "2009", ErrorMessage: "another error"}, + {Code: "2009", ErrorMessage: "yet-another error"}, + }, + + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 4, + Truncated: false, + ErrorCountAfterTruncation: 4, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.RenderingError, v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 6, + Truncated: false, + ErrorCountAfterTruncation: 6, + }, + }, + { + name: "rendering errors from newer commit", + sourceStatus: v1beta1.SourceStatus{ + Commit: "newer-commit", + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 1, + Truncated: false, + ErrorCountAfterTruncation: 1, + }, + }, + renderingStatus: v1beta1.RenderingStatus{ + Commit: "newer-commit", + Errors: []v1beta1.ConfigSyncError{ + {Code: "1068", ErrorMessage: "1068-error-message"}, + {Code: "2015", ErrorMessage: "2015-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + syncStatus: v1beta1.SyncStatus{ + Commit: "older-commit", + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + {Code: "2009", ErrorMessage: "another error"}, + {Code: "2009", ErrorMessage: "yet-another error"}, + }, + + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 4, + Truncated: false, + ErrorCountAfterTruncation: 4, + }, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SyncError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 4, + Truncated: false, + ErrorCountAfterTruncation: 4, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + gotErrorSources, gotErrorSummary := summarizeErrorsForCommit(tc.sourceStatus, tc.renderingStatus, tc.syncStatus, tc.syncStatus.Commit) + if diff := cmp.Diff(tc.expectedErrorSources, gotErrorSources); diff != "" { + t.Errorf("summarizeErrors() got %v, expected %v", gotErrorSources, tc.expectedErrorSources) + } + if diff := cmp.Diff(tc.expectedErrorSummary, gotErrorSummary); diff != "" { + t.Errorf("summarizeErrors() got %v, expected %v", gotErrorSummary, tc.expectedErrorSummary) + } + }) + } +} + +func TestPrependRootSyncRemediatorStatus(t *testing.T) { + const rootSyncName = "my-root-sync" + const thisManager = "this-manager" + const otherManager = "other-manager" + conflictingObject := k8sobjects.NamespaceObject("foo-ns", core.Annotation(metadata.ResourceManagerKey, otherManager)) + conflictAB := status.ManagementConflictErrorWrap(conflictingObject, thisManager) + invertedObject := k8sobjects.NamespaceObject("foo-ns", core.Annotation(metadata.ResourceManagerKey, thisManager)) + conflictBA := status.ManagementConflictErrorWrap(invertedObject, otherManager) + conflictABInverted := conflictAB.Invert() + // KptManagementConflictError is created with the desired object, not the current live object. + // So its manager annotation matches the reconciler doing the applying. + // This is the opposite of the objects passed to ManagementConflictErrorWrap. + kptConflictError := applier.KptManagementConflictError(invertedObject) + // Assert the value of each error message to make each value clear + const conflictABMessage = `KNV1060: The "this-manager" reconciler detected a management conflict with the "other-manager" reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler. + +metadata.name: foo-ns +group: +version: v1 +kind: Namespace + +For more information, see https://g.co/cloud/acm-errors#knv1060` + const conflictBAMessage = `KNV1060: The "other-manager" reconciler detected a management conflict with the "this-manager" reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler. + +metadata.name: foo-ns +group: +version: v1 +kind: Namespace + +For more information, see https://g.co/cloud/acm-errors#knv1060` + const kptConflictMessage = `KNV1060: The "this-manager" reconciler detected a management conflict with another reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler. + +metadata.name: foo-ns +group: +version: v1 +kind: Namespace + +For more information, see https://g.co/cloud/acm-errors#knv1060` + testutil.AssertEqual(t, conflictABMessage, conflictAB.ToCSE().ErrorMessage) + testutil.AssertEqual(t, conflictBAMessage, conflictBA.ToCSE().ErrorMessage) + testutil.AssertEqual(t, kptConflictMessage, kptConflictError.ToCSE().ErrorMessage) + testutil.AssertEqual(t, conflictBA, conflictABInverted) + testCases := map[string]struct { + thisSyncErrors []v1beta1.ConfigSyncError + expectedErrors []v1beta1.ConfigSyncError + }{ + "empty errors": { + thisSyncErrors: []v1beta1.ConfigSyncError{}, + expectedErrors: []v1beta1.ConfigSyncError{ + conflictAB.ToCSE(), + }, + }, + "unchanged conflict error": { + thisSyncErrors: []v1beta1.ConfigSyncError{ + conflictAB.ToCSE(), + }, + expectedErrors: []v1beta1.ConfigSyncError{ + conflictAB.ToCSE(), + }, + }, + "prepend conflict error": { + thisSyncErrors: []v1beta1.ConfigSyncError{ + {ErrorMessage: "foo"}, + }, + expectedErrors: []v1beta1.ConfigSyncError{ + conflictAB.ToCSE(), + {ErrorMessage: "foo"}, + }, + }, + "dedupe AB and BA errors": { + thisSyncErrors: []v1beta1.ConfigSyncError{ + conflictBA.ToCSE(), + }, + expectedErrors: []v1beta1.ConfigSyncError{ + conflictBA.ToCSE(), + }, + }, + "dedupe AB and AB inverted errors": { + thisSyncErrors: []v1beta1.ConfigSyncError{ + conflictABInverted.ToCSE(), + }, + expectedErrors: []v1beta1.ConfigSyncError{ + conflictABInverted.ToCSE(), + }, + }, + // TODO: De-dupe ManagementConflictErrorWrap & KptManagementConflictError + // These are currently de-duped locally by the conflict handler, + // but not remotely by prependRootSyncRemediatorStatus. + "prepend KptManagementConflictError": { + thisSyncErrors: []v1beta1.ConfigSyncError{ + kptConflictError.ToCSE(), + }, + expectedErrors: []v1beta1.ConfigSyncError{ + conflictAB.ToCSE(), + kptConflictError.ToCSE(), + }, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + rootSync := k8sobjects.RootSyncObjectV1Beta1(rootSyncName) + rootSync.Status.Sync.Errors = tc.thisSyncErrors + fakeClient := syncertest.NewClient(t, core.Scheme, rootSync) + ctx := context.Background() + err := prependRootSyncRemediatorStatus(ctx, fakeClient, rootSyncName, + []status.ManagementConflictError{conflictAB}, defaultDenominator) + require.NoError(t, err) + var updatedRootSync v1beta1.RootSync + err = fakeClient.Get(ctx, rootsync.ObjectKey(rootSyncName), &updatedRootSync) + require.NoError(t, err) + testutil.AssertEqual(t, tc.expectedErrors, updatedRootSync.Status.Sync.Errors) + }) + } +} diff --git a/pkg/parse/root_sync_parser.go b/pkg/parse/root_sync_parser.go new file mode 100644 index 000000000..6291b0a86 --- /dev/null +++ b/pkg/parse/root_sync_parser.go @@ -0,0 +1,177 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "context" + "fmt" + + "github.com/elliotchance/orderedmap/v2" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + "kpt.dev/configsync/pkg/api/configsync" + "kpt.dev/configsync/pkg/declared" + "kpt.dev/configsync/pkg/diff" + "kpt.dev/configsync/pkg/importer/analyzer/ast" + "kpt.dev/configsync/pkg/importer/filesystem" + "kpt.dev/configsync/pkg/importer/filesystem/cmpath" + "kpt.dev/configsync/pkg/importer/reader" + "kpt.dev/configsync/pkg/kinds" + "kpt.dev/configsync/pkg/status" + "kpt.dev/configsync/pkg/util/discovery" + "kpt.dev/configsync/pkg/validate" + "sigs.k8s.io/cli-utils/pkg/common" +) + +type rootSyncParser struct { + options *RootOptions +} + +// ParseSource implements the Parser interface +func (p *rootSyncParser) ParseSource(ctx context.Context, state *sourceState) ([]ast.FileObject, status.MultiError) { + opts := p.options + + wantFiles := state.files + if opts.SourceFormat == configsync.SourceFormatHierarchy { + // We're using hierarchical mode for the root repository, so ignore files + // outside of the allowed directories. + wantFiles = filesystem.FilterHierarchyFiles(state.syncDir, wantFiles) + } + + filePaths := reader.FilePaths{ + RootDir: state.syncDir, + PolicyDir: p.options.SyncDir, + Files: wantFiles, + } + + crds, err := opts.DeclaredResources.DeclaredCRDs() + if err != nil { + return nil, err + } + builder := discovery.ScoperBuilder(opts.DiscoveryClient) + + klog.Infof("Parsing files from source dir: %s", state.syncDir.OSPath()) + objs, err := opts.ConfigParser.Parse(filePaths) + if err != nil { + return nil, err + } + + options := validate.Options{ + ClusterName: opts.ClusterName, + SyncName: opts.SyncName, + PolicyDir: opts.SyncDir, + PreviousCRDs: crds, + BuildScoper: builder, + Converter: opts.Converter, + // Enable API call so NamespaceSelector can talk to k8s-api-server. + AllowAPICall: true, + DynamicNSSelectorEnabled: opts.DynamicNSSelectorEnabled, + NSControllerState: opts.NSControllerState, + WebhookEnabled: opts.WebhookEnabled, + FieldManager: configsync.FieldManager, + } + options = OptionsForScope(options, opts.Scope) + + if opts.SourceFormat == configsync.SourceFormatUnstructured { + if opts.NamespaceStrategy == configsync.NamespaceStrategyImplicit { + options.Visitors = append(options.Visitors, p.addImplicitNamespaces) + } + objs, err = validate.Unstructured(ctx, opts.Client, objs, options) + } else { + objs, err = validate.Hierarchical(objs, options) + } + + if status.HasBlockingErrors(err) { + return nil, err + } + + // Duplicated with namespace.go. + e := addAnnotationsAndLabels(objs, declared.RootScope, opts.SyncName, opts.Files.sourceContext(), state.commit) + if e != nil { + err = status.Append(err, status.InternalErrorf("unable to add annotations and labels: %v", e)) + return nil, err + } + return objs, err +} + +// addImplicitNamespaces hydrates the given FileObjects by injecting implicit +// namespaces into the list before returning it. Implicit namespaces are those +// that are declared by an object's metadata namespace field but are not present +// in the list. The implicit namespace is only added if it doesn't exist. +func (p *rootSyncParser) addImplicitNamespaces(objs []ast.FileObject) ([]ast.FileObject, status.MultiError) { + opts := p.options + var errs status.MultiError + // namespaces will track the set of Namespaces we expect to exist, and those + // which actually do. + namespaces := orderedmap.NewOrderedMap[string, bool]() + + for _, o := range objs { + if o.GetObjectKind().GroupVersionKind().GroupKind() == kinds.Namespace().GroupKind() { + namespaces.Set(o.GetName(), true) + } else if o.GetNamespace() != "" { + if _, found := namespaces.Get(o.GetNamespace()); !found { + // If unset, this ensures the key exists and is false. + // Otherwise it has no impact. + namespaces.Set(o.GetNamespace(), false) + } + } + } + + for e := namespaces.Front(); e != nil; e = e.Next() { + ns, isDeclared := e.Key, e.Value + // Do not treat config-management-system as an implicit namespace for multi-sync support. + // Otherwise, the namespace will become a managed resource, and will cause conflict among multiple RootSyncs. + if isDeclared || ns == configsync.ControllerNamespace { + continue + } + existingNs := &corev1.Namespace{} + err := opts.Client.Get(context.Background(), types.NamespacedName{Name: ns}, existingNs) + if err != nil && !apierrors.IsNotFound(err) { + errs = status.Append(errs, fmt.Errorf("unable to check the existence of the implicit namespace %q: %w", ns, err)) + continue + } + + existingNs.SetGroupVersionKind(kinds.Namespace()) + // If the namespace already exists and not self-managed, do not add it as an implicit namespace. + // This is to avoid conflicts caused by multiple Root reconcilers managing the same implicit namespace. + if err == nil && !diff.IsManager(opts.Scope, opts.SyncName, existingNs) { + continue + } + + // Add the implicit namespace if it doesn't exist, or if it is managed by itself. + // If it is a self-managed namespace, still add it to the object list. Otherwise, + // it will be pruned because it is no longer in the inventory list. + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(kinds.Namespace()) + u.SetName(ns) + // We do NOT want to delete theses implicit Namespaces when the resources + // inside them are removed from the repo. We don't know when it is safe to remove + // the implicit namespaces. An implicit namespace may already exist in the + // cluster. Deleting it will cause other unmanaged resources in that namespace + // being deleted. + // + // Adding the LifecycleDeleteAnnotation is to prevent the applier from deleting + // the implicit namespace when the namespaced config is removed from the repo. + // Note that if the user later declares the + // Namespace without this annotation, the annotation is removed as expected. + u.SetAnnotations(map[string]string{common.LifecycleDeleteAnnotation: common.PreventDeletion}) + objs = append(objs, ast.NewFileObject(u, cmpath.RelativeOS(""))) + } + + return objs, errs +} diff --git a/pkg/parse/run.go b/pkg/parse/run.go index 66cd3adf5..e29a449c0 100644 --- a/pkg/parse/run.go +++ b/pkg/parse/run.go @@ -74,22 +74,23 @@ type RunResult struct { } // RunFunc is the function signature of the function that starts the parse-apply-watch loop -type RunFunc func(ctx context.Context, p Parser, trigger string, state *reconcilerState) RunResult +type RunFunc func(ctx context.Context, r Reconciler, trigger string) RunResult // DefaultRunFunc is the default implementation for RunOpts.RunFunc. -func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconcilerState) RunResult { +func DefaultRunFunc(ctx context.Context, r Reconciler, trigger string) RunResult { + opts := r.Options() result := RunResult{} + state := r.ReconcilerState() // Initialize status // TODO: Populate status from RSync status if state.status == nil { - reconcilerStatus, err := p.ReconcilerStatusFromCluster(ctx) + reconcilerStatus, err := r.SyncStatusClient().ReconcilerStatusFromCluster(ctx) if err != nil { state.invalidate(status.Append(nil, err)) return result } state.status = reconcilerStatus } - opts := p.options() var syncDir cmpath.Absolute gs := &SourceStatus{} // pull the source commit and directory with retries within 5 minutes. @@ -99,7 +100,7 @@ func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconc // If updating the object fails, it's likely due to a signature verification error // from the webhook. In this case, add the error as a source error. if gs.Errs == nil { - err := p.setSourceAnnotations(ctx, gs.Commit) + err := r.SyncStatusClient().SetSourceAnnotations(ctx, gs.Commit) if err != nil { gs.Errs = status.Append(gs.Errs, status.SourceError.Wrap(err).Build()) } @@ -112,12 +113,12 @@ func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconc // Otherwise, parsing errors may be overwritten. // TODO: Decouple fetch & parse stages to use different status fields if gs.Errs != nil || state.status.SourceStatus == nil || gs.Commit != state.status.SourceStatus.Commit { - gs.LastUpdate = metav1.Time{Time: opts.Clock.Now()} + gs.LastUpdate = nowMeta(opts) var setSourceStatusErr error // Only update the source status if it changed if state.status.needToSetSourceStatus(gs) { klog.V(3).Info("Updating source status (after fetch)") - setSourceStatusErr = p.setSourceStatus(ctx, gs) + setSourceStatusErr = r.SyncStatusClient().SetSourceStatus(ctx, gs) // If there were errors publishing the source status, stop, log them, and retry later if setSourceStatusErr != nil { // If there were fetch errors, log those too @@ -149,9 +150,9 @@ func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconc _, err := os.Stat(doneFilePath) if os.IsNotExist(err) || (err == nil && hydrate.DoneCommit(doneFilePath) != gs.Commit) { rs.Message = RenderingInProgress - rs.LastUpdate = metav1.Time{Time: opts.Clock.Now()} + rs.LastUpdate = nowMeta(opts) klog.V(3).Info("Updating rendering status (before parse)") - setRenderingStatusErr := p.setRenderingStatus(ctx, state.status.RenderingStatus, rs) + setRenderingStatusErr := r.SyncStatusClient().SetRenderingStatus(ctx, state.status.RenderingStatus, rs) if setRenderingStatusErr == nil { state.reset() state.status.RenderingStatus = rs @@ -164,10 +165,10 @@ func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconc } if err != nil { rs.Message = RenderingFailed - rs.LastUpdate = metav1.Time{Time: opts.Clock.Now()} + rs.LastUpdate = nowMeta(opts) rs.Errs = status.InternalHydrationError(err, "unable to read the done file: %s", doneFilePath) klog.V(3).Info("Updating rendering status (before parse)") - setRenderingStatusErr := p.setRenderingStatus(ctx, state.status.RenderingStatus, rs) + setRenderingStatusErr := r.SyncStatusClient().SetRenderingStatus(ctx, state.status.RenderingStatus, rs) if setRenderingStatusErr == nil { state.status.RenderingStatus = rs state.status.SyncingConditionLastUpdate = rs.LastUpdate @@ -190,7 +191,7 @@ func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconc commit: gs.Commit, syncDir: syncDir, } - if errs := read(ctx, p, trigger, state, ps); errs != nil { + if errs := r.Read(ctx, trigger, ps); errs != nil { state.invalidate(errs) return result } @@ -210,7 +211,7 @@ func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconc return result } - errs := parseAndUpdate(ctx, p, trigger, state) + errs := r.ParseAndUpdate(ctx, trigger) if errs != nil { state.invalidate(errs) return result @@ -222,24 +223,26 @@ func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconc return result } -// read reads config files from source if no rendering is needed, or from hydrated output if rendering is done. -// It also updates the .status.rendering and .status.source fields. -func read(ctx context.Context, p Parser, trigger string, state *reconcilerState, sourceState *sourceState) status.MultiError { - opts := p.options() - hydrationStatus, sourceStatus := readFromSource(ctx, p, trigger, state, sourceState) +// Read source manifests from the shared source volume. +// Waits for rendering, if enabled. +// Updates the RSync status (source, rendering, and syncing condition). +func (r *reconciler) Read(ctx context.Context, trigger string, sourceState *sourceState) status.MultiError { + opts := r.Options() + state := r.ReconcilerState() + hydrationStatus, sourceStatus := r.readFromSource(ctx, trigger, sourceState) if opts.RenderingEnabled != hydrationStatus.RequiresRendering { // the reconciler is misconfigured. set the annotation so that the reconciler-manager // will recreate this reconciler with the correct configuration. - if err := p.setRequiresRendering(ctx, hydrationStatus.RequiresRendering); err != nil { + if err := r.SyncStatusClient().SetRequiresRendering(ctx, hydrationStatus.RequiresRendering); err != nil { hydrationStatus.Errs = status.Append(hydrationStatus.Errs, status.InternalHydrationError(err, "error setting %s annotation", metadata.RequiresRenderingAnnotationKey)) } } - hydrationStatus.LastUpdate = metav1.Time{Time: opts.Clock.Now()} + hydrationStatus.LastUpdate = nowMeta(opts) // update the rendering status before source status because the parser needs to // read and parse the configs after rendering is done and there might have errors. klog.V(3).Info("Updating rendering status (after parse)") - setRenderingStatusErr := p.setRenderingStatus(ctx, state.status.RenderingStatus, hydrationStatus) + setRenderingStatusErr := r.SyncStatusClient().SetRenderingStatus(ctx, state.status.RenderingStatus, hydrationStatus) if setRenderingStatusErr == nil { state.status.RenderingStatus = hydrationStatus state.status.SyncingConditionLastUpdate = hydrationStatus.LastUpdate @@ -255,11 +258,11 @@ func read(ctx context.Context, p Parser, trigger string, state *reconcilerState, // Only call `setSourceStatus` if `readFromSource` fails. // If `readFromSource` succeeds, `parse` may still fail. - sourceStatus.LastUpdate = metav1.Time{Time: opts.Clock.Now()} + sourceStatus.LastUpdate = nowMeta(opts) var setSourceStatusErr error if state.status.needToSetSourceStatus(sourceStatus) { klog.V(3).Info("Updating source status (after parse)") - setSourceStatusErr := p.setSourceStatus(ctx, sourceStatus) + setSourceStatusErr := r.SyncStatusClient().SetSourceStatus(ctx, sourceStatus) if setSourceStatusErr == nil { state.status.SourceStatus = sourceStatus state.status.SyncingConditionLastUpdate = sourceStatus.LastUpdate @@ -272,8 +275,8 @@ func read(ctx context.Context, p Parser, trigger string, state *reconcilerState, // parseHydrationState reads from the file path which the hydration-controller // container writes to. It checks if the hydrated files are ready and returns // a renderingStatus. -func parseHydrationState(p Parser, srcState *sourceState, hydrationStatus *RenderingStatus) (*sourceState, *RenderingStatus) { - opts := p.options() +func (r *reconciler) parseHydrationState(srcState *sourceState, hydrationStatus *RenderingStatus) (*sourceState, *RenderingStatus) { + opts := r.Options() if !opts.RenderingEnabled { hydrationStatus.Message = RenderingSkipped return srcState, hydrationStatus @@ -315,8 +318,9 @@ func parseHydrationState(p Parser, srcState *sourceState, hydrationStatus *Rende // readFromSource reads the source or hydrated configs, checks whether the sourceState in // the cache is up-to-date. If the cache is not up-to-date, reads all the source or hydrated files. // readFromSource returns the rendering status and source status. -func readFromSource(ctx context.Context, p Parser, trigger string, recState *reconcilerState, srcState *sourceState) (*RenderingStatus, *SourceStatus) { - opts := p.options() +func (r *reconciler) readFromSource(ctx context.Context, trigger string, srcState *sourceState) (*RenderingStatus, *SourceStatus) { + opts := r.Options() + recState := r.ReconcilerState() start := opts.Clock.Now() hydrationStatus := &RenderingStatus{ @@ -329,7 +333,7 @@ func readFromSource(ctx context.Context, p Parser, trigger string, recState *rec Commit: srcState.commit, } - srcState, hydrationStatus = parseHydrationState(p, srcState, hydrationStatus) + srcState, hydrationStatus = r.parseHydrationState(srcState, hydrationStatus) if hydrationStatus.Errs != nil { return hydrationStatus, srcStatus } @@ -366,16 +370,17 @@ func readFromSource(ctx context.Context, p Parser, trigger string, recState *rec return hydrationStatus, srcStatus } -func parseSource(ctx context.Context, p Parser, trigger string, state *reconcilerState) status.MultiError { +func (r *reconciler) parseSource(ctx context.Context, trigger string) status.MultiError { + state := r.ReconcilerState() if state.cache.parserResultUpToDate() { return nil } - opts := p.options() + opts := r.Options() start := opts.Clock.Now() var sourceErrs status.MultiError - objs, errs := p.parseSource(ctx, state.cache.source) + objs, errs := r.Parser().ParseSource(ctx, state.cache.source) if !opts.WebhookEnabled { klog.V(3).Infof("Removing %s annotation as Admission Webhook is disabled", metadata.DeclaredFieldsKey) for _, obj := range objs { @@ -387,7 +392,7 @@ func parseSource(ctx context.Context, p Parser, trigger string, state *reconcile state.cache.setParserResult(objs, sourceErrs) if !status.HasBlockingErrors(sourceErrs) && opts.WebhookEnabled { - err := webhookconfiguration.Update(ctx, opts.k8sClient(), opts.discoveryClient(), objs, + err := webhookconfiguration.Update(ctx, opts.Client, opts.DiscoveryClient, objs, client.FieldOwner(configsync.FieldManager)) if err != nil { // Don't block if updating the admission webhook fails. @@ -404,21 +409,24 @@ func parseSource(ctx context.Context, p Parser, trigger string, state *reconcile return sourceErrs } -func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconcilerState) status.MultiError { - opts := p.options() +// ParseAndUpdate parses objects from the source manifests, validates them, and +// then syncs them to the cluster with the Updater. +func (r *reconciler) ParseAndUpdate(ctx context.Context, trigger string) status.MultiError { + opts := r.Options() + state := r.ReconcilerState() klog.V(3).Info("Parser starting...") - sourceErrs := parseSource(ctx, p, trigger, state) + sourceErrs := r.parseSource(ctx, trigger) klog.V(3).Info("Parser stopped") newSourceStatus := &SourceStatus{ Spec: state.cache.source.spec, Commit: state.cache.source.commit, Errs: sourceErrs, - LastUpdate: metav1.Time{Time: opts.Clock.Now()}, + LastUpdate: nowMeta(opts), } if state.status.needToSetSourceStatus(newSourceStatus) { klog.V(3).Info("Updating source status (after parse)") - if err := p.setSourceStatus(ctx, newSourceStatus); err != nil { - // If `p.setSourceStatus` fails, we terminate the reconciliation. + if err := r.SyncStatusClient().SetSourceStatus(ctx, newSourceStatus); err != nil { + // If `r.SetSourceStatus` fails, we terminate the reconciliation. // If we call `update` in this case and `update` succeeds, `Status.Source.Commit` would end up be older // than `Status.Sync.Commit`. return status.Append(sourceErrs, err) @@ -434,7 +442,7 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc // Create a new context with its cancellation function. ctxForUpdateSyncStatus, cancel := context.WithCancel(context.Background()) - go updateSyncStatusPeriodically(ctxForUpdateSyncStatus, p, state) + go r.updateSyncStatusPeriodically(ctxForUpdateSyncStatus) klog.V(3).Info("Updater starting...") start := opts.Clock.Now() @@ -448,8 +456,16 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc // SyncErrors include errors from both the Updater and Remediator klog.V(3).Info("Updating sync status (after sync)") - syncErrs := p.SyncErrors() - if err := setSyncStatus(ctx, p, state, state.status.SourceStatus.Spec, false, state.cache.source.commit, syncErrs); err != nil { + syncErrs := r.ReconcilerState().SyncErrors() + // Copy the spec and commit from the source status + syncStatus := &SyncStatus{ + Spec: state.status.SourceStatus.Spec, + Syncing: false, + Commit: state.cache.source.commit, + Errs: syncErrs, + LastUpdate: nowMeta(opts), + } + if err := r.SetSyncStatus(ctx, syncStatus); err != nil { syncErrs = status.Append(syncErrs, err) } @@ -457,21 +473,14 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc return status.Append(sourceErrs, syncErrs) } -// setSyncStatus updates `.status.sync` and the Syncing condition, if needed, +// SetSyncStatus updates `.status.sync` and the Syncing condition, if needed, // as well as `state.syncStatus` and `state.syncingConditionLastUpdate` if // the update is successful. -func setSyncStatus(ctx context.Context, p Parser, state *reconcilerState, spec SourceSpec, syncing bool, commit string, syncErrs status.MultiError) error { - options := p.options() +func (r *reconciler) SetSyncStatus(ctx context.Context, newSyncStatus *SyncStatus) error { + state := r.ReconcilerState() // Update the RSync status, if necessary - newSyncStatus := &SyncStatus{ - Spec: spec, - Syncing: syncing, - Commit: commit, - Errs: syncErrs, - LastUpdate: metav1.Time{Time: options.Clock.Now()}, - } if state.status.needToSetSyncStatus(newSyncStatus) { - if err := p.SetSyncStatus(ctx, newSyncStatus); err != nil { + if err := r.SyncStatusClient().SetSyncStatus(ctx, newSyncStatus); err != nil { return err } state.status.SyncStatus = newSyncStatus @@ -479,7 +488,8 @@ func setSyncStatus(ctx context.Context, p Parser, state *reconcilerState, spec S } // Report conflict errors to the remote manager, if it's a RootSync. - if err := reportRootSyncConflicts(ctx, p.K8sClient(), options.ManagementConflicts()); err != nil { + opts := r.Options() + if err := reportRootSyncConflicts(ctx, opts.Client, opts.ManagementConflicts()); err != nil { return fmt.Errorf("failed to report remote conflicts: %w", err) } return nil @@ -487,8 +497,9 @@ func setSyncStatus(ctx context.Context, p Parser, state *reconcilerState, spec S // updateSyncStatusPeriodically update the sync status periodically until the // cancellation function of the context is called. -func updateSyncStatusPeriodically(ctx context.Context, p Parser, state *reconcilerState) { - opts := p.options() +func (r *reconciler) updateSyncStatusPeriodically(ctx context.Context) { + opts := r.Options() + state := r.ReconcilerState() klog.V(3).Info("Periodic sync status updates starting...") updatePeriod := opts.StatusUpdatePeriod updateTimer := opts.Clock.NewTimer(updatePeriod) @@ -502,7 +513,15 @@ func updateSyncStatusPeriodically(ctx context.Context, p Parser, state *reconcil case <-updateTimer.C(): klog.V(3).Info("Updating sync status (periodic while syncing)") - if err := setSyncStatus(ctx, p, state, state.status.SourceStatus.Spec, true, state.cache.source.commit, p.SyncErrors()); err != nil { + // Copy the spec and commit from the source status + syncStatus := &SyncStatus{ + Spec: state.status.SourceStatus.Spec, + Syncing: true, + Commit: state.cache.source.commit, + Errs: r.ReconcilerState().SyncErrors(), + LastUpdate: nowMeta(opts), + } + if err := r.SetSyncStatus(ctx, syncStatus); err != nil { klog.Warningf("failed to update sync status: %v", err) } @@ -541,3 +560,7 @@ func reportRootSyncConflicts(ctx context.Context, k8sClient client.Client, confl } return nil } + +func nowMeta(opts *ReconcilerOptions) metav1.Time { + return metav1.Time{Time: opts.Clock.Now()} +} diff --git a/pkg/parse/run_test.go b/pkg/parse/run_test.go index ac6e8e9e7..6fc0cbf4f 100644 --- a/pkg/parse/run_test.go +++ b/pkg/parse/run_test.go @@ -61,41 +61,56 @@ const ( symLink = "rev" ) -func newParser(t *testing.T, clock clock.Clock, fakeClient client.Client, fs FileSource, renderingEnabled bool) Parser { - parser := &root{} +func newRootReconciler(t *testing.T, clock clock.Clock, fakeClient client.Client, fs FileSource, renderingEnabled bool) *reconciler { converter, err := openapitest.ValueConverterForTest() if err != nil { t.Fatal(err) } - - parser.RootOptions = &RootOptions{ + state := &ReconcilerState{ + syncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), + } + opts := &Options{ + Clock: clock, + ConfigParser: filesystem.NewParser(&reader.File{}), + SyncName: rootSyncName, + Scope: declared.RootScope, + ReconcilerName: rootReconcilerName, + Client: fakeClient, + DiscoveryClient: syncerFake.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), + Converter: converter, + Files: Files{FileSource: fs}, + DeclaredResources: &declared.Resources{}, + } + rootOpts := &RootOptions{ + Options: opts, SourceFormat: configsync.SourceFormatUnstructured, } - parser.Options = &Options{ - Clock: clock, - Parser: filesystem.NewParser(&reader.File{}), - StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod, - SyncName: rootSyncName, - ReconcilerName: rootReconcilerName, - Client: fakeClient, - DiscoveryInterface: syncerFake.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), - Converter: converter, - Files: Files{FileSource: fs}, - Updater: Updater{ - Scope: declared.RootScope, - Resources: &declared.Resources{}, + recOpts := &ReconcilerOptions{ + Options: opts, + Updater: &Updater{ + Scope: opts.Scope, + Resources: opts.DeclaredResources, Remediator: &remediatorfake.Remediator{}, Applier: &applierfake.Applier{ ApplyOutputs: []applierfake.ApplierOutputs{ {}, // One Apply call, no errors }, }, - SyncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), + SyncErrorCache: state.syncErrorCache, }, - RenderingEnabled: renderingEnabled, + StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod, + RenderingEnabled: renderingEnabled, + } + return &reconciler{ + options: recOpts, + syncStatusClient: &rootSyncStatusClient{ + options: opts, + }, + parser: &rootSyncParser{ + options: rootOpts, + }, + reconcilerState: state, } - - return parser } func createRootDir(rootDir, commit string) error { @@ -752,13 +767,12 @@ func TestRun(t *testing.T) { SourceBranch: fileSource.SourceBranch, } fakeClient := syncerFake.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)) - parser := newParser(t, fakeClock, fakeClient, fs, tc.renderingEnabled) - state := &reconcilerState{} + reconciler := newRootReconciler(t, fakeClock, fakeClient, fs, tc.renderingEnabled) t.Logf("start running test at %v", time.Now()) - result := DefaultRunFunc(context.Background(), parser, triggerReimport, state) + result := DefaultRunFunc(context.Background(), reconciler, triggerReimport) assert.Equal(t, tc.expectedSourceChanged, result.SourceChanged) - assert.Equal(t, tc.needRetry, state.cache.needToRetry) + assert.Equal(t, tc.needRetry, reconciler.ReconcilerState().cache.needToRetry) rs := &v1beta1.RootSync{} err = fakeClient.Get(context.Background(), rootsync.ObjectKey(rootSyncName), rs) diff --git a/pkg/parse/source_test.go b/pkg/parse/source_test.go index badf45167..40f124c46 100644 --- a/pkg/parse/source_test.go +++ b/pkg/parse/source_test.go @@ -24,17 +24,10 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/utils/clock" - "kpt.dev/configsync/pkg/api/configsync" - "kpt.dev/configsync/pkg/core" - "kpt.dev/configsync/pkg/core/k8sobjects" - "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/hydrate" "kpt.dev/configsync/pkg/importer/filesystem/cmpath" ft "kpt.dev/configsync/pkg/importer/filesystem/filesystemtest" - "kpt.dev/configsync/pkg/kinds" "kpt.dev/configsync/pkg/status" - syncertest "kpt.dev/configsync/pkg/syncer/syncertest/fake" ) var originCommit = "1234567890abcde" @@ -106,27 +99,12 @@ func TestReadConfigFiles(t *testing.T) { files: nil, } - parser := &root{ - Options: &Options{ - Clock: clock.RealClock{}, // TODO: Test with fake clock - SyncName: rootSyncName, - ReconcilerName: rootReconcilerName, - Client: syncertest.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)), - DiscoveryInterface: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()), - Updater: Updater{ - Scope: declared.RootScope, - Resources: &declared.Resources{}, - }, - }, - RootOptions: &RootOptions{ - SourceFormat: configsync.SourceFormatUnstructured, - }, - } + files := &Files{} // set the necessary FileSource of parser - parser.SourceDir = symDir + files.SourceDir = symDir - err = parser.readConfigFiles(srcState) + err = files.readConfigFiles(srcState) assert.Equal(t, tc.wantedErr, err) }) } @@ -267,16 +245,11 @@ func TestReadHydratedDirWithRetry(t *testing.T) { commit: originCommit, } - parser := &root{ - Options: &Options{ - Clock: clock.RealClock{}, // TODO: Test with fake clock - Files: Files{ - FileSource: FileSource{ - HydratedRoot: hydratedRoot, - HydratedLink: symLink, - SyncDir: cmpath.RelativeOS(tc.syncDir), - }, - }, + files := Files{ + FileSource: FileSource{ + HydratedRoot: hydratedRoot, + HydratedLink: symLink, + SyncDir: cmpath.RelativeOS(tc.syncDir), }, } @@ -286,8 +259,8 @@ func TestReadHydratedDirWithRetry(t *testing.T) { } t.Logf("start calling readHydratedDirWithRetry at %v", time.Now()) - hydrationState, hydrationErr := parser.readHydratedDirWithRetry(backoff, - cmpath.Absolute(hydratedRoot), parser.ReconcilerName, srcState) + hydrationState, hydrationErr := files.readHydratedDirWithRetry(backoff, + cmpath.Absolute(hydratedRoot), "unused", srcState) if tc.expectedErrMsg == "" { assert.Nil(t, hydrationErr) diff --git a/pkg/parse/state.go b/pkg/parse/state.go index d355a1379..4f0aba427 100644 --- a/pkg/parse/state.go +++ b/pkg/parse/state.go @@ -19,7 +19,20 @@ import ( "kpt.dev/configsync/pkg/status" ) -type reconcilerState struct { +// ReconcilerState is the current state of the Reconciler, including progress +// indicators and in-memory cache for each of the reconciler stages: +// - Fetch +// - Read +// - Render/Hydrate +// - Parse/Validate +// - Update +// +// ReconcilerState also includes a cache of the RSync spec and status +// (ReconcilerStatus). +// +// TODO: break up cacheForCommit into phase-based caches +// TODO: move sourceState into ReconcilerState so the RSync spec and status are next to each other +type ReconcilerState struct { // lastApplied keeps the state for the last successful-applied syncDir. lastApplied string @@ -28,9 +41,11 @@ type reconcilerState struct { // cache tracks the progress made by the reconciler for a source commit. cache cacheForCommit + + syncErrorCache *SyncErrorCache } -func (s *reconcilerState) checkpoint() { +func (s *ReconcilerState) checkpoint() { applied := s.cache.source.syncDir.OSPath() if applied == s.lastApplied { return @@ -42,7 +57,7 @@ func (s *reconcilerState) checkpoint() { // reset sets the reconciler to retry in the next second because the rendering // status is not available -func (s *reconcilerState) reset() { +func (s *ReconcilerState) reset() { klog.Infof("Resetting reconciler checkpoint because the rendering status is not available yet") s.resetCache() s.lastApplied = "" @@ -51,7 +66,7 @@ func (s *reconcilerState) reset() { // invalidate logs the errors, clears the state tracking information. // invalidate does not clean up the `s.cache`. -func (s *reconcilerState) invalidate(errs status.MultiError) { +func (s *ReconcilerState) invalidate(errs status.MultiError) { klog.Errorf("Invalidating reconciler checkpoint: %v", status.FormatSingleLine(errs)) // Invalidate state on error since this could be the result of switching // branches or some other operation where inverting the operation would @@ -63,7 +78,7 @@ func (s *reconcilerState) invalidate(errs status.MultiError) { // resetCache resets the whole cache. // // resetCache is called when a new source commit is detected. -func (s *reconcilerState) resetCache() { +func (s *ReconcilerState) resetCache() { s.cache = cacheForCommit{} } @@ -74,10 +89,16 @@ func (s *reconcilerState) resetCache() { // resetPartialCache is called when: // - a force-resync happens, or // - one of the watchers noticed a management conflict. -func (s *reconcilerState) resetPartialCache() { +func (s *ReconcilerState) resetPartialCache() { source := s.cache.source needToRetry := s.cache.needToRetry s.cache = cacheForCommit{} s.cache.source = source s.cache.needToRetry = needToRetry } + +// SyncErrors returns all the sync errors, including remediator errors, +// validation errors, applier errors, and watch update errors. +func (s *ReconcilerState) SyncErrors() status.MultiError { + return s.syncErrorCache.Errors() +} diff --git a/pkg/parse/sync_status_client.go b/pkg/parse/sync_status_client.go new file mode 100644 index 000000000..48b5216d5 --- /dev/null +++ b/pkg/parse/sync_status_client.go @@ -0,0 +1,35 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "context" +) + +// SyncStatusClient provides methods to read and write RSync object status. +type SyncStatusClient interface { + // ReconcilerStatusFromCluster reads the status of the reconciler from the RSync status. + ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStatus, error) + // SetSourceStatus sets the source status and syncing condition on the RSync. + SetSourceStatus(ctx context.Context, newStatus *SourceStatus) error + // SetRenderingStatus sets the rendering status and syncing condition on the RSync. + SetRenderingStatus(ctx context.Context, oldStatus, newStatus *RenderingStatus) error + // SetSyncStatus sets the sync status and syncing condition on the RSync. + SetSyncStatus(ctx context.Context, newStatus *SyncStatus) error + // SetRequiresRendering sets the requires-rendering annotation on the RSync. + SetRequiresRendering(ctx context.Context, renderingRequired bool) error + // SetSourceAnnotations sets the source annotations on the RSync. + SetSourceAnnotations(ctx context.Context, commit string) error +} diff --git a/pkg/parse/updater.go b/pkg/parse/updater.go index ed97cf002..f90ae7c99 100644 --- a/pkg/parse/updater.go +++ b/pkg/parse/updater.go @@ -19,18 +19,15 @@ import ( "sync" "time" - "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/klog/v2" "kpt.dev/configsync/pkg/applier" "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/importer/filesystem" - "kpt.dev/configsync/pkg/kinds" "kpt.dev/configsync/pkg/metrics" "kpt.dev/configsync/pkg/remediator" "kpt.dev/configsync/pkg/remediator/conflict" "kpt.dev/configsync/pkg/status" - "kpt.dev/configsync/pkg/util/clusterconfig" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -39,7 +36,7 @@ type Updater struct { // Scope defines the scope of the reconciler, either root or namespaced. Scope declared.Scope // Resources is a set of resources declared in the source of truth. - *declared.Resources + Resources *declared.Resources // Remediator is the interface Remediator implements that accepts a new set of // declared configuration. Remediator remediator.Interface @@ -75,24 +72,6 @@ func (u *Updater) Remediating() bool { return u.Remediator.Remediating() } -// declaredCRDs returns the list of CRDs which are present in the updater's -// declared resources. -func (u *Updater) declaredCRDs() ([]*v1beta1.CustomResourceDefinition, status.MultiError) { - var crds []*v1beta1.CustomResourceDefinition - declaredObjs, _ := u.Resources.DeclaredUnstructureds() - for _, obj := range declaredObjs { - if obj.GroupVersionKind().GroupKind() != kinds.CustomResourceDefinition() { - continue - } - crd, err := clusterconfig.AsCRD(obj) - if err != nil { - return nil, err - } - crds = append(crds, crd) - } - return crds, nil -} - // Update does the following: // 1. Pauses the remediator // 2. Validates and sterilizes the objects diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index a260800bd..fb68678e0 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -233,7 +233,7 @@ func Run(opts Options) { } // Configure the Parser. - var parser parse.Parser + var reconciler parse.Reconciler fs := parse.FileSource{ SourceDir: opts.SourceRoot, RepoRoot: opts.RepoRoot, @@ -247,24 +247,17 @@ func Run(opts Options) { } parseOpts := &parse.Options{ - Clock: clock.RealClock{}, - Parser: filesystem.NewParser(&reader.File{}), - ClusterName: opts.ClusterName, - Client: cl, - ReconcilerName: opts.ReconcilerName, - SyncName: opts.SyncName, - StatusUpdatePeriod: opts.StatusUpdatePeriod, - DiscoveryInterface: discoveryClient, - RenderingEnabled: opts.RenderingEnabled, - Files: parse.Files{FileSource: fs}, - WebhookEnabled: opts.WebhookEnabled, - Updater: parse.Updater{ - Scope: opts.ReconcilerScope, - Resources: decls, - Applier: supervisor, - Remediator: rem, - SyncErrorCache: parse.NewSyncErrorCache(conflictHandler, fightHandler), - }, + Clock: clock.RealClock{}, + ConfigParser: filesystem.NewParser(&reader.File{}), + ClusterName: opts.ClusterName, + Client: cl, + ReconcilerName: opts.ReconcilerName, + SyncName: opts.SyncName, + Scope: opts.ReconcilerScope, + DiscoveryClient: discoveryClient, + Files: parse.Files{FileSource: fs}, + WebhookEnabled: opts.WebhookEnabled, + DeclaredResources: decls, } // Only instantiate the converter when the webhook is enabled because the // instantiation pulls fresh schemas from the openapi discovery endpoint. @@ -286,9 +279,23 @@ func Run(opts Options) { RetryBackoff: util.BackoffWithDurationAndStepLimit(0, 12), } + reconcilerOpts := &parse.ReconcilerOptions{ + Options: parseOpts, + Updater: &parse.Updater{ + Scope: opts.ReconcilerScope, + Resources: decls, + Applier: supervisor, + Remediator: rem, + SyncErrorCache: parse.NewSyncErrorCache(conflictHandler, fightHandler), + }, + StatusUpdatePeriod: opts.StatusUpdatePeriod, + RenderingEnabled: opts.RenderingEnabled, + } + var nsControllerState *namespacecontroller.State if opts.ReconcilerScope == declared.RootScope { - rootOpts := &parse.RootOptions{ + rootParseOpts := &parse.RootOptions{ + Options: parseOpts, SourceFormat: opts.SourceFormat, NamespaceStrategy: opts.NamespaceStrategy, DynamicNSSelectorEnabled: opts.DynamicNSSelectorEnabled, @@ -298,14 +305,14 @@ func Run(opts Options) { // enabled on RootSyncs. // RepoSync can't manage NamespaceSelectors. nsControllerState = namespacecontroller.NewState() - rootOpts.NSControllerState = nsControllerState + rootParseOpts.NSControllerState = nsControllerState // Enable namespace events (every second) // TODO: Trigger namespace events with a buffered channel from the NamespaceController pgBuilder.NamespaceControllerPeriod = time.Second } - parser = parse.NewRootRunner(parseOpts, rootOpts) + reconciler = parse.NewRootRunner(reconcilerOpts, rootParseOpts) } else { - parser = parse.NewNamespaceRunner(parseOpts) + reconciler = parse.NewNamespaceRunner(reconcilerOpts, parseOpts) } // Start listening to signals @@ -414,7 +421,7 @@ func Run(opts Options) { funnel := &events.Funnel{ Publishers: pgBuilder.Build(), // Wrap the parser with an event handler that triggers the RunFunc, as needed. - Subscriber: parse.NewEventHandler(ctx, parser, nsControllerState, parse.DefaultRunFunc), + Subscriber: parse.NewEventHandler(ctx, reconciler, nsControllerState, parse.DefaultRunFunc), } doneChForParser := funnel.Start(ctx)