Skip to content

Commit

Permalink
Merge branch 'master' into spark-ordered-list-state
Browse files Browse the repository at this point in the history
  • Loading branch information
twosom authored Dec 18, 2024
2 parents 5926f6f + 5eed396 commit bfea9e1
Show file tree
Hide file tree
Showing 105 changed files with 3,982 additions and 404 deletions.
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 3
"modification": 5
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
"https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test",
"https://github.com/apache/beam/pull/33267": "noting that PR #33267 should run this test",
"https://github.com/apache/beam/pull/33322": "noting that PR #33322 should run this test"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
"https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test"
"https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test",
"https://github.com/apache/beam/pull/33267": "noting that PR #33267 should run this test"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
"https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test",
"https://github.com/apache/beam/pull/33267": "noting that PR #33267 should run this test",
"https://github.com/apache/beam/pull/33322": "noting that PR #33322 should run this test"
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,3 @@ jobs:
commit: '${{ env.prsha || env.GITHUB_SHA }}'
comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }}
files: '**/build/test-results/**/*.xml'
large_files: true
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,3 @@ jobs:
commit: '${{ env.prsha || env.GITHUB_SHA }}'
comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }}
files: '**/build/test-results/**/*.xml'
large_files: true
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

name: Wordcount Python Cost Benchmarks Dataflow
name: Python Cost Benchmarks Dataflow

on:
schedule:
- cron: '30 18 * * 6' # Run at 6:30 pm UTC on Saturdays
workflow_dispatch:

#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
Expand Down Expand Up @@ -47,16 +49,17 @@ env:
INFLUXDB_USER_PASSWORD: ${{ secrets.INFLUXDB_USER_PASSWORD }}

jobs:
beam_Inference_Python_Benchmarks_Dataflow:
beam_Python_Cost_Benchmarks_Dataflow:
if: |
github.event_name == 'workflow_dispatch'
github.event_name == 'workflow_dispatch' ||
(github.event_name == 'schedule' && github.repository == 'apache/beam')
runs-on: [self-hosted, ubuntu-20.04, main]
timeout-minutes: 900
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
strategy:
matrix:
job_name: ["beam_Wordcount_Python_Cost_Benchmarks_Dataflow"]
job_phrase: ["Run Wordcount Cost Benchmark"]
job_name: ["beam_Python_CostBenchmark_Dataflow"]
job_phrase: ["Run Python Dataflow Cost Benchmarks"]
steps:
- uses: actions/checkout@v4
- name: Setup repository
Expand All @@ -76,10 +79,11 @@ jobs:
test-language: python
argument-file-paths: |
${{ github.workspace }}/.github/workflows/cost-benchmarks-pipeline-options/python_wordcount.txt
${{ github.workspace }}/.github/workflows/cost-benchmarks-pipeline-options/python_tf_mnist_classification.txt
# The env variables are created and populated in the test-arguments-action as "<github.job>_test_arguments_<argument_file_paths_index>"
- name: get current time
run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV
- name: run wordcount on Dataflow Python
- name: Run wordcount on Dataflow
uses: ./.github/actions/gradle-command-self-hosted-action
timeout-minutes: 30
with:
Expand All @@ -88,4 +92,15 @@ jobs:
-PloadTest.mainClass=apache_beam.testing.benchmarks.wordcount.wordcount \
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
'-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} --output=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \
'-PloadTest.args=${{ env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} --output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \
- name: Run Tensorflow MNIST Image Classification on Dataflow
uses: ./.github/actions/gradle-command-self-hosted-action
timeout-minutes: 30
with:
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
arguments: |
-PloadTest.mainClass=apache_beam.testing.benchmarks.inference.tensorflow_mnist_classification_cost_benchmark \
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/tensorflow_tests_requirements.txt \
'-PloadTest.args=${{ env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_2 }} --job_name=benchmark-tests-tf-mnist-classification-python-${{env.NOW_UTC}} --input_file=gs://apache-beam-ml/testing/inputs/it_mnist_data.csv --output_file=gs://temp-storage-for-end-to-end-tests/inference/result_tf_mnist-${{env.NOW_UTC}}.txt --model=gs://apache-beam-ml/models/tensorflow/mnist/' \
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

--region=us-central1
--machine_type=n1-standard-2
--num_workers=1
--disk_size_gb=50
--autoscaling_algorithm=NONE
--input_options={}
--staging_location=gs://temp-storage-for-perf-tests/loadtests
--temp_location=gs://temp-storage-for-perf-tests/loadtests
--requirements_file=apache_beam/ml/inference/tensorflow_tests_requirements.txt
--publish_to_big_query=true
--metrics_dataset=beam_run_inference
--metrics_table=tf_mnist_classification
--runner=DataflowRunner
10 changes: 9 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
## New Features / Improvements
* The datetime module is now available for use in jinja templatization for yaml.
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
## Breaking Changes
Expand Down Expand Up @@ -64,14 +65,21 @@

* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939))
* Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).
* [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125))

