Merge pull request #30959: Automatically execute unbounded pipelines in streaming mode.

robertwb authored Apr 15, 2024
2 parents eb7ad46 + c165f8a · commit 63ebda0
Showing 3 changed files with 89 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -74,6 +74,7 @@

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* Default consumer polling timeout for KafkaIO.Read was increased from 1 second to 2 seconds. Use KafkaIO.read().withConsumerPollingTimeout(Duration duration) to configure this timeout value when necessary ([#30870](https://github.com/apache/beam/issues/30870)).
* Python Dataflow users no longer need to manually specify --streaming for pipelines using unbounded sources such as ReadFromPubSub.

## Deprecations

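As a hedged illustration of the new behavior (project, bucket, and topic names here are hypothetical), a pipeline reading from an unbounded source such as Pub/Sub can now be submitted to Dataflow without passing --streaming; the runner infers streaming mode from the unbounded PCollection:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Note: no --streaming flag. With this change the Dataflow runner switches to
# streaming mode automatically because ReadFromPubSub is an unbounded source.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/tmp',
])

with beam.Pipeline(options=options) as pipeline:
    _ = pipeline | beam.io.ReadFromPubSub(
        topic='projects/my-project/topics/my-topic')
```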
27 changes: 27 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -42,6 +42,7 @@
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.common import group_by_key_input_visitor
from apache_beam.runners.common import merge_common_environments
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
@@ -415,6 +416,19 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None):
    self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
        return_context=True, default_environment=self._default_environment)

    if any(pcoll.is_bounded == beam_runner_api_pb2.IsBounded.UNBOUNDED
           for pcoll in self.proto_pipeline.components.pcollections.values()):
      if (not options.view_as(StandardOptions).streaming and
          not options.view_as(DebugOptions).lookup_experiment(
              'allow_unbounded_pcollections_in_batch_mode')):
        _LOGGER.info(
            'Automatically inferring streaming mode '
            'due to unbounded PCollections.')
        options.view_as(StandardOptions).streaming = True

    if options.view_as(StandardOptions).streaming:
      _check_and_add_missing_streaming_options(options)

    # Dataflow can only handle Docker environments.
    for env_id, env in self.proto_pipeline.components.environments.items():
      self.proto_pipeline.components.environments[env_id].CopyFrom(
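The check above also leaves an escape hatch: the automatic switch is skipped when the allow_unbounded_pcollections_in_batch_mode experiment is set. A hedged sketch of opting out (the other option values are illustrative):

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Illustrative values; the experiment name is the one the runner checks above
# before forcing streaming mode for unbounded PCollections.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--experiments=allow_unbounded_pcollections_in_batch_mode',
])
```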
@@ -473,6 +487,7 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None):
    if test_options.dry_run:
      result = PipelineResult(PipelineState.DONE)
      result.wait_until_finish = lambda duration=None: None
      result.job = self.job
      return result

    # Get a Dataflow API client and set its options
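Attaching the Job to the dry-run result is what lets the new tests below inspect the inferred job type without actually submitting anything. A hedged usage sketch, assuming a pipeline p constructed with --dry_run=True as in those tests:

```python
# With --dry_run=True, run() returns immediately and the constructed Job is
# now reachable from the result, e.g. to check the inferred job type.
result = p.run()
print(result.job.proto.type)  # JOB_TYPE_STREAMING or JOB_TYPE_BATCH
```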
@@ -598,9 +613,21 @@ def _check_and_add_missing_options(options):
        "an SDK preinstalled in the default Dataflow dev runtime environment "
        "or in a custom container image, use --sdk_location=container.")


def _check_and_add_missing_streaming_options(options):
  # Type: (PipelineOptions) -> None

  """Validates and adds missing pipeline options depending on options set.

  Must be called after it has been determined whether we're running in
  streaming mode.

  :param options: PipelineOptions for this pipeline.
  """
  # Streaming only supports using runner v2 (aka unified worker).
  # Runner v2 only supports using streaming engine (aka windmill service)
  if options.view_as(StandardOptions).streaming:
    debug_options = options.view_as(DebugOptions)
    google_cloud_options = options.view_as(GoogleCloudOptions)
    if (not google_cloud_options.enable_streaming_engine and
        (debug_options.lookup_experiment("enable_windmill_service") or
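A hedged sketch of how the new helper composes with the existing _check_and_add_missing_options, mirroring the updated tests below (these are internal helpers, so the exact experiments they add may change):

```python
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.dataflow.dataflow_runner import (
    _check_and_add_missing_options, _check_and_add_missing_streaming_options)

options = PipelineOptions(['--sdk_location=container', '--streaming'])
_check_and_add_missing_options(options)
# Streaming-specific experiments are only added once streaming mode is known,
# which is why this runs after streaming has been set or inferred.
_check_and_add_missing_streaming_options(options)
assert options.view_as(DebugOptions).lookup_experiment('use_runner_v2', False)
```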
61 changes: 61 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -39,6 +39,7 @@
from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException
from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_options
from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_streaming_options
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
from apache_beam.runners.runner import PipelineState
from apache_beam.testing.extra_assertions import ExtraAssertionsMixin
@@ -523,6 +524,7 @@ def test_batch_is_runner_v2(self):
  def test_streaming_is_runner_v2(self):
    options = PipelineOptions(['--sdk_location=container', '--streaming'])
    _check_and_add_missing_options(options)
    _check_and_add_missing_streaming_options(options)
    for expected in ['beam_fn_api',
                     'use_unified_worker',
                     'use_runner_v2',
@@ -554,6 +556,7 @@ def test_dataflow_service_options_enable_prime_sets_runner_v2(self):
        '--dataflow_service_options=enable_prime'
    ])
    _check_and_add_missing_options(options)
    _check_and_add_missing_streaming_options(options)
    for expected in ['beam_fn_api',
                     'use_unified_worker',
                     'use_runner_v2',
@@ -564,6 +567,64 @@
          options.view_as(DebugOptions).lookup_experiment(expected, False),
          expected)

  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
  @mock.patch(
      'apache_beam.options.pipeline_options.GoogleCloudOptions.validate',
      lambda *args: [])
  def test_auto_streaming_with_unbounded(self):
    options = PipelineOptions([
        '--sdk_location=container',
        '--runner=DataflowRunner',
        '--dry_run=True',
        '--temp_location=gs://bucket',
        '--project=project',
        '--region=region'
    ])
    with beam.Pipeline(options=options) as p:
      _ = p | beam.io.ReadFromPubSub('projects/some-project/topics/some-topic')
    self.assertEqual(
        p.result.job.proto.type,
        apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING)

  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
  @mock.patch(
      'apache_beam.options.pipeline_options.GoogleCloudOptions.validate',
      lambda *args: [])
  def test_auto_streaming_no_unbounded(self):
    options = PipelineOptions([
        '--sdk_location=container',
        '--runner=DataflowRunner',
        '--dry_run=True',
        '--temp_location=gs://bucket',
        '--project=project',
        '--region=region'
    ])
    with beam.Pipeline(options=options) as p:
      _ = p | beam.Create([1, 2, 3])
    self.assertEqual(
        p.result.job.proto.type,
        apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH)

  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
  @mock.patch(
      'apache_beam.options.pipeline_options.GoogleCloudOptions.validate',
      lambda *args: [])
  def test_explicit_streaming_no_unbounded(self):
    options = PipelineOptions([
        '--streaming',
        '--sdk_location=container',
        '--runner=DataflowRunner',
        '--dry_run=True',
        '--temp_location=gs://bucket',
        '--project=project',
        '--region=region'
    ])
    with beam.Pipeline(options=options) as p:
      _ = p | beam.Create([1, 2, 3])
    self.assertEqual(
        p.result.job.proto.type,
        apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING)


if __name__ == '__main__':
  unittest.main()
