Skip to content

Commit

Permalink
Merge pull request apache#28496 RenderRunner cleanup and fixes.
Browse files Browse the repository at this point in the history
RenderRunner cleanup
  • Loading branch information
robertwb authored Sep 18, 2023
2 parents b6f432d + 90059bd commit 91842e6
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 28 deletions.
41 changes: 20 additions & 21 deletions sdks/python/apache_beam/runners/render.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
except ImportError:
gcsio = None # type: ignore

_LOGGER = logging.getLogger(__name__)

# From the Beam site, circa November 2022.
DEFAULT_EDGE_STYLE = 'color="#ff570b"'
DEFAULT_TRANSFORM_STYLE = (
Expand Down Expand Up @@ -129,12 +131,6 @@ def _add_argparse_args(cls, parser):
help='Set to also log input pipeline proto to stdout.')
return parser

def __init__(self, *args, render_testing=False, **kwargs):
super().__init__(*args, **kwargs)
if self.render_port < 0 and not self.render_output and not render_testing:
raise ValueError(
'At least one of --render_port or --render_output must be provided.')


class PipelineRenderer:
def __init__(self, pipeline, options):
Expand Down Expand Up @@ -342,7 +338,7 @@ def page_callback_data(self, layout):
}

def render_data(self):
logging.info("Re-rendering pipeline...")
_LOGGER.info("Re-rendering pipeline...")
layout = self.layout_dot()
if self.options.render_output:
for path in self.options.render_output:
Expand All @@ -352,10 +348,10 @@ def render_data(self):
input=layout,
check=False)
if result.returncode:
logging.error(
_LOGGER.error(
"Failed render pipeline as %r: exit %s", path, result.returncode)
else:
logging.info("Rendered pipeline as %r", path)
_LOGGER.info("Rendered pipeline as %r", path)
return self.page_callback_data(layout)

def render_json(self):
Expand Down Expand Up @@ -404,15 +400,19 @@ class RenderRunner(runner.PipelineRunner):
# (such as counters, stage completion status, or possibly even PCollection
# samples) queryable and/or displayed. This could evolve into a full Beam
# UI.
def run_pipeline(self, pipeline_object, options, pipeline_proto=None):
if not pipeline_proto:
pipeline_proto = pipeline_object.to_runner_api()
def run_pipeline(self, pipeline_object, options):
return self.run_portable_pipeline(pipeline_object.to_runner_api(), options)

def run_portable_pipeline(self, pipeline_proto, options):
render_options = options.view_as(RenderOptions)
if render_options.render_port < 0 and not render_options.render_output:
raise ValueError(
'At least one of --render_port or --render_output must be provided.')
if render_options.log_proto:
logging.info(pipeline_proto)
_LOGGER.info(pipeline_proto)
renderer = PipelineRenderer(pipeline_proto, render_options)
try:
subprocess.run(['dotX', '-V'], capture_output=True, check=True)
subprocess.run(['dot', '-V'], capture_output=True, check=True)
except FileNotFoundError as exn:
# If dot is not available, we can at least output the raw .dot files.
dot_files = [
Expand All @@ -422,7 +422,7 @@ def run_pipeline(self, pipeline_object, options, pipeline_proto=None):
for output in dot_files:
with open(output, 'w') as fout:
fout.write(renderer.to_dot())
logging.info("Wrote pipeline as %s", output)
_LOGGER.info("Wrote pipeline as %s", output)

non_dot_files = set(render_options.render_output) - set(dot_files)
if non_dot_files:
Expand Down Expand Up @@ -543,17 +543,16 @@ def render_one(options):
pipeline_proto = beam_runner_api_pb2.Pipeline()
pipeline_proto.ParseFromString(content)

RenderRunner().run_pipeline(
None, pipeline_options.PipelineOptions(**vars(options)), pipeline_proto)
RenderRunner().run_portable_pipeline(
pipeline_proto, pipeline_options.PipelineOptions(**vars(options)))


def run_server(options):
class RenderBeamJob(local_job_service.BeamJob):
def _invoke_runner(self):
return RenderRunner().run_pipeline(
None,
pipeline_options.PipelineOptions(**vars(options)),
self._pipeline_proto)
return RenderRunner().run_portable_pipeline(
self._pipeline_proto,
pipeline_options.PipelineOptions(**vars(options)))

with tempfile.TemporaryDirectory() as staging_dir:
job_servicer = local_job_service.LocalJobServicer(
Expand Down
47 changes: 40 additions & 7 deletions sdks/python/apache_beam/runners/render_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
#
# pytype: skip-file

import os
import argparse
import logging
import subprocess
import unittest
import tempfile
import pytest

import apache_beam as beam
from apache_beam.runners import render
Expand All @@ -39,6 +42,16 @@ def test_basic_graph(self):
self.assertIn('CustomName', dot)
self.assertEqual(dot.count('->'), 2)

def test_render_config_validation(self):
p = beam.Pipeline()
_ = (
p | beam.Impulse() | beam.Map(lambda _: 2)
| 'CustomName' >> beam.Map(lambda x: x * x))
pipeline_proto = p.to_runner_api()
with pytest.raises(ValueError):
render.RenderRunner().run_portable_pipeline(
pipeline_proto, render.RenderOptions())

def test_side_input(self):
p = beam.Pipeline()
pcoll = p | beam.Impulse() | beam.FlatMap(lambda x: [1, 2, 3])
Expand All @@ -65,11 +78,35 @@ def test_composite_collapse(self):
renderer.update(toggle=[create_transform_id])
self.assertEqual(renderer.to_dot().count('->'), 1)

def test_dot_well_formed(self):

class DotRequiringRenderingTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
try:
subprocess.run(['dot', '-V'], capture_output=True, check=True)
except FileNotFoundError:
cls._dot_installed = False
else:
cls._dot_installed = True

def setUp(self) -> None:
if not self._dot_installed: # type: ignore[attr-defined]
self.skipTest('dot executable not installed')

def test_run_portable_pipeline(self):
p = beam.Pipeline()
_ = (
p | beam.Impulse() | beam.Map(lambda _: 2)
| 'CustomName' >> beam.Map(lambda x: x * x))
pipeline_proto = p.to_runner_api()

with tempfile.TemporaryDirectory() as tmpdir:
svg_path = os.path.join(tmpdir, "my_output.svg")
render.RenderRunner().run_portable_pipeline(
pipeline_proto, render.RenderOptions(render_output=[svg_path]))
assert os.path.exists(svg_path)

def test_dot_well_formed(self):
p = beam.Pipeline()
_ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)
pipeline_proto = p.to_runner_api()
Expand All @@ -84,16 +121,12 @@ def test_dot_well_formed(self):
renderer.render_data()

def test_leaf_composite_filter(self):
try:
subprocess.run(['dot', '-V'], capture_output=True, check=True)
except FileNotFoundError:
self.skipTest('dot executable not installed')
p = beam.Pipeline()
_ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)
dot = render.PipelineRenderer(
p.to_runner_api(),
render.RenderOptions(['--render_leaf_composite_nodes=Create'],
render_testing=True)).to_dot()
render.RenderOptions(['--render_leaf_composite_nodes=Create'
])).to_dot()
self.assertEqual(dot.count('->'), 1)


Expand Down

0 comments on commit 91842e6

Please sign in to comment.