From 57d5ab5a144c03cba9d309fa2beed4baa984a474 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Fri, 23 Aug 2024 16:32:51 -0700 Subject: [PATCH] Cherry Pick #32294 (Java PrismRunner and TestPrismRunner) to the release. (#32304) --- runners/prism/java/build.gradle | 1 + .../beam/runners/prism/PrismExecutor.java | 5 +- .../beam/runners/prism/PrismLocator.java | 12 +- .../runners/prism/PrismPipelineOptions.java | 17 +++ .../runners/prism/PrismPipelineResult.java | 33 ++--- .../beam/runners/prism/PrismRunner.java | 72 ++++++++-- .../runners/prism/PrismRunnerRegistrar.java | 35 +++++ .../prism/TestPrismPipelineOptions.java | 23 ++++ .../beam/runners/prism/TestPrismRunner.java | 84 +++++++++++ .../beam/runners/prism/PrismExecutorTest.java | 8 +- .../prism/PrismPipelineResultTest.java | 130 ------------------ .../beam/runners/prism/PrismRunnerTest.java | 117 ++++++++++++---- 12 files changed, 336 insertions(+), 201 deletions(-) create mode 100644 runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunnerRegistrar.java create mode 100644 runners/prism/java/src/main/java/org/apache/beam/runners/prism/TestPrismPipelineOptions.java create mode 100644 runners/prism/java/src/main/java/org/apache/beam/runners/prism/TestPrismRunner.java delete mode 100644 runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index 96ab4e70a579..deee8876af6f 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -37,6 +37,7 @@ dependencies { implementation library.java.slf4j_api implementation library.java.vendored_grpc_1_60_1 implementation library.java.vendored_guava_32_1_2_jre + compileOnly library.java.hamcrest testImplementation library.java.junit testImplementation library.java.mockito_core diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java index 620d5508f22a..fda5db923a7f 100644 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java @@ -45,6 +45,9 @@ abstract class PrismExecutor { private static final Logger LOG = LoggerFactory.getLogger(PrismExecutor.class); + static final String IDLE_SHUTDOWN_TIMEOUT = "-idle_shutdown_timeout=%s"; + static final String JOB_PORT_FLAG_TEMPLATE = "-job_port=%s"; + static final String SERVE_HTTP_FLAG_TEMPLATE = "-serve_http=%s"; protected @MonotonicNonNull Process process; protected ExecutorService executorService = Executors.newSingleThreadExecutor(); @@ -71,7 +74,7 @@ void stop() { } executorService.shutdown(); try { - boolean ignored = executorService.awaitTermination(1000L, TimeUnit.MILLISECONDS); + boolean ignored = executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS); } catch (InterruptedException ignored) { } if (process == null) { diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java index f32e4d88f42b..f69260344d12 100644 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java @@ -127,7 +127,9 @@ private String resolve(Path from, Path to) throws IOException { } copyFn.accept(from.toUri().toURL().openStream(), to); - ByteStreams.copy(from.toUri().toURL().openStream(), Files.newOutputStream(to)); + try (OutputStream out = Files.newOutputStream(to)) { + ByteStreams.copy(from.toUri().toURL().openStream(), out); + } Files.setPosixFilePermissions(to, PERMS); return to.toString(); @@ -159,16 +161,16 @@ private static void unzip(InputStream from, Path to) { } private static void copy(InputStream from, Path to) { - try { - ByteStreams.copy(from, Files.newOutputStream(to)); + try (OutputStream out = Files.newOutputStream(to)) { + ByteStreams.copy(from, out); } catch (IOException e) { throw new RuntimeException(e); } } private static void download(URL from, Path to) { - try { - ByteStreams.copy(from.openStream(), Files.newOutputStream(to)); + try (OutputStream out = Files.newOutputStream(to)) { + ByteStreams.copy(from.openStream(), out); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java index ec0f8beb620a..6a6ca4e615d0 100644 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.prism; +import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PortablePipelineOptions; @@ -25,6 +26,9 @@ * org.apache.beam.sdk.Pipeline} on the {@link PrismRunner}. */ public interface PrismPipelineOptions extends PortablePipelineOptions { + + String JOB_PORT_FLAG_NAME = "job_port"; + @Description( "Path or URL to a prism binary, or zipped binary for the current " + "platform (Operating System and Architecture). May also be an Apache " @@ -41,4 +45,17 @@ public interface PrismPipelineOptions extends PortablePipelineOptions { String getPrismVersionOverride(); void setPrismVersionOverride(String prismVersionOverride); + + @Description("Enable or disable Prism Web UI") + @Default.Boolean(true) + Boolean getEnableWebUI(); + + void setEnableWebUI(Boolean enableWebUI); + + @Description( + "Duration, represented as a String, that prism will wait for a new job before shutting itself down. Negative durations disable auto shutdown. Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\".") + @Default.String("5m") + String getIdleShutdownTimeout(); + + void setIdleShutdownTimeout(String idleShutdownTimeout); } diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java index a551196c9b6f..7508e505725e 100644 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java @@ -20,7 +20,6 @@ import java.io.IOException; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; -import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; /** @@ -29,14 +28,8 @@ */ class PrismPipelineResult implements PipelineResult { - static PrismPipelineResult of(PipelineResult delegate, PrismExecutor executor) { - return new PrismPipelineResult(delegate, executor::stop); - } - private final PipelineResult delegate; - private final Runnable cancel; - private @Nullable MetricResults terminalMetrics; - private @Nullable State terminalState; + private final Runnable cleanup; /** * Instantiate the {@link PipelineResult} from the {@param delegate} and a {@param cancel} to be @@ -44,15 +37,16 @@ static PrismPipelineResult of(PipelineResult delegate, PrismExecutor executor) { */ PrismPipelineResult(PipelineResult delegate, Runnable cancel) { this.delegate = delegate; - this.cancel = cancel; + this.cleanup = cancel; + } + + Runnable getCleanup() { + return cleanup; } /** Forwards the result of the delegate {@link PipelineResult#getState}. */ @Override public State getState() { - if (terminalState != null) { - return terminalState; - } return delegate.getState(); } @@ -64,9 +58,7 @@ public State getState() { @Override public State cancel() throws IOException { State state = delegate.cancel(); - this.terminalMetrics = delegate.metrics(); - this.terminalState = state; - this.cancel.run(); + this.cleanup.run(); return state; } @@ -78,9 +70,7 @@ public State cancel() throws IOException { @Override public State waitUntilFinish(Duration duration) { State state = delegate.waitUntilFinish(duration); - this.terminalMetrics = delegate.metrics(); - this.terminalState = state; - this.cancel.run(); + this.cleanup.run(); return state; } @@ -92,18 +82,13 @@ public State waitUntilFinish(Duration duration) { @Override public State waitUntilFinish() { State state = delegate.waitUntilFinish(); - this.terminalMetrics = delegate.metrics(); - this.terminalState = state; - this.cancel.run(); + this.cleanup.run(); return state; } /** Forwards the result of the delegate {@link PipelineResult#metrics}. */ @Override public MetricResults metrics() { - if (terminalMetrics != null) { - return terminalMetrics; - } return delegate.metrics(); } } diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunner.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunner.java index 1ea4367292b0..6099db4b63ee 100644 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunner.java +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunner.java @@ -17,6 +17,12 @@ */ package org.apache.beam.runners.prism; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.Arrays; import org.apache.beam.runners.portability.PortableRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -34,34 +40,38 @@ * submit to an already running Prism service, use the {@link PortableRunner} with the {@link * PortablePipelineOptions#getJobEndpoint()} option instead. Prism is a {@link * org.apache.beam.runners.portability.PortableRunner} maintained at sdks/go/cmd/prism. + * href="https://github.com/apache/beam/tree/master/sdks/go/cmd/prism">sdks/go/cmd/prism. For + * testing, use {@link TestPrismRunner}. */ -// TODO(https://github.com/apache/beam/issues/31793): add public modifier after finalizing -// PrismRunner. Depends on: https://github.com/apache/beam/issues/31402 and -// https://github.com/apache/beam/issues/31792. -class PrismRunner extends PipelineRunner { +public class PrismRunner extends PipelineRunner { private static final Logger LOG = LoggerFactory.getLogger(PrismRunner.class); - private static final String DEFAULT_PRISM_ENDPOINT = "localhost:8073"; - - private final PortableRunner internal; private final PrismPipelineOptions prismPipelineOptions; - private PrismRunner(PortableRunner internal, PrismPipelineOptions prismPipelineOptions) { - this.internal = internal; + protected PrismRunner(PrismPipelineOptions prismPipelineOptions) { this.prismPipelineOptions = prismPipelineOptions; } + PrismPipelineOptions getPrismPipelineOptions() { + return prismPipelineOptions; + } + /** * Invoked from {@link Pipeline#run} where {@link PrismRunner} instantiates using {@link * PrismPipelineOptions} configuration details. */ public static PrismRunner fromOptions(PipelineOptions options) { PrismPipelineOptions prismPipelineOptions = options.as(PrismPipelineOptions.class); + validate(prismPipelineOptions); assignDefaultsIfNeeded(prismPipelineOptions); - PortableRunner internal = PortableRunner.fromOptions(options); - return new PrismRunner(internal, prismPipelineOptions); + return new PrismRunner(prismPipelineOptions); + } + + private static void validate(PrismPipelineOptions options) { + checkArgument( + Strings.isNullOrEmpty(options.getJobEndpoint()), + "when specifying --jobEndpoint, use --runner=PortableRunner instead"); } @Override @@ -72,15 +82,47 @@ public PipelineResult run(Pipeline pipeline) { prismPipelineOptions.getDefaultEnvironmentType(), prismPipelineOptions.getJobEndpoint()); - return internal.run(pipeline); + try { + PrismExecutor executor = startPrism(); + PortableRunner delegate = PortableRunner.fromOptions(prismPipelineOptions); + return new PrismPipelineResult(delegate.run(pipeline), executor::stop); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + PrismExecutor startPrism() throws IOException { + PrismLocator locator = new PrismLocator(prismPipelineOptions); + int port = findAvailablePort(); + String portFlag = String.format(PrismExecutor.JOB_PORT_FLAG_TEMPLATE, port); + String serveHttpFlag = + String.format( + PrismExecutor.SERVE_HTTP_FLAG_TEMPLATE, prismPipelineOptions.getEnableWebUI()); + String idleShutdownTimeoutFlag = + String.format( + PrismExecutor.IDLE_SHUTDOWN_TIMEOUT, prismPipelineOptions.getIdleShutdownTimeout()); + String endpoint = "localhost:" + port; + prismPipelineOptions.setJobEndpoint(endpoint); + String command = locator.resolve(); + PrismExecutor executor = + PrismExecutor.builder() + .setCommand(command) + .setArguments(Arrays.asList(portFlag, serveHttpFlag, idleShutdownTimeoutFlag)) + .build(); + executor.execute(); + checkState(executor.isAlive()); + return executor; } private static void assignDefaultsIfNeeded(PrismPipelineOptions prismPipelineOptions) { if (Strings.isNullOrEmpty(prismPipelineOptions.getDefaultEnvironmentType())) { prismPipelineOptions.setDefaultEnvironmentType(Environments.ENVIRONMENT_LOOPBACK); } - if (Strings.isNullOrEmpty(prismPipelineOptions.getJobEndpoint())) { - prismPipelineOptions.setJobEndpoint(DEFAULT_PRISM_ENDPOINT); + } + + private static int findAvailablePort() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); } } } diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunnerRegistrar.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunnerRegistrar.java new file mode 100644 index 000000000000..ff7b73ab6db0 --- /dev/null +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunnerRegistrar.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.prism; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; + +/** + * Registers {@link PrismRunner} and {@link TestPrismRunner} with {@link PipelineRunnerRegistrar}. + */ +@AutoService(PipelineRunnerRegistrar.class) +public class PrismRunnerRegistrar implements PipelineRunnerRegistrar { + + @Override + public Iterable>> getPipelineRunners() { + return ImmutableList.of(PrismRunner.class, TestPrismRunner.class); + } +} diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/TestPrismPipelineOptions.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/TestPrismPipelineOptions.java new file mode 100644 index 000000000000..1c1252bee55d --- /dev/null +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/TestPrismPipelineOptions.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.prism; + +import org.apache.beam.sdk.testing.TestPipelineOptions; + +/** {@link org.apache.beam.sdk.options.PipelineOptions} for use with the {@link TestPrismRunner}. */ +public interface TestPrismPipelineOptions extends PrismPipelineOptions, TestPipelineOptions {} diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/TestPrismRunner.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/TestPrismRunner.java new file mode 100644 index 000000000000..fbcb9e3d9576 --- /dev/null +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/TestPrismRunner.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.prism; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.function.Supplier; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; +import org.hamcrest.Matchers; +import org.joda.time.Duration; + +/** + * {@link TestPrismRunner} is the recommended {@link PipelineRunner} to use for tests that rely on + * sdks/go/cmd/prism. See + * {@link PrismRunner} for more details. + */ +public class TestPrismRunner extends PipelineRunner { + + private final PrismRunner internal; + private final TestPrismPipelineOptions prismPipelineOptions; + + /** + * Invoked from {@link Pipeline#run} where {@link TestPrismRunner} instantiates using {@link + * TestPrismPipelineOptions} configuration details. + */ + public static TestPrismRunner fromOptions(PipelineOptions options) { + TestPrismPipelineOptions prismPipelineOptions = options.as(TestPrismPipelineOptions.class); + PrismRunner delegate = PrismRunner.fromOptions(options); + return new TestPrismRunner(delegate, prismPipelineOptions); + } + + private TestPrismRunner(PrismRunner internal, TestPrismPipelineOptions options) { + this.internal = internal; + this.prismPipelineOptions = options; + } + + TestPrismPipelineOptions getTestPrismPipelineOptions() { + return prismPipelineOptions; + } + + @Override + public PipelineResult run(Pipeline pipeline) { + PrismPipelineResult result = (PrismPipelineResult) internal.run(pipeline); + try { + PipelineResult.State state = getWaitUntilFinishRunnable(result).get(); + assertThat( + "Pipeline did not succeed. Check Prism logs for further details.", + state, + Matchers.is(PipelineResult.State.DONE)); + } catch (RuntimeException e) { + // This is a temporary workaround to close the Prism process. + result.getCleanup().run(); + throw new AssertionError(e); + } + return result; + } + + private Supplier getWaitUntilFinishRunnable(PipelineResult result) { + if (prismPipelineOptions.getTestTimeoutSeconds() != null) { + Long testTimeoutSeconds = checkStateNotNull(prismPipelineOptions.getTestTimeoutSeconds()); + return () -> result.waitUntilFinish(Duration.standardSeconds(testTimeoutSeconds)); + } + return result::waitUntilFinish; + } +} diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java index 315e585a0c5f..eb497f0a4c43 100644 --- a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java +++ b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.prism; import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.runners.prism.PrismPipelineOptions.JOB_PORT_FLAG_NAME; import static org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest; import java.io.ByteArrayOutputStream; @@ -77,7 +78,9 @@ public void executeWithFileOutputThenStop() throws IOException { @Test public void executeWithCustomArgumentsThenStop() throws IOException { PrismExecutor executor = - underTest().setArguments(Collections.singletonList("-job_port=5555")).build(); + underTest() + .setArguments(Collections.singletonList("-" + JOB_PORT_FLAG_NAME + "=5555")) + .build(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); executor.execute(outputStream); sleep(3000L); @@ -86,6 +89,9 @@ public void executeWithCustomArgumentsThenStop() throws IOException { assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:5555"); } + @Test + public void executeWithPortFinderThenStop() throws IOException {} + private PrismExecutor.Builder underTest() { return PrismExecutor.builder().setCommand(getLocalPrismBuildOrIgnoreTest()); } diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java deleted file mode 100644 index 2ad7e2eb3dd9..000000000000 --- a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.prism; - -import static com.google.common.truth.Truth.assertThat; -import static org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.metrics.MetricResults; -import org.joda.time.Duration; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link PrismPipelineResult}. */ -@RunWith(JUnit4.class) -public class PrismPipelineResultTest { - - final PrismExecutor exec = executor(); - - @Before - public void setUp() throws IOException { - exec.execute(); - assertThat(exec.isAlive()).isTrue(); - } - - @After - public void tearDown() { - assertThat(exec.isAlive()).isFalse(); - } - - @Test - public void givenTerminated_reportsState() { - PipelineResult delegate = mock(PipelineResult.class); - when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.FAILED); - PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); - // Assigns terminal state. - underTest.waitUntilFinish(); - assertThat(underTest.getState()).isEqualTo(PipelineResult.State.FAILED); - } - - @Test - public void givenNotTerminated_reportsState() { - PipelineResult delegate = mock(PipelineResult.class); - when(delegate.getState()).thenReturn(PipelineResult.State.RUNNING); - PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); - assertThat(underTest.getState()).isEqualTo(PipelineResult.State.RUNNING); - exec.stop(); - } - - @Test - public void cancelStopsExecutable_reportsTerminalState() throws IOException { - PipelineResult delegate = mock(PipelineResult.class); - when(delegate.cancel()).thenReturn(PipelineResult.State.CANCELLED); - PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); - assertThat(underTest.cancel()).isEqualTo(PipelineResult.State.CANCELLED); - } - - @Test - public void givenTerminated_cancelIsNoop_reportsTerminalState() throws IOException { - PipelineResult delegate = mock(PipelineResult.class); - when(delegate.cancel()).thenReturn(PipelineResult.State.FAILED); - PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); - assertThat(underTest.cancel()).isEqualTo(PipelineResult.State.FAILED); - } - - @Test - public void givenPipelineRunWithDuration_waitUntilFinish_reportsTerminalState() { - PipelineResult delegate = mock(PipelineResult.class); - when(delegate.waitUntilFinish(Duration.millis(3000L))) - .thenReturn(PipelineResult.State.CANCELLED); - PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); - assertThat(underTest.waitUntilFinish(Duration.millis(3000L))) - .isEqualTo(PipelineResult.State.CANCELLED); - } - - @Test - public void givenTerminated_waitUntilFinishIsNoop_reportsTerminalState() { - PipelineResult delegate = mock(PipelineResult.class); - when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.DONE); - PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); - // Terminate Job as setup for additional call. - underTest.waitUntilFinish(); - assertThat(underTest.waitUntilFinish()).isEqualTo(PipelineResult.State.DONE); - } - - @Test - public void givenNotTerminated_reportsMetrics() { - PipelineResult delegate = mock(PipelineResult.class); - when(delegate.metrics()).thenReturn(mock(MetricResults.class)); - PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); - assertThat(underTest.metrics()).isNotNull(); - exec.stop(); - } - - @Test - public void givenTerminated_reportsTerminatedMetrics() { - PipelineResult delegate = mock(PipelineResult.class); - when(delegate.metrics()).thenReturn(mock(MetricResults.class)); - when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.DONE); - PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop); - // Terminate Job as setup for additional call. - underTest.waitUntilFinish(); - assertThat(underTest.metrics()).isNotNull(); - } - - private static PrismExecutor executor() { - return PrismExecutor.builder().setCommand(getLocalPrismBuildOrIgnoreTest()).build(); - } -} diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismRunnerTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismRunnerTest.java index 2cacb671be3e..e4d239275988 100644 --- a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismRunnerTest.java +++ b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismRunnerTest.java @@ -18,54 +18,121 @@ package org.apache.beam.runners.prism; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; import static org.junit.Assume.assumeTrue; -import java.io.IOException; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PeriodicImpulse; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.WithTimestamps; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.construction.Environments; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; -import org.junit.Ignore; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** Tests for {@link PrismRunner}. */ - -// TODO(https://github.com/apache/beam/issues/31793): Remove @Ignore after finalizing PrismRunner. -// Depends on: https://github.com/apache/beam/issues/31402 and -// https://github.com/apache/beam/issues/31792. -@Ignore @RunWith(JUnit4.class) public class PrismRunnerTest { + + @Rule public TestPipeline pipeline = TestPipeline.fromOptions(options()); + // See build.gradle for test task configuration. private static final String PRISM_BUILD_TARGET_PROPERTY_NAME = "prism.buildTarget"; @Test - public void givenBoundedSource_runsUntilDone() { - Pipeline pipeline = Pipeline.create(options()); - pipeline.apply(Create.of(1, 2, 3)); - PipelineResult.State state = pipeline.run().waitUntilFinish(); - assertThat(state).isEqualTo(PipelineResult.State.DONE); + public void givenJobEndpointSet_TestPrismRunner_validateThrows() { + TestPrismPipelineOptions options = + PipelineOptionsFactory.create().as(TestPrismPipelineOptions.class); + options.setRunner(TestPrismRunner.class); + options.setJobEndpoint("endpoint"); + IllegalArgumentException error = + assertThrows(IllegalArgumentException.class, () -> TestPrismRunner.fromOptions(options)); + assertThat(error.getMessage()) + .isEqualTo("when specifying --jobEndpoint, use --runner=PortableRunner instead"); + } + + @Test + public void givenJobEndpointSet_PrismRunner_validateThrows() { + PrismPipelineOptions options = PipelineOptionsFactory.create().as(PrismPipelineOptions.class); + options.setRunner(PrismRunner.class); + options.setJobEndpoint("endpoint"); + IllegalArgumentException error = + assertThrows(IllegalArgumentException.class, () -> TestPrismRunner.fromOptions(options)); + assertThat(error.getMessage()) + .isEqualTo("when specifying --jobEndpoint, use --runner=PortableRunner instead"); + } + + @Test + public void givenEnvironmentTypeEmpty_TestPrismRunner_defaultsToLoopback() { + TestPrismPipelineOptions options = + PipelineOptionsFactory.create().as(TestPrismPipelineOptions.class); + options.setRunner(TestPrismRunner.class); + assertThat( + TestPrismRunner.fromOptions(options) + .getTestPrismPipelineOptions() + .getDefaultEnvironmentType()) + .isEqualTo(Environments.ENVIRONMENT_LOOPBACK); } @Test - public void givenUnboundedSource_runsUntilCancel() throws IOException { - Pipeline pipeline = Pipeline.create(options()); - pipeline.apply(PeriodicImpulse.create()); - PipelineResult result = pipeline.run(); - assertThat(result.getState()).isEqualTo(PipelineResult.State.RUNNING); - PipelineResult.State state = result.cancel(); - assertThat(state).isEqualTo(PipelineResult.State.CANCELLED); + public void givenEnvironmentTypeEmpty_PrismRunner_defaultsToLoopback() { + PrismPipelineOptions options = PipelineOptionsFactory.create().as(PrismPipelineOptions.class); + options.setRunner(PrismRunner.class); + assertThat( + PrismRunner.fromOptions(options).getPrismPipelineOptions().getDefaultEnvironmentType()) + .isEqualTo(Environments.ENVIRONMENT_LOOPBACK); + } + + @Test + public void prismReportsPAssertFailure() { + PAssert.that(pipeline.apply(Create.of(1, 2, 3))) + // Purposely introduce a failed assertion. + .containsInAnyOrder(1, 2, 3, 4); + assertThrows(AssertionError.class, pipeline::run); + } + + @Test + public void windowing() { + PCollection>> got = + pipeline + .apply(Create.of(1, 2, 100, 101, 102, 123)) + .apply(WithTimestamps.of(t -> Instant.ofEpochSecond(t))) + .apply(WithKeys.of("k")) + .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))) + .apply(GroupByKey.create()); + + List>> want = + Arrays.asList( + KV.of("k", Arrays.asList(1, 2)), + KV.of("k", Arrays.asList(100, 101, 102)), + KV.of("k", Collections.singletonList(123))); + + PAssert.that(got).containsInAnyOrder(want); + + pipeline.run(); } - private static PrismPipelineOptions options() { - PrismPipelineOptions opts = PipelineOptionsFactory.create().as(PrismPipelineOptions.class); + private static TestPrismPipelineOptions options() { + TestPrismPipelineOptions opts = + PipelineOptionsFactory.create().as(TestPrismPipelineOptions.class); - opts.setRunner(PrismRunner.class); + opts.setRunner(TestPrismRunner.class); opts.setPrismLocation(getLocalPrismBuildOrIgnoreTest()); + opts.setEnableWebUI(false); return opts; }