diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java index fda5db923a7f..0f9816337f91 100644 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java @@ -31,6 +31,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.slf4j.Logger; @@ -157,6 +159,16 @@ abstract static class Builder { abstract Builder setArguments(List arguments); + Builder addArguments(List arguments) { + Optional> original = getArguments(); + if (!original.isPresent()) { + return this.setArguments(arguments); + } + List newArguments = + Stream.concat(original.get().stream(), arguments.stream()).collect(Collectors.toList()); + return this.setArguments(newArguments); + } + abstract Optional> getArguments(); abstract PrismExecutor autoBuild(); diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java index eb497f0a4c43..a81e3e24ee69 100644 --- a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java +++ b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java @@ -59,7 +59,7 @@ public void executeWithStreamRedirectThenStop() throws IOException { sleep(3000L); executor.stop(); String output = outputStream.toString(StandardCharsets.UTF_8.name()); - assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:8073"); + assertThat(output).contains("level=INFO msg=\"Serving JobManagement\" endpoint=localhost:8073"); } @Test @@ -71,7 +71,8 @@ public void executeWithFileOutputThenStop() throws IOException { executor.stop(); try (Stream stream = Files.lines(log.toPath(), StandardCharsets.UTF_8)) { String output = stream.collect(Collectors.joining("\n")); - assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:8073"); + assertThat(output) + .contains("level=INFO msg=\"Serving JobManagement\" endpoint=localhost:8073"); } } @@ -79,21 +80,23 @@ public void executeWithFileOutputThenStop() throws IOException { public void executeWithCustomArgumentsThenStop() throws IOException { PrismExecutor executor = underTest() - .setArguments(Collections.singletonList("-" + JOB_PORT_FLAG_NAME + "=5555")) + .addArguments(Collections.singletonList("-" + JOB_PORT_FLAG_NAME + "=5555")) .build(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); executor.execute(outputStream); sleep(3000L); executor.stop(); String output = outputStream.toString(StandardCharsets.UTF_8.name()); - assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:5555"); + assertThat(output).contains("level=INFO msg=\"Serving JobManagement\" endpoint=localhost:5555"); } @Test public void executeWithPortFinderThenStop() throws IOException {} private PrismExecutor.Builder underTest() { - return PrismExecutor.builder().setCommand(getLocalPrismBuildOrIgnoreTest()); + return PrismExecutor.builder() + .setCommand(getLocalPrismBuildOrIgnoreTest()) + .setArguments(Collections.singletonList("--log_kind=text")); // disable color control chars } private void sleep(long millis) { diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaRunnerWithTransformMetrics.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaRunnerWithTransformMetrics.java index e0bcbed1577c..68674d202cdb 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaRunnerWithTransformMetrics.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/metrics/TestSamzaRunnerWithTransformMetrics.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -59,6 +60,8 @@ public class TestSamzaRunnerWithTransformMetrics { @Test public void testSamzaRunnerWithDefaultMetrics() { + // TODO(https://github.com/apache/beam/issues/32208) + assumeTrue(System.getProperty("java.version").startsWith("1.")); SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class); InMemoryMetricsReporter inMemoryMetricsReporter = new InMemoryMetricsReporter(); options.setMetricsReporters(ImmutableList.of(inMemoryMetricsReporter)); diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/GroupByKeyOpTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/GroupByKeyOpTest.java index 8670d9a46eac..73454cc95421 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/GroupByKeyOpTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/GroupByKeyOpTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.samza.runtime; +import static org.junit.Assume.assumeTrue; + import java.io.Serializable; import java.util.Arrays; import org.apache.beam.sdk.coders.KvCoder; @@ -35,11 +37,19 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; /** Tests for GroupByKeyOp. */ public class GroupByKeyOpTest implements Serializable { + + @BeforeClass + public static void beforeClass() { + // TODO(https://github.com/apache/beam/issues/32208) + assumeTrue(System.getProperty("java.version").startsWith("1.")); + } + @Rule public final transient TestPipeline pipeline = TestPipeline.fromOptions( diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java index 004162600179..9409efbcf394 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; import java.io.File; import java.io.IOException; @@ -75,6 +76,7 @@ import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory; import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore; import org.apache.samza.system.SystemStreamPartition; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -91,6 +93,12 @@ public class SamzaStoreStateInternalsTest implements Serializable { TestPipeline.fromOptions( PipelineOptionsFactory.fromArgs("--runner=TestSamzaRunner").create()); + @BeforeClass + public static void beforeClass() { + // TODO(https://github.com/apache/beam/issues/32208) + assumeTrue(System.getProperty("java.version").startsWith("1.")); + } + @Test public void testMapStateIterator() { final String stateId = "foo";