Skip to content

Commit

Permalink
Populate top-level display data in yaml main. (#28512)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Oct 9, 2023
1 parent dafe928 commit 2bbb348
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 21 deletions.
28 changes: 22 additions & 6 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
from apache_beam.transforms import ParDo
from apache_beam.transforms import ptransform
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display import HasDisplayData
from apache_beam.transforms.resources import merge_resource_hints
from apache_beam.transforms.resources import resource_hints_from_options
from apache_beam.transforms.sideinputs import get_sideinput_index
Expand All @@ -108,7 +109,7 @@
__all__ = ['Pipeline', 'PTransformOverride']


class Pipeline(object):
class Pipeline(HasDisplayData):
"""A pipeline object that manages a DAG of
:class:`~apache_beam.pvalue.PValue` s and their
:class:`~apache_beam.transforms.ptransform.PTransform` s.
Expand All @@ -133,9 +134,12 @@ def runner_implemented_transforms(cls):
common_urns.primitives.IMPULSE.urn,
])

def __init__(self, runner=None, options=None, argv=None):
# type: (Optional[Union[str, PipelineRunner]], Optional[PipelineOptions], Optional[List[str]]) -> None

def __init__(
self,
runner: Optional[Union[str, PipelineRunner]] = None,
options: Optional[PipelineOptions] = None,
argv: Optional[List[str]] = None,
display_data: Optional[Dict[str, Any]] = None):
"""Initialize a pipeline object.
Args:
Expand All @@ -151,6 +155,8 @@ def __init__(self, runner=None, options=None, argv=None):
to be used for building a
:class:`~apache_beam.options.pipeline_options.PipelineOptions` object.
This will only be used if argument **options** is :data:`None`.
display_data (Dict[str: Any]): a dictionary of static data associated
with this pipeline that can be displayed when it runs.
Raises:
ValueError: if either the runner or options argument is not
Expand Down Expand Up @@ -233,6 +239,11 @@ def __init__(self, runner=None, options=None, argv=None):
# Records whether this pipeline contains any external transforms.
self.contains_external_transforms = False

self._display_data = display_data or {}

def display_data(self):
# type: () -> Dict[str, Any]
return self._display_data

@property # type: ignore[misc] # decorated property not supported
def options(self):
Expand Down Expand Up @@ -914,7 +925,8 @@ def visit_transform(self, transform_node):
proto = beam_runner_api_pb2.Pipeline(
root_transform_ids=[root_transform_id],
components=context.to_runner_api(),
requirements=context.requirements())
requirements=context.requirements(),
display_data=DisplayData('', self._display_data).to_proto())
proto.components.transforms[root_transform_id].unique_name = (
root_transform_id)
self.merge_compatible_environments(proto)
Expand Down Expand Up @@ -970,7 +982,11 @@ def from_runner_api(
# type: (...) -> Pipeline

"""For internal use only; no backwards-compatibility guarantees."""
p = Pipeline(runner=runner, options=options)
p = Pipeline(
runner=runner,
options=options,
display_data={str(ix): d
for ix, d in enumerate(proto.display_data)})
from apache_beam.runners import pipeline_context
context = pipeline_context.PipelineContext(
proto.components, requirements=proto.requirements)
Expand Down
23 changes: 15 additions & 8 deletions sdks/python/apache_beam/transforms/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from datetime import timedelta
from typing import TYPE_CHECKING
from typing import List
from typing import Union

from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
Expand Down Expand Up @@ -101,7 +102,8 @@ def __init__(
):
# type: (...) -> None
self.namespace = namespace
self.items = [] # type: List[DisplayDataItem]
self.items = [
] # type: List[Union[DisplayDataItem, beam_runner_api_pb2.DisplayData]]
self._populate_items(display_data_dict)

def _populate_items(self, display_data_dict):
Expand All @@ -112,26 +114,31 @@ def _populate_items(self, display_data_dict):
subcomponent_display_data = DisplayData(
element._get_display_data_namespace(), element.display_data())
self.items += subcomponent_display_data.items
continue

if isinstance(element, DisplayDataItem):
elif isinstance(element, DisplayDataItem):
if element.should_drop():
continue
element.key = key
element.namespace = self.namespace
self.items.append(element)
continue

# If it's not a HasDisplayData element,
# nor a dictionary, then it's a simple value
self.items.append(
DisplayDataItem(element, namespace=self.namespace, key=key))
elif isinstance(element, beam_runner_api_pb2.DisplayData):
self.items.append(element)

else:
# If it's not a HasDisplayData element,
# nor a dictionary, then it's a simple value
self.items.append(
DisplayDataItem(element, namespace=self.namespace, key=key))

def to_proto(self):
# type: (...) -> List[beam_runner_api_pb2.DisplayData]

"""Returns a List of Beam proto representation of Display data."""
def create_payload(dd):
if isinstance(dd, beam_runner_api_pb2.DisplayData):
return dd

display_data_dict = None
try:
display_data_dict = dd.get_dict()
Expand Down
17 changes: 10 additions & 7 deletions sdks/python/apache_beam/yaml/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,22 @@ def _pipeline_spec_from_args(known_args):
raise ValueError(
"Exactly one of pipeline_spec or pipeline_spec_file must be set.")

return yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader)
return pipeline_yaml


def run(argv=None):
yaml_transform._LOGGER.setLevel('INFO')
known_args, pipeline_args = _configure_parser(argv)
pipeline_spec = _pipeline_spec_from_args(known_args)
pipeline_yaml = _pipeline_spec_from_args(known_args)
pipeline_spec = yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader)

with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pipeline_args,
pickle_library='cloudpickle',
**yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get(
'options', {})))) as p:
with beam.Pipeline( # linebreak for better yapf formatting
options=beam.options.pipeline_options.PipelineOptions(
pipeline_args,
pickle_library='cloudpickle',
**yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get(
'options', {}))),
display_data={'yaml': pipeline_yaml}) as p:
print("Building pipeline...")
yaml_transform.expand_pipeline(p, pipeline_spec)
print("Running pipeline...")
Expand Down

0 comments on commit 2bbb348

Please sign in to comment.