From 46732b61769a9116c619dd8f3c9552cd543ab273 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
Date: Thu, 29 Aug 2024 15:47:20 -0400
Subject: [PATCH] add ExternalTransformProvider example (#30666)
* add ExternalTransformProvider example
* cleanup old command
* clarify docs
* update
* indicate JavaExternalTransform is outdated
---
examples/multi-language/README.md | 3 +
.../python/wordcount_external.py | 118 ++++++++++++++++++
.../ExtractWordsProvider.java | 76 +++++++++++
.../schematransforms/JavaCountProvider.java | 78 ++++++++++++
.../schematransforms/WriteWordsProvider.java | 77 ++++++++++++
sdks/standard_expansion_services.yaml | 4 +
6 files changed, 356 insertions(+)
create mode 100644 examples/multi-language/python/wordcount_external.py
create mode 100644 examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
create mode 100644 examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
create mode 100644 examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
diff --git a/examples/multi-language/README.md b/examples/multi-language/README.md
index 4912eb14da39..479a56deab75 100644
--- a/examples/multi-language/README.md
+++ b/examples/multi-language/README.md
@@ -24,6 +24,9 @@ This project provides examples of Apache Beam
## Using Java transforms from Python
+* **python/wordcount_external** - A Python pipeline that runs the Word Count workflow using three external Java
+ SchemaTransforms. This example demonstrates the updated `ExternalTransformProvider` API.
+ #### _Outdated examples:_
* **python/addprefix** - A Python pipeline that reads a text file and attaches a prefix on the Java side to each input.
* **python/javacount** - A Python pipeline that counts words using the Java `Count.perElement()` transform.
* **python/javadatagenerator** - A Python pipeline that produces a set of strings generated from Java.
diff --git a/examples/multi-language/python/wordcount_external.py b/examples/multi-language/python/wordcount_external.py
new file mode 100644
index 000000000000..580c0269d361
--- /dev/null
+++ b/examples/multi-language/python/wordcount_external.py
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+
+import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
+from apache_beam.typehints.row_type import RowTypeConstraint
+"""A Python multi-language pipeline that counts words using multiple Java SchemaTransforms.
+
+This pipeline reads an input text file, then extracts the words, counts them, and writes the results using Java
+SchemaTransforms. The transforms are listed below and can be found in
+src/main/java/org/apache/beam/examples/multilanguage/schematransforms/:
+- `ExtractWordsProvider`
+- `JavaCountProvider`
+- `WriteWordsProvider`
+
+These Java transforms are accessible to the Python pipeline via an expansion service. Check out the
+[`README.md`](https://github.com/apache/beam/blob/master/examples/multi-language/README.md#1-start-the-expansion-service)
+for instructions on how to download the jar and run this expansion service.
+
+This example aims to demonstrate how to use the `ExternalTransformProvider` utility, which dynamically generates and
+provides user-friendly wrappers for external transforms.
+
+Example commands for executing this program:
+
+DirectRunner:
+$ python wordcount_external.py \
+ --runner DirectRunner \
+ --input $INPUT_FILE \
+ --output $OUTPUT_PREFIX \
+ --expansion_service_port $EXPANSION_SERVICE_PORT
+
+DataflowRunner:
+$ python wordcount_external.py \
+ --runner DataflowRunner \
+ --temp_location $TEMP_LOCATION \
+ --project $GCP_PROJECT \
+ --region $GCP_REGION \
+ --job_name $JOB_NAME \
+ --num_workers $NUM_WORKERS \
+ --input "gs://dataflow-samples/shakespeare/kinglear.txt" \
+ --output "gs://$GCS_BUCKET/wordcount_external/output" \
+ --expansion_service_port $EXPANSION_SERVICE_PORT
+"""
+
+# URNs of the Java SchemaTransforms used by this pipeline. Each value must stay
+# identical to the string returned by `identifier()` in the matching Java
+# provider class, since that string is what the transform is looked up by.
+# Original Java transform is in ExtractWordsProvider.java
+EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1"
+# Original Java transform is in JavaCountProvider.java
+COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1"
+# Original Java transform is in WriteWordsProvider.java
+WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1"
+
+
+def run(input_path, output_path, expansion_service_port, pipeline_args):
+ pipeline_options = PipelineOptions(pipeline_args)
+
+ # Discover and get external transforms from this expansion service
+ provider = ExternalTransformProvider("localhost:" + expansion_service_port)
+ # Get transforms with identifiers, then use them as you would a regular
+ # native PTransform
+ Extract = provider.get_urn(EXTRACT_IDENTIFIER)
+ Count = provider.get_urn(COUNT_IDENTIFIER)
+ Write = provider.get_urn(WRITE_IDENTIFIER)
+
+ with beam.Pipeline(options=pipeline_options) as p:
+ lines = p | 'Read' >> ReadFromText(input_path)
+
+ words = (lines
+ | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
+ | 'Extract Words' >> Extract())
+ word_counts = words | 'Count Words' >> Count()
+ formatted_words = (
+ word_counts
+ | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
+ row.word, row.count))).with_output_types(
+ RowTypeConstraint.from_fields([('line', str)])))
+
+ formatted_words | 'Write' >> Write(file_path_prefix=output_path)
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ import argparse
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--input',
+ dest='input',
+ required=True,
+ help='Input file')
+ parser.add_argument('--output',
+ dest='output',
+ required=True,
+ help='Output file')
+ parser.add_argument('--expansion_service_port',
+ dest='expansion_service_port',
+ required=True,
+ help='Expansion service port')
+ known_args, pipeline_args = parser.parse_known_args()
+
+ run(known_args.input, known_args.output, known_args.expansion_service_port,
+ pipeline_args)
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
new file mode 100644
index 000000000000..724dbce276fb
--- /dev/null
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.multilanguage.schematransforms;
+
+import static org.apache.beam.examples.multilanguage.schematransforms.ExtractWordsProvider.Configuration;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * Splits a line into separate words and returns each word.
+ *
+ * <p>Reads the {@code line} string field of every row in the {@code "input"} collection and
+ * emits, under the {@code "output"} tag, one row per word using {@link #OUTPUT_SCHEMA}.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class ExtractWordsProvider extends TypedSchemaTransformProvider<Configuration> {
+  /** Schema of the emitted rows: a single string field named {@code word}. */
+  public static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build();
+
+  /** Returns the URN that identifies this transform. */
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:extract_words:v1";
+  }
+
+  /**
+   * Builds the transform. The configuration has no fields and is not consulted.
+   *
+   * @param configuration the (empty) configuration for this transform
+   * @return a transform mapping the {@code "input"} rows to {@code "output"} word rows
+   */
+  @Override
+  protected SchemaTransform from(Configuration configuration) {
+    return new SchemaTransform() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        return PCollectionRowTuple.of(
+            "output",
+            input.get("input").apply(ParDo.of(new ExtractWordsFn())).setRowSchema(OUTPUT_SCHEMA));
+      }
+    };
+  }
+
+  /** Emits one {@link #OUTPUT_SCHEMA} row for every non-empty word in the input row's line. */
+  static class ExtractWordsFn extends DoFn<Row, Row> {
+    @ProcessElement
+    public void processElement(@Element Row element, OutputReceiver<Row> receiver) {
+      // Split the line into words.
+      String line = Preconditions.checkStateNotNull(element.getString("line"));
+      // Any run of non-letter characters is a separator. The -1 limit keeps trailing empty
+      // strings, which the isEmpty() check below discards along with any leading one.
+      String[] words = line.split("[^\\p{L}]+", -1);
+
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          receiver.output(Row.withSchema(OUTPUT_SCHEMA).withFieldValue("word", word).build());
+        }
+      }
+    }
+  }
+
+  /** Configuration for this transform; it declares no parameters. */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  protected abstract static class Configuration {}
+}
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
new file mode 100644
index 000000000000..cabea594ae18
--- /dev/null
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.multilanguage.schematransforms;
+
+import static org.apache.beam.examples.multilanguage.schematransforms.JavaCountProvider.Configuration;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+/**
+ * Counts how many times each input row occurs.
+ *
+ * <p>Applies {@code Count.perElement()} to the {@code "input"} collection and emits, under the
+ * {@code "output"} tag, rows with a string field {@code word} (taken from the counted row's
+ * {@code word} field) and an int64 field {@code count}.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class JavaCountProvider extends TypedSchemaTransformProvider<Configuration> {
+  /** Returns the URN that identifies this transform. */
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:count:v1";
+  }
+
+  /**
+   * Builds the transform. The configuration has no fields and is not consulted.
+   *
+   * @param configuration the (empty) configuration for this transform
+   * @return a transform mapping the {@code "input"} rows to {@code "output"} word/count rows
+   */
+  @Override
+  protected SchemaTransform from(Configuration configuration) {
+    return new SchemaTransform() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        Schema outputSchema =
+            Schema.builder().addStringField("word").addInt64Field("count").build();
+
+        PCollection<Row> wordCounts =
+            input
+                .get("input")
+                .apply(Count.perElement())
+                .apply(
+                    // Flatten each (row, count) pair into a single output row.
+                    MapElements.into(TypeDescriptors.rows())
+                        .via(
+                            kv ->
+                                Row.withSchema(outputSchema)
+                                    .withFieldValue(
+                                        "word",
+                                        Preconditions.checkStateNotNull(
+                                            kv.getKey().getString("word")))
+                                    .withFieldValue("count", kv.getValue())
+                                    .build()))
+                .setRowSchema(outputSchema);
+
+        return PCollectionRowTuple.of("output", wordCounts);
+      }
+    };
+  }
+
+  /** Configuration for this transform; it declares no parameters. */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  protected abstract static class Configuration {}
+}
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
new file mode 100644
index 000000000000..0b2017c5587a
--- /dev/null
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.multilanguage.schematransforms;
+
+import static org.apache.beam.examples.multilanguage.schematransforms.WriteWordsProvider.Configuration;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+/**
+ * Writes the {@code line} field of each input row as text.
+ *
+ * <p>Takes the {@code "input"} collection, extracts the string field {@code line} from every row
+ * and writes it with {@code TextIO} using the configured file path prefix. The transform
+ * returns an empty {@link PCollectionRowTuple}.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class WriteWordsProvider extends TypedSchemaTransformProvider<Configuration> {
+  /** Returns the URN that identifies this transform. */
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:write_words:v1";
+  }
+
+  /**
+   * Builds the write transform.
+   *
+   * @param configuration supplies the output file path prefix
+   * @return a transform that writes the {@code "input"} rows and produces no outputs
+   */
+  @Override
+  protected SchemaTransform from(Configuration configuration) {
+    return new SchemaTransform() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        input
+            .get("input")
+            .apply(
+                MapElements.into(TypeDescriptors.strings())
+                    .via(row -> Preconditions.checkStateNotNull(row.getString("line"))))
+            .apply(TextIO.write().to(configuration.getFilePathPrefix()));
+
+        // Nothing is handed downstream, so return an empty tuple.
+        return PCollectionRowTuple.empty(input.getPipeline());
+      }
+    };
+  }
+
+  /** Configuration for this transform: the output file path prefix. */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  protected abstract static class Configuration {
+    /** Returns a new builder for {@link Configuration}. */
+    public static Builder builder() {
+      return new AutoValue_WriteWordsProvider_Configuration.Builder();
+    }
+
+    @SchemaFieldDescription("Writes to output files with this prefix.")
+    public abstract String getFilePathPrefix();
+
+    /** Builder for {@link Configuration}. */
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setFilePathPrefix(String filePathPrefix);
+
+      public abstract Configuration build();
+    }
+  }
+}
diff --git a/sdks/standard_expansion_services.yaml b/sdks/standard_expansion_services.yaml
index 31a1a6343aed..ad965c8a1ee3 100644
--- a/sdks/standard_expansion_services.yaml
+++ b/sdks/standard_expansion_services.yaml
@@ -26,10 +26,14 @@
#
# Transform identifiers listed in the `skip_transforms` field will be skipped.
#
+#
# Any new gradle targets added here should also be added to:
# - sdks/python/build.gradle (as a dependency in the 'generateExternalTransformsConfig' task)
# - sdks/python/test-suites/xlang/build.gradle (look for 'servicesToGenerateFrom')
#
+# After making changes here, please run `./gradlew generateExternalTransformsConfig`
+# to regenerate the config file at sdks/standard_external_transforms.yaml
+#
# Refer to sdks/python/gen_xlang_wrappers.py for more info.
- gradle_target: 'sdks:java:io:expansion-service:shadowJar'