diff --git a/.github/trigger_files/beam_PostCommit_Java_Hadoop_Versions.json b/.github/trigger_files/beam_PostCommit_Java_Hadoop_Versions.json index 08c2e40784a9..920c8d132e4a 100644 --- a/.github/trigger_files/beam_PostCommit_Java_Hadoop_Versions.json +++ b/.github/trigger_files/beam_PostCommit_Java_Hadoop_Versions.json @@ -1,3 +1,4 @@ { - "comment": "Modify this file in a trivial way to cause this test suite to run" + "comment": "Modify this file in a trivial way to cause this test suite to run", + "modification": 1 } \ No newline at end of file diff --git a/.github/workflows/beam_PreCommit_Java.yml b/.github/workflows/beam_PreCommit_Java.yml index 772eab98c343..20dafca72a57 100644 --- a/.github/workflows/beam_PreCommit_Java.yml +++ b/.github/workflows/beam_PreCommit_Java.yml @@ -19,6 +19,7 @@ on: tags: ['v*'] branches: ['master', 'release-*'] paths: + - "buildSrc/**" - 'model/**' - 'sdks/java/**' - 'runners/**' diff --git a/CHANGES.md b/CHANGES.md index cdedce22e975..261fafc024f3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -88,7 +88,6 @@ * Removed support for Flink 1.15 and 1.16 * Removed support for Python 3.8 * X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)). -* Upgrade antlr from 4.7 to 4.13.1 ([#33016](https://github.com/apache/beam/pull/33016)). ## Bugfixes diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 8d8bf9339c6e..5af91ec2f056 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -665,8 +665,8 @@ class BeamModulePlugin implements Plugin { activemq_junit : "org.apache.activemq.tooling:activemq-junit:$activemq_version", activemq_kahadb_store : "org.apache.activemq:activemq-kahadb-store:$activemq_version", activemq_mqtt : "org.apache.activemq:activemq-mqtt:$activemq_version", - antlr : "org.antlr:antlr4:4.13.1", - antlr_runtime : "org.antlr:antlr4-runtime:4.13.1", + antlr : "org.antlr:antlr4:4.7", + antlr_runtime : "org.antlr:antlr4-runtime:4.7", args4j : "args4j:args4j:2.33", auto_value_annotations : "com.google.auto.value:auto-value-annotations:$autovalue_version", avro : "org.apache.avro:avro:1.11.3", diff --git a/examples/notebooks/beam-ml/run_inference_vllm.ipynb b/examples/notebooks/beam-ml/run_inference_vllm.ipynb index fea953bc1e66..e9f1e53a452b 100644 --- a/examples/notebooks/beam-ml/run_inference_vllm.ipynb +++ b/examples/notebooks/beam-ml/run_inference_vllm.ipynb @@ -352,11 +352,11 @@ "\n", "1. In the sidebar, click **Files** to open the **Files** pane.\n", "2. In an environment with Docker installed, download the file **VllmDockerfile** file to an empty folder.\n", - "3. Run the following commands. Replace `` with a valid [Artifact Registry](https://cloud.google.com/artifact-registry/docs/overview) repository.\n", + "3. Run the following commands. Replace `:` with a valid [Artifact Registry](https://cloud.google.com/artifact-registry/docs/overview) repository and tag.\n", "\n", " ```\n", - " docker build -t \":latest\" -f VllmDockerfile ./\n", - " docker image push \":latest\"\n", + " docker build -t \":\" -f VllmDockerfile ./\n", + " docker image push \":\"\n", " ```" ], "metadata": { @@ -373,7 +373,8 @@ "First, define the pipeline options that you want to use to launch the Dataflow job. 
Before running the next cell, replace the following variables:\n", "\n", "- ``: the name of a valid [Google Cloud Storage](https://cloud.google.com/storage?e=48754805&hl=en) bucket. Don't include a `gs://` prefix or trailing slashes.\n", - "- ``: the name of the Google Artifact Registry repository that you used in the previous step. Don't include the `latest` tag, because this tag is automatically appended as part of the cell.\n", + "- ``: the name of the Google Artifact Registry repository that you used in the previous step. \n", + "- ``: image tag used in the previous step. Prefer a versioned tag or SHA instead of :latest tag or mutable tags.\n", "- ``: the name of the Google Cloud project that you created your bucket and Artifact Registry repository in.\n", "\n", "This workflow uses the following Dataflow service option: `worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx`. When you use this service option, Dataflow to installs a T4 GPU that uses a `5xx` series Nvidia driver on each worker machine. The 5xx driver is required to run vLLM jobs." @@ -396,7 +397,7 @@ "options = PipelineOptions()\n", "\n", "BUCKET_NAME = '' # Replace with your bucket name.\n", - "CONTAINER_LOCATION = '' # Replace with your container location ( from the previous step)\n", + "CONTAINER_IMAGE = ':' # Replace with the image repository and tag from the previous step.\n", "PROJECT_NAME = '' # Replace with your GCP project\n", "\n", "options.view_as(GoogleCloudOptions).project = PROJECT_NAME\n", @@ -428,7 +429,7 @@ "# Choose a machine type compatible with GPU type\n", "options.view_as(WorkerOptions).machine_type = \"n1-standard-4\"\n", "\n", - "options.view_as(WorkerOptions).worker_harness_container_image = '%s:latest' % CONTAINER_LOCATION" + "options.view_as(WorkerOptions).sdk_container_image = CONTAINER_IMAGE" ], "metadata": { "id": "kXy9FRYVCSjq" @@ -484,6 +485,7 @@ " def process(self, element, *args, **kwargs):\n", " yield \"Input: {input}, Output: {output}\".format(input=element.example, output=element.inference)\n", "\n", + "logging.getLogger().setLevel(logging.INFO) # Output additional Dataflow Job metadata and launch logs. \n", "prompts = [\n", " \"Hello, my name is\",\n", " \"The president of the United States is\",\n", diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle index c357b8a04328..404b864c9c31 100644 --- a/runners/direct-java/build.gradle +++ b/runners/direct-java/build.gradle @@ -22,12 +22,12 @@ plugins { id 'org.apache.beam.module' } // Shade away runner execution utilities till because this causes ServiceLoader conflicts with // TransformPayloadTranslatorRegistrar amongst other runners. This only happens in the DirectRunner // because it is likely to appear on the classpath of another runner. 
-def dependOnProjects = [ - ":runners:core-java", - ":runners:local-java", - ":runners:java-fn-execution", - ":sdks:java:core", - ] +def dependOnProjectsAndConfigs = [ + ":runners:core-java":null, + ":runners:local-java":null, + ":runners:java-fn-execution":null, + ":sdks:java:core":"shadow", +] applyJavaNature( automaticModuleName: 'org.apache.beam.runners.direct', @@ -36,8 +36,8 @@ applyJavaNature( ], shadowClosure: { dependencies { - dependOnProjects.each { - include(project(path: it, configuration: "shadow")) + dependOnProjectsAndConfigs.each { + include(project(path: it.key, configuration: "shadow")) } } }, @@ -63,8 +63,10 @@ configurations { dependencies { shadow library.java.vendored_guava_32_1_2_jre shadow project(path: ":model:pipeline", configuration: "shadow") - dependOnProjects.each { - implementation project(it) + dependOnProjectsAndConfigs.each { + // For projects producing shadowjar, use the packaged jar as dependency to + // handle redirected packages from it + implementation project(path: it.key, configuration: it.value) } shadow library.java.vendored_grpc_1_60_1 shadow library.java.joda_time diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index ff72add83e4d..6ce60283735f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -65,6 +65,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; +import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commits; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingApplianceWorkCommitter; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingEngineWorkCommitter; @@ -199,6 +200,7 @@ private StreamingDataflowWorker( this.workCommitter = windmillServiceEnabled ? 
StreamingEngineWorkCommitter.builder() + .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) .setCommitWorkStreamFactory( WindmillStreamPool.create( numCommitThreads, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WeightedBoundedQueue.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WeightedBoundedQueue.java index f2893f3e7191..5f039be7b00f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WeightedBoundedQueue.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WeightedBoundedQueue.java @@ -18,33 +18,24 @@ package org.apache.beam.runners.dataflow.worker.streaming; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.function.Function; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; -/** Bounded set of queues, with a maximum total weight. */ +/** Queue bounded by a {@link WeightedSemaphore}. */ public final class WeightedBoundedQueue { private final LinkedBlockingQueue queue; - private final int maxWeight; - private final Semaphore limit; - private final Function weigher; + private final WeightedSemaphore weightedSemaphore; private WeightedBoundedQueue( - LinkedBlockingQueue linkedBlockingQueue, - int maxWeight, - Semaphore limit, - Function weigher) { + LinkedBlockingQueue linkedBlockingQueue, WeightedSemaphore weightedSemaphore) { this.queue = linkedBlockingQueue; - this.maxWeight = maxWeight; - this.limit = limit; - this.weigher = weigher; + this.weightedSemaphore = weightedSemaphore; } - public static WeightedBoundedQueue create(int maxWeight, Function weigherFn) { - return new WeightedBoundedQueue<>( - new LinkedBlockingQueue<>(), maxWeight, new Semaphore(maxWeight, true), weigherFn); + public static WeightedBoundedQueue create(WeightedSemaphore weightedSemaphore) { + return new WeightedBoundedQueue<>(new LinkedBlockingQueue<>(), weightedSemaphore); } /** @@ -52,15 +43,15 @@ public static WeightedBoundedQueue create(int maxWeight, Function { + private final int maxWeight; + private final Semaphore limit; + private final Function weigher; + + private WeightedSemaphore(int maxWeight, Semaphore limit, Function weigher) { + this.maxWeight = maxWeight; + this.limit = limit; + this.weigher = weigher; + } + + public static WeightedSemaphore create(int maxWeight, Function weigherFn) { + return new WeightedSemaphore<>(maxWeight, new Semaphore(maxWeight, true), weigherFn); + } + + public void acquireUninterruptibly(V value) { + limit.acquireUninterruptibly(computePermits(value)); + } + + public void release(V value) { + limit.release(computePermits(value)); + } + + private int computePermits(V value) { + return Math.min(weigher.apply(value), maxWeight); + } + + public int currentWeight() { + return maxWeight - limit.availablePermits(); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/Commits.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/Commits.java new file mode 100644 index 000000000000..498e90f78e29 --- /dev/null +++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/Commits.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.commits; + +import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; + +/** Utility class for commits. */ +@Internal +public final class Commits { + + /** Max bytes of commits queued on the user worker. */ + @VisibleForTesting static final int MAX_QUEUED_COMMITS_BYTES = 500 << 20; // 500MB + + private Commits() {} + + public static WeightedSemaphore maxCommitByteSemaphore() { + return WeightedSemaphore.create(MAX_QUEUED_COMMITS_BYTES, Commit::getSize); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java index 6889764afe69..20b95b0661d0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java @@ -42,7 +42,6 @@ public final class StreamingApplianceWorkCommitter implements WorkCommitter { private static final Logger LOG = LoggerFactory.getLogger(StreamingApplianceWorkCommitter.class); private static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20; - private static final int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB private final Consumer commitWorkFn; private final WeightedBoundedQueue commitQueue; @@ -53,9 +52,7 @@ public final class StreamingApplianceWorkCommitter implements WorkCommitter { private StreamingApplianceWorkCommitter( Consumer commitWorkFn, Consumer onCommitComplete) { this.commitWorkFn = commitWorkFn; - this.commitQueue = - WeightedBoundedQueue.create( - MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize())); + this.commitQueue = WeightedBoundedQueue.create(Commits.maxCommitByteSemaphore()); this.commitWorkers = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() @@ -73,10 +70,9 @@ public static StreamingApplianceWorkCommitter create( } @Override - @SuppressWarnings("FutureReturnValueIgnored") public void start() { if (!commitWorkers.isShutdown()) { - commitWorkers.submit(this::commitLoop); + 
commitWorkers.execute(this::commitLoop); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java index bf1007bc4bfb..85fa1d67c6c3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; +import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore; import org.apache.beam.runners.dataflow.worker.streaming.Work; import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; @@ -46,7 +47,6 @@ public final class StreamingEngineWorkCommitter implements WorkCommitter { private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineWorkCommitter.class); private static final int TARGET_COMMIT_BATCH_KEYS = 5; - private static final int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB private static final String NO_BACKEND_WORKER_TOKEN = ""; private final Supplier> commitWorkStreamFactory; @@ -61,11 +61,10 @@ public final class StreamingEngineWorkCommitter implements WorkCommitter { Supplier> commitWorkStreamFactory, int numCommitSenders, Consumer onCommitComplete, - String backendWorkerToken) { + String backendWorkerToken, + WeightedSemaphore commitByteSemaphore) { this.commitWorkStreamFactory = commitWorkStreamFactory; - this.commitQueue = - WeightedBoundedQueue.create( - MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize())); + this.commitQueue = WeightedBoundedQueue.create(commitByteSemaphore); this.commitSenders = Executors.newFixedThreadPool( numCommitSenders, @@ -90,12 +89,11 @@ public static Builder builder() { } @Override - @SuppressWarnings("FutureReturnValueIgnored") public void start() { Preconditions.checkState( isRunning.compareAndSet(false, true), "Multiple calls to WorkCommitter.start()."); for (int i = 0; i < numCommitSenders; i++) { - commitSenders.submit(this::streamingCommitLoop); + commitSenders.execute(this::streamingCommitLoop); } } @@ -166,6 +164,8 @@ private void streamingCommitLoop() { return; } } + + // take() blocks until a value is available in the commitQueue. 
Preconditions.checkNotNull(initialCommit); if (initialCommit.work().isFailed()) { @@ -258,6 +258,8 @@ public interface Builder { Builder setCommitWorkStreamFactory( Supplier> commitWorkStreamFactory); + Builder setCommitByteSemaphore(WeightedSemaphore commitByteSemaphore); + Builder setNumCommitSenders(int numCommitSenders); Builder setOnCommitComplete(Consumer onCommitComplete); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java index 4f035c88774c..c71001fbeee7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -30,27 +31,29 @@ @RunWith(JUnit4.class) public class WeightBoundedQueueTest { - @Rule public transient Timeout globalTimeout = Timeout.seconds(600); private static final int MAX_WEIGHT = 10; + @Rule public transient Timeout globalTimeout = Timeout.seconds(600); @Test public void testPut_hasCapacity() { - WeightedBoundedQueue queue = - WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + WeightedSemaphore weightedSemaphore = + WeightedSemaphore.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + WeightedBoundedQueue queue = WeightedBoundedQueue.create(weightedSemaphore); int insertedValue = 1; queue.put(insertedValue); - assertEquals(insertedValue, queue.queuedElementsWeight()); + assertEquals(insertedValue, weightedSemaphore.currentWeight()); assertEquals(1, queue.size()); assertEquals(insertedValue, (int) queue.poll()); } @Test public void testPut_noCapacity() throws InterruptedException { - WeightedBoundedQueue queue = - WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + WeightedSemaphore weightedSemaphore = + WeightedSemaphore.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + WeightedBoundedQueue queue = WeightedBoundedQueue.create(weightedSemaphore); // Insert value that takes all the capacity into the queue. queue.put(MAX_WEIGHT); @@ -71,7 +74,7 @@ public void testPut_noCapacity() throws InterruptedException { // Should only see the first value in the queue, since the queue is at capacity. thread2 // should be blocked. - assertEquals(MAX_WEIGHT, queue.queuedElementsWeight()); + assertEquals(MAX_WEIGHT, weightedSemaphore.currentWeight()); assertEquals(1, queue.size()); // Poll the queue, pulling off the only value inside and freeing up the capacity in the queue. @@ -80,14 +83,15 @@ public void testPut_noCapacity() throws InterruptedException { // Wait for the putThread which was previously blocked due to the queue being at capacity. 
putThread.join(); - assertEquals(MAX_WEIGHT, queue.queuedElementsWeight()); + assertEquals(MAX_WEIGHT, weightedSemaphore.currentWeight()); assertEquals(1, queue.size()); } @Test public void testPoll() { - WeightedBoundedQueue queue = - WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + WeightedSemaphore weightedSemaphore = + WeightedSemaphore.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + WeightedBoundedQueue queue = WeightedBoundedQueue.create(weightedSemaphore); int insertedValue1 = 1; int insertedValue2 = 2; @@ -95,7 +99,7 @@ public void testPoll() { queue.put(insertedValue1); queue.put(insertedValue2); - assertEquals(insertedValue1 + insertedValue2, queue.queuedElementsWeight()); + assertEquals(insertedValue1 + insertedValue2, weightedSemaphore.currentWeight()); assertEquals(2, queue.size()); assertEquals(insertedValue1, (int) queue.poll()); assertEquals(1, queue.size()); @@ -104,7 +108,8 @@ public void testPoll() { @Test public void testPoll_withTimeout() throws InterruptedException { WeightedBoundedQueue queue = - WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + WeightedBoundedQueue.create( + WeightedSemaphore.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i))); int pollWaitTimeMillis = 10000; int insertedValue1 = 1; @@ -132,7 +137,8 @@ public void testPoll_withTimeout() throws InterruptedException { @Test public void testPoll_withTimeout_timesOut() throws InterruptedException { WeightedBoundedQueue queue = - WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + WeightedBoundedQueue.create( + WeightedSemaphore.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i))); int defaultPollResult = -10; int pollWaitTimeMillis = 100; int insertedValue1 = 1; @@ -144,13 +150,17 @@ public void testPoll_withTimeout_timesOut() throws InterruptedException { Thread pollThread = new Thread( () -> { - int polled; + @Nullable Integer polled; try { polled = queue.poll(pollWaitTimeMillis, TimeUnit.MILLISECONDS); - pollResult.set(polled); + if (polled != null) { + pollResult.set(polled); + } } catch (InterruptedException e) { throw new RuntimeException(e); } + + assertNull(polled); }); pollThread.start(); @@ -164,7 +174,8 @@ public void testPoll_withTimeout_timesOut() throws InterruptedException { @Test public void testPoll_emptyQueue() { WeightedBoundedQueue queue = - WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + WeightedBoundedQueue.create( + WeightedSemaphore.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i))); assertNull(queue.poll()); } @@ -172,7 +183,8 @@ public void testPoll_emptyQueue() { @Test public void testTake() throws InterruptedException { WeightedBoundedQueue queue = - WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + WeightedBoundedQueue.create( + WeightedSemaphore.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i))); AtomicInteger value = new AtomicInteger(); // Should block until value is available @@ -194,4 +206,39 @@ public void testTake() throws InterruptedException { assertEquals(MAX_WEIGHT, value.get()); } + + @Test + public void testPut_sharedWeigher() throws InterruptedException { + WeightedSemaphore weigher = + WeightedSemaphore.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + WeightedBoundedQueue queue1 = WeightedBoundedQueue.create(weigher); + WeightedBoundedQueue queue2 = WeightedBoundedQueue.create(weigher); + + // Insert value that takes all the weight into the queue1. + queue1.put(MAX_WEIGHT); + + // Try to insert a value into the queue2. 
This will block since there is no capacity in the + // weigher. + Thread putThread = new Thread(() -> queue2.put(MAX_WEIGHT)); + putThread.start(); + // Should only see the first value in the queue, since the queue is at capacity. putThread + // should be blocked. The weight should be the same however, since queue1 and queue2 are sharing + // the weigher. + Thread.sleep(100); + assertEquals(MAX_WEIGHT, weigher.currentWeight()); + assertEquals(1, queue1.size()); + assertEquals(0, queue2.size()); + + // Poll queue1, pulling off the only value inside and freeing up the capacity in the weigher. + queue1.poll(); + + // Wait for the putThread which was previously blocked due to the weigher being at capacity. + putThread.join(); + + assertEquals(MAX_WEIGHT, weigher.currentWeight()); + assertEquals(1, queue2.size()); + queue2.poll(); + assertEquals(0, queue2.size()); + assertEquals(0, weigher.currentWeight()); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java index 546a2883e3b2..c05a4dd340dd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java @@ -121,6 +121,7 @@ public void setUp() throws IOException { private WorkCommitter createWorkCommitter(Consumer onCommitComplete) { return StreamingEngineWorkCommitter.builder() + .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) .setCommitWorkStreamFactory(commitWorkStreamFactory) .setOnCommitComplete(onCommitComplete) .build(); @@ -342,6 +343,7 @@ public void testMultipleCommitSendersSingleStream() { Set completeCommits = Collections.newSetFromMap(new ConcurrentHashMap<>()); workCommitter = StreamingEngineWorkCommitter.builder() + .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) .setCommitWorkStreamFactory(commitWorkStreamFactory) .setNumCommitSenders(5) .setOnCommitComplete(completeCommits::add) diff --git a/sdks/go.mod b/sdks/go.mod index 81221f98e276..ff711cbe91b0 100644 --- a/sdks/go.mod +++ b/sdks/go.mod @@ -25,16 +25,16 @@ go 1.21.0 require ( cloud.google.com/go/bigquery v1.63.1 cloud.google.com/go/bigtable v1.33.0 - cloud.google.com/go/datastore v1.19.0 + cloud.google.com/go/datastore v1.20.0 cloud.google.com/go/profiler v0.4.1 - cloud.google.com/go/pubsub v1.44.0 + cloud.google.com/go/pubsub v1.45.1 cloud.google.com/go/spanner v1.70.0 cloud.google.com/go/storage v1.45.0 - github.com/aws/aws-sdk-go-v2 v1.32.2 + github.com/aws/aws-sdk-go-v2 v1.32.4 github.com/aws/aws-sdk-go-v2/config v1.28.0 - github.com/aws/aws-sdk-go-v2/credentials v1.17.41 + github.com/aws/aws-sdk-go-v2/credentials v1.17.42 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.33 - github.com/aws/aws-sdk-go-v2/service/s3 v1.66.0 + github.com/aws/aws-sdk-go-v2/service/s3 v1.66.3 github.com/aws/smithy-go v1.22.0 github.com/docker/go-connections v0.5.0 github.com/dustin/go-humanize v1.0.1 @@ -56,7 +56,7 @@ require ( golang.org/x/net v0.30.0 golang.org/x/oauth2 v0.23.0 golang.org/x/sync v0.8.0 - golang.org/x/sys v0.26.0 + golang.org/x/sys v0.27.0 golang.org/x/text v0.19.0 google.golang.org/api v0.203.0 
google.golang.org/genproto v0.0.0-20241015192408-796eee8c2d53 @@ -132,18 +132,18 @@ require ( github.com/apache/thrift v0.17.0 // indirect github.com/aws/aws-sdk-go v1.34.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.18 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect - github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.23 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.3 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.32.3 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect diff --git a/sdks/go.sum b/sdks/go.sum index a45baf72a02b..c24cb10126c8 100644 --- a/sdks/go.sum +++ b/sdks/go.sum @@ -240,8 +240,8 @@ cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7 cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/datastore v1.10.0/go.mod h1:PC5UzAmDEkAmkfaknstTYbNpgE49HAgW2J1gcgUfmdM= cloud.google.com/go/datastore v1.11.0/go.mod h1:TvGxBIHCS50u8jzG+AW/ppf87v1of8nwzFNgEZU1D3c= -cloud.google.com/go/datastore v1.19.0 h1:p5H3bUQltOa26GcMRAxPoNwoqGkq5v8ftx9/ZBB35MI= -cloud.google.com/go/datastore v1.19.0/go.mod h1:KGzkszuj87VT8tJe67GuB+qLolfsOt6bZq/KFuWaahc= +cloud.google.com/go/datastore v1.20.0 h1:NNpXoyEqIJmZFc0ACcwBEaXnmscUpcG4NkKnbCePmiM= +cloud.google.com/go/datastore v1.20.0/go.mod h1:uFo3e+aEpRfHgtp5pp0+6M0o147KoPaYNaPAKpfh8Ew= cloud.google.com/go/datastream v1.2.0/go.mod h1:i/uTP8/fZwgATHS/XFu0TcNUhuA0twZxxQ3EyCUQMwo= cloud.google.com/go/datastream v1.3.0/go.mod h1:cqlOX8xlyYF/uxhiKn6Hbv6WjwPPuI9W2M9SAXwaLLQ= cloud.google.com/go/datastream v1.4.0/go.mod h1:h9dpzScPhDTs5noEMQVWP8Wx8AFBRyS0s8KWPx/9r0g= @@ -451,8 +451,8 @@ cloud.google.com/go/pubsub v1.26.0/go.mod h1:QgBH3U/jdJy/ftjPhTkyXNj543Tin1pRYcd cloud.google.com/go/pubsub v1.27.1/go.mod h1:hQN39ymbV9geqBnfQq6Xf63yNhUAhv9CZhzp5O6qsW0= cloud.google.com/go/pubsub v1.28.0/go.mod h1:vuXFpwaVoIPQMGXqRyUQigu/AX1S3IWugR9xznmcXX8= cloud.google.com/go/pubsub v1.30.0/go.mod h1:qWi1OPS0B+b5L+Sg6Gmc9zD1Y+HaM0MdUr7LsupY1P4= -cloud.google.com/go/pubsub v1.44.0 
h1:pLaMJVDTlnUDIKT5L0k53YyLszfBbGoUBo/IqDK/fEI= -cloud.google.com/go/pubsub v1.44.0/go.mod h1:BD4a/kmE8OePyHoa1qAHEw1rMzXX+Pc8Se54T/8mc3I= +cloud.google.com/go/pubsub v1.45.1 h1:ZC/UzYcrmK12THWn1P72z+Pnp2vu/zCZRXyhAfP1hJY= +cloud.google.com/go/pubsub v1.45.1/go.mod h1:3bn7fTmzZFwaUjllitv1WlsNMkqBgGUb3UdMhI54eCc= cloud.google.com/go/pubsublite v1.5.0/go.mod h1:xapqNQ1CuLfGi23Yda/9l4bBCKz/wC3KIJ5gKcxveZg= cloud.google.com/go/pubsublite v1.6.0/go.mod h1:1eFCS0U11xlOuMFV/0iBqw3zP12kddMeCbj/F3FSj9k= cloud.google.com/go/pubsublite v1.7.0/go.mod h1:8hVMwRXfDfvGm3fahVbtDbiLePT3gpoiJYJY+vxWxVM= @@ -689,53 +689,53 @@ github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZve github.com/aws/aws-sdk-go v1.34.0 h1:brux2dRrlwCF5JhTL7MUT3WUwo9zfDHZZp3+g3Mvlmo= github.com/aws/aws-sdk-go v1.34.0/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go-v2 v1.7.1/go.mod h1:L5LuPC1ZgDr2xQS7AmIec/Jlc7O/Y1u2KxJyNVab250= -github.com/aws/aws-sdk-go-v2 v1.32.2 h1:AkNLZEyYMLnx/Q/mSKkcMqwNFXMAvFto9bNsHqcTduI= -github.com/aws/aws-sdk-go-v2 v1.32.2/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo= +github.com/aws/aws-sdk-go-v2 v1.32.4 h1:S13INUiTxgrPueTmrm5DZ+MiAo99zYzHEFh1UNkOxNE= +github.com/aws/aws-sdk-go-v2 v1.32.4/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 h1:pT3hpW0cOHRJx8Y0DfJUEQuqPild8jRGmSFmBgvydr0= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6/go.mod h1:j/I2++U0xX+cr44QjHay4Cvxj6FUbnxrgmqN3H1jTZA= github.com/aws/aws-sdk-go-v2/config v1.5.0/go.mod h1:RWlPOAW3E3tbtNAqTwvSW54Of/yP3oiZXMI0xfUdjyA= github.com/aws/aws-sdk-go-v2/config v1.28.0 h1:FosVYWcqEtWNxHn8gB/Vs6jOlNwSoyOCA/g/sxyySOQ= github.com/aws/aws-sdk-go-v2/config v1.28.0/go.mod h1:pYhbtvg1siOOg8h5an77rXle9tVG8T+BWLWAo7cOukc= github.com/aws/aws-sdk-go-v2/credentials v1.3.1/go.mod h1:r0n73xwsIVagq8RsxmZbGSRQFj9As3je72C2WzUIToc= -github.com/aws/aws-sdk-go-v2/credentials v1.17.41 h1:7gXo+Axmp+R4Z+AK8YFQO0ZV3L0gizGINCOWxSLY9W8= -github.com/aws/aws-sdk-go-v2/credentials v1.17.41/go.mod h1:u4Eb8d3394YLubphT4jLEwN1rLNq2wFOlT6OuxFwPzU= +github.com/aws/aws-sdk-go-v2/credentials v1.17.42 h1:sBP0RPjBU4neGpIYyx8mkU2QqLPl5u9cmdTWVzIpHkM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.42/go.mod h1:FwZBfU530dJ26rv9saAbxa9Ej3eF/AK0OAY86k13n4M= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.3.0/go.mod h1:2LAuqPx1I6jNfaGDucWfA2zqQCYCOMCDHiCOciALyNw= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 h1:TMH3f/SCAWdNtXXVPPu5D6wrr4G5hI1rAxbcocKfC7Q= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17/go.mod h1:1ZRXLdTpzdJb9fwTMXiLipENRxkGMTn1sfKexGllQCw= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.18 h1:68jFVtt3NulEzojFesM/WVarlFpCaXLKaBxDpzkQ9OQ= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.18/go.mod h1:Fjnn5jQVIo6VyedMc0/EhPpfNlPl7dHV916O6B+49aE= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.3.2/go.mod h1:qaqQiHSrOUVOfKe6fhgQ6UzhxjwqVW8aHNegd6Ws4w4= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.33 h1:X+4YY5kZRI/cOoSMVMGTqFXHAMg1bvvay7IBcqHpybQ= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.33/go.mod h1:DPynzu+cn92k5UQ6tZhX+wfTB4ah6QDU/NgdHqatmvk= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 h1:UAsR3xA31QGf79WzpG/ixT9FZvQlh5HY1NRqSHBNOCk= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21/go.mod h1:JNr43NFf5L9YaG3eKTm7HQzls9J+A9YYcGI5Quh1r2Y= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 h1:6jZVETqmYCadGFvrYEQfC5fAQmlo80CeL5psbno6r0s= 
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21/go.mod h1:1SR0GbLlnN3QUmYaflZNiH1ql+1qrSiB2vwcJ+4UM60= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 h1:A2w6m6Tmr+BNXjDsr7M90zkWjsu4JXHwrzPg235STs4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23/go.mod h1:35EVp9wyeANdujZruvHiQUAo9E3vbhnIO1mTCAxMlY0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 h1:pgYW9FCabt2M25MoHYCfMrVY2ghiiBKYWUVXfwZs+sU= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23/go.mod h1:c48kLgzO19wAu3CPkDWC28JbaJ+hfQlsdl7I2+oqIbk= github.com/aws/aws-sdk-go-v2/internal/ini v1.1.1/go.mod h1:Zy8smImhTdOETZqfyn01iNOe0CNggVbPjCajyaz6Gvg= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21 h1:7edmS3VOBDhK00b/MwGtGglCm7hhwNYnjJs/PgFdMQE= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21/go.mod h1:Q9o5h4HoIWG8XfzxqiuK/CGUbepCJ8uTlaE3bAbxytQ= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.23 h1:1SZBDiRzzs3sNhOMVApyWPduWYGAX0imGy06XiBnCAM= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.23/go.mod h1:i9TkxgbZmHVh2S0La6CAXtnyFhlCX/pJ0JsOvBAS6Mk= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.2.1/go.mod h1:v33JQ57i2nekYTA70Mb+O18KeH4KqhdqxTJZNK1zdRE= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 h1:TToQNkvGguu209puTojY/ozlqy2d/SFNcoLIqTFi42g= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0/go.mod h1:0jp+ltwkf+SwG2fm/PKo8t4y8pJSgOCO4D8Lz3k0aHQ= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2 h1:4FMHqLfk0efmTqhXVRL5xYRqlEBNBiRI7N6w4jsEdd4= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2/go.mod h1:LWoqeWlK9OZeJxsROW2RqrSPvQHKTpp69r/iDjwsSaw= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.4 h1:aaPpoG15S2qHkWm4KlEyF01zovK1nW4BBbyXuHNSE90= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.4/go.mod h1:eD9gS2EARTKgGr/W5xwgY/ik9z/zqpW+m/xOQbVxrMk= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.1/go.mod h1:zceowr5Z1Nh2WVP8bf/3ikB41IZW59E4yIYbg+pC6mw= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2 h1:s7NA1SOw8q/5c0wr8477yOPp0z+uBaXBnLE0XYb0POA= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2/go.mod h1:fnjjWyAW/Pj5HYOxl9LJqWtEwS7W2qgcRLWP+uWbss0= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4 h1:tHxQi/XHPK0ctd/wdOw0t7Xrc2OxcRCnVzv8lwWPu0c= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4/go.mod h1:4GQbF1vJzG60poZqWatZlhP31y8PGCCVTvIGPdaaYJ0= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.5.1/go.mod h1:6EQZIwNNvHpq/2/QSJnp4+ECvqIy55w95Ofs0ze+nGQ= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2 h1:t7iUP9+4wdc5lt3E41huP+GvQZJD38WLsgVp4iOtAjg= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2/go.mod h1:/niFCtmuQNxqx9v8WAPq5qh7EH25U4BF6tjoyq9bObM= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.4 h1:E5ZAVOmI2apR8ADb72Q63KqwwwdW1XcMeXIlrZ1Psjg= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.4/go.mod h1:wezzqVUOVVdk+2Z/JzQT4NxAU0NbhRe5W8pIE72jsWI= github.com/aws/aws-sdk-go-v2/service/s3 v1.11.1/go.mod h1:XLAGFrEjbvMCLvAtWLLP32yTv8GpBquCApZEycDLunI= -github.com/aws/aws-sdk-go-v2/service/s3 v1.66.0 h1:xA6XhTF7PE89BCNHJbQi8VvPzcgMtmGC5dr8S8N7lHk= -github.com/aws/aws-sdk-go-v2/service/s3 v1.66.0/go.mod 
h1:cB6oAuus7YXRZhWCc1wIwPywwZ1XwweNp2TVAEGYeB8= +github.com/aws/aws-sdk-go-v2/service/s3 v1.66.3 h1:neNOYJl72bHrz9ikAEED4VqWyND/Po0DnEx64RW6YM4= +github.com/aws/aws-sdk-go-v2/service/s3 v1.66.3/go.mod h1:TMhLIyRIyoGVlaEMAt+ITMbwskSTpcGsCPDq91/ihY0= github.com/aws/aws-sdk-go-v2/service/sso v1.3.1/go.mod h1:J3A3RGUvuCZjvSuZEcOpHDnzZP/sKbhDWV2T1EOzFIM= -github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 h1:bSYXVyUzoTHoKalBmwaZxs97HU9DWWI3ehHSAMa7xOk= -github.com/aws/aws-sdk-go-v2/service/sso v1.24.2/go.mod h1:skMqY7JElusiOUjMJMOv1jJsP7YUg7DrhgqZZWuzu1U= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 h1:AhmO1fHINP9vFYUE0LHzCWg/LfUWUF+zFPEcY9QXb7o= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2/go.mod h1:o8aQygT2+MVP0NaV6kbdE1YnnIM8RRVQzoeUH45GOdI= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.3 h1:UTpsIf0loCIWEbrqdLb+0RxnTXfWh2vhw4nQmFi4nPc= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.3/go.mod h1:FZ9j3PFHHAR+w0BSEjK955w5YD2UwB/l/H0yAK3MJvI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.3 h1:2YCmIXv3tmiItw0LlYf6v7gEHebLY45kBEnPezbUKyU= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.3/go.mod h1:u19stRyNPxGhj6dRm+Cdgu6N75qnbW7+QN0q0dsAk58= github.com/aws/aws-sdk-go-v2/service/sts v1.6.0/go.mod h1:q7o0j7d7HrJk/vr9uUt3BVRASvcU7gYZB9PUgPiByXg= -github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 h1:CiS7i0+FUe+/YY1GvIBLLrR/XNGZ4CtM1Ll0XavNuVo= -github.com/aws/aws-sdk-go-v2/service/sts v1.32.2/go.mod h1:HtaiBI8CjYoNVde8arShXb94UbQQi9L4EMr6D+xGBwo= +github.com/aws/aws-sdk-go-v2/service/sts v1.32.3 h1:wVnQ6tigGsRqSWDEEyH6lSAJ9OyFUsSnbaUWChuSGzs= +github.com/aws/aws-sdk-go-v2/service/sts v1.32.3/go.mod h1:VZa9yTFyj4o10YGsmDO4gbQJUvvhY72fhumT8W4LqsE= github.com/aws/smithy-go v1.6.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM= github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= @@ -1524,8 +1524,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= diff --git a/sdks/go/container/tools/buffered_logging.go b/sdks/go/container/tools/buffered_logging.go index 445d19fabfdc..a7b84e56af3a 100644 --- a/sdks/go/container/tools/buffered_logging.go +++ b/sdks/go/container/tools/buffered_logging.go @@ -18,13 +18,15 @@ package tools import ( "context" "log" - "math" "os" "strings" "time" ) -const initialLogSize int = 255 +const ( + initialLogSize int = 255 + defaultFlushInterval time.Duration = 15 * time.Second +) // BufferedLogger is a wrapper around the FnAPI logging client meant to be used // in place of stdout and stderr in bootloader subprocesses. 
Not intended for @@ -41,7 +43,7 @@ type BufferedLogger struct { // NewBufferedLogger returns a new BufferedLogger type by reference. func NewBufferedLogger(logger *Logger) *BufferedLogger { - return &BufferedLogger{logger: logger, lastFlush: time.Now(), flushInterval: time.Duration(math.MaxInt64), periodicFlushContext: context.Background(), now: time.Now} + return &BufferedLogger{logger: logger, lastFlush: time.Now(), flushInterval: defaultFlushInterval, periodicFlushContext: context.Background(), now: time.Now} } // NewBufferedLoggerWithFlushInterval returns a new BufferedLogger type by reference. This type will diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index e150c22de62d..07144c8de053 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -81,7 +81,6 @@ dependencies { shadow library.java.vendored_grpc_1_60_1 shadow library.java.vendored_guava_32_1_2_jre shadow library.java.byte_buddy - shadow library.java.antlr_runtime shadow library.java.commons_compress shadow library.java.commons_lang3 testImplementation library.java.mockito_inline diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PipelineOptionsTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PipelineOptionsTranslation.java index cd6ab7dd414a..de1717f0a45f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PipelineOptionsTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PipelineOptionsTranslation.java @@ -43,6 +43,9 @@ public class PipelineOptionsTranslation { new ObjectMapper() .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader())); + public static final String PIPELINE_OPTIONS_URN_PREFIX = "beam:option:"; + public static final String PIPELINE_OPTIONS_URN_SUFFIX = ":v1"; + /** Converts the provided {@link PipelineOptions} to a {@link Struct}. 
*/ public static Struct toProto(PipelineOptions options) { Struct.Builder builder = Struct.newBuilder(); @@ -65,9 +68,9 @@ public static Struct toProto(PipelineOptions options) { while (optionsEntries.hasNext()) { Map.Entry entry = optionsEntries.next(); optionsUsingUrns.put( - "beam:option:" + PIPELINE_OPTIONS_URN_PREFIX + CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, entry.getKey()) - + ":v1", + + PIPELINE_OPTIONS_URN_SUFFIX, entry.getValue()); } @@ -92,7 +95,9 @@ public static PipelineOptions fromProto(Struct protoOptions) { mapWithoutUrns.put( CaseFormat.LOWER_UNDERSCORE.to( CaseFormat.LOWER_CAMEL, - optionKey.substring("beam:option:".length(), optionKey.length() - ":v1".length())), + optionKey.substring( + PIPELINE_OPTIONS_URN_PREFIX.length(), + optionKey.length() - PIPELINE_OPTIONS_URN_SUFFIX.length())), optionValue); } return MAPPER.readValue( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java index 296a53f48e80..fd178f8e7649 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -226,5 +227,19 @@ public long getNum() { public String getStr() { return this.str; } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Pojo)) { + return false; + } + Pojo pojo = (Pojo) o; + return num == pojo.num && Objects.equals(str, pojo.str); + } + + @Override + public int hashCode() { + return Objects.hash(num, str); + } } } diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index 150fe9729573..9c5b5a0ad136 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -60,7 +60,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PortablePipelineOptions; -import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; @@ -535,7 +534,7 @@ private static void invokeSetter(ConfigT config, @Nullable Object valu } private @MonotonicNonNull Map registeredTransforms; - private final PipelineOptions pipelineOptions; + private final PipelineOptions commandLineOptions; private final @Nullable String loopbackAddress; public ExpansionService() { @@ -551,7 +550,7 @@ public ExpansionService(PipelineOptions opts) { } public ExpansionService(PipelineOptions opts, @Nullable String loopbackAddress) { - this.pipelineOptions = opts; + this.commandLineOptions = opts; this.loopbackAddress = loopbackAddress; } @@ -587,12 +586,15 @@ private Map loadRegisteredTransforms() { request.getTransform().getSpec().getUrn()); LOG.debug("Full transform: {}", request.getTransform()); Set existingTransformIds = 
request.getComponents().getTransformsMap().keySet(); - Pipeline pipeline = - createPipeline(PipelineOptionsTranslation.fromProto(request.getPipelineOptions())); + + PipelineOptions pipelineOptionsFromRequest = + PipelineOptionsTranslation.fromProto(request.getPipelineOptions()); + Pipeline pipeline = createPipeline(pipelineOptionsFromRequest); + boolean isUseDeprecatedRead = - ExperimentalOptions.hasExperiment(pipelineOptions, "use_deprecated_read") + ExperimentalOptions.hasExperiment(commandLineOptions, "use_deprecated_read") || ExperimentalOptions.hasExperiment( - pipelineOptions, "beam_fn_api_use_deprecated_read"); + commandLineOptions, "beam_fn_api_use_deprecated_read"); if (!isUseDeprecatedRead) { ExperimentalOptions.addExperiment( pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api"); @@ -629,7 +631,7 @@ private Map loadRegisteredTransforms() { if (transformProvider == null) { if (getUrn(ExpansionMethods.Enum.JAVA_CLASS_LOOKUP).equals(urn)) { AllowList allowList = - pipelineOptions.as(ExpansionServiceOptions.class).getJavaClassLookupAllowlist(); + commandLineOptions.as(ExpansionServiceOptions.class).getJavaClassLookupAllowlist(); assert allowList != null; transformProvider = new JavaClassLookupTransformProvider(allowList); } else if (getUrn(SCHEMA_TRANSFORM).equals(urn)) { @@ -671,7 +673,7 @@ private Map loadRegisteredTransforms() { RunnerApi.Environment defaultEnvironment = Environments.createOrGetDefaultEnvironment( pipeline.getOptions().as(PortablePipelineOptions.class)); - if (pipelineOptions.as(ExpansionServiceOptions.class).getAlsoStartLoopbackWorker()) { + if (commandLineOptions.as(ExpansionServiceOptions.class).getAlsoStartLoopbackWorker()) { PortablePipelineOptions externalOptions = PipelineOptionsFactory.create().as(PortablePipelineOptions.class); externalOptions.setDefaultEnvironmentType(Environments.ENVIRONMENT_EXTERNAL); @@ -723,35 +725,34 @@ private Map loadRegisteredTransforms() { } protected Pipeline createPipeline(PipelineOptions requestOptions) { - // TODO: [https://github.com/apache/beam/issues/21064]: implement proper validation - PipelineOptions effectiveOpts = PipelineOptionsFactory.create(); - PortablePipelineOptions portableOptions = effectiveOpts.as(PortablePipelineOptions.class); - PortablePipelineOptions specifiedOptions = pipelineOptions.as(PortablePipelineOptions.class); - Optional.ofNullable(specifiedOptions.getDefaultEnvironmentType()) - .ifPresent(portableOptions::setDefaultEnvironmentType); - Optional.ofNullable(specifiedOptions.getDefaultEnvironmentConfig()) - .ifPresent(portableOptions::setDefaultEnvironmentConfig); - List filesToStage = specifiedOptions.getFilesToStage(); + // We expect the ExpansionRequest to contain a valid set of options to be used for this + // expansion. + // Additionally, we override selected options using options values set via command line or + // ExpansionService wide overrides. 
+ + PortablePipelineOptions requestPortablePipelineOptions = + requestOptions.as(PortablePipelineOptions.class); + PortablePipelineOptions commandLinePortablePipelineOptions = + commandLineOptions.as(PortablePipelineOptions.class); + Optional.ofNullable(commandLinePortablePipelineOptions.getDefaultEnvironmentType()) + .ifPresent(requestPortablePipelineOptions::setDefaultEnvironmentType); + Optional.ofNullable(commandLinePortablePipelineOptions.getDefaultEnvironmentConfig()) + .ifPresent(requestPortablePipelineOptions::setDefaultEnvironmentConfig); + List filesToStage = commandLinePortablePipelineOptions.getFilesToStage(); if (filesToStage != null) { - effectiveOpts.as(PortablePipelineOptions.class).setFilesToStage(filesToStage); + requestPortablePipelineOptions + .as(PortablePipelineOptions.class) + .setFilesToStage(filesToStage); } - effectiveOpts + requestPortablePipelineOptions .as(ExperimentalOptions.class) - .setExperiments(pipelineOptions.as(ExperimentalOptions.class).getExperiments()); - effectiveOpts.setRunner(NotRunnableRunner.class); - effectiveOpts + .setExperiments(commandLineOptions.as(ExperimentalOptions.class).getExperiments()); + requestPortablePipelineOptions.setRunner(NotRunnableRunner.class); + requestPortablePipelineOptions .as(ExpansionServiceOptions.class) .setExpansionServiceConfig( - pipelineOptions.as(ExpansionServiceOptions.class).getExpansionServiceConfig()); - // TODO(https://github.com/apache/beam/issues/20090): Figure out the correct subset of options - // to propagate. - if (requestOptions.as(StreamingOptions.class).getUpdateCompatibilityVersion() != null) { - effectiveOpts - .as(StreamingOptions.class) - .setUpdateCompatibilityVersion( - requestOptions.as(StreamingOptions.class).getUpdateCompatibilityVersion()); - } - return Pipeline.create(effectiveOpts); + commandLineOptions.as(ExpansionServiceOptions.class).getExpansionServiceConfig()); + return Pipeline.create(requestOptions); } @Override diff --git a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java index 1c8d515d5c85..9ee0c2c1797b 100644 --- a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java +++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.expansion.service; +import static org.apache.beam.sdk.util.construction.PipelineOptionsTranslation.PIPELINE_OPTIONS_URN_PREFIX; +import static org.apache.beam.sdk.util.construction.PipelineOptionsTranslation.PIPELINE_OPTIONS_URN_SUFFIX; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.contains; @@ -49,6 +51,8 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; @@ -58,15 +62,20 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.PTransform; import 
org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Value; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Resources; import org.checkerframework.checker.nullness.qual.Nullable; import org.hamcrest.Matchers; +import org.junit.Assert; import org.junit.Test; /** Tests for {@link ExpansionService}. */ @@ -76,6 +85,7 @@ public class ExpansionServiceTest { private static final String TEST_URN = "test:beam:transforms:count"; + private static final String TEST_OPTIONS_URN = "test:beam:transforms:test_options"; private static final String TEST_NAME = "TestName"; @@ -98,9 +108,59 @@ public class ExpansionServiceTest { @AutoService(ExpansionService.ExpansionServiceRegistrar.class) public static class TestTransformRegistrar implements ExpansionService.ExpansionServiceRegistrar { + static final String EXPECTED_STRING_VALUE = "abcde"; + static final Boolean EXPECTED_BOOLEAN_VALUE = true; + static final Integer EXPECTED_INTEGER_VALUE = 12345; + @Override public Map knownTransforms() { - return ImmutableMap.of(TEST_URN, (spec, options) -> Count.perElement()); + return ImmutableMap.of( + TEST_URN, (spec, options) -> Count.perElement(), + TEST_OPTIONS_URN, + (spec, options) -> + new TestOptionsTransform( + EXPECTED_STRING_VALUE, EXPECTED_BOOLEAN_VALUE, EXPECTED_INTEGER_VALUE)); + } + } + + public interface TestOptions extends PipelineOptions { + String getStringOption(); + + void setStringOption(String value); + + Boolean getBooleanOption(); + + void setBooleanOption(Boolean value); + + Integer getIntegerOption(); + + void setIntegerOption(Integer value); + } + + public static class TestOptionsTransform + extends PTransform, PCollection> { + String expectedStringValue; + + Boolean expectedBooleanValue; + + Integer expectedIntegerValue; + + public TestOptionsTransform( + String expectedStringValue, Boolean expectedBooleanValue, Integer expectedIntegerValue) { + this.expectedStringValue = expectedStringValue; + this.expectedBooleanValue = expectedBooleanValue; + this.expectedIntegerValue = expectedIntegerValue; + } + + @Override + public PCollection expand(PCollection input) { + TestOptions testOption = input.getPipeline().getOptions().as(TestOptions.class); + + Assert.assertEquals(expectedStringValue, testOption.getStringOption()); + Assert.assertEquals(expectedBooleanValue, testOption.getBooleanOption()); + Assert.assertEquals(expectedIntegerValue, testOption.getIntegerOption()); + + return input; } } @@ -146,6 +206,58 @@ public void testConstruct() { } } + @Test + public void testConstructWithPipelineOptions() { + PipelineOptionsFactory.register(TestOptions.class); + Pipeline p = Pipeline.create(); + p.apply(Impulse.create()); + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + String inputPcollId = + Iterables.getOnlyElement( + Iterables.getOnlyElement(pipelineProto.getComponents().getTransformsMap().values()) + .getOutputsMap() + .values()); + + Struct optionsStruct = + Struct.newBuilder() + .putFields( + 
PIPELINE_OPTIONS_URN_PREFIX + "string_option" + PIPELINE_OPTIONS_URN_SUFFIX, + Value.newBuilder() + .setStringValue(TestTransformRegistrar.EXPECTED_STRING_VALUE) + .build()) + .putFields( + PIPELINE_OPTIONS_URN_PREFIX + "boolean_option" + PIPELINE_OPTIONS_URN_SUFFIX, + Value.newBuilder() + .setBoolValue(TestTransformRegistrar.EXPECTED_BOOLEAN_VALUE) + .build()) + .putFields( + PIPELINE_OPTIONS_URN_PREFIX + "integer_option" + PIPELINE_OPTIONS_URN_SUFFIX, + Value.newBuilder() + .setNumberValue(TestTransformRegistrar.EXPECTED_INTEGER_VALUE) + .build()) + .build(); + ExpansionApi.ExpansionRequest request = + ExpansionApi.ExpansionRequest.newBuilder() + .setComponents(pipelineProto.getComponents()) + .setPipelineOptions(optionsStruct) + .setTransform( + RunnerApi.PTransform.newBuilder() + .setUniqueName(TEST_NAME) + .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(TEST_OPTIONS_URN)) + .putInputs("input", inputPcollId)) + .setNamespace(TEST_NAMESPACE) + .build(); + ExpansionApi.ExpansionResponse response = expansionService.expand(request); + RunnerApi.PTransform expandedTransform = response.getTransform(); + assertEquals(TEST_NAMESPACE + TEST_NAME, expandedTransform.getUniqueName()); + + // Verify it has the right input. + assertThat(expandedTransform.getInputsMap().values(), contains(inputPcollId)); + + // Verify it has the right output. + assertThat(expandedTransform.getOutputsMap().keySet(), contains("output")); + } + @Test public void testConstructGenerateSequenceWithRegistration() { ExternalTransforms.ExternalConfigurationPayload payload = diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java index 372e77c54c67..d0f6427a262e 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; @@ -138,10 +137,6 @@ public synchronized void assign(final Collection assigned) { .collect(Collectors.toList()); super.assign(realPartitions); assignedPartitions.set(ImmutableList.copyOf(realPartitions)); - for (TopicPartition tp : realPartitions) { - updateBeginningOffsets(ImmutableMap.of(tp, 0L)); - updateEndOffsets(ImmutableMap.of(tp, (long) kafkaRecords.get(tp).size())); - } } // Override offsetsForTimes() in order to look up the offsets by timestamp. 
@Override @@ -163,9 +158,12 @@ public synchronized Map offsetsForTimes( } }; - for (String topic : getTopics()) { - consumer.updatePartitions(topic, partitionInfoMap.get(topic)); - } + partitionInfoMap.forEach(consumer::updatePartitions); + consumer.updateBeginningOffsets( + kafkaRecords.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> 0L))); + consumer.updateEndOffsets( + kafkaRecords.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> (long) e.getValue().size()))); Runnable recordEnqueueTask = new Runnable() { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index d9dde11a3081..a6cf7ebb12a5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -291,8 +291,8 @@ * grouped into batches. The default maximum size of the batch is set to 1MB or 5000 mutated cells, * or 500 rows (whichever is reached first). To override this use {@link * Write#withBatchSizeBytes(long) withBatchSizeBytes()}, {@link Write#withMaxNumMutations(long) - * withMaxNumMutations()} or {@link Write#withMaxNumMutations(long) withMaxNumRows()}. Setting - * either to a small value or zero disables batching. + * withMaxNumMutations()} or {@link Write#withMaxNumRows(long) withMaxNumRows()}. Setting either to + * a small value or zero disables batching. * *

Note that the maximum diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java index 5cd9cb47b696..76440b1ebf1a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +/** A provider for reading from Cloud Spanner using a Schema Transform Provider. */ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) @@ -54,43 +55,81 @@ * *

<p>The transformation leverages the {@link SpannerIO} to perform the read operation and maps the
 * results to Beam rows, preserving the schema.
- *
- * <p>Example usage in a YAML pipeline using query:
- *
- * <pre>{@code
- * pipeline:
- *   transforms:
- *     - type: ReadFromSpanner
- *       name: ReadShipments
- *       # Columns: shipment_id, customer_id, shipment_date, shipment_cost, customer_name, customer_email
- *       config:
- *         project_id: 'apache-beam-testing'
- *         instance_id: 'shipment-test'
- *         database_id: 'shipment'
- *         query: 'SELECT * FROM shipments'
- * }</pre>
- *
- * <p>Example usage in a YAML pipeline using a table and columns:
- *
- * <pre>{@code
- * pipeline:
- *   transforms:
- *     - type: ReadFromSpanner
- *       name: ReadShipments
- *       # Columns: shipment_id, customer_id, shipment_date, shipment_cost, customer_name, customer_email
- *       config:
- *         project_id: 'apache-beam-testing'
- *         instance_id: 'shipment-test'
- *         database_id: 'shipment'
- *         table: 'shipments'
- *         columns: ['customer_id', 'customer_name']
- * }
*/ @AutoService(SchemaTransformProvider.class) public class SpannerReadSchemaTransformProvider extends TypedSchemaTransformProvider< SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> { + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:spanner_read:v1"; + } + + @Override + public String description() { + return "Performs a Bulk read from Google Cloud Spanner using a specified SQL query or " + + "by directly accessing a single table and its columns.\n" + + "\n" + + "Both Query and Read APIs are supported. See more information about " + + "
reading from Cloud Spanner.\n" + + "\n" + + "Example configuration for performing a read using a SQL query: ::\n" + + "\n" + + " pipeline:\n" + + " transforms:\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " instance_id: 'my-instance-id'\n" + + " database_id: 'my-database'\n" + + " query: 'SELECT * FROM table'\n" + + "\n" + + "It is also possible to read a table by specifying a table name and a list of columns. For " + + "example, the following configuration will perform a read on an entire table: ::\n" + + "\n" + + " pipeline:\n" + + " transforms:\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " instance_id: 'my-instance-id'\n" + + " database_id: 'my-database'\n" + + " table: 'my-table'\n" + + " columns: ['col1', 'col2']\n" + + "\n" + + "Additionally, to read using a " + + "Secondary Index, specify the index name: ::" + + "\n" + + " pipeline:\n" + + " transforms:\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " instance_id: 'my-instance-id'\n" + + " database_id: 'my-database'\n" + + " table: 'my-table'\n" + + " index: 'my-index'\n" + + " columns: ['col1', 'col2']\n" + + "\n" + + "### Advanced Usage\n" + + "\n" + + "Reads by default use the " + + "PartitionQuery API which enforces some limitations on the type of queries that can be used so that " + + "the data can be read in parallel. If the query is not supported by the PartitionQuery API, then you " + + "can specify a non-partitioned read by setting batching to false.\n" + + "\n" + + "For example: ::" + + "\n" + + " pipeline:\n" + + " transforms:\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " batching: false\n" + + " ...\n" + + "\n" + + "Note: See " + + "SpannerIO for more advanced information."; + } + static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable { private final SpannerReadSchemaTransformConfiguration configuration; @@ -113,6 +152,12 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } else { read = read.withTable(configuration.getTableId()).withColumns(configuration.getColumns()); } + if (!Strings.isNullOrEmpty(configuration.getIndex())) { + read = read.withIndex(configuration.getIndex()); + } + if (Boolean.FALSE.equals(configuration.getBatching())) { + read = read.withBatching(false); + } PCollection spannerRows = input.getPipeline().apply(read); Schema schema = spannerRows.getSchema(); PCollection rows = @@ -124,11 +169,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } - @Override - public String identifier() { - return "beam:schematransform:org.apache.beam:spanner_read:v1"; - } - @Override public List inputCollectionNames() { return Collections.emptyList(); @@ -157,6 +197,10 @@ public abstract static class Builder { public abstract Builder setColumns(List columns); + public abstract Builder setIndex(String index); + + public abstract Builder setBatching(Boolean batching); + public abstract SpannerReadSchemaTransformConfiguration build(); } @@ -193,16 +237,16 @@ public static Builder builder() { .Builder(); } - @SchemaFieldDescription("Specifies the GCP project ID.") - @Nullable - public abstract String getProjectId(); - @SchemaFieldDescription("Specifies the Cloud Spanner instance.") public abstract String getInstanceId(); @SchemaFieldDescription("Specifies the Cloud Spanner database.") public abstract String getDatabaseId(); + @SchemaFieldDescription("Specifies the GCP project ID.") + @Nullable + public abstract String getProjectId(); + @SchemaFieldDescription("Specifies the Cloud Spanner table.") @Nullable 
public abstract String getTableId(); @@ -211,9 +255,20 @@ public static Builder builder() { @Nullable public abstract String getQuery(); - @SchemaFieldDescription("Specifies the columns to read from the table.") + @SchemaFieldDescription( + "Specifies the columns to read from the table. This parameter is required when table is specified.") @Nullable public abstract List getColumns(); + + @SchemaFieldDescription( + "Specifies the Index to read from. This parameter can only be specified when using table.") + @Nullable + public abstract String getIndex(); + + @SchemaFieldDescription( + "Set to false to disable batching. Useful when using a query that is not compatible with the PartitionQuery API. Defaults to true.") + @Nullable + public abstract Boolean getBatching(); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java index 9f079c78f886..8601da09ea09 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java @@ -67,49 +67,37 @@ *

<p>The transformation uses the {@link SpannerIO} to perform the write operation and provides
 * options to handle failed mutations, either by throwing an error, or passing the failed mutation
 * further in the pipeline for dealing with accordingly.
- *
- * <p>Example usage in a YAML pipeline without error handling:
- *
- * <pre>{@code
- * pipeline:
- *   transforms:
- *     - type: WriteToSpanner
- *       name: WriteShipments
- *       config:
- *         project_id: 'apache-beam-testing'
- *         instance_id: 'shipment-test'
- *         database_id: 'shipment'
- *         table_id: 'shipments'
- *
- * }</pre>
- *
- * <p>Example usage in a YAML pipeline using error handling:
- *
- * <pre>{@code
- * pipeline:
- *   transforms:
- *     - type: WriteToSpanner
- *       name: WriteShipments
- *       config:
- *         project_id: 'apache-beam-testing'
- *         instance_id: 'shipment-test'
- *         database_id: 'shipment'
- *         table_id: 'shipments'
- *         error_handling:
- *           output: 'errors'
- *
- *     - type: WriteToJson
- *       input: WriteSpanner.my_error_output
- *       config:
- *          path: errors.json
- *
- * }
*/ @AutoService(SchemaTransformProvider.class) public class SpannerWriteSchemaTransformProvider extends TypedSchemaTransformProvider< SpannerWriteSchemaTransformProvider.SpannerWriteSchemaTransformConfiguration> { + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:spanner_write:v1"; + } + + @Override + public String description() { + return "Performs a bulk write to a Google Cloud Spanner table.\n" + + "\n" + + "Example configuration for performing a write to a single table: ::\n" + + "\n" + + " pipeline:\n" + + " transforms:\n" + + " - type: ReadFromSpanner\n" + + " config:\n" + + " project_id: 'my-project-id'\n" + + " instance_id: 'my-instance-id'\n" + + " database_id: 'my-database'\n" + + " table: 'my-table'\n" + + "\n" + + "Note: See " + + "SpannerIO for more advanced information."; + } + @Override protected Class configurationClass() { return SpannerWriteSchemaTransformConfiguration.class; @@ -225,11 +213,6 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) { } } - @Override - public String identifier() { - return "beam:schematransform:org.apache.beam:spanner_write:v1"; - } - @Override public List inputCollectionNames() { return Collections.singletonList("input"); @@ -244,10 +227,6 @@ public List outputCollectionNames() { @DefaultSchema(AutoValueSchema.class) public abstract static class SpannerWriteSchemaTransformConfiguration implements Serializable { - @SchemaFieldDescription("Specifies the GCP project.") - @Nullable - public abstract String getProjectId(); - @SchemaFieldDescription("Specifies the Cloud Spanner instance.") public abstract String getInstanceId(); @@ -257,7 +236,11 @@ public abstract static class SpannerWriteSchemaTransformConfiguration implements @SchemaFieldDescription("Specifies the Cloud Spanner table.") public abstract String getTableId(); - @SchemaFieldDescription("Specifies how to handle errors.") + @SchemaFieldDescription("Specifies the GCP project.") + @Nullable + public abstract String getProjectId(); + + @SchemaFieldDescription("Whether and how to handle write errors.") @Nullable public abstract ErrorHandling getErrorHandling(); diff --git a/sdks/java/io/hadoop-common/build.gradle b/sdks/java/io/hadoop-common/build.gradle index 466aa8fb6730..b0303d29ff98 100644 --- a/sdks/java/io/hadoop-common/build.gradle +++ b/sdks/java/io/hadoop-common/build.gradle @@ -25,10 +25,10 @@ description = "Apache Beam :: SDKs :: Java :: IO :: Hadoop Common" ext.summary = "Library to add shared Hadoop classes among Beam IOs." def hadoopVersions = [ - "285": "2.8.5", - "292": "2.9.2", "2102": "2.10.2", "324": "3.2.4", + "336": "3.3.6", + "341": "3.4.1", ] hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} diff --git a/sdks/java/io/hadoop-file-system/build.gradle b/sdks/java/io/hadoop-file-system/build.gradle index 3fc872bb5d02..fafa8b5c7e34 100644 --- a/sdks/java/io/hadoop-file-system/build.gradle +++ b/sdks/java/io/hadoop-file-system/build.gradle @@ -26,10 +26,10 @@ description = "Apache Beam :: SDKs :: Java :: IO :: Hadoop File System" ext.summary = "Library to read and write Hadoop/HDFS file formats from Beam." 
def hadoopVersions = [ - "285": "2.8.5", - "292": "2.9.2", "2102": "2.10.2", "324": "3.2.4", + "336": "3.3.6", + "341": "3.4.1", ] hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} diff --git a/sdks/java/io/hadoop-format/build.gradle b/sdks/java/io/hadoop-format/build.gradle index dbb9f8fdd73d..4664005a1fc8 100644 --- a/sdks/java/io/hadoop-format/build.gradle +++ b/sdks/java/io/hadoop-format/build.gradle @@ -30,10 +30,10 @@ description = "Apache Beam :: SDKs :: Java :: IO :: Hadoop Format" ext.summary = "IO to read data from sources and to write data to sinks that implement Hadoop MapReduce Format." def hadoopVersions = [ - "285": "2.8.5", - "292": "2.9.2", "2102": "2.10.2", "324": "3.2.4", + "336": "3.3.6", + "341": "3.4.1", ] hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} diff --git a/sdks/java/io/hcatalog/build.gradle b/sdks/java/io/hcatalog/build.gradle index c4f1b76ec390..364c10fa738b 100644 --- a/sdks/java/io/hcatalog/build.gradle +++ b/sdks/java/io/hcatalog/build.gradle @@ -30,9 +30,10 @@ description = "Apache Beam :: SDKs :: Java :: IO :: HCatalog" ext.summary = "IO to read and write for HCatalog source." def hadoopVersions = [ - "285": "2.8.5", - "292": "2.9.2", "2102": "2.10.2", + "324": "3.2.4", + "336": "3.3.6", + "341": "3.4.1", ] hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index e10c6f38e20f..6754b0aecf50 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -29,10 +29,10 @@ description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg" ext.summary = "Integration with Iceberg data warehouses." def hadoopVersions = [ - "285": "2.8.5", - "292": "2.9.2", - "2102": "2.10.2", - "324": "3.2.4", + "2102": "2.10.2", + "324": "3.2.4", + "336": "3.3.6", + "341": "3.4.1", ] hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index 9a3262e19845..7941c13b0dfe 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -136,4 +136,8 @@ public long bytesWritten() { public DataFile getDataFile() { return icebergDataWriter.toDataFile(); } + + public String path() { + return absoluteFilename; + } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 12c425993826..255fce9ece4e 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -90,6 +90,7 @@ class DestinationState { final Cache writers; private final List dataFiles = Lists.newArrayList(); @VisibleForTesting final Map writerCounts = Maps.newHashMap(); + private final List exceptions = Lists.newArrayList(); DestinationState(IcebergDestination icebergDestination, Table table) { this.icebergDestination = icebergDestination; @@ -112,11 +113,14 @@ class DestinationState { try { recordWriter.close(); } catch (IOException e) { - throw new RuntimeException( - String.format( - "Encountered an error when closing data writer for table 
'%s', partition %s", - icebergDestination.getTableIdentifier(), pk), - e); + RuntimeException rethrow = + new RuntimeException( + String.format( + "Encountered an error when closing data writer for table '%s', path: %s", + icebergDestination.getTableIdentifier(), recordWriter.path()), + e); + exceptions.add(rethrow); + throw rethrow; } openWriters--; dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), pk)); @@ -282,6 +286,17 @@ public void close() throws IOException { // removing writers from the state's cache will trigger the logic to collect each writer's // data file. state.writers.invalidateAll(); + // first check for any exceptions swallowed by the cache + if (!state.exceptions.isEmpty()) { + IllegalStateException exception = + new IllegalStateException( + String.format("Encountered %s failed writer(s).", state.exceptions.size())); + for (Exception e : state.exceptions) { + exception.addSuppressed(e); + } + throw exception; + } + if (state.dataFiles.isEmpty()) { continue; } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index 8ced06bc944f..2bce390e0992 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -42,6 +42,7 @@ import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; import org.checkerframework.checker.nullness.qual.Nullable; @@ -49,6 +50,7 @@ import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; import org.junit.runner.RunWith; @@ -412,4 +414,62 @@ public void testWriterKeepsUpWithUpdatingPartitionSpec() throws IOException { dataFile.path().toString(), either(containsString("id=1")).or(containsString("id=2"))); } } + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testWriterExceptionGetsCaught() throws IOException { + RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 100, 2); + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "abcdef", true).build(); + PartitionKey partitionKey = new PartitionKey(PARTITION_SPEC, ICEBERG_SCHEMA); + partitionKey.partition(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row)); + + writerManager.write(windowedDestination, row); + + RecordWriterManager.DestinationState state = + writerManager.destinations.get(windowedDestination); + // replace with a failing record writer + FailingRecordWriter failingWriter = + new FailingRecordWriter( + catalog, windowedDestination.getValue(), "test_failing_writer", partitionKey); + state.writers.put(partitionKey, failingWriter); + writerManager.write(windowedDestination, row); + + // this tests that we indeed enter the catch block + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Encountered 1 failed writer(s)"); + try { + writerManager.close(); + } catch (IllegalStateException e) { + // fetch underlying exceptions and validate + Throwable[] underlyingExceptions = e.getSuppressed(); + assertEquals(1, underlyingExceptions.length); + for (Throwable t : 
underlyingExceptions) { + assertThat( + t.getMessage(), + containsString("Encountered an error when closing data writer for table")); + assertThat( + t.getMessage(), + containsString(windowedDestination.getValue().getTableIdentifier().toString())); + assertThat(t.getMessage(), containsString(failingWriter.path())); + Throwable realCause = t.getCause(); + assertEquals("I am failing!", realCause.getMessage()); + } + + throw e; + } + } + + static class FailingRecordWriter extends RecordWriter { + FailingRecordWriter( + Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey) + throws IOException { + super(catalog, destination, filename, partitionKey); + } + + @Override + public void close() throws IOException { + throw new IOException("I am failing!"); + } + } } diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle index ec4654bd88df..c2f056b0b7cb 100644 --- a/sdks/java/io/kafka/build.gradle +++ b/sdks/java/io/kafka/build.gradle @@ -35,9 +35,6 @@ ext { } def kafkaVersions = [ - '01103': "0.11.0.3", - '100': "1.0.0", - '111': "1.1.1", '201': "2.0.1", '211': "2.1.1", '222': "2.2.2", @@ -139,15 +136,13 @@ task kafkaVersionsCompatibilityTest { description = 'Runs KafkaIO with different Kafka client APIs' def testNames = createTestList(kafkaVersions, "Test") dependsOn testNames - dependsOn (":sdks:java:io:kafka:kafka-01103:kafkaVersion01103BatchIT") - dependsOn (":sdks:java:io:kafka:kafka-100:kafkaVersion100BatchIT") - dependsOn (":sdks:java:io:kafka:kafka-111:kafkaVersion111BatchIT") dependsOn (":sdks:java:io:kafka:kafka-201:kafkaVersion201BatchIT") dependsOn (":sdks:java:io:kafka:kafka-211:kafkaVersion211BatchIT") dependsOn (":sdks:java:io:kafka:kafka-222:kafkaVersion222BatchIT") dependsOn (":sdks:java:io:kafka:kafka-231:kafkaVersion231BatchIT") dependsOn (":sdks:java:io:kafka:kafka-241:kafkaVersion241BatchIT") dependsOn (":sdks:java:io:kafka:kafka-251:kafkaVersion251BatchIT") + dependsOn (":sdks:java:io:kafka:kafka-312:kafkaVersion312BatchIT") } static def createTestList(Map prefixMap, String suffix) { diff --git a/sdks/java/io/kafka/kafka-01103/build.gradle b/sdks/java/io/kafka/kafka-01103/build.gradle deleted file mode 100644 index 3a74bf04ef22..000000000000 --- a/sdks/java/io/kafka/kafka-01103/build.gradle +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -project.ext { - delimited="0.11.0.3" - undelimited="01103" - sdfCompatible=false -} - -apply from: "../kafka-integration-test.gradle" \ No newline at end of file diff --git a/sdks/java/io/kafka/kafka-100/build.gradle b/sdks/java/io/kafka/kafka-100/build.gradle deleted file mode 100644 index bd5fa67b1cfc..000000000000 --- a/sdks/java/io/kafka/kafka-100/build.gradle +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -project.ext { - delimited="1.0.0" - undelimited="100" - sdfCompatible=false -} - -apply from: "../kafka-integration-test.gradle" diff --git a/sdks/java/io/kafka/kafka-111/build.gradle b/sdks/java/io/kafka/kafka-111/build.gradle deleted file mode 100644 index c2b0c8f82827..000000000000 --- a/sdks/java/io/kafka/kafka-111/build.gradle +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -project.ext { - delimited="1.1.1" - undelimited="111" - sdfCompatible=false -} - -apply from: "../kafka-integration-test.gradle" \ No newline at end of file diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 6ce6c7d5d233..d86a5d0ce686 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -138,7 +138,6 @@ public boolean start() throws IOException { name, spec.getOffsetConsumerConfig(), spec.getConsumerConfig()); offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig); - ConsumerSpEL.evaluateAssign(offsetConsumer, topicPartitions); // Fetch offsets once before running periodically. updateLatestOffsets(); @@ -711,23 +710,28 @@ private void setupInitialOffset(PartitionState pState) { // Called from setupInitialOffset() at the start and then periodically from offsetFetcher thread. 
private void updateLatestOffsets() { Consumer offsetConsumer = Preconditions.checkStateNotNull(this.offsetConsumer); - for (PartitionState p : partitionStates) { - try { - Instant fetchTime = Instant.now(); - ConsumerSpEL.evaluateSeek2End(offsetConsumer, p.topicPartition); - long offset = offsetConsumer.position(p.topicPartition); - p.setLatestOffset(offset, fetchTime); - } catch (Exception e) { - if (closed.get()) { // Ignore the exception if the reader is closed. - break; - } + List topicPartitions = + Preconditions.checkStateNotNull(source.getSpec().getTopicPartitions()); + Instant fetchTime = Instant.now(); + try { + Map endOffsets = offsetConsumer.endOffsets(topicPartitions); + for (PartitionState p : partitionStates) { + p.setLatestOffset( + Preconditions.checkStateNotNull( + endOffsets.get(p.topicPartition), + "No end offset found for partition %s.", + p.topicPartition), + fetchTime); + } + } catch (Exception e) { + if (!closed.get()) { // Ignore the exception if the reader is closed. LOG.warn( - "{}: exception while fetching latest offset for partition {}. will be retried.", + "{}: exception while fetching latest offset for partitions {}. will be retried.", this, - p.topicPartition, + topicPartitions, e); - // Don't update the latest offset. } + // Don't update the latest offset. } LOG.debug("{}: backlog {}", this, getSplitBacklogBytes()); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 4bda8cf28d4e..7c2064883488 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -19,6 +19,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -247,19 +248,21 @@ private static class KafkaLatestOffsetEstimator private final Consumer offsetConsumer; private final TopicPartition topicPartition; private final Supplier memoizedBacklog; - private boolean closed; KafkaLatestOffsetEstimator( Consumer offsetConsumer, TopicPartition topicPartition) { this.offsetConsumer = offsetConsumer; this.topicPartition = topicPartition; - ConsumerSpEL.evaluateAssign(this.offsetConsumer, ImmutableList.of(this.topicPartition)); memoizedBacklog = Suppliers.memoizeWithExpiration( () -> { synchronized (offsetConsumer) { - ConsumerSpEL.evaluateSeek2End(offsetConsumer, topicPartition); - return offsetConsumer.position(topicPartition); + return Preconditions.checkStateNotNull( + offsetConsumer + .endOffsets(Collections.singleton(topicPartition)) + .get(topicPartition), + "No end offset found for partition %s.", + topicPartition); } }, 1, @@ -270,7 +273,6 @@ private static class KafkaLatestOffsetEstimator protected void finalize() { try { Closeables.close(offsetConsumer, true); - closed = true; LOG.info("Offset Estimator consumer was closed for {}", topicPartition); } catch (Exception anyException) { LOG.warn("Failed to close offset consumer for {}", topicPartition); @@ -281,10 +283,6 @@ protected void finalize() { public long estimate() { return memoizedBacklog.get(); } - - public boolean isClosed() { - return closed; - } } @GetInitialRestriction @@ -373,7 +371,7 @@ public OffsetRangeTracker restrictionTracker( TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition(); 
KafkaLatestOffsetEstimator offsetEstimator = offsetEstimatorCacheInstance.get(topicPartition); - if (offsetEstimator == null || offsetEstimator.isClosed()) { + if (offsetEstimator == null) { Map updatedConsumerConfig = overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor); diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 764e406f71cb..e614320db150 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -77,7 +77,7 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.kafka.KafkaIO.Read.FakeFlinkPipelineOptions; -import org.apache.beam.sdk.io.kafka.KafkaMocks.PositionErrorConsumerFactory; +import org.apache.beam.sdk.io.kafka.KafkaMocks.EndOffsetErrorConsumerFactory; import org.apache.beam.sdk.io.kafka.KafkaMocks.SendErrorProducerFactory; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.Lineage; @@ -267,10 +267,6 @@ private static MockConsumer mkMockConsumer( public synchronized void assign(final Collection assigned) { super.assign(assigned); assignedPartitions.set(ImmutableList.copyOf(assigned)); - for (TopicPartition tp : assigned) { - updateBeginningOffsets(ImmutableMap.of(tp, 0L)); - updateEndOffsets(ImmutableMap.of(tp, (long) records.get(tp).size())); - } } // Override offsetsForTimes() in order to look up the offsets by timestamp. @Override @@ -290,9 +286,12 @@ public synchronized Map offsetsForTimes( } }; - for (String topic : topics) { - consumer.updatePartitions(topic, partitionMap.get(topic)); - } + partitionMap.forEach(consumer::updatePartitions); + consumer.updateBeginningOffsets( + records.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> 0L))); + consumer.updateEndOffsets( + records.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> (long) e.getValue().size()))); // MockConsumer does not maintain any relationship between partition seek position and the // records added. e.g. 
if we add 10 records to a partition and then seek to end of the @@ -1525,13 +1524,14 @@ public void testUnboundedReaderLogsCommitFailure() throws Exception { List topics = ImmutableList.of("topic_a"); - PositionErrorConsumerFactory positionErrorConsumerFactory = new PositionErrorConsumerFactory(); + EndOffsetErrorConsumerFactory endOffsetErrorConsumerFactory = + new EndOffsetErrorConsumerFactory(); UnboundedSource, KafkaCheckpointMark> source = KafkaIO.read() .withBootstrapServers("myServer1:9092,myServer2:9092") .withTopics(topics) - .withConsumerFactoryFn(positionErrorConsumerFactory) + .withConsumerFactoryFn(endOffsetErrorConsumerFactory) .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer(LongDeserializer.class) .makeSource(); @@ -1540,7 +1540,7 @@ public void testUnboundedReaderLogsCommitFailure() throws Exception { reader.start(); - unboundedReaderExpectedLogs.verifyWarn("exception while fetching latest offset for partition"); + unboundedReaderExpectedLogs.verifyWarn("exception while fetching latest offset for partitions"); reader.close(); } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMocks.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMocks.java index 0844d71e7105..1303f1da3bcd 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMocks.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMocks.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.kafka; import java.io.Serializable; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -27,8 +28,8 @@ import org.apache.beam.sdk.values.KV; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; @@ -66,51 +67,33 @@ public Producer apply(Map input) { } } - public static final class PositionErrorConsumer extends MockConsumer { - - public PositionErrorConsumer() { - super(null); - } - - @Override - public synchronized long position(TopicPartition partition) { - throw new KafkaException("fakeException"); - } - - @Override - public synchronized List partitionsFor(String topic) { - return Collections.singletonList( - new PartitionInfo("topic_a", 1, new Node(1, "myServer1", 9092), null, null)); - } - } - - public static final class PositionErrorConsumerFactory + public static final class EndOffsetErrorConsumerFactory implements SerializableFunction, Consumer> { - public PositionErrorConsumerFactory() {} + public EndOffsetErrorConsumerFactory() {} @Override public MockConsumer apply(Map input) { + final MockConsumer consumer; if (input.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { - return new PositionErrorConsumer(); - } else { - MockConsumer consumer = - new MockConsumer(null) { + consumer = + new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override - public synchronized long position(TopicPartition partition) { - return 1L; - } - - @Override - public synchronized ConsumerRecords poll(long timeout) { - return ConsumerRecords.empty(); + public synchronized Map endOffsets( + Collection partitions) { + throw new KafkaException("fakeException"); } }; - consumer.updatePartitions( 
- "topic_a", - Collections.singletonList( - new PartitionInfo("topic_a", 1, new Node(1, "myServer1", 9092), null, null))); - return consumer; + } else { + consumer = new MockConsumer(OffsetResetStrategy.EARLIEST); } + consumer.updatePartitions( + "topic_a", + Collections.singletonList( + new PartitionInfo("topic_a", 1, new Node(1, "myServer1", 9092), null, null))); + consumer.updateBeginningOffsets( + Collections.singletonMap(new TopicPartition("topic_a", 1), 0L)); + consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("topic_a", 1), 0L)); + return consumer; } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index 3189bbb140f0..52c141685760 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -205,6 +205,8 @@ public SimpleMockKafkaConsumer( OffsetResetStrategy offsetResetStrategy, TopicPartition topicPartition) { super(offsetResetStrategy); this.topicPartition = topicPartition; + updateBeginningOffsets(ImmutableMap.of(topicPartition, 0L)); + updateEndOffsets(ImmutableMap.of(topicPartition, Long.MAX_VALUE)); } public void reset() { @@ -214,6 +216,8 @@ public void reset() { this.startOffsetForTime = KV.of(0L, Instant.now()); this.stopOffsetForTime = KV.of(Long.MAX_VALUE, null); this.numOfRecordsPerPoll = 0L; + updateBeginningOffsets(ImmutableMap.of(topicPartition, 0L)); + updateEndOffsets(ImmutableMap.of(topicPartition, Long.MAX_VALUE)); } public void setRemoved() { diff --git a/sdks/java/io/parquet/build.gradle b/sdks/java/io/parquet/build.gradle index e8f1603f0b58..d5f22b31cc56 100644 --- a/sdks/java/io/parquet/build.gradle +++ b/sdks/java/io/parquet/build.gradle @@ -27,10 +27,10 @@ description = "Apache Beam :: SDKs :: Java :: IO :: Parquet" ext.summary = "IO to read and write on Parquet storage format." 
def hadoopVersions = [ - "285": "2.8.5", - "292": "2.9.2", "2102": "2.10.2", "324": "3.2.4", + "336": "3.3.6", + "341": "3.4.1", ] hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} diff --git a/sdks/python/apache_beam/ml/inference/vllm_inference.py b/sdks/python/apache_beam/ml/inference/vllm_inference.py index b86d33ec16b1..799083d16ceb 100644 --- a/sdks/python/apache_beam/ml/inference/vllm_inference.py +++ b/sdks/python/apache_beam/ml/inference/vllm_inference.py @@ -21,6 +21,7 @@ import logging import os import subprocess +import sys import threading import time import uuid @@ -118,7 +119,7 @@ def __init__(self, model_name: str, vllm_server_kwargs: Dict[str, str]): def start_server(self, retries=3): if not self._server_started: server_cmd = [ - 'python', + sys.executable, '-m', 'vllm.entrypoints.openai.api_server', '--model', diff --git a/sdks/python/apache_beam/yaml/generate_yaml_docs.py b/sdks/python/apache_beam/yaml/generate_yaml_docs.py index 84a5e62f0abd..4088e17afe2c 100644 --- a/sdks/python/apache_beam/yaml/generate_yaml_docs.py +++ b/sdks/python/apache_beam/yaml/generate_yaml_docs.py @@ -20,13 +20,17 @@ import itertools import re +import docstring_parser import yaml from apache_beam.portability.api import schema_pb2 +from apache_beam.typehints import schemas from apache_beam.utils import subprocess_server +from apache_beam.utils.python_callable import PythonCallableWithSource from apache_beam.version import __version__ as beam_version from apache_beam.yaml import json_utils from apache_beam.yaml import yaml_provider +from apache_beam.yaml.yaml_mapping import ErrorHandlingConfig def _singular(name): @@ -135,8 +139,28 @@ def maybe_row_parameters(t): def maybe_optional(t): return " (Optional)" if t.nullable else "" + def normalize_error_handling(f): + doc = docstring_parser.parse( + ErrorHandlingConfig.__doc__, docstring_parser.DocstringStyle.GOOGLE) + if f.name == "error_handling": + f = schema_pb2.Field( + name="error_handling", + type=schema_pb2.FieldType( + row_type=schema_pb2.RowType( + schema=schema_pb2.Schema( + fields=[ + schemas.schema_field( + param.arg_name, + PythonCallableWithSource.load_from_expression( + param.type_name), + param.description) for param in doc.params + ]))), + description=f.description) + return f + def lines(): for f in schema.fields: + f = normalize_error_handling(f) yield ''.join([ f'**{f.name}** `{pretty_type(f.type)}`', maybe_optional(f.type), diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 4de36b3dc9e0..400ab07a41fa 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -271,6 +271,8 @@ table: 'table_id' query: 'query' columns: 'columns' + index: 'index' + batching: 'batching' 'WriteToSpanner': project: 'project_id' instance: 'instance_id' diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 960fcdeecf30..5c14b0f5ea79 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -418,6 +418,11 @@ def checking_func(row): class ErrorHandlingConfig(NamedTuple): + """Class to define Error Handling parameters. + + Args: + output (str): Name to use for the output error collection + """ output: str # TODO: Other parameters are valid here too, but not common to Java. 
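Reviewer note on the generate_yaml_docs.py change above: `normalize_error_handling` relies on `docstring_parser` recovering the parameter name, type, and description from the Google-style docstring that this change adds to `ErrorHandlingConfig`. A minimal standalone sketch of that parsing step follows; the class body is copied from `yaml_mapping.py`, and the printed line is illustrative only.

```python
from typing import NamedTuple

import docstring_parser


class ErrorHandlingConfig(NamedTuple):
    """Class to define Error Handling parameters.

    Args:
      output (str): Name to use for the output error collection
    """
    output: str


# Parse the Google-style docstring the same way normalize_error_handling() does.
doc = docstring_parser.parse(
    ErrorHandlingConfig.__doc__, docstring_parser.DocstringStyle.GOOGLE)

for param in doc.params:
    # Each parsed parameter exposes the fields used to build the schema field.
    print(param.arg_name, param.type_name, param.description)
    # Prints: output str Name to use for the output error collection
```

In generate_yaml_docs.py these three values feed `schemas.schema_field(...)`, so the generated YAML reference expands `error_handling` into its nested parameters instead of rendering an opaque row type.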
diff --git a/sdks/python/container/Dockerfile b/sdks/python/container/Dockerfile index 7bea6229668f..f3d22a4b5bc6 100644 --- a/sdks/python/container/Dockerfile +++ b/sdks/python/container/Dockerfile @@ -103,9 +103,33 @@ RUN if [ "$pull_licenses" = "true" ] ; then \ python /tmp/license_scripts/pull_licenses_py.py ; \ fi -FROM beam +FROM beam as base ARG pull_licenses COPY --from=third_party_licenses /opt/apache/beam/third_party_licenses /opt/apache/beam/third_party_licenses RUN if [ "$pull_licenses" != "true" ] ; then \ rm -rf /opt/apache/beam/third_party_licenses ; \ fi + +ARG TARGETARCH +FROM gcr.io/distroless/python3-debian12:latest-${TARGETARCH} as distroless +ARG py_version + +# Contains header files needed by the Python interpreter. +COPY --from=base /usr/local/include /usr/local/include + +# Contains the Python interpreter executables. +COPY --from=base /usr/local/bin /usr/local/bin + +# Contains the Python library dependencies. +COPY --from=base /usr/local/lib /usr/local/lib + +# Python standard library modules. +COPY --from=base /usr/lib/python${py_version} /usr/lib/python${py_version} + +# Contains the boot entrypoint and related files such as licenses. +COPY --from=base /opt /opt + +ENV PATH "$PATH:/usr/local/bin" + +# Despite the ENTRYPOINT set above, need to reset since deriving the layer derives from a different image. +ENTRYPOINT ["/opt/apache/beam/boot"] diff --git a/sdks/python/container/common.gradle b/sdks/python/container/common.gradle index 0175778a6301..885662362894 100644 --- a/sdks/python/container/common.gradle +++ b/sdks/python/container/common.gradle @@ -71,10 +71,16 @@ def copyLauncherDependencies = tasks.register("copyLauncherDependencies", Copy) } def pushContainers = project.rootProject.hasProperty(["isRelease"]) || project.rootProject.hasProperty("push-containers") +def baseBuildTarget = 'base' +def buildTarget = project.findProperty('container-build-target') ?: 'base' +var imageName = project.docker_image_default_repo_prefix + "python${project.ext.pythonVersion}_sdk" +if (buildTarget != baseBuildTarget) { + imageName += "_${buildTarget}" +} docker { name containerImageName( - name: project.docker_image_default_repo_prefix + "python${project.ext.pythonVersion}_sdk", + name: imageName, root: project.rootProject.hasProperty(["docker-repository-root"]) ? 
project.rootProject["docker-repository-root"] : project.docker_image_default_repo_root, @@ -90,6 +96,7 @@ docker { platform(*project.containerPlatforms()) load project.useBuildx() && !pushContainers push pushContainers + target buildTarget } dockerPrepare.dependsOn copyLauncherDependencies diff --git a/sdks/python/test-suites/dataflow/build.gradle b/sdks/python/test-suites/dataflow/build.gradle index 04a79683fd36..4500b395b0a6 100644 --- a/sdks/python/test-suites/dataflow/build.gradle +++ b/sdks/python/test-suites/dataflow/build.gradle @@ -60,6 +60,12 @@ task validatesContainerTests { } } +task validatesDistrolessContainerTests { + getVersionsAsList('distroless_python_versions').each { + dependsOn.add(":sdks:python:test-suites:dataflow:py${getVersionSuffix(it)}:validatesDistrolessContainer") + } +} + task examplesPostCommit { getVersionsAsList('dataflow_examples_postcommit_py_versions').each { dependsOn.add(":sdks:python:test-suites:dataflow:py${getVersionSuffix(it)}:examples") diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index 71d44652bc7e..cd0db4a62f77 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -380,6 +380,51 @@ task validatesContainer() { } } +/** + * Validates the distroless (https://github.com/GoogleContainerTools/distroless) variant of the Python SDK container + * image (sdks/python/container/Dockerfile). + * To test a single version of Python: + * ./gradlew :sdks:python:test-suites:dataflow:py311:validatesDistrolessContainer + * See https://cwiki.apache.org/confluence/display/BEAM/Python+Tips#PythonTips-VirtualEnvironmentSetup + * for more information on setting up different Python versions. + */ +task validatesDistrolessContainer() { + def pyversion = "${project.ext.pythonVersion.replace('.', '')}" + def buildTarget = 'distroless' + def repository = "us.gcr.io/apache-beam-testing/${System.getenv('USER')}" + def tag = java.time.Instant.now().getEpochSecond() + def imageURL = "${repository}/beam_python${project.ext.pythonVersion}_sdk_${buildTarget}:${tag}" + project.rootProject.ext['docker-repository-root'] = repository + project.rootProject.ext['container-build-target'] = buildTarget + project.rootProject.ext['docker-tag'] = tag + if (project.rootProject.hasProperty('dry-run')) { + println "Running in dry run mode: imageURL: ${imageURL}, pyversion: ${pyversion}, buildTarget: ${buildTarget}, repository: ${repository}, tag: ${tag}, envdir: ${envdir}" + return + } + dependsOn 'initializeForDataflowJob' + dependsOn ":sdks:python:container:py${pyversion}:docker" + dependsOn ":sdks:python:container:py${pyversion}:dockerPush" + def testTarget = "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it" + def argMap = [ + "output": "gs://temp-storage-for-end-to-end-tests/py-it-cloud/output", + "project": "apache-beam-testing", + "region": "us-central1", + "runner": "TestDataflowRunner", + "sdk_container_image": "${imageURL}", + "sdk_location": "container", + "staging_location": "gs://temp-storage-for-end-to-end-tests/staging-it", + "temp_location": "gs://temp-storage-for-end-to-end-tests/temp-it", + ] + def cmdArgs = mapToArgString(argMap) + doLast { + exec { + workingDir = "${rootDir}/sdks/python" + executable 'sh' + args '-c', ". 
${envdir}/bin/activate && pytest ${testTarget} --test-pipeline-options=\"${cmdArgs}\"" + } + } +} + task validatesContainerARM() { def pyversion = "${project.ext.pythonVersion.replace('.', '')}" dependsOn 'initializeForDataflowJob' diff --git a/sdks/python/test-suites/gradle.properties b/sdks/python/test-suites/gradle.properties index d027cd3144d3..08266c4b0dd5 100644 --- a/sdks/python/test-suites/gradle.properties +++ b/sdks/python/test-suites/gradle.properties @@ -54,3 +54,6 @@ prism_examples_postcommit_py_versions=3.9,3.12 # cross language postcommit python test suites cross_language_validates_py_versions=3.9,3.12 + +# Python versions to support distroless variants +distroless_python_versions=3.9,3.10,3.11,3.12 diff --git a/settings.gradle.kts b/settings.gradle.kts index a38f69dac09e..ca30a5ea750a 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -333,6 +333,8 @@ project(":beam-test-gha").projectDir = file(".github") include("beam-validate-runner") project(":beam-validate-runner").projectDir = file(".test-infra/validate-runner") include("com.google.api.gax.batching") +include("sdks:java:io:kafka:kafka-312") +findProject(":sdks:java:io:kafka:kafka-312")?.name = "kafka-312" include("sdks:java:io:kafka:kafka-251") findProject(":sdks:java:io:kafka:kafka-251")?.name = "kafka-251" include("sdks:java:io:kafka:kafka-241") @@ -345,12 +347,6 @@ include("sdks:java:io:kafka:kafka-211") findProject(":sdks:java:io:kafka:kafka-211")?.name = "kafka-211" include("sdks:java:io:kafka:kafka-201") findProject(":sdks:java:io:kafka:kafka-201")?.name = "kafka-201" -include("sdks:java:io:kafka:kafka-111") -findProject(":sdks:java:io:kafka:kafka-111")?.name = "kafka-111" -include("sdks:java:io:kafka:kafka-100") -findProject(":sdks:java:io:kafka:kafka-100")?.name = "kafka-100" -include("sdks:java:io:kafka:kafka-01103") -findProject(":sdks:java:io:kafka:kafka-01103")?.name = "kafka-01103" include("sdks:java:managed") findProject(":sdks:java:managed")?.name = "managed" include("sdks:java:io:iceberg")
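End-to-end context for the distroless container plumbing above, as a sketch under assumptions rather than part of this change: after an image variant is built with the new `container-build-target` property and pushed (which is what `validatesDistrolessContainer` automates), a pipeline selects it through the same `sdk_container_image` and `sdk_location` options that the Gradle task passes to pytest. The project, region, bucket, registry, and tag below are placeholders.

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; substitute your own project, bucket, registry, and tag.
options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',
    region='us-central1',
    temp_location='gs://my-bucket/temp',
    # Image name pattern follows common.gradle: beam_python<version>_sdk_<target>.
    sdk_container_image='us.gcr.io/my-project/beam_python3.11_sdk_distroless:my-tag',
    sdk_location='container',
)

# Trivial smoke-test pipeline; any pipeline exercises the custom worker image.
with beam.Pipeline(options=options) as p:
    (p
     | 'Create' >> beam.Create(['hello', 'distroless'])
     | 'Upper' >> beam.Map(str.upper))
```

This mirrors the argMap in validatesDistrolessContainer, which runs the wordcount integration test against the pushed image with `sdk_location: container`.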