diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 455d12b4d3c1..af0c5e3de66f 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1674,6 +1674,10 @@ def _add_argparse_args(cls, parser): action='append', default=[], help='JVM properties to pass to a Java job server.') + parser.add_argument( + '--jar_cache_dir', + default=None, + help='The location to store jar cache for job server.') class FlinkRunnerOptions(PipelineOptions): diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index c0616bc6451c..66acfe654791 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -31,6 +31,7 @@ from apache_beam.options.pipeline_options import CrossLanguageOptions from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import GoogleCloudOptions +from apache_beam.options.pipeline_options import JobServerOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import ProfilingOptions from apache_beam.options.pipeline_options import TypeOptions @@ -645,6 +646,11 @@ def test_transform_name_mapping(self): mapping = options.view_as(GoogleCloudOptions).transform_name_mapping self.assertEqual(mapping['from'], 'to') + def test_jar_cache_dir(self): + options = PipelineOptions(['--jar_cache_dir=/path/to/jar_cache_dir']) + jar_cache_dir = options.view_as(JobServerOptions).jar_cache_dir + self.assertEqual(jar_cache_dir, '/path/to/jar_cache_dir') + def test_dataflow_service_options(self): options = PipelineOptions([ '--dataflow_service_option', diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py index e44d8ab0ae93..eee75f66a277 100644 --- a/sdks/python/apache_beam/runners/portability/job_server.py +++ b/sdks/python/apache_beam/runners/portability/job_server.py @@ -127,6 +127,7 @@ def __init__(self, options): self._artifacts_dir = options.artifacts_dir self._java_launcher = options.job_server_java_launcher self._jvm_properties = options.job_server_jvm_properties + self._jar_cache_dir = options.jar_cache_dir def java_arguments( self, job_port, artifact_port, expansion_port, artifacts_dir): @@ -141,11 +142,11 @@ def path_to_beam_jar(gradle_target, artifact_id=None): gradle_target, artifact_id=artifact_id) @staticmethod - def local_jar(url): - return subprocess_server.JavaJarServer.local_jar(url) + def local_jar(url, jar_cache_dir=None): + return subprocess_server.JavaJarServer.local_jar(url, jar_cache_dir) def subprocess_cmd_and_endpoint(self): - jar_path = self.local_jar(self.path_to_jar()) + jar_path = self.local_jar(self.path_to_jar(), self._jar_cache_dir) artifacts_dir = ( self._artifacts_dir if self._artifacts_dir else self.local_temp_dir( prefix='artifacts')) diff --git a/sdks/python/apache_beam/runners/portability/job_server_test.py b/sdks/python/apache_beam/runners/portability/job_server_test.py index 1e2ede281c9d..13b3629b24bf 100644 --- a/sdks/python/apache_beam/runners/portability/job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/job_server_test.py @@ -41,7 +41,8 @@ def path_to_jar(self): return '/path/to/jar' @staticmethod - def local_jar(url): + def local_jar(url, jar_cache_dir=None): + logging.debug("url({%s}), jar_cache_dir({%s})", url, jar_cache_dir) return url diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index 944c12625d7c..b1080cb643af 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -266,11 +266,17 @@ class JavaJarServer(SubprocessServer): 'local', (threading.local, ), dict(__init__=lambda self: setattr(self, 'replacements', {})))() - def __init__(self, stub_class, path_to_jar, java_arguments, classpath=None): + def __init__( + self, + stub_class, + path_to_jar, + java_arguments, + classpath=None, + cache_dir=None): if classpath: # java -jar ignores the classpath, so we make a new jar that embeds # the requested classpath. - path_to_jar = self.make_classpath_jar(path_to_jar, classpath) + path_to_jar = self.make_classpath_jar(path_to_jar, classpath, cache_dir) super().__init__( stub_class, ['java', '-jar', path_to_jar] + list(java_arguments)) self._existing_service = path_to_jar if is_service_endpoint(