From 8f3887bb512785893ad3939ac48d459bdfe103aa Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Fri, 19 Jan 2024 20:24:26 -0500 Subject: [PATCH] [YAML] Require numStreams for unbounded BigQueryStorageWriteApiSchemaTransform xlang transform Signed-off-by: Jeffrey Kinard --- .../BigQueryStorageWriteApiSchemaTransformProvider.java | 8 ++++++++ sdks/python/apache_beam/yaml/standard_io.yaml | 2 ++ 2 files changed, 10 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index 8c4edd2244b4..52ce97294aa1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -349,6 +349,14 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { Long triggeringFrequency = configuration.getTriggeringFrequencySeconds(); Boolean autoSharding = configuration.getAutoSharding(); int numStreams = configuration.getNumStreams() == null ? 0 : configuration.getNumStreams(); + + // TODO(https://github.com/apache/beam/issues/30058): remove once Dataflow supports multiple + // DoFn's per fused step. + if (numStreams < 1) { + throw new IllegalStateException( + "numStreams must be set to a positive integer when input data is unbounded."); + } + boolean useAtLeastOnceSemantics = configuration.getUseAtLeastOnceSemantics() == null ? false diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 5d1598d2705c..4d26ce96b677 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -38,6 +38,8 @@ create_disposition: 'createDisposition' write_disposition: 'writeDisposition' error_handling: 'errorHandling' + # TODO(https://github.com/apache/beam/issues/30058): Required until autosharding support is fixed + num_streams: 'numStreams' underlying_provider: type: beamJar transforms: