diff --git a/sdks/python/apache_beam/runners/render.py b/sdks/python/apache_beam/runners/render.py index 306bf8c2090b..da153d25a4bd 100644 --- a/sdks/python/apache_beam/runners/render.py +++ b/sdks/python/apache_beam/runners/render.py @@ -78,6 +78,8 @@ except ImportError: gcsio = None # type: ignore +_LOGGER = logging.getLogger(__name__) + # From the Beam site, circa November 2022. DEFAULT_EDGE_STYLE = 'color="#ff570b"' DEFAULT_TRANSFORM_STYLE = ( @@ -129,12 +131,6 @@ def _add_argparse_args(cls, parser): help='Set to also log input pipeline proto to stdout.') return parser - def __init__(self, *args, render_testing=False, **kwargs): - super().__init__(*args, **kwargs) - if self.render_port < 0 and not self.render_output and not render_testing: - raise ValueError( - 'At least one of --render_port or --render_output must be provided.') - class PipelineRenderer: def __init__(self, pipeline, options): @@ -342,7 +338,7 @@ def page_callback_data(self, layout): } def render_data(self): - logging.info("Re-rendering pipeline...") + _LOGGER.info("Re-rendering pipeline...") layout = self.layout_dot() if self.options.render_output: for path in self.options.render_output: @@ -352,10 +348,10 @@ def render_data(self): input=layout, check=False) if result.returncode: - logging.error( + _LOGGER.error( "Failed render pipeline as %r: exit %s", path, result.returncode) else: - logging.info("Rendered pipeline as %r", path) + _LOGGER.info("Rendered pipeline as %r", path) return self.page_callback_data(layout) def render_json(self): @@ -404,15 +400,19 @@ class RenderRunner(runner.PipelineRunner): # (such as counters, stage completion status, or possibly even PCollection # samples) queryable and/or displayed. This could evolve into a full Beam # UI. - def run_pipeline(self, pipeline_object, options, pipeline_proto=None): - if not pipeline_proto: - pipeline_proto = pipeline_object.to_runner_api() + def run_pipeline(self, pipeline_object, options): + return self.run_portable_pipeline(pipeline_object.to_runner_api(), options) + + def run_portable_pipeline(self, pipeline_proto, options): render_options = options.view_as(RenderOptions) + if render_options.render_port < 0 and not render_options.render_output: + raise ValueError( + 'At least one of --render_port or --render_output must be provided.') if render_options.log_proto: - logging.info(pipeline_proto) + _LOGGER.info(pipeline_proto) renderer = PipelineRenderer(pipeline_proto, render_options) try: - subprocess.run(['dotX', '-V'], capture_output=True, check=True) + subprocess.run(['dot', '-V'], capture_output=True, check=True) except FileNotFoundError as exn: # If dot is not available, we can at least output the raw .dot files. dot_files = [ @@ -422,7 +422,7 @@ def run_pipeline(self, pipeline_object, options, pipeline_proto=None): for output in dot_files: with open(output, 'w') as fout: fout.write(renderer.to_dot()) - logging.info("Wrote pipeline as %s", output) + _LOGGER.info("Wrote pipeline as %s", output) non_dot_files = set(render_options.render_output) - set(dot_files) if non_dot_files: @@ -543,17 +543,16 @@ def render_one(options): pipeline_proto = beam_runner_api_pb2.Pipeline() pipeline_proto.ParseFromString(content) - RenderRunner().run_pipeline( - None, pipeline_options.PipelineOptions(**vars(options)), pipeline_proto) + RenderRunner().run_portable_pipeline( + pipeline_proto, pipeline_options.PipelineOptions(**vars(options))) def run_server(options): class RenderBeamJob(local_job_service.BeamJob): def _invoke_runner(self): - return RenderRunner().run_pipeline( - None, - pipeline_options.PipelineOptions(**vars(options)), - self._pipeline_proto) + return RenderRunner().run_portable_pipeline( + self._pipeline_proto, + pipeline_options.PipelineOptions(**vars(options))) with tempfile.TemporaryDirectory() as staging_dir: job_servicer = local_job_service.LocalJobServicer( diff --git a/sdks/python/apache_beam/runners/render_test.py b/sdks/python/apache_beam/runners/render_test.py index 4dca2b8b5221..67e7afc1c7b9 100644 --- a/sdks/python/apache_beam/runners/render_test.py +++ b/sdks/python/apache_beam/runners/render_test.py @@ -16,10 +16,13 @@ # # pytype: skip-file +import os import argparse import logging import subprocess import unittest +import tempfile +import pytest import apache_beam as beam from apache_beam.runners import render @@ -39,6 +42,16 @@ def test_basic_graph(self): self.assertIn('CustomName', dot) self.assertEqual(dot.count('->'), 2) + def test_render_config_validation(self): + p = beam.Pipeline() + _ = ( + p | beam.Impulse() | beam.Map(lambda _: 2) + | 'CustomName' >> beam.Map(lambda x: x * x)) + pipeline_proto = p.to_runner_api() + with pytest.raises(ValueError): + render.RenderRunner().run_portable_pipeline( + pipeline_proto, render.RenderOptions()) + def test_side_input(self): p = beam.Pipeline() pcoll = p | beam.Impulse() | beam.FlatMap(lambda x: [1, 2, 3]) @@ -65,11 +78,35 @@ def test_composite_collapse(self): renderer.update(toggle=[create_transform_id]) self.assertEqual(renderer.to_dot().count('->'), 1) - def test_dot_well_formed(self): + +class DotRequiringRenderingTest(unittest.TestCase): + @classmethod + def setUpClass(cls): try: subprocess.run(['dot', '-V'], capture_output=True, check=True) except FileNotFoundError: + cls._dot_installed = False + else: + cls._dot_installed = True + + def setUp(self) -> None: + if not self._dot_installed: # type: ignore[attr-defined] self.skipTest('dot executable not installed') + + def test_run_portable_pipeline(self): + p = beam.Pipeline() + _ = ( + p | beam.Impulse() | beam.Map(lambda _: 2) + | 'CustomName' >> beam.Map(lambda x: x * x)) + pipeline_proto = p.to_runner_api() + + with tempfile.TemporaryDirectory() as tmpdir: + svg_path = os.path.join(tmpdir, "my_output.svg") + render.RenderRunner().run_portable_pipeline( + pipeline_proto, render.RenderOptions(render_output=[svg_path])) + assert os.path.exists(svg_path) + + def test_dot_well_formed(self): p = beam.Pipeline() _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x) pipeline_proto = p.to_runner_api() @@ -84,16 +121,12 @@ def test_dot_well_formed(self): renderer.render_data() def test_leaf_composite_filter(self): - try: - subprocess.run(['dot', '-V'], capture_output=True, check=True) - except FileNotFoundError: - self.skipTest('dot executable not installed') p = beam.Pipeline() _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x) dot = render.PipelineRenderer( p.to_runner_api(), - render.RenderOptions(['--render_leaf_composite_nodes=Create'], - render_testing=True)).to_dot() + render.RenderOptions(['--render_leaf_composite_nodes=Create' + ])).to_dot() self.assertEqual(dot.count('->'), 1)