Skip to content

Commit

Permalink
fix artifact comparison, file reader
Browse files Browse the repository at this point in the history
  • Loading branch information
riteshghorse committed Dec 20, 2023
1 parent 789c764 commit df68c1d
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# pytype: skip-file

import copy
import io
import traceback

from apache_beam import pipeline as beam_pipeline
Expand Down Expand Up @@ -131,4 +132,11 @@ def with_pipeline(component, pcoll_id=None):
error=traceback.format_exc())

def artifact_service(self):
return artifact_service.ArtifactRetrievalService(None)
return artifact_service.ArtifactRetrievalService(file_reader)


def file_reader(filepath: str) -> io.BytesIO:
"""Reads a file at given path and returns io.BytesIO object"""
with open(filepath, 'rb') as f:
data = f.read()
return io.BytesIO(data)
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,11 @@ def test__create_default_environment(self):
docker_image = environments.DockerEnvironment.default_docker_image()
self.assertEqual(
PortableRunner._create_environment(
PipelineOptions.from_dictionary({'sdk_location': 'container'})),
environments.DockerEnvironment(container_image=docker_image))
options=PipelineOptions.from_dictionary(
{'sdk_location': 'container'})),
environments.DockerEnvironment(
container_image=docker_image,
artifacts=environments.python_sdk_dependencies(PipelineOptions())))

def test__create_docker_environment(self):
docker_image = 'py-docker'
Expand All @@ -324,7 +327,9 @@ def test__create_docker_environment(self):
'environment_config': docker_image,
'sdk_location': 'container',
})),
environments.DockerEnvironment(container_image=docker_image))
environments.DockerEnvironment(
container_image=docker_image,
artifacts=environments.python_sdk_dependencies(PipelineOptions())))

def test__create_process_environment(self):
self.assertEqual(
Expand All @@ -337,7 +342,11 @@ def test__create_process_environment(self):
'sdk_location': 'container',
})),
environments.ProcessEnvironment(
'run.sh', os='linux', arch='amd64', env={'k1': 'v1'}))
'run.sh',
os='linux',
arch='amd64',
env={'k1': 'v1'},
artifacts=environments.python_sdk_dependencies(PipelineOptions())))
self.assertEqual(
PortableRunner._create_environment(
PipelineOptions.from_dictionary({
Expand All @@ -355,7 +364,9 @@ def test__create_external_environment(self):
'environment_config': 'localhost:50000',
'sdk_location': 'container',
})),
environments.ExternalEnvironment('localhost:50000'))
environments.ExternalEnvironment(
'localhost:50000',
artifacts=environments.python_sdk_dependencies(PipelineOptions())))
raw_config = ' {"url":"localhost:50000", "params":{"k1":"v1"}} '
for env_config in (raw_config, raw_config.lstrip(), raw_config.strip()):
self.assertEqual(
Expand Down
9 changes: 7 additions & 2 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def create_job_resources(options, # type: PipelineOptions
pypi_requirements=None, # type: Optional[List[str]]
populate_requirements_cache=None, # type: Optional[Callable[[str, str, bool], None]]
skip_prestaged_dependencies=False, # type: Optional[bool]
log_submission_env_dependencies=True, # type: Optional[bool]
):
"""For internal use only; no backwards-compatibility guarantees.
Expand All @@ -186,6 +187,8 @@ def create_job_resources(options, # type: PipelineOptions
cache. Used only for testing.
skip_prestaged_dependencies: Skip staging dependencies that can be
added into SDK containers during prebuilding.
log_submission_env_dependencies: (Optional) param to stage and log
submission environment dependencies. Defaults to True.
Returns:
A list of ArtifactInformation to be used for staging resources.
Expand Down Expand Up @@ -368,8 +371,10 @@ def create_job_resources(options, # type: PipelineOptions
Stager._create_file_stage_to_artifact(
pickled_session_file, names.PICKLED_MAIN_SESSION_FILE))

# stage the submission environment dependencies
resources.extend(Stager._create_stage_submission_env_dependencies(temp_dir))
# stage the submission environment dependencies, if enabled.
if log_submission_env_dependencies:
resources.extend(
Stager._create_stage_submission_env_dependencies(temp_dir))

return resources

Expand Down
12 changes: 10 additions & 2 deletions sdks/python/apache_beam/transforms/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,17 @@ def __init__(self,
dict(resource_hints) if resource_hints else {})

def __eq__(self, other):
equal_artifacts = True
for first, second in zip(self._artifacts, other._artifacts):
# do not compare type payload since it contains a unique hash.
if (first.type_urn != second.type_urn or
first.role_urn != second.role_urn or
first.role_payload != second.role_payload):
equal_artifacts = False
break

return (
self.__class__ == other.__class__ and
self._artifacts == other._artifacts
self.__class__ == other.__class__ and equal_artifacts
# Assuming that we don't have instances of the same Environment subclass
# with different set of capabilities.
and self._resource_hints == other._resource_hints)
Expand Down

0 comments on commit df68c1d

Please sign in to comment.