From 6a1a6a4b456d320709f0e4c491171f01489a82b6 Mon Sep 17 00:00:00 2001 From: nazarfil Date: Mon, 19 Aug 2024 20:32:19 +0200 Subject: [PATCH 1/8] fix: copy workspace folder to correct path --- openhexa/cli/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openhexa/cli/api.py b/openhexa/cli/api.py index bd7ab22..5585b20 100644 --- a/openhexa/cli/api.py +++ b/openhexa/cli/api.py @@ -351,7 +351,7 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals volumes = { tmp_dir: {"bind": "/home/hexa/pipeline", "mode": "rw"}, mount_files_path: { - "bind": "/home/hexa/workspace", + "bind": "/home/hexa/pipeline/workspace", "mode": "rw", }, } From f6a19b2f61b2a997822110f473846520495a3943 Mon Sep 17 00:00:00 2001 From: nazarfil Date: Tue, 20 Aug 2024 10:29:51 +0200 Subject: [PATCH 2/8] fix: copies files correctly to workspace directory --- openhexa/cli/api.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/openhexa/cli/api.py b/openhexa/cli/api.py index 5585b20..d1ab680 100644 --- a/openhexa/cli/api.py +++ b/openhexa/cli/api.py @@ -346,7 +346,10 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals tmp_dir = tempfile.mkdtemp() for file_path in path.glob("**/*"): if file_path.suffix in (".py", ".ipynb", ".txt", ".md", ".yaml"): - shutil.copy(file_path, tmp_dir) + relative_path = file_path.relative_to(path) + destination_path = Path(tmp_dir) / relative_path + destination_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy(file_path, destination_path) volumes = { tmp_dir: {"bind": "/home/hexa/pipeline", "mode": "rw"}, From 9ee1c898446e365b3cac2ea3a47b14503220f335 Mon Sep 17 00:00:00 2001 From: nazarfil Date: Wed, 21 Aug 2024 10:24:58 +0200 Subject: [PATCH 3/8] Revert "fix: copies files correctly to workspace directory" This reverts commit f6a19b2f61b2a997822110f473846520495a3943. --- openhexa/cli/api.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/openhexa/cli/api.py b/openhexa/cli/api.py index d1ab680..5585b20 100644 --- a/openhexa/cli/api.py +++ b/openhexa/cli/api.py @@ -346,10 +346,7 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals tmp_dir = tempfile.mkdtemp() for file_path in path.glob("**/*"): if file_path.suffix in (".py", ".ipynb", ".txt", ".md", ".yaml"): - relative_path = file_path.relative_to(path) - destination_path = Path(tmp_dir) / relative_path - destination_path.parent.mkdir(parents=True, exist_ok=True) - shutil.copy(file_path, destination_path) + shutil.copy(file_path, tmp_dir) volumes = { tmp_dir: {"bind": "/home/hexa/pipeline", "mode": "rw"}, From b2ddbe07807019e95d92bfa86050b99c71cab403 Mon Sep 17 00:00:00 2001 From: nazarfil Date: Wed, 21 Aug 2024 10:25:07 +0200 Subject: [PATCH 4/8] Revert "fix: copy workspace folder to correct path" This reverts commit 6a1a6a4b456d320709f0e4c491171f01489a82b6. --- openhexa/cli/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openhexa/cli/api.py b/openhexa/cli/api.py index 5585b20..bd7ab22 100644 --- a/openhexa/cli/api.py +++ b/openhexa/cli/api.py @@ -351,7 +351,7 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals volumes = { tmp_dir: {"bind": "/home/hexa/pipeline", "mode": "rw"}, mount_files_path: { - "bind": "/home/hexa/pipeline/workspace", + "bind": "/home/hexa/workspace", "mode": "rw", }, } From 99cd6f2e8825609b77f8d03ad9b579379aa3cdb6 Mon Sep 17 00:00:00 2001 From: nazarfil Date: Thu, 22 Aug 2024 15:23:19 +0100 Subject: [PATCH 5/8] fix(Utils): run pipeline config with right path --- openhexa/sdk/pipelines/utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/openhexa/sdk/pipelines/utils.py b/openhexa/sdk/pipelines/utils.py index 518ad8e..d0c049d 100644 --- a/openhexa/sdk/pipelines/utils.py +++ b/openhexa/sdk/pipelines/utils.py @@ -69,8 +69,9 @@ def get_local_workspace_config(path: Path): try: files_path = path / Path(local_workspace_config["files"]["path"]) if not files_path.exists(): - # Let's create the folder if it doesn't exist - files_path.mkdir(parents=True) + # When we start the pipeline container, we mount the workspace folder, + # if it doesn't exist, it means we don't provide the correct mount path, which is the case in local + files_path = Path("/home/hexa/workspace") env_vars["WORKSPACE_FILES_PATH"] = str(files_path.resolve()) except KeyError: exception_message = ( From e4bb7a37cae0df31c9978d5d2c3b1b78f0fdeb69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Quentin=20G=C3=A9r=C3=B4me?= Date: Tue, 27 Aug 2024 17:37:40 +0200 Subject: [PATCH 6/8] fix(Run): In local runs using docker, the configuration from the workspace.yaml should not be parsed and loaded The envvars are already exported in the docker container by the CLI. --- openhexa/cli/api.py | 64 +++++++++++--------- openhexa/sdk/pipelines/pipeline.py | 6 +- openhexa/sdk/pipelines/utils.py | 7 +-- openhexa/sdk/workspaces/current_workspace.py | 4 +- 4 files changed, 43 insertions(+), 38 deletions(-) diff --git a/openhexa/cli/api.py b/openhexa/cli/api.py index bd7ab22..24685a6 100644 --- a/openhexa/cli/api.py +++ b/openhexa/cli/api.py @@ -6,7 +6,6 @@ import json import logging import os -import shutil import tempfile import typing from dataclasses import asdict @@ -330,7 +329,7 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals ensure_pipeline_config_exists(path) env_vars = get_local_workspace_config(path) # # Prepare the mount for the workspace's files - mount_files_path = Path(env_vars["WORKSPACE_FILES_PATH"]).absolute() + mount_files_path = Path(env_vars.pop("WORKSPACE_FILES_PATH")).absolute() try: docker_client = docker.from_env() docker_client.ping() @@ -342,11 +341,10 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals if image is None: image = env_vars.get("WORKSPACE_DOCKER_IMAGE", "blsq/openhexa-blsq-environment:latest") - # Create temporary directory with the files to mount + zip_file = generate_zip_file(path) tmp_dir = tempfile.mkdtemp() - for file_path in path.glob("**/*"): - if file_path.suffix in (".py", ".ipynb", ".txt", ".md", ".yaml"): - shutil.copy(file_path, tmp_dir) + with ZipFile(zip_file) as zf: + zf.extractall(tmp_dir) volumes = { tmp_dir: {"bind": "/home/hexa/pipeline", "mode": "rw"}, @@ -363,6 +361,8 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals **env_vars, } + print(environment) + command = f"pipeline run --config {base64.b64encode(json.dumps(config).encode('utf-8')).decode('utf-8')}" try: docker_client.images.get(image) @@ -477,31 +477,24 @@ def create_pipeline_structure(pipeline_name: str, base_path: Path, workspace: st return output_directory -def upload_pipeline( - pipeline_directory_path: typing.Union[str, Path], - name: str, - description: str = None, - link: str = None, -): - """Upload the pipeline contained in the provided directory using the GraphQL API. +def generate_zip_file(pipeline_directory_path: typing.Union[str, Path]) -> io.BytesIO: + """Generate a ZIP file containing the pipeline code. - The pipeline code will be zipped and base64-encoded before being sent to the backend. - """ - if settings.current_workspace is None: - raise NoActiveWorkspaceError - - directory = pipeline_directory_path.absolute() - pipeline = get_pipeline_metadata(directory) - - zip_file = io.BytesIO(b"") + Args: + pipeline_directory_path (typing.Union[str, Path]): The path to the pipeline directory. + Returns + ------- + io.BytesIO: A BytesIO object containing the ZIP file. + """ if settings.debug: click.echo("Generating ZIP file:") + zip_file = io.BytesIO(b"") files = [] # We exclude the workspace directory since it can break the mount of the bucket on /home/hexa/workspace # This is also the default value of the WORKSPACE_FILES_PATH env var - excluded_paths = [directory / "workspace"] + excluded_paths = [pipeline_directory_path / "workspace"] try: env_vars = get_local_workspace_config(pipeline_directory_path) if env_vars.get("WORKSPACE_FILES_PATH") and Path(env_vars["WORKSPACE_FILES_PATH"]) not in excluded_paths: @@ -509,9 +502,8 @@ def upload_pipeline( except FileNotFoundError: # No workspace.yaml file found, we can ignore this error and assume the default value of WORKSPACE_FILES_PATH pass - with ZipFile(zip_file, "w") as zipObj: - for path in directory.glob("**/*"): + for path in pipeline_directory_path.glob("**/*"): if path.name == "python": # We are in a virtual environment excluded_paths.append(path.parent.parent) # .//bin/python -> ./ @@ -532,9 +524,27 @@ def upload_pipeline( continue if settings.debug: click.echo(f"\t{file_path.name}") - zipObj.write(file_path, file_path.relative_to(directory)) - + zipObj.write(file_path, file_path.relative_to(pipeline_directory_path)) zip_file.seek(0) + return zip_file + + +def upload_pipeline( + pipeline_directory_path: typing.Union[str, Path], + name: str, + description: str = None, + link: str = None, +): + """Upload the pipeline contained in the provided directory using the GraphQL API. + + The pipeline code will be zipped and base64-encoded before being sent to the backend. + """ + if settings.current_workspace is None: + raise NoActiveWorkspaceError + + directory = pipeline_directory_path.absolute() + pipeline = get_pipeline_metadata(directory) + zip_file = generate_zip_file(directory) if settings.debug: # Write zip_file to disk for debugging diff --git a/openhexa/sdk/pipelines/pipeline.py b/openhexa/sdk/pipelines/pipeline.py index af64e2c..b9b3a6c 100644 --- a/openhexa/sdk/pipelines/pipeline.py +++ b/openhexa/sdk/pipelines/pipeline.py @@ -202,14 +202,10 @@ def __call__(self, config: typing.Optional[dict[str, typing.Any]] = None): This method can be called with an explicit configuration. If no configuration is provided, it will parse the command-line arguments to build it. """ - # Handle local workspace config for dev / testing, if appropriate - if get_environment() == Environment.LOCAL_PIPELINE: - os.environ.update(get_local_workspace_config(Path("/home/hexa/pipeline"))) - # User can run their pipeline using `python pipeline.py`. It's considered as a standalone usage of the library. # Since we still support this use case for the moment, we'll try to load the workspace.yaml # at the path of the file - elif get_environment() == Environment.STANDALONE: + if get_environment() == Environment.STANDALONE: os.environ.update(get_local_workspace_config(Path(sys.argv[0]).parent)) if config is None: # Called without arguments, in the pipeline file itself diff --git a/openhexa/sdk/pipelines/utils.py b/openhexa/sdk/pipelines/utils.py index d0c049d..89f69d6 100644 --- a/openhexa/sdk/pipelines/utils.py +++ b/openhexa/sdk/pipelines/utils.py @@ -68,10 +68,9 @@ def get_local_workspace_config(path: Path): if "files" in local_workspace_config: try: files_path = path / Path(local_workspace_config["files"]["path"]) - if not files_path.exists(): - # When we start the pipeline container, we mount the workspace folder, - # if it doesn't exist, it means we don't provide the correct mount path, which is the case in local - files_path = Path("/home/hexa/workspace") + if files_path.exists() is False: + # Let's create the folder if it doesn't exist + files_path.mkdir(parents=True) env_vars["WORKSPACE_FILES_PATH"] = str(files_path.resolve()) except KeyError: exception_message = ( diff --git a/openhexa/sdk/workspaces/current_workspace.py b/openhexa/sdk/workspaces/current_workspace.py index 73bf404..f132759 100644 --- a/openhexa/sdk/workspaces/current_workspace.py +++ b/openhexa/sdk/workspaces/current_workspace.py @@ -137,8 +137,8 @@ def files_path(self) -> str: /home/hexa/workspace/some/path """ # FIXME: This is a hack to make the SDK work in the context of the `python pipeline.py` command. - # We can remove this once we deprecate this way of running pipelines - return os.environ["WORKSPACE_FILES_PATH"] if "WORKSPACE_FILES_PATH" in os.environ else "/home/hexa/workspace" + # We can remove this once we deprecate this way of running pipelines and only use /home/hexa/workspace + return os.environ.get("WORKSPACE_FILES_PATH", "/home/hexa/workspace") @property def tmp_path(self) -> str: From 8cce1ce694c79ac6f974e11ed6701108778eb840 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Quentin=20G=C3=A9r=C3=B4me?= Date: Tue, 27 Aug 2024 17:38:21 +0200 Subject: [PATCH 7/8] chore(readme): Use the `openhexa pipelines run` command instead of `python pipeline.py` in the quickstart --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index 7d5a447..23782b9 100644 --- a/README.md +++ b/README.md @@ -66,8 +66,7 @@ Great! As you can see in the console output, the OpenHEXA CLI has created a new structure required for an OpenHEXA pipeline. You can now `cd` in the new pipeline directory and run the pipeline: ```shell -cd my_awesome_pipeline -python pipeline.py +openhexa pipelines run ./my_awesome_pipeline ``` Congratulations! You have successfully run your first pipeline locally. From 2357214befbcdc26f2ef742af1e5ebfa02243d1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Quentin=20G=C3=A9r=C3=B4me?= Date: Wed, 28 Aug 2024 11:29:32 +0200 Subject: [PATCH 8/8] chore: add a comment on the creation of the files directory --- openhexa/sdk/pipelines/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openhexa/sdk/pipelines/utils.py b/openhexa/sdk/pipelines/utils.py index 89f69d6..1bc9a2e 100644 --- a/openhexa/sdk/pipelines/utils.py +++ b/openhexa/sdk/pipelines/utils.py @@ -69,7 +69,7 @@ def get_local_workspace_config(path: Path): try: files_path = path / Path(local_workspace_config["files"]["path"]) if files_path.exists() is False: - # Let's create the folder if it doesn't exist + # Let's create the folder if it doesn't exist (only needed when running the pipeline using `python pipeline.py`) files_path.mkdir(parents=True) env_vars["WORKSPACE_FILES_PATH"] = str(files_path.resolve()) except KeyError: