Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[prism] Dev prism builds for python and Python Direct Runner fallbacks. #32876

Merged
merged 11 commits into from
Oct 21, 2024
53 changes: 53 additions & 0 deletions sdks/python/apache_beam/runners/direct/direct_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,36 @@ def visit_transform(self, applied_ptransform):
if timer.time_domain == TimeDomain.REAL_TIME:
self.supported_by_fnapi_runner = False

class _PrismRunnerSupportVisitor(PipelineVisitor):
"""Visitor determining if a Pipeline can be run on the PrismRunner."""
def accept(self, pipeline):
self.supported_by_prism_runner = True
pipeline.visit(self)
return self.supported_by_prism_runner

def visit_transform(self, applied_ptransform):
transform = applied_ptransform.transform
# Python SDK assumes the direct runner TestStream implementation is being used.
if isinstance(transform, TestStream):
self.supported_by_prism_runner = False
if isinstance(transform, beam.ParDo):
dofn = transform.dofn
# It's uncertain if the Prism Runner supports execution of CombineFns
# with deferred side inputs.
if isinstance(dofn, CombineValuesDoFn):
args, kwargs = transform.raw_side_inputs
args_to_check = itertools.chain(args, kwargs.values())
if any(isinstance(arg, ArgumentPlaceholder)
for arg in args_to_check):
self.supported_by_prism_runner = False
if userstate.is_stateful_dofn(dofn):
# https://github.com/apache/beam/issues/32786 - Remove once Real time clock is used.
_, timer_specs = userstate.get_dofn_specs(dofn)
for timer in timer_specs:
if timer.time_domain == TimeDomain.REAL_TIME:
self.supported_by_prism_runner = False

tryingPrism = False
# Check whether all transforms used in the pipeline are supported by the
# FnApiRunner, and the pipeline was not meant to be run as streaming.
if _FnApiRunnerSupportVisitor().accept(pipeline):
Expand All @@ -122,9 +152,32 @@ def visit_transform(self, applied_ptransform):
beam_provision_api_pb2.ProvisionInfo(
pipeline_options=encoded_options))
runner = fn_runner.FnApiRunner(provision_info=provision_info)
elif _PrismRunnerSupportVisitor().accept(pipeline):
_LOGGER.info('Running pipeline with PrismRunner.')
from apache_beam.runners.portability import prism_runner
runner = prism_runner.PrismRunner()
tryingPrism = True
else:
runner = BundleBasedDirectRunner()

if tryingPrism:
try:
pr = runner.run_pipeline(pipeline, options)
# This is non-blocking, so if the state is *already* finished, something
# probably failed on job submission.
if pr.state.is_terminal() and pr.state != PipelineState.DONE:
_LOGGER.info(
'Pipeline failed on PrismRunner, falling back toDirectRunner.')
runner = BundleBasedDirectRunner()
else:
return pr
except Exception as e:
# If prism fails in Preparing the portable job, then the PortableRunner
# code raises an exception. Catch it, log it, and use the Direct runner instead.
_LOGGER.info('Exception with PrismRunner:\n %s\n' % (e))
_LOGGER.info('Falling back to DirectRunner')
runner = BundleBasedDirectRunner()

return runner.run_pipeline(pipeline, options)


Expand Down
104 changes: 76 additions & 28 deletions sdks/python/apache_beam/runners/portability/prism_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import platform
import shutil
import stat
import subprocess
import typing
import urllib
import zipfile
Expand Down Expand Up @@ -167,38 +168,85 @@ def construct_download_url(self, root_tag: str, sys: str, mach: str) -> str:

def path_to_binary(self) -> str:
if self._path is not None:
if not os.path.exists(self._path):
url = urllib.parse.urlparse(self._path)
if not url.scheme:
raise ValueError(
'Unable to parse binary URL "%s". If using a full URL, make '
'sure the scheme is specified. If using a local file xpath, '
'make sure the file exists; you may have to first build prism '
'using `go build `.' % (self._path))

