diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 042b483d50f1..14177cd603d8 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -88,6 +88,7 @@ from apache_beam.transforms import ParDo from apache_beam.transforms import ptransform from apache_beam.transforms.display import DisplayData +from apache_beam.transforms.display import HasDisplayData from apache_beam.transforms.resources import merge_resource_hints from apache_beam.transforms.resources import resource_hints_from_options from apache_beam.transforms.sideinputs import get_sideinput_index @@ -108,7 +109,7 @@ __all__ = ['Pipeline', 'PTransformOverride'] -class Pipeline(object): +class Pipeline(HasDisplayData): """A pipeline object that manages a DAG of :class:`~apache_beam.pvalue.PValue` s and their :class:`~apache_beam.transforms.ptransform.PTransform` s. @@ -133,9 +134,12 @@ def runner_implemented_transforms(cls): common_urns.primitives.IMPULSE.urn, ]) - def __init__(self, runner=None, options=None, argv=None): - # type: (Optional[Union[str, PipelineRunner]], Optional[PipelineOptions], Optional[List[str]]) -> None - + def __init__( + self, + runner: Optional[Union[str, PipelineRunner]] = None, + options: Optional[PipelineOptions] = None, + argv: Optional[List[str]] = None, + display_data: Optional[Dict[str, Any]] = None): """Initialize a pipeline object. Args: @@ -151,6 +155,8 @@ def __init__(self, runner=None, options=None, argv=None): to be used for building a :class:`~apache_beam.options.pipeline_options.PipelineOptions` object. This will only be used if argument **options** is :data:`None`. + display_data (Dict[str: Any]): a dictionary of static data associated + with this pipeline that can be displayed when it runs. Raises: ValueError: if either the runner or options argument is not @@ -233,6 +239,11 @@ def __init__(self, runner=None, options=None, argv=None): # Records whether this pipeline contains any external transforms. self.contains_external_transforms = False + self._display_data = display_data or {} + + def display_data(self): + # type: () -> Dict[str, Any] + return self._display_data @property # type: ignore[misc] # decorated property not supported def options(self): @@ -914,7 +925,8 @@ def visit_transform(self, transform_node): proto = beam_runner_api_pb2.Pipeline( root_transform_ids=[root_transform_id], components=context.to_runner_api(), - requirements=context.requirements()) + requirements=context.requirements(), + display_data=DisplayData('', self._display_data).to_proto()) proto.components.transforms[root_transform_id].unique_name = ( root_transform_id) self.merge_compatible_environments(proto) @@ -970,7 +982,11 @@ def from_runner_api( # type: (...) -> Pipeline """For internal use only; no backwards-compatibility guarantees.""" - p = Pipeline(runner=runner, options=options) + p = Pipeline( + runner=runner, + options=options, + display_data={str(ix): d + for ix, d in enumerate(proto.display_data)}) from apache_beam.runners import pipeline_context context = pipeline_context.PipelineContext( proto.components, requirements=proto.requirements) diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py index b52a8fd5b6dd..0d1dd552413e 100644 --- a/sdks/python/apache_beam/transforms/display.py +++ b/sdks/python/apache_beam/transforms/display.py @@ -45,6 +45,7 @@ from datetime import timedelta from typing import TYPE_CHECKING from typing import List +from typing import Union from apache_beam.portability import common_urns from apache_beam.portability.api import beam_runner_api_pb2 @@ -101,7 +102,8 @@ def __init__( ): # type: (...) -> None self.namespace = namespace - self.items = [] # type: List[DisplayDataItem] + self.items = [ + ] # type: List[Union[DisplayDataItem, beam_runner_api_pb2.DisplayData]] self._populate_items(display_data_dict) def _populate_items(self, display_data_dict): @@ -112,26 +114,31 @@ def _populate_items(self, display_data_dict): subcomponent_display_data = DisplayData( element._get_display_data_namespace(), element.display_data()) self.items += subcomponent_display_data.items - continue - if isinstance(element, DisplayDataItem): + elif isinstance(element, DisplayDataItem): if element.should_drop(): continue element.key = key element.namespace = self.namespace self.items.append(element) - continue - # If it's not a HasDisplayData element, - # nor a dictionary, then it's a simple value - self.items.append( - DisplayDataItem(element, namespace=self.namespace, key=key)) + elif isinstance(element, beam_runner_api_pb2.DisplayData): + self.items.append(element) + + else: + # If it's not a HasDisplayData element, + # nor a dictionary, then it's a simple value + self.items.append( + DisplayDataItem(element, namespace=self.namespace, key=key)) def to_proto(self): # type: (...) -> List[beam_runner_api_pb2.DisplayData] """Returns a List of Beam proto representation of Display data.""" def create_payload(dd): + if isinstance(dd, beam_runner_api_pb2.DisplayData): + return dd + display_data_dict = None try: display_data_dict = dd.get_dict() diff --git a/sdks/python/apache_beam/yaml/main.py b/sdks/python/apache_beam/yaml/main.py index eb0695f337b4..e2ec8df9cfc3 100644 --- a/sdks/python/apache_beam/yaml/main.py +++ b/sdks/python/apache_beam/yaml/main.py @@ -51,19 +51,22 @@ def _pipeline_spec_from_args(known_args): raise ValueError( "Exactly one of pipeline_spec or pipeline_spec_file must be set.") - return yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader) + return pipeline_yaml def run(argv=None): yaml_transform._LOGGER.setLevel('INFO') known_args, pipeline_args = _configure_parser(argv) - pipeline_spec = _pipeline_spec_from_args(known_args) + pipeline_yaml = _pipeline_spec_from_args(known_args) + pipeline_spec = yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader) - with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( - pipeline_args, - pickle_library='cloudpickle', - **yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get( - 'options', {})))) as p: + with beam.Pipeline( # linebreak for better yapf formatting + options=beam.options.pipeline_options.PipelineOptions( + pipeline_args, + pickle_library='cloudpickle', + **yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get( + 'options', {}))), + display_data={'yaml': pipeline_yaml}) as p: print("Building pipeline...") yaml_transform.expand_pipeline(p, pipeline_spec) print("Running pipeline...")