Skip to content

Commit

Permalink
Gracefully handle controller shutdown during a catalog flush (#6312)
Browse files Browse the repository at this point in the history
  • Loading branch information
begelundmuller authored and k-anshul committed Jan 6, 2025
1 parent 00eae0f commit 5b3de29
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
8 changes: 7 additions & 1 deletion runtime/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/pkg/dag"
"github.com/rilldata/rill/runtime/pkg/graceful"
"github.com/rilldata/rill/runtime/pkg/schedule"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -212,11 +213,16 @@ func (c *Controller) Run(ctx context.Context) error {
resetTimelineTimer()
c.mu.Unlock()
case <-flushTicker.C: // It's time to flush the catalog to persistent storage
// Add a minimum duration to the ctx to reduce the chance of an interrupted flush.
ctx, cancel := graceful.WithMinimumDuration(ctx, 10*time.Second)
c.mu.RLock()
err := c.catalog.flush(ctx)
cancel()
c.mu.RUnlock()
if err != nil {
loopErr = err
if !errors.Is(err, ctx.Err()) {
loopErr = err
}
stop = true
}
case <-hangingTicker.C: // It's time to check for hanging canceled reconciles
Expand Down
26 changes: 26 additions & 0 deletions runtime/pkg/graceful/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"os/signal"
"syscall"
"time"
)

// WithCancelOnTerminate derives a context that is cancelled on SIGINT and SIGTERM signals.
Expand All @@ -24,3 +25,28 @@ func WithCancelOnTerminate(ctx context.Context) context.Context {

return ctx
}

// WithMinimumDuration derives a context that delays the parent's cancellation until the provided minimum duration has elapsed.
// When done with the derived context, call the returned cancel function to clean up associated resources.
func WithMinimumDuration(parentCtx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
newCtx, cancel := context.WithCancel(context.Background())

go func() {
// Wait until the minimum duration has elapsed.
select {
case <-newCtx.Done():
return
case <-time.After(d):
}

// Wait until the parent context is done.
select {
case <-newCtx.Done():
return
case <-parentCtx.Done():
cancel()
}
}()

return newCtx, cancel
}

0 comments on commit 5b3de29

Please sign in to comment.