diff --git a/README.md b/README.md index 7d5a447..23782b9 100644 --- a/README.md +++ b/README.md @@ -66,8 +66,7 @@ Great! As you can see in the console output, the OpenHEXA CLI has created a new structure required for an OpenHEXA pipeline. You can now `cd` in the new pipeline directory and run the pipeline: ```shell -cd my_awesome_pipeline -python pipeline.py +openhexa pipelines run ./my_awesome_pipeline ``` Congratulations! You have successfully run your first pipeline locally. diff --git a/openhexa/cli/api.py b/openhexa/cli/api.py index bd7ab22..24685a6 100644 --- a/openhexa/cli/api.py +++ b/openhexa/cli/api.py @@ -6,7 +6,6 @@ import json import logging import os -import shutil import tempfile import typing from dataclasses import asdict @@ -330,7 +329,7 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals ensure_pipeline_config_exists(path) env_vars = get_local_workspace_config(path) # # Prepare the mount for the workspace's files - mount_files_path = Path(env_vars["WORKSPACE_FILES_PATH"]).absolute() + mount_files_path = Path(env_vars.pop("WORKSPACE_FILES_PATH")).absolute() try: docker_client = docker.from_env() docker_client.ping() @@ -342,11 +341,10 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals if image is None: image = env_vars.get("WORKSPACE_DOCKER_IMAGE", "blsq/openhexa-blsq-environment:latest") - # Create temporary directory with the files to mount + zip_file = generate_zip_file(path) tmp_dir = tempfile.mkdtemp() - for file_path in path.glob("**/*"): - if file_path.suffix in (".py", ".ipynb", ".txt", ".md", ".yaml"): - shutil.copy(file_path, tmp_dir) + with ZipFile(zip_file) as zf: + zf.extractall(tmp_dir) volumes = { tmp_dir: {"bind": "/home/hexa/pipeline", "mode": "rw"}, @@ -363,6 +361,8 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals **env_vars, } + if settings.debug: + click.echo(environment) command = f"pipeline run --config 
{base64.b64encode(json.dumps(config).encode('utf-8')).decode('utf-8')}" try: docker_client.images.get(image) @@ -477,31 +477,24 @@ def create_pipeline_structure(pipeline_name: str, base_path: Path, workspace: st return output_directory -def upload_pipeline( - pipeline_directory_path: typing.Union[str, Path], - name: str, - description: str = None, - link: str = None, -): - """Upload the pipeline contained in the provided directory using the GraphQL API. +def generate_zip_file(pipeline_directory_path: typing.Union[str, Path]) -> io.BytesIO: + """Generate a ZIP file containing the pipeline code. - The pipeline code will be zipped and base64-encoded before being sent to the backend. - """ - if settings.current_workspace is None: - raise NoActiveWorkspaceError - - directory = pipeline_directory_path.absolute() - pipeline = get_pipeline_metadata(directory) - - zip_file = io.BytesIO(b"") + Args: + pipeline_directory_path (typing.Union[str, Path]): The path to the pipeline directory. + + Returns: + io.BytesIO: A BytesIO object containing the ZIP file. 
+ """ if settings.debug: click.echo("Generating ZIP file:") + zip_file = io.BytesIO(b"") files = [] # We exclude the workspace directory since it can break the mount of the bucket on /home/hexa/workspace # This is also the default value of the WORKSPACE_FILES_PATH env var - excluded_paths = [directory / "workspace"] + excluded_paths = [pipeline_directory_path / "workspace"] try: env_vars = get_local_workspace_config(pipeline_directory_path) if env_vars.get("WORKSPACE_FILES_PATH") and Path(env_vars["WORKSPACE_FILES_PATH"]) not in excluded_paths: @@ -509,9 +502,8 @@ def upload_pipeline( except FileNotFoundError: # No workspace.yaml file found, we can ignore this error and assume the default value of WORKSPACE_FILES_PATH pass - with ZipFile(zip_file, "w") as zipObj: - for path in directory.glob("**/*"): + for path in pipeline_directory_path.glob("**/*"): if path.name == "python": # We are in a virtual environment excluded_paths.append(path.parent.parent) # .//bin/python -> ./ @@ -532,9 +524,27 @@ def upload_pipeline( continue if settings.debug: click.echo(f"\t{file_path.name}") - zipObj.write(file_path, file_path.relative_to(directory)) - + zipObj.write(file_path, file_path.relative_to(pipeline_directory_path)) zip_file.seek(0) + return zip_file + + +def upload_pipeline( + pipeline_directory_path: typing.Union[str, Path], + name: str, + description: str = None, + link: str = None, +): + """Upload the pipeline contained in the provided directory using the GraphQL API. + + The pipeline code will be zipped and base64-encoded before being sent to the backend. 
+ """ + if settings.current_workspace is None: + raise NoActiveWorkspaceError + + directory = pipeline_directory_path.absolute() + pipeline = get_pipeline_metadata(directory) + zip_file = generate_zip_file(directory) if settings.debug: # Write zip_file to disk for debugging diff --git a/openhexa/sdk/pipelines/pipeline.py b/openhexa/sdk/pipelines/pipeline.py index af64e2c..b9b3a6c 100644 --- a/openhexa/sdk/pipelines/pipeline.py +++ b/openhexa/sdk/pipelines/pipeline.py @@ -202,14 +202,10 @@ def __call__(self, config: typing.Optional[dict[str, typing.Any]] = None): This method can be called with an explicit configuration. If no configuration is provided, it will parse the command-line arguments to build it. """ - # Handle local workspace config for dev / testing, if appropriate - if get_environment() == Environment.LOCAL_PIPELINE: - os.environ.update(get_local_workspace_config(Path("/home/hexa/pipeline"))) - # User can run their pipeline using `python pipeline.py`. It's considered as a standalone usage of the library. 
# Since we still support this use case for the moment, we'll try to load the workspace.yaml # at the path of the file - elif get_environment() == Environment.STANDALONE: + if get_environment() == Environment.STANDALONE: os.environ.update(get_local_workspace_config(Path(sys.argv[0]).parent)) if config is None: # Called without arguments, in the pipeline file itself diff --git a/openhexa/sdk/pipelines/utils.py b/openhexa/sdk/pipelines/utils.py index 518ad8e..1bc9a2e 100644 --- a/openhexa/sdk/pipelines/utils.py +++ b/openhexa/sdk/pipelines/utils.py @@ -68,8 +68,8 @@ def get_local_workspace_config(path: Path): if "files" in local_workspace_config: try: files_path = path / Path(local_workspace_config["files"]["path"]) - if not files_path.exists(): - # Let's create the folder if it doesn't exist + if not files_path.exists(): + # Let's create the folder if it doesn't exist (only needed when running the pipeline using `python pipeline.py`) files_path.mkdir(parents=True) env_vars["WORKSPACE_FILES_PATH"] = str(files_path.resolve()) except KeyError: diff --git a/openhexa/sdk/workspaces/current_workspace.py b/openhexa/sdk/workspaces/current_workspace.py index 73bf404..f132759 100644 --- a/openhexa/sdk/workspaces/current_workspace.py +++ b/openhexa/sdk/workspaces/current_workspace.py @@ -137,8 +137,8 @@ def files_path(self) -> str: /home/hexa/workspace/some/path """ # FIXME: This is a hack to make the SDK work in the context of the `python pipeline.py` command. - # We can remove this once we deprecate this way of running pipelines - return os.environ["WORKSPACE_FILES_PATH"] if "WORKSPACE_FILES_PATH" in os.environ else "/home/hexa/workspace" + # We can remove this once we deprecate this way of running pipelines and only use /home/hexa/workspace + return os.environ.get("WORKSPACE_FILES_PATH", "/home/hexa/workspace") @property def tmp_path(self) -> str: