Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Populate top-level display data in yaml main. #28512

Merged
merged 2 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
from apache_beam.transforms import ParDo
from apache_beam.transforms import ptransform
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display import HasDisplayData
from apache_beam.transforms.resources import merge_resource_hints
from apache_beam.transforms.resources import resource_hints_from_options
from apache_beam.transforms.sideinputs import get_sideinput_index
Expand All @@ -108,7 +109,7 @@
__all__ = ['Pipeline', 'PTransformOverride']


class Pipeline(object):
class Pipeline(HasDisplayData):
"""A pipeline object that manages a DAG of
:class:`~apache_beam.pvalue.PValue` s and their
:class:`~apache_beam.transforms.ptransform.PTransform` s.
Expand All @@ -133,9 +134,12 @@ def runner_implemented_transforms(cls):
common_urns.primitives.IMPULSE.urn,
])

def __init__(self, runner=None, options=None, argv=None):
# type: (Optional[Union[str, PipelineRunner]], Optional[PipelineOptions], Optional[List[str]]) -> None

def __init__(
self,
runner: Optional[Union[str, PipelineRunner]] = None,
options: Optional[PipelineOptions] = None,
argv: Optional[List[str]] = None,
display_data: Optional[Dict[str, Any]] = None):
"""Initialize a pipeline object.

Args:
Expand All @@ -151,6 +155,8 @@ def __init__(self, runner=None, options=None, argv=None):
to be used for building a
:class:`~apache_beam.options.pipeline_options.PipelineOptions` object.
This will only be used if argument **options** is :data:`None`.
display_data (Dict[str, Any]): a dictionary of static data associated
with this pipeline that can be displayed when it runs.

Raises:
ValueError: if either the runner or options argument is not
Expand Down Expand Up @@ -233,6 +239,11 @@ def __init__(self, runner=None, options=None, argv=None):
# Records whether this pipeline contains any external transforms.
self.contains_external_transforms = False

self._display_data = display_data or {}

def display_data(self) -> Dict[str, Any]:
  """Return the static display data attached to this pipeline.

  The value is the dictionary stored on the instance at construction time
  (the ``display_data`` constructor argument, or an empty dict when none
  was supplied). The stored object itself is returned, not a copy.
  """
  pipeline_display_data = self._display_data
  return pipeline_display_data

@property # type: ignore[misc] # decorated property not supported
@deprecated(
Expand Down Expand Up @@ -918,7 +929,8 @@ def visit_transform(self, transform_node):
proto = beam_runner_api_pb2.Pipeline(
root_transform_ids=[root_transform_id],
components=context.to_runner_api(),
requirements=context.requirements())
requirements=context.requirements(),
display_data=DisplayData('', self._display_data).to_proto())
proto.components.transforms[root_transform_id].unique_name = (
root_transform_id)
self.merge_compatible_environments(proto)
Expand Down Expand Up @@ -974,7 +986,11 @@ def from_runner_api(
# type: (...) -> Pipeline

"""For internal use only; no backwards-compatibility guarantees."""
p = Pipeline(runner=runner, options=options)
p = Pipeline(
runner=runner,
options=options,
display_data={str(ix): d
for ix, d in enumerate(proto.display_data)})
from apache_beam.runners import pipeline_context
context = pipeline_context.PipelineContext(
proto.components, requirements=proto.requirements)
Expand Down
23 changes: 15 additions & 8 deletions sdks/python/apache_beam/transforms/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from datetime import timedelta
from typing import TYPE_CHECKING
from typing import List
from typing import Union

from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
Expand Down Expand Up @@ -101,7 +102,8 @@ def __init__(
):
# type: (...) -> None
self.namespace = namespace
self.items = [] # type: List[DisplayDataItem]
self.items = [
] # type: List[Union[DisplayDataItem, beam_runner_api_pb2.DisplayData]]
self._populate_items(display_data_dict)

def _populate_items(self, display_data_dict):
Expand All @@ -112,26 +114,31 @@ def _populate_items(self, display_data_dict):
subcomponent_display_data = DisplayData(
element._get_display_data_namespace(), element.display_data())
self.items += subcomponent_display_data.items
continue

if isinstance(element, DisplayDataItem):
elif isinstance(element, DisplayDataItem):
if element.should_drop():
continue
element.key = key
element.namespace = self.namespace
self.items.append(element)
continue

# If it's not a HasDisplayData element,
# nor a dictionary, then it's a simple value
self.items.append(
DisplayDataItem(element, namespace=self.namespace, key=key))
elif isinstance(element, beam_runner_api_pb2.DisplayData):
self.items.append(element)

else:
# If it's not a HasDisplayData element,
# nor a dictionary, then it's a simple value
self.items.append(
DisplayDataItem(element, namespace=self.namespace, key=key))

def to_proto(self):
# type: (...) -> List[beam_runner_api_pb2.DisplayData]

"""Returns a list of Beam proto representations of the display data."""
def create_payload(dd):
if isinstance(dd, beam_runner_api_pb2.DisplayData):
return dd

display_data_dict = None
try:
display_data_dict = dd.get_dict()
Expand Down
17 changes: 10 additions & 7 deletions sdks/python/apache_beam/yaml/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,22 @@ def _pipeline_spec_from_args(known_args):
raise ValueError(
"Exactly one of pipeline_spec or pipeline_spec_file must be set.")

return yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader)
return pipeline_yaml


def run(argv=None):
yaml_transform._LOGGER.setLevel('INFO')
known_args, pipeline_args = _configure_parser(argv)
pipeline_spec = _pipeline_spec_from_args(known_args)
pipeline_yaml = _pipeline_spec_from_args(known_args)
pipeline_spec = yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader)

with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pipeline_args,
pickle_library='cloudpickle',
**yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get(
'options', {})))) as p:
with beam.Pipeline(  # trailing comment forces a line break here so yapf formats the call cleanly
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

erroneous '#'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually to prevent some odd yapf formatting. Clarifying.

options=beam.options.pipeline_options.PipelineOptions(
pipeline_args,
pickle_library='cloudpickle',
**yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get(
'options', {}))),
display_data={'yaml': pipeline_yaml}) as p:
print("Building pipeline...")
yaml_transform.expand_pipeline(p, pipeline_spec)
print("Running pipeline...")
Expand Down