diff --git a/pkg/parse/event_handler.go b/pkg/parse/event_handler.go new file mode 100644 index 000000000..8fa038d94 --- /dev/null +++ b/pkg/parse/event_handler.go @@ -0,0 +1,152 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "context" + "errors" + + "k8s.io/klog/v2" + "kpt.dev/configsync/pkg/parse/events" + "kpt.dev/configsync/pkg/reconciler/namespacecontroller" +) + +// EventHandler is a events.Subscriber implementation that handles events and +// triggers the RunFunc when appropriate. +type EventHandler struct { + Context context.Context + Parser Parser + ReconcilerState *reconcilerState + NSControllerState *namespacecontroller.State + Run RunFunc +} + +// NewEventHandler builds an EventHandler +func NewEventHandler(ctx context.Context, parser Parser, nsControllerState *namespacecontroller.State, runFn RunFunc) *EventHandler { + return &EventHandler{ + Context: ctx, + Parser: parser, + ReconcilerState: &reconcilerState{}, + NSControllerState: nsControllerState, + Run: runFn, + } +} + +// Handle an Event and return the Result. +// - SyncWithReimportEventType - Reset the cache and sync from scratch. +// - SyncEventType - Sync from the cache, priming the cache from disk, if necessary. +// - StatusEventType - Update the RSync status with status from the Remediator & NSController. +// - NamespaceResyncEventType - Sync from the cache, if the NSController requested one. +// - RetrySyncEventType - Sync from the cache, if one of the following cases is detected: +// - Remediator or Reconciler reported a management conflict +// - Reconciler requested a retry due to error +// - Remediator requested a watch update +func (s *EventHandler) Handle(event events.Event) events.Result { + opts := s.Parser.options() + + var eventResult events.Result + // Wrap the RunFunc to set Result.RunAttempted. + // This delays status update and sync events. + runFn := func(ctx context.Context, p Parser, trigger string, state *reconcilerState) RunResult { + result := s.Run(ctx, p, trigger, state) + eventResult.RunAttempted = true + return result + } + + var runResult RunResult + switch event.Type { + case events.SyncWithReimportEventType: + // Re-apply even if no changes have been detected. + // This case should be checked first since it resets the cache. + // If the reconciler is in the process of reconciling a given commit, the resync won't + // happen until the ongoing reconciliation is done. + klog.Infof("It is time for a force-resync") + // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. + // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. + // The cached needToRetry will not be reset to avoid resetting the backoff retries. + s.ReconcilerState.resetPartialCache() + runResult = runFn(s.Context, s.Parser, triggerResync, s.ReconcilerState) + + case events.SyncEventType: + // Re-import declared resources from the filesystem (from *-sync). + // If the reconciler is in the process of reconciling a given commit, the re-import won't + // happen until the ongoing reconciliation is done. + runResult = runFn(s.Context, s.Parser, triggerReimport, s.ReconcilerState) + + case events.StatusEventType: + // Publish the sync status periodically to update remediator errors. + // Skip updates if the remediator is not running yet, paused, or watches haven't been updated yet. + // This implies that this reconciler has successfully parsed, rendered, validated, and synced. + if opts.Remediating() { + klog.V(3).Info("Updating sync status (periodic while not syncing)") + // Don't update the sync spec or commit. + if err := setSyncStatus(s.Context, s.Parser, s.ReconcilerState, s.ReconcilerState.status.SyncStatus.Spec, false, s.ReconcilerState.status.SyncStatus.Commit, s.Parser.SyncErrors()); err != nil { + if errors.Is(err, context.Canceled) { + klog.Infof("Sync status update skipped: %v", err) + } else { + klog.Warningf("Failed to update sync status: %v", err) + } + } + } + + case events.NamespaceResyncEventType: + // If the namespace controller indicates that an update is needed, + // attempt to re-sync. + if !s.NSControllerState.ScheduleSync() { + // No RunFunc call + break + } + + klog.Infof("A new sync is triggered by a Namespace event") + // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. + // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. + // The cached needToRetry will not be reset to avoid resetting the backoff retries. + s.ReconcilerState.resetPartialCache() + runResult = runFn(s.Context, s.Parser, namespaceEvent, s.ReconcilerState) + + case events.RetrySyncEventType: + // Retry if there was an error, conflict, or any watches need to be updated. + var trigger string + if opts.HasManagementConflict() { + // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. + // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. + // The cached needToRetry will not be reset to avoid resetting the backoff retries. + s.ReconcilerState.resetPartialCache() + trigger = triggerManagementConflict + } else if s.ReconcilerState.cache.needToRetry { + trigger = triggerRetry + } else if opts.needToUpdateWatch() { + trigger = triggerWatchUpdate + } else { + // No RunFunc call + break + } + + // During the execution of `run`, if a new commit is detected, + // retryTimer will be reset to `Options.RetryPeriod`, and state.backoff is reset to `defaultBackoff()`. + // In this case, `run` will try to sync the configs from the new commit instead of the old commit + // being retried. + runResult = runFn(s.Context, s.Parser, trigger, s.ReconcilerState) + + default: + klog.Fatalf("Invalid event received: %#v", event) + } + + // If the run succeeded or source changed, reset the retry backoff. + if runResult.Success || runResult.SourceChanged { + eventResult.ResetRetryBackoff = true + } + return eventResult +} diff --git a/pkg/parse/events/builder.go b/pkg/parse/events/builder.go new file mode 100644 index 000000000..e51d0baab --- /dev/null +++ b/pkg/parse/events/builder.go @@ -0,0 +1,67 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/clock" +) + +// PublishingGroupBuilder oversees construction of event publishers. +// +// For now, the publishers are driven by clock-based delay and backoff timers. +type PublishingGroupBuilder struct { + // Clock is used for time tracking, namely to simplify testing by allowing + // a fake clock, instead of a RealClock. + Clock clock.Clock + // SyncPeriod is the period of time between checking the filesystem + // for publisher updates to sync. + SyncPeriod time.Duration + // SyncWithReimportPeriod is the period of time between forced re-sync from + // publisher (even without a new commit). + SyncWithReimportPeriod time.Duration + // StatusUpdatePeriod is how long the Parser waits between updates of the + // sync status, to account for management conflict errors from the Remediator. + StatusUpdatePeriod time.Duration + // NamespaceControllerPeriod is how long to wait between checks to see if + // the namespace-controller wants to trigger a resync. + // TODO: Use a channel, instead of a timer checking a locked variable. + NamespaceControllerPeriod time.Duration + // RetryBackoff is how long the Parser waits between retries, after an error. + RetryBackoff wait.Backoff +} + +// Build a list of Publishers based on the PublishingGroupBuilder config. +func (t *PublishingGroupBuilder) Build() []Publisher { + var publishers []Publisher + if t.SyncPeriod > 0 { + publishers = append(publishers, NewResetOnRunAttemptPublisher(SyncEventType, t.Clock, t.SyncPeriod)) + } + if t.SyncWithReimportPeriod > 0 { + publishers = append(publishers, NewTimeDelayPublisher(SyncWithReimportEventType, t.Clock, t.SyncWithReimportPeriod)) + } + if t.NamespaceControllerPeriod > 0 { + publishers = append(publishers, NewTimeDelayPublisher(NamespaceResyncEventType, t.Clock, t.NamespaceControllerPeriod)) + } + if t.RetryBackoff.Duration > 0 { + publishers = append(publishers, NewRetrySyncPublisher(t.Clock, t.RetryBackoff)) + } + if t.StatusUpdatePeriod > 0 { + publishers = append(publishers, NewResetOnRunAttemptPublisher(StatusEventType, t.Clock, t.StatusUpdatePeriod)) + } + return publishers +} diff --git a/pkg/parse/events/doc.go b/pkg/parse/events/doc.go new file mode 100644 index 000000000..040b5cadf --- /dev/null +++ b/pkg/parse/events/doc.go @@ -0,0 +1,36 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package events contains a set of Events sent by Publishers. +// +// PublishingGroupBuilder can be used to Build a set of Publishers. +// +// Publisher types: +// - TimeDelayPublisher - Publishes events with a configurable time delay. +// - ResetOnRunAttemptPublisher - TimeDelayPublisher with resettable timer (Result.RunAttempted). +// - RetrySyncPublisher - TimeDelayPublisher with resettable backoff (Result.ResetRetryBackoff). +// +// Events and their Publisher: +// - SyncEvent - ResetOnRunAttemptPublisher (SyncPeriod) +// - SyncWithReimportEvent - TimeDelayPublisher (SyncWithReimportPeriod) +// - NamespaceResyncEvent - TimeDelayPublisher (NamespaceControllerPeriod) +// - RetrySyncEvent - RetrySyncPublisher (RetryBackoff) +// - StatusEvent - ResetOnRunAttemptPublisher (StatusUpdatePeriod) +// +// EventResult flags: +// - ResetRetryBackoff - Set after a sync succeeds or the source changed (spec or commit). +// - DelayStatusUpdate - Set after a sync is attempted. +// +// PublishingGroupBuilder can be used to Build a set of the above Publishers. +package events diff --git a/pkg/parse/events/events.go b/pkg/parse/events/events.go new file mode 100644 index 000000000..d8e01b79e --- /dev/null +++ b/pkg/parse/events/events.go @@ -0,0 +1,42 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +// Event represents the cause of HandleEventFunc being called. +// Some event types may have special fields or methods. +type Event struct { + // Type of the event + Type EventType +} + +// EventType is the name of the event +type EventType string + +const ( + // SyncWithReimportEventType is the EventType for a sync from disk, + // including reading and parsing resource objects from the shared volume. + SyncWithReimportEventType EventType = "SyncWithReimportEvent" + // SyncEventType is the EventType for a sync from the source cache, + // only parsing objects from the shared volume if there's a new commit. + SyncEventType EventType = "SyncEvent" + // StatusEventType is the EventType for a periodic RSync status update. + StatusEventType EventType = "StatusEvent" + // NamespaceResyncEventType is the EventType for a sync triggered by an + // update to a selected namespace. + NamespaceResyncEventType EventType = "NamespaceResyncEvent" + // RetrySyncEventType is the EventType for a sync triggered by an error + // during a previous sync attempt. + RetrySyncEventType EventType = "RetrySyncEvent" +) diff --git a/pkg/parse/events/funnel.go b/pkg/parse/events/funnel.go new file mode 100644 index 000000000..ae917233b --- /dev/null +++ b/pkg/parse/events/funnel.go @@ -0,0 +1,95 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "context" + "reflect" + + "k8s.io/klog/v2" +) + +// Funnel events from multiple publishers to a single subscriber. +type Funnel struct { + Publishers []Publisher + Subscriber Subscriber +} + +// Start all the event publishers and sends events one at a time to the +// subscriber. Blocks until the publishers are started. Returns a channel that +// will be closed when the Funnel is done handling events. Stops when the +// context is done. +func (f *Funnel) Start(ctx context.Context) <-chan struct{} { + // Stop all sources when done, even if the parent context isn't done. + // This could happen if all the source channels are closed. + ctx, cancel := context.WithCancel(ctx) + + klog.Info("Starting event loop") + + // Create a list of cases to select from + cases := make([]reflect.SelectCase, len(f.Publishers)+1) + for i, source := range f.Publishers { + klog.Infof("Starting event source: %s", source.Type()) + cases[i] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: source.Start(ctx), + } + } + + // Add a select case that closes the event channel when the context is done + cases[len(cases)-1] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(ctx.Done()), + } + + doneCh := make(chan struct{}) + go func() { + defer cancel() + defer close(doneCh) + f.funnel(ctx, cases) + }() + return doneCh +} + +func (f *Funnel) funnel(ctx context.Context, cases []reflect.SelectCase) { + // Select from cases until context is done or all source channels have been closed. + remaining := len(cases) + for remaining > 0 { + // Use reflect.Select to allow a dynamic set of cases. + // None of the channels return anything important, so ignore the value. + caseIndex, _, ok := reflect.Select(cases) + if !ok { + ctxErr := ctx.Err() + if ctxErr != nil { + klog.Infof("Stopping event loop: %v", ctxErr) + return + } + // Closed channels are always selected, so nil the channel to + // disable this case. We can't just remove the case from the list, + // because we need the index to match the handlers list. + cases[caseIndex].Chan = reflect.ValueOf(nil) + remaining-- + continue + } + + klog.V(1).Infof("Handling event: %s", f.Publishers[caseIndex].Type()) + result := f.Publishers[caseIndex].Publish(f.Subscriber) + + // Give all publishers a chance to handle the result + for _, source := range f.Publishers { + source.HandleResult(result) + } + } +} diff --git a/pkg/parse/events/funnel_test.go b/pkg/parse/events/funnel_test.go new file mode 100644 index 000000000..5627eddb6 --- /dev/null +++ b/pkg/parse/events/funnel_test.go @@ -0,0 +1,536 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "context" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + testingclock "k8s.io/utils/clock/testing" + "sigs.k8s.io/cli-utils/pkg/testutil" +) + +func TestFunnel_Start(t *testing.T) { + tests := []struct { + name string + builder *PublishingGroupBuilder + stepSize time.Duration + steps []time.Duration + expectedEvents []eventResult + }{ + { + name: "SyncEvents From SyncPeriod", + builder: &PublishingGroupBuilder{ + SyncPeriod: time.Second, + }, + stepSize: time.Second, + expectedEvents: []eventResult{ + { + Event: Event{Type: SyncEventType}, + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, + Result: Result{}, + }, + }, + }, + { + name: "ResyncEvents From SyncWithReimportPeriod", + builder: &PublishingGroupBuilder{ + SyncWithReimportPeriod: time.Second, + }, + stepSize: time.Second, + expectedEvents: []eventResult{ + { + Event: Event{Type: SyncWithReimportEventType}, + Result: Result{}, + }, + { + Event: Event{Type: SyncWithReimportEventType}, + Result: Result{}, + }, + { + Event: Event{Type: SyncWithReimportEventType}, + Result: Result{}, + }, + }, + }, + { + name: "StatusEvents From StatusUpdatePeriod", + builder: &PublishingGroupBuilder{ + StatusUpdatePeriod: time.Second, + }, + stepSize: time.Second, + expectedEvents: []eventResult{ + { + Event: Event{Type: StatusEventType}, + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, + Result: Result{}, + }, + }, + }, + { + name: "NamespaceResyncEvents From NamespaceControllerPeriod", + builder: &PublishingGroupBuilder{ + NamespaceControllerPeriod: time.Second, + }, + stepSize: time.Second, + expectedEvents: []eventResult{ + { + Event: Event{Type: NamespaceResyncEventType}, + Result: Result{}, + }, + { + Event: Event{Type: NamespaceResyncEventType}, + Result: Result{}, + }, + { + Event: Event{Type: NamespaceResyncEventType}, + Result: Result{}, + }, + }, + }, + { + name: "RetryEvents From RetryBackoff", + builder: &PublishingGroupBuilder{ + RetryBackoff: wait.Backoff{ + Duration: time.Second, + Factor: 0, // no backoff, just 1s delay + Steps: 3, // at least as many as we expect + }, + }, + stepSize: time.Second, + expectedEvents: []eventResult{ + { + Event: Event{Type: RetrySyncEventType}, + Result: Result{}, + }, + { + Event: Event{Type: RetrySyncEventType}, + Result: Result{}, + }, + { + Event: Event{Type: RetrySyncEventType}, + Result: Result{}, + }, + }, + }, + { + name: "RetryEvents From RetryBackoff with max steps", + builder: &PublishingGroupBuilder{ + RetryBackoff: wait.Backoff{ + Duration: time.Second, + Factor: 2, // double the delay each time + Steps: 3, + }, + }, + steps: []time.Duration{ + time.Second, + 3 * time.Second, //+2 + 7 * time.Second, //+4 + 8 * time.Second, //+1 + }, + expectedEvents: []eventResult{ + { + Event: Event{Type: RetrySyncEventType}, // 1s + Result: Result{}, + }, + { + Event: Event{Type: RetrySyncEventType}, // 3s + Result: Result{}, + }, + { + Event: Event{Type: RetrySyncEventType}, // 7s + Result: Result{}, + }, + // No more retry events until another event sets ResetRetryBackoff=true + }, + }, + { + name: "RetryEvents From RetryBackoff with ResetRetryBackoff", + builder: &PublishingGroupBuilder{ + RetryBackoff: wait.Backoff{ + Duration: time.Second, + Factor: 2, // double the delay each time + Steps: 3, + }, + SyncPeriod: 10 * time.Second, + }, + steps: []time.Duration{ + time.Second, + 3 * time.Second, //+2 + 7 * time.Second, //+4 + 10 * time.Second, + 11 * time.Second, //+1 + }, + expectedEvents: []eventResult{ + { + Event: Event{Type: RetrySyncEventType}, // 1s + Result: Result{}, + }, + { + Event: Event{Type: RetrySyncEventType}, // 3s + Result: Result{}, + }, + { + Event: Event{Type: RetrySyncEventType}, // 7s + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 10s + Result: Result{ + ResetRetryBackoff: true, + }, + }, + { + Event: Event{Type: RetrySyncEventType}, // 11s + Result: Result{}, + }, + }, + }, + { + name: "SyncEvents, StatusEventType, ResyncEvents", + builder: &PublishingGroupBuilder{ + SyncPeriod: 700 * time.Millisecond, + SyncWithReimportPeriod: 4000 * time.Millisecond, + StatusUpdatePeriod: 300 * time.Millisecond, + }, + // Explicit steps to avoid race conditions that make validation difficult. + steps: []time.Duration{ + 300 * time.Millisecond, + 600 * time.Millisecond, + 700 * time.Millisecond, + 1000 * time.Millisecond, + 1300 * time.Millisecond, + 1400 * time.Millisecond, + 1700 * time.Millisecond, + 2000 * time.Millisecond, + 2100 * time.Millisecond, + 2400 * time.Millisecond, + 2700 * time.Millisecond, + 2800 * time.Millisecond, + 3100 * time.Millisecond, + 3400 * time.Millisecond, + 3500 * time.Millisecond, + 3800 * time.Millisecond, + 4000 * time.Millisecond, + 4300 * time.Millisecond, + 4600 * time.Millisecond, + 4700 * time.Millisecond, + 5000 * time.Millisecond, + 5300 * time.Millisecond, + 5400 * time.Millisecond, + 5700 * time.Millisecond, + 6000 * time.Millisecond, + 6100 * time.Millisecond, + 6400 * time.Millisecond, + 6700 * time.Millisecond, + 6800 * time.Millisecond, + 7100 * time.Millisecond, + 7400 * time.Millisecond, + 7500 * time.Millisecond, + 7800 * time.Millisecond, + 8000 * time.Millisecond, + 8300 * time.Millisecond, + 8600 * time.Millisecond, + 8700 * time.Millisecond, + }, + expectedEvents: []eventResult{ + { + Event: Event{Type: StatusEventType}, // 300ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 600ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 700ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 1000ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 1300ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 1400ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 1700ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 2000ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 2100ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 2400ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 2700ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 2800ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 3100ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 3400ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 3500ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 3800ms + Result: Result{}, + }, + { + Event: Event{Type: SyncWithReimportEventType}, // 4000ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 4300ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 4600ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 4700ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 5000ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 5300ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 5400ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 5700ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 6000ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 6100ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 6400ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 6700ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 6800ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 7100ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 7400ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 7500ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 7800ms + Result: Result{}, + }, + { + Event: Event{Type: SyncWithReimportEventType}, // 8000ms + Result: Result{ + RunAttempted: true, + }, + }, + { + Event: Event{Type: StatusEventType}, // 8300ms + Result: Result{}, + }, + { + Event: Event{Type: StatusEventType}, // 8600ms + Result: Result{}, + }, + { + Event: Event{Type: SyncEventType}, // 8700ms + Result: Result{ + RunAttempted: true, + }, + }, + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + fakeClock := testingclock.NewFakeClock(time.Now().Truncate(time.Second)) + tc.builder.Clock = fakeClock + + var stepFunc func() + if len(tc.steps) > 0 { + // Explicit list of times + startTime := fakeClock.Now() + stepIndex := 0 + stepFunc = func() { + if fakeClock.HasWaiters() { + offset := tc.steps[stepIndex] + t.Logf("Step %d", offset/1000/1000) // Nanos -> Millis + fakeClock.SetTime(startTime.Add(offset)) + stepIndex++ + } + } + } else { + // Fixed step size + stepFunc = func() { + if fakeClock.HasWaiters() { + t.Log("Step") + fakeClock.Step(tc.stepSize) + } + } + } + + subscriber := &testSubscriber{ + T: t, + ExpectedEvents: tc.expectedEvents, + StepFunc: stepFunc, + DoneFunc: cancel, + } + funnel := &Funnel{ + Publishers: tc.builder.Build(), + Subscriber: subscriber, + } + doneCh := funnel.Start(ctx) + + // Move clock forward, without blocking handler + go stepFunc() + + // Wait until done + <-doneCh + + var expectedEvents []Event + for _, e := range tc.expectedEvents { + expectedEvents = append(expectedEvents, e.Event) + } + + testutil.AssertEqual(t, expectedEvents, subscriber.ReceivedEvents) + }) + } +} + +type eventResult struct { + Event Event + Result Result +} + +type testSubscriber struct { + T *testing.T + ExpectedEvents []eventResult + ReceivedEvents []Event + StepFunc func() + DoneFunc context.CancelFunc +} + +func (s *testSubscriber) Handle(e Event) Result { + s.T.Logf("Received event: %#v", e) + // Record received events + s.ReceivedEvents = append(s.ReceivedEvents, e) + eventIndex := len(s.ReceivedEvents) - 1 + // Handle unexpected extra events + if eventIndex >= len(s.ExpectedEvents) { + s.T.Errorf("Unexpected event %d: %#v", eventIndex, e) + return Result{} + } + // Handle expected events + result := s.ExpectedEvents[eventIndex] + eventIndex++ + // If all expected events have been seen, stop the funnel + if eventIndex >= len(s.ExpectedEvents) { + s.T.Log("Stopping: Received all expected events") + s.DoneFunc() + } else { + // Move clock forward, without blocking return + go s.StepFunc() + } + return result.Result +} diff --git a/pkg/parse/events/publisher.go b/pkg/parse/events/publisher.go new file mode 100644 index 000000000..942efadc2 --- /dev/null +++ b/pkg/parse/events/publisher.go @@ -0,0 +1,206 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "context" + "reflect" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + "k8s.io/utils/clock" + "kpt.dev/configsync/pkg/util" +) + +// Publisher writes events to a channel and defines how to handle them. +type Publisher interface { + // Type of events published by this Publisher. + Type() EventType + // Start the event publisher and return the event channel. + // Messages will be sent to the channel with an optional typed payload. + // Channel will be closed when the context is done. + Start(context.Context) reflect.Value + // Publish an event to a specific Subscriber, and return the Result. + // Publish should be called when the event channel sends an event. + Publish(Subscriber) Result + // HandleResult handles the result of all types of events. + // This allows a Publisher to handle signals from other Publishers. + HandleResult(Result) +} + +// Subscriber handles an Event and returns the result. +type Subscriber interface { + // Handle an event and return the result. + Handle(Event) Result +} + +// Result encapsulates the result of a ConsumeFunc. +// This simply allows explicitly naming return values in a way that makes the +// implementation easier to read. +type Result struct { + // RunAttempted triggers timer reset of any events using the + // ResetOnRunAttemptPublisher. + RunAttempted bool + // ResetRetryBackoff triggers backoff reset for the RetrySyncPublisher. + ResetRetryBackoff bool +} + +// NewTimeDelayPublisher constructs an TimeDelayPublisher that generates and +// handles the specified events. +func NewTimeDelayPublisher(eventType EventType, c clock.Clock, period time.Duration) *TimeDelayPublisher { + return &TimeDelayPublisher{ + EventType: eventType, + Clock: c, + Period: period, + } +} + +// TimeDelayPublisher sends events periodically using a timer that is reset after +// each event is handled. This avoids unhandled events stacking up waiting to be +// handled. +type TimeDelayPublisher struct { + EventType EventType + Clock clock.Clock + Period time.Duration + + timer clock.Timer +} + +// Type of events produced by this publisher. +func (s *TimeDelayPublisher) Type() EventType { + return s.EventType +} + +// Start the timer and return the event channel. +func (s *TimeDelayPublisher) Start(ctx context.Context) reflect.Value { + s.timer = s.Clock.NewTimer(s.Period) + go func() { + <-ctx.Done() + s.timer.Stop() + }() + return reflect.ValueOf(s.timer.C()) +} + +// Publish calls the HandleFunc with a new event and resets the delay timer. +func (s *TimeDelayPublisher) Publish(subscriber Subscriber) Result { + result := subscriber.Handle(Event{Type: s.EventType}) + + // Schedule next event + s.timer.Reset(s.Period) + return result +} + +// HandleResult is a no-op. +func (s *TimeDelayPublisher) HandleResult(_ Result) {} + +// NewRetrySyncPublisher constructs an RetrySyncPublisher that generates and +// handles RetrySyncEvents with retry backoff. +func NewRetrySyncPublisher(c clock.Clock, backoff wait.Backoff) *RetrySyncPublisher { + return &RetrySyncPublisher{ + EventType: RetrySyncEventType, + Clock: c, + Backoff: backoff, + } +} + +// RetrySyncPublisher sends StatusEvents periodically using a backoff +// timer that is incremented after each event is handled. +// +// Unlike, TimeDelayPublisher, RetrySyncPublisher always increases the +// delay, unless the subscriber sets Result.ResetRetryBackoff. +type RetrySyncPublisher struct { + EventType EventType + Clock clock.Clock + Backoff wait.Backoff + + currentBackoff wait.Backoff + retryLimit int + timer clock.Timer +} + +// Type of events produced by this publisher. +func (s *RetrySyncPublisher) Type() EventType { + return s.EventType +} + +// Start the timer and return the event channel. +func (s *RetrySyncPublisher) Start(ctx context.Context) reflect.Value { + s.currentBackoff = util.CopyBackoff(s.Backoff) + s.retryLimit = s.currentBackoff.Steps + s.timer = s.Clock.NewTimer(s.currentBackoff.Duration) + go func() { + <-ctx.Done() + s.timer.Stop() + }() + return reflect.ValueOf(s.timer.C()) +} + +// Publish calls the HandleFunc with a new event, increments the backoff +// step, and updates the delay timer. +// If the maximum number of retries has been reached, the HandleFunc is NOT +// called and an empty Result is returned. +func (s *RetrySyncPublisher) Publish(subscriber Subscriber) Result { + if s.currentBackoff.Steps == 0 { + klog.Infof("Retry limit (%v) has been reached", s.retryLimit) + // Don't reset retryTimer if retry limit has been reached. + return Result{} + } + + retryDuration := s.currentBackoff.Step() + retries := s.retryLimit - s.currentBackoff.Steps + klog.Infof("a retry is triggered (retries: %v/%v)", retries, s.retryLimit) + + result := subscriber.Handle(Event{Type: s.EventType}) + + // Schedule next event + s.timer.Reset(retryDuration) + return result +} + +// HandleResult resets the backoff timer if ResetRetryBackoff is true. +func (s *RetrySyncPublisher) HandleResult(result Result) { + if result.ResetRetryBackoff { + s.currentBackoff = util.CopyBackoff(s.Backoff) + s.timer.Reset(s.currentBackoff.Duration) + } +} + +// NewResetOnRunAttemptPublisher constructs an ResetOnRunPublisher that +// generates and handles the specified events and resets the delay any time +// Result.RunAttempted=true. +func NewResetOnRunAttemptPublisher(eventType EventType, c clock.Clock, period time.Duration) *ResetOnRunAttemptPublisher { + return &ResetOnRunAttemptPublisher{ + TimeDelayPublisher: TimeDelayPublisher{ + EventType: eventType, + Clock: c, + Period: period, + }, + } +} + +// ResetOnRunAttemptPublisher is a TimeDelayPublisher that is automatically +// delayed when Result.RunAttempted is set by any event. +type ResetOnRunAttemptPublisher struct { + TimeDelayPublisher +} + +// HandleResult resets the delay timer if DelayStatusUpdate is true. +func (s *ResetOnRunAttemptPublisher) HandleResult(result Result) { + s.TimeDelayPublisher.HandleResult(result) + if result.RunAttempted { + s.timer.Reset(s.Period) + } +} diff --git a/pkg/parse/opts.go b/pkg/parse/opts.go index 4ed504953..6c4d5ca1a 100644 --- a/pkg/parse/opts.go +++ b/pkg/parse/opts.go @@ -52,17 +52,6 @@ type Options struct { // SyncName is the name of the RootSync or RepoSync object. SyncName string - // PollingPeriod is the period of time between checking the filesystem for - // source updates to sync. - PollingPeriod time.Duration - - // ResyncPeriod is the period of time between forced re-sync from source - // (even without a new commit). - ResyncPeriod time.Duration - - // RetryPeriod is how long the Parser waits between retries, after an error. - RetryPeriod time.Duration - // StatusUpdatePeriod is how long the Parser waits between updates of the // sync status, to account for management conflict errors from the Remediator. StatusUpdatePeriod time.Duration diff --git a/pkg/parse/run.go b/pkg/parse/run.go index 1e46c064c..606df6995 100644 --- a/pkg/parse/run.go +++ b/pkg/parse/run.go @@ -19,10 +19,8 @@ import ( "fmt" "os" "path" - "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "kpt.dev/configsync/pkg/api/configsync" "kpt.dev/configsync/pkg/core" @@ -31,7 +29,6 @@ import ( "kpt.dev/configsync/pkg/importer/filesystem/cmpath" "kpt.dev/configsync/pkg/metadata" "kpt.dev/configsync/pkg/metrics" - "kpt.dev/configsync/pkg/reconciler/namespacecontroller" "kpt.dev/configsync/pkg/status" "kpt.dev/configsync/pkg/util" webhookconfiguration "kpt.dev/configsync/pkg/webhook/configuration" @@ -68,180 +65,31 @@ const ( RenderingNotRequired string = "Rendering not required but is currently enabled" ) -// RunOpts are the options used when calling Run -type RunOpts struct { - runFunc RunFunc - backoff wait.Backoff +// RunResult encapsulates the result of a RunFunc. +// This simply allows explicitly naming return values in a way that makes the +// implementation easier to read. +type RunResult struct { + SourceChanged bool + Success bool } // RunFunc is the function signature of the function that starts the parse-apply-watch loop -type RunFunc func(ctx context.Context, p Parser, trigger string, state *reconcilerState) +type RunFunc func(ctx context.Context, p Parser, trigger string, state *reconcilerState) RunResult -// Run keeps checking whether a parse-apply-watch loop is necessary and starts a loop if needed. -func Run(ctx context.Context, p Parser, nsControllerState *namespacecontroller.State, runOpts RunOpts) { - opts := p.options() - // Use timers, not tickers. - // Tickers can cause memory leaks and continuous execution, when execution - // takes longer than the tick duration. - runTimer := opts.Clock.NewTimer(opts.PollingPeriod) - defer runTimer.Stop() - - resyncTimer := opts.Clock.NewTimer(opts.ResyncPeriod) - defer resyncTimer.Stop() - - retryTimer := opts.Clock.NewTimer(opts.RetryPeriod) - defer retryTimer.Stop() - - statusUpdateTimer := opts.Clock.NewTimer(opts.StatusUpdatePeriod) - defer statusUpdateTimer.Stop() - - nsEventPeriod := time.Second - nsEventTimer := opts.Clock.NewTimer(nsEventPeriod) - defer nsEventTimer.Stop() - - runFn := runOpts.runFunc - state := &reconcilerState{ - backoff: runOpts.backoff, - retryTimer: retryTimer, - retryPeriod: opts.RetryPeriod, - } - - for { - select { - case <-ctx.Done(): - return - - // Re-apply even if no changes have been detected. - // This case should be checked first since it resets the cache. - // If the reconciler is in the process of reconciling a given commit, the resync won't - // happen until the ongoing reconciliation is done. - case <-resyncTimer.C(): - klog.Infof("It is time for a force-resync") - // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. - // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. - // The cached needToRetry will not be reset to avoid resetting the backoff retries. - state.resetPartialCache() - runFn(ctx, p, triggerResync, state) - - resyncTimer.Reset(opts.ResyncPeriod) // Schedule resync attempt - // we should not reset retryTimer under this `case` since it is not aware of the - // state of backoff retry. - statusUpdateTimer.Reset(opts.StatusUpdatePeriod) // Schedule status update attempt - - // Re-import declared resources from the filesystem (from git-sync/helm-sync/oci-sync). - // If the reconciler is in the process of reconciling a given commit, the re-import won't - // happen until the ongoing reconciliation is done. - case <-runTimer.C(): - runFn(ctx, p, triggerReimport, state) - - runTimer.Reset(opts.PollingPeriod) // Schedule re-import attempt - // we should not reset retryTimer under this `case` since it is not aware of the - // state of backoff retry. - statusUpdateTimer.Reset(opts.StatusUpdatePeriod) // Schedule status update attempt - - // Retry if there was an error, conflict, or any watches need to be updated. - case <-retryTimer.C(): - var trigger string - if opts.HasManagementConflict() { - // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. - // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. - // The cached needToRetry will not be reset to avoid resetting the backoff retries. - state.resetPartialCache() - trigger = triggerManagementConflict - } else if state.cache.needToRetry { - trigger = triggerRetry - } else if opts.needToUpdateWatch() { - trigger = triggerWatchUpdate - } else { - // Reset retryTimer here to make sure it can be fired in the future. - // Image the following scenario: - // When the for loop starts, retryTimer fires first, none of triggerManagementConflict, - // triggerRetry, and triggerWatchUpdate is true. If we don't reset retryTimer here, when - // the `run` function fails when runTimer or resyncTimer fires, the retry logic under this `case` - // will never be executed. - retryTimer.Reset(opts.RetryPeriod) - continue - } - if state.backoff.Steps == 0 { - klog.Infof("Retry limit (%v) has been reached", retryLimit) - // Don't reset retryTimer if retry limit has been reached. - continue - } - - retryDuration := state.backoff.Step() - retries := retryLimit - state.backoff.Steps - klog.Infof("a retry is triggered (trigger type: %v, retries: %v/%v)", trigger, retries, retryLimit) - // During the execution of `run`, if a new commit is detected, - // retryTimer will be reset to `Options.RetryPeriod`, and state.backoff is reset to `defaultBackoff()`. - // In this case, `run` will try to sync the configs from the new commit instead of the old commit - // being retried. - runFn(ctx, p, trigger, state) - // Reset retryTimer after `run` to make sure `retryDuration` happens between the end of one execution - // of `run` and the start of the next execution. - retryTimer.Reset(retryDuration) - statusUpdateTimer.Reset(opts.StatusUpdatePeriod) // Schedule status update attempt - - // Update the sync status to report management conflicts (from the remediator). - case <-statusUpdateTimer.C(): - // Publish the sync status periodically to update remediator errors. - // Skip updates if the remediator is not running yet, paused, or watches haven't been updated yet. - // This implies that this reconciler has successfully parsed, rendered, validated, and synced. - if opts.Remediating() { - klog.V(3).Info("Updating sync status (periodic while not syncing)") - // Don't update the sync spec or commit. - if err := setSyncStatus(ctx, p, state, state.status.SyncStatus.Spec, false, state.status.SyncStatus.Commit, p.SyncErrors()); err != nil { - klog.Warningf("failed to update sync status: %v", err) - } - } - - statusUpdateTimer.Reset(opts.StatusUpdatePeriod) // Schedule status update attempt - // we should not reset retryTimer under this `case` since it is not aware of the - // state of backoff retry. - - // Execute the entire parse-apply-watch loop for a namespace event. - case <-nsEventTimer.C(): - if nsControllerState == nil { - // If the Namespace Controller is not running, stop the timer without - // closing the channel. - nsEventTimer.Stop() - continue - } - if nsControllerState.ScheduleSync() { - klog.Infof("A new sync is triggered by a Namespace event") - // Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run. - // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. - // The cached needToRetry will not be reset to avoid resetting the backoff retries. - state.resetPartialCache() - runFn(ctx, p, namespaceEvent, state) - } - // we should not reset retryTimer under this `case` since it is not aware of the - // state of backoff retry. - nsEventTimer.Reset(nsEventPeriod) // Schedule namespace event check attempt - } - } -} - -// DefaultRunOpts returns the default options for Run -func DefaultRunOpts() RunOpts { - return RunOpts{ - runFunc: run, - backoff: defaultBackoff(), - } -} - -func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) { - // Initialize state from RSync status. +// DefaultRunFunc is the default implementation for RunOpts.RunFunc. +func DefaultRunFunc(ctx context.Context, p Parser, trigger string, state *reconcilerState) RunResult { + result := RunResult{} + // Initialize status + // TODO: Populate status from RSync status if state.status == nil { reconcilerStatus, err := p.ReconcilerStatusFromCluster(ctx) if err != nil { state.invalidate(status.Append(nil, err)) - return + return result } state.status = reconcilerStatus } - opts := p.options() - var syncDir cmpath.Absolute gs := &SourceStatus{} // pull the source commit and directory with retries within 5 minutes. @@ -264,7 +112,7 @@ func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) if setSourceStatusErr != nil { // If there were fetch errors, log those too state.invalidate(status.Append(gs.Errs, setSourceStatusErr)) - return + return result } // Cache the latest source status in memory state.status.SourceStatus = gs @@ -273,7 +121,7 @@ func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) // If there were fetch errors, stop, log them, and retry later if gs.Errs != nil { state.invalidate(gs.Errs) - return + return result } } @@ -302,7 +150,7 @@ func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) var m status.MultiError state.invalidate(status.Append(m, setRenderingStatusErr)) } - return + return result } if err != nil { rs.Message = RenderingFailed @@ -315,7 +163,7 @@ func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) state.status.SyncingConditionLastUpdate = rs.LastUpdate } state.invalidate(status.Append(rs.Errs, setRenderingStatusErr)) - return + return result } } @@ -334,15 +182,14 @@ func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) } if errs := read(ctx, p, trigger, state, ps); errs != nil { state.invalidate(errs) - return + return result } newSyncDir := state.cache.source.syncDir if newSyncDir != oldSyncDir { - // Reset the backoff and retryTimer since it is a new commit - state.backoff = defaultBackoff() - state.retryTimer.Reset(state.retryPeriod) + // If the commit changed and parsing succeeded, trigger retries to start again, if stopped. + result.SourceChanged = true } // The parse-apply-watch sequence will be skipped if the trigger type is `triggerReimport` and @@ -350,17 +197,19 @@ func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) // * If a former parse-apply-watch sequence for syncDir succeeded, there is no need to run the sequence again; // * If all the former parse-apply-watch sequences for syncDir failed, the next retry will call the sequence. if trigger == triggerReimport && oldSyncDir == newSyncDir { - return + return result } errs := parseAndUpdate(ctx, p, trigger, state) if errs != nil { state.invalidate(errs) - return + return result } // Only checkpoint the state after *everything* succeeded, including status update. state.checkpoint() + result.Success = true + return result } // read reads config files from source if no rendering is needed, or from hydrated output if rendering is done. diff --git a/pkg/parse/run_test.go b/pkg/parse/run_test.go index 7a653b8fb..ac6e8e9e7 100644 --- a/pkg/parse/run_test.go +++ b/pkg/parse/run_test.go @@ -45,7 +45,6 @@ import ( "kpt.dev/configsync/pkg/importer/reader" "kpt.dev/configsync/pkg/kinds" "kpt.dev/configsync/pkg/metadata" - "kpt.dev/configsync/pkg/reconciler/namespacecontroller" "kpt.dev/configsync/pkg/remediator/conflict" remediatorfake "kpt.dev/configsync/pkg/remediator/fake" "kpt.dev/configsync/pkg/rootsync" @@ -62,7 +61,7 @@ const ( symLink = "rev" ) -func newParser(t *testing.T, clock clock.Clock, fakeClient client.Client, fs FileSource, renderingEnabled bool, retryPeriod time.Duration, pollingPeriod time.Duration) Parser { +func newParser(t *testing.T, clock clock.Clock, fakeClient client.Client, fs FileSource, renderingEnabled bool) Parser { parser := &root{} converter, err := openapitest.ValueConverterForTest() if err != nil { @@ -94,9 +93,6 @@ func newParser(t *testing.T, clock clock.Clock, fakeClient client.Client, fs Fil SyncErrorCache: NewSyncErrorCache(conflict.NewHandler(), fight.NewHandler()), }, RenderingEnabled: renderingEnabled, - RetryPeriod: retryPeriod, - ResyncPeriod: 2 * time.Second, - PollingPeriod: pollingPeriod, } return parser @@ -212,24 +208,26 @@ func TestRun(t *testing.T) { } testCases := []struct { - name string - commit string - renderingEnabled bool - hasKustomization bool - hydratedRootExist bool - retryCap time.Duration - srcRootCreateLatency time.Duration - sourceError string - hydratedError string - hydrationDone bool - needRetry bool - expectedRootSync *v1beta1.RootSync + name string + commit string + renderingEnabled bool + hasKustomization bool + hydratedRootExist bool + retryCap time.Duration + srcRootCreateLatency time.Duration + sourceError string + hydratedError string + hydrationDone bool + expectedSourceChanged bool + needRetry bool + expectedRootSync *v1beta1.RootSync }{ { - name: "source commit directory isn't created within the retry cap", - retryCap: 5 * time.Millisecond, - srcRootCreateLatency: 10 * time.Millisecond, - needRetry: true, + name: "source commit directory isn't created within the retry cap", + retryCap: 5 * time.Millisecond, + srcRootCreateLatency: 10 * time.Millisecond, + expectedSourceChanged: false, + needRetry: true, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() rs.ObjectMeta.ResourceVersion = "2" // Create + Update (source status) @@ -264,10 +262,11 @@ func TestRun(t *testing.T) { }(), }, { - name: "source commit directory created within the retry cap", - retryCap: 100 * time.Millisecond, - srcRootCreateLatency: 5 * time.Millisecond, - needRetry: false, + name: "source commit directory created within the retry cap", + retryCap: 100 * time.Millisecond, + srcRootCreateLatency: 5 * time.Millisecond, + expectedSourceChanged: true, + needRetry: false, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) + Update (rendering status) + Update (sync status) @@ -317,9 +316,10 @@ func TestRun(t *testing.T) { }(), }, { - name: "source fetch error", - sourceError: "git sync permission issue", - needRetry: true, + name: "source fetch error", + sourceError: "git sync permission issue", + expectedSourceChanged: false, + needRetry: true, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) @@ -355,10 +355,11 @@ func TestRun(t *testing.T) { }(), }, { - name: "rendering in progress", - renderingEnabled: true, - hydratedRootExist: true, - needRetry: true, + name: "rendering in progress", + renderingEnabled: true, + hydratedRootExist: true, + expectedSourceChanged: false, + needRetry: true, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) + Update (rendering status) @@ -398,13 +399,14 @@ func TestRun(t *testing.T) { }(), }, { - name: "hydration error", - renderingEnabled: true, - hasKustomization: true, - hydratedRootExist: true, - hydrationDone: true, - hydratedError: `{"code": "1068", "error": "rendering error"}`, - needRetry: true, + name: "hydration error", + renderingEnabled: true, + hasKustomization: true, + hydratedRootExist: true, + hydrationDone: true, + hydratedError: `{"code": "1068", "error": "rendering error"}`, + expectedSourceChanged: false, + needRetry: true, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) + Update (rendering status) @@ -450,12 +452,13 @@ func TestRun(t *testing.T) { }, // TODO: Add source parse error test case (with and without rendering) { - name: "successful read", - renderingEnabled: true, - hasKustomization: true, - hydratedRootExist: true, - hydrationDone: true, - needRetry: false, + name: "successful read", + renderingEnabled: true, + hasKustomization: true, + hydratedRootExist: true, + hydrationDone: true, + expectedSourceChanged: true, + needRetry: false, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) + Update (rendering status) + Update (sync status) @@ -505,10 +508,11 @@ func TestRun(t *testing.T) { }(), }, { - name: "successful read without hydration", - hydratedRootExist: false, - hydrationDone: false, - needRetry: false, + name: "successful read without hydration", + hydratedRootExist: false, + hydrationDone: false, + expectedSourceChanged: true, + needRetry: false, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) + Update (rendering status) + Update (sync status) @@ -558,12 +562,13 @@ func TestRun(t *testing.T) { }(), }, { - name: "error because hydration enabled with wet source", - renderingEnabled: true, - hasKustomization: false, - hydratedRootExist: false, - hydrationDone: true, - needRetry: true, + name: "error because hydration enabled with wet source", + renderingEnabled: true, + hasKustomization: false, + hydratedRootExist: false, + hydrationDone: true, + expectedSourceChanged: false, + needRetry: true, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) + Update (rendering status) @@ -610,12 +615,13 @@ func TestRun(t *testing.T) { }(), }, { - name: "error because hydration disabled with dry source", - renderingEnabled: false, - hasKustomization: true, - hydratedRootExist: true, - hydrationDone: true, - needRetry: true, + name: "error because hydration disabled with dry source", + renderingEnabled: false, + hasKustomization: true, + hydratedRootExist: true, + hydrationDone: true, + expectedSourceChanged: false, + needRetry: true, expectedRootSync: func() *v1beta1.RootSync { rs := rootSyncOutput.DeepCopy() // Create + Update (source status) + Update (rendering status) @@ -746,15 +752,12 @@ func TestRun(t *testing.T) { SourceBranch: fileSource.SourceBranch, } fakeClient := syncerFake.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)) - parser := newParser(t, fakeClock, fakeClient, fs, tc.renderingEnabled, configsync.DefaultReconcilerRetryPeriod, configsync.DefaultReconcilerPollingPeriod) - state := &reconcilerState{ - backoff: defaultBackoff(), - retryTimer: fakeClock.NewTimer(configsync.DefaultReconcilerRetryPeriod), - retryPeriod: configsync.DefaultReconcilerRetryPeriod, - } + parser := newParser(t, fakeClock, fakeClient, fs, tc.renderingEnabled) + state := &reconcilerState{} t.Logf("start running test at %v", time.Now()) - run(context.Background(), parser, triggerReimport, state) + result := DefaultRunFunc(context.Background(), parser, triggerReimport, state) + assert.Equal(t, tc.expectedSourceChanged, result.SourceChanged) assert.Equal(t, tc.needRetry, state.cache.needToRetry) rs := &v1beta1.RootSync{} @@ -767,45 +770,3 @@ func TestRun(t *testing.T) { }) } } - -func TestBackoffRetryCount(t *testing.T) { - realClock := clock.RealClock{} - fakeClient := syncerFake.NewClient(t, core.Scheme, k8sobjects.RootSyncObjectV1Beta1(rootSyncName)) - parser := newParser(t, realClock, fakeClient, FileSource{}, false, 10*time.Microsecond, 150*time.Microsecond) - testState := &namespacecontroller.State{} - reimportCount := 0 - retryCount := 0 - testIsDone := func(reimportCount *int, _ *int) bool { - return *reimportCount == 35 - } - - t.Logf("start running test at %v", time.Now()) - - ctx, cancel := context.WithCancel(context.Background()) - backoff := defaultBackoff() - backoff.Duration = 10 * time.Microsecond - - Run(ctx, parser, testState, RunOpts{ - runFunc: mockRun(&reimportCount, &retryCount, cancel, testIsDone), - backoff: backoff, - }) - - assert.Equal(t, 12, retryCount) -} - -func mockRun(reimportCount *int, retryCount *int, cancelFn func(), testIsDone func(reimportCount *int, retryCount *int) bool) RunFunc { - return func(_ context.Context, _ Parser, trigger string, state *reconcilerState) { - state.cache.needToRetry = true - - switch trigger { - case triggerReimport: - *reimportCount++ - case triggerRetry: - *retryCount++ - } - - if testIsDone(reimportCount, retryCount) { - cancelFn() - } - } -} diff --git a/pkg/parse/state.go b/pkg/parse/state.go index 6fc22cfda..d355a1379 100644 --- a/pkg/parse/state.go +++ b/pkg/parse/state.go @@ -15,13 +15,8 @@ package parse import ( - "time" - - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" - "k8s.io/utils/clock" "kpt.dev/configsync/pkg/status" - "kpt.dev/configsync/pkg/util" ) type reconcilerState struct { @@ -33,29 +28,6 @@ type reconcilerState struct { // cache tracks the progress made by the reconciler for a source commit. cache cacheForCommit - - // backoff defines the duration to wait before retries - // backoff is initialized to `defaultBackoff()` when a reconcilerState struct is created. - // backoff is updated before a retry starts. - // backoff should only be reset back to `defaultBackoff()` when a new commit is detected. - backoff wait.Backoff - - retryTimer clock.Timer - - retryPeriod time.Duration -} - -// retryLimit defines the maximal number of retries allowed on a given commit. -const retryLimit = 12 - -// The returned backoff includes 12 steps. -// Here is an example of the duration between steps: -// -// 1.055843837s, 2.085359785s, 4.229560375s, 8.324724174s, -// 16.295984061s, 34.325711987s, 1m5.465642392s, 2m18.625713221s, -// 4m24.712222056s, 9m18.97652295s, 17m15.344384599s, 35m15.603237976s. -func defaultBackoff() wait.Backoff { - return util.BackoffWithDurationAndStepLimit(0, retryLimit) } func (s *reconcilerState) checkpoint() { diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index a2b32302e..9cd75e426 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -34,6 +34,7 @@ import ( "kpt.dev/configsync/pkg/importer/filesystem/cmpath" "kpt.dev/configsync/pkg/importer/reader" "kpt.dev/configsync/pkg/parse" + "kpt.dev/configsync/pkg/parse/events" "kpt.dev/configsync/pkg/reconciler/finalizer" "kpt.dev/configsync/pkg/reconciler/namespacecontroller" "kpt.dev/configsync/pkg/remediator" @@ -43,6 +44,7 @@ import ( "kpt.dev/configsync/pkg/syncer/metrics" "kpt.dev/configsync/pkg/syncer/reconcile" "kpt.dev/configsync/pkg/syncer/reconcile/fight" + "kpt.dev/configsync/pkg/util" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -246,9 +248,6 @@ func Run(opts Options) { Client: cl, ReconcilerName: opts.ReconcilerName, SyncName: opts.SyncName, - PollingPeriod: opts.PollingPeriod, - ResyncPeriod: opts.ResyncPeriod, - RetryPeriod: opts.RetryPeriod, StatusUpdatePeriod: opts.StatusUpdatePeriod, DiscoveryInterface: discoveryClient, RenderingEnabled: opts.RenderingEnabled, @@ -271,6 +270,17 @@ func Run(opts Options) { } } + // Use the builder to build a set of event publishers for parser.Run. + pgBuilder := &events.PublishingGroupBuilder{ + Clock: parseOpts.Clock, + SyncPeriod: opts.PollingPeriod, + SyncWithReimportPeriod: opts.ResyncPeriod, + StatusUpdatePeriod: opts.StatusUpdatePeriod, + // TODO: Shouldn't this use opts.RetryPeriod as the initial duration? + // Limit to 12 retries, with no max retry duration. + RetryBackoff: util.BackoffWithDurationAndStepLimit(0, 12), + } + var nsControllerState *namespacecontroller.State if opts.ReconcilerScope == declared.RootScope { rootOpts := &parse.RootOptions{ @@ -279,18 +289,18 @@ func Run(opts Options) { DynamicNSSelectorEnabled: opts.DynamicNSSelectorEnabled, } if opts.DynamicNSSelectorEnabled { - // Only set nsControllerState when dynamic NamespaceSelector is enabled on - // RootSyncs. + // Only set nsControllerState when dynamic NamespaceSelector is + // enabled on RootSyncs. // RepoSync can't manage NamespaceSelectors. nsControllerState = namespacecontroller.NewState() rootOpts.NSControllerState = nsControllerState + // Enable namespace events (every second) + // TODO: Trigger namespace events with a buffered channel from the NamespaceController + pgBuilder.NamespaceControllerPeriod = time.Second } parser = parse.NewRootRunner(parseOpts, rootOpts) } else { parser = parse.NewNamespaceRunner(parseOpts) - if err != nil { - klog.Fatalf("Instantiating Namespace Repository Parser: %v", err) - } } // Start listening to signals @@ -388,7 +398,16 @@ func Run(opts Options) { klog.Info("Starting Parser") // TODO: Convert the Parser to use the controller-manager framework. - parse.Run(ctx, parser, nsControllerState, parse.DefaultRunOpts()) // blocks until ctx.Done() + // Funnel events from the publishers to the subscriber. + funnel := &events.Funnel{ + Publishers: pgBuilder.Build(), + // Wrap the parser with an event handler that triggers the RunFunc, as needed. + Subscriber: parse.NewEventHandler(ctx, parser, nsControllerState, parse.DefaultRunFunc), + } + doneChForParser := funnel.Start(ctx) + + // Wait until done + <-doneChForParser klog.Info("Parser exited") // Wait for Remediator to exit diff --git a/pkg/util/retry.go b/pkg/util/retry.go index f3712b938..0a9983897 100644 --- a/pkg/util/retry.go +++ b/pkg/util/retry.go @@ -92,3 +92,14 @@ func RetryWithBackoff(backoff wait.Backoff, f func() error) error { return err }) } + +// CopyBackoff duplicates a wait.Backoff +func CopyBackoff(in wait.Backoff) wait.Backoff { + return wait.Backoff{ + Duration: in.Duration, + Factor: in.Factor, + Jitter: in.Jitter, + Steps: in.Steps, + Cap: in.Cap, + } +}