From ff318f156e419ad52fb46ce64a7fb81d9cf88daa Mon Sep 17 00:00:00 2001 From: Ritesh Ghorse Date: Thu, 7 Mar 2024 15:14:40 -0500 Subject: [PATCH] [Python] Log dependencies installed in submission environment (#28564) * log runtime dependencies * log submission env dependencies * rm spare line * update tests for staged files * update equals test to ignore artifacts * handle unit tests, refactor * unit test sub env staging, convert to string * change Log to Printf * change log level to warning * try env * add artifact_service method * correct urn * fix artifact comparison, file reader * add mock for python sdk dependencies and update artifact service method * fix lint * use magic mock instead of mocking entire function * update dataflow runner test * Update sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py * use debug option to disable * remove tmp directory mock --- .../runners/dataflow/dataflow_runner_test.py | 23 +++-- .../runners/portability/expansion_service.py | 7 ++ .../portability/portable_runner_test.py | 41 +++++++-- .../apache_beam/runners/portability/stager.py | 54 ++++++++++- .../runners/portability/stager_test.py | 91 ++++++++++++++----- .../transforms/environments_test.py | 11 +++ sdks/python/container/boot.go | 20 +++- 7 files changed, 209 insertions(+), 38 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index b20b5f293c07..b5568305ce65 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -16,7 +16,6 @@ # """Unit tests for the DataflowRunner class.""" - # pytype: skip-file import unittest @@ -206,12 +205,15 @@ def test_environment_override_translation_legacy_worker_harness_image(self): self.default_properties.append('--experiments=beam_fn_api') self.default_properties.append('--worker_harness_container_image=LEGACY') remote_runner = DataflowRunner() - with Pipeline(remote_runner, - options=PipelineOptions(self.default_properties)) as p: + options = PipelineOptions(self.default_properties) + options.view_as(DebugOptions).add_experiment( + 'disable_logging_submission_environment') + with Pipeline(remote_runner, options=options) as p: ( # pylint: disable=expression-not-assigned p | ptransform.Create([1, 2, 3]) | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)]) | ptransform.GroupByKey()) + self.assertEqual( list(remote_runner.proto_pipeline.components.environments.values()), [ @@ -219,19 +221,24 @@ def test_environment_override_translation_legacy_worker_harness_image(self): urn=common_urns.environments.DOCKER.urn, payload=beam_runner_api_pb2.DockerPayload( container_image='LEGACY').SerializeToString(), - capabilities=environments.python_sdk_docker_capabilities()) + capabilities=environments.python_sdk_docker_capabilities(), + dependencies=environments.python_sdk_dependencies( + options=options)) ]) def test_environment_override_translation_sdk_container_image(self): self.default_properties.append('--experiments=beam_fn_api') self.default_properties.append('--sdk_container_image=FOO') remote_runner = DataflowRunner() - with Pipeline(remote_runner, - options=PipelineOptions(self.default_properties)) as p: + options = PipelineOptions(self.default_properties) + options.view_as(DebugOptions).add_experiment( + 'disable_logging_submission_environment') + with Pipeline(remote_runner, options=options) as p: ( # pylint: disable=expression-not-assigned p | ptransform.Create([1, 2, 3]) | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)]) | ptransform.GroupByKey()) + self.assertEqual( list(remote_runner.proto_pipeline.components.environments.values()), [ @@ -239,7 +246,9 @@ def test_environment_override_translation_sdk_container_image(self): urn=common_urns.environments.DOCKER.urn, payload=beam_runner_api_pb2.DockerPayload( container_image='FOO').SerializeToString(), - capabilities=environments.python_sdk_docker_capabilities()) + capabilities=environments.python_sdk_docker_capabilities(), + dependencies=environments.python_sdk_dependencies( + options=options)) ]) def test_remote_runner_translation(self): diff --git a/sdks/python/apache_beam/runners/portability/expansion_service.py b/sdks/python/apache_beam/runners/portability/expansion_service.py index c7728098f30c..4e328b68dfc2 100644 --- a/sdks/python/apache_beam/runners/portability/expansion_service.py +++ b/sdks/python/apache_beam/runners/portability/expansion_service.py @@ -29,6 +29,8 @@ from apache_beam.portability.api import beam_expansion_api_pb2 from apache_beam.portability.api import beam_expansion_api_pb2_grpc from apache_beam.runners import pipeline_context +from apache_beam.runners.portability import artifact_service +from apache_beam.runners.portability.artifact_service import BeamFilesystemHandler from apache_beam.transforms import environments from apache_beam.transforms import external from apache_beam.transforms import ptransform @@ -128,3 +130,8 @@ def with_pipeline(component, pcoll_id=None): except Exception: # pylint: disable=broad-except return beam_expansion_api_pb2.ExpansionResponse( error=traceback.format_exc()) + + def artifact_service(self): + """Returns a service to retrieve artifacts for use in a job.""" + return artifact_service.ArtifactRetrievalService( + BeamFilesystemHandler(None).file_reader) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py index 0b3704f57973..85d1607e9fa1 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py @@ -21,8 +21,10 @@ import socket import subprocess import sys +import tempfile import time import unittest +from unittest.mock import MagicMock import grpc @@ -308,12 +310,24 @@ def create_options(self): class PortableRunnerInternalTest(unittest.TestCase): + def setUp(self) -> None: + self.tmp_dir = tempfile.TemporaryDirectory() + self.actual_mkdtemp = tempfile.mkdtemp + tempfile.mkdtemp = MagicMock(return_value=self.tmp_dir.name) + + def tearDown(self) -> None: + tempfile.mkdtemp = self.actual_mkdtemp + self.tmp_dir.cleanup() + def test__create_default_environment(self): docker_image = environments.DockerEnvironment.default_docker_image() self.assertEqual( PortableRunner._create_environment( - PipelineOptions.from_dictionary({'sdk_location': 'container'})), - environments.DockerEnvironment(container_image=docker_image)) + options=PipelineOptions.from_dictionary( + {'sdk_location': 'container'})), + environments.DockerEnvironment( + container_image=docker_image, + artifacts=environments.python_sdk_dependencies(PipelineOptions()))) def test__create_docker_environment(self): docker_image = 'py-docker' @@ -324,7 +338,9 @@ def test__create_docker_environment(self): 'environment_config': docker_image, 'sdk_location': 'container', })), - environments.DockerEnvironment(container_image=docker_image)) + environments.DockerEnvironment( + container_image=docker_image, + artifacts=environments.python_sdk_dependencies(PipelineOptions()))) def test__create_process_environment(self): self.assertEqual( @@ -337,7 +353,11 @@ def test__create_process_environment(self): 'sdk_location': 'container', })), environments.ProcessEnvironment( - 'run.sh', os='linux', arch='amd64', env={'k1': 'v1'})) + 'run.sh', + os='linux', + arch='amd64', + env={'k1': 'v1'}, + artifacts=environments.python_sdk_dependencies(PipelineOptions()))) self.assertEqual( PortableRunner._create_environment( PipelineOptions.from_dictionary({ @@ -345,7 +365,9 @@ def test__create_process_environment(self): 'environment_config': '{"command": "run.sh"}', 'sdk_location': 'container', })), - environments.ProcessEnvironment('run.sh')) + environments.ProcessEnvironment( + 'run.sh', + artifacts=environments.python_sdk_dependencies(PipelineOptions()))) def test__create_external_environment(self): self.assertEqual( @@ -355,7 +377,9 @@ def test__create_external_environment(self): 'environment_config': 'localhost:50000', 'sdk_location': 'container', })), - environments.ExternalEnvironment('localhost:50000')) + environments.ExternalEnvironment( + 'localhost:50000', + artifacts=environments.python_sdk_dependencies(PipelineOptions()))) raw_config = ' {"url":"localhost:50000", "params":{"k1":"v1"}} ' for env_config in (raw_config, raw_config.lstrip(), raw_config.strip()): self.assertEqual( @@ -366,7 +390,10 @@ def test__create_external_environment(self): 'sdk_location': 'container', })), environments.ExternalEnvironment( - 'localhost:50000', params={"k1": "v1"})) + 'localhost:50000', + params={"k1": "v1"}, + artifacts=environments.python_sdk_dependencies( + PipelineOptions()))) with self.assertRaises(ValueError): PortableRunner._create_environment( PipelineOptions.from_dictionary({ diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index d59b3e32bc17..f6207d80a9d1 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -52,6 +52,7 @@ import logging import os import shutil +import subprocess import sys import tempfile from importlib.metadata import distribution @@ -84,6 +85,8 @@ WORKFLOW_TARBALL_FILE = 'workflow.tar.gz' REQUIREMENTS_FILE = 'requirements.txt' EXTRA_PACKAGES_FILE = 'extra_packages.txt' +# Filename that stores the submission environment dependencies. +SUBMISSION_ENV_DEPENDENCIES_FILE = 'submission_environment_dependencies.txt' # One of the choices for user to use for requirements cache during staging SKIP_REQUIREMENTS_CACHE = 'skip' @@ -159,9 +162,10 @@ def extract_staging_tuple_iter( def create_job_resources(options, # type: PipelineOptions temp_dir, # type: str build_setup_args=None, # type: Optional[List[str]] - pypi_requirements=None, # type: Optional[List[str]] + pypi_requirements=None, # type: Optional[List[str]] populate_requirements_cache=None, # type: Optional[Callable[[str, str, bool], None]] - skip_prestaged_dependencies=False, # type: Optional[bool] + skip_prestaged_dependencies=False, # type: Optional[bool] + log_submission_env_dependencies=True, # type: Optional[bool] ): """For internal use only; no backwards-compatibility guarantees. @@ -183,6 +187,8 @@ def create_job_resources(options, # type: PipelineOptions cache. Used only for testing. skip_prestaged_dependencies: Skip staging dependencies that can be added into SDK containers during prebuilding. + log_submission_env_dependencies: (Optional) param to stage and log + submission environment dependencies. Defaults to True. Returns: A list of ArtifactInformation to be used for staging resources. @@ -365,6 +371,13 @@ def create_job_resources(options, # type: PipelineOptions Stager._create_file_stage_to_artifact( pickled_session_file, names.PICKLED_MAIN_SESSION_FILE)) + # stage the submission environment dependencies, if enabled. + if (log_submission_env_dependencies and + not options.view_as(DebugOptions).lookup_experiment( + 'disable_logging_submission_environment')): + resources.extend( + Stager._create_stage_submission_env_dependencies(temp_dir)) + return resources def stage_job_resources(self, @@ -850,3 +863,40 @@ def _create_beam_sdk(sdk_remote_location, temp_dir): return [ Stager._create_file_stage_to_artifact(local_download_file, staged_name) ] + + @staticmethod + def _create_stage_submission_env_dependencies(temp_dir): + """Create and stage a file with list of dependencies installed in the + submission environment. + + This list can be used at runtime to compare against the dependencies in the + runtime environment. This allows runners to warn users about any potential + dependency mismatches and help debug issues related to + environment mismatches. + + Args: + temp_dir: path to temporary location where the file should be + downloaded. + + Returns: + A list of ArtifactInformation of local file path that will be staged to + the staging location. + """ + try: + local_dependency_file_path = os.path.join( + temp_dir, SUBMISSION_ENV_DEPENDENCIES_FILE) + dependencies = subprocess.check_output( + [sys.executable, '-m', 'pip', 'freeze']) + local_python_path = f"Python Path: {sys.executable}\n" + with open(local_dependency_file_path, 'w') as f: + f.write(local_python_path + str(dependencies)) + return [ + Stager._create_file_stage_to_artifact( + local_dependency_file_path, SUBMISSION_ENV_DEPENDENCIES_FILE), + ] + except Exception as e: + _LOGGER.warning( + "Couldn't stage a list of installed dependencies in " + "submission environment. Got exception: %s", + e) + return [] diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index 839f7e577733..25fd62b16533 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -162,7 +162,7 @@ def test_no_main_session(self): options.view_as(SetupOptions).save_main_session = False self.update_options(options) - self.assertEqual([], + self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -180,7 +180,10 @@ def test_with_main_session(self): options.view_as(SetupOptions).pickle_library = pickler.USE_DILL self.update_options(options) - self.assertEqual([names.PICKLED_MAIN_SESSION_FILE], + self.assertEqual([ + names.PICKLED_MAIN_SESSION_FILE, + stager.SUBMISSION_ENV_DEPENDENCIES_FILE + ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) self.assertTrue( @@ -199,7 +202,7 @@ def test_main_session_not_staged_when_using_cloudpickle(self): # session is saved when pickle_library==cloudpickle. options.view_as(SetupOptions).pickle_library = pickler.USE_CLOUDPICKLE self.update_options(options) - self.assertEqual([], + self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -208,10 +211,19 @@ def test_default_resources(self): options = PipelineOptions() self.update_options(options) - self.assertEqual([], + self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) + def test_no_submission_env_staging(self): + staging_dir = self.make_temp_dir() + options = PipelineOptions() + self.update_options(options) + + resources = self.stager.create_job_resources( + options, staging_dir, log_submission_env_dependencies=False) + self.assertEqual([], resources) + def test_with_requirements_file(self): staging_dir = self.make_temp_dir() requirements_cache_dir = self.make_temp_dir() @@ -225,7 +237,12 @@ def test_with_requirements_file(self): self.create_temp_file( os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing') self.assertEqual( - sorted([stager.REQUIREMENTS_FILE, 'abc.txt', 'def.txt']), + sorted([ + stager.REQUIREMENTS_FILE, + 'abc.txt', + 'def.txt', + stager.SUBMISSION_ENV_DEPENDENCIES_FILE + ]), sorted( self.stager.create_and_stage_job_resources( options, @@ -246,9 +263,11 @@ def test_with_pypi_requirements(self): pypi_requirements=['nothing>=1.0,<2.0'], populate_requirements_cache=self.populate_requirements_cache, staging_location=staging_dir)[1] - self.assertEqual(3, len(resources)) + self.assertEqual(4, len(resources)) self.assertTrue({'abc.txt', 'def.txt'} <= set(resources)) - generated_requirements = (set(resources) - {'abc.txt', 'def.txt'}).pop() + generated_requirements = ( + set(resources) - + {'abc.txt', 'def.txt', stager.SUBMISSION_ENV_DEPENDENCIES_FILE}).pop() with open(os.path.join(staging_dir, generated_requirements)) as f: data = f.read() self.assertEqual('nothing>=1.0,<2.0', data) @@ -282,7 +301,12 @@ def test_with_requirements_file_and_cache(self): self.create_temp_file( os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing') self.assertEqual( - sorted([stager.REQUIREMENTS_FILE, 'abc.txt', 'def.txt']), + sorted([ + stager.REQUIREMENTS_FILE, + 'abc.txt', + 'def.txt', + stager.SUBMISSION_ENV_DEPENDENCIES_FILE + ]), sorted( self.stager.create_and_stage_job_resources( options, @@ -313,7 +337,9 @@ def test_requirements_cache_not_populated_when_cache_disabled(self): populate_requirements_cache=self.populate_requirements_cache, staging_location=staging_dir)[1] assert not populate_requirements_cache.called - self.assertEqual([stager.REQUIREMENTS_FILE], resources) + self.assertEqual( + [stager.REQUIREMENTS_FILE, stager.SUBMISSION_ENV_DEPENDENCIES_FILE], + resources) self.assertTrue(not os.path.isfile(os.path.join(staging_dir, 'abc.txt'))) self.assertTrue(not os.path.isfile(os.path.join(staging_dir, 'def.txt'))) @@ -378,7 +404,8 @@ def test_sdk_location_default(self): _, staged_resources = self.stager.create_and_stage_job_resources( options, temp_dir=self.make_temp_dir(), staging_location=staging_dir) - self.assertEqual([], staged_resources) + self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE], + staged_resources) def test_sdk_location_local_directory(self): staging_dir = self.make_temp_dir() @@ -391,7 +418,10 @@ def test_sdk_location_local_directory(self): self.update_options(options) options.view_as(SetupOptions).sdk_location = sdk_location - self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME], + self.assertEqual([ + names.STAGED_SDK_SOURCES_FILENAME, + stager.SUBMISSION_ENV_DEPENDENCIES_FILE + ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) tarball_path = os.path.join(staging_dir, names.STAGED_SDK_SOURCES_FILENAME) @@ -409,7 +439,10 @@ def test_sdk_location_local_source_file(self): self.update_options(options) options.view_as(SetupOptions).sdk_location = sdk_location - self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME], + self.assertEqual([ + names.STAGED_SDK_SOURCES_FILENAME, + stager.SUBMISSION_ENV_DEPENDENCIES_FILE + ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) tarball_path = os.path.join(staging_dir, names.STAGED_SDK_SOURCES_FILENAME) @@ -427,7 +460,7 @@ def test_sdk_location_local_wheel_file(self): self.update_options(options) options.view_as(SetupOptions).sdk_location = sdk_location - self.assertEqual([sdk_filename], + self.assertEqual([sdk_filename, stager.SUBMISSION_ENV_DEPENDENCIES_FILE], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) tarball_path = os.path.join(staging_dir, sdk_filename) @@ -463,7 +496,10 @@ def test_sdk_location_remote_source_file(self, *unused_mocks): self.update_options(options) options.view_as(SetupOptions).sdk_location = sdk_location - self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME], + self.assertEqual([ + names.STAGED_SDK_SOURCES_FILENAME, + stager.SUBMISSION_ENV_DEPENDENCIES_FILE + ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -485,7 +521,7 @@ def file_download(_, to_path): with mock.patch('apache_beam.runners.portability.stager_test' '.stager.Stager._download_file', staticmethod(file_download)): - self.assertEqual([sdk_filename], + self.assertEqual([sdk_filename, stager.SUBMISSION_ENV_DEPENDENCIES_FILE], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -509,7 +545,10 @@ def file_download(_, to_path): with mock.patch('apache_beam.runners.portability.stager_test' '.stager.Stager._download_file', staticmethod(file_download)): - self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME], + self.assertEqual([ + names.STAGED_SDK_SOURCES_FILENAME, + stager.SUBMISSION_ENV_DEPENDENCIES_FILE + ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -551,7 +590,8 @@ def test_with_extra_packages(self): 'xyz2.tar', 'whl.whl', 'remote_file.tar.gz', - stager.EXTRA_PACKAGES_FILE + stager.EXTRA_PACKAGES_FILE, + stager.SUBMISSION_ENV_DEPENDENCIES_FILE ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -659,7 +699,13 @@ def test_with_jar_packages(self): with mock.patch('apache_beam.runners.portability.stager_test' '.stager.Stager._is_remote_path', staticmethod(self.is_remote_path)): - self.assertEqual(['abc.jar', 'xyz.jar', 'ijk.jar', 'remote.jar'], + self.assertEqual([ + 'abc.jar', + 'xyz.jar', + 'ijk.jar', + 'remote.jar', + stager.SUBMISSION_ENV_DEPENDENCIES_FILE + ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) self.assertEqual(['/tmp/remote/remote.jar'], self.remote_copied_files) @@ -719,7 +765,8 @@ def test_populate_requirements_cache_with_bdist(self): resources = self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1] for f in resources: - if f != stager.REQUIREMENTS_FILE: + if (f != stager.REQUIREMENTS_FILE and + f != stager.SUBMISSION_ENV_DEPENDENCIES_FILE): self.assertTrue(('.tar.gz' in f) or ('.whl' in f)) # requirements cache will populated only with sdists/sources @@ -744,7 +791,8 @@ def test_populate_requirements_cache_with_sdist(self): options, staging_location=staging_dir)[1] for f in resources: - if f != stager.REQUIREMENTS_FILE: + if (f != stager.REQUIREMENTS_FILE and + f != stager.SUBMISSION_ENV_DEPENDENCIES_FILE): self.assertTrue('.tar.gz' in f) self.assertTrue('.whl' not in f) @@ -777,7 +825,8 @@ def test_populate_requirements_cache_with_local_files(self): stager.REQUIREMENTS_FILE, stager.EXTRA_PACKAGES_FILE, 'nothing.tar.gz', - 'local_package.tar.gz' + 'local_package.tar.gz', + stager.SUBMISSION_ENV_DEPENDENCIES_FILE ]), sorted(resources)) diff --git a/sdks/python/apache_beam/transforms/environments_test.py b/sdks/python/apache_beam/transforms/environments_test.py index 25885553b2ff..c32a85579fcb 100644 --- a/sdks/python/apache_beam/transforms/environments_test.py +++ b/sdks/python/apache_beam/transforms/environments_test.py @@ -21,7 +21,9 @@ # pytype: skip-file import logging +import tempfile import unittest +from unittest.mock import MagicMock from apache_beam.options.pipeline_options import PortableOptions from apache_beam.portability import common_urns @@ -82,6 +84,15 @@ def test_default_capabilities(self): class EnvironmentOptionsTest(unittest.TestCase): + def setUp(self) -> None: + self.tmp_dir = tempfile.TemporaryDirectory() + self.actual_mkdtemp = tempfile.mkdtemp + tempfile.mkdtemp = MagicMock(return_value=self.tmp_dir.name) + + def tearDown(self) -> None: + tempfile.mkdtemp = self.actual_mkdtemp + self.tmp_dir.cleanup() + def test_process_variables_empty(self): options = PortableOptions([ '--environment_type=PROCESS', diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index c59807c72a98..5c84ad6bc7e4 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -412,6 +412,9 @@ func installSetupPackages(ctx context.Context, logger *tools.Logger, files []str if err := logRuntimeDependencies(ctx, bufLogger); err != nil { logger.Warnf(ctx, "couldn't fetch the runtime python dependencies: %v", err) } + if err := logSubmissionEnvDependencies(ctx, bufLogger, workDir); err != nil { + logger.Warnf(ctx, "couldn't fetch the submission environment dependencies: %v", err) + } return nil } @@ -460,7 +463,6 @@ func processArtifactsInSetupOnlyMode() { // logRuntimeDependencies logs the python dependencies // installed in the runtime environment. func logRuntimeDependencies(ctx context.Context, bufLogger *tools.BufferedLogger) error { - bufLogger.Printf(ctx, "Logging runtime dependencies:") pythonVersion, err := expansionx.GetPythonVersion() if err != nil { return err @@ -472,6 +474,7 @@ func logRuntimeDependencies(ctx context.Context, bufLogger *tools.BufferedLogger } else { bufLogger.FlushAtDebug(ctx) } + bufLogger.Printf(ctx, "Logging runtime dependencies:") args = []string{"-m", "pip", "freeze"} if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil { bufLogger.FlushAtError(ctx) @@ -480,3 +483,18 @@ func logRuntimeDependencies(ctx context.Context, bufLogger *tools.BufferedLogger } return nil } + +// logSubmissionEnvDependencies logs the python dependencies +// installed in the submission environment. +func logSubmissionEnvDependencies(ctx context.Context, bufLogger *tools.BufferedLogger, dir string) error { + bufLogger.Printf(ctx, "Logging submission environment dependencies:") + // path for submission environment dependencies should match with the + // one defined in apache_beam/runners/portability/stager.py. + filename := filepath.Join(dir, "submission_environment_dependencies.txt") + content, err := os.ReadFile(filename) + if err != nil { + return err + } + bufLogger.Printf(ctx, string(content)) + return nil +}