Skip to content

Commit

Permalink
chore: Clean up sync event naming & cache resets (#1470)
Browse files Browse the repository at this point in the history
- Consolidate resetPartialCache calls into the DefaultRunFunc.
  This makes the run func act more like a controller.
- Align the event names, trigger names, and period names.
  FullSync means parse from the filesystem and update the cluster,
  even if nothing changed.
  Sync means only parse from filesystem and update the cluster if
  there was a source change, like repo, commit, or syncDir change.
  • Loading branch information
karlkfi authored Nov 1, 2024
1 parent 059754a commit c5a41be
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 112 deletions.
4 changes: 2 additions & 2 deletions cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var (
fightDetectionThreshold = flag.Float64(
"fight-detection-threshold", 5.0,
"The rate of updates per minute to an API Resource at which the Syncer logs warnings about too many updates to the resource.")
resyncPeriod = flag.Duration("resync-period", configsync.DefaultReconcilerResyncPeriod,
fullSyncPeriod = flag.Duration("full-sync-period", configsync.DefaultReconcilerFullSyncPeriod,
"Period of time between forced re-syncs from source (even without a new commit).")
workers = flag.Int("workers", 1,
"Number of concurrent remediator workers to run at once.")
Expand Down Expand Up @@ -178,7 +178,7 @@ func main() {
FightDetectionThreshold: *fightDetectionThreshold,
NumWorkers: *workers,
ReconcilerScope: scope,
ResyncPeriod: *resyncPeriod,
FullSyncPeriod: *fullSyncPeriod,
PollingPeriod: *pollingPeriod,
RetryPeriod: configsync.DefaultReconcilerRetryPeriod,
StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod,
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/configsync/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ const (
// filesystem for source updates to sync.
DefaultReconcilerPollingPeriod = 15 * time.Second

// DefaultReconcilerResyncPeriod is the time delay between forced re-syncs
// DefaultReconcilerFullSyncPeriod is the time delay between forced re-syncs
// from source (even without a new commit).
DefaultReconcilerResyncPeriod = time.Hour
DefaultReconcilerFullSyncPeriod = time.Hour

// DefaultReconcilerRetryPeriod is the time delay between polling the
// filesystem for source updates to sync, when the previous attempt errored.
Expand Down
60 changes: 24 additions & 36 deletions pkg/parse/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ func NewEventHandler(ctx context.Context, r Reconciler, nsControllerState *names
}

// Handle an Event and return the Result.
// - SyncWithReimportEventType - Reset the cache and sync from scratch.
// - SyncEventType - Sync from the cache, priming the cache from disk, if necessary.
// - StatusEventType - Update the RSync status with status from the Remediator & NSController.
// - NamespaceResyncEventType - Sync from the cache, if the NSController requested one.
// - RetrySyncEventType - Sync from the cache, if one of the following cases is detected:
// - FullSyncEventType - Reset the cache and sync from scratch.
// - SyncEventType - Sync from the cache, priming the cache from disk, if necessary.
// - StatusUpdateEventType - Update the RSync status with status from the Remediator & NSController.
// - NamespaceSyncEventType - Sync from the cache, if the NSController requested one.
// - RetrySyncEventType - Sync from the cache, if one of the following cases is detected:
// - Remediator or Reconciler reported a management conflict
// - Reconciler requested a retry due to error
// - Remediator requested a watch update
Expand All @@ -66,25 +66,24 @@ func (s *EventHandler) Handle(event events.Event) events.Result {

var runResult RunResult
switch event.Type {
case events.SyncWithReimportEventType:
// Re-apply even if no changes have been detected.
// This case should be checked first since it resets the cache.
// If the reconciler is in the process of reconciling a given commit, the resync won't
// happen until the ongoing reconciliation is done.
klog.Infof("It is time for a force-resync")
// Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run.
// The cached sourceState will not be reset to avoid reading all the source files unnecessarily.
// The cached needToRetry will not be reset to avoid resetting the backoff retries.
state.resetPartialCache()
runResult = runFn(s.Context, s.Reconciler, triggerResync)
case events.FullSyncEventType:
// FullSync = Read* + Render* + Parse + Update
//
// * Read & Render will only happen if there's a new commit, new source
// spec change, or a previous error invalidated the cache.
// Otherwise full-sync starts from re-parsing the objects from disk.
runResult = runFn(s.Context, s.Reconciler, triggerFullSync)

case events.SyncEventType:
// Re-import declared resources from the filesystem (from *-sync).
// If the reconciler is in the process of reconciling a given commit, the re-import won't
// happen until the ongoing reconciliation is done.
runResult = runFn(s.Context, s.Reconciler, triggerReimport)

case events.StatusEventType:
// Sync = Read* + Render* + Parse* + Update
//
// * Read, Render, and Parse will only happen if there's a new commit,
// new source spec change, or a previous error invalidated the cache.
// Otherwise sync skips directly to the Update stage, using the
// previously parsed in-memory object cache.
runResult = runFn(s.Context, s.Reconciler, triggerSync)

case events.StatusUpdateEventType:
// Publish the sync status periodically to update remediator errors.
// Skip updates if the remediator is not running yet, paused, or watches haven't been updated yet.
// This implies that this reconciler has successfully parsed, rendered, validated, and synced.
Expand All @@ -107,29 +106,18 @@ func (s *EventHandler) Handle(event events.Event) events.Result {
}
}

case events.NamespaceResyncEventType:
// If the namespace controller indicates that an update is needed,
// attempt to re-sync.
case events.NamespaceSyncEventType:
// FullSync if the namespace controller detected a change.
if !s.NSControllerState.ScheduleSync() {
// No RunFunc call
break
}

klog.Infof("A new sync is triggered by a Namespace event")
// Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run.
// The cached sourceState will not be reset to avoid reading all the source files unnecessarily.
// The cached needToRetry will not be reset to avoid resetting the backoff retries.
state.resetPartialCache()
runResult = runFn(s.Context, s.Reconciler, namespaceEvent)
runResult = runFn(s.Context, s.Reconciler, triggerNamespaceUpdate)

case events.RetrySyncEventType:
// Retry if there was an error, conflict, or any watches need to be updated.
var trigger string
if opts.HasManagementConflict() {
// Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run.
// The cached sourceState will not be reset to avoid reading all the source files unnecessarily.
// The cached needToRetry will not be reset to avoid resetting the backoff retries.
state.resetPartialCache()
trigger = triggerManagementConflict
} else if state.cache.needToRetry {
trigger = triggerRetry
Expand Down
14 changes: 7 additions & 7 deletions pkg/parse/events/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ type PublishingGroupBuilder struct {
// SyncPeriod is the period of time between checking the filesystem
// for publisher updates to sync.
SyncPeriod time.Duration
// SyncWithReimportPeriod is the period of time between forced re-sync from
// publisher (even without a new commit).
SyncWithReimportPeriod time.Duration
// FullSyncPeriod is the period of time between full syncs from disk
// (even without a new commit).
FullSyncPeriod time.Duration
// StatusUpdatePeriod is how long the Parser waits between updates of the
// sync status, to account for management conflict errors from the Remediator.
StatusUpdatePeriod time.Duration
Expand All @@ -51,17 +51,17 @@ func (t *PublishingGroupBuilder) Build() []Publisher {
if t.SyncPeriod > 0 {
publishers = append(publishers, NewResetOnRunAttemptPublisher(SyncEventType, t.Clock, t.SyncPeriod))
}
if t.SyncWithReimportPeriod > 0 {
publishers = append(publishers, NewTimeDelayPublisher(SyncWithReimportEventType, t.Clock, t.SyncWithReimportPeriod))
if t.FullSyncPeriod > 0 {
publishers = append(publishers, NewTimeDelayPublisher(FullSyncEventType, t.Clock, t.FullSyncPeriod))
}
if t.NamespaceControllerPeriod > 0 {
publishers = append(publishers, NewTimeDelayPublisher(NamespaceResyncEventType, t.Clock, t.NamespaceControllerPeriod))
publishers = append(publishers, NewTimeDelayPublisher(NamespaceSyncEventType, t.Clock, t.NamespaceControllerPeriod))
}
if t.RetryBackoff.Duration > 0 {
publishers = append(publishers, NewRetrySyncPublisher(t.Clock, t.RetryBackoff))
}
if t.StatusUpdatePeriod > 0 {
publishers = append(publishers, NewResetOnRunAttemptPublisher(StatusEventType, t.Clock, t.StatusUpdatePeriod))
publishers = append(publishers, NewResetOnRunAttemptPublisher(StatusUpdateEventType, t.Clock, t.StatusUpdatePeriod))
}
return publishers
}
12 changes: 6 additions & 6 deletions pkg/parse/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ type Event struct {
type EventType string

const (
// SyncWithReimportEventType is the EventType for a sync from disk,
// FullSyncEventType is the EventType for a sync from disk,
// including reading and parsing resource objects from the shared volume.
SyncWithReimportEventType EventType = "SyncWithReimportEvent"
FullSyncEventType EventType = "FullSyncEvent"
// SyncEventType is the EventType for a sync from the source cache,
// only parsing objects from the shared volume if there's a new commit.
SyncEventType EventType = "SyncEvent"
// StatusEventType is the EventType for a periodic RSync status update.
StatusEventType EventType = "StatusEvent"
// NamespaceResyncEventType is the EventType for a sync triggered by an
// StatusUpdateEventType is the EventType for a periodic RSync status update.
StatusUpdateEventType EventType = "StatusUpdateEvent"
// NamespaceSyncEventType is the EventType for a sync triggered by an
// update to a selected namespace.
NamespaceResyncEventType EventType = "NamespaceResyncEvent"
NamespaceSyncEventType EventType = "NamespaceSyncEvent"
// RetrySyncEventType is the EventType for a sync triggered by an error
// during a previous sync attempt.
RetrySyncEventType EventType = "RetrySyncEvent"
Expand Down
Loading

0 comments on commit c5a41be

Please sign in to comment.