From 79534aff995261001fbc4ef4119eaa290b36d4a2 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Wed, 11 Dec 2024 18:14:16 -0500
Subject: [PATCH 1/2] new python multi-lang quickstart

---
 .../site/content/en/documentation/glossary.md |  14 +-
 .../en/documentation/programming-guide.md     |   4 +
 .../sdks/python-multi-language-pipelines-2.md | 211 ++++++++++++++++++
 .../sdks/python-multi-language-pipelines.md   |   8 +-
 4 files changed, 233 insertions(+), 4 deletions(-)
 create mode 100644 website/www/site/content/en/documentation/sdks/python-multi-language-pipelines-2.md

diff --git a/website/www/site/content/en/documentation/glossary.md b/website/www/site/content/en/documentation/glossary.md
index 142cb9e8df5b..31b29fdb549d 100644
--- a/website/www/site/content/en/documentation/glossary.md
+++ b/website/www/site/content/en/documentation/glossary.md
@@ -114,7 +114,7 @@ To learn more, see:

## Cross-language transforms

-Transforms that can be shared across Beam SDKs. With cross-language transforms, you can use transforms written in any supported SDK language (currently, Java and Python) in a pipeline written in a different SDK language. For example, you could use the Apache Kafka connector from the Java SDK in a Python streaming pipeline. Cross-language transforms make it possible to provide new functionality simultaneously in different SDKs.
+Portable transforms that can be shared across Beam SDKs. With cross-language transforms, you can use transforms written in any supported SDK language (currently, Java and Python) in a pipeline written in a different SDK language. For example, you could use the Apache Kafka connector from the Java SDK in a Python streaming pipeline. Cross-language transforms make it possible to provide new functionality simultaneously in different SDKs.

To learn more, see:

@@ -347,6 +347,18 @@ To learn more, see:

* [Programming guide: Schemas](/documentation/programming-guide/#schemas)
* [Schema Patterns](/documentation/patterns/schema/)

+## SchemaTransform
+
+A transform that consumes and produces PCollections of Beam Rows with a predefined Schema, i.e.:
+
+```java
+SchemaTransform extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {}
+```
+
+SchemaTransforms are mainly used for portable, cross-language use cases. To learn more, see:
+
+* [Python Multi-Language Guide](sdks/python-custom-multi-language-pipelines-guide.md)
+
## Session

A time interval for grouping data events. A session is defined by some minimum gap duration between events. For example, a data stream representing user mouse activity may have periods with high concentrations of clicks followed by periods of inactivity. A session can represent such a pattern of activity delimited by inactivity.

diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 955c2b8797d1..430fba68a3b0 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -7639,6 +7639,8 @@ In this section, we will use [KafkaIO.Read](https://beam.apache.org/releases/jav

#### 13.1.1. Creating cross-language Java transforms

+For Beam versions 2.60.0+, please follow [this guide](sdks/python-custom-multi-language-pipelines-guide.md) instead.
+
There are two ways to make Java transforms available to other SDKs.

* Option 1: In some cases, you can use existing Java transforms from other SDKs without writing any additional Java code.

@@ -8040,6 +8042,8 @@ input.apply(

#### 13.2.2. Using cross-language transforms in a Python pipeline

+For Beam versions 2.60.0+, please follow [this guide](sdks/python-custom-multi-language-pipelines-guide.md#use-the-portable-transform-in-a-python-pipeline) instead.
+
If a Python-specific wrapper for a cross-language transform is available, use that. Otherwise, you have to use the lower-level [`ExternalTransform`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform) class to access the transform.

**Using an SDK wrapper**

diff --git a/website/www/site/content/en/documentation/sdks/python-multi-language-pipelines-2.md b/website/www/site/content/en/documentation/sdks/python-multi-language-pipelines-2.md
new file mode 100644
index 000000000000..3ef5306b4209
--- /dev/null
+++ b/website/www/site/content/en/documentation/sdks/python-multi-language-pipelines-2.md
@@ -0,0 +1,211 @@
+---
+type: languages
+title: "Python multi-language pipelines quickstart"
+---
+

# Python multi-language pipelines quickstart

This page provides instructions for running the updated multi-language examples for the Python SDK. For more details, see this step-by-step [guide](python-custom-multi-language-pipelines-guide.md) on creating and running custom multi-language transforms.

The code shown in this quickstart is available in a [collection of runnable examples](https://github.com/apache/beam/tree/master/examples/multi-language).

To build and run a multi-language Python pipeline, you need a Python environment with the Beam SDK installed. If you don’t have an environment set up, first complete the [Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).

A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language and uses one or more transforms from another Beam SDK language. These “other-language” transforms are called [*cross-language transforms*](../glossary.md#cross-language-transforms). The idea is to make pipeline components easier to share across the Beam SDKs, and to grow the pool of available transforms for all the SDKs. In the examples below, the multi-language pipeline is built with the Beam Python SDK, and the cross-language transforms are built with the Beam Java SDK.

## Create a cross-language transform

Here's a Java transform provider, [ExtractWordsProvider](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java), that is uniquely identified with the URN `"beam:schematransform:org.apache.beam:extract_words:v1"`. Given a `Configuration` object, it provides a transform:

```java
@AutoService(SchemaTransformProvider.class)
public class ExtractWordsProvider extends TypedSchemaTransformProvider<Configuration> {

  @Override
  public String identifier() {
    return "beam:schematransform:org.apache.beam:extract_words:v1";
  }

  @Override
  protected SchemaTransform from(Configuration configuration) {
    return new ExtractWordsTransform(configuration);
  }
}
```
> **NOTE**: To ensure that your URN doesn't conflict with URNs from other transforms, follow the URN conventions described [here](../programming-guide.md#1314-defining-a-urn).
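
From the Python side, this URN is the handle you will later use to discover the transform. As a quick sanity check, you can look it up once the expansion service is running. The following is a minimal sketch, assuming the service described below is already listening on `localhost:12345`:

```python
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider

# Sketch: connect to a locally running expansion service (the address and
# port are assumptions) and confirm the Java provider registered the URN above.
provider = ExternalTransformProvider("localhost:12345")
extract = provider.get_urn("beam:schematransform:org.apache.beam:extract_words:v1")
print(extract)  # the Python wrapper class generated for ExtractWordsProvider
```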
The config object is a simple Java object (POJO) that has fields required by the transform. AutoValue is encouraged, and the `@DefaultSchema` annotation helps Beam do some necessary conversions in the background:

```java
  @DefaultSchema(AutoValueSchema.class)
  @AutoValue
  protected abstract static class Configuration {
    public static Builder builder() {
      return new AutoValue_ExtractWordsProvider_Configuration.Builder();
    }

    @SchemaFieldDescription("List of words to filter out.")
    public abstract List<String> getFilter();

    @AutoValue.Builder
    public abstract static class Builder {
      public abstract Builder setFilter(List<String> foo);

      public abstract Configuration build();
    }
  }
```

The transform can be any implementation of your choice, as long as it meets the requirements of a [SchemaTransform](../glossary.md#schematransform). For this example, the transform does the following:

```java
  static class ExtractWordsTransform extends SchemaTransform {
    private static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build();
    private final List<String> filter;

    ExtractWordsTransform(Configuration configuration) {
      this.filter = configuration.getFilter();
    }

    @Override
    public PCollectionRowTuple expand(PCollectionRowTuple input) {
      return PCollectionRowTuple.of(
          "output",
          input
              .getSinglePCollection()
              .apply(
                  ParDo.of(
                      new DoFn<Row, Row>() {
                        @ProcessElement
                        public void process(@Element Row element, OutputReceiver<Row> receiver) {
                          // Split the line into words.
                          String line = Preconditions.checkStateNotNull(element.getString("line"));
                          String[] words = line.split("[^\\p{L}]+", -1);
                          Arrays.stream(words)
                              .filter(filter::contains)
                              .forEach(
                                  word ->
                                      receiver.output(
                                          Row.withSchema(OUTPUT_SCHEMA)
                                              .withFieldValue("word", word)
                                              .build()));
                        }
                      }))
              .setRowSchema(OUTPUT_SCHEMA));
    }
  }
```

This example uses other Java transform providers as well, [JavaCountProvider](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java) and [WriteWordsProvider](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java), but they follow the same pattern.

## Choose an expansion service

When building a job for a multi-language pipeline, Beam uses an [expansion service](../glossary.md#expansion-service) to expand [composite transforms](../glossary.md#composite-transform). You must have at least one expansion service per remote SDK.

Before running a multi-language pipeline, you need to build an expansion service that can access your Java transform. It’s often easier to create a single shaded JAR that contains both the expansion service and your transform. The Python SDK will stage both the Python and Java dependencies for the runner.

Assuming you've built a JAR named **java-multilang-bundled-0.1.jar**, you can start the service with a command like the following, where `12345` is the port on which the expansion service will run:

```
java -jar java-multilang-bundled-0.1.jar 12345
```

For instructions on running an example expansion service, see [this README](https://github.com/apache/beam/blob/master/examples/multi-language/README.md#instructions-for-running-the-pipelines).
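
If you'd rather not manage the service process yourself, the Python SDK can also launch the expansion service directly from the JAR. Below is a minimal sketch, assuming the same bundled JAR as above; `JavaJarExpansionService` starts the service on an available port and shuts it down when expansion is complete:

```python
from apache_beam.transforms.external import JavaJarExpansionService

# Sketch: let the Python SDK start the expansion service from the bundled
# JAR (the path is an assumption) instead of running `java -jar ...` manually.
expansion_service = JavaJarExpansionService("java-multilang-bundled-0.1.jar")
```

The resulting object can be passed anywhere an expansion service address is accepted, such as the `ExternalTransformProvider` used in the next section.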
## Create a Python pipeline

Your Python pipeline can now use the [**ExternalTransformProvider**](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external_transform_provider.html#apache_beam.transforms.external_transform_provider.ExternalTransformProvider) API to configure your cross-language transform. Here’s an example constructed from [wordcount_external.py](https://github.com/apache/beam/blob/master/examples/multi-language/python/wordcount_external.py):

First, determine the transform identifiers you are looking for:

```python
EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1"
COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1"
WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1"
```

Then, initialize the `ExternalTransformProvider` with your expansion service. It takes two parameters:

* `expansion_services`: an expansion service, or a list of expansion services
* `urn_pattern`: (optional) a regex pattern to match valid transforms

```python
provider = ExternalTransformProvider("localhost:" + expansion_service_port)
```

Next, retrieve each portable transform:
```python
Extract = provider.get_urn(EXTRACT_IDENTIFIER)
Count = provider.get_urn(COUNT_IDENTIFIER)
Write = provider.get_urn(WRITE_IDENTIFIER)
```

Finally, build your multi-language Python pipeline using a mix of native and portable transforms:
```python
with beam.Pipeline(options=pipeline_options) as p:
  _ = (p
      | 'Read' >> beam.io.ReadFromText(input_path)
      | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
      | 'Extract Words' >> Extract(filter=["king", "palace"])
      | 'Count Words' >> Count()
      | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
          row.word, row.count))).with_output_types(
          RowTypeConstraint.from_fields([('line', str)]))
      | 'Write' >> Write(file_path_prefix=output_path))
```
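
Note the 'Prepare Rows' and 'Format Text' steps: SchemaTransforms consume and produce `beam.Row` elements with known schemas, so plain Python values are wrapped into Rows before entering the Java transform, and the output type is declared explicitly on the way out. The wrapping step looks like this in isolation (a minimal sketch, not part of the example file):

```python
import apache_beam as beam
from apache_beam.typehints.row_type import RowTypeConstraint

with beam.Pipeline() as p:
  _ = (p
      | beam.Create(["the king is in the palace"])
      # Wrap each line in a Row whose schema (a single "line" string field)
      # matches what the Java transform expects on its input PCollection.
      | beam.Map(lambda line: beam.Row(line=line)).with_output_types(
          RowTypeConstraint.from_fields([('line', str)])))
```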
+ +``` +#!/bin/bash +export GCP_PROJECT= +export GCS_BUCKET= +export TEMP_LOCATION=gs://$GCS_BUCKET/tmp +export GCP_REGION= +export JOB_NAME="wordcount-external-`date +%Y%m%d-%H%M%S`" +export NUM_WORKERS="1" + +gsutil rm gs://$GCS_BUCKET/wordcount-external/* + +python wordcount_external.py \ + --runner DataflowRunner \ + --temp_location $TEMP_LOCATION \ + --project $GCP_PROJECT \ + --region $GCP_REGION \ + --job_name $JOB_NAME \ + --num_workers $NUM_WORKERS \ + --input "gs://dataflow-samples/shakespeare/kinglear.txt" \ + --output "gs://$GCS_BUCKET/wordcount-external/output" \ + --expansion_service_port +``` diff --git a/website/www/site/content/en/documentation/sdks/python-multi-language-pipelines.md b/website/www/site/content/en/documentation/sdks/python-multi-language-pipelines.md index 332a62901a23..a36ed6e70b26 100644 --- a/website/www/site/content/en/documentation/sdks/python-multi-language-pipelines.md +++ b/website/www/site/content/en/documentation/sdks/python-multi-language-pipelines.md @@ -1,6 +1,6 @@ --- type: languages -title: "Python multi-language pipelines quickstart" +title: "(Legacy) Python multi-language pipelines quickstart" --- -# Python multi-language pipelines quickstart +# (Legacy) Python multi-language pipelines quickstart + +> Note: it's encouraged to use the newer SchemaTransform framework; check out the updated [quickstart](python-multi-language-pipelines-2.md) and [guide](python-custom-multi-language-pipelines-guide.md) This page provides a high-level overview of creating multi-language pipelines with the Apache Beam SDK for Python. For a more comprehensive treatment of the topic, see [Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines). @@ -119,7 +121,7 @@ The steps for running the expansion service will vary depending on your build to java -jar java-prefix-bundled-0.1.jar 12345 ``` -For instructions on running an example expansion service, see [this README](https://github.com/apache/beam/blob/master/examples/multi-language/README.md). +For instructions on running an example expansion service, see [this README](https://github.com/apache/beam/blob/master/examples/multi-language/README.md#instructions-for-running-the-pipelines). 
## Create a Python pipeline

From 51901733f02b3ac839a53ad51462faae2f6df007 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Wed, 18 Dec 2024 16:08:48 -0500
Subject: [PATCH 2/2] update example code

---
 .../sdks/python-multi-language-pipelines-2.md | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/website/www/site/content/en/documentation/sdks/python-multi-language-pipelines-2.md b/website/www/site/content/en/documentation/sdks/python-multi-language-pipelines-2.md
index 3ef5306b4209..0d98a3bf05de 100644
--- a/website/www/site/content/en/documentation/sdks/python-multi-language-pipelines-2.md
+++ b/website/www/site/content/en/documentation/sdks/python-multi-language-pipelines-2.md
@@ -57,27 +57,32 @@ The config object is a simple Java object (POJO) that has fields required by the
         return new AutoValue_ExtractWordsProvider_Configuration.Builder();
       }

-      @SchemaFieldDescription("List of words to filter out.")
-      public abstract List<String> getFilter();
+      @SchemaFieldDescription("List of words to drop.")
+      public abstract List<String> getDrop();

       @AutoValue.Builder
       public abstract static class Builder {
-        public abstract Builder setFilter(List<String> foo);
+        public abstract Builder setDrop(List<String> drop);

         public abstract Configuration build();
       }
     }
 ```

+Beam uses this configuration to generate a Python transform with the following signature:
+```python
+Extract(drop=["foo", "bar"])
+```
+
 The transform can be any implementation of your choice, as long as it meets the requirements of a [SchemaTransform](../glossary.md#schematransform). For this example, the transform does the following:

 ```java
   static class ExtractWordsTransform extends SchemaTransform {
     private static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build();
-    private final List<String> filter;
+    private final List<String> drop;

     ExtractWordsTransform(Configuration configuration) {
-      this.filter = configuration.getFilter();
+      this.drop = configuration.getDrop();
     }

     @Override
@@ -95,7 +100,7 @@ The transform can be any implementation of your choice, as long as it meets the
                           String line = Preconditions.checkStateNotNull(element.getString("line"));
                           String[] words = line.split("[^\\p{L}]+", -1);
                           Arrays.stream(words)
-                              .filter(filter::contains)
+                              .filter(w -> !drop.contains(w))
                               .forEach(
                                   word ->
                                       receiver.output(
@@ -159,7 +164,7 @@ with beam.Pipeline(options=pipeline_options) as p:
   _ = (p
       | 'Read' >> beam.io.ReadFromText(input_path)
       | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
-      | 'Extract Words' >> Extract(filter=["king", "palace"])
+      | 'Extract Words' >> Extract(drop=["king", "palace"])
       | 'Count Words' >> Count()
       | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
           row.word, row.count))).with_output_types(