Automatically execute unbounded pipelines in streaming mode. #30959

Merged 3 commits on Apr 15, 2024
Changes from 2 commits
1 change: 1 addition & 0 deletions CHANGES.md
@@ -119,6 +119,7 @@
* Merged sdks/java/fn-execution and runners/core-construction-java into the main SDK. These artifacts were never meant for users, but noting
that they no longer exist. These are steps to bring portability into the core SDK alongside all other core functionality.
* Added Vertex AI Feature Store handler for Enrichment transform (Python) ([#30388](https://github.com/apache/beam/pull/30388))
* Python Dataflow users no longer need to manually specify --streaming for pipelines using unbounded sources such as ReadFromPubSub.
Contributor commented:

This is the wrong release. I'd also vote we put this under breaking changes (since that's likely where folks will look if they do actually see this behavior change).


## Breaking Changes

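As a rough illustration of the changelog entry above (not itself part of this PR's diff), a pipeline like the following sketch can now omit `--streaming`; the project, region, bucket, and topic values are placeholders:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder project/region/bucket/topic values; --streaming is
# intentionally omitted.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/tmp',
])

with beam.Pipeline(options=options) as p:
  # ReadFromPubSub produces an unbounded PCollection, so the runner now
  # switches the job to streaming mode automatically.
  _ = p | beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-topic')
```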
27 changes: 27 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -42,6 +42,7 @@
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.common import group_by_key_input_visitor
from apache_beam.runners.common import merge_common_environments
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
@@ -415,6 +416,19 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None):
    self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
        return_context=True, default_environment=self._default_environment)

    if any(pcoll.is_bounded == beam_runner_api_pb2.IsBounded.UNBOUNDED
           for pcoll in self.proto_pipeline.components.pcollections.values()):
      if (not options.view_as(StandardOptions).streaming and
          not options.view_as(DebugOptions).lookup_experiment(
              'allow_unbounded_pcollections_in_batch_mode')):
        _LOGGER.info(
            'Automatically inferring streaming mode '
            'due to unbounded PCollections.')
        options.view_as(StandardOptions).streaming = True

    if options.view_as(StandardOptions).streaming:
      _check_and_add_missing_streaming_options(options)

    # Dataflow can only handle Docker environments.
    for env_id, env in self.proto_pipeline.components.environments.items():
      self.proto_pipeline.components.environments[env_id].CopyFrom(
@@ -473,6 +487,7 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None):
    if test_options.dry_run:
      result = PipelineResult(PipelineState.DONE)
      result.wait_until_finish = lambda duration=None: None
      result.job = self.job
      return result

    # Get a Dataflow API client and set its options
@@ -598,9 +613,21 @@ def _check_and_add_missing_options(options):
"an SDK preinstalled in the default Dataflow dev runtime environment "
"or in a custom container image, use --sdk_location=container.")


def _check_and_add_missing_streaming_options(options):
  # Type: (PipelineOptions) -> None

  """Validates and adds missing pipeline options depending on options set.

  Must be called after it has been determined whether we're running in
  streaming mode.

  :param options: PipelineOptions for this pipeline.
  """
  # Streaming only supports using runner v2 (aka unified worker).
  # Runner v2 only supports using streaming engine (aka windmill service)
  if options.view_as(StandardOptions).streaming:
    debug_options = options.view_as(DebugOptions)
    google_cloud_options = options.view_as(GoogleCloudOptions)
    if (not google_cloud_options.enable_streaming_engine and
        (debug_options.lookup_experiment("enable_windmill_service") or
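For users who need the old batch behavior despite unbounded PCollections, the `allow_unbounded_pcollections_in_batch_mode` experiment checked in `run_pipeline` above suppresses the automatic switch. A minimal sketch, assuming the experiment is supplied through the standard `--experiments` flag and using placeholder project and bucket names:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Opting out of the automatic streaming inference: run_pipeline only flips
# StandardOptions.streaming when this experiment is absent.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/tmp',
    '--experiments=allow_unbounded_pcollections_in_batch_mode',
])
```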
61 changes: 61 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -39,6 +39,7 @@
from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException
from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_options
from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_streaming_options
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
from apache_beam.runners.runner import PipelineState
from apache_beam.testing.extra_assertions import ExtraAssertionsMixin
@@ -523,6 +524,7 @@ def test_batch_is_runner_v2(self):
  def test_streaming_is_runner_v2(self):
    options = PipelineOptions(['--sdk_location=container', '--streaming'])
    _check_and_add_missing_options(options)
    _check_and_add_missing_streaming_options(options)
    for expected in ['beam_fn_api',
                     'use_unified_worker',
                     'use_runner_v2',
@@ -554,6 +556,7 @@ def test_dataflow_service_options_enable_prime_sets_runner_v2(self):
        '--dataflow_service_options=enable_prime'
    ])
    _check_and_add_missing_options(options)
    _check_and_add_missing_streaming_options(options)
    for expected in ['beam_fn_api',
                     'use_unified_worker',
                     'use_runner_v2',
@@ -564,6 +567,64 @@
          options.view_as(DebugOptions).lookup_experiment(expected, False),
          expected)

  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
  @mock.patch(
      'apache_beam.options.pipeline_options.GoogleCloudOptions.validate',
      lambda *args: [])
  def test_auto_streaming_with_unbounded(self):
    options = PipelineOptions([
        '--sdk_location=container',
        '--runner=DataflowRunner',
        '--dry_run=True',
        '--temp_location=gs://bucket',
        '--project=project',
        '--region=region'
    ])
    with beam.Pipeline(options=options) as p:
      _ = p | beam.io.ReadFromPubSub('projects/some-project/topics/some-topic')
    self.assertEqual(
        p.result.job.proto.type,
        apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING)

  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
  @mock.patch(
      'apache_beam.options.pipeline_options.GoogleCloudOptions.validate',
      lambda *args: [])
  def test_auto_streaming_no_unbounded(self):
    options = PipelineOptions([
        '--sdk_location=container',
        '--runner=DataflowRunner',
        '--dry_run=True',
        '--temp_location=gs://bucket',
        '--project=project',
        '--region=region'
    ])
    with beam.Pipeline(options=options) as p:
      _ = p | beam.Create([1, 2, 3])
    self.assertEqual(
        p.result.job.proto.type,
        apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH)

  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
  @mock.patch(
      'apache_beam.options.pipeline_options.GoogleCloudOptions.validate',
      lambda *args: [])
  def test_explicit_streaming_no_unbounded(self):
    options = PipelineOptions([
        '--streaming',
        '--sdk_location=container',
        '--runner=DataflowRunner',
        '--dry_run=True',
        '--temp_location=gs://bucket',
        '--project=project',
        '--region=region'
    ])
    with beam.Pipeline(options=options) as p:
      _ = p | beam.Create([1, 2, 3])
    self.assertEqual(
        p.result.job.proto.type,
        apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING)


if __name__ == '__main__':
  unittest.main()