Skip to content

Commit

Permalink
Merge pull request #323 from jglick/USE_WATCHING-JENKINS-52165
Browse files Browse the repository at this point in the history
[JENKINS-52165] Stabilize and better test `USE_WATCHING`
  • Loading branch information
jglick authored Jul 28, 2023
2 parents 9b84e23 + efdb3c0 commit b30aaca
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 118 deletions.
2 changes: 1 addition & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
buildPlugin(useContainerAgent: true, configurations: [
buildPlugin(useContainerAgent: true, forkCount: '1C', configurations: [
[platform: 'linux', jdk: 17],
[platform: 'windows', jdk: 11],
])
20 changes: 19 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
<properties>
<changelist>999999-SNAPSHOT</changelist>
<jenkins.version>2.361.4</jenkins.version>
<!-- TODO until in plugin-pom -->
<jenkins-test-harness.version>2042.v787a_641a_9b_26</jenkins-test-harness.version>
<useBeta>true</useBeta>
<gitHubRepo>jenkinsci/${project.artifactId}-plugin</gitHubRepo>
<hpi.compatibleSinceVersion>2.40</hpi.compatibleSinceVersion>
Expand All @@ -76,10 +78,26 @@
<dependency>
<groupId>io.jenkins.tools.bom</groupId>
<artifactId>bom-2.361.x</artifactId>
<version>1654.vcb_69d035fa_20</version>
<version>2081.v85885a_d2e5c5</version>
<scope>import</scope>
<type>pom</type>
</dependency>
<!-- TODO until in BOM -->
<dependency>
<groupId>org.jenkins-ci.plugins</groupId>
<artifactId>durable-task</artifactId>
<version>513.vc48a_a_075a_d93</version>
</dependency>
<dependency>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-api</artifactId>
<version>1248.v4b_91043341d2</version>
</dependency>
<dependency>
<groupId>org.jenkins-ci.plugins</groupId>
<artifactId>credentials-binding</artifactId>
<version>621.v58c0fb_d285a_c</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import hudson.AbortException;
import hudson.EnvVars;
import hudson.Extension;
import hudson.FilePath;
import hudson.Functions;
import hudson.Launcher;
import hudson.Util;
import hudson.init.Terminator;
import hudson.model.Computer;
import hudson.model.Node;
import hudson.model.Result;
import hudson.model.Run;
import hudson.model.TaskListener;
import hudson.remoting.Channel;
import hudson.remoting.ChannelClosedException;
import hudson.slaves.ComputerListener;
import hudson.slaves.OfflineCause;
import hudson.util.DaemonThreadFactory;
import hudson.util.FormValidation;
import hudson.util.LogTaskListener;
Expand All @@ -51,8 +55,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Set;
Expand All @@ -72,7 +74,9 @@
import org.jenkinsci.plugins.durabletask.Handler;
import org.jenkinsci.plugins.workflow.FilePathUtils;
import org.jenkinsci.plugins.workflow.actions.LabelAction;
import org.jenkinsci.plugins.workflow.flow.FlowExecutionList;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.log.OutputStreamTaskListener;
import org.jenkinsci.plugins.workflow.steps.AbstractStepExecutionImpl;
import org.jenkinsci.plugins.workflow.steps.FlowInterruptedException;
import org.jenkinsci.plugins.workflow.steps.Step;
Expand Down Expand Up @@ -437,26 +441,23 @@ private void getWorkspaceProblem(Exception x) {
}

/**
* Interprets {@link PrintStream#close} as a signal to end a final newline if necessary.
* Interprets {@link OutputStream#close} as a signal to end a final newline if necessary.
*/
private static final class NewlineSafeTaskListener implements TaskListener {
private static final class NewlineSafeTaskListener extends OutputStreamTaskListener.Default {

private static final long serialVersionUID = 1;

private final TaskListener delegate;
private transient PrintStream logger;
private transient OutputStream out;

NewlineSafeTaskListener(TaskListener delegate) {
this.delegate = delegate;
}

// Similar to DecoratedTaskListener:
@NonNull
@Override public synchronized PrintStream getLogger() {
if (logger == null) {
LOGGER.fine("creating filtered stream");
OutputStream base = delegate.getLogger();
OutputStream filtered = new FilterOutputStream(base) {
@Override public synchronized OutputStream getOutputStream() {
if (out == null) {
out = new FilterOutputStream(OutputStreamTaskListener.getOutputStream(delegate)) {
boolean nl = true; // empty string does not need a newline
@Override public void write(int b) throws IOException {
super.write(b);
Expand All @@ -475,14 +476,12 @@ private static final class NewlineSafeTaskListener implements TaskListener {
}
flush(); // do *not* call base.close() here, unlike super.close()
}
@Override public String toString() {
return "NewlineSafeTaskListener.output[" + out + "]";
}
};
try {
logger = new PrintStream(filtered, false, "UTF-8");
} catch (UnsupportedEncodingException x) {
throw new AssertionError(x);
}
}
return logger;
return out;
}

}
Expand Down Expand Up @@ -589,9 +588,12 @@ private void check() {
Integer exitCode = controller.exitStatus(workspace, launcher(), listener);
if (exitCode == null) {
LOGGER.log(Level.FINE, "still running in {0} on {1}", new Object[] {remote, node});
} else if (recurrencePeriod == 0) {
LOGGER.fine(() -> "late check in " + remote + " on " + node + " ignored");
} else if (awaitingAsynchExit) {
recurrencePeriod = 0;
getContext().onFailure(new AbortException("script apparently exited with code " + exitCode + " but asynchronous notification was lost"));
listener.getLogger().println("script apparently exited with code " + exitCode + " but asynchronous notification was lost");
handleExit(exitCode, () -> controller.getOutput(workspace, launcher()));
} else {
LOGGER.log(Level.FINE, "exited with {0} in {1} on {2}; expect asynchronous exit soon", new Object[] {exitCode, remote, node});
awaitingAsynchExit = true;
Expand Down Expand Up @@ -627,6 +629,7 @@ private void check() {

// called remotely from HandlerImpl
@Override public void exited(int exitCode, byte[] output) throws Exception {
recurrencePeriod = 0;
try {
getContext().get(TaskListener.class);
} catch (IOException | InterruptedException x) {
Expand All @@ -643,7 +646,6 @@ private void check() {
getContext().onFailure(new IllegalStateException("did not expect output but got some"));
return;
}
recurrencePeriod = 0;
handleExit(exitCode, () -> output);
}

Expand All @@ -657,11 +659,11 @@ private void handleExit(int exitCode, OutputSupplier output) throws IOException,
getContext().onSuccess(returnStatus ? exitCode : returnStdout ? new String(output.produce(), StandardCharsets.UTF_8) : null);
} else {
if (returnStdout) {
listener().getLogger().write(output.produce()); // diagnostic
_listener().getLogger().write(output.produce()); // diagnostic
}
if (originalCause != null) {
// JENKINS-28822: Use the previous cause instead of throwing a new AbortException
listener().getLogger().println("script returned exit code " + exitCode);
_listener().getLogger().println("script returned exit code " + exitCode);
getContext().onFailure(originalCause);
} else {
getContext().onFailure(new AbortException("script returned exit code " + exitCode));
Expand Down Expand Up @@ -694,21 +696,6 @@ private void setupTimer(long initialRecurrencePeriod) {

private static class HandlerImpl extends Handler {

private static final Field printStreamDelegate;
static {
try {
printStreamDelegate = FilterOutputStream.class.getDeclaredField("out");
} catch (NoSuchFieldException x) {
// Defined in Java Platform and protected, so should not happen.
throw new ExceptionInInitializerError(x);
}
try {
printStreamDelegate.setAccessible(true);
} catch (/* TODO Java 11+ InaccessibleObjectException */RuntimeException x) {
LOGGER.log(Level.WARNING, "On Java 17 error handling is degraded unless `--add-opens java.base/java.io=ALL-UNNAMED` is passed to the agent", x);
}
}

private static final long serialVersionUID = 1L;

private final ExecutionRemotable execution;
Expand All @@ -721,34 +708,24 @@ private static class HandlerImpl extends Handler {

@Override public void output(@NonNull InputStream stream) throws Exception {
PrintStream ps = listener.getLogger();
OutputStream os = OutputStreamTaskListener.getOutputStream(listener);
try {
if (ps.getClass() == PrintStream.class) {
// Try to extract the underlying stream, since swallowing exceptions is undesirable and PrintStream.checkError is useless.
OutputStream os = ps;
try {
os = (OutputStream) printStreamDelegate.get(ps);
} catch (IllegalAccessException x) {
LOGGER.log(Level.FINE, "using PrintStream rather than underlying FilterOutputStream.out", x);
}
if (os == null) { // like PrintStream.ensureOpen
throw new IOException("Stream closed");
}
synchronized (ps) { // like PrintStream.write overloads do
IOUtils.copy(stream, os);
}
} else {
// A subclass. Who knows why, but trust any write(…) overrides it may have.
IOUtils.copy(stream, ps);
synchronized (ps) { // like PrintStream.write overloads do
IOUtils.copy(stream, os);
}
LOGGER.finest(() -> "print to " + os + " succeeded");
} catch (ChannelClosedException x) {
LOGGER.log(Level.FINE, null, x);
// We are giving up on this watch. Wait for some call to getWorkspace to rewatch.
throw x;
} catch (Exception x) {
LOGGER.log(Level.FINE, null, x);
// Try to report it to the controller.
try {
execution.problem(x);
// OK, printed to log on controller side, we may have lost some text but could continue.
} catch (Exception x2) { // e.g., RemotingSystemException
LOGGER.log(Level.FINE, null, x2);
// No, channel seems to be broken, give up on this watch.
throw x;
}
Expand All @@ -762,4 +739,35 @@ private static class HandlerImpl extends Handler {

}

@Extension public static final class AgentReconnectionListener extends ComputerListener {

@Override public void onOffline(Computer c, OfflineCause cause) {
if (Jenkins.get().isTerminating()) {
LOGGER.fine(() -> "Skipping check on " + c.getName() + " during shutdown");
return;
}
check(c);
}

@Override public void onOnline(Computer c, TaskListener listener) throws IOException, InterruptedException {
if (!FlowExecutionList.get().isResumptionComplete()) {
LOGGER.fine(() -> "Skipping check on " + c.getName() + " before builds are ready");
return;
}
check(c);
}

private void check(Computer c) {
String name = c.getName();
StepExecution.applyAll(Execution.class, exec -> {
if (exec.watching && exec.node.equals(name)) {
LOGGER.fine(() -> "Online/offline event on " + name + ", checking current status of " + exec.remote + " soon");
threadPool().schedule(exec::check, 15, TimeUnit.SECONDS);
}
return null;
});
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public final class AgentErrorCondition extends ErrorCondition {
c -> c instanceof ExecutorStepExecution.RemovedNodeCause || c instanceof ExecutorStepExecution.QueueTaskCancelled)) {
return true;
}
if (isClosedChannel(t)) {
if (isClosedChannelException(t)) {
return true;
}
if (t instanceof MissingContextVariableException) {
Expand All @@ -75,7 +75,8 @@ public final class AgentErrorCondition extends ErrorCondition {
return false;
}

private static boolean isClosedChannel(Throwable t) {
// TODO https://github.com/jenkinsci/remoting/pull/657
private static boolean isClosedChannelException(Throwable t) {
if (t instanceof ClosedChannelException) {
return true;
} else if (t instanceof ChannelClosedException) {
Expand All @@ -85,7 +86,7 @@ private static boolean isClosedChannel(Throwable t) {
} else if (t == null) {
return false;
} else {
return isClosedChannel(t.getCause()) || Stream.of(t.getSuppressed()).anyMatch(AgentErrorCondition::isClosedChannel);
return isClosedChannelException(t.getCause()) || Stream.of(t.getSuppressed()).anyMatch(AgentErrorCondition::isClosedChannelException);
}
}

Expand Down
Loading

0 comments on commit b30aaca

Please sign in to comment.