-
Notifications
You must be signed in to change notification settings - Fork 74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prevent StepExecutionIterator
from leaking memory in cases where a single processed execution has a stuck CPS VM thread
#347
Changes from 5 commits
ce222c4
9b6c0ae
b2c93a4
7100611
2de670d
b15fde4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ | |
|
||
package org.jenkinsci.plugins.workflow.flow; | ||
|
||
import static org.awaitility.Awaitility.await; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.containsString; | ||
import static org.hamcrest.Matchers.hasItem; | ||
|
@@ -38,20 +39,22 @@ | |
import hudson.model.TaskListener; | ||
import hudson.model.queue.QueueTaskFuture; | ||
import java.io.Serializable; | ||
import java.time.Duration; | ||
import java.time.Instant; | ||
import java.lang.ref.WeakReference; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.function.Supplier; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.logging.Level; | ||
import org.hamcrest.Matcher; | ||
import jenkins.model.Jenkins; | ||
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition; | ||
import org.jenkinsci.plugins.workflow.job.WorkflowJob; | ||
import org.jenkinsci.plugins.workflow.job.WorkflowRun; | ||
import org.jenkinsci.plugins.workflow.steps.Step; | ||
import org.jenkinsci.plugins.workflow.steps.StepContext; | ||
import org.jenkinsci.plugins.workflow.steps.StepDescriptor; | ||
import org.jenkinsci.plugins.workflow.steps.StepExecution; | ||
import org.jenkinsci.plugins.workflow.steps.StepExecutions; | ||
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep; | ||
import org.junit.ClassRule; | ||
import org.junit.Test; | ||
|
@@ -60,6 +63,7 @@ | |
import org.jvnet.hudson.test.Issue; | ||
import org.jvnet.hudson.test.LoggerRule; | ||
import org.jvnet.hudson.test.JenkinsSessionRule; | ||
import org.jvnet.hudson.test.MemoryAssert; | ||
import org.jvnet.hudson.test.TestExtension; | ||
import org.kohsuke.stapler.DataBoundConstructor; | ||
|
||
|
@@ -132,7 +136,7 @@ public class FlowExecutionListTest { | |
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ItemListenerImpl.onLoaded(FlowExecutionList.java:175) | ||
at jenkins.model.Jenkins.<init>(Jenkins.java:1019) | ||
*/ | ||
waitFor(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep"))); | ||
await().atMost(5, TimeUnit.SECONDS).until(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep"))); | ||
WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class); | ||
SemaphoreStep.success("wait/1", null); | ||
WorkflowRun b = p.getBuildByNumber(1); | ||
|
@@ -160,6 +164,34 @@ public class FlowExecutionListTest { | |
}); | ||
} | ||
|
||
@Test public void stepExecutionIteratorDoesNotLeakBuildsWhenOneIsStuck() throws Throwable { | ||
sessions.then(r -> { | ||
var notStuck = r.createProject(WorkflowJob.class, "not-stuck"); | ||
notStuck.setDefinition(new CpsFlowDefinition("semaphore 'wait'", true)); | ||
var notStuckBuild = notStuck.scheduleBuild2(0).waitForStart(); | ||
SemaphoreStep.waitForStart("wait/1", notStuckBuild); | ||
WeakReference<Object> notStuckBuildRef = new WeakReference<>(notStuckBuild); | ||
// Create a Pipeline that runs a long-lived task on its CpsVmExecutorService, causing it to get stuck. | ||
var stuck = r.createProject(WorkflowJob.class, "stuck"); | ||
stuck.setDefinition(new CpsFlowDefinition("blockSynchronously 'stuck'", false)); | ||
var stuckBuild = stuck.scheduleBuild2(0).waitForStart(); | ||
await().atMost(5, TimeUnit.SECONDS).until(() -> SynchronousBlockingStep.isStarted("stuck")); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or |
||
// Make FlowExecutionList$StepExecutionIteratorImpl.applyAll submit a task to the CpsVmExecutorService | ||
// for stuck #1 that will never complete, so the resulting future will never complete. | ||
StepExecution.applyAll(e -> null); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this return value be kept in a local variable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not necessary for the leak, if that's what you mean. Even if the |
||
// Let notStuckBuild complete and clean up all references. | ||
SemaphoreStep.success("wait/1", null); | ||
r.waitForCompletion(notStuckBuild); | ||
notStuckBuild = null; // Clear out the local variable in this thread. | ||
Jenkins.get().getQueue().clearLeftItems(); // Otherwise we'd have to wait 5 minutes for the cache to be cleared. | ||
// Make sure that the reference can be GC'd. | ||
MemoryAssert.assertGC(notStuckBuildRef, true); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Revert the fix and this will fail. The PR description shows the reference path preventing the build from being cleaned up. |
||
// Allow stuck #1 to complete so the test can be cleaned up promptly. | ||
SynchronousBlockingStep.unblock("stuck"); | ||
r.waitForCompletion(stuckBuild); | ||
}); | ||
} | ||
|
||
public static class NonResumableStep extends Step implements Serializable { | ||
public static final long serialVersionUID = 1L; | ||
@DataBoundConstructor | ||
|
@@ -198,14 +230,59 @@ public String getFunctionName() { | |
} | ||
|
||
/** | ||
* Wait up to 5 seconds for the given supplier to return a matching value. | ||
* Blocks the CPS VM thread synchronously (bad!) to test related problems. | ||
*/ | ||
private static <T> void waitFor(Supplier<T> valueSupplier, Matcher<T> matcher) throws InterruptedException { | ||
Instant end = Instant.now().plus(Duration.ofSeconds(5)); | ||
while (!matcher.matches(valueSupplier.get()) && Instant.now().isBefore(end)) { | ||
Thread.sleep(100L); | ||
public static class SynchronousBlockingStep extends Step implements Serializable { | ||
private static final long serialVersionUID = 1L; | ||
private static final Map<String, State> blocked = new HashMap<>(); | ||
private final String id; | ||
|
||
@DataBoundConstructor | ||
public SynchronousBlockingStep(String id) { | ||
this.id = id; | ||
if (blocked.put(id, State.NOT_STARTED) != null) { | ||
throw new IllegalArgumentException("Attempting to reuse ID: " + id); | ||
} | ||
} | ||
|
||
@Override | ||
public StepExecution start(StepContext context) throws Exception { | ||
return StepExecutions.synchronous(context, c -> { | ||
blocked.put(id, State.BLOCKED); | ||
c.get(TaskListener.class).getLogger().println(id + " blocked"); | ||
while (blocked.get(id) == State.BLOCKED) { | ||
Thread.sleep(100L); | ||
} | ||
c.get(TaskListener.class).getLogger().println(id + " unblocked "); | ||
return null; | ||
}); | ||
} | ||
|
||
public static boolean isStarted(String id) { | ||
var state = blocked.get(id); | ||
return state != null && state != State.NOT_STARTED; | ||
} | ||
|
||
public static void unblock(String id) { | ||
blocked.put(id, State.UNBLOCKED); | ||
} | ||
|
||
private enum State { | ||
NOT_STARTED, | ||
BLOCKED, | ||
UNBLOCKED, | ||
} | ||
|
||
@TestExtension public static class DescriptorImpl extends StepDescriptor { | ||
dwnusbaum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
@Override | ||
public Set<? extends Class<?>> getRequiredContext() { | ||
return Collections.singleton(TaskListener.class); | ||
} | ||
@Override | ||
public String getFunctionName() { | ||
return "blockSynchronously"; | ||
} | ||
} | ||
assertThat("Matcher should have matched after 5s", valueSupplier.get(), matcher); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yikes. Project Loom would be very welcome in code like this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking more carefully through Guava's Javadoc I think I can use
FluentFuture
to simplify this.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually
FluentFuture
doesn't really make things any clearer, so I will leave it.