Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RenderRunner cleanup #28496

Merged
merged 8 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 20 additions & 21 deletions sdks/python/apache_beam/runners/render.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
except ImportError:
gcsio = None # type: ignore

_LOGGER = logging.getLogger(__name__)

# From the Beam site, circa November 2022.
DEFAULT_EDGE_STYLE = 'color="#ff570b"'
DEFAULT_TRANSFORM_STYLE = (
Expand Down Expand Up @@ -129,12 +131,6 @@ def _add_argparse_args(cls, parser):
help='Set to also log input pipeline proto to stdout.')
return parser

def __init__(self, *args, render_testing=False, **kwargs):
  """Initializes the options and checks that a render target was given.

  Args:
    *args: forwarded unchanged to the parent initializer.
    render_testing: when true, the missing-target check is skipped.
    **kwargs: forwarded unchanged to the parent initializer.

  Raises:
    ValueError: if render_port is negative, render_output is falsy and
      render_testing is false.
  """
  super().__init__(*args, **kwargs)
  # A non-negative port is a valid target on its own.
  if not self.render_port < 0:
    return
  # Otherwise an output path (or the testing escape hatch) is required.
  if self.render_output or render_testing:
    return
  raise ValueError(
      'At least one of --render_port or --render_output must be provided.')


class PipelineRenderer:
def __init__(self, pipeline, options):
Expand Down Expand Up @@ -342,7 +338,7 @@ def page_callback_data(self, layout):
}

