From 9780d880640ba42ff79451a01145a2856b9f0f02 Mon Sep 17 00:00:00 2001 From: Karl Isenberg Date: Mon, 23 Sep 2024 14:13:33 -0700 Subject: [PATCH] Refactor parse.Run to use an event funnel (#1272) Replace the timers in parse.Run with an set of event Producers that send events into a Funnel which synchronizes them for an EventHandler which executes the RunFunc as needed for specific event types. The PublishingGroupBuilder takes the timer periods and generates Publishers, one for each type of event. Then the event Funnel starts all the Publishers and runs a select loop that calls the Publisher's Publish method when that event is received. The Publish method then calls the Subscriber.Handle method. Motivation: - Allows for testing retry backoff in the event funnel, separate from the parser logic, using a FakeClock. - This should make parse.Run look a little more like a controller's Reconcile loop from controller-runtime, which paves the way for further refactoring. - This is not an ideal solution yet, just a next step towards something that's easier to test and easier to change. --- pkg/parse/event_handler.go | 152 +++++++++ pkg/parse/events/builder.go | 67 ++++ pkg/parse/events/doc.go | 36 +++ pkg/parse/events/events.go | 42 +++ pkg/parse/events/funnel.go | 95 ++++++ pkg/parse/events/funnel_test.go | 536 ++++++++++++++++++++++++++++++++ pkg/parse/events/publisher.go | 206 ++++++++++++ pkg/parse/opts.go | 11 - pkg/parse/run.go | 199 ++---------- pkg/parse/run_test.go | 181 +++++------ pkg/parse/state.go | 28 -- pkg/reconciler/reconciler.go | 37 ++- pkg/util/retry.go | 11 + 13 files changed, 1268 insertions(+), 333 deletions(-) create mode 100644 pkg/parse/event_handler.go create mode 100644 pkg/parse/events/builder.go create mode 100644 pkg/parse/events/doc.go create mode 100644 pkg/parse/events/events.go create mode 100644 pkg/parse/events/funnel.go create mode 100644 pkg/parse/events/funnel_test.go create mode 100644 pkg/parse/events/publisher.go diff --git a/pkg/parse/event_handler.go b/pkg/parse/event_handler.go new file mode 100644 index 000000000..8fa038d94 --- /dev/null +++ b/pkg/parse/event_handler.go @@ -0,0 +1,152 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "context" + "errors" + + "k8s.io/klog/v2" + "kpt.dev/configsync/pkg/parse/events" + "kpt.dev/configsync/pkg/reconciler/namespacecontroller" +) + +// EventHandler is a events.Subscriber implementation that handles events and +// triggers the RunFunc when appropriate. +type EventHandler struct { + Context context.Context + Parser Parser + ReconcilerState *reconcilerState + NSControllerState *namespacecontroller.State + Run RunFunc +} + +// NewEventHandler builds an EventHandler +func NewEventHandler(ctx context.Context, parser Parser, nsControllerState *namespacecontroller.State, runFn RunFunc) *EventHandler { + return &EventHandler{ + Context: ctx, + Parser: parser, + ReconcilerState: &reconcilerState{}, + NSControllerState: nsControllerState, + Run: runFn, + } +} + +// Handle an Event and return the Result. +// - SyncWithReimportEventType - Reset the cache and sync from scratch. +// - SyncEventType - Sync from the cache, priming the cache from disk, if necessary. +// - StatusEventType - Update the RSync status with status from the Remediator & NSController. +// - NamespaceResyncEventType - Sync from the cache, if the NSController requested one. +// - RetrySyncEventType - Sync from the cache, if one of the following cases is detected: +// - Remediator or Reconciler reported a management conflict +// - Reconciler requested a retry due to error +// - Remediator requested a watch update +func (s *EventHandler) Handle(event events.Event) events.Result { + opts := s.Parser.options() + + var eventResult events.Result + // Wrap the RunFunc to set Result.RunAttempted. + // This delays status update and sync events. + runFn := func(ctx context.Context, p Parser, trigger string, state *reconcilerState) RunResult { + result := s.Run(ctx, p, trigger, state) + eventResult.RunAttempted = true + return result + } + + var runResult RunResult + switch event.Type { + case events.SyncWithReimportEventType: + // Re-apply even if no changes have been detected. + // This case should be checked first since it resets the cache. + // If the reconciler is in the process of reconciling a given commit, the resync won't + // happen until the ongoing reconciliation is done. + klog.Infof("It is time for a force-resync") + // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. + // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. + // The cached needToRetry will not be reset to avoid resetting the backoff retries. + s.ReconcilerState.resetPartialCache() + runResult = runFn(s.Context, s.Parser, triggerResync, s.ReconcilerState) + + case events.SyncEventType: + // Re-import declared resources from the filesystem (from *-sync). + // If the reconciler is in the process of reconciling a given commit, the re-import won't + // happen until the ongoing reconciliation is done. + runResult = runFn(s.Context, s.Parser, triggerReimport, s.ReconcilerState) + + case events.StatusEventType: + // Publish the sync status periodically to update remediator errors. + // Skip updates if the remediator is not running yet, paused, or watches haven't been updated yet. + // This implies that this reconciler has successfully parsed, rendered, validated, and synced. + if opts.Remediating() { + klog.V(3).Info("Updating sync status (periodic while not syncing)") + // Don't update the sync spec or commit. + if err := setSyncStatus(s.Context, s.Parser, s.ReconcilerState, s.ReconcilerState.status.SyncStatus.Spec, false, s.ReconcilerState.status.SyncStatus.Commit, s.Parser.SyncErrors()); err != nil { + if errors.Is(err, context.Canceled) { + klog.Infof("Sync status update skipped: %v", err) + } else { + klog.Warningf("Failed to update sync status: %v", err) + } + } + } + + case events.NamespaceResyncEventType: + // If the namespace controller indicates that an update is needed, + // attempt to re-sync. + if !s.NSControllerState.ScheduleSync() { + // No RunFunc call + break + } + + klog.Infof("A new sync is triggered by a Namespace event") + // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. + // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. + // The cached needToRetry will not be reset to avoid resetting the backoff retries. + s.ReconcilerState.resetPartialCache() + runResult = runFn(s.Context, s.Parser, namespaceEvent, s.ReconcilerState) + + case events.RetrySyncEventType: + // Retry if there was an error, conflict, or any watches need to be updated. + var trigger string + if opts.HasManagementConflict() { + // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. + // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. + // The cached needToRetry will not be reset to avoid resetting the backoff retries. + s.ReconcilerState.resetPartialCache() + trigger = triggerManagementConflict + } else if s.ReconcilerState.cache.needToRetry { + trigger = triggerRetry + } else if opts.needToUpdateWatch() { + trigger = triggerWatchUpdate + } else { + // No RunFunc call + break + } + + // During the execution of `run`, if a new commit is detected, + // retryTimer will be reset to `Options.RetryPeriod`, and state.backoff is reset to `defaultBackoff()`. + // In this case, `run` will try to sync the configs from the new commit instead of the old commit + // being retried. + runResult = runFn(s.Context, s.Parser, trigger, s.ReconcilerState) + + default: + klog.Fatalf("Invalid event received: %#v", event) + } + + // If the run succeeded or source changed, reset the retry backoff. + if runResult.Success || runResult.SourceChanged { + eventResult.ResetRetryBackoff = true + } + return eventResult +} diff --git a/pkg/parse/events/builder.go b/pkg/parse/events/builder.go new file mode 100644 index 000000000..e51d0baab --- /dev/null +++ b/pkg/parse/events/builder.go @@ -0,0 +1,67 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/clock" +) + +// PublishingGroupBuilder oversees construction of event publishers. +// +// For now, the publishers are driven by clock-based delay and backoff timers. +type PublishingGroupBuilder struct { + // Clock is used for time tracking, namely to simplify testing by allowing + // a fake clock, instead of a RealClock. + Clock clock.Clock + // SyncPeriod is the period of time between checking the filesystem + // for publisher updates to sync. + SyncPeriod time.Duration + // SyncWithReimportPeriod is the period of time between forced re-sync from + // publisher (even without a new commit). + SyncWithReimportPeriod time.Duration + // StatusUpdatePeriod is how long the Parser waits between updates of the + // sync status, to account for management conflict errors from the Remediator. + StatusUpdatePeriod time.Duration + // NamespaceControllerPeriod is how long to wait between checks to see if + // the namespace-controller wants to trigger a resync. + // TODO: Use a channel, instead of a timer checking a locked variable. + NamespaceControllerPeriod time.Duration + // RetryBackoff is how long the Parser waits between retries, after an error. + RetryBackoff wait.Backoff +} + +// Build a list of Publishers based on the PublishingGroupBuilder config. +func (t *PublishingGroupBuilder) Build() []Publisher { + var publishers []Publisher + if t.SyncPeriod > 0 { + publishers = append(publishers, NewResetOnRunAttemptPublisher(SyncEventType, t.Clock, t.SyncPeriod)) + } + if t.SyncWithReimportPeriod > 0 { + publishers = append(publishers, NewTimeDelayPublisher(SyncWithReimportEventType, t.Clock, t.SyncWithReimportPeriod)) + } + if t.NamespaceControllerPeriod > 0 { + publishers = append(publishers, NewTimeDelayPublisher(NamespaceResyncEventType, t.Clock, t.NamespaceControllerPeriod)) + } + if t.RetryBackoff.Duration > 0 { + publishers = append(publishers, NewRetrySyncPublisher(t.Clock, t.RetryBackoff)) + } + if t.StatusUpdatePeriod > 0 { + publishers = append(publishers, NewResetOnRunAttemptPublisher(StatusEventType, t.Clock, t.StatusUpdatePeriod)) + } + return publishers +} diff --git a/pkg/parse/events/doc.go b/pkg/parse/events/doc.go new file mode 100644 index 000000000..040b5cadf --- /dev/null +++ b/pkg/parse/events/doc.go @@ -0,0 +1,36 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package events contains a set of Events sent by Publishers. +// +// PublishingGroupBuilder can be used to Build a set of Publishers. +// +// Publisher types: +// - TimeDelayPublisher - Publishes events with a configurable time delay. +// - ResetOnRunAttemptPublisher - TimeDelayPublisher with resettable timer (Result.RunAttempted). +// - RetrySyncPublisher - TimeDelayPublisher with resettable backoff (Result.ResetRetryBackoff). +// +// Events and their Publisher: +// - SyncEvent - ResetOnRunAttemptPublisher (SyncPeriod) +// - SyncWithReimportEvent - TimeDelayPublisher (SyncWithReimportPeriod) +// - NamespaceResyncEvent - TimeDelayPublisher (NamespaceControllerPeriod) +// - RetrySyncEvent - RetrySyncPublisher (RetryBackoff) +// - StatusEvent - ResetOnRunAttemptPublisher (StatusUpdatePeriod) +// +// EventResult flags: +// - ResetRetryBackoff - Set after a sync succeeds or the source changed (spec or commit). +// - DelayStatusUpdate - Set after a sync is attempted. +// +// PublishingGroupBuilder can be used to Build a set of the above Publishers. +package events diff --git a/pkg/parse/events/events.go b/pkg/parse/events/events.go new file mode 100644 index 000000000..d8e01b79e --- /dev/null +++ b/pkg/parse/events/events.go @@ -0,0 +1,42 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +// Event represents the cause of HandleEventFunc being called. +// Some event types may have special fields or methods. +type Event struct { + // Type of the event + Type EventType +} + +// EventType is the name of the event +type EventType string + +const ( + // SyncWithReimportEventType is the EventType for a sync from disk, + // including reading and parsing resource objects from the shared volume. + SyncWithReimportEventType EventType = "SyncWithReimportEvent" + // SyncEventType is the EventType for a sync from the source cache, + // only parsing objects from the shared volume if there's a new commit. + SyncEventType EventType = "SyncEvent" + // StatusEventType is the EventType for a periodic RSync status update. + StatusEventType EventType = "StatusEvent" + // NamespaceResyncEventType is the EventType for a sync triggered by an + // update to a selected namespace. + NamespaceResyncEventType EventType = "NamespaceResyncEvent" + // RetrySyncEventType is the EventType for a sync triggered by an error + // during a previous sync attempt. + RetrySyncEventType EventType = "RetrySyncEvent" +) diff --git a/pkg/parse/events/funnel.go b/pkg/parse/events/funnel.go new file mode 100644 index 000000000..ae917233b --- /dev/null +++ b/pkg/parse/events/funnel.go @@ -0,0 +1,95 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "context" + "reflect" + + "k8s.io/klog/v2" +) + +// Funnel events from multiple publishers to a single subscriber. +type Funnel struct { + Publishers []Publisher + Subscriber Subscriber +} + +// Start all the event publishers and sends events one at a time to the +// subscriber. Blocks until the publishers are started. Returns a channel that +// will be closed when the Funnel is done handling events. Stops when the +// context is done. +func (f *Funnel) Start(ctx context.Context) <-chan struct{} { + // Stop all sources when done, even if the parent context isn't done. + // This could happen if all the source channels are closed. + ctx, cancel := context.WithCancel(ctx) + + klog.Info("Starting event loop") + + // Create a list of cases to select from + cases := make([]reflect.SelectCase, len(f.Publishers)+1) + for i, source := range f.Publishers { + klog.Infof("Starting event source: %s", source.Type()) + cases[i] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: source.Start(ctx), + } + } + + // Add a select case that closes the event channel when the context is done + cases[len(cases)-1] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(ctx.Done()), + } + + doneCh := make(chan struct{}) + go func() { + defer cancel() + defer close(doneCh) + f.funnel(ctx, cases) + }() + return doneCh +} + +func (f *Funnel) funnel(ctx context.Context, cases []reflect.SelectCase) { + // Select from cases until context is done or all source channels have been closed. + remaining := len(cases) + for remaining > 0 { + // Use reflect.Select to allow a dynamic set of cases. + // None of the channels return anything important, so ignore the value. + caseIndex, _, ok := reflect.Select(cases) + if !ok { + ctxErr := ctx.Err() + if ctxErr != nil { + klog.Infof("Stopping event loop: %v", ctxErr) + return + } + // Closed channels are always selected, so nil the channel to + // disable this case. We can't just remove the case from the list, + // because we need the index to match the handlers list. + cases[caseIndex].Chan = reflect.ValueOf(nil) + remaining-- + continue + } + + klog.V(1).Infof("Handling event: %s", f.Publishers[caseIndex].Type()) + result := f.Publishers[caseIndex].Publish(f.Subscriber) + + // Give all publishers a chance to handle the result + for _, source := range f.Publishers { + source.HandleResult(result) + } + } +} diff --git a/pkg/parse/events/funnel_test.go b/pkg/parse/events/funnel_test.go new file mode 100644 index 000000000..5627eddb6 --- /dev/null +++ b/pkg/parse/events/funnel_test.go @@ -0,0 +1,536 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "context" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + testingclock "k8s.io/utils/clock/testing" + "sigs.k8s.io/cli-utils/pkg/testutil" +) + +func TestFunnel_Start(t *testing.T) { + tests := []struct { + name string + builder *PublishingGroupBuilder + stepSize time.Duration + steps []time.Duration + expectedEvents []eventResult + }{ + { + name: "SyncEvents From SyncPeriod", + builder: &PublishingGroupBuilder{ + SyncPeriod: time.Second, + }, + stepSize: time.Second, + expectedEvents: []eventResult{ + { + Event: Event{Type: SyncEventType}, + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, + Result: Result{}, + }, + }, + }, + { + name: "ResyncEvents From SyncWithReimportPeriod", + builder: &PublishingGroupBuilder{ + SyncWithReimportPeriod: time.Second, + }, + stepSize: time.Second, + expectedEvents: []eventResult{ + { + Event: Event{Type: SyncWithReimportEventType}, + Result: Result{}, + }, + { + Event: Event{Type: SyncWithReimportEventType}, + Result: Result{}, + }, + { + Event: Event{Type: SyncWithReimportEventType}, + Result: Result{}, + }, + }, + }, + { + name: "StatusEvents From StatusUpdatePeriod", + builder: &PublishingGroupBuilder{ + StatusUpdatePeriod: time.Second, + }, + stepSize: time.Second, + expectedEvents: []eventResult{ + { + Event: Event{Type: StatusEventType}, + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, + Result: Result{}, + }, + }, + }, + { + name: "NamespaceResyncEvents From NamespaceControllerPeriod", + builder: &PublishingGroupBuilder{ + NamespaceControllerPeriod: time.Second, + }, + stepSize: time.Second, + expectedEvents: []eventResult{ + { + Event: Event{Type: NamespaceResyncEventType}, + Result: Result{}, + }, + { + Event: Event{Type: NamespaceResyncEventType}, + Result: Result{}, + }, + { + Event: Event{Type: NamespaceResyncEventType}, + Result: Result{}, + }, + }, + }, + { + name: "RetryEvents From RetryBackoff", + builder: &PublishingGroupBuilder{ + RetryBackoff: wait.Backoff{ + Duration: time.Second, + Factor: 0, // no backoff, just 1s delay + Steps: 3, // at least as many as we expect + }, + }, + stepSize: time.Second, + expectedEvents: []eventResult{ + { + Event: Event{Type: RetrySyncEventType}, + Result: Result{}, + }, + { + Event: Event{Type: RetrySyncEventType}, + Result: Result{}, + }, + { + Event: Event{Type: RetrySyncEventType}, + Result: Result{}, + }, + }, + }, + { + name: "RetryEvents From RetryBackoff with max steps", + builder: &PublishingGroupBuilder{ + RetryBackoff: wait.Backoff{ + Duration: time.Second, + Factor: 2, // double the delay each time + Steps: 3, + }, + }, + steps: []time.Duration{ + time.Second, + 3 * time.Second, //+2 + 7 * time.Second, //+4 + 8 * time.Second, //+1 + }, + expectedEvents: []eventResult{ + { + Event: Event{Type: RetrySyncEventType}, // 1s + Result: Result{}, + }, + { + Event: Event{Type: RetrySyncEventType}, // 3s + Result: Result{}, + }, + { + Event: Event{Type: RetrySyncEventType}, // 7s + Result: Result{}, + }, + // No more retry events until another event sets ResetRetryBackoff=true + }, + }, + { + name: "RetryEvents From RetryBackoff with ResetRetryBackoff", + builder: &PublishingGroupBuilder{ + RetryBackoff: wait.Backoff{ + Duration: time.Second, + Factor: 2, // double the delay each time + Steps: 3, + }, + SyncPeriod: 10 * time.Second, + }, + steps: []time.Duration{ + time.Second, + 3 * time.Second, //+2 + 7 * time.Second, //+4 + 10 * time.Second, + 11 * time.Second, //+1 + }, + expectedEvents: []eventResult{ + { + Event: Event{Type: RetrySyncEventType}, // 1s + Result: Result{}, + }, + { + Event: Event{Type: RetrySyncEventType}, // 3s + Result: Result{}, + }, + { + Event: Event{Type: RetrySyncEventType}, // 7s + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 10s + Result: Result{ + ResetRetryBackoff: true, + }, + }, + { + Event: Event{Type: RetrySyncEventType}, // 11s + Result: Result{}, + }, + }, + }, + { + name: "SyncEvents, StatusEventType, ResyncEvents", + builder: &PublishingGroupBuilder{ + SyncPeriod: 700 * time.Millisecond, + SyncWithReimportPeriod: 4000 * time.Millisecond, + StatusUpdatePeriod: 300 * time.Millisecond, + }, + // Explicit steps to avoid race conditions that make validation difficult. + steps: []time.Duration{ + 300 * time.Millisecond, + 600 * time.Millisecond, + 700 * time.Millisecond, + 1000 * time.Millisecond, + 1300 * time.Millisecond, + 1400 * time.Millisecond, + 1700 * time.Millisecond, + 2000 * time.Millisecond, + 2100 * time.Millisecond, + 2400 * time.Millisecond, + 2700 * time.Millisecond, + 2800 * time.Millisecond, + 3100 * time.Millisecond, + 3400 * time.Millisecond, + 3500 * time.Millisecond, + 3800 * time.Millisecond, + 4000 * time.Millisecond, + 4300 * time.Millisecond, + 4600 * time.Millisecond, + 4700 * time.Millisecond, + 5000 * time.Millisecond, + 5300 * time.Millisecond, + 5400 * time.Millisecond, + 5700 * time.Millisecond, + 6000 * time.Millisecond, + 6100 * time.Millisecond, + 6400 * time.Millisecond, + 6700 * time.Millisecond, + 6800 * time.Millisecond, + 7100 * time.Millisecond, + 7400 * time.Millisecond, + 7500 * time.Millisecond, + 7800 * time.Millisecond, + 8000 * time.Millisecond, + 8300 * time.Millisecond, + 8600 * time.Millisecond, + 8700 * time.Millisecond, + }, + expectedEvents: []eventResult{ + { + Event: Event{Type: StatusEventType}, // 300ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 600ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 700ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 1000ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 1300ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 1400ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 1700ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 2000ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 2100ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 2400ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 2700ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 2800ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 3100ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 3400ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 3500ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 3800ms + Result: Result{}, + }, + { + Event: Event{Type: SyncWithReimportEventType}, // 4000ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 4300ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 4600ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 4700ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 5000ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 5300ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 5400ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 5700ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 6000ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 6100ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 6400ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 6700ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 6800ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 7100ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 7400ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 7500ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 7800ms + Result: Result{}, + }, + { + Event: Event{Type: SyncWithReimportEventType}, // 8000ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 8300ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 8600ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 8700ms + Result: Result{ + RunAttempted: true, + }, + }, + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + fakeClock := testingclock.NewFakeClock(time.Now().Truncate(time.Second)) + tc.builder.Clock = fakeClock + + var stepFunc func() + if len(tc.steps) > 0 { + // Explicit list of times + startTime := fakeClock.Now() + stepIndex := 0 + stepFunc = func() { + if fakeClock.HasWaiters() { + offset := tc.steps[stepIndex] + t.Logf("Step %d", offset/1000/1000) // Nanos -> Millis + fakeClock.SetTime(startTime.Add(offset)) + stepIndex++ + } + } + } else { + // Fixed step size + stepFunc = func() { + if fakeClock.HasWaiters() { + t.Log("Step") + fakeClock.Step(tc.stepSize) + } + } + } + + subscriber := &testSubscriber{ + T: t, + ExpectedEvents: tc.expectedEvents, + StepFunc: stepFunc, + DoneFunc: cancel, + } + funnel := &Funnel{ + Publishers: tc.builder.Build(), + Subscriber: subscriber, + } + doneCh := funnel.Start(ctx) + + // Move clock forward, without blocking handler + go stepFunc() + + // Wait until done + <-doneCh + + var expectedEvents []Event + for _, e := range tc.expectedEvents { + expectedEvents = append(expectedEvents, e.Event) + } + + testutil.AssertEqual(t, expectedEvents, subscriber.ReceivedEvents) + }) + } +} + +type eventResult struct { + Event Event + Result Result +} + +type testSubscriber struct { + T *testing.T + ExpectedEvents []eventResult + ReceivedEvents []Event + StepFunc func() + DoneFunc context.CancelFunc +} + +func (s *testSubscriber) Handle(e Event) Result { + s.T.Logf("Received event: %#v", e) + // Record received events + s.ReceivedEvents = append(s.ReceivedEvents, e) + eventIndex := len(s.ReceivedEvents) - 1 + // Handle unexpected extra events + if eventIndex >= len(s.ExpectedEvents) { + s.T.Errorf("Unexpected event %d: %#v", eventIndex, e) + return Result{} + } + // Handle expected events + result := s.ExpectedEvents[eventIndex] + eventIndex++ + // If all expected events have been seen, stop the funnel + if eventIndex >= len(s.ExpectedEvents) { + s.T.Log("Stopping: Received all expected events") + s.DoneFunc() + } else { + // Move clock forward, without blocking return + go s.StepFunc() + } + return result.Result +} diff --git a/pkg/parse/events/publisher.go b/pkg/parse/events/publisher.go new file mode 100644 index 000000000..942efadc2 --- /dev/null +++ b/pkg/parse/events/publisher.go @@ -0,0 +1,206 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "context" + "reflect" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + "k8s.io/utils/clock" + "kpt.dev/configsync/pkg/util" +) + +// Publisher writes events to a channel and defines how to handle them. +type Publisher interface { + // Type of events published by this Publisher. + Type() EventType + // Start the event publisher and return the event channel. + // Messages will be sent to the channel with an optional typed payload. + // Channel will be closed when the context is done. + Start(context.Context) reflect.Value + // Publish an event to a specific Subscriber, and return the Result. + // Publish should be called when the event channel sends an event. + Publish(Subscriber) Result + // HandleResult handles the result of all types of events. + // This allows a Publisher to handle signals from other Publishers. + HandleResult(Result) +} + +// Subscriber handles an Event and returns the result. +type Subscriber interface { + // Handle an event and return the result. + Handle(Event) Result +} + +// Result encapsulates the result of a ConsumeFunc. +// This simply allows explicitly naming return values in a way that makes the +// implementation easier to read. +type Result struct { + // RunAttempted triggers timer reset of any events using the + // ResetOnRunAttemptPublisher. + RunAttempted bool + // ResetRetryBackoff triggers backoff reset for the RetrySyncPublisher. + ResetRetryBackoff bool +} + +// NewTimeDelayPublisher constructs an TimeDelayPublisher that generates and +// handles the specified events. +func NewTimeDelayPublisher(eventType EventType, c clock.Clock, period time.Duration) *TimeDelayPublisher { + return &TimeDelayPublisher{ + EventType: eventType, + Clock: c, + Period: period, + } +} + +// TimeDelayPublisher sends events periodically using a timer that is reset after +// each event is handled. This avoids unhandled events stacking up waiting to be +// handled. +type TimeDelayPublisher struct { + EventType EventType + Clock clock.Clock + Period time.Duration + + timer clock.Timer +} + +// Type of events produced by this publisher. +func (s *TimeDelayPublisher) Type() EventType { + return s.EventType +} + +// Start the timer and return the event channel. +func (s *TimeDelayPublisher) Start(ctx context.Context) reflect.Value { + s.timer = s.Clock.NewTimer(s.Period) + go func() { + <-ctx.Done() + s.timer.Stop() + }() + return reflect.ValueOf(s.timer.C()) +} + +// Publish calls the HandleFunc with a new event and resets the delay timer. +func (s *TimeDelayPublisher) Publish(subscriber Subscriber) Result { + result := subscriber.Handle(Event{Type: s.EventType}) + + // Schedule next event + s.timer.Reset(s.Period) + return result +} + +// HandleResult is a no-op. +func (s *TimeDelayPublisher) HandleResult(_ Result) {} + +// NewRetrySyncPublisher constructs an RetrySyncPublisher that generates and +// handles RetrySyncEvents with retry backoff. +func NewRetrySyncPublisher(c clock.Clock, backoff wait.Backoff) *RetrySyncPublisher { + return &RetrySyncPublisher{ + EventType: RetrySyncEventType, + Clock: c, + Backoff: backoff, + } +} + +// RetrySyncPublisher sends StatusEvents periodically using a backoff +// timer that is incremented after each event is handled. +// +// Unlike, TimeDelayPublisher, RetrySyncPublisher always increases the +// delay, unless the subscriber sets Result.ResetRetryBackoff. +type RetrySyncPublisher struct { + EventType EventType + Clock clock.Clock + Backoff wait.Backoff + + currentBackoff wait.Backoff + retryLimit int + timer clock.Timer +} + +// Type of events produced by this publisher. +func (s *RetrySyncPublisher) Type() EventType { + return s.EventType +} + +// Start the timer and return the event channel. +func (s *RetrySyncPublisher) Start(ctx context.Context) reflect.Value { + s.currentBackoff = util.CopyBackoff(s.Backoff) + s.retryLimit = s.currentBackoff.Steps + s.timer = s.Clock.NewTimer(s.currentBackoff.Duration) + go func() { + <-ctx.Done() + s.timer.Stop() + }() + return reflect.ValueOf(s.timer.C()) +} + +// Publish calls the HandleFunc with a new event, increments the backoff +// step, and updates the delay timer. +// If the maximum number of retries has been reached, the HandleFunc is NOT +// called and an empty Result is returned. +func (s *RetrySyncPublisher) Publish(subscriber Subscriber) Result { + if s.currentBackoff.Steps == 0 { + klog.Infof("Retry limit (%v) has been reached", s.retryLimit) + // Don't reset retryTimer if retry limit has been reached. + return Result{} + } + + retryDuration := s.currentBackoff.Step() + retries := s.retryLimit - s.currentBackoff.Steps + klog.Infof("a retry is triggered (retries: %v/%v)", retries, s.retryLimit) + + result := subscriber.Handle(Event{Type: s.EventType}) + + // Schedule next event + s.timer.Reset(retryDuration) + return result +} + +// HandleResult resets the backoff timer if ResetRetryBackoff is true. +func (s *RetrySyncPublisher) HandleResult(result Result) { + if result.ResetRetryBackoff { + s.currentBackoff = util.CopyBackoff(s.Backoff) + s.timer.Reset(s.currentBackoff.Duration) + } +} + +// NewResetOnRunAttemptPublisher constructs an ResetOnRunPublisher that +// generates and handles the specified events and resets the delay any time +// Result.RunAttempted=true. +func NewResetOnRunAttemptPublisher(eventType EventType, c clock.Clock, period time.Duration) *ResetOnRunAttemptPublisher { + return &ResetOnRunAttemptPublisher{ + TimeDelayPublisher: TimeDelayPublisher{ + EventType: eventType, + Clock: c, + Period: period, + }, + } +} + +// ResetOnRunAttemptPublisher is a TimeDelayPublisher that is automatically +// delayed when Result.RunAttempted is set by any event. +type ResetOnRunAttemptPublisher struct { + TimeDelayPublisher +} + +// HandleResult resets the delay timer if DelayStatusUpdate is true. +func (s *ResetOnRunAttemptPublisher) HandleResult(result Result) { + s.TimeDelayPublisher.HandleResult(result) + if result.RunAttempted { + s.timer.Reset(s.Period) + } +} diff --git a/pkg/parse/opts.go b/pkg/parse/opts.go index 4ed504953..6c4d5ca1a 100644 --- a/pkg/parse/opts.go +++ b/pkg/parse/opts.go @@ -52,17 +52,6 @@ type Options struct { // SyncName is the name of the RootSync or RepoSync object. SyncName string - // PollingPeriod is the period of time between checking the filesystem for - // source updates to sync. - PollingPeriod time.Duration - - // ResyncPeriod is the period of time between forced re-sync from source - // (even without a new commit). - ResyncPeriod time.Duration - - // RetryPeriod is how long the Parser waits between retries, after an error. - RetryPeriod time.Duration - // StatusUpdatePeriod is how long the Parser waits between updates of the // sync status, to account for management conflict errors from the Remediator. StatusUpdatePeriod time.Duration diff --git a/pkg/parse/run.go b/pkg/parse/run.go index 1e46c064c..606df6995 100644 --- a/pkg/parse/run.go +++ b/pkg/parse/run.go @@ -19,10 +19,8 @@ import ( "fmt" "os" "path" - "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "kpt.dev/configsync/pkg/api/configsync" "kpt.dev/configsync/pkg/core" @@ -31,7 +29,6 @@ import ( "kpt.dev/configsync/pkg/importer/filesystem/cmpath" "kpt.dev/configsync/pkg/metadata" "kpt.dev/configsync/pkg/metrics" - "kpt.dev/configsync/pkg/reconciler/namespacecontroller" "kpt.dev/configsync/pkg/status" "kpt.dev/configsync/pkg/util" webhookconfiguration "kpt.dev/configsync/pkg/webhook/configuration" @@ -68,180 +65,31 @@ const ( RenderingNotRequired string = "Rendering not required but is currently enabled" ) -// RunOpts are the options used when calling Run -type RunOpts struct { - runFunc RunFunc - backoff wait.Backoff +// RunResult encapsulates the result of a RunFunc. +// This simply allows explicitly naming return values in a way that makes the +// implementation easier to read. +type RunResult struct { + SourceChanged bool + Success bool } // RunFunc is the function signature of the function that starts the parse-apply-watch loop -type RunFunc func(ctx context.Context, p Parser, trigger string, state *reconcilerState) +type RunFunc func(ctx context.Context, p Parser, trigger string, state *reconcilerState) RunResult -// Run keeps checking whether a parse-apply-watch loop is necessary and starts a loop if needed. -func Run(ctx context.Context, p Parser, nsControllerState *namespacecontroller.State, runOpts RunOpts) { - opts := p.options() - // Use timers, not tickers. - // Tickers can cause memory leaks and continuous execution, when execution - // takes longer than the tick duration. - runTimer := opts.Clock.NewTimer(opts.PollingPeriod) - defer runTimer.Stop() - - resyncTimer := opts.Clock.NewTimer(opts.ResyncPeriod) - defer resyncTimer.Stop() - - retryTimer := opts.Clock.NewTimer(opts.RetryPeriod) - defer retryTimer.Stop() - - statusUpdateTimer := opts.Clock.NewTimer(opts.StatusUpdatePeriod) - defer statusUpdateTimer.Stop() - - nsEventPeriod := time.Second - nsEventTimer := opts.Clock.NewTimer(nsEventPeriod) - defer nsEventTimer.Stop() - - runFn := runOpts.runFunc - state := &reconcilerState{ - backoff: runOpts.backoff, - retryTimer: retryTimer, - retryPeriod: opts.RetryPeriod, - } - - for { - select { - case <-ctx.Done(): - return - - // Re-apply even if no changes have been detected. - // This case should be checked first since it resets the cache. - // If the reconciler is in the process of reconciling a given commit, the resync won't - // happen until the ongoing reconciliation is done. - case <-resyncTimer.C(): - klog.Infof("It is time for a force-resync") - // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. - // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. - // The cached needToRetry will not be reset to avoid resetting the backoff retries. - state.resetPartialCache() - runFn(ctx, p, triggerResync, state) - - resyncTimer.Reset(opts.ResyncPeriod) // Schedule resync attempt - // we should not reset retryTimer under this `case` since it is not aware of the - // state of backoff retry. - statusUpdateTimer.Reset(opts.StatusUpdatePeriod) // Schedule status update attempt - - // Re-import declared resources from the filesystem (from git-sync/helm-sync/oci-sync). - // If the reconciler is in the process of reconciling a given commit, the re-import won't - // happen until the ongoing reconciliation is done. - case <-runTimer.C(): - runFn(ctx, p, triggerReimport, state) - - runTimer.Reset(opts.PollingPeriod) // Schedule re-import attempt - // we should not reset retryTimer under this `case` since it is not aware of the - // state of backoff retry. - statusUpdateTimer.Reset(opts.StatusUpdatePeriod) // Schedule status update attempt - - // Retry if there was an error, conflict, or any watches need to be updated. - case <-retryTimer.C(): - var trigger string - if opts.HasManagementConflict() { - // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. - // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. - // The cached needToRetry will not be reset to avoid resetting the backoff retries. - state.resetPartialCache() - trigger = triggerManagementConflict - } else if state.cache.needToRetry { - trigger = triggerRetry - } else if opts.needToUpdateWatch() { - trigger = triggerWatchUpdate - } else { - // Reset retryTimer here to make sure it can be fired in the future. - // Image the following scenario: - // When the for loop starts, retryTimer fires first, none of triggerManagementConflict, - // triggerRetry, and triggerWatchUpdate is true. If we don't reset retryTimer here, when - // the `run` function fails when runTimer or resyncTimer fires, the retry logic under this `case` - // will never be executed. - retryTimer.Reset(opts.RetryPeriod) - continue - } - if state.backoff.Steps == 0 { - klog.Infof("Retry limit (%v) has been reached", retryLimit) - // Don't reset retryTimer if retry limit has been reached. - continue - } - - retryDuration := state.backoff.Step() - retries := retryLimit - state.backoff.Steps - klog.Infof("a retry is triggered (trigger type: %v, retries: %v/%v)", trigger, retries, retryLimit) - // During the execution of `run`, if a new commit is detected, - // retryTimer will be reset to `Options.RetryPeriod`, and state.backoff is reset to `defaultBackoff()`. - // In this case, `run` will try to sync the configs from the new commit instead of the old commit - // being retried. - runFn(ctx, p, trigger, state) - // Reset retryTimer after `run` to make sure `retryDuration` happens between the end of one execution - // of `run` and the start of the next execution. - retryTimer.Reset(retryDuration) - statusUpdateTimer.Reset(opts.StatusUpdatePeriod) // Schedule status update attempt - - // Update the sync status to report management conflicts (from the remediator). - case <-statusUpdateTimer.C(): - // Publish the sync status periodically to update remediator errors. - // Skip updates if the remediator is not running yet, paused, or watches haven't been updated yet. - // This implies that this reconciler has successfully parsed, rendered, validated, and synced. - if opts.Remediating() { - klog.V(3).Info("Updating sync status (periodic while not syncing)") - // Don't update the sync spec or commit. - if err := setSyncStatus(ctx, p, state, state.status.SyncStatus.Spec, false, state.status.SyncStatus.Commit, p.SyncErrors()); err != nil { - klog.Warningf("failed to update sync status: %v", err) - } - } - - statusUpdateTimer.Reset(opts.StatusUpdatePeriod) // Schedule status update attempt - // we should not reset retryTimer under this `case` since it is not aware of the - // state of backoff retry. - - // Execute the entire parse-apply-watch loop for a namespace event. - case <-nsEventTimer.C(): - if nsControllerState == nil { - // If the Namespace Controller is not running, stop the timer without - // closing the channel. - nsEventTimer.Stop() - continue - } - if nsControllerState.ScheduleSync() { - klog.Infof("A new sync is triggered by a Namespace event") - // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. - // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. - // The cached needToRetry will not be reset to avoid resetting the backoff retries. - state.resetPartialCache() - runFn(ctx, p, namespaceEvent, state) - } - // we should not reset retryTimer under this `case` since it is not aware of the - // state of backoff retry. - nsEventTimer.Reset(nsEventPeriod) // Schedule namespace event check attempt - } - } -} - -// DefaultRunOpts returns the default options for Run -func DefaultRunOpts() RunOpts { - return RunOpts{ - runFunc: run, - backoff: defaultBackoff(), - } -} - -func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) { - // Initialize state from RSync status. +// DefaultRunFunc is the default implementation for RunOpts.RunFunc. +func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconcilerState) RunResult { + result := RunResult{} + // Initialize status + // TODO: Populate status from RSync status if state.status == nil { reconcilerStatus, err := p.ReconcilerStatusFromCluster(ctx) if err != nil { state.invalidate(status.Append(nil, err)) - return + return result } state.status = reconcilerStatus } - opts := p.options() - var syncDir cmpath.Absolute gs := &SourceStatus{} // pull the source commit and directory with retries within 5 minutes. @@ -264,7 +112,7 @@ func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) if setSourceStatusErr != nil { // If there were fetch errors, log those too state.invalidate(status.Append(gs.Errs, setSourceStatusErr)) - return + return result } // Cache the latest source status in memory state.status.SourceStatus = gs @@ -273,7 +121,7 @@ func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) // If there were fetch errors, stop, log them, and retry later if gs.Errs != nil { state.invalidate(gs.Errs) - return + return result } } @@ -302,7 +150,7 @@ func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) var m status.MultiError state.invalidate(status.Append(m, setRenderingStatusErr)) } - return + return result } if err != nil { rs.Message = RenderingFailed @@ -315,7 +163,7 @@ func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) state.status.SyncingConditionLastUpdate = rs.LastUpdate } state.invalidate(status.Append(rs.Errs, setRenderingStatusErr)) - return + return result } } @@ -334,15 +182,14 @@ func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) } if errs := read(ctx, p, trigger, state, ps); errs != nil { state.invalidate(errs) - return + return result } newSyncDir := state.cache.source.syncDir if newSyncDir != oldSyncDir { - // Reset the backoff and retryTimer since it is a new commit - state.backoff = defaultBackoff() - state.retryTimer.Reset(state.retryPeriod) + // If the commit changed and parsing succeeded, trigger retries to start again, if stopped. + result.SourceChanged = true } // The parse-apply-watch sequence will be skipped if the trigger type is `triggerReimport` and @@ -350,17 +197,19 @@ func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) // * If a former parse-apply-watch sequence for syncDir succeeded, there is no need to run the sequence again; // * If all the former parse-apply-watch sequences for syncDir failed, the next retry will call the sequence. if trigger == triggerReimport && oldSyncDir == newSyncDir { - return + return result } errs := parseAndUpdate(ctx, p, trigger, state) if errs != nil { state.invalidate(errs) - return + return result } // Only checkpoint the state after *everything* succeeded, including status update. state.checkpoint() + result.Success = true + return result } // read reads config files from source if no rendering is needed, or from hydrated output if rendering is done. diff --git a/pkg/parse/run_test.go b/pkg/parse/run_test.go index 7a653b8fb..ac6e8e9e7 100644 --- a/pkg/parse/run_test.go +++ b/pkg/parse/run_test.go @@ -45,7 +45,6 @@ import ( "kpt.dev/configsync/pkg/importer/reader" "kpt.dev/configsync/pkg/kinds" "kpt.dev/configsync/pkg/metadata" - "kpt.dev/configsync/pkg/reconciler/namespacecontroller" "kpt.dev/configsync/pkg/remediator/conflict" remediatorfake "kpt.dev/configsync/pkg/remediator/fake" "kpt.dev/configsync/pkg/rootsync" @@ -62,7 +61,7 @@ const ( symLink = "rev" ) -func newParser(t *testing.T, clock clock.Clock, fakeClient client.Client, fs FileSource, renderingEnabled bool, retryPeriod time.Duration, pollingPeriod time.Duration) Parser { +func newParser(t *testing.T, clock clock.Clock, fakeClient client.Client, fs FileSource, renderingEnabled bool) Parser { parser := &root{} converter, err := openapitest.ValueConverterForTest() if err != nil { @@ -94,9 +93,6 @@ func newParser(t *testing.T, clock clock.Clock, fakeClient client.Client, fs Fil SyncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), }, RenderingEnabled: renderingEnabled, - RetryPeriod: retryPeriod, - ResyncPeriod: 2 * time.Second, - PollingPeriod: pollingPeriod, } return parser @@ -212,24 +208,26 @@ func TestRun(t *testing.T) { } testCases := []struct { - name string - commit string - renderingEnabled bool - hasKustomization bool - hydratedRootExist bool - retryCap time.Duration - srcRootCreateLatency time.Duration - sourceError string - hydratedError string - hydrationDone bool - needRetry bool - expectedRootSync *v1beta1.RootSync + name string + commit string + renderingEnabled bool + hasKustomization bool + hydratedRootExist bool + retryCap time.Duration + srcRootCreateLatency time.Duration + sourceError string + hydratedError string + hydrationDone bool + expectedSourceChanged bool + needRetry bool + expectedRootSync *v1beta1.RootSync }{ { - name: "source commit directory isn't created within the retry cap", - retryCap: 5 * time.Millisecond, - srcRootCreateLatency: 10 * time.Millisecond, - needRetry: true, + name: "source commit directory isn't created within the retry cap", + retryCap: 5 * time.Millisecond, + srcRootCreateLatency: 10 * time.Millisecond, + expectedSourceChanged: false, + needRetry: true, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() rs.ObjectMeta.ResourceVersion = "2" // Create + Update (source status) @@ -264,10 +262,11 @@ func TestRun(t *testing.T) { }(), }, { - name: "source commit directory created within the retry cap", - retryCap: 100 * time.Millisecond, - srcRootCreateLatency: 5 * time.Millisecond, - needRetry: false, + name: "source commit directory created within the retry cap", + retryCap: 100 * time.Millisecond, + srcRootCreateLatency: 5 * time.Millisecond, + expectedSourceChanged: true, + needRetry: false, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) + Update (rendering status) + Update (sync status) @@ -317,9 +316,10 @@ func TestRun(t *testing.T) { }(), }, { - name: "source fetch error", - sourceError: "git sync permission issue", - needRetry: true, + name: "source fetch error", + sourceError: "git sync permission issue", + expectedSourceChanged: false, + needRetry: true, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) @@ -355,10 +355,11 @@ func TestRun(t *testing.T) { }(), }, { - name: "rendering in progress", - renderingEnabled: true, - hydratedRootExist: true, - needRetry: true, + name: "rendering in progress", + renderingEnabled: true, + hydratedRootExist: true, + expectedSourceChanged: false, + needRetry: true, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) + Update (rendering status) @@ -398,13 +399,14 @@ func TestRun(t *testing.T) { }(), }, { - name: "hydration error", - renderingEnabled: true, - hasKustomization: true, - hydratedRootExist: true, - hydrationDone: true, - hydratedError: `{"code": "1068", "error": "rendering error"}`, - needRetry: true, + name: "hydration error", + renderingEnabled: true, + hasKustomization: true, + hydratedRootExist: true, + hydrationDone: true, + hydratedError: `{"code": "1068", "error": "rendering error"}`, + expectedSourceChanged: false, + needRetry: true, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) + Update (rendering status) @@ -450,12 +452,13 @@ func TestRun(t *testing.T) { }, // TODO: Add source parse error test case (with and without rendering) { - name: "successful read", - renderingEnabled: true, - hasKustomization: true, - hydratedRootExist: true, - hydrationDone: true, - needRetry: false, + name: "successful read", + renderingEnabled: true, + hasKustomization: true, + hydratedRootExist: true, + hydrationDone: true, + expectedSourceChanged: true, + needRetry: false, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) + Update (rendering status) + Update (sync status) @@ -505,10 +508,11 @@ func TestRun(t *testing.T) { }(), }, { - name: "successful read without hydration", - hydratedRootExist: false, - hydrationDone: false, - needRetry: false, + name: "successful read without hydration", + hydratedRootExist: false, + hydrationDone: false, + expectedSourceChanged: true, + needRetry: false, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) + Update (rendering status) + Update (sync status) @@ -558,12 +562,13 @@ func TestRun(t *testing.T) { }(), }, { - name: "error because hydration enabled with wet source", - renderingEnabled: true, - hasKustomization: false, - hydratedRootExist: false, - hydrationDone: true, - needRetry: true, + name: "error because hydration enabled with wet source", + renderingEnabled: true, + hasKustomization: false, + hydratedRootExist: false, + hydrationDone: true, + expectedSourceChanged: false, + needRetry: true, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) + Update (rendering status) @@ -610,12 +615,13 @@ func TestRun(t *testing.T) { }(), }, { - name: "error because hydration disabled with dry source", - renderingEnabled: false, - hasKustomization: true, - hydratedRootExist: true, - hydrationDone: true, - needRetry: true, + name: "error because hydration disabled with dry source", + renderingEnabled: false, + hasKustomization: true, + hydratedRootExist: true, + hydrationDone: true, + expectedSourceChanged: false, + needRetry: true, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) + Update (rendering status) @@ -746,15 +752,12 @@ func TestRun(t *testing.T) { SourceBranch: fileSource.SourceBranch, } fakeClient := syncerFake.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)) - parser := newParser(t, fakeClock, fakeClient, fs, tc.renderingEnabled, configsync.DefaultReconcilerRetryPeriod, configsync.DefaultReconcilerPollingPeriod) - state := &reconcilerState{ - backoff: defaultBackoff(), - retryTimer: fakeClock.NewTimer(configsync.DefaultReconcilerRetryPeriod), - retryPeriod: configsync.DefaultReconcilerRetryPeriod, - } + parser := newParser(t, fakeClock, fakeClient, fs, tc.renderingEnabled) + state := &reconcilerState{} t.Logf("start running test at %v", time.Now()) - run(context.Background(), parser, triggerReimport, state) + result := DefaultRunFunc(context.Background(), parser, triggerReimport, state) + assert.Equal(t, tc.expectedSourceChanged, result.SourceChanged) assert.Equal(t, tc.needRetry, state.cache.needToRetry) rs := &v1beta1.RootSync{} @@ -767,45 +770,3 @@ func TestRun(t *testing.T) { }) } } - -func TestBackoffRetryCount(t *testing.T) { - realClock := clock.RealClock{} - fakeClient := syncerFake.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)) - parser := newParser(t, realClock, fakeClient, FileSource{}, false, 10*time.Microsecond, 150*time.Microsecond) - testState := &namespacecontroller.State{} - reimportCount := 0 - retryCount := 0 - testIsDone := func(reimportCount *int, _ *int) bool { - return *reimportCount == 35 - } - - t.Logf("start running test at %v", time.Now()) - - ctx, cancel := context.WithCancel(context.Background()) - backoff := defaultBackoff() - backoff.Duration = 10 * time.Microsecond - - Run(ctx, parser, testState, RunOpts{ - runFunc: mockRun(&reimportCount, &retryCount, cancel, testIsDone), - backoff: backoff, - }) - - assert.Equal(t, 12, retryCount) -} - -func mockRun(reimportCount *int, retryCount *int, cancelFn func(), testIsDone func(reimportCount *int, retryCount *int) bool) RunFunc { - return func(_ context.Context, _ Parser, trigger string, state *reconcilerState) { - state.cache.needToRetry = true - - switch trigger { - case triggerReimport: - *reimportCount++ - case triggerRetry: - *retryCount++ - } - - if testIsDone(reimportCount, retryCount) { - cancelFn() - } - } -} diff --git a/pkg/parse/state.go b/pkg/parse/state.go index 6fc22cfda..d355a1379 100644 --- a/pkg/parse/state.go +++ b/pkg/parse/state.go @@ -15,13 +15,8 @@ package parse import ( - "time" - - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" - "k8s.io/utils/clock" "kpt.dev/configsync/pkg/status" - "kpt.dev/configsync/pkg/util" ) type reconcilerState struct { @@ -33,29 +28,6 @@ type reconcilerState struct { // cache tracks the progress made by the reconciler for a source commit. cache cacheForCommit - - // backoff defines the duration to wait before retries - // backoff is initialized to `defaultBackoff()` when a reconcilerState struct is created. - // backoff is updated before a retry starts. - // backoff should only be reset back to `defaultBackoff()` when a new commit is detected. - backoff wait.Backoff - - retryTimer clock.Timer - - retryPeriod time.Duration -} - -// retryLimit defines the maximal number of retries allowed on a given commit. -const retryLimit = 12 - -// The returned backoff includes 12 steps. -// Here is an example of the duration between steps: -// -// 1.055843837s, 2.085359785s, 4.229560375s, 8.324724174s, -// 16.295984061s, 34.325711987s, 1m5.465642392s, 2m18.625713221s, -// 4m24.712222056s, 9m18.97652295s, 17m15.344384599s, 35m15.603237976s. -func defaultBackoff() wait.Backoff { - return util.BackoffWithDurationAndStepLimit(0, retryLimit) } func (s *reconcilerState) checkpoint() { diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index a2b32302e..9cd75e426 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -34,6 +34,7 @@ import ( "kpt.dev/configsync/pkg/importer/filesystem/cmpath" "kpt.dev/configsync/pkg/importer/reader" "kpt.dev/configsync/pkg/parse" + "kpt.dev/configsync/pkg/parse/events" "kpt.dev/configsync/pkg/reconciler/finalizer" "kpt.dev/configsync/pkg/reconciler/namespacecontroller" "kpt.dev/configsync/pkg/remediator" @@ -43,6 +44,7 @@ import ( "kpt.dev/configsync/pkg/syncer/metrics" "kpt.dev/configsync/pkg/syncer/reconcile" "kpt.dev/configsync/pkg/syncer/reconcile/fight" + "kpt.dev/configsync/pkg/util" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -246,9 +248,6 @@ func Run(opts Options) { Client: cl, ReconcilerName: opts.ReconcilerName, SyncName: opts.SyncName, - PollingPeriod: opts.PollingPeriod, - ResyncPeriod: opts.ResyncPeriod, - RetryPeriod: opts.RetryPeriod, StatusUpdatePeriod: opts.StatusUpdatePeriod, DiscoveryInterface: discoveryClient, RenderingEnabled: opts.RenderingEnabled, @@ -271,6 +270,17 @@ func Run(opts Options) { } } + // Use the builder to build a set of event publishers for parser.Run. + pgBuilder := &events.PublishingGroupBuilder{ + Clock: parseOpts.Clock, + SyncPeriod: opts.PollingPeriod, + SyncWithReimportPeriod: opts.ResyncPeriod, + StatusUpdatePeriod: opts.StatusUpdatePeriod, + // TODO: Shouldn't this use opts.RetryPeriod as the initial duration? + // Limit to 12 retries, with no max retry duration. + RetryBackoff: util.BackoffWithDurationAndStepLimit(0, 12), + } + var nsControllerState *namespacecontroller.State if opts.ReconcilerScope == declared.RootScope { rootOpts := &parse.RootOptions{ @@ -279,18 +289,18 @@ func Run(opts Options) { DynamicNSSelectorEnabled: opts.DynamicNSSelectorEnabled, } if opts.DynamicNSSelectorEnabled { - // Only set nsControllerState when dynamic NamespaceSelector is enabled on - // RootSyncs. + // Only set nsControllerState when dynamic NamespaceSelector is + // enabled on RootSyncs. // RepoSync can't manage NamespaceSelectors. nsControllerState = namespacecontroller.NewState() rootOpts.NSControllerState = nsControllerState + // Enable namespace events (every second) + // TODO: Trigger namespace events with a buffered channel from the NamespaceController + pgBuilder.NamespaceControllerPeriod = time.Second } parser = parse.NewRootRunner(parseOpts, rootOpts) } else { parser = parse.NewNamespaceRunner(parseOpts) - if err != nil { - klog.Fatalf("Instantiating Namespace Repository Parser: %v", err) - } } // Start listening to signals @@ -388,7 +398,16 @@ func Run(opts Options) { klog.Info("Starting Parser") // TODO: Convert the Parser to use the controller-manager framework. - parse.Run(ctx, parser, nsControllerState, parse.DefaultRunOpts()) // blocks until ctx.Done() + // Funnel events from the publishers to the subscriber. + funnel := &events.Funnel{ + Publishers: pgBuilder.Build(), + // Wrap the parser with an event handler that triggers the RunFunc, as needed. + Subscriber: parse.NewEventHandler(ctx, parser, nsControllerState, parse.DefaultRunFunc), + } + doneChForParser := funnel.Start(ctx) + + // Wait until done + <-doneChForParser klog.Info("Parser exited") // Wait for Remediator to exit diff --git a/pkg/util/retry.go b/pkg/util/retry.go index f3712b938..0a9983897 100644 --- a/pkg/util/retry.go +++ b/pkg/util/retry.go @@ -92,3 +92,14 @@ func RetryWithBackoff(backoff wait.Backoff, f func() error) error { return err }) } + +// CopyBackoff duplicates a wait.Backoff +func CopyBackoff(in wait.Backoff) wait.Backoff { + return wait.Backoff{ + Duration: in.Duration, + Factor: in.Factor, + Jitter: in.Jitter, + Steps: in.Steps, + Cap: in.Cap, + } +}