Commit 21e0585

Merge branch 'master' into storapi_schemaupdate_dynamicdest

ahmedabu98 authored Dec 19, 2024 · 2 parents 5159a61 + fe6b7aa
Showing 42 changed files with 1,025 additions and 206 deletions.
1 change: 1 addition & 0 deletions .asf.yaml
@@ -49,6 +49,7 @@ github:

protected_branches:
master: {}
+ release-2.62.0: {}
release-2.61.0: {}
release-2.60.0: {}
release-2.59.0: {}
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 5
"modification": 6
}
[another trigger file under .github/trigger_files — name not shown in this view]
@@ -3,5 +3,6 @@
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
"https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test",
"https://github.com/apache/beam/pull/33267": "noting that PR #33267 should run this test",
"https://github.com/apache/beam/pull/33322": "noting that PR #33322 should run this test"
}
[another trigger file under .github/trigger_files — name not shown in this view]
@@ -2,5 +2,6 @@
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
"https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test"
"https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test",
"https://github.com/apache/beam/pull/33267": "noting that PR #33267 should run this test"
}
[another trigger file under .github/trigger_files — name not shown in this view]
@@ -3,5 +3,6 @@
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
"https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test",
"https://github.com/apache/beam/pull/33267": "noting that PR #33267 should run this test",
"https://github.com/apache/beam/pull/33322": "noting that PR #33322 should run this test"
}
5 changes: 3 additions & 2 deletions .github/workflows/beam_Python_CostBenchmarks_Dataflow.yml
@@ -92,7 +92,7 @@ jobs:
-PloadTest.mainClass=apache_beam.testing.benchmarks.wordcount.wordcount \
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
- '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} --output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \
+ '-PloadTest.args=${{ env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} --output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \
- name: Run Tensorflow MNIST Image Classification on Dataflow
uses: ./.github/actions/gradle-command-self-hosted-action
timeout-minutes: 30
@@ -102,4 +102,5 @@ jobs:
-PloadTest.mainClass=apache_beam.testing.benchmarks.inference.tensorflow_mnist_classification_cost_benchmark \
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
- '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_2 }} --job_name=benchmark-tests-tf-mnist-classification-python-${{env.NOW_UTC}} --input_file=gs://apache-beam-ml/testing/inputs/it_mnist_data.csv --output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_tf_mnist-${{env.NOW_UTC}}.txt --model=gs://apache-beam-ml/models/tensorflow/mnist/' \
+ -PloadTest.requirementsTxtFile=apache_beam/ml/inference/tensorflow_tests_requirements.txt \
+ '-PloadTest.args=${{ env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_2 }} --job_name=benchmark-tests-tf-mnist-classification-python-${{env.NOW_UTC}} --input_file=gs://apache-beam-ml/testing/inputs/it_mnist_data.csv --output_file=gs://temp-storage-for-end-to-end-tests/inference/result_tf_mnist-${{env.NOW_UTC}}.txt --model=gs://apache-beam-ml/models/tensorflow/mnist/' \
43 changes: 31 additions & 12 deletions CHANGES.md
@@ -31,7 +31,6 @@
## New Features / Improvements
- * The datetime module is now available for use in jinja templatization for yaml.
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
## Breaking Changes
@@ -54,7 +53,7 @@
* ([#X](https://github.com/apache/beam/issues/X)).
-->

- # [2.62.0] - Unreleased
+ # [2.63.0] - Unreleased

## Highlights

@@ -63,23 +62,14 @@

## I/Os

- * gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
- * [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939))
- * Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).
- * [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125))

## New Features / Improvements

- * Improved batch performance of SparkRunner's GroupByKey ([#20943](https://github.com/apache/beam/pull/20943)).
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
- * Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)).
-   * This enables initial Java GroupIntoBatches support.
- * Support OrderedListState in Prism ([#32929](https://github.com/apache/beam/issues/32929)).

## Breaking Changes

- * Upgraded ZetaSQL to 2024.11.1 ([#32902](https://github.com/apache/beam/pull/32902)). Java11+ is now needed if Beam's ZetaSQL component is used.
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).

## Deprecations
@@ -94,12 +84,41 @@

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
- * Fixed (CVE-2024-47561)[https://www.cve.org/CVERecord?id=CVE-2024-47561] (Java) by upgrading Avro version to 1.11.4

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).

+ # [2.62.0] - Unreleased
+
+ ## I/Os
+
+ * gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
+ * [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939))
+ * Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).
+ * [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125))
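
For context on the Managed Iceberg entry above: time partitioning (#32939) is exercised through Beam's `Managed` transform. A minimal sketch follows — the `"input"` tag, the config keys `"table"` and `"partition_fields"`, and the `hour(...)` syntax are assumptions for illustration, not confirmed by this diff:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

public class IcebergHourlyPartitionSketch {
  /** Writes rows to an Iceberg table partitioned by hour of a hypothetical "event_ts" column. */
  public static void write(PCollection<Row> rows) {
    Map<String, Object> config = new HashMap<>();
    config.put("table", "db.events"); // hypothetical table identifier
    config.put("partition_fields", Arrays.asList("hour(event_ts)")); // assumed key and syntax
    PCollectionRowTuple.of("input", rows)
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));
  }
}
```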

+ ## New Features / Improvements
+
+ * Added support for stateful processing in Spark Runner for streaming pipelines. Timer functionality is not yet supported and will be implemented in a future release ([#33237](https://github.com/apache/beam/issues/33237)).
+ * The datetime module is now available for use in jinja templatization for yaml.
+ * Improved batch performance of SparkRunner's GroupByKey ([#20943](https://github.com/apache/beam/pull/20943)).
+ * Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)).
+   * This enables initial Java GroupIntoBatches support.
+ * Support OrderedListState in Prism ([#32929](https://github.com/apache/beam/issues/32929)).
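
The first entry above corresponds to the Spark runner diffs later in this commit (SparkStateInternals, SparkTimerInternals, the gradle test-exclusion change): plain Beam state now works on SparkRunner streaming. A minimal sketch of a stateful `DoFn` that keeps a running count per key — standard Beam API, nothing Spark-specific; timers (`@TimerId`/`@OnTimer`) remain unsupported per #33237:

```java
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Emits a running count per key using ValueState; state only, no timers. */
public class RunningCountFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("count")
  private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void process(
      @Element KV<String, Long> element,
      @StateId("count") ValueState<Long> count,
      OutputReceiver<KV<String, Long>> out) {
    Long previous = count.read(); // null on the first element for this key and window
    long updated = (previous == null ? 0L : previous) + element.getValue();
    count.write(updated);
    out.output(KV.of(element.getKey(), updated));
  }
}
```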

+ ## Breaking Changes
+
+ * Upgraded ZetaSQL to 2024.11.1 ([#32902](https://github.com/apache/beam/pull/32902)). Java11+ is now needed if Beam's ZetaSQL component is used.
+
+ ## Bugfixes
+
+ * Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).
+
+ ## Security Fixes
+
+ * Fixed (CVE-2024-47561)[https://www.cve.org/CVERecord?id=CVE-2024-47561] (Java) by upgrading Avro version to 1.11.4

# [2.61.0] - 2024-11-25

## Highlights
3 changes: 3 additions & 0 deletions contributor-docs/code-change-guide.md
@@ -375,6 +375,9 @@ Follow these steps for Maven projects.
<id>Maven-Snapshot</id>
<name>maven snapshot repository</name>
<url>https://repository.apache.org/content/groups/snapshots/</url>
+   <releases>
+     <enabled>false</enabled>
+   </releases>
</repository>
```

4 changes: 2 additions & 2 deletions gradle.properties
@@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true
# buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy.
# To build a custom Beam version make sure you change it in both places, see
# https://github.com/apache/beam/issues/21302.
- version=2.62.0-SNAPSHOT
- sdk_version=2.62.0.dev
+ version=2.63.0-SNAPSHOT
+ sdk_version=2.63.0.dev

javaVersion=1.8

2 changes: 1 addition & 1 deletion runners/spark/spark_runner.gradle
@@ -345,7 +345,7 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test)
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'

// State and Timers
- excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer'
[another runners/spark file — name not shown in this view]
@@ -21,7 +21,7 @@
import java.util.ArrayList;
import java.util.LinkedHashMap;
import org.apache.beam.runners.spark.io.MicrobatchSource;
- import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet.StateAndTimers;
+ import org.apache.beam.runners.spark.stateful.StateAndTimers;
import org.apache.beam.runners.spark.translation.ValueAndCoderKryoSerializer;
import org.apache.beam.runners.spark.translation.ValueAndCoderLazySerializable;
import org.apache.beam.runners.spark.util.ByteArray;
[runners/spark: SparkGroupAlsoByWindowViaWindowSet.java]
@@ -17,6 +17,9 @@
*/
package org.apache.beam.runners.spark.stateful;

+ import static org.apache.beam.runners.spark.translation.TranslationUtils.checkpointIfNeeded;
+ import static org.apache.beam.runners.spark.translation.TranslationUtils.getBatchDuration;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -35,7 +38,6 @@
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
- import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.translation.ReifyTimestampsAndWindowsFunction;
import org.apache.beam.runners.spark.translation.TranslationUtils;
@@ -60,10 +62,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
- import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.api.java.function.FlatMapFunction;
- import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.dstream.DStream;
@@ -100,27 +100,6 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
private static final Logger LOG =
LoggerFactory.getLogger(SparkGroupAlsoByWindowViaWindowSet.class);

- /** State and Timers wrapper. */
- public static class StateAndTimers implements Serializable {
-   // Serializable state for internals (namespace to state tag to coded value).
-   private final Table<String, String, byte[]> state;
-   private final Collection<byte[]> serTimers;
-
-   private StateAndTimers(
-       final Table<String, String, byte[]> state, final Collection<byte[]> timers) {
-     this.state = state;
-     this.serTimers = timers;
-   }
-
-   Table<String, String, byte[]> getState() {
-     return state;
-   }
-
-   Collection<byte[]> getTimers() {
-     return serTimers;
-   }
- }
-
private static class OutputWindowedValueHolder<K, V>
implements OutputWindowedValue<KV<K, Iterable<V>>> {
private final List<WindowedValue<KV<K, Iterable<V>>>> windowedValues = new ArrayList<>();
@@ -348,7 +327,7 @@ private Collection<TimerInternals.TimerData> filterTimersEligibleForProcessing(

// empty outputs are filtered later using DStream filtering
final StateAndTimers updated =
-         new StateAndTimers(
+         StateAndTimers.of(
stateInternals.getState(),
SparkTimerInternals.serializeTimers(
timerInternals.getTimers(), timerDataCoder));
@@ -466,21 +445,6 @@ private static <W extends BoundedWindow> TimerInternals.TimerDataCoderV2 timerDa
return TimerInternals.TimerDataCoderV2.of(windowingStrategy.getWindowFn().windowCoder());
}

- private static void checkpointIfNeeded(
-     final DStream<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> firedStream,
-     final SerializablePipelineOptions options) {
-
-   final Long checkpointDurationMillis = getBatchDuration(options);
-
-   if (checkpointDurationMillis > 0) {
-     firedStream.checkpoint(new Duration(checkpointDurationMillis));
-   }
- }
-
- private static Long getBatchDuration(final SerializablePipelineOptions options) {
-   return options.get().as(SparkPipelineOptions.class).getCheckpointDurationMillis();
- }
-
private static <K, InputT> JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> stripStateValues(
final DStream<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> firedStream,
final Coder<K> keyCoder,
[runners/spark: SparkStateInternals.java]
@@ -63,7 +63,7 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
- class SparkStateInternals<K> implements StateInternals {
+ public class SparkStateInternals<K> implements StateInternals {

private final K key;
// Serializable state for internals (namespace to state tag to coded value).
@@ -79,11 +79,11 @@ private SparkStateInternals(K key, Table<String, String, byte[]> stateTable) {
this.stateTable = stateTable;
}

-   static <K> SparkStateInternals<K> forKey(K key) {
+   public static <K> SparkStateInternals<K> forKey(K key) {
return new SparkStateInternals<>(key);
}

-   static <K> SparkStateInternals<K> forKeyAndState(
+   public static <K> SparkStateInternals<K> forKeyAndState(
K key, Table<String, String, byte[]> stateTable) {
return new SparkStateInternals<>(key, stateTable);
}
@@ -412,17 +425,25 @@ public void put(MapKeyT key, MapValueT value) {
@Override
public ReadableState<MapValueT> computeIfAbsent(
MapKeyT key, Function<? super MapKeyT, ? extends MapValueT> mappingFunction) {
-       Map<MapKeyT, MapValueT> sparkMapState = readValue();
+       Map<MapKeyT, MapValueT> sparkMapState = readAsMap();
MapValueT current = sparkMapState.get(key);
if (current == null) {
put(key, mappingFunction.apply(key));
}
return ReadableStates.immediate(current);
}

+     private Map<MapKeyT, MapValueT> readAsMap() {
+       Map<MapKeyT, MapValueT> mapState = readValue();
+       if (mapState == null) {
+         mapState = new HashMap<>();
+       }
+       return mapState;
+     }
+
@Override
public void remove(MapKeyT key) {
-       Map<MapKeyT, MapValueT> sparkMapState = readValue();
+       Map<MapKeyT, MapValueT> sparkMapState = readAsMap();
sparkMapState.remove(key);
writeValue(sparkMapState);
}
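
The `readAsMap()` helper added above guards the case where `readValue()` returns null because nothing has been written to the map-state cell yet; previously `remove(...)` and `computeIfAbsent(...)` dereferenced that null. A self-contained sketch of the pattern (the class is hypothetical; names mirror the diff):

```java
import java.util.HashMap;
import java.util.Map;

public class ReadAsMapSketch {
  // Stand-in for SparkStateInternals#readValue(): returns null until the first write.
  private static Map<String, Integer> readValue() {
    return null;
  }

  // The guard introduced in the diff: fall back to an empty map.
  private static Map<String, Integer> readAsMap() {
    Map<String, Integer> mapState = readValue();
    return mapState == null ? new HashMap<>() : mapState;
  }

  public static void main(String[] args) {
    Map<String, Integer> state = readAsMap();
    state.remove("absent-key"); // was an NPE path before the fix
    System.out.println(state); // {}
  }
}
```
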
[runners/spark: SparkTimerInternals.java]
@@ -107,7 +107,7 @@ public Collection<TimerData> getTimers() {
return timers;
}

-   void addTimers(Iterator<TimerData> timers) {
+   public void addTimers(Iterator<TimerData> timers) {
while (timers.hasNext()) {
TimerData timer = timers.next();
this.timers.add(timer);
@@ -163,7 +163,8 @@ public void setTimer(
Instant target,
Instant outputTimestamp,
TimeDomain timeDomain) {
-     throw new UnsupportedOperationException("Setting a timer by ID not yet supported.");
+     this.setTimer(
+         TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain));
}

@Override
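
The `setTimer` overload above previously threw `UnsupportedOperationException`; it now builds the same `TimerData` the other overload consumes. A small sketch of that construction in isolation (the values are illustrative):

```java
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.sdk.state.TimeDomain;
import org.joda.time.Instant;

public class TimerDataSketch {
  public static void main(String[] args) {
    StateNamespace namespace = StateNamespaces.global();
    Instant fireAt = Instant.parse("2024-12-19T00:00:00Z");
    // Same construction the new overload delegates to:
    TimerData timer =
        TimerData.of("my-timer", "", namespace, fireAt, fireAt, TimeDomain.EVENT_TIME);
    System.out.println(timer.getTimerId());
  }
}
```
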
[runners/spark: StateAndTimers.java — new file]
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.stateful;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Collection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table;

/** State and Timers wrapper. */
@AutoValue
public abstract class StateAndTimers implements Serializable {
public abstract Table<String, String, byte[]> getState();

public abstract Collection<byte[]> getTimers();

public static StateAndTimers of(
final Table<String, String, byte[]> state, final Collection<byte[]> timers) {
return new AutoValue_StateAndTimers.Builder().setState(state).setTimers(timers).build();
}

@AutoValue.Builder
abstract static class Builder {
abstract Builder setState(Table<String, String, byte[]> state);

abstract Builder setTimers(Collection<byte[]> timers);

abstract StateAndTimers build();
}
}
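
Usage mirrors the call site shown earlier in SparkGroupAlsoByWindowViaWindowSet (`StateAndTimers.of(stateInternals.getState(), serializedTimers)`); moving to AutoValue replaces the hand-written wrapper with generated constructor plumbing plus `equals`/`hashCode`/`toString`. A small sketch (table contents are illustrative):

```java
import java.util.Collections;
import org.apache.beam.runners.spark.stateful.StateAndTimers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBasedTable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table;

public class StateAndTimersUsage {
  public static void main(String[] args) {
    // namespace -> state tag -> coded value, as in SparkStateInternals.
    Table<String, String, byte[]> state = HashBasedTable.create();
    state.put("window-namespace", "count", new byte[] {1});

    StateAndTimers snapshot = StateAndTimers.of(state, Collections.emptyList());
    System.out.println(snapshot.getState().size()); // 1
    System.out.println(snapshot.getTimers().isEmpty()); // true
  }
}
```
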
[runners/spark: DoFnRunnerWithMetrics.java]
@@ -31,12 +31,12 @@
import org.joda.time.Instant;

/** DoFnRunner decorator which registers {@link MetricsContainerImpl}. */
- class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+ public class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
private final DoFnRunner<InputT, OutputT> delegate;
private final String stepName;
private final MetricsContainerStepMapAccumulator metricsAccum;

-   DoFnRunnerWithMetrics(
+   public DoFnRunnerWithMetrics(
String stepName,
DoFnRunner<InputT, OutputT> delegate,
MetricsContainerStepMapAccumulator metricsAccum) {
[Diffs for the remaining changed files were not loaded in this view.]