diff --git a/CHANGELOG.md b/CHANGELOG.md index d7ffb8aca..ec165d88b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 asset metadata. Although the exact numerical value is not yet important, whether the size is reported to be zero or positive does have consequences. +- Pack deps, call_before, and call_after assets into one file. + +### Fixed + +- Reduced number of assets to upload when submitting a dispatch. ### Operations diff --git a/covalent/_results_manager/results_manager.py b/covalent/_results_manager/results_manager.py index 9372a43e0..e31670c90 100644 --- a/covalent/_results_manager/results_manager.py +++ b/covalent/_results_manager/results_manager.py @@ -51,9 +51,7 @@ SDK_NODE_META_KEYS = { "executor", "executor_data", - "deps", - "call_before", - "call_after", + "hooks", } SDK_LAT_META_KEYS = { @@ -61,9 +59,7 @@ "executor_data", "workflow_executor", "workflow_executor_data", - "deps", - "call_before", - "call_after", + "hooks", } DEFERRED_KEYS = { diff --git a/covalent/_serialize/electron.py b/covalent/_serialize/electron.py index 03a3b1ae2..fe5763675 100644 --- a/covalent/_serialize/electron.py +++ b/covalent/_serialize/electron.py @@ -39,9 +39,7 @@ "function_string": AssetType.TEXT, "value": AssetType.TRANSPORTABLE, "output": AssetType.TRANSPORTABLE, - "deps": AssetType.JSONABLE, - "call_before": AssetType.JSONABLE, - "call_after": AssetType.JSONABLE, + "hooks": AssetType.JSONABLE, "qelectron_db": AssetType.BYTES, "stdout": AssetType.TEXT, "stderr": AssetType.TEXT, @@ -160,27 +158,10 @@ def _serialize_node_assets(node_attrs: dict, node_storage_path: str) -> Electron ASSET_FILENAME_MAP["error"], ) - deps = node_attrs["metadata"]["deps"] - deps_asset = save_asset( - deps, ASSET_TYPES["deps"], node_storage_path, ASSET_FILENAME_MAP["deps"] + hooks = node_attrs["metadata"]["hooks"] + hooks_asset = save_asset( + hooks, ASSET_TYPES["hooks"], node_storage_path, ASSET_FILENAME_MAP["hooks"] ) - - call_before = node_attrs["metadata"]["call_before"] - call_before_asset = save_asset( - call_before, - ASSET_TYPES["call_before"], - node_storage_path, - ASSET_FILENAME_MAP["call_before"], - ) - - call_after = node_attrs["metadata"]["call_after"] - call_after_asset = save_asset( - call_after, - ASSET_TYPES["call_after"], - node_storage_path, - ASSET_FILENAME_MAP["call_after"], - ) - return ElectronAssets( function=function_asset, function_string=function_string_asset, @@ -190,9 +171,7 @@ def _serialize_node_assets(node_attrs: dict, node_storage_path: str) -> Electron stderr=stderr_asset, qelectron_db=qelectron_db_asset, error=error_asset, - deps=deps_asset, - call_before=call_before_asset, - call_after=call_after_asset, + hooks=hooks_asset, ) @@ -206,9 +185,7 @@ def _deserialize_node_assets(ea: ElectronAssets) -> dict: qelectron_db = load_asset(ea.qelectron_db, ASSET_TYPES["qelectron_db"]) error = load_asset(ea.error, ASSET_TYPES["error"]) - deps = load_asset(ea.deps, ASSET_TYPES["deps"]) - call_before = load_asset(ea.call_before, ASSET_TYPES["call_before"]) - call_after = load_asset(ea.call_after, ASSET_TYPES["call_after"]) + hooks = load_asset(ea.hooks, ASSET_TYPES["hooks"]) return { "function": function, @@ -220,9 +197,7 @@ def _deserialize_node_assets(ea: ElectronAssets) -> dict: "qelectron_db": qelectron_db, "error": error, "metadata": { - "deps": deps, - "call_before": call_before, - "call_after": call_after, + "hooks": hooks, }, } diff --git a/covalent/_serialize/lattice.py b/covalent/_serialize/lattice.py index b188b8edf..3ab39f2bc 100644 --- a/covalent/_serialize/lattice.py +++ b/covalent/_serialize/lattice.py @@ -44,9 +44,7 @@ "named_kwargs": AssetType.TRANSPORTABLE, "cova_imports": AssetType.JSONABLE, "lattice_imports": AssetType.TEXT, - "deps": AssetType.JSONABLE, - "call_before": AssetType.JSONABLE, - "call_after": AssetType.JSONABLE, + "hooks": AssetType.JSONABLE, } @@ -141,23 +139,11 @@ def _serialize_lattice_assets(lat, storage_path: str) -> LatticeAssets: ) # NOTE: these are actually JSONable - deps_asset = save_asset( - lat.metadata["deps"], - ASSET_TYPES["deps"], + hooks_asset = save_asset( + lat.metadata["hooks"], + ASSET_TYPES["hooks"], storage_path, - ASSET_FILENAME_MAP["deps"], - ) - call_before_asset = save_asset( - lat.metadata["call_before"], - ASSET_TYPES["call_before"], - storage_path, - ASSET_FILENAME_MAP["call_before"], - ) - call_after_asset = save_asset( - lat.metadata["call_after"], - ASSET_TYPES["call_after"], - storage_path, - ASSET_FILENAME_MAP["call_after"], + ASSET_FILENAME_MAP["hooks"], ) return LatticeAssets( @@ -169,9 +155,7 @@ def _serialize_lattice_assets(lat, storage_path: str) -> LatticeAssets: named_kwargs=named_kwargs_asset, cova_imports=cova_imports_asset, lattice_imports=lattice_imports_asset, - deps=deps_asset, - call_before=call_before_asset, - call_after=call_after_asset, + hooks=hooks_asset, ) @@ -186,9 +170,7 @@ def _deserialize_lattice_assets(assets: LatticeAssets) -> dict: named_kwargs = load_asset(assets.named_kwargs, ASSET_TYPES["named_kwargs"]) cova_imports = load_asset(assets.cova_imports, ASSET_TYPES["cova_imports"]) lattice_imports = load_asset(assets.lattice_imports, ASSET_TYPES["lattice_imports"]) - deps = load_asset(assets.deps, ASSET_TYPES["deps"]) - call_before = load_asset(assets.call_before, ASSET_TYPES["call_before"]) - call_after = load_asset(assets.call_after, ASSET_TYPES["call_after"]) + hooks = load_asset(assets.hooks, ASSET_TYPES["hooks"]) return { "workflow_function": workflow_function, "workflow_function_string": workflow_function_string, @@ -199,9 +181,7 @@ def _deserialize_lattice_assets(assets: LatticeAssets) -> dict: "cova_imports": cova_imports, "lattice_imports": lattice_imports, "metadata": { - "deps": deps, - "call_before": call_before, - "call_after": call_after, + "hooks": hooks, }, } diff --git a/covalent/_shared_files/defaults.py b/covalent/_shared_files/defaults.py index 3fdcaf010..a1b9983a4 100644 --- a/covalent/_shared_files/defaults.py +++ b/covalent/_shared_files/defaults.py @@ -17,9 +17,8 @@ """Create custom sentinels and defaults for Covalent""" import os -from builtins import list from dataclasses import dataclass, field -from typing import Dict, List +from typing import Dict import dask.system @@ -199,8 +198,6 @@ class DefaultConfig: class DefaultMetadataValues: executor: str = field(default_factory=get_default_executor) executor_data: Dict = field(default_factory=dict) - deps: Dict = field(default_factory=dict) - call_before: List = field(default_factory=list) - call_after: List = field(default_factory=list) + hooks: Dict = field(default_factory=dict) workflow_executor: str = field(default_factory=get_default_executor) workflow_executor_data: Dict = field(default_factory=dict) diff --git a/covalent/_shared_files/schemas/electron.py b/covalent/_shared_files/schemas/electron.py index d75500d82..4a6541383 100644 --- a/covalent/_shared_files/schemas/electron.py +++ b/covalent/_shared_files/schemas/electron.py @@ -46,9 +46,7 @@ "stderr", "qelectron_db", # user dependent assets - "deps", - "call_before", - "call_after", + "hooks", } ELECTRON_FUNCTION_FILENAME = "function.tobj" @@ -61,7 +59,7 @@ ) ELECTRON_ERROR_FILENAME = "error.log" ELECTRON_RESULTS_FILENAME = "results.tobj" -ELECTRON_DEPS_FILENAME = "deps.json" +ELECTRON_HOOKS_FILENAME = "hooks.json" ELECTRON_CALL_BEFORE_FILENAME = "call_before.json" ELECTRON_CALL_AFTER_FILENAME = "call_after.json" ELECTRON_STORAGE_TYPE = "file" @@ -76,9 +74,7 @@ "stderr": ELECTRON_STDERR_FILENAME, "qelectron_db": ELECTRON_QELECTRON_DB_FILENAME, "error": ELECTRON_ERROR_FILENAME, - "deps": ELECTRON_DEPS_FILENAME, - "call_before": ELECTRON_CALL_BEFORE_FILENAME, - "call_after": ELECTRON_CALL_AFTER_FILENAME, + "hooks": ELECTRON_HOOKS_FILENAME, } @@ -93,9 +89,7 @@ class ElectronAssets(BaseModel): qelectron_db: AssetSchema # user dependent assets - deps: AssetSchema - call_before: AssetSchema - call_after: AssetSchema + hooks: AssetSchema class ElectronMetadata(BaseModel): diff --git a/covalent/_shared_files/schemas/lattice.py b/covalent/_shared_files/schemas/lattice.py index 80a8c62aa..0fdf35f16 100644 --- a/covalent/_shared_files/schemas/lattice.py +++ b/covalent/_shared_files/schemas/lattice.py @@ -44,9 +44,7 @@ "cova_imports", "lattice_imports", # user dependent assets - "deps", - "call_before", - "call_after", + "hooks", } LATTICE_FUNCTION_FILENAME = "function.tobj" @@ -59,7 +57,7 @@ LATTICE_NAMED_ARGS_FILENAME = "named_args.tobj" LATTICE_NAMED_KWARGS_FILENAME = "named_kwargs.tobj" LATTICE_RESULTS_FILENAME = "results.tobj" -LATTICE_DEPS_FILENAME = "deps.json" +LATTICE_HOOKS_FILENAME = "hooks.json" LATTICE_CALL_BEFORE_FILENAME = "call_before.json" LATTICE_CALL_AFTER_FILENAME = "call_after.json" LATTICE_COVA_IMPORTS_FILENAME = "cova_imports.json" @@ -76,9 +74,7 @@ "named_kwargs": LATTICE_NAMED_KWARGS_FILENAME, "cova_imports": LATTICE_COVA_IMPORTS_FILENAME, "lattice_imports": LATTICE_LATTICE_IMPORTS_FILENAME, - "deps": LATTICE_DEPS_FILENAME, - "call_before": LATTICE_CALL_BEFORE_FILENAME, - "call_after": LATTICE_CALL_AFTER_FILENAME, + "hooks": LATTICE_HOOKS_FILENAME, } @@ -93,9 +89,7 @@ class LatticeAssets(BaseModel): lattice_imports: AssetSchema # lattice.metadata - deps: AssetSchema - call_before: AssetSchema - call_after: AssetSchema + hooks: AssetSchema class LatticeMetadata(BaseModel): diff --git a/covalent/_workflow/electron.py b/covalent/_workflow/electron.py index ebe313675..008a6a234 100644 --- a/covalent/_workflow/electron.py +++ b/covalent/_workflow/electron.py @@ -309,10 +309,11 @@ def get_item(e, key): iterable_metadata = self.metadata.copy() filtered_call_before = [] - for elem in iterable_metadata["call_before"]: - if elem["attributes"]["retval_keyword"] != "files": - filtered_call_before.append(elem) - iterable_metadata["call_before"] = filtered_call_before + if "call_before" in iterable_metadata["hooks"]: + for elem in iterable_metadata["hooks"]["call_before"]: + if elem["attributes"]["retval_keyword"] != "files": + filtered_call_before.append(elem) + iterable_metadata["hooks"]["call_before"] = filtered_call_before # Pack with main electron unless it is a sublattice. name = active_lattice.transport_graph.get_node_value(self.node_id, "name") @@ -452,6 +453,7 @@ def __call__(self, *args, **kwargs) -> Union[Any, "Electron"]: # Add a node to the transport graph of the active lattice. Electrons bound to nodes will never be packed with the # 'master' Electron. # Add non-sublattice node to the transport graph of the active lattice. + self.node_id = active_lattice.transport_graph.add_node( name=self.function.__name__, function=self.function, @@ -474,15 +476,20 @@ def __call__(self, *args, **kwargs) -> Union[Any, "Electron"]: # For keyword arguments # Filter out kwargs to be injected by call_before call_deps during execution. - call_before = self.metadata["call_before"] - retval_keywords = {item["attributes"]["retval_keyword"]: None for item in call_before} + + retval_keywords = {} + if "call_before" in self.metadata["hooks"]: + call_before = self.metadata["hooks"]["call_before"] + retval_keywords = { + item["attributes"]["retval_keyword"]: None for item in call_before + } + for key, value in named_kwargs.items(): if key in retval_keywords: app_log.debug( f"kwarg {key} for function {self.function.__name__} to be injected at runtime" ) continue - self.connect_node_with_others( self.node_id, key, value, "kwarg", None, active_lattice.transport_graph ) @@ -771,13 +778,21 @@ def electron( if call_after: call_after_final.extend(call_after) + if deps is None and call_before_final is None and call_after_final is None: + hooks = None + else: + hooks = {} + if deps is not None: + hooks["deps"] = deps + if call_before is not None: + hooks["call_before"] = call_before + if call_after is not None: + hooks["call_after"] = call_after + constraints = { "executor": executor, - "deps": deps, - "call_before": call_before_final, - "call_after": call_after_final, + "hooks": hooks, } - constraints = encode_metadata(constraints) def decorator_electron(func=None): diff --git a/covalent/_workflow/lattice.py b/covalent/_workflow/lattice.py index 2c4159d99..8f81fb706 100644 --- a/covalent/_workflow/lattice.py +++ b/covalent/_workflow/lattice.py @@ -18,11 +18,9 @@ import importlib.metadata import json -import os import warnings import webbrowser from builtins import list -from contextlib import redirect_stdout from copy import deepcopy from dataclasses import asdict from functools import wraps @@ -211,7 +209,7 @@ def build_graph(self, *args, **kwargs) -> None: self.lattice_imports, self.cova_imports = get_imports(workflow_function) # Set any lattice metadata not explicitly set by the user - constraint_names = {"executor", "workflow_executor", "deps", "call_before", "call_after"} + constraint_names = {"executor", "workflow_executor", "hooks"} new_metadata = { name: DEFAULT_METADATA_VALUES[name] for name in constraint_names @@ -225,15 +223,14 @@ def build_graph(self, *args, **kwargs) -> None: # Check whether task packing is enabled self._task_packing = get_config("sdk.task_packing") == "true" - with redirect_stdout(open(os.devnull, "w")): - with active_lattice_manager.claim(self): - try: - retval = workflow_function(*new_args, **new_kwargs) - except Exception: - warnings.warn( - "Please make sure you are not manipulating an object inside the lattice." - ) - raise + with active_lattice_manager.claim(self): + try: + retval = workflow_function(*new_args, **new_kwargs) + except Exception: + warnings.warn( + "Please make sure you are not manipulating an object inside the lattice." + ) + raise pp = Postprocessor(lattice=self) @@ -364,7 +361,7 @@ def lattice( ) executor = backend - deps = {} + deps = {} if deps_bash or deps_pip else None if isinstance(deps_bash, DepsBash): deps["bash"] = deps_bash @@ -387,12 +384,21 @@ def lattice( if isinstance(triggers, BaseTrigger): triggers = [triggers] + if deps is None and call_before is None and call_after is None: + hooks = None + else: + hooks = {} + if deps is not None: + hooks["deps"] = deps + if call_before is not None: + hooks["call_before"] = call_before + if call_after is not None: + hooks["call_after"] = call_after + constraints = { "executor": executor, "workflow_executor": workflow_executor, - "deps": deps, - "call_before": call_before, - "call_after": call_after, + "hooks": hooks, "triggers": triggers, } diff --git a/covalent/_workflow/lepton.py b/covalent/_workflow/lepton.py index f9084f2c8..9d54d213a 100644 --- a/covalent/_workflow/lepton.py +++ b/covalent/_workflow/lepton.py @@ -41,6 +41,11 @@ # TODO: Review exceptions/errors +DEFAULT_HOOKS = DEFAULT_METADATA_VALUES["hooks"] +DEFAULT_DEPS = DEFAULT_HOOKS.get("deps", {}) +DEFAULT_CALL_BEFORE = DEFAULT_HOOKS.get("call_before", []) +DEFAULT_CALL_AFTER = DEFAULT_HOOKS.get("call_after", []) + class Lepton(Electron): """ @@ -86,10 +91,10 @@ def __init__( List[Union[str, "BaseExecutor"]], Union[str, "BaseExecutor"] ] = DEFAULT_METADATA_VALUES["executor"], files: List[FileTransfer] = [], - deps_bash: Union[DepsBash, List, str] = DEFAULT_METADATA_VALUES["deps"].get("bash", []), - deps_pip: Union[DepsPip, list] = DEFAULT_METADATA_VALUES["deps"].get("pip", None), - call_before: Union[List[DepsCall], DepsCall] = DEFAULT_METADATA_VALUES["call_before"], - call_after: Union[List[DepsCall], DepsCall] = DEFAULT_METADATA_VALUES["call_after"], + deps_bash: Union[DepsBash, List, str] = DEFAULT_DEPS.get("bash", []), + deps_pip: Union[DepsPip, list] = DEFAULT_DEPS.get("pip", []), + call_before: Union[List[DepsCall], DepsCall] = DEFAULT_CALL_BEFORE, + call_after: Union[List[DepsCall], DepsCall] = DEFAULT_CALL_AFTER, ) -> None: self.language = language self.library_name = library_name @@ -174,13 +179,16 @@ def __init__( "DepsCall retval_keyword(s) are not currently supported for Leptons, please remove the retval_keyword arg from DepsCall for the workflow to be constructed successfully." ) - # Should be synced with electron - constraints = { - "executor": executor, + hooks = { "deps": deps, "call_before": call_before, "call_after": call_after, } + # Should be synced with electron + constraints = { + "executor": executor, + "hooks": hooks, + } constraints = encode_metadata(constraints) @@ -424,10 +432,10 @@ def bash( Union[List[Union[str, "BaseExecutor"]], Union[str, "BaseExecutor"]] ] = DEFAULT_METADATA_VALUES["executor"], files: List[FileTransfer] = [], - deps_bash: Union[DepsBash, List, str] = DEFAULT_METADATA_VALUES["deps"].get("bash", []), - deps_pip: Union[DepsPip, list] = DEFAULT_METADATA_VALUES["deps"].get("pip", None), - call_before: Union[List[DepsCall], DepsCall] = DEFAULT_METADATA_VALUES["call_before"], - call_after: Union[List[DepsCall], DepsCall] = DEFAULT_METADATA_VALUES["call_after"], + deps_bash: Union[DepsBash, List, str] = DEFAULT_DEPS.get("bash", []), + deps_pip: Union[DepsPip, list] = DEFAULT_DEPS.get("pip", []), + call_before: Union[List[DepsCall], DepsCall] = DEFAULT_CALL_BEFORE, + call_after: Union[List[DepsCall], DepsCall] = DEFAULT_CALL_AFTER, ) -> Callable: """Bash decorator which wraps a Python function as a Bash Lepton.""" diff --git a/covalent/_workflow/transport.py b/covalent/_workflow/transport.py index c1c7dd31d..2ab1db739 100644 --- a/covalent/_workflow/transport.py +++ b/covalent/_workflow/transport.py @@ -56,21 +56,25 @@ def encode_metadata(metadata: dict) -> dict: encoded_metadata["workflow_executor_data"] = encoded_wf_executor # Bash Deps, Pip Deps, Env Deps, etc - if "deps" in metadata and metadata["deps"] is not None: - for dep_type, dep_object in metadata["deps"].items(): - if dep_object and not isinstance(dep_object, dict): - encoded_metadata["deps"][dep_type] = dep_object.to_dict() - - # call_before/after - if "call_before" in metadata and metadata["call_before"] is not None: - for i, dep in enumerate(metadata["call_before"]): - if not isinstance(dep, dict): - encoded_metadata["call_before"][i] = dep.to_dict() - - if "call_after" in metadata and metadata["call_after"] is not None: - for i, dep in enumerate(metadata["call_after"]): - if not isinstance(dep, dict): - encoded_metadata["call_after"][i] = dep.to_dict() + + if "hooks" in metadata and metadata["hooks"] is not None: + hooks = metadata["hooks"] + + if "deps" in hooks and hooks["deps"] is not None: + for dep_type, dep_object in hooks["deps"].items(): + if dep_object and not isinstance(dep_object, dict): + encoded_metadata["hooks"]["deps"][dep_type] = dep_object.to_dict() + + # call_before/after + if "call_before" in hooks and hooks["call_before"] is not None: + for i, dep in enumerate(hooks["call_before"]): + if not isinstance(dep, dict): + encoded_metadata["hooks"]["call_before"][i] = dep.to_dict() + + if "call_after" in hooks and hooks["call_after"] is not None: + for i, dep in enumerate(hooks["call_after"]): + if not isinstance(dep, dict): + encoded_metadata["hooks"]["call_after"][i] = dep.to_dict() # triggers if "triggers" in metadata: diff --git a/covalent/executor/schemas.py b/covalent/executor/schemas.py index 2e9b16204..40b4c732e 100644 --- a/covalent/executor/schemas.py +++ b/covalent/executor/schemas.py @@ -56,9 +56,7 @@ class TaskSpec(BaseModel): function_id: The `node_id` of the function. args_ids: The `node_id`s of the function's args kwargs_ids: The `node_id`s of the function's kwargs {key: node_id} - deps_id: An opaque string representing the task's deps. - call_before_id: An opaque string representing the task's call_before. - call_after_id: An opaque string representing the task's call_before. + hooks_id: An opaque string representing the task's hooks. The attribute values can be used in conjunction with a `ResourceMap` to locate the actual resources in the compute @@ -68,9 +66,6 @@ class TaskSpec(BaseModel): function_id: int args_ids: List[int] kwargs_ids: Dict[str, int] - deps_id: str - call_before_id: str - call_after_id: str class ResourceMap(BaseModel): @@ -101,7 +96,7 @@ class ResourceMap(BaseModel): inputs: Dict[int, str] # Includes deps, call_before, call_after - deps: Dict[str, str] + hooks: Dict[int, str] class TaskGroup(BaseModel): diff --git a/covalent/executor/utils/wrappers.py b/covalent/executor/utils/wrappers.py index 90dec9e64..8c20a009f 100644 --- a/covalent/executor/utils/wrappers.py +++ b/covalent/executor/utils/wrappers.py @@ -220,21 +220,14 @@ def run_task_from_uris( resp.raise_for_status() ser_kwargs[k] = deserialize_node_asset(resp.content, "output") - # Download deps - deps_url = f"{server_url}/api/v2/dispatches/{dispatch_id}/electrons/{task_id}/assets/deps" - resp = requests.get(deps_url, stream=True) + # Download deps, call_before, and call_after + hooks_url = f"{server_url}/api/v2/dispatches/{dispatch_id}/electrons/{task_id}/assets/hooks" + resp = requests.get(hooks_url, stream=True) resp.raise_for_status() - deps_json = deserialize_node_asset(resp.content, "deps") - - cb_url = f"{server_url}/api/v2/dispatches/{dispatch_id}/electrons/{task_id}/assets/call_before" - resp = requests.get(cb_url, stream=True) - resp.raise_for_status() - call_before_json = deserialize_node_asset(resp.content, "call_before") - - ca_url = f"{server_url}/api/v2/dispatches/{dispatch_id}/electrons/{task_id}/assets/call_after" - resp = requests.get(ca_url, stream=True) - resp.raise_for_status() - call_after_json = deserialize_node_asset(resp.content, "call_after") + hooks_json = deserialize_node_asset(resp.content, "hooks") + deps_json = hooks_json.get("deps", {}) + call_before_json = hooks_json.get("call_before", []) + call_after_json = hooks_json.get("call_after", []) # Assemble and run the task call_before, call_after = _gather_deps( @@ -417,27 +410,15 @@ def run_task_from_uris_alt( with open(uri, "rb") as f: ser_kwargs[key] = deserialize_node_asset(f.read(), "output") - # Load deps - deps_id = task["deps_id"] - deps_uri = resources["deps"][deps_id] - if deps_uri.startswith(prefix): - deps_uri = deps_uri[prefix_len:] - with open(deps_uri, "rb") as f: - deps_json = deserialize_node_asset(f.read(), "deps") - - call_before_id = task["call_before_id"] - call_before_uri = resources["deps"][call_before_id] - if call_before_uri.startswith(prefix): - call_before_uri = call_before_uri[prefix_len:] - with open(call_before_uri, "rb") as f: - call_before_json = deserialize_node_asset(f.read(), "call_before") - - call_after_id = task["call_after_id"] - call_after_uri = resources["deps"][call_after_id] - if call_after_uri.startswith(prefix): - call_after_uri = call_after_uri[prefix_len:] - with open(call_after_uri, "rb") as f: - call_after_json = deserialize_node_asset(f.read(), "call_after") + # Load deps, call_before, and call_after + hooks_uri = resources["hooks"][task_id] + if hooks_uri.startswith(prefix): + hooks_uri = hooks_uri[prefix_len:] + with open(hooks_uri, "rb") as f: + hooks_json = deserialize_node_asset(f.read(), "hooks") + deps_json = hooks_json.get("deps", {}) + call_before_json = hooks_json.get("call_before", []) + call_after_json = hooks_json.get("call_after", []) # Assemble and invoke the task call_before, call_after = _gather_deps( @@ -500,16 +481,14 @@ def run_task_from_uris_alt( exception_occurred = True tb = "".join(traceback.TracebackException.from_exception(ex).format()) print(tb, file=sys.stderr) - result_uri = "" stdout.flush() stderr.flush() stdout_size = os.path.getsize(stdout_uri) stderr_size = os.path.getsize(stderr_uri) - qelectron_db_size = len(qelectron_db_bytes) result_summary = { "node_id": task_id, "output": { - "uri": result_uri, + "uri": "", "size": 0, }, "stdout": { @@ -521,8 +500,8 @@ def run_task_from_uris_alt( "size": stderr_size, }, "qelectron_db": { - "uri": qelectron_db_uri, - "size": qelectron_db_size, + "uri": "", + "size": 0, }, "exception_occurred": exception_occurred, } diff --git a/covalent_dispatcher/_core/runner.py b/covalent_dispatcher/_core/runner.py index b38cc1058..7477b61c0 100644 --- a/covalent_dispatcher/_core/runner.py +++ b/covalent_dispatcher/_core/runner.py @@ -218,16 +218,14 @@ async def _run_task( async def _gather_deps(dispatch_id: str, node_id: int) -> Tuple[List, List]: """Assemble deps for a node into the final call_before and call_after""" - deps_attrs = await datasvc.electron.get( - dispatch_id, node_id, ["deps", "call_before", "call_after"] - ) + resp = await datasvc.electron.get(dispatch_id, node_id, ["hooks"]) - deps = deps_attrs["deps"] + hooks = resp["hooks"] # Assemble call_before and call_after from all the deps - - call_before_objs_json = deps_attrs["call_before"] - call_after_objs_json = deps_attrs["call_after"] + deps = hooks.get("deps", {}) + call_before_objs_json = hooks.get("call_before", []) + call_after_objs_json = hooks.get("call_after", []) call_before = [] call_after = [] diff --git a/covalent_dispatcher/_core/runner_ng.py b/covalent_dispatcher/_core/runner_ng.py index b3988de88..5c98a7721 100644 --- a/covalent_dispatcher/_core/runner_ng.py +++ b/covalent_dispatcher/_core/runner_ng.py @@ -85,35 +85,20 @@ async def _submit_abstract_task_group( if not type(executor).SUPPORTS_MANAGED_EXECUTION: raise NotImplementedError("Executor does not support managed execution") - resources = {"functions": {}, "inputs": {}, "deps": {}} + resources = {"functions": {}, "inputs": {}, "hooks": {}} # Get upload URIs for task_spec in task_seq: task_id = task_spec["function_id"] function_uri = executor.get_upload_uri(task_group_metadata, f"function-{task_id}") - deps_uri = executor.get_upload_uri(task_group_metadata, f"deps-{task_id}") - call_before_uri = executor.get_upload_uri( - task_group_metadata, f"call_before-{task_id}" - ) - call_after_uri = executor.get_upload_uri(task_group_metadata, f"call_after-{task_id}") + hooks_uri = executor.get_upload_uri(task_group_metadata, f"hooks-{task_id}") await am.upload_asset_for_nodes(dispatch_id, "function", {task_id: function_uri}) - await am.upload_asset_for_nodes(dispatch_id, "deps", {task_id: deps_uri}) - await am.upload_asset_for_nodes(dispatch_id, "call_before", {task_id: call_before_uri}) - await am.upload_asset_for_nodes(dispatch_id, "call_after", {task_id: call_after_uri}) - - deps_id = f"deps-{task_id}" - call_before_id = f"call_before-{task_id}" - call_after_id = f"call_after-{task_id}" - task_spec["deps_id"] = deps_id - task_spec["call_before_id"] = call_before_id - task_spec["call_after_id"] = call_after_id + await am.upload_asset_for_nodes(dispatch_id, "hooks", {task_id: hooks_uri}) resources["functions"][task_id] = function_uri - resources["deps"][deps_id] = deps_uri - resources["deps"][call_before_id] = call_before_uri - resources["deps"][call_after_id] = call_after_uri + resources["hooks"][task_id] = hooks_uri task_specs.append(TaskSpec(**task_spec)) diff --git a/covalent_dispatcher/_dal/db_interfaces/lattice_utils.py b/covalent_dispatcher/_dal/db_interfaces/lattice_utils.py index 7d941d7b6..676d0b68c 100644 --- a/covalent_dispatcher/_dal/db_interfaces/lattice_utils.py +++ b/covalent_dispatcher/_dal/db_interfaces/lattice_utils.py @@ -74,9 +74,7 @@ "lattice_imports": "lattice_imports_filename", "executor_data": "executor_data_filename", "workflow_executor_data": "workflow_executor_data_filename", - "deps": "deps_filename", - "call_before": "call_before_filename", - "call_after": "call_after_filename", + "hooks": "hooks_filename", } diff --git a/covalent_dispatcher/_dal/importers/electron.py b/covalent_dispatcher/_dal/importers/electron.py index 60b1a3e0d..d4b5047c5 100644 --- a/covalent_dispatcher/_dal/importers/electron.py +++ b/covalent_dispatcher/_dal/importers/electron.py @@ -26,12 +26,10 @@ from covalent._shared_files import logger from covalent._shared_files.schemas.electron import ( ASSET_FILENAME_MAP, - ELECTRON_CALL_AFTER_FILENAME, - ELECTRON_CALL_BEFORE_FILENAME, - ELECTRON_DEPS_FILENAME, ELECTRON_ERROR_FILENAME, ELECTRON_FUNCTION_FILENAME, ELECTRON_FUNCTION_STRING_FILENAME, + ELECTRON_HOOKS_FILENAME, ELECTRON_RESULTS_FILENAME, ELECTRON_STDERR_FILENAME, ELECTRON_STDOUT_FILENAME, @@ -111,9 +109,7 @@ def _get_electron_meta( "stdout_filename": ELECTRON_STDOUT_FILENAME, "stderr_filename": ELECTRON_STDERR_FILENAME, "error_filename": ELECTRON_ERROR_FILENAME, - "deps_filename": ELECTRON_DEPS_FILENAME, - "call_before_filename": ELECTRON_CALL_BEFORE_FILENAME, - "call_after_filename": ELECTRON_CALL_AFTER_FILENAME, + "hooks_filename": ELECTRON_HOOKS_FILENAME, } kwargs.update(legacy_kwargs) return kwargs diff --git a/covalent_dispatcher/_dal/importers/lattice.py b/covalent_dispatcher/_dal/importers/lattice.py index 7c5870b30..a14938f98 100644 --- a/covalent_dispatcher/_dal/importers/lattice.py +++ b/covalent_dispatcher/_dal/importers/lattice.py @@ -24,14 +24,12 @@ from covalent._shared_files.config import get_config from covalent._shared_files.schemas.lattice import ( - LATTICE_CALL_AFTER_FILENAME, - LATTICE_CALL_BEFORE_FILENAME, LATTICE_COVA_IMPORTS_FILENAME, - LATTICE_DEPS_FILENAME, LATTICE_DOCSTRING_FILENAME, LATTICE_ERROR_FILENAME, LATTICE_FUNCTION_FILENAME, LATTICE_FUNCTION_STRING_FILENAME, + LATTICE_HOOKS_FILENAME, LATTICE_INPUTS_FILENAME, LATTICE_LATTICE_IMPORTS_FILENAME, LATTICE_NAMED_ARGS_FILENAME, @@ -76,9 +74,7 @@ def _get_lattice_meta(lat: LatticeSchema, storage_path) -> dict: "named_args_filename": LATTICE_NAMED_ARGS_FILENAME, "named_kwargs_filename": LATTICE_NAMED_KWARGS_FILENAME, "results_filename": LATTICE_RESULTS_FILENAME, - "deps_filename": LATTICE_DEPS_FILENAME, - "call_before_filename": LATTICE_CALL_BEFORE_FILENAME, - "call_after_filename": LATTICE_CALL_AFTER_FILENAME, + "hooks_filename": LATTICE_HOOKS_FILENAME, "cova_imports_filename": LATTICE_COVA_IMPORTS_FILENAME, "lattice_imports_filename": LATTICE_LATTICE_IMPORTS_FILENAME, } diff --git a/covalent_dispatcher/_db/models.py b/covalent_dispatcher/_db/models.py index b5d1fe408..7e0521c35 100644 --- a/covalent_dispatcher/_db/models.py +++ b/covalent_dispatcher/_db/models.py @@ -101,14 +101,8 @@ class Lattice(Base): # name of the file containing the serialized output results_filename = Column(Text) - # Name of the file containing the default electron dependencies - deps_filename = Column(Text) - - # Name of the file containing the default list of callables before electrons are executed - call_before_filename = Column(Text) - - # Name of the file containing the default list of callables after electrons are executed - call_after_filename = Column(Text) + # Name of the file containing the default electron hooks + hooks_filename = Column(Text) # Name of the file containing the set of cova imports cova_imports_filename = Column(Text) @@ -189,13 +183,7 @@ class Electron(Base): stdout_filename = Column(Text) # Name of the file containing the electron execution dependencies - deps_filename = Column(Text) - - # Name of the file containing the functions that are called before electron execution - call_before_filename = Column(Text) - - # Name of the file containing the functions that are called before electron execution - call_after_filename = Column(Text) + hooks_filename = Column(Text) # Whether qelectron data exists or not qelectron_data_exists = Column(Boolean, nullable=False, default=False) diff --git a/covalent_dispatcher/_db/upsert.py b/covalent_dispatcher/_db/upsert.py index cc2e76309..3bd7f0ca7 100644 --- a/covalent_dispatcher/_db/upsert.py +++ b/covalent_dispatcher/_db/upsert.py @@ -50,9 +50,7 @@ ELECTRON_QELECTRON_DB_FILENAME = ELECTRON_FILENAMES["qelectron_db"] ELECTRON_ERROR_FILENAME = ELECTRON_FILENAMES["error"] ELECTRON_RESULTS_FILENAME = ELECTRON_FILENAMES["output"] -ELECTRON_DEPS_FILENAME = ELECTRON_FILENAMES["deps"] -ELECTRON_CALL_BEFORE_FILENAME = ELECTRON_FILENAMES["call_before"] -ELECTRON_CALL_AFTER_FILENAME = ELECTRON_FILENAMES["call_after"] +ELECTRON_HOOKS_FILENAME = ELECTRON_FILENAMES["hooks"] ELECTRON_STORAGE_TYPE = "file" LATTICE_FUNCTION_FILENAME = LATTICE_FILENAMES["workflow_function"] LATTICE_FUNCTION_STRING_FILENAME = LATTICE_FILENAMES["workflow_function_string"] @@ -62,9 +60,7 @@ LATTICE_NAMED_ARGS_FILENAME = LATTICE_FILENAMES["named_args"] LATTICE_NAMED_KWARGS_FILENAME = LATTICE_FILENAMES["named_kwargs"] LATTICE_RESULTS_FILENAME = LATTICE_FILENAMES["result"] -LATTICE_DEPS_FILENAME = LATTICE_FILENAMES["deps"] -LATTICE_CALL_BEFORE_FILENAME = LATTICE_FILENAMES["call_before"] -LATTICE_CALL_AFTER_FILENAME = LATTICE_FILENAMES["call_after"] +LATTICE_HOOKS_FILENAME = LATTICE_FILENAMES["hooks"] LATTICE_COVA_IMPORTS_FILENAME = LATTICE_FILENAMES["cova_imports"] LATTICE_LATTICE_IMPORTS_FILENAME = LATTICE_FILENAMES["lattice_imports"] LATTICE_STORAGE_TYPE = "file" @@ -115,9 +111,7 @@ def _lattice_data(session: Session, result: Result, electron_id: int = None) -> ("named_args", LATTICE_NAMED_ARGS_FILENAME, result.lattice.named_args), ("named_kwargs", LATTICE_NAMED_KWARGS_FILENAME, result.lattice.named_kwargs), ("result", LATTICE_RESULTS_FILENAME, result._result), - ("deps", LATTICE_DEPS_FILENAME, result.lattice.metadata["deps"]), - ("call_before", LATTICE_CALL_BEFORE_FILENAME, result.lattice.metadata["call_before"]), - ("call_after", LATTICE_CALL_AFTER_FILENAME, result.lattice.metadata["call_after"]), + ("hooks", LATTICE_HOOKS_FILENAME, result.lattice.metadata["hooks"]), ("cova_imports", LATTICE_COVA_IMPORTS_FILENAME, result.lattice.cova_imports), ("lattice_imports", LATTICE_LATTICE_IMPORTS_FILENAME, result.lattice.lattice_imports), ]: @@ -170,9 +164,7 @@ def _lattice_data(session: Session, result: Result, electron_id: int = None) -> "named_args_filename": LATTICE_NAMED_ARGS_FILENAME, "named_kwargs_filename": LATTICE_NAMED_KWARGS_FILENAME, "results_filename": LATTICE_RESULTS_FILENAME, - "deps_filename": LATTICE_DEPS_FILENAME, - "call_before_filename": LATTICE_CALL_BEFORE_FILENAME, - "call_after_filename": LATTICE_CALL_AFTER_FILENAME, + "hooks_filename": LATTICE_HOOKS_FILENAME, "cova_imports_filename": LATTICE_COVA_IMPORTS_FILENAME, "lattice_imports_filename": LATTICE_LATTICE_IMPORTS_FILENAME, "results_dir": results_dir, @@ -294,16 +286,10 @@ def _electron_data( ("function", ELECTRON_FUNCTION_FILENAME, tg.get_node_value(node_id, "function")), ("function_string", ELECTRON_FUNCTION_STRING_FILENAME, function_string), ("value", ELECTRON_VALUE_FILENAME, node_value), - ("deps", ELECTRON_DEPS_FILENAME, tg.get_node_value(node_id, "metadata")["deps"]), ( - "call_before", - ELECTRON_CALL_BEFORE_FILENAME, - tg.get_node_value(node_id, "metadata")["call_before"], - ), - ( - "call_after", - ELECTRON_CALL_AFTER_FILENAME, - tg.get_node_value(node_id, "metadata")["call_after"], + "hooks", + ELECTRON_HOOKS_FILENAME, + tg.get_node_value(node_id, "metadata")["hooks"], ), ("stdout", ELECTRON_STDOUT_FILENAME, node_stdout), ("stderr", ELECTRON_STDERR_FILENAME, node_stderr), @@ -359,9 +345,7 @@ def _electron_data( "stdout_filename": ELECTRON_STDOUT_FILENAME, "stderr_filename": ELECTRON_STDERR_FILENAME, "error_filename": ELECTRON_ERROR_FILENAME, - "deps_filename": ELECTRON_DEPS_FILENAME, - "call_before_filename": ELECTRON_CALL_BEFORE_FILENAME, - "call_after_filename": ELECTRON_CALL_AFTER_FILENAME, + "hooks_filename": ELECTRON_HOOKS_FILENAME, "qelectron_data_exists": node_qelectron_data_exists, "job_id": job_row.id, "created_at": timestamp, diff --git a/covalent_dispatcher/_db/write_result_to_db.py b/covalent_dispatcher/_db/write_result_to_db.py index a334e395b..9d928c1ec 100644 --- a/covalent_dispatcher/_db/write_result_to_db.py +++ b/covalent_dispatcher/_db/write_result_to_db.py @@ -98,9 +98,7 @@ def transaction_insert_lattices_data( named_args_filename: str, named_kwargs_filename: str, results_filename: str, - deps_filename: str, - call_before_filename: str, - call_after_filename: str, + hooks_filename: str, cova_imports_filename: str, lattice_imports_filename: str, results_dir: str, @@ -138,9 +136,7 @@ def transaction_insert_lattices_data( named_args_filename=named_args_filename, named_kwargs_filename=named_kwargs_filename, results_filename=results_filename, - deps_filename=deps_filename, - call_before_filename=call_before_filename, - call_after_filename=call_after_filename, + hooks_filename=hooks_filename, cova_imports_filename=cova_imports_filename, lattice_imports_filename=lattice_imports_filename, results_dir=results_dir, @@ -248,9 +244,7 @@ def transaction_insert_electrons_data( stdout_filename: str, stderr_filename: str, error_filename: str, - deps_filename: str, - call_before_filename: str, - call_after_filename: str, + hooks_filename: str, job_id: int, qelectron_data_exists: bool, created_at: dt, @@ -291,9 +285,7 @@ def transaction_insert_electrons_data( stdout_filename=stdout_filename, stderr_filename=stderr_filename, error_filename=error_filename, - deps_filename=deps_filename, - call_before_filename=call_before_filename, - call_after_filename=call_after_filename, + hooks_filename=hooks_filename, qelectron_data_exists=qelectron_data_exists, is_active=True, job_id=job_id, diff --git a/covalent_dispatcher/_service/models.py b/covalent_dispatcher/_service/models.py index 43ac3410a..2d2f7db10 100644 --- a/covalent_dispatcher/_service/models.py +++ b/covalent_dispatcher/_service/models.py @@ -24,45 +24,6 @@ from covalent._shared_files.schemas.result import ResultSchema -# # Copied from _dal -# RESULT_ASSET_KEYS = { -# "inputs", -# "result", -# "error", -# } - -# # Copied from _dal -# LATTICE_ASSET_KEYS = { -# "workflow_function", -# "workflow_function_string", -# "__doc__", -# "named_args", -# "named_kwargs", -# "cova_imports", -# "lattice_imports", -# # metadata -# "executor_data", -# "workflow_executor_data", -# "deps", -# "call_before", -# "call_after", -# } - -# # Copied from _dal -# ELECTRON_ASSET_KEYS = { -# "function", -# "function_string", -# "output", -# "value", -# "error", -# "stdout", -# "stderr", -# # electron metadata -# "deps", -# "call_before", -# "call_after", -# } - range_regex = "bytes=([0-9]+)-([0-9]*)" range_pattern = re.compile(range_regex) @@ -82,9 +43,7 @@ class LatticeAssetKey(str, Enum): inputs = "inputs" named_args = "named_args" named_kwargs = "named_kwargs" - deps = "deps" - call_before = "call_before" - call_after = "call_after" + hooks = "hooks" cova_imports = "cova_imports" lattice_imports = "lattice_imports" @@ -94,13 +53,11 @@ class ElectronAssetKey(str, Enum): function_string = "function_string" output = "output" value = "value" - deps = "deps" + hooks = "hooks" error = "error" stdout = "stdout" stderr = "stderr" qelectron_db = "qelectron_db" - call_before = "call_before" - call_after = "call_after" class ExportResponseSchema(BaseModel): diff --git a/covalent_migrations/versions/3727163f275c_consolidate_electron_hooks.py b/covalent_migrations/versions/3727163f275c_consolidate_electron_hooks.py new file mode 100644 index 000000000..d1954d3f7 --- /dev/null +++ b/covalent_migrations/versions/3727163f275c_consolidate_electron_hooks.py @@ -0,0 +1,73 @@ +# Copyright 2021 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the Apache License 2.0 (the "License"). A copy of the +# License may be obtained with this software package or at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Use of this file is prohibited except in compliance with the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Consolidate electron hooks + +Revision ID: 3727163f275c +Revises: 1142d81b29b8 +Create Date: 2023-12-07 10:47:38.837720 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +# pragma: allowlist nextline secret +revision = "3727163f275c" +# pragma: allowlist nextline secret +down_revision = "1142d81b29b8" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("electron_dependency", schema=None) as batch_op: + batch_op.create_foreign_key("child_electron_link", "electrons", ["electron_id"], ["id"]) + + with op.batch_alter_table("electrons", schema=None) as batch_op: + batch_op.add_column(sa.Column("hooks_filename", sa.Text(), nullable=True)) + batch_op.drop_column("call_after_filename") + batch_op.drop_column("call_before_filename") + batch_op.drop_column("deps_filename") + + with op.batch_alter_table("lattices", schema=None) as batch_op: + batch_op.add_column(sa.Column("hooks_filename", sa.Text(), nullable=True)) + batch_op.drop_column("call_after_filename") + batch_op.drop_column("call_before_filename") + batch_op.drop_column("deps_filename") + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("lattices", schema=None) as batch_op: + batch_op.add_column(sa.Column("deps_filename", sa.TEXT(), nullable=True)) + batch_op.add_column(sa.Column("call_before_filename", sa.TEXT(), nullable=True)) + batch_op.add_column(sa.Column("call_after_filename", sa.TEXT(), nullable=True)) + batch_op.drop_column("hooks_filename") + + with op.batch_alter_table("electrons", schema=None) as batch_op: + batch_op.add_column(sa.Column("deps_filename", sa.TEXT(), nullable=True)) + batch_op.add_column(sa.Column("call_before_filename", sa.TEXT(), nullable=True)) + batch_op.add_column(sa.Column("call_after_filename", sa.TEXT(), nullable=True)) + batch_op.drop_column("hooks_filename") + + with op.batch_alter_table("electron_dependency", schema=None) as batch_op: + batch_op.drop_constraint("child_electron_link", type_="foreignkey") + + # ### end Alembic commands ### diff --git a/covalent_ui/api/v1/data_layer/electron_dal.py b/covalent_ui/api/v1/data_layer/electron_dal.py index 4a4707146..5f85c965a 100644 --- a/covalent_ui/api/v1/data_layer/electron_dal.py +++ b/covalent_ui/api/v1/data_layer/electron_dal.py @@ -258,9 +258,7 @@ def get_electrons_id(self, dispatch_id, electron_id) -> Electron: Electron.results_filename, Electron.value_filename, Electron.stdout_filename, - Electron.deps_filename, - Electron.call_before_filename, - Electron.call_after_filename, + Electron.hooks_filename, Electron.stderr_filename, Electron.error_filename, Electron.name, diff --git a/covalent_ui/api/v1/database/schema/electron.py b/covalent_ui/api/v1/database/schema/electron.py index 635240918..01a36bca8 100644 --- a/covalent_ui/api/v1/database/schema/electron.py +++ b/covalent_ui/api/v1/database/schema/electron.py @@ -44,9 +44,7 @@ class Electron(Base): key: For generated and subscript nodes stdout_filename: Name of the file containing standard output generated by the task stderr_filename: Name of the file containing standard error generated by the task - deps_filename: Name of the file containing depends instances of DepsBash and DepsPip - call_before_filename: Name of the file containing list of DepsCall objects - call_after_filename : Name of the file containing list of DepsCall objects + hooks_filename: Name of the file containing depends instances of DepsBash, DepsPip, and DepsCall error_filename: Name of the file containing execution information generated at runtime is_active: Status of the record, 1: active and 0: inactive job_id: ID for circuit_info @@ -106,13 +104,7 @@ class Electron(Base): stdout_filename = Column(Text) # Name of the file containing the electron execution dependencies - deps_filename = Column(Text) - - # Name of the file containing the functions that are called before electron execution - call_before_filename = Column(Text) - - # Name of the file containing the functions that are called after electron execution - call_after_filename = Column(Text) + hooks_filename = Column(Text) # Name of the file containing standard error generated by the task stderr_filename = Column(Text) diff --git a/covalent_ui/api/v1/database/schema/lattices.py b/covalent_ui/api/v1/database/schema/lattices.py index 97212a9f1..330fa4915 100644 --- a/covalent_ui/api/v1/database/schema/lattices.py +++ b/covalent_ui/api/v1/database/schema/lattices.py @@ -107,13 +107,7 @@ class Lattice(Base): results_filename = Column(Text) # Name of the file containing the default electron dependencies - deps_filename = Column(Text) - - # Name of the file containing the default list of callables before electrons are executed - call_before_filename = Column(Text) - - # Name of the file containing the default list of callables after electrons are executed - call_after_filename = Column(Text) + hooks_filename = Column(Text) # Name of the file containing the set of cova imports cova_imports_filename = Column(Text) diff --git a/covalent_ui/api/v1/routes/end_points/electron_routes.py b/covalent_ui/api/v1/routes/end_points/electron_routes.py index 616f1c806..43b81f3ee 100644 --- a/covalent_ui/api/v1/routes/end_points/electron_routes.py +++ b/covalent_ui/api/v1/routes/end_points/electron_routes.py @@ -175,7 +175,7 @@ def get_electron_file(dispatch_id: uuid.UUID, electron_id: int, name: ElectronFi dispatch_id: Dispatch id of lattice/sublattice electron_id: Transport graph node id of a electron name: refers file type, like inputs, function_string, function, executor, result, value, key, - stdout, deps, call_before, call_after, error, info + stdout, hooks, error, info Returns: Returns electron details based on the given name """ @@ -221,14 +221,8 @@ def get_electron_file(dispatch_id: uuid.UUID, electron_id: int, name: ElectronFi elif name == "stdout": response = handler.read_from_text(result["stdout_filename"]) return ElectronFileResponse(data=response) - elif name == "deps": - response = handler.read_from_serialized(result["deps_filename"]) - return ElectronFileResponse(data=response) - elif name == "call_before": - response = handler.read_from_serialized(result["call_before_filename"]) - return ElectronFileResponse(data=response) - elif name == "call_after": - response = handler.read_from_serialized(result["call_after_filename"]) + elif name == "hooks": + response = handler.read_from_serialized(result["hooks_filename"]) return ElectronFileResponse(data=response) elif name == "error": # Error and stderr won't be both populated if `error` diff --git a/tests/covalent_dispatcher_tests/_core/data_modules/asset_manager_db_integration_test.py b/tests/covalent_dispatcher_tests/_core/data_modules/asset_manager_db_integration_test.py index efe75dddd..2cfbeeb01 100644 --- a/tests/covalent_dispatcher_tests/_core/data_modules/asset_manager_db_integration_test.py +++ b/tests/covalent_dispatcher_tests/_core/data_modules/asset_manager_db_integration_test.py @@ -134,10 +134,12 @@ async def test_download_assets_for_node(test_db, mocker): assets = { "output": { "remote_uri": "", + "size": 0, }, - "stdout": {"remote_uri": src_uri_stdout, "size": None, "digest": "0af23"}, + "stdout": {"remote_uri": src_uri_stdout, "size": 5, "digest": "0af23"}, "stderr": { "remote_uri": src_uri_stderr, + "size": 0, }, } assets = {k: AssetUpdate(**v) for k, v in assets.items()} @@ -145,13 +147,16 @@ async def test_download_assets_for_node(test_db, mocker): expected_update = { "output": { "remote_uri": "", + "size": 0, }, "stdout": { "remote_uri": src_uri_stdout, "digest": "0af23", + "size": 5, }, "stderr": { "remote_uri": src_uri_stderr, + "size": 0, }, } await am.download_assets_for_node( diff --git a/tests/covalent_dispatcher_tests/_core/runner_ng_test.py b/tests/covalent_dispatcher_tests/_core/runner_ng_test.py index c1d095cdb..4e6f8a88d 100644 --- a/tests/covalent_dispatcher_tests/_core/runner_ng_test.py +++ b/tests/covalent_dispatcher_tests/_core/runner_ng_test.py @@ -158,14 +158,10 @@ async def test_submit_abstract_task_group(mocker, task_cancelled): } mock_function_uri_0 = me.get_upload_uri(task_group_metadata, "function-0") - mock_deps_uri_0 = me.get_upload_uri(task_group_metadata, "deps-0") - mock_cb_uri_0 = me.get_upload_uri(task_group_metadata, "call_before-0") - mock_ca_uri_0 = me.get_upload_uri(task_group_metadata, "call_after-0") + mock_hooks_uri_0 = me.get_upload_uri(task_group_metadata, "hooks-0") mock_function_uri_3 = me.get_upload_uri(task_group_metadata, "function-3") - mock_deps_uri_3 = me.get_upload_uri(task_group_metadata, "deps-3") - mock_cb_uri_3 = me.get_upload_uri(task_group_metadata, "call_before-3") - mock_ca_uri_3 = me.get_upload_uri(task_group_metadata, "call_after-3") + mock_hooks_uri_3 = me.get_upload_uri(task_group_metadata, "hooks-3") mock_node_upload_uri_1 = me.get_upload_uri(task_group_metadata, "node_1") mock_node_upload_uri_2 = me.get_upload_uri(task_group_metadata, "node_2") @@ -173,14 +169,8 @@ async def test_submit_abstract_task_group(mocker, task_cancelled): mock_function_id_0 = 0 mock_args_ids = abstract_inputs["args"] mock_kwargs_ids = abstract_inputs["kwargs"] - mock_deps_id_0 = "deps-0" - mock_cb_id_0 = "call_before-0" - mock_ca_id_0 = "call_after-0" mock_function_id_3 = 3 - mock_deps_id_3 = "deps-3" - mock_cb_id_3 = "call_before-3" - mock_ca_id_3 = "call_after-3" resources = { "functions": { @@ -191,32 +181,19 @@ async def test_submit_abstract_task_group(mocker, task_cancelled): 1: mock_node_upload_uri_1, 2: mock_node_upload_uri_2, }, - "deps": { - mock_deps_id_0: mock_deps_uri_0, - mock_cb_id_0: mock_cb_uri_0, - mock_ca_id_0: mock_ca_uri_0, - mock_deps_id_3: mock_deps_uri_3, - mock_cb_id_3: mock_cb_uri_3, - mock_ca_id_3: mock_ca_uri_3, - }, + "hooks": {0: mock_hooks_uri_0, 3: mock_hooks_uri_3}, } mock_task_spec_0 = { "function_id": mock_function_id_0, "args_ids": mock_args_ids, "kwargs_ids": mock_kwargs_ids, - "deps_id": mock_deps_id_0, - "call_before_id": mock_cb_id_0, - "call_after_id": mock_ca_id_0, } mock_task_spec_3 = { "function_id": mock_function_id_3, "args_ids": mock_args_ids, "kwargs_ids": mock_kwargs_ids, - "deps_id": mock_deps_id_3, - "call_before_id": mock_cb_id_3, - "call_after_id": mock_ca_id_3, } mock_task_0 = { diff --git a/tests/covalent_dispatcher_tests/_db/update_test.py b/tests/covalent_dispatcher_tests/_db/update_test.py index 1d9230f67..567c83bc9 100644 --- a/tests/covalent_dispatcher_tests/_db/update_test.py +++ b/tests/covalent_dispatcher_tests/_db/update_test.py @@ -128,9 +128,7 @@ def test_result_persist_workflow_1(test_db, result_1, mocker): assert lattice_row.electron_id is None assert lattice_row.executor == "local" assert lattice_row.workflow_executor == "local" - assert lattice_row.deps_filename == upsert.LATTICE_DEPS_FILENAME - assert lattice_row.call_before_filename == upsert.LATTICE_CALL_BEFORE_FILENAME - assert lattice_row.call_after_filename == upsert.LATTICE_CALL_AFTER_FILENAME + assert lattice_row.hooks_filename == upsert.LATTICE_HOOKS_FILENAME assert lattice_row.root_dispatch_id == "dispatch_1" assert lattice_row.results_dir == result_1.results_dir @@ -141,18 +139,15 @@ def test_result_persist_workflow_1(test_db, result_1, mocker): storage_path=lattice_storage_path, filename=lattice_row.function_filename ).get_deserialized() assert workflow_function(1, 2) == 4 - assert ( + with pytest.raises(FileNotFoundError): local_store.load_file( storage_path=lattice_storage_path, filename=lattice_row.error_filename ) - == "" - ) - assert ( + + with pytest.raises(FileNotFoundError): local_store.load_file( storage_path=lattice_storage_path, filename=lattice_row.results_filename - ).get_deserialized() - is None - ) + ) executor_data = json.loads(lattice_row.executor_data) @@ -182,22 +177,10 @@ def test_result_persist_workflow_1(test_db, result_1, mocker): if electron.transport_graph_node_id == 1: assert ( local_store.load_file( - storage_path=electron.storage_path, filename=electron.deps_filename + storage_path=electron.storage_path, filename=electron.hooks_filename ) == {} ) - assert ( - local_store.load_file( - storage_path=electron.storage_path, filename=electron.call_before_filename - ) - == [] - ) - assert ( - local_store.load_file( - storage_path=electron.storage_path, filename=electron.call_after_filename - ) - == [] - ) if electron.transport_graph_node_id == 3: executor_data = json.loads(electron.executor_data) diff --git a/tests/covalent_dispatcher_tests/_db/write_result_to_db_test.py b/tests/covalent_dispatcher_tests/_db/write_result_to_db_test.py index e5558fc05..26b114913 100644 --- a/tests/covalent_dispatcher_tests/_db/write_result_to_db_test.py +++ b/tests/covalent_dispatcher_tests/_db/write_result_to_db_test.py @@ -67,9 +67,7 @@ STDERR_FILENAME = "stderr.log" ERROR_FILENAME = "error.log" TRANSPORT_GRAPH_FILENAME = "transport_graph.pkl" -DEPS_FILENAME = "deps.pkl" -CALL_BEFORE_FILENAME = "call_before.pkl" -CALL_AFTER_FILENAME = "call_after.pkl" +HOOKS_FILENAME = "hooks.pkl" COVA_IMPORTS_FILENAME = "cova_imports.json" LATTICE_IMPORTS_FILENAME = "lattice_imports.txt" RESULTS_DIR = "/tmp/results" @@ -131,9 +129,7 @@ def get_lattice_kwargs( named_args_filename=NAMED_ARGS_FILENAME, named_kwargs_filename=NAMED_KWARGS_FILENAME, results_filename=RESULTS_FILENAME, - deps_filename=DEPS_FILENAME, - call_before_filename=CALL_BEFORE_FILENAME, - call_after_filename=CALL_AFTER_FILENAME, + hooks_filename=HOOKS_FILENAME, cova_imports_filename=COVA_IMPORTS_FILENAME, lattice_imports_filename=LATTICE_IMPORTS_FILENAME, results_dir=RESULTS_DIR, @@ -166,9 +162,7 @@ def get_lattice_kwargs( "named_args_filename": named_args_filename, "named_kwargs_filename": named_kwargs_filename, "results_filename": results_filename, - "deps_filename": deps_filename, - "call_before_filename": call_before_filename, - "call_after_filename": call_after_filename, + "hooks_filename": hooks_filename, "cova_imports_filename": cova_imports_filename, "lattice_imports_filename": lattice_imports_filename, "results_dir": results_dir, @@ -198,9 +192,7 @@ def get_electron_kwargs( stdout_filename=STDOUT_FILENAME, stderr_filename=STDERR_FILENAME, error_filename=ERROR_FILENAME, - deps_filename=DEPS_FILENAME, - call_before_filename=CALL_BEFORE_FILENAME, - call_after_filename=CALL_AFTER_FILENAME, + hooks_filename=HOOKS_FILENAME, job_id=1, qelectron_data_exists=False, created_at=None, @@ -228,9 +220,7 @@ def get_electron_kwargs( "stdout_filename": stdout_filename, "stderr_filename": stderr_filename, "error_filename": error_filename, - "deps_filename": deps_filename, - "call_before_filename": call_before_filename, - "call_after_filename": call_after_filename, + "hooks_filename": hooks_filename, "job_id": job_id, "qelectron_data_exists": qelectron_data_exists, "created_at": created_at, @@ -299,9 +289,7 @@ def test_insert_lattices_data(test_db, mocker): assert lattice.named_args_filename == NAMED_ARGS_FILENAME assert lattice.named_kwargs_filename == NAMED_KWARGS_FILENAME assert lattice.results_filename == RESULTS_FILENAME - assert lattice.deps_filename == DEPS_FILENAME - assert lattice.call_before_filename == CALL_BEFORE_FILENAME - assert lattice.call_after_filename == CALL_AFTER_FILENAME + assert lattice.hooks_filename == HOOKS_FILENAME assert lattice.cova_imports_filename == COVA_IMPORTS_FILENAME assert lattice.lattice_imports_filename == LATTICE_IMPORTS_FILENAME assert lattice.results_dir == RESULTS_DIR diff --git a/tests/functional_tests/workflow_stack_test.py b/tests/functional_tests/workflow_stack_test.py index 5d76993b3..47e1578a7 100644 --- a/tests/functional_tests/workflow_stack_test.py +++ b/tests/functional_tests/workflow_stack_test.py @@ -25,7 +25,6 @@ import covalent._dispatcher_plugins.local as local import covalent._results_manager.results_manager as rm from covalent._results_manager.result import Result -from covalent.executor import LocalExecutor def construct_temp_cache_dir(): @@ -140,7 +139,7 @@ def task_1(): def task_2(): pass - @ct.lattice(executor=LocalExecutor(), deps_bash=l_bash_dep) + @ct.lattice(deps_bash=l_bash_dep) def workflow(): task_1() task_2() @@ -149,8 +148,8 @@ def workflow(): res = ct.get_result(dispatch_id, wait=True) tg = res.lattice.transport_graph - dep_0 = ct.DepsBash().from_dict(tg.get_node_value(0, "metadata")["deps"]["bash"]) - dep_1 = ct.DepsBash().from_dict(tg.get_node_value(1, "metadata")["deps"]["bash"]) + dep_0 = ct.DepsBash().from_dict(tg.get_node_value(0, "metadata")["hooks"]["deps"]["bash"]) + dep_1 = ct.DepsBash().from_dict(tg.get_node_value(1, "metadata")["hooks"]["deps"]["bash"]) assert dep_0.commands == ["ls"] assert dep_1.commands == ["ls -l"]