diff --git a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
index d1ee27088c72..1280fcb4e233 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
@@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
 // Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
 PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Dataflow',
     'Run Python_Xlang_Gcp_Dataflow PostCommit', 'Python_Xlang_Gcp_Dataflow (\"Run Python_Xlang_Gcp_Dataflow PostCommit\")', this) {
-  description('Runs end-to-end cross language GCP IO tests on the Dataflow runner.')
+  description('Runs end-to-end cross language GCP IO tests on the Dataflow runner. \"Run Python_Xlang_Gcp_Dataflow PostCommit\"')
 
   // Set common parameters.
diff --git a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
index 438b735fba7f..e4bf771be1ae 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
@@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
 // Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
 PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Direct',
     'Run Python_Xlang_Gcp_Direct PostCommit', 'Python_Xlang_Gcp_Direct (\"Run Python_Xlang_Gcp_Direct PostCommit\")', this) {
-  description('Runs end-to-end cross language GCP IO tests on the Direct runner.')
+  description('Runs end-to-end cross language GCP IO tests on the Direct runner. \"Run Python_Xlang_Gcp_Direct PostCommit\"')
 
   // Set common parameters.
   commonJobProperties.setTopLevelMainJobProperties(delegate)
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
index 713880229281..311ce9575c2e 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
@@ -96,7 +96,7 @@ public class BigtableResourceManager implements ResourceManager {
   private final Set<String> cdcEnabledTables;
   private boolean hasInstance;
-  private Iterable<BigtableResourceManagerCluster> clusters;
+  private List<BigtableResourceManagerCluster> clusters;
 
   private final boolean usingStaticInstance;
 
@@ -195,12 +195,12 @@ public String getInstanceId() {
   /**
    * Creates a Bigtable instance in which all clusters, nodes and tables will exist.
    *
-   * @param clusters Collection of BigtableResourceManagerCluster objects to associate with the
-   *     given Bigtable instance.
+   * @param clusters List of BigtableResourceManagerCluster objects to associate with the given
+   *     Bigtable instance.
    * @throws BigtableResourceManagerException if there is an error creating the instance in
   *     Bigtable.
   */
-  public synchronized void createInstance(Iterable<BigtableResourceManagerCluster> clusters)
+  public synchronized void createInstance(List<BigtableResourceManagerCluster> clusters)
       throws BigtableResourceManagerException {
 
     // Check to see if instance already exists, and throw error if it does
@@ -559,7 +559,7 @@ public List<String> getClusterNames() {
   }
 
   private Iterable<BigtableResourceManagerCluster> getClusters() {
-    if (usingStaticInstance && this.clusters == null) {
+    if (usingStaticInstance && this.clusters.isEmpty()) {
       try (BigtableInstanceAdminClient instanceAdminClient =
           bigtableResourceManagerClientFactory.bigtableInstanceAdminClient()) {
         List<BigtableResourceManagerCluster> managedClusters = new ArrayList<>();
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java
index a893493d766e..28f1f5bf60c5 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java
@@ -21,6 +21,7 @@
 import com.google.cloud.bigtable.admin.v2.models.StorageType;
 import java.time.format.DateTimeFormatter;
+import java.util.List;
 import java.util.regex.Pattern;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 
@@ -51,9 +52,9 @@ private BigtableResourceManagerUtils() {}
    * @param zone the zone/region that the cluster will be deployed to.
    * @param numNodes the number of nodes that the cluster will contain.
    * @param storageType the type of storage to configure the cluster with (SSD or HDD).
-   * @return Collection containing a single BigtableResourceManagerCluster object.
+   * @return List containing a single BigtableResourceManagerCluster object.
   */
-  static Iterable<BigtableResourceManagerCluster> generateDefaultClusters(
+  static List<BigtableResourceManagerCluster> generateDefaultClusters(
       String baseString, String zone, int numNodes, StorageType storageType) {
 
     String clusterId =
diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
index f8673ed696cc..74b25e84c691 100644
--- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
+++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
@@ -40,6 +40,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.junit.Before;
@@ -73,7 +74,7 @@ public class BigtableResourceManagerTest {
   private static final StorageType CLUSTER_STORAGE_TYPE = StorageType.SSD;
 
   private BigtableResourceManager testManager;
-  private Iterable<BigtableResourceManagerCluster> cluster;
+  private List<BigtableResourceManagerCluster> cluster;
 
   @Before
   public void setUp() throws IOException {
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index 2acc30455e22..93bff703a552 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -51,8 +51,8 @@ evaluationDependsOn(":sdks:java:container:java11")
 
 ext.dataflowLegacyEnvironmentMajorVersion = '8'
 ext.dataflowFnapiEnvironmentMajorVersion = '8'
-ext.dataflowLegacyContainerVersion = 'beam-master-20230809'
-ext.dataflowFnapiContainerVersion = 'beam-master-20230809'
+ext.dataflowLegacyContainerVersion = '2.51.0'
+ext.dataflowFnapiContainerVersion = '2.51.0'
 ext.dataflowContainerBaseRepository = 'gcr.io/cloud-dataflow/v1beta3'
 
 processResources {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index e44617930119..1b9eb309ec46 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -176,6 +176,13 @@ public void validate() {
           !Strings.isNullOrEmpty(this.getErrorHandling().getOutput()),
           invalidConfigMessage + "Output must not be empty if error handling specified.");
     }
+
+    if (this.getAutoSharding() != null && this.getAutoSharding()) {
+      checkArgument(
+          this.getNumStreams() == 0,
+          invalidConfigMessage
+              + "Cannot set a fixed number of streams when auto-sharding is enabled. Please pick only one of the two options.");
+    }
   }
 
   /**
@@ -218,11 +225,17 @@ public static Builder builder() {
     public abstract Boolean getUseAtLeastOnceSemantics();
 
     @SchemaFieldDescription(
-        "This option enables using a dynamically determined number of shards to write to "
+        "This option enables using a dynamically determined number of Storage Write API streams to write to "
            + "BigQuery. Only applicable to unbounded data.")
     @Nullable
     public abstract Boolean getAutoSharding();
 
+    @SchemaFieldDescription(
+        "Specifies the number of write streams that the Storage API sink will use. "
+            + "This parameter is only applicable when writing unbounded data.")
+    @Nullable
+    public abstract Integer getNumStreams();
+
     @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
     @Nullable
     public abstract ErrorHandling getErrorHandling();
 
@@ -243,6 +256,8 @@ public abstract static class Builder {
 
       public abstract Builder setAutoSharding(Boolean autoSharding);
 
+      public abstract Builder setNumStreams(Integer numStreams);
+
       public abstract Builder setErrorHandling(ErrorHandling errorHandling);
 
       /** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
@@ -321,13 +336,19 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
         Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
         Boolean autoSharding = configuration.getAutoSharding();
-        write =
-            write.withTriggeringFrequency(
-                (triggeringFrequency == null || triggeringFrequency <= 0)
-                    ? DEFAULT_TRIGGERING_FREQUENCY
-                    : Duration.standardSeconds(triggeringFrequency));
-        // use default value true for autoSharding if not configured for STORAGE_WRITE_API
-        if (autoSharding == null || autoSharding) {
+        Integer numStreams = configuration.getNumStreams();
+        // Triggering frequency is only applicable for exactly-once
+        if (!configuration.getUseAtLeastOnceSemantics()) {
+          write =
+              write.withTriggeringFrequency(
+                  (triggeringFrequency == null || triggeringFrequency <= 0)
+                      ? DEFAULT_TRIGGERING_FREQUENCY
+                      : Duration.standardSeconds(triggeringFrequency));
+        }
+        // set num streams if specified, otherwise default to autoSharding
+        if (numStreams > 0) {
+          write = write.withNumStorageWriteApiStreams(numStreams);
+        } else if (autoSharding == null || autoSharding) {
           write = write.withAutoSharding();
         }
       }
diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
index fbfde550ea70..5917ca4dc729 100644
--- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -30,12 +30,13 @@
 from hamcrest.core import assert_that as hamcrest_assert
 
 import apache_beam as beam
-from apache_beam.io.external.generate_sequence import GenerateSequence
 from apache_beam.io.gcp.bigquery import StorageWriteToBigQuery
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
 from apache_beam.utils.timestamp import Timestamp
 
 # Protect against environments where bigquery library is not available.
@@ -51,9 +52,9 @@
 
 
 @pytest.mark.uses_gcp_java_expansion_service
-@unittest.skipUnless(
-    os.environ.get('EXPANSION_PORT'),
-    "EXPANSION_PORT environment var is not provided.")
+# @unittest.skipUnless(
+#     os.environ.get('EXPANSION_PORT'),
+#     "EXPANSION_PORT environment var is not provided.")
 class BigQueryXlangStorageWriteIT(unittest.TestCase):
   BIGQUERY_DATASET = 'python_xlang_storage_write'
 
@@ -104,6 +105,7 @@ def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
     self.args = self.test_pipeline.get_full_options_as_args()
     self.project = self.test_pipeline.get_option('project')
+    self._runner = PipelineOptions(self.args).get_all_options()['runner']
 
     self.bigquery_client = BigQueryWrapper()
     self.dataset_id = '%s_%s_%s' % (
@@ -244,8 +246,7 @@ def test_write_with_beam_rows(self):
             table=table_id, expansion_service=self.expansion_service))
     hamcrest_assert(p, bq_matcher)
 
-  def run_streaming(
-      self, table_name, auto_sharding=False, use_at_least_once=False):
+  def run_streaming(self, table_name, num_streams=0, use_at_least_once=False):
     elements = self.ELEMENTS.copy()
     schema = self.ALL_TYPES_SCHEMA
     table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table_name)
@@ -260,33 +261,44 @@ def run_streaming(
         streaming=True,
         allow_unsafe_triggers=True)
 
+    auto_sharding = (num_streams == 0)
     with beam.Pipeline(argv=args) as p:
       _ = (
           p
-          | GenerateSequence(
-              start=0, stop=4, expansion_service=self.expansion_service)
-          | beam.Map(lambda x: elements[x])
+          | PeriodicImpulse(0, 4, 1)
+          | beam.Map(lambda t: elements[t])
           | beam.io.WriteToBigQuery(
               table=table_id,
               method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
               schema=schema,
+              triggering_frequency=1,
               with_auto_sharding=auto_sharding,
+              num_storage_api_streams=num_streams,
               use_at_least_once=use_at_least_once,
               expansion_service=self.expansion_service))
     hamcrest_assert(p, bq_matcher)
 
-  def test_streaming(self):
-    table = 'streaming'
+  def test_streaming_with_fixed_num_streams(self):
+    # skip if dataflow runner is not specified
+    if not self._runner or "dataflowrunner" not in self._runner.lower():
+      self.skipTest(
+          "The exactly-once route has the requirement "
+          "`beam:requirement:pardo:on_window_expiration:v1`, "
+          "which is currently only supported by the Dataflow runner")
+    table = 'streaming_fixed_num_streams'
+    self.run_streaming(table_name=table, num_streams=4)
+
+  @unittest.skip(
+      "Streaming to the Storage Write API sink with autosharding is broken "
+      "with Dataflow Runner V2.")
+  def test_streaming_with_auto_sharding(self):
+    table = 'streaming_with_auto_sharding'
     self.run_streaming(table_name=table)
 
   def test_streaming_with_at_least_once(self):
-    table = 'streaming'
+    table = 'streaming_with_at_least_once'
     self.run_streaming(table_name=table, use_at_least_once=True)
 
-  def test_streaming_with_auto_sharding(self):
-    table = 'streaming_with_auto_sharding'
-    self.run_streaming(table_name=table, auto_sharding=True)
-
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index e092ad069ad0..986919fd6b82 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1869,6 +1869,7 @@ def __init__(
       # TODO(https://github.com/apache/beam/issues/20712): Switch the default
      # when the feature is mature.
       with_auto_sharding=False,
+      num_storage_api_streams=0,
       ignore_unknown_columns=False,
       load_job_project_id=None,
       max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE,
@@ -2018,6 +2019,9 @@ def __init__(
         determined number of shards to write to BigQuery. This can be used for
         all of FILE_LOADS, STREAMING_INSERTS, and STORAGE_WRITE_API. Only
         applicable to unbounded input.
+      num_storage_api_streams: Specifies the number of write streams that the
+        Storage API sink will use. This parameter is only applicable when
+        writing unbounded data.
       ignore_unknown_columns: Accept rows that contain values that do not match
         the schema. The unknown values are ignored. Default is False,
         which treats unknown values as errors. This option is only valid for
@@ -2060,6 +2064,7 @@ def __init__(
     self.use_at_least_once = use_at_least_once
     self.expansion_service = expansion_service
     self.with_auto_sharding = with_auto_sharding
+    self._num_storage_api_streams = num_storage_api_streams
     self.insert_retry_strategy = insert_retry_strategy
     self._validate = validate
     self._temp_file_format = temp_file_format or bigquery_tools.FileFormat.JSON
@@ -2259,6 +2264,7 @@ def find_in_nested_dict(schema):
               triggering_frequency=triggering_frequency,
               use_at_least_once=self.use_at_least_once,
               with_auto_sharding=self.with_auto_sharding,
+              num_storage_api_streams=self._num_storage_api_streams,
               expansion_service=self.expansion_service))
 
       if is_rows:
@@ -2521,6 +2527,7 @@ def __init__(
       triggering_frequency=0,
       use_at_least_once=False,
       with_auto_sharding=False,
+      num_storage_api_streams=0,
       expansion_service=None):
     """Initialize a StorageWriteToBigQuery transform.
 
@@ -2558,6 +2565,7 @@ def __init__(
     self._triggering_frequency = triggering_frequency
     self._use_at_least_once = use_at_least_once
     self._with_auto_sharding = with_auto_sharding
+    self._num_storage_api_streams = num_storage_api_streams
     self._expansion_service = (
         expansion_service or _default_io_expansion_service())
     self.schematransform_config = SchemaAwareExternalTransform.discover_config(
@@ -2569,6 +2577,7 @@ def expand(self, input):
         expansion_service=self._expansion_service,
         rearrange_based_on_discovery=True,
         autoSharding=self._with_auto_sharding,
+        numStreams=self._num_storage_api_streams,
         createDisposition=self._create_disposition,
         table=self._table,
         triggeringFrequencySeconds=self._triggering_frequency,
diff --git a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
index eae98b84d2c1..7a31b63a3c96 100644
--- a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
+++ b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
@@ -788,7 +788,7 @@ BigQuery Storage Write API for Python SDK currently has some limitations on supp
 {{< paragraph class="language-py" >}}
 **Note:** If you want to run WriteToBigQuery with Storage Write API from the source code, you need to run `./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build` to build the expansion-service jar. If you are running from a released Beam SDK, the jar will already be included.
 
-**Note:** Auto sharding is not currently supported for Python's Storage Write API.
+**Note:** Auto sharding is not currently supported for Python's Storage Write API exactly-once mode on DataflowRunner.
 {{< /paragraph >}}
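
For reference, a minimal, hypothetical sketch of how the `num_storage_api_streams` option added in this change might be used from the Python SDK, mirroring the updated integration test above. The project, dataset, table, and schema are placeholders, and the pipeline assumes a streaming-capable runner (e.g. Dataflow) plus the Java expansion service that backs the Storage Write API sink.

```python
# Illustrative sketch only; table spec, schema, and options are placeholders.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.periodicsequence import PeriodicImpulse

options = PipelineOptions(streaming=True, allow_unsafe_triggers=True)

with beam.Pipeline(options=options) as p:
  _ = (
      p
      # Emit one element per second for four seconds, as in the test above.
      | PeriodicImpulse(0, 4, 1)
      | beam.Map(lambda _: {'number': 1})
      | beam.io.WriteToBigQuery(
          table='my-project:my_dataset.my_table',  # placeholder table
          method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
          schema='number:INTEGER',
          triggering_frequency=1,
          # Pick either a fixed number of Storage Write API streams (new in
          # this change) or auto-sharding, not both.
          with_auto_sharding=False,
          num_storage_api_streams=4))
```

Per the updated `expand()` logic in the Java provider, a positive `numStreams` takes precedence over auto-sharding; this fixed-stream route is the one exercised by `test_streaming_with_fixed_num_streams` above.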