diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index b204adc7fc5d..6b1dd8bb48c0 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -515,6 +515,7 @@ class StandardOptions(PipelineOptions):
       'apache_beam.runners.interactive.interactive_runner.InteractiveRunner',
       'apache_beam.runners.portability.flink_runner.FlinkRunner',
       'apache_beam.runners.portability.portable_runner.PortableRunner',
+      'apache_beam.runners.portability.prism_runner.PrismRunner',
       'apache_beam.runners.portability.spark_runner.SparkRunner',
       'apache_beam.runners.test.TestDirectRunner',
       'apache_beam.runners.test.TestDataflowRunner',
@@ -1707,6 +1708,25 @@ def _add_argparse_args(cls, parser):
         help='Spark major version to use.')
 
 
+class PrismRunnerOptions(PipelineOptions):
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--prism_location',
+        help='Path or URL to a prism binary, or zipped binary for the '
+        'current platform (Operating System and Architecture). May also be '
+        'an Apache Beam Github Release page URL, with a matching '
+        'prism_beam_version_override set. This option overrides all others '
+        'for finding a prism binary.')
+    parser.add_argument(
+        '--prism_beam_version_override',
+        help=
+        'Override the SDK\'s version for deriving the Github Release URLs for '
+        'downloading a zipped prism binary, for the current platform. If '
+        'prism_location is set to a Github Release page URL, then it will use '
+        'that release page as a base when constructing the download URL.')
+
+
 class TestOptions(PipelineOptions):
   @classmethod
   def _add_argparse_args(cls, parser):
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py
new file mode 100644
index 000000000000..eeccaf5748ce
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A runner for executing portable pipelines on Apache Beam Prism."""
+
+# This allows the use of the list parameterized generic on
+# Python 3.8, so we aren't revisiting this code before we
+# sunset that version.
+from __future__ import annotations
+
+import logging
+import os
+import platform
+import shutil
+import stat
+import typing
+import urllib
+import zipfile
+from urllib.error import URLError
+from urllib.request import urlopen
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.options import pipeline_options
+from apache_beam.runners.portability import job_server
+from apache_beam.runners.portability import portable_runner
+from apache_beam.transforms import environments
+from apache_beam.utils import subprocess_server
+from apache_beam.version import __version__ as beam_version
+
+# pytype: skip-file
+
+# Prefix for constructing a download URL
+GITHUB_DOWNLOAD_PREFIX = 'https://github.com/apache/beam/releases/download/'
+# Prefix for constructing a release URL, so we can derive a download URL
+GITHUB_TAG_PREFIX = 'https://github.com/apache/beam/releases/tag/'
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class PrismRunner(portable_runner.PortableRunner):
+  """A runner for launching jobs on Prism, automatically downloading and
+  starting a Prism instance if needed.
+  """
+  def default_environment(
+      self,
+      options: pipeline_options.PipelineOptions) -> environments.Environment:
+    portable_options = options.view_as(pipeline_options.PortableOptions)
+    if (not portable_options.environment_type and
+        not portable_options.output_executable_path):
+      portable_options.environment_type = 'LOOPBACK'
+    return super().default_environment(options)
+
+  def default_job_server(self, options):
+    return job_server.StopOnExitJobServer(PrismJobServer(options))
+
+  def create_job_service_handle(self, job_service, options):
+    return portable_runner.JobServiceHandle(
+        job_service, options, retain_unknown_options=True)
+
+
+class PrismJobServer(job_server.SubprocessJobServer):
+  PRISM_CACHE = os.path.expanduser("~/.apache_beam/cache/prism")
+  BIN_CACHE = os.path.expanduser("~/.apache_beam/cache/prism/bin")
+
+  def __init__(self, options):
+    super().__init__()
+    prism_options = options.view_as(pipeline_options.PrismRunnerOptions)
+    # Options flow:
+    # If the path is set, always download and unzip the provided path,
+    # even if a binary is cached.
+    self._path = prism_options.prism_location
+    # Which version to use when constructing the prism download url.
+    if prism_options.prism_beam_version_override:
+      self._version = prism_options.prism_beam_version_override
+    else:
+      self._version = 'v' + beam_version
+
+    job_options = options.view_as(pipeline_options.JobServerOptions)
+    self._job_port = job_options.job_port
+
+  @classmethod
+  def maybe_unzip_and_make_executable(cls, url: str, bin_cache: str) -> str:
+    if zipfile.is_zipfile(url):
+      z = zipfile.ZipFile(url)
+      url = z.extract(
+          os.path.splitext(os.path.basename(url))[0], path=bin_cache)
+
+    # Make sure the binary is executable.
+    st = os.stat(url)
+    os.chmod(url, st.st_mode | stat.S_IEXEC)
+    return url
+
+  # Finds the bin or zip in the local cache, and if not, fetches it.
+  @classmethod
+  def local_bin(
+      cls, url: str, bin_cache: str = '', ignore_cache: bool = False) -> str:
+    # ignore_cache sets whether we should always be downloading and unzipping
+    # the file or not, to avoid staleness issues.
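+    # Resolution order, implemented by the branches below: an existing local
+    # path is used (and unzipped) directly; otherwise a previously downloaded
+    # copy in bin_cache is reused, unless ignore_cache is set; otherwise the
+    # binary is fetched from the URL into the cache first.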
+    if bin_cache == '':
+      bin_cache = cls.BIN_CACHE
+    if os.path.exists(url):
+      _LOGGER.info('Using local prism binary from %s' % url)
+      return cls.maybe_unzip_and_make_executable(url, bin_cache=bin_cache)
+    else:
+      cached_bin = os.path.join(bin_cache, os.path.basename(url))
+      if os.path.exists(cached_bin) and not ignore_cache:
+        _LOGGER.info('Using cached prism binary from %s' % cached_bin)
+      else:
+        _LOGGER.info('Downloading prism binary from %s' % url)
+        if not os.path.exists(bin_cache):
+          os.makedirs(bin_cache)
+        try:
+          try:
+            url_read = FileSystems.open(url)
+          except ValueError:
+            url_read = urlopen(url)
+          with open(cached_bin + '.tmp', 'wb') as zip_write:
+            shutil.copyfileobj(url_read, zip_write, length=1 << 20)
+          os.rename(cached_bin + '.tmp', cached_bin)
+        except URLError as e:
+          raise RuntimeError(
+              'Unable to fetch remote prism binary at %s: %s' % (url, e))
+      return cls.maybe_unzip_and_make_executable(
+          cached_bin, bin_cache=bin_cache)
+
+  def construct_download_url(self, root_tag: str, sys: str, mach: str) -> str:
+    """Construct the prism download URL with the appropriate release tag.
+    This maps operating systems and machine architectures to the compatible
+    and canonical names used by the Go build targets.
+
+    platform.system() already returns compatible names, but we still need to
+    filter out the unsupported operating systems."""
+    opsys = sys.lower()
+    if opsys not in ['linux', 'windows', 'darwin']:
+      raise ValueError(
+          'Operating System "%s" unsupported for constructing a Prism release '
+          'binary URL.' % (opsys))
+
+    # platform.machine() will vary by system, but many names are compatible.
+    arch = mach.lower()
+    if arch in ['amd64', 'x86_64', 'x86-64', 'x64']:
+      arch = 'amd64'
+    if arch in ['arm64', 'aarch64_be', 'aarch64', 'armv8b', 'armv8l']:
+      arch = 'arm64'
+
+    if arch not in ['amd64', 'arm64']:
+      raise ValueError(
+          'Machine architecture "%s" unsupported for constructing a Prism '
+          'release binary URL.' % (arch))
+    return (
+        GITHUB_DOWNLOAD_PREFIX +
+        f"{root_tag}/apache_beam-{self._version}-prism-{opsys}-{arch}.zip")
+
+  def path_to_binary(self) -> str:
+    if self._path is not None:
+      if not os.path.exists(self._path):
+        url = urllib.parse.urlparse(self._path)
+        if not url.scheme:
+          raise ValueError(
+              'Unable to parse binary URL "%s". If using a full URL, make '
+              'sure the scheme is specified. If using a local file path, '
+              'make sure the file exists; you may have to first build prism '
+              'using `go build`.' % (self._path))
+
+        # We have a URL, see if we need to construct a valid file name.
+        if self._path.startswith(GITHUB_DOWNLOAD_PREFIX):
+          # If this URL starts with the download prefix, let it through.
+          return self._path
+        # The only other valid option is a github release page.
+        if not self._path.startswith(GITHUB_TAG_PREFIX):
+          raise ValueError(
+              'Provided --prism_location URL is not an Apache Beam Github '
+              'Release page URL or download URL: %s' % (self._path))
+        # Get the root tag for this URL.
+        root_tag = os.path.basename(os.path.normpath(self._path))
+        return self.construct_download_url(
+            root_tag, platform.system(), platform.machine())
+      return self._path
+    else:
+      if '.dev' in self._version:
+        raise ValueError(
+            'Unable to derive URL for dev versions "%s". Please provide an '
+            'alternate version to derive the release URL with the '
+            '--prism_beam_version_override flag.'
+            % (self._version))
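+      # For example, an SDK at version v2.53.0 on linux/amd64 would derive:
+      # https://github.com/apache/beam/releases/download/v2.53.0/apache_beam-v2.53.0-prism-linux-amd64.zip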
+      return self.construct_download_url(
+          self._version, platform.system(), platform.machine())
+
+  def subprocess_cmd_and_endpoint(
+      self) -> typing.Tuple[typing.List[typing.Any], str]:
+    bin_path = self.local_bin(
+        self.path_to_binary(), ignore_cache=(self._path is not None))
+    job_port, = subprocess_server.pick_port(self._job_port)
+    subprocess_cmd = [bin_path] + self.prism_arguments(job_port)
+    return (subprocess_cmd, f"localhost:{job_port}")
+
+  def prism_arguments(self, job_port) -> typing.List[typing.Any]:
+    return [
+        '--job_port',
+        job_port,
+        '--serve_http',
+        False,
+    ]
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
new file mode 100644
index 000000000000..f1ccf66a2289
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
@@ -0,0 +1,272 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# pytype: skip-file
+
+import argparse
+import logging
+import shlex
+import typing
+import unittest
+from os import linesep
+from os import path
+from os.path import exists
+from shutil import rmtree
+from tempfile import mkdtemp
+
+import pytest
+
+import apache_beam as beam
+from apache_beam import Impulse
+from apache_beam import Map
+from apache_beam.io.external.generate_sequence import GenerateSequence
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import PortableOptions
+from apache_beam.runners.portability import portable_runner_test
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.sql import SqlTransform
+
+# Run as
+#
+# pytest prism_runner_test.py[::TestClass::test_case] \
+#     --test-pipeline-options="--environment_type=LOOPBACK"
+
+_LOGGER = logging.getLogger(__name__)
+
+Row = typing.NamedTuple("Row", [("col1", int), ("col2", str)])
+beam.coders.registry.register_coder(Row, beam.coders.RowCoder)
+
+
+class PrismRunnerTest(portable_runner_test.PortableRunnerTest):
+  _use_grpc = True
+  _use_subprocesses = True
+
+  conf_dir = None
+  expansion_port = None
+
+  def __init__(self, *args, **kwargs):
+    super().__init__(*args, **kwargs)
+    self.environment_type = None
+    self.environment_config = None
+    self.enable_commit = False
+
+  def setUp(self):
+    self.enable_commit = False
+
+  @pytest.fixture(autouse=True)
+  def parse_options(self, request):
+    if not request.config.option.test_pipeline_options:
+      raise unittest.SkipTest(
+          'Skipping because --test-pipeline-options is not specified.')
+    test_pipeline_options = request.config.option.test_pipeline_options
+    parser = argparse.ArgumentParser(add_help=True)
+    parser.add_argument(
+        '--prism_bin', help='Prism binary to submit jobs.', action='store')
+    parser.add_argument(
+        '--environment_type',
+        default='LOOPBACK',
+        choices=['DOCKER', 'PROCESS', 'LOOPBACK'],
+        help='Set the environment type for running user code. DOCKER runs '
+        'user code in a container. PROCESS runs user code in '
+        'automatically started processes. LOOPBACK runs user code on '
+        'the same process that originally submitted the job.')
+    parser.add_argument(
+        '--environment_option',
+        '--environment_options',
+        dest='environment_options',
+        action='append',
+        default=None,
+        help=(
+            'Environment configuration for running the user code. '
+            'Recognized options depend on --environment_type.\n '
+            'For DOCKER: docker_container_image (optional)\n '
+            'For PROCESS: process_command (required), process_variables '
+            '(optional, comma-separated)\n '
+            'For EXTERNAL: external_service_address (required)'))
+    known_args, unknown_args = parser.parse_known_args(
+        shlex.split(test_pipeline_options))
+    if unknown_args:
+      _LOGGER.warning('Discarding unrecognized arguments %s' % unknown_args)
+    self.set_prism_bin(known_args.prism_bin)
+    self.environment_type = known_args.environment_type
+    self.environment_options = known_args.environment_options
+
+  @classmethod
+  def tearDownClass(cls):
+    if cls.conf_dir and exists(cls.conf_dir):
+      _LOGGER.info("removing conf dir: %s" % cls.conf_dir)
+      rmtree(cls.conf_dir)
+    super().tearDownClass()
+
+  @classmethod
+  def _create_conf_dir(cls):
+    """Create (and save a static reference to) a "conf dir", used to provide
+    metrics configs and verify metrics output.
+
+    It gets cleaned up when the suite is done executing."""
+
+    if not cls.conf_dir:
+      cls.conf_dir = mkdtemp(prefix='prismtest-conf')
+
+      # path for a FileReporter to write metrics to
+      cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
+
+      # path to write Prism configuration to
+      conf_path = path.join(cls.conf_dir, 'prism-conf.yaml')
+      file_reporter = 'org.apache.beam.runners.prism.metrics.FileReporter'
+      with open(conf_path, 'w') as f:
+        f.write(
+            linesep.join([
+                'metrics.reporters: file',
+                'metrics.reporter.file.class: %s' % file_reporter,
+                'metrics.reporter.file.path: %s' % cls.test_metrics_path,
+                'metrics.scope.operator: ',
+            ]))
+
+  @classmethod
+  def _subprocess_command(cls, job_port, expansion_port):
+    # will be cleaned up at the end of this method, and recreated and used by
+    # the job server
+    tmp_dir = mkdtemp(prefix='prismtest')
+
+    cls._create_conf_dir()
+    cls.expansion_port = expansion_port
+
+    try:
+      return [
+          cls.prism_bin,
+          '--job_port',
+          str(job_port),
+      ]
+    finally:
+      rmtree(tmp_dir)
+
+  @classmethod
+  def get_expansion_service(cls):
+    # TODO: Move the expansion address into PipelineOptions.
+    return 'localhost:%s' % cls.expansion_port
+
+  @classmethod
+  def set_prism_bin(cls, prism_bin):
+    cls.prism_bin = prism_bin
+
+  def create_options(self):
+    options = super().create_options()
+    options.view_as(DebugOptions).experiments = ['beam_fn_api']
+    options.view_as(DebugOptions).experiments = [
+        'pre_optimize=default'
+    ] + options.view_as(DebugOptions).experiments
+    options.view_as(PortableOptions).environment_type = self.environment_type
+    options.view_as(
+        PortableOptions).environment_options = self.environment_options
+
+    return options
+
+  # Can't read host files from within docker, read a "local" file there.
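+  # /etc/profile is expected to exist both on typical hosts and inside the
+  # SDK container images, so the same path works for the DOCKER and LOOPBACK
+  # environment types.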
+  def test_read(self):
+    print('name:', __name__)
+    with self.create_pipeline() as p:
+      lines = p | beam.io.ReadFromText('/etc/profile')
+      assert_that(lines, lambda lines: len(lines) > 0)
+
+  def test_external_transform(self):
+    with self.create_pipeline() as p:
+      res = (
+          p
+          | GenerateSequence(
+              start=1, stop=10,
+              expansion_service=self.get_expansion_service()))
+
+      assert_that(res, equal_to([i for i in range(1, 10)]))
+
+  def test_expand_kafka_read(self):
+    # We expect to fail here because we do not have a Kafka cluster handy.
+    # Nevertheless, we check that the transform is expanded by the
+    # ExpansionService and that the pipeline fails during execution.
+    with self.assertRaises(Exception) as ctx:
+      self.enable_commit = True
+      with self.create_pipeline() as p:
+        # pylint: disable=expression-not-assigned
+        (
+            p
+            | ReadFromKafka(
+                consumer_config={
+                    'bootstrap.servers': 'notvalid1:7777, notvalid2:3531',
+                    'group.id': 'any_group'
+                },
+                topics=['topic1', 'topic2'],
+                key_deserializer='org.apache.kafka.'
+                'common.serialization.'
+                'ByteArrayDeserializer',
+                value_deserializer='org.apache.kafka.'
+                'common.serialization.'
+                'LongDeserializer',
+                commit_offset_in_finalize=True,
+                timestamp_policy=ReadFromKafka.create_time_policy,
+                expansion_service=self.get_expansion_service()))
+    self.assertTrue(
+        'No resolvable bootstrap urls given in bootstrap.servers' in str(
+            ctx.exception),
+        'Expected to fail due to invalid bootstrap.servers, but '
+        'failed due to:\n%s' % str(ctx.exception))
+
+  def test_expand_kafka_write(self):
+    # We just test the expansion but do not execute.
+    # pylint: disable=expression-not-assigned
+    (
+        self.create_pipeline()
+        | Impulse()
+        | Map(lambda input: (1, input))
+        | WriteToKafka(
+            producer_config={
+                'bootstrap.servers': 'localhost:9092, notvalid2:3531'
+            },
+            topic='topic1',
+            key_serializer='org.apache.kafka.'
+            'common.serialization.'
+            'LongSerializer',
+            value_serializer='org.apache.kafka.'
+            'common.serialization.'
+            'ByteArraySerializer',
+            expansion_service=self.get_expansion_service()))
+
+  def test_sql(self):
+    with self.create_pipeline() as p:
+      output = (
+          p
+          | 'Create' >> beam.Create([Row(x, str(x)) for x in range(5)])
+          | 'Sql' >> SqlTransform(
+              """SELECT col1, col2 || '*' || col2 as col2,
+                     power(col1, 2) as col3
+                 FROM PCOLLECTION
+              """,
+              expansion_service=self.get_expansion_service()))
+      assert_that(
+          output,
+          equal_to([(x, '{x}*{x}'.format(x=x), x * x) for x in range(5)]))
+
+
+# Inherits all other tests.
+
+if __name__ == '__main__':
+  # Run the tests.
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
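A minimal usage sketch, assuming either an SDK version with a published Prism release binary or a locally built binary; `/path/to/prism` is a placeholder:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# 'PrismRunner' resolves through the StandardOptions runner list updated in
# this change. '--prism_location' is optional; '/path/to/prism' is a
# placeholder for a locally built binary (e.g. via `go build`). Omitting it
# downloads the release binary matching the SDK version, per path_to_binary.
options = PipelineOptions([
    '--runner=PrismRunner',
    '--prism_location=/path/to/prism',
])

# With no environment_type set, PrismRunner defaults to LOOPBACK, running
# user code in the submitting process.
with beam.Pipeline(options=options) as p:
  (p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x) | beam.Map(print))
```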