def render_data(self):
logging.info("Re-rendering pipeline...")
_LOGGER.info("Re-rendering pipeline...")
layout = self.layout_dot()
if self.options.render_output:
for path in self.options.render_output:
Expand All @@ -352,10 +348,10 @@ def render_data(self):
input=layout,
check=False)
if result.returncode:
logging.error(
_LOGGER.error(
"Failed render pipeline as %r: exit %s", path, result.returncode)
else:
logging.info("Rendered pipeline as %r", path)
_LOGGER.info("Rendered pipeline as %r", path)
return self.page_callback_data(layout)

def render_json(self):
Expand Down Expand Up @@ -404,15 +400,19 @@ class RenderRunner(runner.PipelineRunner):
# (such as counters, stage completion status, or possibly even PCollection
# samples) queryable and/or displayed. This could evolve into a full Beam
# UI.
def run_pipeline(self, pipeline_object, options, pipeline_proto=None):
if not pipeline_proto:
pipeline_proto = pipeline_object.to_runner_api()
def run_pipeline(self, pipeline_object, options):
  """Renders a pipeline object by delegating to run_portable_pipeline.

  Args:
    pipeline_object: object whose to_runner_api() result is rendered.
    options: passed through unchanged to run_portable_pipeline.

  Returns:
    Whatever run_portable_pipeline returns.
  """
  render_proto = self.run_portable_pipeline
  return render_proto(pipeline_object.to_runner_api(), options)

def run_portable_pipeline(self, pipeline_proto, options):
render_options = options.view_as(RenderOptions)
if render_options.render_port < 0 and not render_options.render_output:
raise ValueError(
'At least one of --render_port or --render_output must be provided.')
if render_options.log_proto:
logging.info(pipeline_proto)
_LOGGER.info(pipeline_proto)
renderer = PipelineRenderer(pipeline_proto, render_options)
try:
subprocess.run(['dotX', '-V'], capture_output=True, check=True)
subprocess.run(['dot', '-V'], capture_output=True, check=True)
except FileNotFoundError as exn:
# If dot is not available, we can at least output the raw .dot files.
dot_files = [
Expand All @@ -422,7 +422,7 @@ def run_pipeline(self, pipeline_object, options, pipeline_proto=None):
for output in dot_files:
with open(output, 'w') as fout:
fout.write(renderer.to_dot())
logging.info("Wrote pipeline as %s", output)
_LOGGER.info("Wrote pipeline as %s", output)

non_dot_files = set(render_options.render_output) - set(dot_files)
if non_dot_files:
Expand Down Expand Up @@ -543,17 +543,16 @@ def render_one(options):
pipeline_proto = beam_runner_api_pb2.Pipeline()
pipeline_proto.ParseFromString(content)

RenderRunner().run_pipeline(
None, pipeline_options.PipelineOptions(**vars(options)), pipeline_proto)
RenderRunner().run_portable_pipeline(
pipeline_proto, pipeline_options.PipelineOptions(**vars(options)))


def run_server(options):
class RenderBeamJob(local_job_service.BeamJob):
def _invoke_runner(self):
return RenderRunner().run_pipeline(
None,
pipeline_options.PipelineOptions(**vars(options)),
self._pipeline_proto)
return RenderRunner().run_portable_pipeline(
self._pipeline_proto,
pipeline_options.PipelineOptions(**vars(options)))

with tempfile.TemporaryDirectory() as staging_dir:
job_servicer = local_job_service.LocalJobServicer(
Expand Down
47 changes: 40 additions & 7 deletions sdks/python/apache_beam/runners/render_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
#
# pytype: skip-file

import os
import argparse
import logging
import subprocess
import unittest
import tempfile
import pytest

import apache_beam as beam
from apache_beam.runners import render
Expand All @@ -39,6 +42,16 @@ def test_basic_graph(self):
self.assertIn('CustomName', dot)
self.assertEqual(dot.count('->'), 2)

def test_render_config_validation(self):
  """run_portable_pipeline with default RenderOptions must raise ValueError."""
  pipeline = beam.Pipeline()
  constant_twos = pipeline | beam.Impulse() | beam.Map(lambda _: 2)
  _ = constant_twos | 'CustomName' >> beam.Map(lambda x: x * x)
  proto = pipeline.to_runner_api()
  # Runner and options are built inside the context so that a ValueError
  # raised at any of these steps satisfies the expectation.
  with pytest.raises(ValueError):
    runner = render.RenderRunner()
    runner.run_portable_pipeline(proto, render.RenderOptions())

def test_side_input(self):
p = beam.Pipeline()
pcoll = p | beam.Impulse() | beam.FlatMap(lambda x: [1, 2, 3])
Expand All @@ -65,11 +78,35 @@ def test_composite_collapse(self):
renderer.update(toggle=[create_transform_id])
self.assertEqual(renderer.to_dot().count('->'), 1)

def test_dot_well_formed(self):

class DotRequiringRenderingTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
  """Records on the class whether the `dot` executable can be launched.

  Sets cls._dot_installed to False when launching `dot -V` raises
  FileNotFoundError, and to True when the command completes. Any other
  exception (for example a non-zero exit status, since check=True)
  propagates and leaves the attribute unset.
  """
  dot_available = True
  try:
    subprocess.run(['dot', '-V'], capture_output=True, check=True)
  except FileNotFoundError:
    dot_available = False
  cls._dot_installed = dot_available

def setUp(self) -> None:
  """Skips the current test when the _dot_installed flag is false."""
  if self._dot_installed:
    return
  self.skipTest('dot executable not installed')

def test_run_portable_pipeline(self):
  """Rendering with an .svg render_output should create that file."""
  pipeline = beam.Pipeline()
  constant_twos = pipeline | beam.Impulse() | beam.Map(lambda _: 2)
  _ = constant_twos | 'CustomName' >> beam.Map(lambda x: x * x)
  proto = pipeline.to_runner_api()

  # The temporary directory is removed on exit, so the check happens inside.
  with tempfile.TemporaryDirectory() as scratch_dir:
    svg_path = os.path.join(scratch_dir, "my_output.svg")
    runner = render.RenderRunner()
    runner.run_portable_pipeline(
        proto, render.RenderOptions(render_output=[svg_path]))
    assert os.path.exists(svg_path)

def test_dot_well_formed(self):
p = beam.Pipeline()
_ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)
pipeline_proto = p.to_runner_api()
Expand All @@ -84,16 +121,12 @@ def test_dot_well_formed(self):
renderer.render_data()

def test_leaf_composite_filter(self):
try:
subprocess.run(['dot', '-V'], capture_output=True, check=True)
except FileNotFoundError:
self.skipTest('dot executable not installed')
p = beam.Pipeline()
_ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)
dot = render.PipelineRenderer(
p.to_runner_api(),
render.RenderOptions(['--render_leaf_composite_nodes=Create'],
render_testing=True)).to_dot()
render.RenderOptions(['--render_leaf_composite_nodes=Create'
])).to_dot()
self.assertEqual(dot.count('->'), 1)


Expand Down
Loading