## New Features / Improvements

* Added OrderedList state support in SparkRunner ([#33211](https://github.com/apache/beam/issues/33211)).
* Improved batch performance of SparkRunner's GroupByKey ([#20943](https://github.com/apache/beam/pull/20943)).
* Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)).
* This enables initial Java GroupIntoBatches support.
* Added support for stateful processing in Spark Runner for streaming pipelines. Timer functionality is not yet supported and will be implemented in a future release ([#33237](https://github.com/apache/beam/issues/33237)).
* Improved batch performance of SparkRunner's GroupByKey ([#20943](https://github.com/apache/beam/pull/20943)).
* Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)).
* This enables initial Java GroupIntoBatches support.
* Support OrderedListState in Prism ([#32929](https://github.com/apache/beam/issues/32929)).
* Added OrderedList state support in SparkRunner ([#33211](https://github.com/apache/beam/issues/33211)).
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ class BeamModulePlugin implements Plugin<Project> {
def gax_version = "2.55.0"
def google_ads_version = "33.0.0"
def google_clients_version = "2.0.0"
def google_cloud_bigdataoss_version = "2.2.16"
def google_cloud_bigdataoss_version = "2.2.26"
// [bomupgrader] determined by: com.google.cloud:google-cloud-spanner, consistent with: google_cloud_platform_libraries_bom
def google_cloud_spanner_version = "6.79.0"
def google_code_gson_version = "2.10.1"
Expand Down
3 changes: 3 additions & 0 deletions contributor-docs/code-change-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ Follow these steps for Maven projects.
<id>Maven-Snapshot</id>
<name>maven snapshot repository</name>
<url>https://repository.apache.org/content/groups/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
</repository>
```

Expand Down
4 changes: 0 additions & 4 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,6 @@ def createPrismValidatesRunnerTask = { name, environmentType ->
excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService'
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'

// Not yet implemented in Prism
// https://github.com/apache/beam/issues/32929
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'

// Not supported in Portable Java SDK yet.
// https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
Expand Down
2 changes: 1 addition & 1 deletion runners/spark/spark_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test)
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'

// State and Timers
excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.ArrayList;
import java.util.LinkedHashMap;
import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet.StateAndTimers;
import org.apache.beam.runners.spark.stateful.StateAndTimers;
import org.apache.beam.runners.spark.translation.ValueAndCoderKryoSerializer;
import org.apache.beam.runners.spark.translation.ValueAndCoderLazySerializable;
import org.apache.beam.runners.spark.util.ByteArray;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.beam.runners.spark.stateful;

import static org.apache.beam.runners.spark.translation.TranslationUtils.checkpointIfNeeded;
import static org.apache.beam.runners.spark.translation.TranslationUtils.getBatchDuration;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -35,7 +38,6 @@
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.translation.ReifyTimestampsAndWindowsFunction;
import org.apache.beam.runners.spark.translation.TranslationUtils;
Expand All @@ -60,10 +62,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.dstream.DStream;
Expand Down Expand Up @@ -100,27 +100,6 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
private static final Logger LOG =
LoggerFactory.getLogger(SparkGroupAlsoByWindowViaWindowSet.class);

/** State and Timers wrapper. */
public static class StateAndTimers implements Serializable {
// Serializable state for internals (namespace to state tag to coded value).
private final Table<String, String, byte[]> state;
private final Collection<byte[]> serTimers;

private StateAndTimers(
final Table<String, String, byte[]> state, final Collection<byte[]> timers) {
this.state = state;
this.serTimers = timers;
}

Table<String, String, byte[]> getState() {
return state;
}

Collection<byte[]> getTimers() {
return serTimers;
}
}

private static class OutputWindowedValueHolder<K, V>
implements OutputWindowedValue<KV<K, Iterable<V>>> {
private final List<WindowedValue<KV<K, Iterable<V>>>> windowedValues = new ArrayList<>();
Expand Down Expand Up @@ -348,7 +327,7 @@ private Collection<TimerInternals.TimerData> filterTimersEligibleForProcessing(

// empty outputs are filtered later using DStream filtering
final StateAndTimers updated =
new StateAndTimers(
StateAndTimers.of(
stateInternals.getState(),
SparkTimerInternals.serializeTimers(
timerInternals.getTimers(), timerDataCoder));
Expand Down Expand Up @@ -466,21 +445,6 @@ private static <W extends BoundedWindow> TimerInternals.TimerDataCoderV2 timerDa
return TimerInternals.TimerDataCoderV2.of(windowingStrategy.getWindowFn().windowCoder());
}

private static void checkpointIfNeeded(
final DStream<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> firedStream,
final SerializablePipelineOptions options) {

final Long checkpointDurationMillis = getBatchDuration(options);

if (checkpointDurationMillis > 0) {
firedStream.checkpoint(new Duration(checkpointDurationMillis));
}
}

private static Long getBatchDuration(final SerializablePipelineOptions options) {
return options.get().as(SparkPipelineOptions.class).getCheckpointDurationMillis();
}

private static <K, InputT> JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> stripStateValues(
final DStream<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> firedStream,
final Coder<K> keyCoder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class SparkStateInternals<K> implements StateInternals {
public class SparkStateInternals<K> implements StateInternals {

private final K key;
// Serializable state for internals (namespace to state tag to coded value).
Expand All @@ -85,11 +85,11 @@ private SparkStateInternals(K key, Table<String, String, byte[]> stateTable) {
this.stateTable = stateTable;
}

static <K> SparkStateInternals<K> forKey(K key) {
public static <K> SparkStateInternals<K> forKey(K key) {
return new SparkStateInternals<>(key);
}

static <K> SparkStateInternals<K> forKeyAndState(
public static <K> SparkStateInternals<K> forKeyAndState(
K key, Table<String, String, byte[]> stateTable) {
return new SparkStateInternals<>(key, stateTable);
}
Expand Down Expand Up @@ -417,17 +417,25 @@ public void put(MapKeyT key, MapValueT value) {
@Override
public ReadableState<MapValueT> computeIfAbsent(
MapKeyT key, Function<? super MapKeyT, ? extends MapValueT> mappingFunction) {
Map<MapKeyT, MapValueT> sparkMapState = readValue();
Map<MapKeyT, MapValueT> sparkMapState = readAsMap();
MapValueT current = sparkMapState.get(key);
if (current == null) {
put(key, mappingFunction.apply(key));
}
return ReadableStates.immediate(current);
}

private Map<MapKeyT, MapValueT> readAsMap() {
Map<MapKeyT, MapValueT> mapState = readValue();
if (mapState == null) {
mapState = new HashMap<>();
}
return mapState;
}

@Override
public void remove(MapKeyT key) {
Map<MapKeyT, MapValueT> sparkMapState = readValue();
Map<MapKeyT, MapValueT> sparkMapState = readAsMap();
sparkMapState.remove(key);
writeValue(sparkMapState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Collection<TimerData> getTimers() {
return timers;
}

void addTimers(Iterator<TimerData> timers) {
public void addTimers(Iterator<TimerData> timers) {
while (timers.hasNext()) {
TimerData timer = timers.next();
this.timers.add(timer);
Expand Down Expand Up @@ -163,7 +163,8 @@ public void setTimer(
Instant target,
Instant outputTimestamp,
TimeDomain timeDomain) {
throw new UnsupportedOperationException("Setting a timer by ID not yet supported.");
this.setTimer(
TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain));
}

@Override
Expand Down
Loading

0 comments on commit bfea9e1

Please sign in to comment.