From 8ffd109f5003d8f22a12d0fff8d50ac5ea26c6e6 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Fri, 26 Jan 2024 16:31:49 -0500 Subject: [PATCH 1/4] Simplified `PlaceholderTask` --- .../support/pickles/ExecutorPickle.java | 3 +- .../steps/ExecutorStepDynamicContext.java | 4 +- .../support/steps/ExecutorStepExecution.java | 183 ++++++++---------- .../support/steps/ExecutorStepTest.java | 2 +- 4 files changed, 81 insertions(+), 111 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/workflow/support/pickles/ExecutorPickle.java b/src/main/java/org/jenkinsci/plugins/workflow/support/pickles/ExecutorPickle.java index f24ac769..bd9756ed 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/support/pickles/ExecutorPickle.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/support/pickles/ExecutorPickle.java @@ -114,11 +114,10 @@ protected Executor tryResolve() throws Exception { if (task instanceof ExecutorStepExecution.PlaceholderTask) { ExecutorStepExecution.PlaceholderTask placeholder = (ExecutorStepExecution.PlaceholderTask)task; - // Non-null cookie means body has begun execution, i.e. we started using a node // PlaceholderTask#getAssignedLabel is set to the Node name when execution starts // Thus we're guaranteeing the execution began and the Node is now unknown. // Theoretically it's safe to simply fail earlier when rehydrating any EphemeralNode... but we're being extra safe. - if (placeholder.getCookie() != null && Jenkins.get().getNode(placeholder.getAssignedLabel().getName()) == null ) { + if (placeholder.hasStarted() && Jenkins.get().getNode(placeholder.getAssignedLabel().getName()) == null ) { if (System.nanoTime() > endTimeNanos) { Queue.getInstance().cancel(item); owner.getListener().getLogger().printf("Killed %s after waiting for %s because we assume unknown agent %s is never going to appear%n", diff --git a/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepDynamicContext.java b/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepDynamicContext.java index 181005f6..5c0f6c35 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepDynamicContext.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepDynamicContext.java @@ -69,7 +69,7 @@ public final class ExecutorStepDynamicContext implements Serializable { private static final long serialVersionUID = 1; - @NonNull ExecutorStepExecution.PlaceholderTask task; + final @NonNull ExecutorStepExecution.PlaceholderTask task; final @NonNull String node; final @NonNull String path; final int depth; @@ -97,8 +97,6 @@ void resume(StepContext context) throws Exception { if (item == null) { throw new IllegalStateException("queue refused " + task); } - // Try to avoid having distinct a instance of PlaceholderTask here compared to any previously-scheduled task. - task = (ExecutorStepExecution.PlaceholderTask)item.task; LOGGER.fine(() -> (result.isCreated() ? "scheduled " : " using already-scheduled ") + item + " for " + path + " on " + node); TaskListener listener = context.get(TaskListener.class); if (!node.isEmpty()) { // unlikely to be any delay for built-in node anyway diff --git a/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java b/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java index 7a79ffca..ecbd1810 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java @@ -164,7 +164,7 @@ public void stop(@NonNull Throwable cause) throws Exception { if (item.task instanceof PlaceholderTask) { PlaceholderTask task = (PlaceholderTask) item.task; if (task.context.equals(context)) { - task.stopping = true; + RunningTasks.run(context, t -> t.stopping = true); if (Queue.getInstance().cancel(item)) { LOGGER.fine(() -> "canceled " + item); } else { @@ -188,7 +188,7 @@ public void stop(@NonNull Throwable cause) throws Exception { if (exec instanceof PlaceholderTask.PlaceholderExecutable) { StepContext actualContext = ((PlaceholderTask.PlaceholderExecutable) exec).getParent().context; if (actualContext.equals(context)) { - PlaceholderTask.finish(((PlaceholderTask.PlaceholderExecutable) exec).getParent().cookie); + PlaceholderTask.finish(context, ((PlaceholderTask.PlaceholderExecutable) exec).getParent().cookie); LOGGER.log(FINE, "canceling {0}", exec); break COMPUTERS; } else { @@ -251,7 +251,7 @@ public void stop(@NonNull Throwable cause) throws Exception { if (li.isCancelled()) { if (li.task instanceof PlaceholderTask) { PlaceholderTask task = (PlaceholderTask) li.task; - if (!task.stopping) { + if (!RunningTasks.get(task.context, t -> t.stopping)) { if (LOGGER.isLoggable(Level.FINE)) { LOGGER.log(Level.FINE, null, new Throwable(li.task + " was cancelled")); } @@ -395,6 +395,11 @@ private static final class RunningTask { @Nullable AsynchronousExecution execution; /** null until placeholder executable runs */ @Nullable Launcher launcher; + /** + * Flag to remember that {@link #stop} is being called, so {@link CancelledItemListener} can be suppressed. + * Also used for {@link PlaceholderTask#getCauseOfBlockage}. + */ + boolean stopping; } private static final String COOKIE_VAR = "JENKINS_NODE_COOKIE"; @@ -408,19 +413,13 @@ public static final class PlaceholderTask implements ContinuedTask, Serializable private final String runId; /** * Unique cookie set once the task starts. - * Serves multiple purposes: - * identifies whether we have already invoked the body (since this can be rerun after restart); - * serves as a key for {@link RunningTasks#runningTasks} and {@link Callback} (cannot just have a doneness flag in {@link PlaceholderTask} because multiple copies might be deserialized); - * and allows {@link Launcher#kill} to work. + * Allows {@link Launcher#kill} to work. */ private String cookie; /** {@link Authentication#getName} of user of build, if known. */ private final @CheckForNull String auth; - /** Flag to remember that {@link #stop} is being called, so {@link CancelledItemListener} can be suppressed. */ - private transient boolean stopping; - PlaceholderTask(StepContext context, String label) throws IOException, InterruptedException { this.context = context; this.label = label; @@ -434,27 +433,6 @@ public static final class PlaceholderTask implements ContinuedTask, Serializable LOGGER.log(FINE, "scheduling {0}", this); } - private Object readResolve() { - if (cookie != null) { - RunningTasks.get().withRunningTasks(runningTasks -> { - // If Jenkins stops while this step is resuming, there may be a PlaceholderTask in the queue as - // well as in program.dat for the same step. We want to make sure not to create a second task to - // avoid race conditions, so we use putIfAbsent. - // TODO: This helps for runningTasks, but other fields like `stopping` may still be problematic. - // Should we refactor things to guarantee that the relevant state is a singleton? For example, - // introduce a PlaceholderTasksAction that holds a map of a new PlaceholderTaskState class, which - // would hold most of what is currently in PlaceholderTask, and then PlaceholderTask would only - // hold a `String cookie` field and would look up PlaceholderTaskState via the action so it wouldn't - // matter where the task was serialized. - runningTasks.putIfAbsent(cookie, new RunningTask()); - }); - } - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.log(FINE, null, new Exception("deserializing previously scheduled " + this)); - } - return this; - } - /** * We cannot keep {@link ExecutorStepExecution} as a serial field of {@link PlaceholderTask} * since it could not be serialized via XStream in {@link Queue}. @@ -492,9 +470,10 @@ private void withExecution(Consumer executionCallback) { return new PlaceholderExecutable(); } - @CheckForNull - public String getCookie() { - return cookie; + /** Body has begun execution: we started using a node. */ + @Restricted(NoExternalUse.class) + public boolean hasStarted() { + return cookie != null; } @Override public Label getAssignedLabel() { @@ -533,6 +512,7 @@ public String getCookie() { } @Override public CauseOfBlockage getCauseOfBlockage() { + boolean stopping = RunningTasks.get(context, t -> t.stopping); if (FlowExecutionList.get().isResumptionComplete()) { // We only do this if resumption is complete so that we do not load the run and resume its execution in this context in normal scenarios. Run run = runForDisplay(); @@ -540,7 +520,7 @@ public String getCookie() { if (stopping) { LOGGER.warning(() -> "Refusing to build " + PlaceholderTask.this + " and going to cancel it, even though it was supposedly stopped already, because associated build is complete"); } else { - stopping = true; + RunningTasks.run(context, t -> t.stopping = true); } Timer.get().execute(() -> { if (Queue.getInstance().cancel(this)) { @@ -640,8 +620,8 @@ public String getShortDescription() { } return context.get(Run.class); } catch (Exception x) { - LOGGER.log(FINE, "broken " + cookie + " in " + runId, x); - finish(cookie); // probably broken, so just shut it down + LOGGER.log(FINE, "broken " + context, x); + finish(context, cookie); // probably broken, so just shut it down return null; } } @@ -875,7 +855,7 @@ private String computeEnclosingLabel(FlowNode executorStepNode, List h } @Override public String toString() { - return "ExecutorStepExecution.PlaceholderTask{runId=" + runId + ",label=" + label + ",context=" + context + ",cookie=" + cookie + ",auth=" + auth + '}'; + return "ExecutorStepExecution.PlaceholderTask{label=" + label + ",context=" + context + '}'; } @Override @@ -896,34 +876,28 @@ public boolean equals(Object obj) { return this.context.equals(other.context); } - private static void finish(@CheckForNull final String cookie) { - if (cookie == null) { + private static void finish(StepContext context, @CheckForNull final String cookie) { + RunningTask runningTask = RunningTasks.remove(context); + final AsynchronousExecution execution = runningTask.execution; + if (execution == null) { + // JENKINS-30759: finished before asynch execution was even scheduled return; } - RunningTasks.get().withRunningTasks(runningTasks -> { - final RunningTask runningTask = runningTasks.remove(cookie); - if (runningTask == null) { - LOGGER.log(FINE, "no running task corresponds to {0}", cookie); + assert runningTask.launcher != null; + Timer.get().submit(() -> execution.completed(null)); // JENKINS-31614 + Computer.threadPoolForRemoting.submit(() -> { // JENKINS-34542, JENKINS-45553 + if (cookie == null) { return; } - final AsynchronousExecution execution = runningTask.execution; - if (execution == null) { - // JENKINS-30759: finished before asynch execution was even scheduled - return; + try { + runningTask.launcher.kill(Collections.singletonMap(COOKIE_VAR, cookie)); + } catch (ChannelClosedException x) { + // fine, Jenkins was shutting down + } catch (RequestAbortedException x) { + // agent was exiting; too late to kill subprocesses + } catch (Exception x) { + LOGGER.log(Level.WARNING, "failed to shut down " + cookie + " from " + context, x); } - assert runningTask.launcher != null; - Timer.get().submit(() -> execution.completed(null)); // JENKINS-31614 - Computer.threadPoolForRemoting.submit(() -> { // JENKINS-34542, JENKINS-45553 - try { - runningTask.launcher.kill(Collections.singletonMap(COOKIE_VAR, cookie)); - } catch (ChannelClosedException x) { - // fine, Jenkins was shutting down - } catch (RequestAbortedException x) { - // agent was exiting; too late to kill subprocesses - } catch (Exception x) { - LOGGER.log(Level.WARNING, "failed to shut down " + cookie, x); - } - }); }); } @@ -946,32 +920,32 @@ private static final class Callback extends BodyExecutionCallback.TailCall { } @Override protected void finished(StepContext context) throws Exception { - LOGGER.log(FINE, "finished {0}", cookie); + if (execution == null) { // compatibility with old serial forms + lease.release(); + lease = null; + return; + } + LOGGER.log(FINE, "finished {0}", execution.getContext()); try { - if (execution != null) { - WorkspaceList.Lease _lease = ExtensionList.lookupSingleton(ExecutorStepDynamicContext.WorkspaceListLeaseTranslator.class).get(execution.state); - if (_lease != null) { - _lease.release(); - } - } else { - lease.release(); - lease = null; + WorkspaceList.Lease _lease = ExtensionList.lookupSingleton(ExecutorStepDynamicContext.WorkspaceListLeaseTranslator.class).get(execution.state); + if (_lease != null) { + _lease.release(); } } finally { - finish(cookie); + finish(execution.getContext(), cookie); } - if (execution != null) { - execution.body = null; - boolean _stopping = execution.state.task.stopping; - execution.state.task.stopping = true; + execution.body = null; + RunningTasks.run(context, t -> { + boolean _stopping = t.stopping; + t.stopping = true; try { Queue.getInstance().cancel(execution.state.task); } finally { - execution.state.task.stopping = _stopping; + t.stopping = _stopping; } - execution.state = null; - context.saveState(); - } + }); + execution.state = null; + context.saveState(); } } @@ -1020,9 +994,6 @@ public final class PlaceholderExecutable implements ContinuableExecutable, Acces env.put("EXECUTOR_NUMBER", String.valueOf(exec.getNumber())); env.put("NODE_LABELS", node.getAssignedLabels().stream().map(Object::toString).collect(Collectors.joining(" "))); - RunningTasks.get().withRunningTasks(runningTasks -> { - runningTasks.put(cookie, new RunningTask()); - }); // For convenience, automatically allocate a workspace, like WorkspaceStep would: Job j = r.getParent(); if (!(j instanceof TopLevelItem)) { @@ -1050,12 +1021,12 @@ public final class PlaceholderExecutable implements ContinuableExecutable, Acces .withContexts(env, state) .withCallback(new Callback(cookie, execution)) .start(); - LOGGER.fine(() -> "started " + cookie + " in " + runId); + LOGGER.fine(() -> "started " + context); context.saveState(); }); } else { // just rescheduled after a restart; wait for task to complete - LOGGER.fine(() -> "resuming " + cookie + " in " + runId); + LOGGER.fine(() -> "resuming " + context); } } catch (Exception x) { if (computer != null) { @@ -1074,13 +1045,8 @@ public final class PlaceholderExecutable implements ContinuableExecutable, Acces } // wait until the invokeBodyLater call above completes and notifies our Callback object final TaskListener _listener = listener; - RunningTasks.get().withRunningTasks(runningTasks -> { - LOGGER.fine(() -> "waiting on " + cookie + " in " + runId); - RunningTask runningTask = runningTasks.get(cookie); - if (runningTask == null) { - LOGGER.fine(() -> "running task apparently finished quickly for " + cookie + " in " + runId); - return; - } + RunningTasks.run(context, runningTask -> { + LOGGER.fine(() -> "waiting on " + context); assert runningTask.execution == null; assert runningTask.launcher == null; runningTask.launcher = launcher; @@ -1089,7 +1055,7 @@ public final class PlaceholderExecutable implements ContinuableExecutable, Acces if (forShutdown) { return; } - LOGGER.fine(() -> "interrupted " + cookie + " in " + runId); + LOGGER.fine(() -> "interrupted " + context); Timer.get().submit(() -> { // JENKINS-46738 Executor thisExecutor = /* AsynchronousExecution. */ getExecutor(); AtomicReference cancelledBodyExecution = new AtomicReference(false); @@ -1156,9 +1122,7 @@ public Long getTimestamp() { } @Override public boolean willContinue() { - return RunningTasks.get().withRunningTasks(runningTasks -> { - return runningTasks.containsKey(cookie); - }); + return hasStarted(); } @Restricted(DoNotUse.class) // for Jelly @@ -1189,8 +1153,6 @@ public String getAbsoluteUrl() { return "PlaceholderExecutable:" + PlaceholderTask.this; } - private static final long serialVersionUID = 1L; - @NonNull @Override public ACL getACL() { @@ -1230,20 +1192,31 @@ public Queue.Item itemInQueue() { @Extension public static class RunningTasks { - /** keys are {@link PlaceholderTask#cookie}s */ - private final Map runningTasks = new HashMap<>(); + private final Map runningTasks = new HashMap<>(); - synchronized T withRunningTasks(Function, T> fn) { - return fn.apply(runningTasks); + private static RunningTask find(StepContext context) { + RunningTasks holder = ExtensionList.lookupSingleton(RunningTasks.class); + synchronized (holder) { + return holder.runningTasks.computeIfAbsent(context, k -> new RunningTask()); + } } - synchronized void withRunningTasks(Consumer> fn) { - fn.accept(runningTasks); + static T get(StepContext context, Function fn) { + return fn.apply(find(context)); } - static RunningTasks get() { - return ExtensionList.lookupSingleton(RunningTasks.class); + static void run(StepContext context, Consumer fn) { + fn.accept(find(context)); } + + static RunningTask remove(StepContext context) { + RunningTasks holder = ExtensionList.lookupSingleton(RunningTasks.class); + synchronized (holder) { + RunningTask t = holder.runningTasks.remove(context); + return t != null ? t : new RunningTask(); + } + } + } private static final long serialVersionUID = 1L; diff --git a/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java b/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java index 811372df..121dd5f4 100644 --- a/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java +++ b/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java @@ -1215,7 +1215,7 @@ public void getOwnerTaskPermissions() throws Throwable { while (Queue.getInstance().getItems().length > 0) { Thread.sleep(100L); } - assertThat(logging.getMessages(), hasItem(startsWith("Refusing to build ExecutorStepExecution.PlaceholderTask{runId=p#"))); + assertThat(logging.getMessages(), hasItem(startsWith("Refusing to build ExecutorStepExecution.PlaceholderTask"))); }); } From 2017151ec8e3d258e0e65fccca9b560a89b0841c Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Fri, 26 Jan 2024 17:51:01 -0500 Subject: [PATCH 2/4] Logging from `RunningTasks.remove` https://github.com/jenkinsci/workflow-durable-task-step-plugin/pull/354#discussion_r1468187962 --- .../workflow/support/steps/ExecutorStepExecution.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java b/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java index ecbd1810..7dc2ff9e 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java @@ -1213,7 +1213,12 @@ static RunningTask remove(StepContext context) { RunningTasks holder = ExtensionList.lookupSingleton(RunningTasks.class); synchronized (holder) { RunningTask t = holder.runningTasks.remove(context); - return t != null ? t : new RunningTask(); + if (t != null) { + return t; + } else { + LOGGER.fine(() -> "was no running task information to remove from " + context); + return new RunningTask(); + } } } From d1a2aa26756457172050bd7707d66fc4730b8a09 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Mon, 29 Jan 2024 11:56:24 -0500 Subject: [PATCH 3/4] `willContinue` implementation was wrong (failed to return false after task completed), and `RunningTasks` add/remove timing was off --- .../support/steps/ExecutorStepExecution.java | 83 +++++++++++++------ 1 file changed, 58 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java b/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java index 7dc2ff9e..252de6d7 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java @@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.logging.Level; import static java.util.logging.Level.*; import java.util.logging.Logger; @@ -189,6 +190,7 @@ public void stop(@NonNull Throwable cause) throws Exception { StepContext actualContext = ((PlaceholderTask.PlaceholderExecutable) exec).getParent().context; if (actualContext.equals(context)) { PlaceholderTask.finish(context, ((PlaceholderTask.PlaceholderExecutable) exec).getParent().cookie); + RunningTasks.remove(context); LOGGER.log(FINE, "canceling {0}", exec); break COMPUTERS; } else { @@ -251,7 +253,7 @@ public void stop(@NonNull Throwable cause) throws Exception { if (li.isCancelled()) { if (li.task instanceof PlaceholderTask) { PlaceholderTask task = (PlaceholderTask) li.task; - if (!RunningTasks.get(task.context, t -> t.stopping)) { + if (!RunningTasks.get(task.context, t -> t.stopping, () -> false)) { if (LOGGER.isLoggable(Level.FINE)) { LOGGER.log(Level.FINE, null, new Throwable(li.task + " was cancelled")); } @@ -430,9 +432,18 @@ public static final class PlaceholderTask implements ContinuedTask, Serializable } else { auth = runningAuth.getName(); } + RunningTasks.add(context); LOGGER.log(FINE, "scheduling {0}", this); } + private Object readResolve() { + RunningTasks.add(context); + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.log(FINE, null, new Exception("deserializing previously scheduled " + this)); + } + return this; + } + /** * We cannot keep {@link ExecutorStepExecution} as a serial field of {@link PlaceholderTask} * since it could not be serialized via XStream in {@link Queue}. @@ -512,7 +523,7 @@ public boolean hasStarted() { } @Override public CauseOfBlockage getCauseOfBlockage() { - boolean stopping = RunningTasks.get(context, t -> t.stopping); + boolean stopping = RunningTasks.get(context, t -> t.stopping, () -> false); if (FlowExecutionList.get().isResumptionComplete()) { // We only do this if resumption is complete so that we do not load the run and resume its execution in this context in normal scenarios. Run run = runForDisplay(); @@ -622,6 +633,7 @@ public String getShortDescription() { } catch (Exception x) { LOGGER.log(FINE, "broken " + context, x); finish(context, cookie); // probably broken, so just shut it down + RunningTasks.remove(context); return null; } } @@ -877,18 +889,23 @@ public boolean equals(Object obj) { } private static void finish(StepContext context, @CheckForNull final String cookie) { - RunningTask runningTask = RunningTasks.remove(context); + RunningTask runningTask = RunningTasks.get(context, t -> t, () -> null); + if (runningTask == null) { + LOGGER.fine(() -> "no known running task for " + context); + return; + } final AsynchronousExecution execution = runningTask.execution; if (execution == null) { - // JENKINS-30759: finished before asynch execution was even scheduled + LOGGER.fine(() -> "no AsynchronousExecution associated with " + context + " (JENKINS-30759 maybe finished before asynch execution was even scheduled?)"); return; } assert runningTask.launcher != null; Timer.get().submit(() -> execution.completed(null)); // JENKINS-31614 + if (cookie == null) { + LOGGER.fine(() -> "no cookie to kill from " + context); + return; + } Computer.threadPoolForRemoting.submit(() -> { // JENKINS-34542, JENKINS-45553 - if (cookie == null) { - return; - } try { runningTask.launcher.kill(Collections.singletonMap(COOKIE_VAR, cookie)); } catch (ChannelClosedException x) { @@ -919,7 +936,7 @@ private static final class Callback extends BodyExecutionCallback.TailCall { this.execution = execution; } - @Override protected void finished(StepContext context) throws Exception { + @Override protected void finished(StepContext bodyContext) throws Exception { if (execution == null) { // compatibility with old serial forms lease.release(); lease = null; @@ -935,7 +952,9 @@ private static final class Callback extends BodyExecutionCallback.TailCall { finish(execution.getContext(), cookie); } execution.body = null; - RunningTasks.run(context, t -> { + RunningTask t = RunningTasks.remove(execution.getContext()); + if (t != null) { + LOGGER.fine(() -> "cancelling any leftover task from " + execution.getContext()); boolean _stopping = t.stopping; t.stopping = true; try { @@ -943,9 +962,11 @@ private static final class Callback extends BodyExecutionCallback.TailCall { } finally { t.stopping = _stopping; } - }); + } else { + LOGGER.fine(() -> "no entry for " + execution.getContext()); + } execution.state = null; - context.saveState(); + bodyContext.saveState(); } } @@ -1122,7 +1143,7 @@ public Long getTimestamp() { } @Override public boolean willContinue() { - return hasStarted(); + return RunningTasks.get(context, t -> t.execution != null, () -> false); } @Restricted(DoNotUse.class) // for Jelly @@ -1194,31 +1215,43 @@ public Queue.Item itemInQueue() { public static class RunningTasks { private final Map runningTasks = new HashMap<>(); - private static RunningTask find(StepContext context) { + static void add(StepContext context) { RunningTasks holder = ExtensionList.lookupSingleton(RunningTasks.class); synchronized (holder) { - return holder.runningTasks.computeIfAbsent(context, k -> new RunningTask()); + holder.runningTasks.putIfAbsent(context, new RunningTask()); } } - static T get(StepContext context, Function fn) { - return fn.apply(find(context)); + private static @CheckForNull RunningTask find(StepContext context) { + RunningTasks holder = ExtensionList.lookupSingleton(RunningTasks.class); + synchronized (holder) { + return holder.runningTasks.get(context); + } + } + + static T get(StepContext context, Function fn, Supplier fallback) { + RunningTask t = find(context); + if (t != null) { + return fn.apply(t); + } else { + LOGGER.fine(() -> "no RunningTask associated with " + context); + return fallback.get(); + } } static void run(StepContext context, Consumer fn) { - fn.accept(find(context)); + RunningTask t = find(context); + if (t != null) { + fn.accept(t); + } else { + LOGGER.fine(() -> "no RunningTask associated with " + context); + } } - static RunningTask remove(StepContext context) { + static @CheckForNull RunningTask remove(StepContext context) { RunningTasks holder = ExtensionList.lookupSingleton(RunningTasks.class); synchronized (holder) { - RunningTask t = holder.runningTasks.remove(context); - if (t != null) { - return t; - } else { - LOGGER.fine(() -> "was no running task information to remove from " + context); - return new RunningTask(); - } + return holder.runningTasks.remove(context); } } From c52e84d102b295dc88f992a6ce0baf161d2b62c5 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Tue, 30 Jan 2024 11:22:18 -0500 Subject: [PATCH 4/4] Test coverage for `willContinue` --- .../support/steps/ExecutorStep2Test.java | 180 ++++++++++++++++++ .../support/steps/ExecutorStepTest.java | 53 ------ 2 files changed, 180 insertions(+), 53 deletions(-) create mode 100644 src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStep2Test.java diff --git a/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStep2Test.java b/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStep2Test.java new file mode 100644 index 00000000..308767e8 --- /dev/null +++ b/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStep2Test.java @@ -0,0 +1,180 @@ +/* + * The MIT License + * + * Copyright 2024 CloudBees, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package org.jenkinsci.plugins.workflow.support.steps; + +import hudson.ExtensionList; +import hudson.model.Computer; +import hudson.model.Descriptor; +import hudson.model.Item; +import hudson.model.Label; +import hudson.model.Node; +import hudson.model.Queue; +import hudson.model.Slave; +import hudson.model.TaskListener; +import hudson.model.queue.CauseOfBlockage; +import hudson.model.queue.QueueTaskDispatcher; +import hudson.slaves.AbstractCloudComputer; +import hudson.slaves.AbstractCloudSlave; +import hudson.slaves.Cloud; +import hudson.slaves.NodeProvisioner; +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import jenkins.model.Jenkins; +import org.jenkinsci.plugins.durabletask.executors.OnceRetentionStrategy; +import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition; +import org.jenkinsci.plugins.workflow.job.WorkflowJob; +import org.jenkinsci.plugins.workflow.job.WorkflowRun; +import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runners.Parameterized; +import org.jvnet.hudson.test.BuildWatcher; +import org.jvnet.hudson.test.Issue; +import org.jvnet.hudson.test.JenkinsSessionRule; +import org.jvnet.hudson.test.SimpleCommandLauncher; +import org.jvnet.hudson.test.TestExtension; + +/** + * Like {@link ExecutorStepTest} but not using {@link Parameterized} which appears incompatible with {@link TestExtension}. + */ +public final class ExecutorStep2Test { + + @ClassRule public static BuildWatcher buildWatcher = new BuildWatcher(); + @Rule public JenkinsSessionRule rr = new JenkinsSessionRule(); + + @Issue("JENKINS-53837") + @Test public void queueTaskOwnerCorrectWhenRestarting() throws Throwable { + rr.then(r -> { + ExtensionList.lookupSingleton(PipelineOnlyTaskDispatcher.class); + WorkflowJob p = r.createProject(WorkflowJob.class, "p1"); + p.setDefinition(new CpsFlowDefinition("node {\n" + + " semaphore('wait')\n" + + "}", true)); + WorkflowRun b = p.scheduleBuild2(0).waitForStart(); + SemaphoreStep.waitForStart("wait/1", b); + }); + rr.then(r -> { + WorkflowJob p = r.jenkins.getItemByFullName("p1", WorkflowJob.class); + WorkflowRun b = p.getBuildByNumber(1); + SemaphoreStep.success("wait/1", null); + r.waitForCompletion(b); + r.assertBuildStatusSuccess(b); + r.assertLogNotContains("Non-Pipeline tasks are forbidden!", b); + }); + } + @TestExtension("queueTaskOwnerCorrectWhenRestarting") + public static class PipelineOnlyTaskDispatcher extends QueueTaskDispatcher { + @Override + public CauseOfBlockage canTake(Node node, Queue.BuildableItem item) { + Queue.Task t = item.task; + while (!(t instanceof Item) && (t != null)) { + final Queue.Task ownerTask = t.getOwnerTask(); + if (t == ownerTask) { + break; + } + t = ownerTask; + } + if (t instanceof WorkflowJob) { + return null; + } + final Queue.Task finalT = t; + return new CauseOfBlockage() { + @Override + public String getShortDescription() { + return "Non-Pipeline tasks are forbidden! Not building: " + finalT; + } + }; + } + } + + @Test public void cloud() throws Throwable { + rr.then(r -> { + ExtensionList.lookupSingleton(TestCloud.DescriptorImpl.class); + r.jenkins.clouds.add(new TestCloud()); + var p = r.createProject(WorkflowJob.class, "p"); + p.setDefinition(new CpsFlowDefinition("node('test') {}", true)); + r.assertLogContains("Running on test-1", r.buildAndAssertSuccess(p)); + r.assertLogContains("Running on test-2", r.buildAndAssertSuccess(p)); + }); + } + // adapted from org.jenkinci.plugins.mock_slave.MockCloud + public static final class TestCloud extends Cloud { + TestCloud() { + super("test"); + } + @Override public boolean canProvision(Cloud.CloudState state) { + var label = state.getLabel(); + return label != null && label.matches(Label.parse("test")); + } + @Override public Collection provision(Cloud.CloudState state, int excessWorkload) { + var r = new ArrayList(); + while (excessWorkload > 0) { + r.add(new NodeProvisioner.PlannedNode("test", Computer.threadPoolForRemoting.submit(() -> new TestCloudSlave()), 1)); + excessWorkload -= 1; + } + return r; + } + @TestExtension("cloud") public static final class DescriptorImpl extends Descriptor { + private long counter; + public DescriptorImpl() { + load(); + NodeProvisioner.NodeProvisionerInvoker.INITIALDELAY = 1000; + NodeProvisioner.NodeProvisionerInvoker.RECURRENCEPERIOD = 1000; + } + synchronized long newNodeNumber() { + counter++; + save(); + return counter; + } + } + private static final class TestCloudSlave extends AbstractCloudSlave { + TestCloudSlave() throws Exception { + this("test-" + ExtensionList.lookupSingleton(TestCloud.DescriptorImpl.class).newNodeNumber()); + } + private TestCloudSlave(String name) throws Exception { + super(name, new File(new File(Jenkins.get().getRootDir(), "agents"), name).getAbsolutePath(), + new SimpleCommandLauncher(String.format("\"%s/bin/java\" -jar \"%s\"", + System.getProperty("java.home"), + new File(Jenkins.get().getJnlpJars("agent.jar").getURL().toURI())))); + setMode(Node.Mode.EXCLUSIVE); + setNumExecutors(1); + setLabelString("test"); + setRetentionStrategy(new OnceRetentionStrategy(1)); + } + @Override public AbstractCloudComputer createComputer() { + return new AbstractCloudComputer<>(this); + } + @Override protected void _terminate(TaskListener listener) {} + @TestExtension("cloud") public static final class DescriptorImpl extends Slave.SlaveDescriptor { + @Override public boolean isInstantiable() { + return false; + } + } + } + } + +} diff --git a/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java b/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java index 121dd5f4..56b210ad 100644 --- a/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java +++ b/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java @@ -40,18 +40,14 @@ import hudson.Functions; import hudson.model.Computer; import hudson.model.Executor; -import hudson.model.Item; import hudson.model.Job; import hudson.model.Label; -import hudson.model.Node; import hudson.model.Queue; import hudson.model.Result; import hudson.model.Run; import hudson.model.Slave; import hudson.model.User; import hudson.model.labels.LabelAtom; -import hudson.model.queue.CauseOfBlockage; -import hudson.model.queue.QueueTaskDispatcher; import hudson.security.ACL; import hudson.security.ACLContext; import hudson.slaves.DumbSlave; @@ -123,7 +119,6 @@ import org.jvnet.hudson.test.LoggerRule; import org.jvnet.hudson.test.MockAuthorizationStrategy; import org.jvnet.hudson.test.MockFolder; -import org.jvnet.hudson.test.TestExtension; /** Tests pertaining to {@code node} and {@code sh} steps. */ @RunWith(Parameterized.class) @@ -1077,29 +1072,6 @@ private static List currentLabels(JenkinsRule r) { }); } - /** - * @see PipelineOnlyTaskDispatcher - */ - @Issue("JENKINS-53837") - @Test public void queueTaskOwnerCorrectWhenRestarting() throws Throwable { - sessions.then(r -> { - WorkflowJob p = r.createProject(WorkflowJob.class, "p1"); - p.setDefinition(new CpsFlowDefinition("node {\n" + - " semaphore('wait')\n" + - "}", true)); - WorkflowRun b = p.scheduleBuild2(0).waitForStart(); - SemaphoreStep.waitForStart("wait/1", b); - }); - sessions.then(r -> { - WorkflowJob p = r.jenkins.getItemByFullName("p1", WorkflowJob.class); - WorkflowRun b = p.getBuildByNumber(1); - SemaphoreStep.success("wait/1", null); - r.waitForCompletion(b); - r.assertBuildStatusSuccess(b); - r.assertLogNotContains("Non-Pipeline tasks are forbidden!", b); - }); - } - @Issue("JENKINS-58900") @Test public void nodeDisconnectMissingContextVariableException() throws Throwable { sessions.then(r -> { @@ -1267,31 +1239,6 @@ private static class FallbackAuthenticator extends QueueItemAuthenticator { } } - @TestExtension("queueTaskOwnerCorrectWhenRestarting") - public static class PipelineOnlyTaskDispatcher extends QueueTaskDispatcher { - @Override - public CauseOfBlockage canTake(Node node, Queue.BuildableItem item) { - Queue.Task t = item.task; - while (!(t instanceof Item) && (t != null)) { - final Queue.Task ownerTask = t.getOwnerTask(); - if (t == ownerTask) { - break; - } - t = ownerTask; - } - if (t instanceof WorkflowJob) { - return null; - } - final Queue.Task finalT = t; - return new CauseOfBlockage() { - @Override - public String getShortDescription() { - return "Non-Pipeline tasks are forbidden! Not building: " + finalT; - } - }; - } - } - private void createNOnlineAgentWithLabels(JenkinsRule r, int number, String label) throws Exception { // create all the slaves then wait for them to connect as it will be quicker as agents connect in parallel ArrayList agents = new ArrayList<>();