# We have a URL, see if we need to construct a valid file name.
if self._path.startswith(GITHUB_DOWNLOAD_PREFIX):
# If this URL starts with the download prefix, let it through.
return self._path
# The only other valid option is a github release page.
if not self._path.startswith(GITHUB_TAG_PREFIX):
raise ValueError(
'Provided --prism_location URL is not an Apache Beam Github '
'Release page URL or download URL: %s' % (self._path))
# Get the root tag for this URL
root_tag = os.path.basename(os.path.normpath(self._path))
return self.construct_download_url(
root_tag, platform.system(), platform.machine())
return self._path
else:
if '.dev' in self._version:
# The path is overidden, check various cases.
if os.path.exists(self._path):
# The path is local and exists, use directly.
return self._path

# Check if the path is a URL.
url = urllib.parse.urlparse(self._path)
if not url.scheme:
raise ValueError(
'Unable to parse binary URL "%s". If using a full URL, make '
'sure the scheme is specified. If using a local file xpath, '
'make sure the file exists; you may have to first build prism '
'using `go build `.' % (self._path))

# We have a URL, see if we need to construct a valid file name.
if self._path.startswith(GITHUB_DOWNLOAD_PREFIX):
# If this URL starts with the download prefix, let it through.
return self._path
# The only other valid option is a github release page.
if not self._path.startswith(GITHUB_TAG_PREFIX):
raise ValueError(
'Unable to derive URL for dev versions "%s". Please provide an '
'alternate version to derive the release URL with the '
'--prism_beam_version_override flag.' % (self._version))
'Provided --prism_location URL is not an Apache Beam Github '
'Release page URL or download URL: %s' % (self._path))
# Get the root tag for this URL
root_tag = os.path.basename(os.path.normpath(self._path))
return self.construct_download_url(
root_tag, platform.system(), platform.machine())

if '.dev' not in self._version:
# Not a development version, so construct the production download URL
return self.construct_download_url(
self._version, platform.system(), platform.machine())

# This is a development version! Assume Go is installed.
# Set the install directory to the cache location.
envdict = {**os.environ, "GOBIN": self.BIN_CACHE}
PRISMPKG = "github.com/apache/beam/sdks/v2/go/cmd/prism"

process = subprocess.run(["go", "install", PRISMPKG],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=envdict)
if process.returncode == 0:
# Successfully installed
return '%s/prism' % (self.BIN_CACHE)

# We failed to build for some reason.
output = process.stdout.decode("utf-8")
if "not in a module" not in output and "no required module provides" not in output:
# This branch handles two classes of failures:
# 1. Go isn't installed, so it needs to be installed by the Beam SDK developer.
# 2. Go is installed, and they are building in a local version of Prism,
# but there was a compile error that the developer should address.
# Either way, the @latest fallback either would fail, or hide the error, so fail now.
_LOGGER.info(output)
raise ValueError(
'Unable to install a local of Prism: "%s";\n'
'Likely Go is not installed, or a local change to Prism did not compile.\n'
'Please install Go (see https://go.dev/doc/install) to enable automatic local builds.\n'
'Alternatively provide a binary with the --prism_location flag.'
'\nCaptured output:\n %s' % (self._version, output))

# Go is installed and claims we're not in a Go module that has access to the Prism package.

# Fallback to using the @latest version of prism, which works everywhere.
process = subprocess.run(["go", "install", PRISMPKG + "@latest"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=envdict)

if process.returncode == 0:
return '%s/prism' % (self.BIN_CACHE)

output = process.stdout.decode("utf-8")
raise ValueError(
'We were unable to execute the subprocess "%s" to automatically build prism. \n'
'Alternatively provide an alternate binary with the --prism_location flag.'
'\nCaptured output:\n %s' % (process.args, output))

def subprocess_cmd_and_endpoint(
self) -> typing.Tuple[typing.List[typing.Any], str]:
bin_path = self.local_bin(
Expand Down
Loading