Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Log dependencies installed in submission environment #28564

Merged
merged 26 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e441b52
log runtime dependencies
riteshghorse Sep 19, 2023
1497427
log submission env dependencies
riteshghorse Sep 20, 2023
b696f6f
rm spare line
riteshghorse Sep 20, 2023
8cbeb61
update tests for staged files
riteshghorse Sep 20, 2023
a2761c9
Merge branch 'master' of https://github.com/apache/beam into sdeps
riteshghorse Sep 20, 2023
1e68428
update equals test to ignore artifacts
riteshghorse Sep 21, 2023
284f772
handle unit tests, refactor
riteshghorse Sep 22, 2023
8f03251
unit test sub env staging, convert to string
riteshghorse Sep 22, 2023
7e6e62a
change Log to Printf
riteshghorse Sep 26, 2023
c03378f
Merge branch 'master' of https://github.com/apache/beam into sdeps
riteshghorse Sep 27, 2023
cc24a1d
change log level to warning
riteshghorse Oct 3, 2023
6b6b222
try env
riteshghorse Dec 6, 2023
b16d23d
Merge remote-tracking branch 'origin' into sdeps
riteshghorse Dec 6, 2023
649c8a0
merge master
riteshghorse Dec 18, 2023
9b65385
add artifact_service method
riteshghorse Dec 20, 2023
f2e821d
merge master
riteshghorse Dec 20, 2023
789c764
correct urn
riteshghorse Dec 20, 2023
df68c1d
fix artifact comparison, file reader
riteshghorse Dec 20, 2023
4dfe7e0
add mock for python sdk dependencies and update artifact service method
riteshghorse Feb 5, 2024
b7d62bc
fix lint
riteshghorse Feb 5, 2024
404286b
use magic mock instead of mocking entire function
riteshghorse Feb 9, 2024
da2b954
update dataflow runner test
riteshghorse Feb 15, 2024
eeedc44
Update sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
riteshghorse Feb 15, 2024
c527114
use debug option to disable
riteshghorse Mar 7, 2024
279e01d
pull upstream
riteshghorse Mar 7, 2024
80d6d18
remove tmp directory mock
riteshghorse Mar 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 14 additions & 18 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,13 @@ def test_environment_override_translation_legacy_worker_harness_image(self):
p | ptransform.Create([1, 2, 3])
| 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
| ptransform.GroupByKey())
self.assertEqual(
list(remote_runner.proto_pipeline.components.environments.values()),
[
beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=beam_runner_api_pb2.DockerPayload(
container_image='LEGACY').SerializeToString(),
capabilities=environments.python_sdk_docker_capabilities())
])
first = remote_runner.proto_pipeline.components.environments.values()
second = beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=beam_runner_api_pb2.DockerPayload(
container_image='LEGACY').SerializeToString(),
capabilities=environments.python_sdk_docker_capabilities())
self.assertTrue(first.__eq__(second))
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved

def test_environment_override_translation_sdk_container_image(self):
self.default_properties.append('--experiments=beam_fn_api')
Expand All @@ -231,15 +229,13 @@ def test_environment_override_translation_sdk_container_image(self):
p | ptransform.Create([1, 2, 3])
| 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
| ptransform.GroupByKey())
self.assertEqual(
list(remote_runner.proto_pipeline.components.environments.values()),
[
beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=beam_runner_api_pb2.DockerPayload(
container_image='FOO').SerializeToString(),
capabilities=environments.python_sdk_docker_capabilities())
])
first = remote_runner.proto_pipeline.components.environments.values()
second = beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=beam_runner_api_pb2.DockerPayload(
container_image='FOO').SerializeToString(),
capabilities=environments.python_sdk_docker_capabilities())
self.assertTrue(first.__eq__(second))

def test_remote_runner_translation(self):
remote_runner = DataflowRunner()
Expand Down
26 changes: 26 additions & 0 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import logging
import os
import shutil
import subprocess
import sys
import tempfile
from importlib.metadata import distribution
Expand Down Expand Up @@ -84,6 +85,8 @@
WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
REQUIREMENTS_FILE = 'requirements.txt'
EXTRA_PACKAGES_FILE = 'extra_packages.txt'
# Filename that stores the submission environment dependencies.
SUBMISSION_ENV_DEPENDENCIES_FILE = 'submission_environment_dependencies.txt'
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
# One of the choices for user to use for requirements cache during staging
SKIP_REQUIREMENTS_CACHE = 'skip'

