Skip to content

Commit

Permalink
chore: Extract Reconciler from Parser (#1461)
Browse files Browse the repository at this point in the history
- Split the Parser interface into Parser & SyncStatusClient.
- Replace the root & namespace structs with seperate implementations
  of the Parser & SyncStatusClient interfaces.
- Create a new common reconciler struct which composes the Parser
  & SyncStatusClient interfaces. The reconciler struct can now hold
  common methods, and the current ReconcilerState, which encompases
  spec, status, and cache for all reconciler phases, not just the
  parser.
- Add a new Reconciler interface, for use by the EventHandler and
  DefaultRunFunc. We may want to move DefaultRunFunc to be a
  reconciler method in the future, but this would relocate all the
  run code, so we can do it in a seperate change.
- Move the Mutex out of the Options and into the Reconciler.
  Options should generally not be mutated after construction.
- Change interface methods to public. There's no reason to have
  private interface methods, even if the interface itself is private.
- Add ReconcilerOptions, to contain options that the Parser doesn't
  use itself. ReconcilerOptions extends parse.Options to ensure the
  injected values are consistent and simplify usage.
- Move SyncErrorCache into the ReconcilerState, since it's not just
  Parser or Updater errors.
- Move DeclaredCRDs method from the Updater into declared.Resources.
  Then inject the resources into the Parser, so it doesn't need to
  depend on the whole Updater just to get the list of valid CRDs.
  The naming here is still confusing, since it's not just the
  resource/objects declared in the source, but actually the set of
  valid & unskipped/known-scoped source objects, after being parsed.
  Managing the declared.Resources should probably be moved from the
  Updater to the Reconciler in the future.
  • Loading branch information
karlkfi authored Oct 29, 2024
1 parent 81d3241 commit b78576a
Show file tree
Hide file tree
Showing 18 changed files with 1,569 additions and 1,217 deletions.
21 changes: 21 additions & 0 deletions pkg/declared/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ import (
"sync"

"github.com/elliotchance/orderedmap/v2"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
"kpt.dev/configsync/pkg/kinds"
"kpt.dev/configsync/pkg/metrics"
"kpt.dev/configsync/pkg/status"
"kpt.dev/configsync/pkg/syncer/reconcile"
"kpt.dev/configsync/pkg/util/clusterconfig"
"sigs.k8s.io/controller-runtime/pkg/client"

"kpt.dev/configsync/pkg/core"
Expand Down Expand Up @@ -150,3 +153,21 @@ func (r *Resources) DeclaredGVKs() (map[schema.GroupVersionKind]struct{}, string
}
return gvkSet, r.commit
}

// DeclaredCRDs returns the list of CRDs declared in the source.
func (r *Resources) DeclaredCRDs() ([]*v1beta1.CustomResourceDefinition, status.MultiError) {
// DeclaredUnstructureds handles the mutex, so this method doesn't need to lock.
var crds []*v1beta1.CustomResourceDefinition
declaredObjs, _ := r.DeclaredUnstructureds()
for _, obj := range declaredObjs {
if obj.GroupVersionKind().GroupKind() != kinds.CustomResourceDefinition() {
continue
}
crd, err := clusterconfig.AsCRD(obj)
if err != nil {
return nil, err
}
crds = append(crds, crd)
}
return crds, nil
}
42 changes: 24 additions & 18 deletions pkg/parse/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,16 @@ import (
// triggers the RunFunc when appropriate.
type EventHandler struct {
Context context.Context
Parser Parser
ReconcilerState *reconcilerState
Reconciler Reconciler
NSControllerState *namespacecontroller.State
Run RunFunc
}

// NewEventHandler builds an EventHandler
func NewEventHandler(ctx context.Context, parser Parser, nsControllerState *namespacecontroller.State, runFn RunFunc) *EventHandler {
func NewEventHandler(ctx context.Context, r Reconciler, nsControllerState *namespacecontroller.State, runFn RunFunc) *EventHandler {
return &EventHandler{
Context: ctx,
Parser: parser,
ReconcilerState: &reconcilerState{},
Reconciler: r,
NSControllerState: nsControllerState,
Run: runFn,
}
Expand All @@ -54,13 +52,14 @@ func NewEventHandler(ctx context.Context, parser Parser, nsControllerState *name
// - Reconciler requested a retry due to error
// - Remediator requested a watch update
func (s *EventHandler) Handle(event events.Event) events.Result {
opts := s.Parser.options()
opts := s.Reconciler.Options()
state := s.Reconciler.ReconcilerState()

var eventResult events.Result
// Wrap the RunFunc to set Result.RunAttempted.
// This delays status update and sync events.
runFn := func(ctx context.Context, p Parser, trigger string, state *reconcilerState) RunResult {
result := s.Run(ctx, p, trigger, state)
runFn := func(ctx context.Context, r Reconciler, trigger string) RunResult {
result := s.Run(ctx, r, trigger)
eventResult.RunAttempted = true
return result
}
Expand All @@ -76,23 +75,30 @@ func (s *EventHandler) Handle(event events.Event) events.Result {
// Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run.
// The cached sourceState will not be reset to avoid reading all the source files unnecessarily.
// The cached needToRetry will not be reset to avoid resetting the backoff retries.
s.ReconcilerState.resetPartialCache()
runResult = runFn(s.Context, s.Parser, triggerResync, s.ReconcilerState)
state.resetPartialCache()
runResult = runFn(s.Context, s.Reconciler, triggerResync)

case events.SyncEventType:
// Re-import declared resources from the filesystem (from *-sync).
// If the reconciler is in the process of reconciling a given commit, the re-import won't
// happen until the ongoing reconciliation is done.
runResult = runFn(s.Context, s.Parser, triggerReimport, s.ReconcilerState)
runResult = runFn(s.Context, s.Reconciler, triggerReimport)

case events.StatusEventType:
// Publish the sync status periodically to update remediator errors.
// Skip updates if the remediator is not running yet, paused, or watches haven't been updated yet.
// This implies that this reconciler has successfully parsed, rendered, validated, and synced.
if opts.Remediating() {
klog.V(3).Info("Updating sync status (periodic while not syncing)")
// Don't update the sync spec or commit.
if err := setSyncStatus(s.Context, s.Parser, s.ReconcilerState, s.ReconcilerState.status.SyncStatus.Spec, false, s.ReconcilerState.status.SyncStatus.Commit, s.Parser.SyncErrors()); err != nil {
// Don't update the sync spec or commit, just the errors and status.
syncStatus := &SyncStatus{
Spec: state.status.SyncStatus.Spec,
Syncing: false,
Commit: state.status.SyncStatus.Commit,
Errs: s.Reconciler.ReconcilerState().SyncErrors(),
LastUpdate: nowMeta(opts),
}
if err := s.Reconciler.SetSyncStatus(s.Context, syncStatus); err != nil {
if errors.Is(err, context.Canceled) {
klog.Infof("Sync status update skipped: %v", err)
} else {
Expand All @@ -113,8 +119,8 @@ func (s *EventHandler) Handle(event events.Event) events.Result {
// Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run.
// The cached sourceState will not be reset to avoid reading all the source files unnecessarily.
// The cached needToRetry will not be reset to avoid resetting the backoff retries.
s.ReconcilerState.resetPartialCache()
runResult = runFn(s.Context, s.Parser, namespaceEvent, s.ReconcilerState)
state.resetPartialCache()
runResult = runFn(s.Context, s.Reconciler, namespaceEvent)

case events.RetrySyncEventType:
// Retry if there was an error, conflict, or any watches need to be updated.
Expand All @@ -123,9 +129,9 @@ func (s *EventHandler) Handle(event events.Event) events.Result {
// Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run.
// The cached sourceState will not be reset to avoid reading all the source files unnecessarily.
// The cached needToRetry will not be reset to avoid resetting the backoff retries.
s.ReconcilerState.resetPartialCache()
state.resetPartialCache()
trigger = triggerManagementConflict
} else if s.ReconcilerState.cache.needToRetry {
} else if state.cache.needToRetry {
trigger = triggerRetry
} else if opts.needToUpdateWatch() {
trigger = triggerWatchUpdate
Expand All @@ -138,7 +144,7 @@ func (s *EventHandler) Handle(event events.Event) events.Result {
// retryTimer will be reset to `Options.RetryPeriod`, and state.backoff is reset to `defaultBackoff()`.
// In this case, `run` will try to sync the configs from the new commit instead of the old commit
// being retried.
runResult = runFn(s.Context, s.Parser, trigger, s.ReconcilerState)
runResult = runFn(s.Context, s.Reconciler, trigger)

default:
klog.Fatalf("Invalid event received: %#v", event)
Expand Down
76 changes: 30 additions & 46 deletions pkg/parse/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,28 @@
package parse

import (
"context"
"sync"
"time"

"k8s.io/utils/clock"
"kpt.dev/configsync/pkg/declared"
"kpt.dev/configsync/pkg/importer/analyzer/ast"
"kpt.dev/configsync/pkg/importer/filesystem"
"kpt.dev/configsync/pkg/status"
"kpt.dev/configsync/pkg/util/discovery"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// Options holds configuration and core functionality required by all parsers.
// Options holds configuration and dependencies required by all parsers.
type Options struct {
// Files lists Files in the source of truth.
// TODO: compose Files without extending.
Files

// Clock is used for time tracking, namely to simplify testing by allowing
// a fake clock, instead of a RealClock.
Clock clock.Clock

// Parser defines the minimum interface required for Reconciler to use a
// Parser to read configs from a filesystem.
Parser filesystem.ConfigParser
// ConfigParser defines the minimum interface required for Reconciler to use a
// ConfigParser to read configs from a filesystem.
ConfigParser filesystem.ConfigParser

// ClusterName is the name of the cluster we're syncing configuration to.
ClusterName string
Expand All @@ -52,56 +52,40 @@ type Options struct {
// SyncName is the name of the RootSync or RepoSync object.
SyncName string

// StatusUpdatePeriod is how long the Parser waits between updates of the
// sync status, to account for management conflict errors from the Remediator.
StatusUpdatePeriod time.Duration
// Scope defines the scope of the reconciler, either root or namespaced.
Scope declared.Scope

// DiscoveryInterface is how the Parser learns what types are currently
// DiscoveryClient is how the Parser learns what types are currently
// available on the cluster.
DiscoveryInterface discovery.ServerResourcer
DiscoveryClient discovery.ServerResourcer

// Converter uses the DiscoveryInterface to encode the declared fields of
// objects in Git.
Converter *declared.ValueConverter

// mux prevents status update conflicts.
mux sync.Mutex

// RenderingEnabled indicates whether the hydration-controller is currently
// running for this reconciler.
RenderingEnabled bool

// WebhookEnabled indicates whether the Webhook is currently enabled
WebhookEnabled bool

// Files lists Files in the source of truth.
Files
// Updater mutates the most-recently-seen versions of objects stored in memory.
Updater
// DeclaredResources is the set of valid source objects, managed by the
// Updater and shared with the Parser & Remediator.
// This is used by the Parser to validate that CRDs can only be removed from
// the source when all of its CRs are removed as well.
DeclaredResources *declared.Resources
}

// Parser represents a parser that can be pointed at and continuously parse a source.
type Parser interface {
parseSource(ctx context.Context, state *sourceState) ([]ast.FileObject, status.MultiError)
ReconcilerStatusFromCluster(ctx context.Context) (*ReconcilerStatus, error)
setSourceStatus(ctx context.Context, newStatus *SourceStatus) error
setRenderingStatus(ctx context.Context, oldStatus, newStatus *RenderingStatus) error
SetSyncStatus(ctx context.Context, newStatus *SyncStatus) error
options() *Options
// SyncErrors returns all the sync errors, including remediator errors,
// validation errors, applier errors, and watch update errors.
SyncErrors() status.MultiError
// K8sClient returns the Kubernetes client that talks to the API server.
K8sClient() client.Client
// setRequiresRendering sets the requires-rendering annotation on the RSync
setRequiresRendering(ctx context.Context, renderingRequired bool) error
setSourceAnnotations(ctx context.Context, commit string) error
}
// ReconcilerOptions holds configuration for the reconciler.
type ReconcilerOptions struct {
// Extend parser options to ensure they're using the same dependencies.
*Options

func (o *Options) k8sClient() client.Client {
return o.Client
}
// Updater syncs the source from the parsed cache to the cluster.
*Updater

func (o *Options) discoveryClient() discovery.ServerResourcer {
return o.DiscoveryInterface
// StatusUpdatePeriod is how long the Parser waits between updates of the
// sync status, to account for management conflict errors from the Remediator.
StatusUpdatePeriod time.Duration

// RenderingEnabled indicates whether the hydration-controller is currently
// running for this reconciler.
RenderingEnabled bool
}
28 changes: 28 additions & 0 deletions pkg/parse/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package parse

import (
"context"

"kpt.dev/configsync/pkg/importer/analyzer/ast"
"kpt.dev/configsync/pkg/status"
)

// Parser represents a parser that can be pointed at and continuously parse a source.
type Parser interface {
// ParseSource parses the source manifest files and returns the objects.
ParseSource(ctx context.Context, state *sourceState) ([]ast.FileObject, status.MultiError)
}
Loading

0 comments on commit b78576a

Please sign in to comment.