diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index dfc863e8f639..93d151f3e058 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -29,10 +29,12 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(":runners:portability:java") + implementation library.java.joda_time implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre testImplementation library.java.junit + testImplementation library.java.mockito_core testImplementation library.java.truth } diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java index fba2eec99c5c..620d5508f22a 100644 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java @@ -87,6 +87,14 @@ void stop() { } } + /** Reports whether the Prism executable {@link Process#isAlive()}. */ + boolean isAlive() { + if (process == null) { + return false; + } + return process.isAlive(); + } + /** * Execute the {@link ProcessBuilder} that starts the Prism service. Redirects output to STDOUT. */ diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java new file mode 100644 index 000000000000..a551196c9b6f --- /dev/null +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.prism; + +import java.io.IOException; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +/** + * The {@link PipelineResult} of executing a {@link org.apache.beam.sdk.Pipeline} using the {@link + * PrismRunner} and an internal {@link PipelineResult} delegate. + */ +class PrismPipelineResult implements PipelineResult { + + static PrismPipelineResult of(PipelineResult delegate, PrismExecutor executor) { + return new PrismPipelineResult(delegate, executor::stop); + } + + private final PipelineResult delegate; + private final Runnable cancel; + private @Nullable MetricResults terminalMetrics; + private @Nullable State terminalState; + + /** + * Instantiate the {@link PipelineResult} from the {@param delegate} and a {@param cancel} to be + * called when stopping the underlying executable Job management service. + */ + PrismPipelineResult(PipelineResult delegate, Runnable cancel) { + this.delegate = delegate; + this.cancel = cancel; + } + + /** Forwards the result of the delegate {@link PipelineResult#getState}. */ + @Override + public State getState() { + if (terminalState != null) { + return terminalState; + } + return delegate.getState(); + } + + /** + * Forwards the result of the delegate {@link PipelineResult#cancel}. Invokes {@link + * PrismExecutor#stop()} before returning the resulting {@link + * org.apache.beam.sdk.PipelineResult.State}. + */ + @Override + public State cancel() throws IOException { + State state = delegate.cancel(); + this.terminalMetrics = delegate.metrics(); + this.terminalState = state; + this.cancel.run(); + return state; + } + + /** + * Forwards the result of the delegate {@link PipelineResult#waitUntilFinish(Duration)}. Invokes + * {@link PrismExecutor#stop()} before returning the resulting {@link + * org.apache.beam.sdk.PipelineResult.State}. + */ + @Override + public State waitUntilFinish(Duration duration) { + State state = delegate.waitUntilFinish(duration); + this.terminalMetrics = delegate.metrics(); + this.terminalState = state; + this.cancel.run(); + return state; + } + + /** + * Forwards the result of the delegate {@link PipelineResult#waitUntilFinish}. Invokes {@link + * PrismExecutor#stop()} before returning the resulting {@link + * org.apache.beam.sdk.PipelineResult.State}. + */ + @Override + public State waitUntilFinish() { + State state = delegate.waitUntilFinish(); + this.terminalMetrics = delegate.metrics(); + this.terminalState = state; + this.cancel.run(); + return state; + } + + /** Forwards the result of the delegate {@link PipelineResult#metrics}. */ + @Override + public MetricResults metrics() { + if (terminalMetrics != null) { + return terminalMetrics; + } + return delegate.metrics(); + } +} diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java new file mode 100644 index 000000000000..2ad7e2eb3dd9 --- /dev/null +++ b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.prism; + +import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.joda.time.Duration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link PrismPipelineResult}. */ +@RunWith(JUnit4.class) +public class PrismPipelineResultTest { + + final PrismExecutor exec = executor(); + + @Before + public void setUp() throws IOException { + exec.execute(); + assertThat(exec.isAlive()).isTrue(); + } + + @After + public void tearDown() { + assertThat(exec.isAlive()).isFalse(); + } + + @Test + public void givenTerminated_reportsState() { + PipelineResult delegate = mock(PipelineResult.class); + when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.FAILED); + PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); + // Assigns terminal state. + underTest.waitUntilFinish(); + assertThat(underTest.getState()).isEqualTo(PipelineResult.State.FAILED); + } + + @Test + public void givenNotTerminated_reportsState() { + PipelineResult delegate = mock(PipelineResult.class); + when(delegate.getState()).thenReturn(PipelineResult.State.RUNNING); + PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); + assertThat(underTest.getState()).isEqualTo(PipelineResult.State.RUNNING); + exec.stop(); + } + + @Test + public void cancelStopsExecutable_reportsTerminalState() throws IOException { + PipelineResult delegate = mock(PipelineResult.class); + when(delegate.cancel()).thenReturn(PipelineResult.State.CANCELLED); + PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); + assertThat(underTest.cancel()).isEqualTo(PipelineResult.State.CANCELLED); + } + + @Test + public void givenTerminated_cancelIsNoop_reportsTerminalState() throws IOException { + PipelineResult delegate = mock(PipelineResult.class); + when(delegate.cancel()).thenReturn(PipelineResult.State.FAILED); + PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); + assertThat(underTest.cancel()).isEqualTo(PipelineResult.State.FAILED); + } + + @Test + public void givenPipelineRunWithDuration_waitUntilFinish_reportsTerminalState() { + PipelineResult delegate = mock(PipelineResult.class); + when(delegate.waitUntilFinish(Duration.millis(3000L))) + .thenReturn(PipelineResult.State.CANCELLED); + PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); + assertThat(underTest.waitUntilFinish(Duration.millis(3000L))) + .isEqualTo(PipelineResult.State.CANCELLED); + } + + @Test + public void givenTerminated_waitUntilFinishIsNoop_reportsTerminalState() { + PipelineResult delegate = mock(PipelineResult.class); + when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.DONE); + PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); + // Terminate Job as setup for additional call. + underTest.waitUntilFinish(); + assertThat(underTest.waitUntilFinish()).isEqualTo(PipelineResult.State.DONE); + } + + @Test + public void givenNotTerminated_reportsMetrics() { + PipelineResult delegate = mock(PipelineResult.class); + when(delegate.metrics()).thenReturn(mock(MetricResults.class)); + PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); + assertThat(underTest.metrics()).isNotNull(); + exec.stop(); + } + + @Test + public void givenTerminated_reportsTerminatedMetrics() { + PipelineResult delegate = mock(PipelineResult.class); + when(delegate.metrics()).thenReturn(mock(MetricResults.class)); + when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.DONE); + PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); + // Terminate Job as setup for additional call. + underTest.waitUntilFinish(); + assertThat(underTest.metrics()).isNotNull(); + } + + private static PrismExecutor executor() { + return PrismExecutor.builder().setCommand(getLocalPrismBuildOrIgnoreTest()).build(); + } +}