diff --git a/runtime/controller.go b/runtime/controller.go index b08793b3ab3..44bee4b5b87 100644 --- a/runtime/controller.go +++ b/runtime/controller.go @@ -1203,6 +1203,19 @@ func (c *Controller) trySchedule(n *runtimev1.ResourceName) (success bool, err e } } + // If the resource was renamed and an invocation for its former name is currently running, it means the resource was renamed while it was reconciling. + // It is not possible that the running invocation is for a new resource because we always run rename reconciles before regular reconciles. + // It is also not possible that the running invocation is for another renamed resource because safeRename turns such cases into creates. + // + // In this case, we add the new name to the waitlist of the running invocation and return true. + if r.Meta.RenamedFrom != nil { + inv, ok := c.invocations[nameStr(r.Meta.RenamedFrom)] + if ok { + inv.addToWaitlist(n, r.Meta.SpecVersion) + return true, nil + } + } + // We want deletes to run before renames or regular reconciles. // And we want renames to run before regular reconciles. // Return false if there are deleted or renamed resources, and this isn't one of them. @@ -1343,12 +1356,14 @@ func (c *Controller) processCompletedInvocation(inv *invocation) error { r, err := c.catalog.get(inv.name, true, false) if err != nil { - // Self-deletes are immediately hard deletes. So only return the error if it's not a self-delete. - if !(inv.deletedSelf && errors.Is(err, drivers.ErrResourceNotFound)) { + if !errors.Is(err, drivers.ErrResourceNotFound) { return err } + // There are two cases where the resource no longer exists: + // 1. Self-deletes, which are immediately hard deletes. + // 2. When a resource was renamed while reconciling. } - // NOTE: Due to self-deletes, r may be nil! + // NOTE: Due to self-deletes and renames, r may be nil! if inv.isDelete { // Extra checks in case item was re-created during deletion, or deleted during a normal reconciling (in which case this is just a cancellation of the normal reconcile, not the result of deletion) @@ -1431,7 +1446,7 @@ func (c *Controller) processCompletedInvocation(inv *invocation) error { // Re-enqueue children if: if !inv.reschedule && // Not rescheduling (since then the children would be blocked anyway) - r != nil && // Not a hard delete (children were already enqueued when the soft delete happened) + r != nil && // Not a hard delete or cancelled due to rename (children were already enqueued when the soft delete or rename happened) r.Meta.DeletedOn == nil && // Not a soft delete (children were already enqueued when c.Delete(...) was called) !c.catalog.isCyclic(inv.name) && // Hasn't become cyclic (since DAG access is not safe for cyclic names) true { diff --git a/runtime/controller_test.go b/runtime/controller_test.go index 85206a67b14..1b06b96c086 100644 --- a/runtime/controller_test.go +++ b/runtime/controller_test.go @@ -3,6 +3,7 @@ package runtime_test import ( "context" "fmt" + "path/filepath" "testing" "time" @@ -635,6 +636,36 @@ path: data/foo.csv testruntime.RequireOLAPTableCount(t, rt, id, "bar3", 2) } +func TestRenameReconciling(t *testing.T) { + adbidsPath, err := filepath.Abs("testruntime/testdata/ad_bids/data/AdBids.csv.gz") + require.NoError(t, err) + + rt, id := testruntime.NewInstance(t) + testruntime.PutFiles(t, rt, id, map[string]string{ + "/sources/foo.yaml": ` +connector: local_file +path: ` + adbidsPath, + }) + + // Trigger a reconcile, but don't wait for it to complete + ctrl, err := rt.Controller(context.Background(), id) + require.NoError(t, err) + err = ctrl.Reconcile(context.Background(), runtime.GlobalProjectParserName) + require.NoError(t, err) + + // Imperfect way to wait until the reconcile is in progress, but not completed (AdBids seems to take about 100ms to ingest). + // This seems good enough in practice, and if there's a bug, it will at least identify it some of the time! + time.Sleep(5 * time.Millisecond) + + // Rename the resource while the reconcile is still running + testruntime.RenameFile(t, rt, id, "/sources/foo.yaml", "/sources/bar.yaml") + + // Wait for it to complete and verify the output is stable + testruntime.ReconcileParserAndWait(t, rt, id) + testruntime.RequireReconcileState(t, rt, id, 2, 0, 0) + testruntime.RequireOLAPTable(t, rt, id, "bar") +} + func TestInterdependence(t *testing.T) { // Test D -> C, D -> A, C -> A,B (-> = refs) // Test error propagation on source error