Expand Down Expand Up @@ -162,6 +165,7 @@ def create_job_resources(options, # type: PipelineOptions
pypi_requirements=None, # type: Optional[List[str]]
populate_requirements_cache=None, # type: Optional[Callable[[str, str, bool], None]]
skip_prestaged_dependencies=False, # type: Optional[bool]
log_submission_env_dependencies=True, # type: Optional[bool]
):
"""For internal use only; no backwards-compatibility guarantees.

Expand Down Expand Up @@ -365,6 +369,11 @@ def create_job_resources(options, # type: PipelineOptions
Stager._create_file_stage_to_artifact(
pickled_session_file, names.PICKLED_MAIN_SESSION_FILE))

# stage the submission environment dependencies
if log_submission_env_dependencies:
resources.extend(
Stager._create_stage_submission_env_dependencies(temp_dir))

return resources

def stage_job_resources(self,
Expand Down Expand Up @@ -837,3 +846,20 @@ def _create_beam_sdk(sdk_remote_location, temp_dir):
return [
Stager._create_file_stage_to_artifact(local_download_file, staged_name)
]

@staticmethod
def _create_stage_submission_env_dependencies(temp_dir):
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
try:
local_dependency_file_path = os.path.join(
temp_dir, SUBMISSION_ENV_DEPENDENCIES_FILE)
dependencies = subprocess.check_output(
[sys.executable, '-m', 'pip', 'freeze'])
with open(local_dependency_file_path, 'w') as f:
f.write(str(dependencies))
return [
Stager._create_file_stage_to_artifact(
local_dependency_file_path, SUBMISSION_ENV_DEPENDENCIES_FILE)
]
except Exception:
_LOGGER.debug("couldn't stage dependencies from submission environment")
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
return []
91 changes: 70 additions & 21 deletions sdks/python/apache_beam/runners/portability/stager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def test_no_main_session(self):
options.view_as(SetupOptions).save_main_session = False
self.update_options(options)

self.assertEqual([],
self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])

Expand All @@ -180,7 +180,10 @@ def test_with_main_session(self):
options.view_as(SetupOptions).pickle_library = pickler.USE_DILL
self.update_options(options)

self.assertEqual([names.PICKLED_MAIN_SESSION_FILE],
self.assertEqual([
names.PICKLED_MAIN_SESSION_FILE,
stager.SUBMISSION_ENV_DEPENDENCIES_FILE
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
self.assertTrue(
Expand All @@ -199,7 +202,7 @@ def test_main_session_not_staged_when_using_cloudpickle(self):
# session is saved when pickle_library==cloudpickle.
options.view_as(SetupOptions).pickle_library = pickler.USE_CLOUDPICKLE
self.update_options(options)
self.assertEqual([],
self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])

Expand All @@ -208,10 +211,19 @@ def test_default_resources(self):
options = PipelineOptions()
self.update_options(options)

self.assertEqual([],
self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])

def test_no_submission_env_staging(self):
staging_dir = self.make_temp_dir()
options = PipelineOptions()
self.update_options(options)

resources = self.stager.create_job_resources(
options, staging_dir, log_submission_env_dependencies=False)
self.assertEqual([], resources)

def test_with_requirements_file(self):
staging_dir = self.make_temp_dir()
requirements_cache_dir = self.make_temp_dir()
Expand All @@ -225,7 +237,12 @@ def test_with_requirements_file(self):
self.create_temp_file(
os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing')
self.assertEqual(
sorted([stager.REQUIREMENTS_FILE, 'abc.txt', 'def.txt']),
sorted([
stager.REQUIREMENTS_FILE,
'abc.txt',
'def.txt',
stager.SUBMISSION_ENV_DEPENDENCIES_FILE
]),
sorted(
self.stager.create_and_stage_job_resources(
options,
Expand All @@ -246,9 +263,11 @@ def test_with_pypi_requirements(self):
pypi_requirements=['nothing>=1.0,<2.0'],
populate_requirements_cache=self.populate_requirements_cache,
staging_location=staging_dir)[1]
self.assertEqual(3, len(resources))
self.assertEqual(4, len(resources))
self.assertTrue({'abc.txt', 'def.txt'} <= set(resources))
generated_requirements = (set(resources) - {'abc.txt', 'def.txt'}).pop()
generated_requirements = (
set(resources) -
{'abc.txt', 'def.txt', stager.SUBMISSION_ENV_DEPENDENCIES_FILE}).pop()
with open(os.path.join(staging_dir, generated_requirements)) as f:
data = f.read()
self.assertEqual('nothing>=1.0,<2.0', data)
Expand Down Expand Up @@ -282,7 +301,12 @@ def test_with_requirements_file_and_cache(self):
self.create_temp_file(
os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing')
self.assertEqual(
sorted([stager.REQUIREMENTS_FILE, 'abc.txt', 'def.txt']),
sorted([
stager.REQUIREMENTS_FILE,
'abc.txt',
'def.txt',
stager.SUBMISSION_ENV_DEPENDENCIES_FILE
]),
sorted(
self.stager.create_and_stage_job_resources(
options,
Expand Down Expand Up @@ -313,7 +337,9 @@ def test_requirements_cache_not_populated_when_cache_disabled(self):
populate_requirements_cache=self.populate_requirements_cache,
staging_location=staging_dir)[1]
assert not populate_requirements_cache.called
self.assertEqual([stager.REQUIREMENTS_FILE], resources)
self.assertEqual(
[stager.REQUIREMENTS_FILE, stager.SUBMISSION_ENV_DEPENDENCIES_FILE],
resources)
self.assertTrue(not os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
self.assertTrue(not os.path.isfile(os.path.join(staging_dir, 'def.txt')))

Expand Down Expand Up @@ -378,7 +404,8 @@ def test_sdk_location_default(self):
_, staged_resources = self.stager.create_and_stage_job_resources(
options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)

self.assertEqual([], staged_resources)
self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE],
staged_resources)

def test_sdk_location_local_directory(self):
staging_dir = self.make_temp_dir()
Expand All @@ -391,7 +418,10 @@ def test_sdk_location_local_directory(self):
self.update_options(options)
options.view_as(SetupOptions).sdk_location = sdk_location

self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
self.assertEqual([
names.STAGED_SDK_SOURCES_FILENAME,
stager.SUBMISSION_ENV_DEPENDENCIES_FILE
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
tarball_path = os.path.join(staging_dir, names.STAGED_SDK_SOURCES_FILENAME)
Expand All @@ -409,7 +439,10 @@ def test_sdk_location_local_source_file(self):
self.update_options(options)
options.view_as(SetupOptions).sdk_location = sdk_location

self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
self.assertEqual([
names.STAGED_SDK_SOURCES_FILENAME,
stager.SUBMISSION_ENV_DEPENDENCIES_FILE
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
tarball_path = os.path.join(staging_dir, names.STAGED_SDK_SOURCES_FILENAME)
Expand All @@ -427,7 +460,7 @@ def test_sdk_location_local_wheel_file(self):
self.update_options(options)
options.view_as(SetupOptions).sdk_location = sdk_location

self.assertEqual([sdk_filename],
self.assertEqual([sdk_filename, stager.SUBMISSION_ENV_DEPENDENCIES_FILE],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
tarball_path = os.path.join(staging_dir, sdk_filename)
Expand Down Expand Up @@ -463,7 +496,10 @@ def test_sdk_location_remote_source_file(self, *unused_mocks):
self.update_options(options)
options.view_as(SetupOptions).sdk_location = sdk_location

self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
self.assertEqual([
names.STAGED_SDK_SOURCES_FILENAME,
stager.SUBMISSION_ENV_DEPENDENCIES_FILE
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])

Expand All @@ -485,7 +521,7 @@ def file_download(_, to_path):
with mock.patch('apache_beam.runners.portability.stager_test'
'.stager.Stager._download_file',
staticmethod(file_download)):
self.assertEqual([sdk_filename],
self.assertEqual([sdk_filename, stager.SUBMISSION_ENV_DEPENDENCIES_FILE],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])

Expand All @@ -509,7 +545,10 @@ def file_download(_, to_path):
with mock.patch('apache_beam.runners.portability.stager_test'
'.stager.Stager._download_file',
staticmethod(file_download)):
self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
self.assertEqual([
names.STAGED_SDK_SOURCES_FILENAME,
stager.SUBMISSION_ENV_DEPENDENCIES_FILE
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])

Expand Down Expand Up @@ -551,7 +590,8 @@ def test_with_extra_packages(self):
'xyz2.tar',
'whl.whl',
'remote_file.tar.gz',
stager.EXTRA_PACKAGES_FILE
stager.EXTRA_PACKAGES_FILE,
stager.SUBMISSION_ENV_DEPENDENCIES_FILE
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
Expand Down Expand Up @@ -659,7 +699,13 @@ def test_with_jar_packages(self):
with mock.patch('apache_beam.runners.portability.stager_test'
'.stager.Stager._is_remote_path',
staticmethod(self.is_remote_path)):
self.assertEqual(['abc.jar', 'xyz.jar', 'ijk.jar', 'remote.jar'],
self.assertEqual([
'abc.jar',
'xyz.jar',
'ijk.jar',
'remote.jar',
stager.SUBMISSION_ENV_DEPENDENCIES_FILE
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
self.assertEqual(['/tmp/remote/remote.jar'], self.remote_copied_files)
Expand Down Expand Up @@ -719,7 +765,8 @@ def test_populate_requirements_cache_with_bdist(self):
resources = self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1]
for f in resources:
if f != stager.REQUIREMENTS_FILE:
if (f != stager.REQUIREMENTS_FILE and
f != stager.SUBMISSION_ENV_DEPENDENCIES_FILE):
self.assertTrue(('.tar.gz' in f) or ('.whl' in f))

# requirements cache will populated only with sdists/sources
Expand All @@ -744,7 +791,8 @@ def test_populate_requirements_cache_with_sdist(self):
options, staging_location=staging_dir)[1]

for f in resources:
if f != stager.REQUIREMENTS_FILE:
if (f != stager.REQUIREMENTS_FILE and
f != stager.SUBMISSION_ENV_DEPENDENCIES_FILE):
self.assertTrue('.tar.gz' in f)
self.assertTrue('.whl' not in f)

Expand Down Expand Up @@ -777,7 +825,8 @@ def test_populate_requirements_cache_with_local_files(self):
stager.REQUIREMENTS_FILE,
stager.EXTRA_PACKAGES_FILE,
'nothing.tar.gz',
'local_package.tar.gz'
'local_package.tar.gz',
stager.SUBMISSION_ENV_DEPENDENCIES_FILE
]),
sorted(resources))

Expand Down
17 changes: 9 additions & 8 deletions sdks/python/apache_beam/transforms/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ def __init__(self,
dict(resource_hints) if resource_hints else {})

def __eq__(self, other):
# don't compare artifacts since they have different hashes in their names.
return (
self.__class__ == other.__class__ and
self._artifacts == other._artifacts
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
self.__class__ == other.__class__
# Assuming that we don't have instances of the same Environment subclass
# with different set of capabilities.
and self._resource_hints == other._resource_hints)
Expand Down Expand Up @@ -580,7 +580,7 @@ def from_options(cls, options):
url,
params=params,
capabilities=python_sdk_capabilities(),
artifacts=python_sdk_dependencies(options),
artifacts=python_sdk_dependencies(options, logDependencies=False),
resource_hints=resource_hints_from_options(options))


Expand All @@ -605,7 +605,7 @@ def from_options(cls, options):
# type: (PortableOptions) -> EmbeddedPythonEnvironment
return cls(
capabilities=python_sdk_capabilities(),
artifacts=python_sdk_dependencies(options),
artifacts=python_sdk_dependencies(options, logDependencies=False),
resource_hints=resource_hints_from_options(options),
)

Expand Down Expand Up @@ -692,11 +692,11 @@ def from_options(cls, options):
state_cache_size=config.get('state_cache_size'),
data_buffer_time_limit_ms=config.get('data_buffer_time_limit_ms'),
capabilities=python_sdk_capabilities(),
artifacts=python_sdk_dependencies(options))
artifacts=python_sdk_dependencies(options, logDependencies=False))
else:
return cls(
capabilities=python_sdk_capabilities(),
artifacts=python_sdk_dependencies(options),
artifacts=python_sdk_dependencies(options, logDependencies=False),
resource_hints=resource_hints_from_options(options))

@staticmethod
Expand Down Expand Up @@ -838,7 +838,7 @@ def _python_sdk_capabilities_iter():
yield common_urns.protocols.DATA_SAMPLING.urn


def python_sdk_dependencies(options, tmp_dir=None):
def python_sdk_dependencies(options, tmp_dir=None, logDependencies=True):
if tmp_dir is None:
tmp_dir = tempfile.mkdtemp()
skip_prestaged_dependencies = options.view_as(
Expand All @@ -850,4 +850,5 @@ def python_sdk_dependencies(options, tmp_dir=None):
artifact[0] + artifact[1]
for artifact in PyPIArtifactRegistry.get_artifacts()
],
skip_prestaged_dependencies=skip_prestaged_dependencies)
skip_prestaged_dependencies=skip_prestaged_dependencies,
log_submission_env_dependencies=logDependencies)
Loading