Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent StepExecutionIterator from leaking memory in cases where a single processed execution has a stuck CPS VM thread #347

Merged
merged 6 commits into from
Aug 9, 2024
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@
<groupId>org.jenkins-ci.plugins</groupId>
<artifactId>scm-api</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-job</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,24 +253,25 @@

for (FlowExecution e : FlowExecutionList.get()) {
ListenableFuture<List<StepExecution>> execs = e.getCurrentExecutions(false);
all.add(execs);
Futures.addCallback(execs,new FutureCallback<List<StepExecution>>() {
@Override
public void onSuccess(@NonNull List<StepExecution> result) {
for (StepExecution e : result) {
try {
f.apply(e);
} catch (RuntimeException x) {
LOGGER.log(Level.WARNING, null, x);
}
// It is important that the combined future's return values do not reference the individual step
// executions, so we use transform instead of addCallback. Otherwise, it is possible to leak references
// to the WorkflowRun for each processed StepExecution in the case where a single live FlowExecution
// has a stuck CpsVmExecutorService that prevents the getCurrentExecutions future from completing.
ListenableFuture<Void> results = Futures.transform(execs, (List<StepExecution> result) -> {
for (StepExecution se : result) {
try {
f.apply(se);
} catch (RuntimeException x) {
LOGGER.log(Level.WARNING, null, x);
}
}

@Override
public void onFailure(@NonNull Throwable t) {
LOGGER.log(Level.WARNING, null, t);
}
return null;
}, MoreExecutors.directExecutor());
ListenableFuture<Void> resultsWithWarningsLogged = Futures.catching(results, Throwable.class, t -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yikes. Project Loom would be very welcome in code like this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking more carefully through Guava's Javadoc I think I can use FluentFuture to simplify this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually FluentFuture doesn't really make things any clearer, so I will leave it.

LOGGER.log(Level.WARNING, null, t);
return null;

Check warning on line 272 in src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java

View check run for this annotation

ci.jenkins.io / Code Coverage

Not covered lines

Lines 271-272 are not covered by tests
}, MoreExecutors.directExecutor());
all.add(resultsWithWarningsLogged);
}

return Futures.allAsList(all);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package org.jenkinsci.plugins.workflow.flow;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
Expand All @@ -38,20 +39,22 @@
import hudson.model.TaskListener;
import hudson.model.queue.QueueTaskFuture;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.hamcrest.Matcher;
import jenkins.model.Jenkins;
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.jenkinsci.plugins.workflow.steps.Step;
import org.jenkinsci.plugins.workflow.steps.StepContext;
import org.jenkinsci.plugins.workflow.steps.StepDescriptor;
import org.jenkinsci.plugins.workflow.steps.StepExecution;
import org.jenkinsci.plugins.workflow.steps.StepExecutions;
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
import org.junit.ClassRule;
import org.junit.Test;
Expand All @@ -60,6 +63,7 @@
import org.jvnet.hudson.test.Issue;
import org.jvnet.hudson.test.LoggerRule;
import org.jvnet.hudson.test.JenkinsSessionRule;
import org.jvnet.hudson.test.MemoryAssert;
import org.jvnet.hudson.test.TestExtension;
import org.kohsuke.stapler.DataBoundConstructor;

Expand Down Expand Up @@ -132,7 +136,7 @@ public class FlowExecutionListTest {
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ItemListenerImpl.onLoaded(FlowExecutionList.java:175)
at jenkins.model.Jenkins.<init>(Jenkins.java:1019)
*/
waitFor(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep")));
await().atMost(5, TimeUnit.SECONDS).until(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep")));
WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class);
SemaphoreStep.success("wait/1", null);
WorkflowRun b = p.getBuildByNumber(1);
Expand Down Expand Up @@ -160,6 +164,34 @@ public class FlowExecutionListTest {
});
}

@Test public void stepExecutionIteratorDoesNotLeakBuildsWhenOneIsStuck() throws Throwable {
sessions.then(r -> {
var notStuck = r.createProject(WorkflowJob.class, "not-stuck");
notStuck.setDefinition(new CpsFlowDefinition("semaphore 'wait'", true));
var notStuckBuild = notStuck.scheduleBuild2(0).waitForStart();
SemaphoreStep.waitForStart("wait/1", notStuckBuild);
WeakReference<Object> notStuckBuildRef = new WeakReference<>(notStuckBuild);
// Create a Pipeline that runs a long-lived task on its CpsVmExecutorService, causing it to get stuck.
var stuck = r.createProject(WorkflowJob.class, "stuck");
stuck.setDefinition(new CpsFlowDefinition("blockSynchronously 'stuck'", false));
var stuckBuild = stuck.scheduleBuild2(0).waitForStart();
await().atMost(5, TimeUnit.SECONDS).until(() -> SynchronousBlockingStep.isStarted("stuck"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or waitForMessage would probably suffice.

// Make FlowExecutionList$StepExecutionIteratorImpl.applyAll submit a task to the CpsVmExecutorService
// for stuck #1 that will never complete, so the resulting future will never complete.
StepExecution.applyAll(e -> null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this return value be kept in a local variable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not necessary for the leak, if that's what you mean. Even if the StepExecution.applyAll caller completely ignores the return value, the stuck CPS VM thread holds a strong reference to the aggregate future because the SingleLaneExecutorService's task queue has a reference to the SettableFuture for the getCurrentExecutions call, and then that future references the aggregate future via a listener just due to the implementation of Futures.allAsList.

// Let notStuckBuild complete and clean up all references.
SemaphoreStep.success("wait/1", null);
r.waitForCompletion(notStuckBuild);
notStuckBuild = null; // Clear out the local variable in this thread.
Jenkins.get().getQueue().clearLeftItems(); // Otherwise we'd have to wait 5 minutes for the cache to be cleared.
// Make sure that the reference can be GC'd.
MemoryAssert.assertGC(notStuckBuildRef, true);
Copy link
Member Author

@dwnusbaum dwnusbaum Aug 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert the fix and this will fail. The PR description shows the reference path preventing the build from being cleaned up.

// Allow stuck #1 to complete so the test can be cleaned up promptly.
SynchronousBlockingStep.unblock("stuck");
r.waitForCompletion(stuckBuild);
});
}

public static class NonResumableStep extends Step implements Serializable {
public static final long serialVersionUID = 1L;
@DataBoundConstructor
Expand Down Expand Up @@ -198,14 +230,59 @@ public String getFunctionName() {
}

/**
* Wait up to 5 seconds for the given supplier to return a matching value.
* Blocks the CPS VM thread synchronously (bad!) to test related problems.
*/
private static <T> void waitFor(Supplier<T> valueSupplier, Matcher<T> matcher) throws InterruptedException {
Instant end = Instant.now().plus(Duration.ofSeconds(5));
while (!matcher.matches(valueSupplier.get()) && Instant.now().isBefore(end)) {
Thread.sleep(100L);
public static class SynchronousBlockingStep extends Step implements Serializable {
private static final long serialVersionUID = 1L;
private static final Map<String, State> blocked = new HashMap<>();
private final String id;

@DataBoundConstructor
public SynchronousBlockingStep(String id) {
this.id = id;
if (blocked.put(id, State.NOT_STARTED) != null) {
throw new IllegalArgumentException("Attempting to reuse ID: " + id);
}
}

@Override
public StepExecution start(StepContext context) throws Exception {
return StepExecutions.synchronous(context, c -> {
blocked.put(id, State.BLOCKED);
c.get(TaskListener.class).getLogger().println(id + " blocked");
while (blocked.get(id) == State.BLOCKED) {
Thread.sleep(100L);
}
c.get(TaskListener.class).getLogger().println(id + " unblocked ");
return null;
});
}

public static boolean isStarted(String id) {
var state = blocked.get(id);
return state != null && state != State.NOT_STARTED;
}

public static void unblock(String id) {
blocked.put(id, State.UNBLOCKED);
}

private enum State {
NOT_STARTED,
BLOCKED,
UNBLOCKED,
}

@TestExtension("stepExecutionIteratorDoesNotLeakBuildsWhenOneIsStuck") public static class DescriptorImpl extends StepDescriptor {
@Override
public Set<? extends Class<?>> getRequiredContext() {
return Collections.singleton(TaskListener.class);
}
@Override
public String getFunctionName() {
return "blockSynchronously";
}
}
assertThat("Matcher should have matched after 5s", valueSupplier.get(), matcher);
}

}
Loading