diff --git a/examples/multi-language/python/wordcount_external.py b/examples/multi-language/python/wordcount_external.py index eec64e0d0d10..7298d81c1b44 100644 --- a/examples/multi-language/python/wordcount_external.py +++ b/examples/multi-language/python/wordcount_external.py @@ -19,6 +19,7 @@ import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.transforms.external import BeamJarExpansionService from apache_beam.transforms.external_transform_provider import ExternalTransformProvider from apache_beam.typehints.row_type import RowTypeConstraint """A Python multi-language pipeline that counts words using multiple Java SchemaTransforms. @@ -67,17 +68,22 @@ def run(input_path, output_path, expansion_service_port, pipeline_args): pipeline_options = PipelineOptions(pipeline_args) - provider = ExternalTransformProvider("localhost:" + expansion_service_port) - # Retrieve portable transforms - Extract = provider.get_urn(EXTRACT_IDENTIFIER) - Count = provider.get_urn(COUNT_IDENTIFIER) - Write = provider.get_urn(WRITE_IDENTIFIER) - with beam.Pipeline(options=pipeline_options) as p: + expansion_service = BeamJarExpansionService( + "examples:multi-language:shadowJar") + if expansion_service_port: + expansion_service = "localhost:" + expansion_service_port + + provider = ExternalTransformProvider(expansion_service) + # Retrieve portable transforms + Extract = provider.get_urn(EXTRACT_IDENTIFIER) + Count = provider.get_urn(COUNT_IDENTIFIER) + Write = provider.get_urn(WRITE_IDENTIFIER) + _ = (p | 'Read' >> beam.io.ReadFromText(input_path) | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line)) - | 'Extract Words' >> Extract(filter=["king", "palace"]) + | 'Extract Words' >> Extract(drop=["king", "palace"]) | 'Count Words' >> Count() | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % ( row.word, row.count))).with_output_types( @@ -100,8 +106,10 @@ def run(input_path, output_path, expansion_service_port, pipeline_args): help='Output file') parser.add_argument('--expansion_service_port', dest='expansion_service_port', - required=True, - help='Expansion service port') + required=False, + help='Expansion service port. If left empty, the ' + 'existing multi-language examples service will ' + 'be used by default.') known_args, pipeline_args = parser.parse_known_args() run(known_args.input, known_args.output, known_args.expansion_service_port, diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java index 66bab336ce95..b7224ecec6b4 100644 --- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java +++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java @@ -52,10 +52,10 @@ protected SchemaTransform from(Configuration configuration) { static class ExtractWordsTransform extends SchemaTransform { private static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build(); - private final List filter; + private final List drop; ExtractWordsTransform(Configuration configuration) { - this.filter = configuration.getFilter(); + this.drop = configuration.getDrop(); } @Override @@ -73,7 +73,7 @@ public void process(@Element Row element, OutputReceiver receiver) { String line = Preconditions.checkStateNotNull(element.getString("line")); String[] words = line.split("[^\\p{L}]+", -1); Arrays.stream(words) - .filter(filter::contains) + .filter(w -> !drop.contains(w)) .forEach( word -> receiver.output( @@ -93,12 +93,12 @@ public static Builder builder() { return new AutoValue_ExtractWordsProvider_Configuration.Builder(); } - @SchemaFieldDescription("List of words to filter out.") - public abstract List getFilter(); + @SchemaFieldDescription("List of words to drop.") + public abstract List getDrop(); @AutoValue.Builder public abstract static class Builder { - public abstract Builder setFilter(List foo); + public abstract Builder setDrop(List foo); public abstract Configuration build(); } diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index e44f7482dc61..a4dd449e115a 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -239,7 +239,8 @@ def dict_to_row(schema_proto, py_value): extra = set(py_value.keys()) - set(row_type._fields) if extra: raise ValueError( - f"Unknown fields: {extra}. Valid fields: {row_type._fields}") + f"Transform '{self.identifier()}' was configured with unknown " + f"fields: {extra}. Valid fields: {set(row_type._fields)}") return row_type( *[ dict_to_row_recursive(