From 1c4564e3d0244f024b559ef1a1c0ea1db752bc50 Mon Sep 17 00:00:00 2001 From: "s21.lee" Date: Wed, 31 Jul 2024 10:20:38 +0900 Subject: [PATCH 01/23] Add JobServerOption for jar_cache_dir Signed-off-by: s21.lee --- sdks/python/apache_beam/options/pipeline_options.py | 5 ++++- sdks/python/apache_beam/options/pipeline_options_test.py | 6 ++++++ .../python/apache_beam/runners/portability/job_server.py | 7 ++++--- .../apache_beam/runners/portability/job_server_test.py | 9 +++++++-- 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 6b1dd8bb48c0..e206dfd34f2a 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1627,7 +1627,10 @@ def _add_argparse_args(cls, parser): action='append', default=[], help='JVM properties to pass to a Java job server.') - + parser.add_argument( + '--jar_cache_dir', + default=None, + help='The location to store jar cache for job server.') class FlinkRunnerOptions(PipelineOptions): diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index 61b227d9a246..28167767ff75 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -32,6 +32,7 @@ from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import JobServerOptions from apache_beam.options.pipeline_options import ProfilingOptions from apache_beam.options.pipeline_options import TypeOptions from apache_beam.options.pipeline_options import WorkerOptions @@ -639,6 +640,11 @@ def test_transform_name_mapping(self): mapping = options.view_as(GoogleCloudOptions).transform_name_mapping self.assertEqual(mapping['from'], 'to') + def test_jar_cache_dir(self): + options = PipelineOptions(['--jar_cache_dir=/path/to/jar_cache_dir']) + jar_cache_dir = options.view_as(JobServerOptions).jar_cache_dir + self.assertEqual(jar_cache_dir, '/path/to/jar_cache_dir') + def test_dataflow_service_options(self): options = PipelineOptions([ '--dataflow_service_option', diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py index e44d8ab0ae93..eee75f66a277 100644 --- a/sdks/python/apache_beam/runners/portability/job_server.py +++ b/sdks/python/apache_beam/runners/portability/job_server.py @@ -127,6 +127,7 @@ def __init__(self, options): self._artifacts_dir = options.artifacts_dir self._java_launcher = options.job_server_java_launcher self._jvm_properties = options.job_server_jvm_properties + self._jar_cache_dir = options.jar_cache_dir def java_arguments( self, job_port, artifact_port, expansion_port, artifacts_dir): @@ -141,11 +142,11 @@ def path_to_beam_jar(gradle_target, artifact_id=None): gradle_target, artifact_id=artifact_id) @staticmethod - def local_jar(url): - return subprocess_server.JavaJarServer.local_jar(url) + def local_jar(url, jar_cache_dir=None): + return subprocess_server.JavaJarServer.local_jar(url, jar_cache_dir) def subprocess_cmd_and_endpoint(self): - jar_path = self.local_jar(self.path_to_jar()) + jar_path = self.local_jar(self.path_to_jar(), self._jar_cache_dir) artifacts_dir = ( self._artifacts_dir if self._artifacts_dir else self.local_temp_dir( prefix='artifacts')) diff --git a/sdks/python/apache_beam/runners/portability/job_server_test.py b/sdks/python/apache_beam/runners/portability/job_server_test.py index 1e2ede281c9d..077074f0e4e7 100644 --- a/sdks/python/apache_beam/runners/portability/job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/job_server_test.py @@ -25,7 +25,7 @@ class JavaJarJobServerStub(JavaJarJobServer): def java_arguments( - self, job_port, artifact_port, expansion_port, artifacts_dir): + self, job_port, artifact_port, expansion_port, artifacts_dir, jar_cache_dir): return [ '--artifacts-dir', artifacts_dir, @@ -35,13 +35,15 @@ def java_arguments( artifact_port, '--expansion-port', expansion_port + '--jar_cache_dir', + jar_cache_dir ] def path_to_jar(self): return '/path/to/jar' @staticmethod - def local_jar(url): + def local_jar(url, jar_cache_dir): return url @@ -54,6 +56,7 @@ def test_subprocess_cmd_and_endpoint(self): '--artifacts_dir=/path/to/artifacts/', '--job_server_java_launcher=/path/to/java', '--job_server_jvm_properties=-Dsome.property=value' + '--jar_cache_dir=/path/to/cache_dir' ]) job_server = JavaJarJobServerStub(pipeline_options) subprocess_cmd, endpoint = job_server.subprocess_cmd_and_endpoint() @@ -72,6 +75,8 @@ def test_subprocess_cmd_and_endpoint(self): 8098, '--expansion-port', 8097 + '--jar-cache-dir', + '/path/to/cache_dir' ]) self.assertEqual(endpoint, 'localhost:8099') From ce443c2d43b8907ccbf67b46a269d5cb201d696f Mon Sep 17 00:00:00 2001 From: "s21.lee" Date: Thu, 22 Aug 2024 14:53:05 +0900 Subject: [PATCH 02/23] fixed for job_server_test error - add missing comma Signed-off-by: s21.lee --- sdks/python/apache_beam/runners/portability/job_server_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/job_server_test.py b/sdks/python/apache_beam/runners/portability/job_server_test.py index 077074f0e4e7..cc0b7a3fb4c7 100644 --- a/sdks/python/apache_beam/runners/portability/job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/job_server_test.py @@ -34,7 +34,7 @@ def java_arguments( '--artifact-port', artifact_port, '--expansion-port', - expansion_port + expansion_port, '--jar_cache_dir', jar_cache_dir ] From 9817eadd755fc0e34e1835c88971da11e5ec2ab8 Mon Sep 17 00:00:00 2001 From: s21lee Date: Thu, 22 Aug 2024 15:20:22 +0900 Subject: [PATCH 03/23] fix error for missing comma Signed-off-by: s21lee --- sdks/python/apache_beam/runners/portability/job_server_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/job_server_test.py b/sdks/python/apache_beam/runners/portability/job_server_test.py index cc0b7a3fb4c7..c87cac4d617c 100644 --- a/sdks/python/apache_beam/runners/portability/job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/job_server_test.py @@ -74,7 +74,7 @@ def test_subprocess_cmd_and_endpoint(self): '--artifact-port', 8098, '--expansion-port', - 8097 + 8097, '--jar-cache-dir', '/path/to/cache_dir' ]) From 8756dd8b81d438159ebe63a9a94e4c63a25a87ec Mon Sep 17 00:00:00 2001 From: s21lee Date: Thu, 22 Aug 2024 15:33:33 +0900 Subject: [PATCH 04/23] fix lint Signed-off-by: s21lee --- .../apache_beam/options/pipeline_options.py | 1 + .../runners/portability/job_server_test.py | 107 +++++++++--------- 2 files changed, 57 insertions(+), 51 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index e206dfd34f2a..06c7d50d97bd 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1632,6 +1632,7 @@ def _add_argparse_args(cls, parser): default=None, help='The location to store jar cache for job server.') + class FlinkRunnerOptions(PipelineOptions): # These should stay in sync with gradle.properties. diff --git a/sdks/python/apache_beam/runners/portability/job_server_test.py b/sdks/python/apache_beam/runners/portability/job_server_test.py index c87cac4d617c..24cafd973404 100644 --- a/sdks/python/apache_beam/runners/portability/job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/job_server_test.py @@ -24,63 +24,68 @@ class JavaJarJobServerStub(JavaJarJobServer): - def java_arguments( - self, job_port, artifact_port, expansion_port, artifacts_dir, jar_cache_dir): - return [ - '--artifacts-dir', - artifacts_dir, - '--job-port', - job_port, - '--artifact-port', - artifact_port, - '--expansion-port', - expansion_port, - '--jar_cache_dir', - jar_cache_dir - ] + def java_arguments( + self, + job_port, + artifact_port, + expansion_port, + artifacts_dir, + jar_cache_dir): + return [ + '--artifacts-dir', + artifacts_dir, + '--job-port', + job_port, + '--artifact-port', + artifact_port, + '--expansion-port', + expansion_port, + '--jar_cache_dir', + jar_cache_dir + ] - def path_to_jar(self): - return '/path/to/jar' + def path_to_jar(self): + return '/path/to/jar' - @staticmethod - def local_jar(url, jar_cache_dir): - return url + @staticmethod + def local_jar(url, jar_cache_dir): + return url class JavaJarJobServerTest(unittest.TestCase): - def test_subprocess_cmd_and_endpoint(self): - pipeline_options = PipelineOptions([ - '--job_port=8099', - '--artifact_port=8098', - '--expansion_port=8097', - '--artifacts_dir=/path/to/artifacts/', - '--job_server_java_launcher=/path/to/java', - '--job_server_jvm_properties=-Dsome.property=value' - '--jar_cache_dir=/path/to/cache_dir' - ]) - job_server = JavaJarJobServerStub(pipeline_options) - subprocess_cmd, endpoint = job_server.subprocess_cmd_and_endpoint() - self.assertEqual( - subprocess_cmd, - [ - '/path/to/java', - '-jar', - '-Dsome.property=value', - '/path/to/jar', - '--artifacts-dir', - '/path/to/artifacts/', - '--job-port', - 8099, - '--artifact-port', - 8098, - '--expansion-port', - 8097, - '--jar-cache-dir', - '/path/to/cache_dir' + def test_subprocess_cmd_and_endpoint(self): + pipeline_options = PipelineOptions([ + '--job_port=8099', + '--artifact_port=8098', + '--expansion_port=8097', + '--artifacts_dir=/path/to/artifacts/', + '--job_server_java_launcher=/path/to/java', + '--job_server_jvm_properties=-Dsome.property=value' + '--jar_cache_dir=/path/to/cache_dir' ]) - self.assertEqual(endpoint, 'localhost:8099') + job_server = JavaJarJobServerStub(pipeline_options) + subprocess_cmd, endpoint = job_server.subprocess_cmd_and_endpoint() + self.assertEqual( + subprocess_cmd, + [ + '/path/to/java', + '-jar', + '-Dsome.property=value', + '/path/to/jar', + '--artifacts-dir', + '/path/to/artifacts/', + '--job-port', + 8099, + '--artifact-port', + 8098, + '--expansion-port', + 8097, + '--jar-cache-dir', + '/path/to/cache_dir' + ]) + self.assertEqual(endpoint, 'localhost:8099') if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() + logging.getLogger().setLevel(logging.INFO) + unittest.main() From e8322a7968845ba6c08c2b5fd91b676c99309dfa Mon Sep 17 00:00:00 2001 From: s21lee Date: Thu, 22 Aug 2024 17:50:48 +0900 Subject: [PATCH 05/23] fix lint Signed-off-by: s21lee --- .../runners/portability/job_server_test.py | 109 +++++++++--------- 1 file changed, 53 insertions(+), 56 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/job_server_test.py b/sdks/python/apache_beam/runners/portability/job_server_test.py index 24cafd973404..4ec40a1af8e1 100644 --- a/sdks/python/apache_beam/runners/portability/job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/job_server_test.py @@ -24,68 +24,65 @@ class JavaJarJobServerStub(JavaJarJobServer): - def java_arguments( - self, - job_port, - artifact_port, - expansion_port, - artifacts_dir, - jar_cache_dir): - return [ - '--artifacts-dir', - artifacts_dir, - '--job-port', - job_port, - '--artifact-port', - artifact_port, - '--expansion-port', - expansion_port, - '--jar_cache_dir', - jar_cache_dir - ] + def java_arguments( + self, + job_port, artifact_port, expansion_port, artifacts_dir, jar_cache_dir): + return [ + '--artifacts-dir', + artifacts_dir, + '--job-port', + job_port, + '--artifact-port', + artifact_port, + '--expansion-port', + expansion_port, + '--jar_cache_dir', + jar_cache_dir + ] - def path_to_jar(self): - return '/path/to/jar' + def path_to_jar(self): + return '/path/to/jar' - @staticmethod - def local_jar(url, jar_cache_dir): - return url + @staticmethod + def local_jar(url, jar_cache_dir): + print(f'using {jar_cache_dir}') + return url class JavaJarJobServerTest(unittest.TestCase): - def test_subprocess_cmd_and_endpoint(self): - pipeline_options = PipelineOptions([ - '--job_port=8099', - '--artifact_port=8098', - '--expansion_port=8097', - '--artifacts_dir=/path/to/artifacts/', - '--job_server_java_launcher=/path/to/java', - '--job_server_jvm_properties=-Dsome.property=value' - '--jar_cache_dir=/path/to/cache_dir' + def test_subprocess_cmd_and_endpoint(self): + pipeline_options = PipelineOptions([ + '--job_port=8099', + '--artifact_port=8098', + '--expansion_port=8097', + '--artifacts_dir=/path/to/artifacts/', + '--job_server_java_launcher=/path/to/java', + '--job_server_jvm_properties=-Dsome.property=value' + '--jar_cache_dir=/path/to/cache_dir' + ]) + job_server = JavaJarJobServerStub(pipeline_options) + subprocess_cmd, endpoint = job_server.subprocess_cmd_and_endpoint() + self.assertEqual( + subprocess_cmd, + [ + '/path/to/java', + '-jar', + '-Dsome.property=value', + '/path/to/jar', + '--artifacts-dir', + '/path/to/artifacts/', + '--job-port', + 8099, + '--artifact-port', + 8098, + '--expansion-port', + 8097, + '--jar-cache-dir', + '/path/to/cache_dir' ]) - job_server = JavaJarJobServerStub(pipeline_options) - subprocess_cmd, endpoint = job_server.subprocess_cmd_and_endpoint() - self.assertEqual( - subprocess_cmd, - [ - '/path/to/java', - '-jar', - '-Dsome.property=value', - '/path/to/jar', - '--artifacts-dir', - '/path/to/artifacts/', - '--job-port', - 8099, - '--artifact-port', - 8098, - '--expansion-port', - 8097, - '--jar-cache-dir', - '/path/to/cache_dir' - ]) - self.assertEqual(endpoint, 'localhost:8099') + self.assertEqual(endpoint, 'localhost:8099') if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() + logging.getLogger().setLevel(logging.INFO) + unittest.main() From a654566eb26d4d31c384c2ca2badd606b32c1c23 Mon Sep 17 00:00:00 2001 From: s21lee Date: Tue, 27 Aug 2024 13:29:38 +0900 Subject: [PATCH 06/23] fix for unit test error Signed-off-by: s21lee --- sdks/python/apache_beam/runners/portability/flink_runner.py | 6 ++++-- sdks/python/apache_beam/runners/portability/job_server.py | 2 +- .../apache_beam/runners/portability/job_server_test.py | 2 +- sdks/python/apache_beam/runners/portability/spark_runner.py | 6 ++++-- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner.py b/sdks/python/apache_beam/runners/portability/flink_runner.py index c9bf15b46e22..12d70e06722b 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner.py @@ -111,7 +111,7 @@ def path_to_jar(self): ':runners:flink:%s:job-server:shadowJar' % self._flink_version) def java_arguments( - self, job_port, artifact_port, expansion_port, artifacts_dir): + self, job_port, artifact_port, expansion_port, artifacts_dir, jar_cache_dir): return [ '--flink-master', self._master_url, @@ -122,5 +122,7 @@ def java_arguments( '--artifact-port', artifact_port, '--expansion-port', - expansion_port + expansion_port, + '--jar_cache_dir', + jar_cache_dir ] diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py index eee75f66a277..b35aa6373efd 100644 --- a/sdks/python/apache_beam/runners/portability/job_server.py +++ b/sdks/python/apache_beam/runners/portability/job_server.py @@ -130,7 +130,7 @@ def __init__(self, options): self._jar_cache_dir = options.jar_cache_dir def java_arguments( - self, job_port, artifact_port, expansion_port, artifacts_dir): + self, job_port, artifact_port, expansion_port, artifacts_dir, jar_cache_dir): raise NotImplementedError(type(self)) def path_to_jar(self): diff --git a/sdks/python/apache_beam/runners/portability/job_server_test.py b/sdks/python/apache_beam/runners/portability/job_server_test.py index 4ec40a1af8e1..5c4c177c4a79 100644 --- a/sdks/python/apache_beam/runners/portability/job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/job_server_test.py @@ -77,7 +77,7 @@ def test_subprocess_cmd_and_endpoint(self): 8098, '--expansion-port', 8097, - '--jar-cache-dir', + '--jar_cache_dir', '/path/to/cache_dir' ]) self.assertEqual(endpoint, 'localhost:8099') diff --git a/sdks/python/apache_beam/runners/portability/spark_runner.py b/sdks/python/apache_beam/runners/portability/spark_runner.py index 480fbdecdce3..8ce4243c5b1d 100644 --- a/sdks/python/apache_beam/runners/portability/spark_runner.py +++ b/sdks/python/apache_beam/runners/portability/spark_runner.py @@ -103,7 +103,7 @@ def path_to_jar(self): return self.path_to_beam_jar(':runners:spark:3:job-server:shadowJar') def java_arguments( - self, job_port, artifact_port, expansion_port, artifacts_dir): + self, job_port, artifact_port, expansion_port, artifacts_dir, jar_cache_dir): return [ '--spark-master-url', self._master_url, @@ -114,5 +114,7 @@ def java_arguments( '--artifact-port', artifact_port, '--expansion-port', - expansion_port + expansion_port, + '--jar_cache_dir', + jar_cache_dir ] From cf6fea46ae39f83ff054d0ece651833771905c21 Mon Sep 17 00:00:00 2001 From: s21lee Date: Tue, 27 Aug 2024 17:12:33 +0900 Subject: [PATCH 07/23] fix lint Signed-off-by: s21lee --- .../python/apache_beam/runners/portability/flink_runner.py | 7 ++++++- sdks/python/apache_beam/runners/portability/job_server.py | 7 ++++++- .../apache_beam/runners/portability/job_server_test.py | 6 +++++- .../python/apache_beam/runners/portability/spark_runner.py | 7 ++++++- 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner.py b/sdks/python/apache_beam/runners/portability/flink_runner.py index 12d70e06722b..eb5ed4854b74 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner.py @@ -111,7 +111,12 @@ def path_to_jar(self): ':runners:flink:%s:job-server:shadowJar' % self._flink_version) def java_arguments( - self, job_port, artifact_port, expansion_port, artifacts_dir, jar_cache_dir): + self, + job_port, + artifact_port, + expansion_port, + artifacts_dir, + jar_cache_dir): return [ '--flink-master', self._master_url, diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py index b35aa6373efd..419e62212653 100644 --- a/sdks/python/apache_beam/runners/portability/job_server.py +++ b/sdks/python/apache_beam/runners/portability/job_server.py @@ -130,7 +130,12 @@ def __init__(self, options): self._jar_cache_dir = options.jar_cache_dir def java_arguments( - self, job_port, artifact_port, expansion_port, artifacts_dir, jar_cache_dir): + self, + job_port, + artifact_port, + expansion_port, + artifacts_dir, + jar_cache_dir): raise NotImplementedError(type(self)) def path_to_jar(self): diff --git a/sdks/python/apache_beam/runners/portability/job_server_test.py b/sdks/python/apache_beam/runners/portability/job_server_test.py index 5c4c177c4a79..0e9d03449221 100644 --- a/sdks/python/apache_beam/runners/portability/job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/job_server_test.py @@ -26,7 +26,11 @@ class JavaJarJobServerStub(JavaJarJobServer): def java_arguments( self, - job_port, artifact_port, expansion_port, artifacts_dir, jar_cache_dir): + job_port, + artifact_port, + expansion_port, + artifacts_dir, + jar_cache_dir): return [ '--artifacts-dir', artifacts_dir, diff --git a/sdks/python/apache_beam/runners/portability/spark_runner.py b/sdks/python/apache_beam/runners/portability/spark_runner.py index 8ce4243c5b1d..7eaee63cd240 100644 --- a/sdks/python/apache_beam/runners/portability/spark_runner.py +++ b/sdks/python/apache_beam/runners/portability/spark_runner.py @@ -103,7 +103,12 @@ def path_to_jar(self): return self.path_to_beam_jar(':runners:spark:3:job-server:shadowJar') def java_arguments( - self, job_port, artifact_port, expansion_port, artifacts_dir, jar_cache_dir): + self, + job_port, + artifact_port, + expansion_port, + artifacts_dir, + jar_cache_dir): return [ '--spark-master-url', self._master_url, From 2ebf174af22d37acf1b15db2c7399acf8b2290fc Mon Sep 17 00:00:00 2001 From: s21lee Date: Wed, 28 Aug 2024 09:56:19 +0900 Subject: [PATCH 08/23] fix test error Signed-off-by: s21lee --- sdks/python/apache_beam/runners/portability/job_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py index 419e62212653..8a51d8a9050a 100644 --- a/sdks/python/apache_beam/runners/portability/job_server.py +++ b/sdks/python/apache_beam/runners/portability/job_server.py @@ -160,5 +160,5 @@ def subprocess_cmd_and_endpoint(self): jar_path ] + list( self.java_arguments( - job_port, self._artifact_port, self._expansion_port, artifacts_dir)) + job_port, self._artifact_port, self._expansion_port, artifacts_dir, self.jar_cache_dir)) return (subprocess_cmd, 'localhost:%s' % job_port) From 468a5e1d1e300460552560817a5daef85f263a6a Mon Sep 17 00:00:00 2001 From: s21lee Date: Wed, 28 Aug 2024 10:22:00 +0900 Subject: [PATCH 09/23] fix lint Signed-off-by: s21lee --- .../apache_beam/runners/portability/job_server.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py index 8a51d8a9050a..dc4f0b0a2fea 100644 --- a/sdks/python/apache_beam/runners/portability/job_server.py +++ b/sdks/python/apache_beam/runners/portability/job_server.py @@ -156,9 +156,12 @@ def subprocess_cmd_and_endpoint(self): self._artifacts_dir if self._artifacts_dir else self.local_temp_dir( prefix='artifacts')) job_port, = subprocess_server.pick_port(self._job_port) - subprocess_cmd = [self._java_launcher, '-jar'] + self._jvm_properties + [ - jar_path - ] + list( - self.java_arguments( - job_port, self._artifact_port, self._expansion_port, artifacts_dir, self.jar_cache_dir)) + subprocess_cmd = [self._java_launcher, '-jar' + ] + self._jvm_properties + [jar_path] + list( + self.java_arguments( + job_port, + self._artifact_port, + self._expansion_port, + artifacts_dir, + self.jar_cache_dir)) return (subprocess_cmd, 'localhost:%s' % job_port) From bceffabe1034053c0f17ca044d1b79b1dbd877c3 Mon Sep 17 00:00:00 2001 From: s21lee Date: Thu, 29 Aug 2024 11:50:25 +0900 Subject: [PATCH 10/23] fix error for typo Signed-off-by: s21lee --- sdks/python/apache_beam/runners/portability/job_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py index dc4f0b0a2fea..c167bf32024c 100644 --- a/sdks/python/apache_beam/runners/portability/job_server.py +++ b/sdks/python/apache_beam/runners/portability/job_server.py @@ -163,5 +163,5 @@ def subprocess_cmd_and_endpoint(self): self._artifact_port, self._expansion_port, artifacts_dir, - self.jar_cache_dir)) + self._jar_cache_dir)) return (subprocess_cmd, 'localhost:%s' % job_port) From 29b3f6fdaf8f5790567386fe590dffa903c1afb8 Mon Sep 17 00:00:00 2001 From: s21lee Date: Sun, 1 Sep 2024 11:45:26 +0900 Subject: [PATCH 11/23] fix test error Signed-off-by: s21lee --- sdks/python/apache_beam/utils/subprocess_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index 944c12625d7c..b7a3b6efb6a3 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -266,11 +266,11 @@ class JavaJarServer(SubprocessServer): 'local', (threading.local, ), dict(__init__=lambda self: setattr(self, 'replacements', {})))() - def __init__(self, stub_class, path_to_jar, java_arguments, classpath=None): + def __init__(self, stub_class, path_to_jar, java_arguments, classpath=None, cache_dir=None): if classpath: # java -jar ignores the classpath, so we make a new jar that embeds # the requested classpath. - path_to_jar = self.make_classpath_jar(path_to_jar, classpath) + path_to_jar = self.make_classpath_jar(path_to_jar, classpath, cache_dir) super().__init__( stub_class, ['java', '-jar', path_to_jar] + list(java_arguments)) self._existing_service = path_to_jar if is_service_endpoint( From 3255f12770166ee507caa5266f25794990584262 Mon Sep 17 00:00:00 2001 From: s21lee Date: Wed, 23 Oct 2024 14:49:30 +0900 Subject: [PATCH 12/23] fix lint Signed-off-by: s21lee --- sdks/python/apache_beam/utils/subprocess_server.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index b7a3b6efb6a3..b1080cb643af 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -266,7 +266,13 @@ class JavaJarServer(SubprocessServer): 'local', (threading.local, ), dict(__init__=lambda self: setattr(self, 'replacements', {})))() - def __init__(self, stub_class, path_to_jar, java_arguments, classpath=None, cache_dir=None): + def __init__( + self, + stub_class, + path_to_jar, + java_arguments, + classpath=None, + cache_dir=None): if classpath: # java -jar ignores the classpath, so we make a new jar that embeds # the requested classpath. From 64aac4ab25f75dc3717669fd546301f975bc6a1e Mon Sep 17 00:00:00 2001 From: s21lee Date: Thu, 24 Oct 2024 18:58:54 +0900 Subject: [PATCH 13/23] fix error Signed-off-by: s21lee --- .../apache_beam/runners/portability/flink_runner.py | 4 +--- .../runners/portability/flink_uber_jar_job_server.py | 3 ++- .../apache_beam/runners/portability/job_server_test.py | 10 +++------- .../apache_beam/runners/portability/spark_runner.py | 4 +--- .../runners/portability/spark_uber_jar_job_server.py | 1 + 5 files changed, 8 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner.py b/sdks/python/apache_beam/runners/portability/flink_runner.py index eb5ed4854b74..89a803748c65 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner.py @@ -127,7 +127,5 @@ def java_arguments( '--artifact-port', artifact_port, '--expansion-port', - expansion_port, - '--jar_cache_dir', - jar_cache_dir + expansion_port ] diff --git a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py index 3b302e334a5f..9052754c264a 100644 --- a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py +++ b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py @@ -50,6 +50,7 @@ def __init__(self, master_url, options): pipeline_options.FlinkRunnerOptions).flink_job_server_jar) self._artifact_port = ( options.view_as(pipeline_options.JobServerOptions).artifact_port) + self._jar_cache_dir = (options.view_as(pipeline_options.JobServerOptions.jar_cache_dir)) self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-flink') def start(self): @@ -77,7 +78,7 @@ def executable_jar(self): else: url = job_server.JavaJarJobServer.path_to_beam_jar( ':runners:flink:%s:job-server:shadowJar' % self.flink_version()) - return job_server.JavaJarJobServer.local_jar(url) + return job_server.JavaJarJobServer.local_jar(url, self._jar_cache_dir) def flink_version(self): full_version = requests.get( diff --git a/sdks/python/apache_beam/runners/portability/job_server_test.py b/sdks/python/apache_beam/runners/portability/job_server_test.py index 0e9d03449221..77af9693d257 100644 --- a/sdks/python/apache_beam/runners/portability/job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/job_server_test.py @@ -39,9 +39,7 @@ def java_arguments( '--artifact-port', artifact_port, '--expansion-port', - expansion_port, - '--jar_cache_dir', - jar_cache_dir + expansion_port ] def path_to_jar(self): @@ -49,7 +47,7 @@ def path_to_jar(self): @staticmethod def local_jar(url, jar_cache_dir): - print(f'using {jar_cache_dir}') + print(f'local_jar url({url}) jar_cache_dir({jar_cache_dir})') return url @@ -80,9 +78,7 @@ def test_subprocess_cmd_and_endpoint(self): '--artifact-port', 8098, '--expansion-port', - 8097, - '--jar_cache_dir', - '/path/to/cache_dir' + 8097 ]) self.assertEqual(endpoint, 'localhost:8099') diff --git a/sdks/python/apache_beam/runners/portability/spark_runner.py b/sdks/python/apache_beam/runners/portability/spark_runner.py index 7eaee63cd240..a76518682b13 100644 --- a/sdks/python/apache_beam/runners/portability/spark_runner.py +++ b/sdks/python/apache_beam/runners/portability/spark_runner.py @@ -119,7 +119,5 @@ def java_arguments( '--artifact-port', artifact_port, '--expansion-port', - expansion_port, - '--jar_cache_dir', - jar_cache_dir + expansion_port ] diff --git a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py index f754b4c330ad..9ffc244dc48d 100644 --- a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py +++ b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py @@ -49,6 +49,7 @@ def __init__(self, rest_url, options): self._rest_url = rest_url self._artifact_port = ( options.view_as(pipeline_options.JobServerOptions).artifact_port) + self._jar_cache_dir = (options.view_as(pipeline_options.JobServerOptions.jar_cache_dir)) self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-spark') spark_options = options.view_as(pipeline_options.SparkRunnerOptions) self._executable_jar = spark_options.spark_job_server_jar From 4b3e841e54b2abe039d1a9e3835052419c7d4c84 Mon Sep 17 00:00:00 2001 From: s21lee Date: Fri, 25 Oct 2024 10:04:23 +0900 Subject: [PATCH 14/23] fix error Signed-off-by: s21lee --- .../runners/portability/flink_uber_jar_job_server.py | 2 +- .../runners/portability/spark_uber_jar_job_server.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py index 9052754c264a..9b087a665953 100644 --- a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py +++ b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py @@ -50,7 +50,7 @@ def __init__(self, master_url, options): pipeline_options.FlinkRunnerOptions).flink_job_server_jar) self._artifact_port = ( options.view_as(pipeline_options.JobServerOptions).artifact_port) - self._jar_cache_dir = (options.view_as(pipeline_options.JobServerOptions.jar_cache_dir)) + self._jar_cache_dir = (options.view_as(pipeline_options.JobServerOptions).jar_cache_dir) self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-flink') def start(self): diff --git a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py index 9ffc244dc48d..66b1770a6b05 100644 --- a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py +++ b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py @@ -49,7 +49,7 @@ def __init__(self, rest_url, options): self._rest_url = rest_url self._artifact_port = ( options.view_as(pipeline_options.JobServerOptions).artifact_port) - self._jar_cache_dir = (options.view_as(pipeline_options.JobServerOptions.jar_cache_dir)) + self._jar_cache_dir = (options.view_as(pipeline_options.JobServerOptions).jar_cache_dir) self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-spark') spark_options = options.view_as(pipeline_options.SparkRunnerOptions) self._executable_jar = spark_options.spark_job_server_jar From 0408016056e46a3a0445d5a92697cab10bf8e926 Mon Sep 17 00:00:00 2001 From: s21lee Date: Fri, 25 Oct 2024 10:26:25 +0900 Subject: [PATCH 15/23] fix lint Signed-off-by: s21lee --- .../runners/portability/flink_uber_jar_job_server.py | 3 ++- .../runners/portability/spark_uber_jar_job_server.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py index 9b087a665953..d2b839b15f09 100644 --- a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py +++ b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py @@ -50,7 +50,8 @@ def __init__(self, master_url, options): pipeline_options.FlinkRunnerOptions).flink_job_server_jar) self._artifact_port = ( options.view_as(pipeline_options.JobServerOptions).artifact_port) - self._jar_cache_dir = (options.view_as(pipeline_options.JobServerOptions).jar_cache_dir) + self._jar_cache_dir = ( + options.view_as(pipeline_options.JobServerOptions).jar_cache_dir) self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-flink') def start(self): diff --git a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py index 66b1770a6b05..a827e9b17241 100644 --- a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py +++ b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py @@ -49,7 +49,8 @@ def __init__(self, rest_url, options): self._rest_url = rest_url self._artifact_port = ( options.view_as(pipeline_options.JobServerOptions).artifact_port) - self._jar_cache_dir = (options.view_as(pipeline_options.JobServerOptions).jar_cache_dir) + self._jar_cache_dir = ( + options.view_as(pipeline_options.JobServerOptions).jar_cache_dir) self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-spark') spark_options = options.view_as(pipeline_options.SparkRunnerOptions) self._executable_jar = spark_options.spark_job_server_jar From 7af4cccd121b9e969cec82698e40931c7dde3e5e Mon Sep 17 00:00:00 2001 From: s21lee Date: Fri, 25 Oct 2024 10:45:26 +0900 Subject: [PATCH 16/23] fix error Signed-off-by: s21lee --- .../apache_beam/runners/portability/flink_runner.py | 4 +++- .../apache_beam/runners/portability/job_server_test.py | 8 ++++++-- .../apache_beam/runners/portability/spark_runner.py | 4 +++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner.py b/sdks/python/apache_beam/runners/portability/flink_runner.py index 89a803748c65..a95c0fb637b8 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner.py @@ -127,5 +127,7 @@ def java_arguments( '--artifact-port', artifact_port, '--expansion-port', - expansion_port + expansion_port, + '--jar-cache-dir', + jar_cache_dir ] diff --git a/sdks/python/apache_beam/runners/portability/job_server_test.py b/sdks/python/apache_beam/runners/portability/job_server_test.py index 77af9693d257..f24c332ca618 100644 --- a/sdks/python/apache_beam/runners/portability/job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/job_server_test.py @@ -39,7 +39,9 @@ def java_arguments( '--artifact-port', artifact_port, '--expansion-port', - expansion_port + expansion_port, + '--jar-cache-dir', + jar_cache_dir ] def path_to_jar(self): @@ -60,7 +62,7 @@ def test_subprocess_cmd_and_endpoint(self): '--artifacts_dir=/path/to/artifacts/', '--job_server_java_launcher=/path/to/java', '--job_server_jvm_properties=-Dsome.property=value' - '--jar_cache_dir=/path/to/cache_dir' + '--jar_cache_dir=/path/to/cache_dir/' ]) job_server = JavaJarJobServerStub(pipeline_options) subprocess_cmd, endpoint = job_server.subprocess_cmd_and_endpoint() @@ -73,6 +75,8 @@ def test_subprocess_cmd_and_endpoint(self): '/path/to/jar', '--artifacts-dir', '/path/to/artifacts/', + '--jar-cache-dir', + '/path/to/cache_dir/', '--job-port', 8099, '--artifact-port', diff --git a/sdks/python/apache_beam/runners/portability/spark_runner.py b/sdks/python/apache_beam/runners/portability/spark_runner.py index a76518682b13..166e5b3512e3 100644 --- a/sdks/python/apache_beam/runners/portability/spark_runner.py +++ b/sdks/python/apache_beam/runners/portability/spark_runner.py @@ -119,5 +119,7 @@ def java_arguments( '--artifact-port', artifact_port, '--expansion-port', - expansion_port + expansion_port, + '--jar-cache-dir', + jar_cache_dir ] From 20cab7ba5813a10029ad64a2f3cad4e016ec02c2 Mon Sep 17 00:00:00 2001 From: s21lee Date: Fri, 25 Oct 2024 11:08:33 +0900 Subject: [PATCH 17/23] fix error Signed-off-by: s21lee --- .../apache_beam/runners/portability/flink_runner.py | 7 ++----- .../runners/portability/flink_uber_jar_job_server.py | 4 +--- .../apache_beam/runners/portability/job_server.py | 6 ++---- .../apache_beam/runners/portability/job_server_test.py | 10 ++-------- .../apache_beam/runners/portability/spark_runner.py | 7 ++----- .../runners/portability/spark_uber_jar_job_server.py | 2 -- 6 files changed, 9 insertions(+), 27 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner.py b/sdks/python/apache_beam/runners/portability/flink_runner.py index a95c0fb637b8..505504311087 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner.py @@ -115,8 +115,7 @@ def java_arguments( job_port, artifact_port, expansion_port, - artifacts_dir, - jar_cache_dir): + artifacts_dir): return [ '--flink-master', self._master_url, @@ -127,7 +126,5 @@ def java_arguments( '--artifact-port', artifact_port, '--expansion-port', - expansion_port, - '--jar-cache-dir', - jar_cache_dir + expansion_port ] diff --git a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py index d2b839b15f09..3b302e334a5f 100644 --- a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py +++ b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py @@ -50,8 +50,6 @@ def __init__(self, master_url, options): pipeline_options.FlinkRunnerOptions).flink_job_server_jar) self._artifact_port = ( options.view_as(pipeline_options.JobServerOptions).artifact_port) - self._jar_cache_dir = ( - options.view_as(pipeline_options.JobServerOptions).jar_cache_dir) self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-flink') def start(self): @@ -79,7 +77,7 @@ def executable_jar(self): else: url = job_server.JavaJarJobServer.path_to_beam_jar( ':runners:flink:%s:job-server:shadowJar' % self.flink_version()) - return job_server.JavaJarJobServer.local_jar(url, self._jar_cache_dir) + return job_server.JavaJarJobServer.local_jar(url) def flink_version(self): full_version = requests.get( diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py index c167bf32024c..b5ccde69da8c 100644 --- a/sdks/python/apache_beam/runners/portability/job_server.py +++ b/sdks/python/apache_beam/runners/portability/job_server.py @@ -134,8 +134,7 @@ def java_arguments( job_port, artifact_port, expansion_port, - artifacts_dir, - jar_cache_dir): + artifacts_dir): raise NotImplementedError(type(self)) def path_to_jar(self): @@ -162,6 +161,5 @@ def subprocess_cmd_and_endpoint(self): job_port, self._artifact_port, self._expansion_port, - artifacts_dir, - self._jar_cache_dir)) + artifacts_dir)) return (subprocess_cmd, 'localhost:%s' % job_port) diff --git a/sdks/python/apache_beam/runners/portability/job_server_test.py b/sdks/python/apache_beam/runners/portability/job_server_test.py index f24c332ca618..38e27514afda 100644 --- a/sdks/python/apache_beam/runners/portability/job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/job_server_test.py @@ -29,8 +29,7 @@ def java_arguments( job_port, artifact_port, expansion_port, - artifacts_dir, - jar_cache_dir): + artifacts_dir): return [ '--artifacts-dir', artifacts_dir, @@ -39,9 +38,7 @@ def java_arguments( '--artifact-port', artifact_port, '--expansion-port', - expansion_port, - '--jar-cache-dir', - jar_cache_dir + expansion_port ] def path_to_jar(self): @@ -62,7 +59,6 @@ def test_subprocess_cmd_and_endpoint(self): '--artifacts_dir=/path/to/artifacts/', '--job_server_java_launcher=/path/to/java', '--job_server_jvm_properties=-Dsome.property=value' - '--jar_cache_dir=/path/to/cache_dir/' ]) job_server = JavaJarJobServerStub(pipeline_options) subprocess_cmd, endpoint = job_server.subprocess_cmd_and_endpoint() @@ -75,8 +71,6 @@ def test_subprocess_cmd_and_endpoint(self): '/path/to/jar', '--artifacts-dir', '/path/to/artifacts/', - '--jar-cache-dir', - '/path/to/cache_dir/', '--job-port', 8099, '--artifact-port', diff --git a/sdks/python/apache_beam/runners/portability/spark_runner.py b/sdks/python/apache_beam/runners/portability/spark_runner.py index 166e5b3512e3..b25aae7eccb1 100644 --- a/sdks/python/apache_beam/runners/portability/spark_runner.py +++ b/sdks/python/apache_beam/runners/portability/spark_runner.py @@ -107,8 +107,7 @@ def java_arguments( job_port, artifact_port, expansion_port, - artifacts_dir, - jar_cache_dir): + artifacts_dir): return [ '--spark-master-url', self._master_url, @@ -119,7 +118,5 @@ def java_arguments( '--artifact-port', artifact_port, '--expansion-port', - expansion_port, - '--jar-cache-dir', - jar_cache_dir + expansion_port ] diff --git a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py index a827e9b17241..f754b4c330ad 100644 --- a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py +++ b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py @@ -49,8 +49,6 @@ def __init__(self, rest_url, options): self._rest_url = rest_url self._artifact_port = ( options.view_as(pipeline_options.JobServerOptions).artifact_port) - self._jar_cache_dir = ( - options.view_as(pipeline_options.JobServerOptions).jar_cache_dir) self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-spark') spark_options = options.view_as(pipeline_options.SparkRunnerOptions) self._executable_jar = spark_options.spark_job_server_jar From 6677156bebfc1f9a1576d5a8f0415d9d2b3dcdef Mon Sep 17 00:00:00 2001 From: s21lee Date: Fri, 25 Oct 2024 11:10:11 +0900 Subject: [PATCH 18/23] fix error Signed-off-by: s21lee --- sdks/python/apache_beam/runners/portability/job_server_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/job_server_test.py b/sdks/python/apache_beam/runners/portability/job_server_test.py index 38e27514afda..e13a9794190d 100644 --- a/sdks/python/apache_beam/runners/portability/job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/job_server_test.py @@ -45,8 +45,7 @@ def path_to_jar(self): return '/path/to/jar' @staticmethod - def local_jar(url, jar_cache_dir): - print(f'local_jar url({url}) jar_cache_dir({jar_cache_dir})') + def local_jar(url): return url From 6f8d88e27f815d3916f5067af685cb5943b1e097 Mon Sep 17 00:00:00 2001 From: s21lee Date: Fri, 25 Oct 2024 11:30:14 +0900 Subject: [PATCH 19/23] fix lint Signed-off-by: s21lee --- .../python/apache_beam/runners/portability/flink_runner.py | 7 +------ .../python/apache_beam/runners/portability/spark_runner.py | 7 +------ 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner.py b/sdks/python/apache_beam/runners/portability/flink_runner.py index 505504311087..f8d40889505f 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner.py @@ -110,12 +110,7 @@ def path_to_jar(self): return self.path_to_beam_jar( ':runners:flink:%s:job-server:shadowJar' % self._flink_version) - def java_arguments( - self, - job_port, - artifact_port, - expansion_port, - artifacts_dir): + def java_arguments(self, job_port, artifact_port, expansion_port, artifacts_dir): return [ '--flink-master', self._master_url, diff --git a/sdks/python/apache_beam/runners/portability/spark_runner.py b/sdks/python/apache_beam/runners/portability/spark_runner.py index b25aae7eccb1..6c490d4086ee 100644 --- a/sdks/python/apache_beam/runners/portability/spark_runner.py +++ b/sdks/python/apache_beam/runners/portability/spark_runner.py @@ -102,12 +102,7 @@ def path_to_jar(self): raise ValueError('Support for Spark 2 was dropped.') return self.path_to_beam_jar(':runners:spark:3:job-server:shadowJar') - def java_arguments( - self, - job_port, - artifact_port, - expansion_port, - artifacts_dir): + def java_arguments(self, job_port, artifact_port, expansion_port, artifacts_dir): return [ '--spark-master-url', self._master_url, From 5fd029674da952e0ed23b5229b31e19e391fd205 Mon Sep 17 00:00:00 2001 From: s21lee Date: Fri, 25 Oct 2024 11:36:08 +0900 Subject: [PATCH 20/23] fix lint Signed-off-by: s21lee --- .../apache_beam/runners/portability/job_server.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py index b5ccde69da8c..0c640e703e30 100644 --- a/sdks/python/apache_beam/runners/portability/job_server.py +++ b/sdks/python/apache_beam/runners/portability/job_server.py @@ -155,11 +155,9 @@ def subprocess_cmd_and_endpoint(self): self._artifacts_dir if self._artifacts_dir else self.local_temp_dir( prefix='artifacts')) job_port, = subprocess_server.pick_port(self._job_port) - subprocess_cmd = [self._java_launcher, '-jar' - ] + self._jvm_properties + [jar_path] + list( - self.java_arguments( - job_port, - self._artifact_port, - self._expansion_port, - artifacts_dir)) + subprocess_cmd = [self._java_launcher, '-jar'] + self._jvm_properties + [ + jar_path + ] + list( + self.java_arguments( + job_port, self._artifact_port, self._expansion_port, artifacts_dir)) return (subprocess_cmd, 'localhost:%s' % job_port) From 1db43a96ac72f3ee8ecca359660dc88e930b046b Mon Sep 17 00:00:00 2001 From: s21lee Date: Fri, 25 Oct 2024 11:56:43 +0900 Subject: [PATCH 21/23] fix error and lint Signed-off-by: s21lee --- .../apache_beam/runners/portability/flink_runner.py | 3 ++- .../python/apache_beam/runners/portability/job_server.py | 6 +----- .../apache_beam/runners/portability/job_server_test.py | 9 +++------ .../apache_beam/runners/portability/spark_runner.py | 3 ++- 4 files changed, 8 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner.py b/sdks/python/apache_beam/runners/portability/flink_runner.py index f8d40889505f..c9bf15b46e22 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner.py @@ -110,7 +110,8 @@ def path_to_jar(self): return self.path_to_beam_jar( ':runners:flink:%s:job-server:shadowJar' % self._flink_version) - def java_arguments(self, job_port, artifact_port, expansion_port, artifacts_dir): + def java_arguments( + self, job_port, artifact_port, expansion_port, artifacts_dir): return [ '--flink-master', self._master_url, diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py index 0c640e703e30..eee75f66a277 100644 --- a/sdks/python/apache_beam/runners/portability/job_server.py +++ b/sdks/python/apache_beam/runners/portability/job_server.py @@ -130,11 +130,7 @@ def __init__(self, options): self._jar_cache_dir = options.jar_cache_dir def java_arguments( - self, - job_port, - artifact_port, - expansion_port, - artifacts_dir): + self, job_port, artifact_port, expansion_port, artifacts_dir): raise NotImplementedError(type(self)) def path_to_jar(self): diff --git a/sdks/python/apache_beam/runners/portability/job_server_test.py b/sdks/python/apache_beam/runners/portability/job_server_test.py index e13a9794190d..ebe84c19f8a6 100644 --- a/sdks/python/apache_beam/runners/portability/job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/job_server_test.py @@ -25,11 +25,7 @@ class JavaJarJobServerStub(JavaJarJobServer): def java_arguments( - self, - job_port, - artifact_port, - expansion_port, - artifacts_dir): + self, job_port, artifact_port, expansion_port, artifacts_dir): return [ '--artifacts-dir', artifacts_dir, @@ -45,7 +41,8 @@ def path_to_jar(self): return '/path/to/jar' @staticmethod - def local_jar(url): + def local_jar(url, jar_cache_dir): + logging.debug(f"url({url}), jar_cache_dir({jar_cache_dir})") return url diff --git a/sdks/python/apache_beam/runners/portability/spark_runner.py b/sdks/python/apache_beam/runners/portability/spark_runner.py index 6c490d4086ee..480fbdecdce3 100644 --- a/sdks/python/apache_beam/runners/portability/spark_runner.py +++ b/sdks/python/apache_beam/runners/portability/spark_runner.py @@ -102,7 +102,8 @@ def path_to_jar(self): raise ValueError('Support for Spark 2 was dropped.') return self.path_to_beam_jar(':runners:spark:3:job-server:shadowJar') - def java_arguments(self, job_port, artifact_port, expansion_port, artifacts_dir): + def java_arguments( + self, job_port, artifact_port, expansion_port, artifacts_dir): return [ '--spark-master-url', self._master_url, From 42c22054a16efdc65c41262bc99805e6eaa344ff Mon Sep 17 00:00:00 2001 From: s21lee Date: Fri, 25 Oct 2024 12:59:10 +0900 Subject: [PATCH 22/23] fix lint Signed-off-by: s21lee --- .../python/apache_beam/runners/portability/job_server_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/job_server_test.py b/sdks/python/apache_beam/runners/portability/job_server_test.py index ebe84c19f8a6..13b3629b24bf 100644 --- a/sdks/python/apache_beam/runners/portability/job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/job_server_test.py @@ -41,8 +41,8 @@ def path_to_jar(self): return '/path/to/jar' @staticmethod - def local_jar(url, jar_cache_dir): - logging.debug(f"url({url}), jar_cache_dir({jar_cache_dir})") + def local_jar(url, jar_cache_dir=None): + logging.debug("url({%s}), jar_cache_dir({%s})", url, jar_cache_dir) return url From e71c42b551c4428e2a5578f45198523f06996145 Mon Sep 17 00:00:00 2001 From: s21lee Date: Fri, 25 Oct 2024 14:20:20 +0900 Subject: [PATCH 23/23] fix lint Signed-off-by: s21lee --- sdks/python/apache_beam/options/pipeline_options_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index 2cd2b380f356..66acfe654791 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -31,8 +31,8 @@ from apache_beam.options.pipeline_options import CrossLanguageOptions from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import GoogleCloudOptions -from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import JobServerOptions +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import ProfilingOptions from apache_beam.options.pipeline_options import TypeOptions from apache_beam.options.pipeline_options import WorkerOptions