
Commit 2420c90

Cherry picking PR #28618 into 2.51.0 (setting numShards for Python BigQuery xlang) (#28631)
ahmedabu98 authored Sep 27, 2023
1 parent 70f4a1a commit 2420c90
Showing 6 changed files with 69 additions and 27 deletions.
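
In short: Python's WriteToBigQuery gains a num_storage_api_streams option, which the cross-language StorageWriteToBigQuery wrapper forwards to the Java BigQueryStorageWriteApiSchemaTransformProvider as numStreams. A minimal usage sketch of the new option (the table, schema, and source below are illustrative placeholders, not part of this commit):

    import apache_beam as beam
    from apache_beam.transforms.periodicsequence import PeriodicImpulse

    with beam.Pipeline() as p:
        _ = (
            p
            # Unbounded source firing timestamps 0..3, one per second.
            | PeriodicImpulse(0, 4, 1)
            | beam.Map(lambda t: {'id': t.micros // 1000000})
            | beam.io.WriteToBigQuery(
                table='my-project:my_dataset.my_table',  # placeholder
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                schema='id:INTEGER',
                triggering_frequency=1,
                # New in this commit: pin the number of Storage Write API
                # streams instead of relying on auto-sharding (unbounded only).
                num_storage_api_streams=4))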
@@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
// Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Dataflow',
'Run Python_Xlang_Gcp_Dataflow PostCommit', 'Python_Xlang_Gcp_Dataflow (\"Run Python_Xlang_Gcp_Dataflow PostCommit\")', this) {
-  description('Runs end-to-end cross language GCP IO tests on the Dataflow runner.')
+  description('Runs end-to-end cross language GCP IO tests on the Dataflow runner. \"Run Python_Xlang_Gcp_Dataflow PostCommit\"')


// Set common parameters.
@@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
// Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Direct',
'Run Python_Xlang_Gcp_Direct PostCommit', 'Python_Xlang_Gcp_Direct (\"Run Python_Xlang_Gcp_Direct PostCommit\")', this) {
-  description('Runs end-to-end cross language GCP IO tests on the Direct runner.')
+  description('Runs end-to-end cross language GCP IO tests on the Direct runner. \"Run Python_Xlang_Gcp_Direct PostCommit\"')

// Set common parameters.
commonJobProperties.setTopLevelMainJobProperties(delegate)
@@ -176,6 +176,13 @@ public void validate() {
!Strings.isNullOrEmpty(this.getErrorHandling().getOutput()),
invalidConfigMessage + "Output must not be empty if error handling specified.");
}

+      if (this.getAutoSharding() != null && this.getAutoSharding()) {
+        checkArgument(
+            this.getNumStreams() == 0,
+            invalidConfigMessage
+                + "Cannot set a fixed number of streams when auto-sharding is enabled. Please pick only one of the two options.");
+      }
}
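
The new check makes a fixed stream count and auto-sharding mutually exclusive. A sketch of the Python-side configuration it rejects (placeholder table and schema; the failure surfaces as an error from the Java expansion service when the transform is expanded):

    import apache_beam as beam

    # Invalid: requests auto-sharding and a fixed number of streams at once.
    invalid_write = beam.io.WriteToBigQuery(
        table='my-project:my_dataset.my_table',  # placeholder
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
        schema='id:INTEGER',
        with_auto_sharding=True,
        num_storage_api_streams=4)  # tripped by the checkArgument above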

/**
@@ -218,11 +225,17 @@ public static Builder builder() {
public abstract Boolean getUseAtLeastOnceSemantics();

@SchemaFieldDescription(
"This option enables using a dynamically determined number of shards to write to "
"This option enables using a dynamically determined number of Storage Write API streams to write to "
+ "BigQuery. Only applicable to unbounded data.")
@Nullable
public abstract Boolean getAutoSharding();

+    @SchemaFieldDescription(
+        "Specifies the number of write streams that the Storage API sink will use. "
+            + "This parameter is only applicable when writing unbounded data.")
+    @Nullable
+    public abstract Integer getNumStreams();

@SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
@Nullable
public abstract ErrorHandling getErrorHandling();
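
A note on how these fields are reached from Python: the wrapper near the end of this diff passes its keyword arguments straight through as schema-transform fields. The visible correspondence (taken from this commit's expand() call; indicative, not exhaustive):

    with_auto_sharding      -> autoSharding
    num_storage_api_streams -> numStreams  (new in this commit)
    triggering_frequency    -> triggeringFrequencySeconds
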
@@ -243,6 +256,8 @@ public abstract static class Builder {

public abstract Builder setAutoSharding(Boolean autoSharding);

+    public abstract Builder setNumStreams(Integer numStreams);

public abstract Builder setErrorHandling(ErrorHandling errorHandling);

/** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
@@ -321,13 +336,19 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
Boolean autoSharding = configuration.getAutoSharding();
-        write =
-            write.withTriggeringFrequency(
-                (triggeringFrequency == null || triggeringFrequency <= 0)
-                    ? DEFAULT_TRIGGERING_FREQUENCY
-                    : Duration.standardSeconds(triggeringFrequency));
-        // use default value true for autoSharding if not configured for STORAGE_WRITE_API
-        if (autoSharding == null || autoSharding) {
+        Integer numStreams = configuration.getNumStreams();
+        // Triggering frequency is only applicable for exactly-once
+        if (!configuration.getUseAtLeastOnceSemantics()) {
+          write =
+              write.withTriggeringFrequency(
+                  (triggeringFrequency == null || triggeringFrequency <= 0)
+                      ? DEFAULT_TRIGGERING_FREQUENCY
+                      : Duration.standardSeconds(triggeringFrequency));
+        }
+        // set num streams if specified, otherwise default to autoSharding
+        if (numStreams > 0) {
+          write = write.withNumStorageWriteApiStreams(numStreams);
+        } else if (autoSharding == null || autoSharding) {
write = write.withAutoSharding();
}
}
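
Net effect of the rewritten block for unbounded input: a positive numStreams takes precedence and fixes the number of Storage Write API streams; otherwise auto-sharding remains the default (autoSharding unset or true); and the triggering frequency is now applied only on the exactly-once path, since, per the comment above, it is not applicable to at-least-once writes.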
44 changes: 28 additions & 16 deletions sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -30,12 +30,13 @@
from hamcrest.core import assert_that as hamcrest_assert

import apache_beam as beam
-from apache_beam.io.external.generate_sequence import GenerateSequence
from apache_beam.io.gcp.bigquery import StorageWriteToBigQuery
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
+from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils.timestamp import Timestamp

# Protect against environments where bigquery library is not available.
@@ -51,9 +52,9 @@


@pytest.mark.uses_gcp_java_expansion_service
-@unittest.skipUnless(
-    os.environ.get('EXPANSION_PORT'),
-    "EXPANSION_PORT environment var is not provided.")
+# @unittest.skipUnless(
+#     os.environ.get('EXPANSION_PORT'),
+#     "EXPANSION_PORT environment var is not provided.")
class BigQueryXlangStorageWriteIT(unittest.TestCase):
BIGQUERY_DATASET = 'python_xlang_storage_write'

@@ -104,6 +105,7 @@ def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
self.args = self.test_pipeline.get_full_options_as_args()
self.project = self.test_pipeline.get_option('project')
+    self._runner = PipelineOptions(self.args).get_all_options()['runner']

self.bigquery_client = BigQueryWrapper()
self.dataset_id = '%s_%s_%s' % (
@@ -244,8 +246,7 @@ def test_write_with_beam_rows(self):
table=table_id, expansion_service=self.expansion_service))
hamcrest_assert(p, bq_matcher)

-  def run_streaming(
-      self, table_name, auto_sharding=False, use_at_least_once=False):
+  def run_streaming(self, table_name, num_streams=0, use_at_least_once=False):
elements = self.ELEMENTS.copy()
schema = self.ALL_TYPES_SCHEMA
table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table_name)
@@ -260,33 +261,44 @@ def run_streaming(
streaming=True,
allow_unsafe_triggers=True)

+    auto_sharding = (num_streams == 0)
with beam.Pipeline(argv=args) as p:
_ = (
p
-          | GenerateSequence(
-              start=0, stop=4, expansion_service=self.expansion_service)
-          | beam.Map(lambda x: elements[x])
+          | PeriodicImpulse(0, 4, 1)
+          | beam.Map(lambda t: elements[t])
| beam.io.WriteToBigQuery(
table=table_id,
method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
schema=schema,
triggering_frequency=1,
with_auto_sharding=auto_sharding,
+              num_storage_api_streams=num_streams,
use_at_least_once=use_at_least_once,
expansion_service=self.expansion_service))
hamcrest_assert(p, bq_matcher)

-  def test_streaming(self):
-    table = 'streaming'
+  def test_streaming_with_fixed_num_streams(self):
+    # skip if dataflow runner is not specified
+    if not self._runner or "dataflowrunner" not in self._runner.lower():
+      self.skipTest(
+          "The exactly-once route has the requirement "
+          "`beam:requirement:pardo:on_window_expiration:v1`, "
+          "which is currently only supported by the Dataflow runner")
+    table = 'streaming_fixed_num_streams'
+    self.run_streaming(table_name=table, num_streams=4)

+  @unittest.skip(
+      "Streaming to the Storage Write API sink with autosharding is broken "
+      "with Dataflow Runner V2.")
+  def test_streaming_with_auto_sharding(self):
+    table = 'streaming_with_auto_sharding'
+    self.run_streaming(table_name=table)

def test_streaming_with_at_least_once(self):
-    table = 'streaming'
+    table = 'streaming_with_at_least_once'
self.run_streaming(table_name=table, use_at_least_once=True)

-  def test_streaming_with_auto_sharding(self):
-    table = 'streaming_with_auto_sharding'
-    self.run_streaming(table_name=table, auto_sharding=True)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
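
One reading of the source swap in run_streaming, hedged: GenerateSequence with a stop value yields a bounded PCollection, so the old test may not have exercised the unbounded code path that autoSharding/numStreams guard on, whereas PeriodicImpulse produces an unbounded PCollection and keeps the test on the streaming path. A sketch of what the new source emits:

    from apache_beam.transforms.periodicsequence import PeriodicImpulse

    # PeriodicImpulse(0, 4, 1) fires event-time timestamps 0, 1, 2, 3,
    # one per second, as an unbounded source; the test then maps each
    # timestamp to a prebuilt element.
    source = PeriodicImpulse(0, 4, 1)
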
9 changes: 9 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1869,6 +1869,7 @@ def __init__(
# TODO(https://github.com/apache/beam/issues/20712): Switch the default
# when the feature is mature.
with_auto_sharding=False,
+      num_storage_api_streams=0,
ignore_unknown_columns=False,
load_job_project_id=None,
max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE,
@@ -2018,6 +2019,9 @@ def __init__(
determined number of shards to write to BigQuery. This can be used for
all of FILE_LOADS, STREAMING_INSERTS, and STORAGE_WRITE_API. Only
applicable to unbounded input.
+      num_storage_api_streams: Specifies the number of write streams that the
+        Storage API sink will use. This parameter is only applicable when
+        writing unbounded data.
ignore_unknown_columns: Accept rows that contain values that do not match
the schema. The unknown values are ignored. Default is False,
which treats unknown values as errors. This option is only valid for
@@ -2060,6 +2064,7 @@ def __init__(
self.use_at_least_once = use_at_least_once
self.expansion_service = expansion_service
self.with_auto_sharding = with_auto_sharding
+    self._num_storage_api_streams = num_storage_api_streams
self.insert_retry_strategy = insert_retry_strategy
self._validate = validate
self._temp_file_format = temp_file_format or bigquery_tools.FileFormat.JSON
@@ -2259,6 +2264,7 @@ def find_in_nested_dict(schema):
triggering_frequency=triggering_frequency,
use_at_least_once=self.use_at_least_once,
with_auto_sharding=self.with_auto_sharding,
+            num_storage_api_streams=self._num_storage_api_streams,
expansion_service=self.expansion_service))

if is_rows:
@@ -2521,6 +2527,7 @@ def __init__(
triggering_frequency=0,
use_at_least_once=False,
with_auto_sharding=False,
+      num_storage_api_streams=0,
expansion_service=None):
"""Initialize a StorageWriteToBigQuery transform.
@@ -2558,6 +2565,7 @@ def __init__(
self._triggering_frequency = triggering_frequency
self._use_at_least_once = use_at_least_once
self._with_auto_sharding = with_auto_sharding
+    self._num_storage_api_streams = num_storage_api_streams
self._expansion_service = (
expansion_service or _default_io_expansion_service())
self.schematransform_config = SchemaAwareExternalTransform.discover_config(
@@ -2569,6 +2577,7 @@ def expand(self, input):
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
autoSharding=self._with_auto_sharding,
+        numStreams=self._num_storage_api_streams,
createDisposition=self._create_disposition,
table=self._table,
triggeringFrequencySeconds=self._triggering_frequency,
@@ -788,7 +788,7 @@ BigQuery Storage Write API for Python SDK currently has some limitations on supp
{{< paragraph class="language-py" >}}
**Note:** If you want to run WriteToBigQuery with Storage Write API from the source code, you need to run `./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build` to build the expansion-service jar. If you are running from a released Beam SDK, the jar will already be included.

-**Note:** Auto sharding is not currently supported for Python's Storage Write API.
+**Note:** Auto sharding is not currently supported for Python's Storage Write API exactly-once mode on DataflowRunner.

{{< /paragraph >}}
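
A hedged sketch of the two configurations that avoid the unsupported combination above (placeholder names throughout; see this commit's test changes for the tested variants):

    import apache_beam as beam

    # Exactly-once on Dataflow: pin the stream count explicitly.
    exactly_once = beam.io.WriteToBigQuery(
        table='my-project:my_dataset.t1',  # placeholder
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
        schema='id:INTEGER',
        triggering_frequency=1,
        num_storage_api_streams=4)

    # At-least-once: auto-sharding remains the default behavior.
    at_least_once = beam.io.WriteToBigQuery(
        table='my-project:my_dataset.t2',  # placeholder
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
        schema='id:INTEGER',
        use_at_least_once=True)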

