Skip to content

Commit

Permalink
minor adjustments
Browse files: browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Dec 18, 2024
1 parent 387e05b commit 34df8bf
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 16 deletions.
26 changes: 17 additions & 9 deletions examples/multi-language/python/wordcount_external.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
from apache_beam.typehints.row_type import RowTypeConstraint
"""A Python multi-language pipeline that counts words using multiple Java SchemaTransforms.
Expand Down Expand Up @@ -67,17 +68,22 @@
def run(input_path, output_path, expansion_service_port, pipeline_args):
pipeline_options = PipelineOptions(pipeline_args)

provider = ExternalTransformProvider("localhost:" + expansion_service_port)
# Retrieve portable transforms
Extract = provider.get_urn(EXTRACT_IDENTIFIER)
Count = provider.get_urn(COUNT_IDENTIFIER)
Write = provider.get_urn(WRITE_IDENTIFIER)

with beam.Pipeline(options=pipeline_options) as p:
expansion_service = BeamJarExpansionService(
"examples:multi-language:shadowJar")
if expansion_service_port:
expansion_service = "localhost:" + expansion_service_port

provider = ExternalTransformProvider(expansion_service)
# Retrieve portable transforms
Extract = provider.get_urn(EXTRACT_IDENTIFIER)
Count = provider.get_urn(COUNT_IDENTIFIER)
Write = provider.get_urn(WRITE_IDENTIFIER)

_ = (p
| 'Read' >> beam.io.ReadFromText(input_path)
| 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
| 'Extract Words' >> Extract(filter=["king", "palace"])
| 'Extract Words' >> Extract(drop=["king", "palace"])
| 'Count Words' >> Count()
| 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
row.word, row.count))).with_output_types(
Expand All @@ -100,8 +106,10 @@ def run(input_path, output_path, expansion_service_port, pipeline_args):
help='Output file')
parser.add_argument('--expansion_service_port',
dest='expansion_service_port',
required=True,
help='Expansion service port')
required=False,
help='Expansion service port. If left empty, the '
'existing multi-language examples service will '
'be used by default.')
known_args, pipeline_args = parser.parse_known_args()

run(known_args.input, known_args.output, known_args.expansion_service_port,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ protected SchemaTransform from(Configuration configuration) {

static class ExtractWordsTransform extends SchemaTransform {
private static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build();
private final List<String> filter;
private final List<String> drop;

ExtractWordsTransform(Configuration configuration) {
this.filter = configuration.getFilter();
this.drop = configuration.getDrop();
}

@Override
Expand All @@ -73,7 +73,7 @@ public void process(@Element Row element, OutputReceiver<Row> receiver) {
String line = Preconditions.checkStateNotNull(element.getString("line"));
String[] words = line.split("[^\\p{L}]+", -1);
Arrays.stream(words)
.filter(filter::contains)
.filter(w -> !drop.contains(w))
.forEach(
word ->
receiver.output(
Expand All @@ -93,12 +93,12 @@ public static Builder builder() {
return new AutoValue_ExtractWordsProvider_Configuration.Builder();
}

@SchemaFieldDescription("List of words to filter out.")
public abstract List<String> getFilter();
@SchemaFieldDescription("List of words to drop.")
public abstract List<String> getDrop();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setFilter(List<String> foo);
public abstract Builder setDrop(List<String> foo);

public abstract Configuration build();
}
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/transforms/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ def dict_to_row(schema_proto, py_value):
extra = set(py_value.keys()) - set(row_type._fields)
if extra:
raise ValueError(
f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
f"Transform '{self.identifier()}' was configured with unknown "
f"fields: {extra}. Valid fields: {set(row_type._fields)}")
return row_type(
*[
dict_to_row_recursive(
Expand Down

0 comments on commit 34df8bf

Please sign in to comment.