
Commit d316597
Merge pull request #200 from BLSQ/fix-mount_path-nf
fix: copy workspace folder to correct path
nazarfil authored Aug 28, 2024
2 parents 69f6bff + 2357214 commit d316597
Showing 5 changed files with 43 additions and 38 deletions.
3 changes: 1 addition & 2 deletions README.md
@@ -66,8 +66,7 @@ Great! As you can see in the console output, the OpenHEXA CLI has created a new
structure required for an OpenHEXA pipeline. You can now `cd` in the new pipeline directory and run the pipeline:

```shell
-cd my_awesome_pipeline
-python pipeline.py
+openhexa pipelines run ./my_awesome_pipeline
```

Congratulations! You have successfully run your first pipeline locally.
64 changes: 37 additions & 27 deletions openhexa/cli/api.py
@@ -6,7 +6,6 @@
import json
import logging
import os
-import shutil
import tempfile
import typing
from dataclasses import asdict
@@ -330,7 +329,7 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals
ensure_pipeline_config_exists(path)
env_vars = get_local_workspace_config(path)
# # Prepare the mount for the workspace's files
-mount_files_path = Path(env_vars["WORKSPACE_FILES_PATH"]).absolute()
+mount_files_path = Path(env_vars.pop("WORKSPACE_FILES_PATH")).absolute()
try:
docker_client = docker.from_env()
docker_client.ping()
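Note on the one-line change above: switching from indexing to `env_vars.pop(...)` keeps the host path available for preparing the workspace files mount while removing it from the environment forwarded to the container, so inside the container the SDK falls back to its default of `/home/hexa/workspace` (see the `current_workspace.py` change further down). A minimal illustrative sketch, with a made-up path:

```python
# Illustrative sketch only: why pop() matters here.
from pathlib import Path

env_vars = {"WORKSPACE_FILES_PATH": "./workspace"}  # made-up value, as read from workspace.yaml

# The host path is still available to prepare the Docker mount of the workspace files...
mount_files_path = Path(env_vars.pop("WORKSPACE_FILES_PATH")).absolute()

# ...but it is no longer forwarded to the container environment,
# so the SDK's default of /home/hexa/workspace applies inside the container.
assert "WORKSPACE_FILES_PATH" not in env_vars
```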
@@ -342,11 +341,10 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals
if image is None:
image = env_vars.get("WORKSPACE_DOCKER_IMAGE", "blsq/openhexa-blsq-environment:latest")

-# Create temporary directory with the files to mount
+zip_file = generate_zip_file(path)
tmp_dir = tempfile.mkdtemp()
-for file_path in path.glob("**/*"):
-if file_path.suffix in (".py", ".ipynb", ".txt", ".md", ".yaml"):
-shutil.copy(file_path, tmp_dir)
+with ZipFile(zip_file) as zf:
+zf.extractall(tmp_dir)

volumes = {
tmp_dir: {"bind": "/home/hexa/pipeline", "mode": "rw"},
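This hunk is the heart of the fix: instead of flat-copying a suffix-filtered set of files into the temporary directory (which lost the directory layout), the CLI now reuses the pipeline ZIP and extracts it, so nested files land at their correct relative paths before the directory is mounted on `/home/hexa/pipeline`. A self-contained sketch of that pattern, with a hand-built in-memory ZIP standing in for the one returned by `generate_zip_file`:

```python
# Standalone sketch: extract an in-memory ZIP into a temp dir, preserving relative paths.
import io
import tempfile
from zipfile import ZipFile

zip_buffer = io.BytesIO()
with ZipFile(zip_buffer, "w") as zf:
    zf.writestr("pipeline.py", "# pipeline code")
    zf.writestr("helpers/utils.py", "# nested files keep their relative path")
zip_buffer.seek(0)

tmp_dir = tempfile.mkdtemp()
with ZipFile(zip_buffer) as zf:
    zf.extractall(tmp_dir)  # tmp_dir is what gets mounted on /home/hexa/pipeline
```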
@@ -363,6 +361,8 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals
**env_vars,
}

+print(environment)
+
command = f"pipeline run --config {base64.b64encode(json.dumps(config).encode('utf-8')).decode('utf-8')}"
try:
docker_client.images.get(image)
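For reference, the `command` assembled above ships the run configuration into the container as base64-encoded JSON on the `pipeline run --config` flag. A small round-trip sketch with a made-up config dict (the decode step is what the receiving side would do to recover the dict):

```python
# Round-trip sketch of the --config payload; the config values are illustrative.
import base64
import json

config = {"country": "BE", "iterations": 3}
encoded = base64.b64encode(json.dumps(config).encode("utf-8")).decode("utf-8")
command = f"pipeline run --config {encoded}"

decoded = json.loads(base64.b64decode(encoded).decode("utf-8"))
assert decoded == config
```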
@@ -477,41 +477,33 @@ def create_pipeline_structure(pipeline_name: str, base_path: Path, workspace: st
return output_directory


def upload_pipeline(
pipeline_directory_path: typing.Union[str, Path],
name: str,
description: str = None,
link: str = None,
):
"""Upload the pipeline contained in the provided directory using the GraphQL API.
def generate_zip_file(pipeline_directory_path: typing.Union[str, Path]) -> io.BytesIO:
"""Generate a ZIP file containing the pipeline code.
The pipeline code will be zipped and base64-encoded before being sent to the backend.
"""
if settings.current_workspace is None:
raise NoActiveWorkspaceError

directory = pipeline_directory_path.absolute()
pipeline = get_pipeline_metadata(directory)

zip_file = io.BytesIO(b"")
Args:
pipeline_directory_path (typing.Union[str, Path]): The path to the pipeline directory.
Returns
-------
io.BytesIO: A BytesIO object containing the ZIP file.
"""
if settings.debug:
click.echo("Generating ZIP file:")
zip_file = io.BytesIO(b"")
files = []

# We exclude the workspace directory since it can break the mount of the bucket on /home/hexa/workspace
# This is also the default value of the WORKSPACE_FILES_PATH env var
excluded_paths = [directory / "workspace"]
excluded_paths = [pipeline_directory_path / "workspace"]
try:
env_vars = get_local_workspace_config(pipeline_directory_path)
if env_vars.get("WORKSPACE_FILES_PATH") and Path(env_vars["WORKSPACE_FILES_PATH"]) not in excluded_paths:
excluded_paths.append(Path(env_vars["WORKSPACE_FILES_PATH"]))
except FileNotFoundError:
# No workspace.yaml file found, we can ignore this error and assume the default value of WORKSPACE_FILES_PATH
pass

with ZipFile(zip_file, "w") as zipObj:
for path in directory.glob("**/*"):
for path in pipeline_directory_path.glob("**/*"):
if path.name == "python":
# We are in a virtual environment
excluded_paths.append(path.parent.parent) # ./<venv>/bin/python -> ./<venv>
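With this refactor, the ZIP-building logic that `upload_pipeline` used inline is extracted into `generate_zip_file`, so the same exclusions (the `workspace/` directory, the configured `WORKSPACE_FILES_PATH`, and virtualenvs) apply both when uploading and when running locally in Docker. A hedged usage sketch; `list_zip_entries` is a hypothetical helper, not part of the SDK:

```python
# Hypothetical helper: inspect what generate_zip_file() would package for a pipeline.
from pathlib import Path
from zipfile import ZipFile

from openhexa.cli.api import generate_zip_file


def list_zip_entries(pipeline_dir: Path) -> list[str]:
    zip_file = generate_zip_file(pipeline_dir.absolute())  # io.BytesIO, already rewound to 0
    with ZipFile(zip_file) as zf:
        return zf.namelist()  # workspace/ and virtualenv entries are excluded upstream
```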
@@ -532,9 +524,27 @@ def upload_pipeline(
continue
if settings.debug:
click.echo(f"\t{file_path.name}")
zipObj.write(file_path, file_path.relative_to(directory))

zipObj.write(file_path, file_path.relative_to(pipeline_directory_path))
zip_file.seek(0)
return zip_file


def upload_pipeline(
pipeline_directory_path: typing.Union[str, Path],
name: str,
description: str = None,
link: str = None,
):
"""Upload the pipeline contained in the provided directory using the GraphQL API.
The pipeline code will be zipped and base64-encoded before being sent to the backend.
"""
if settings.current_workspace is None:
raise NoActiveWorkspaceError

directory = pipeline_directory_path.absolute()
pipeline = get_pipeline_metadata(directory)
zip_file = generate_zip_file(directory)

if settings.debug:
# Write zip_file to disk for debugging
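As the docstring states, `upload_pipeline` still zips and base64-encodes the pipeline code before sending it to the backend; after this change the zipping is delegated to `generate_zip_file`, and the helper's output gets base64-encoded for the GraphQL call. A rough sketch of that encoding step (the mutation itself is omitted, and the path is illustrative):

```python
# Sketch of the upload payload: base64-encode the generated ZIP for the backend.
import base64
from pathlib import Path

from openhexa.cli.api import generate_zip_file

zip_file = generate_zip_file(Path("./my_awesome_pipeline").absolute())
zipfile_b64 = base64.b64encode(zip_file.read()).decode("ascii")  # sent with the GraphQL mutation
```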
6 changes: 1 addition & 5 deletions openhexa/sdk/pipelines/pipeline.py
@@ -202,14 +202,10 @@ def __call__(self, config: typing.Optional[dict[str, typing.Any]] = None):
This method can be called with an explicit configuration. If no configuration is provided, it will parse the
command-line arguments to build it.
"""
-# Handle local workspace config for dev / testing, if appropriate
-if get_environment() == Environment.LOCAL_PIPELINE:
-os.environ.update(get_local_workspace_config(Path("/home/hexa/pipeline")))
-
# User can run their pipeline using `python pipeline.py`. It's considered as a standalone usage of the library.
# Since we still support this use case for the moment, we'll try to load the workspace.yaml
# at the path of the file
-elif get_environment() == Environment.STANDALONE:
+if get_environment() == Environment.STANDALONE:
os.environ.update(get_local_workspace_config(Path(sys.argv[0]).parent))

if config is None: # Called without arguments, in the pipeline file itself
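The deleted `LOCAL_PIPELINE` branch is no longer needed because `run_pipeline` now injects the workspace configuration into the container through the `environment` dict, so only the standalone `python pipeline.py` case still reads `workspace.yaml` next to the script. A small sketch of that remaining branch (essentially the kept code, shown in isolation):

```python
# Sketch of the standalone path: `python pipeline.py` loads workspace.yaml
# from the directory that contains the script.
import os
import sys
from pathlib import Path

from openhexa.sdk.pipelines.utils import get_local_workspace_config

if __name__ == "__main__":
    script_dir = Path(sys.argv[0]).parent                      # e.g. ./my_awesome_pipeline
    os.environ.update(get_local_workspace_config(script_dir))  # sets e.g. WORKSPACE_FILES_PATH
```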
4 changes: 2 additions & 2 deletions openhexa/sdk/pipelines/utils.py
@@ -68,8 +68,8 @@ def get_local_workspace_config(path: Path):
if "files" in local_workspace_config:
try:
files_path = path / Path(local_workspace_config["files"]["path"])
-if not files_path.exists():
-# Let's create the folder if it doesn't exist
+if files_path.exists() is False:
+# Let's create the folder if it doesn't exist (only needed when running the pipeline using `python pipeline.py`)
files_path.mkdir(parents=True)
env_vars["WORKSPACE_FILES_PATH"] = str(files_path.resolve())
except KeyError:
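For context, this block of `get_local_workspace_config` resolves the `files.path` entry of the pipeline's `workspace.yaml` relative to the pipeline directory, creates the folder on a first standalone run, and exports it as `WORKSPACE_FILES_PATH`. A simplified sketch of that resolution (the surrounding try/except is dropped, and the values are illustrative):

```python
# Simplified sketch of the files-path resolution; the dict stands in for a parsed workspace.yaml.
from pathlib import Path

path = Path("./my_awesome_pipeline")
local_workspace_config = {"files": {"path": "workspace"}}  # "workspace" is the usual default

env_vars = {}
if "files" in local_workspace_config:
    files_path = path / Path(local_workspace_config["files"]["path"])
    if not files_path.exists():
        files_path.mkdir(parents=True)  # only needed for `python pipeline.py` runs
    env_vars["WORKSPACE_FILES_PATH"] = str(files_path.resolve())
```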
4 changes: 2 additions & 2 deletions openhexa/sdk/workspaces/current_workspace.py
@@ -137,8 +137,8 @@ def files_path(self) -> str:
/home/hexa/workspace/some/path
"""
# FIXME: This is a hack to make the SDK work in the context of the `python pipeline.py` command.
-# We can remove this once we deprecate this way of running pipelines
-return os.environ["WORKSPACE_FILES_PATH"] if "WORKSPACE_FILES_PATH" in os.environ else "/home/hexa/workspace"
+# We can remove this once we deprecate this way of running pipelines and only use /home/hexa/workspace
+return os.environ.get("WORKSPACE_FILES_PATH", "/home/hexa/workspace")

@property
def tmp_path(self) -> str:
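The `files_path` rewrite is behavior-preserving: `os.environ.get` with a default returns exactly what the previous conditional expression did, without a second environment lookup. A tiny equivalence check with an illustrative override value:

```python
# Both forms of the lookup agree, with and without the variable set (illustrative path).
import os

for value in (None, "/tmp/my-workspace-files"):
    os.environ.pop("WORKSPACE_FILES_PATH", None)
    if value is not None:
        os.environ["WORKSPACE_FILES_PATH"] = value
    old_style = os.environ["WORKSPACE_FILES_PATH"] if "WORKSPACE_FILES_PATH" in os.environ else "/home/hexa/workspace"
    new_style = os.environ.get("WORKSPACE_FILES_PATH", "/home/hexa/workspace")
    assert old_style == new_style
```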

