Skip to content

Commit

Permalink
Consolidate deps, call_before, call_after into hooks asset
Browse files Browse the repository at this point in the history
  • Loading branch information
cjao committed Dec 8, 2023
1 parent ef13c22 commit 127e4bd
Show file tree
Hide file tree
Showing 31 changed files with 264 additions and 421 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
asset metadata. Although the exact numerical value is not yet
important, whether the size is reported to be zero or positive does
have consequences.
- Pack deps, call_before, and call_after assets into one file.

### Fixed

- Reduced number of assets to upload when submitting a dispatch.

### Operations

Expand Down
8 changes: 2 additions & 6 deletions covalent/_results_manager/results_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,15 @@
SDK_NODE_META_KEYS = {
"executor",
"executor_data",
"deps",
"call_before",
"call_after",
"hooks",
}

SDK_LAT_META_KEYS = {
"executor",
"executor_data",
"workflow_executor",
"workflow_executor_data",
"deps",
"call_before",
"call_after",
"hooks",
}

DEFERRED_KEYS = {
Expand Down
39 changes: 7 additions & 32 deletions covalent/_serialize/electron.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
"function_string": AssetType.TEXT,
"value": AssetType.TRANSPORTABLE,
"output": AssetType.TRANSPORTABLE,
"deps": AssetType.JSONABLE,
"call_before": AssetType.JSONABLE,
"call_after": AssetType.JSONABLE,
"hooks": AssetType.JSONABLE,
"qelectron_db": AssetType.BYTES,
"stdout": AssetType.TEXT,
"stderr": AssetType.TEXT,
Expand Down Expand Up @@ -160,27 +158,10 @@ def _serialize_node_assets(node_attrs: dict, node_storage_path: str) -> Electron
ASSET_FILENAME_MAP["error"],
)

deps = node_attrs["metadata"]["deps"]
deps_asset = save_asset(
deps, ASSET_TYPES["deps"], node_storage_path, ASSET_FILENAME_MAP["deps"]
hooks = node_attrs["metadata"]["hooks"]
hooks_asset = save_asset(
hooks, ASSET_TYPES["hooks"], node_storage_path, ASSET_FILENAME_MAP["hooks"]
)

call_before = node_attrs["metadata"]["call_before"]
call_before_asset = save_asset(
call_before,
ASSET_TYPES["call_before"],
node_storage_path,
ASSET_FILENAME_MAP["call_before"],
)

call_after = node_attrs["metadata"]["call_after"]
call_after_asset = save_asset(
call_after,
ASSET_TYPES["call_after"],
node_storage_path,
ASSET_FILENAME_MAP["call_after"],
)

return ElectronAssets(
function=function_asset,
function_string=function_string_asset,
Expand All @@ -190,9 +171,7 @@ def _serialize_node_assets(node_attrs: dict, node_storage_path: str) -> Electron
stderr=stderr_asset,
qelectron_db=qelectron_db_asset,
error=error_asset,
deps=deps_asset,
call_before=call_before_asset,
call_after=call_after_asset,
hooks=hooks_asset,
)


Expand All @@ -206,9 +185,7 @@ def _deserialize_node_assets(ea: ElectronAssets) -> dict:
qelectron_db = load_asset(ea.qelectron_db, ASSET_TYPES["qelectron_db"])
error = load_asset(ea.error, ASSET_TYPES["error"])

deps = load_asset(ea.deps, ASSET_TYPES["deps"])
call_before = load_asset(ea.call_before, ASSET_TYPES["call_before"])
call_after = load_asset(ea.call_after, ASSET_TYPES["call_after"])
hooks = load_asset(ea.hooks, ASSET_TYPES["hooks"])

return {
"function": function,
Expand All @@ -220,9 +197,7 @@ def _deserialize_node_assets(ea: ElectronAssets) -> dict:
"qelectron_db": qelectron_db,
"error": error,
"metadata": {
"deps": deps,
"call_before": call_before,
"call_after": call_after,
"hooks": hooks,
},
}

Expand Down
36 changes: 8 additions & 28 deletions covalent/_serialize/lattice.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@
"named_kwargs": AssetType.TRANSPORTABLE,
"cova_imports": AssetType.JSONABLE,
"lattice_imports": AssetType.TEXT,
"deps": AssetType.JSONABLE,
"call_before": AssetType.JSONABLE,
"call_after": AssetType.JSONABLE,
"hooks": AssetType.JSONABLE,
}


Expand Down Expand Up @@ -141,23 +139,11 @@ def _serialize_lattice_assets(lat, storage_path: str) -> LatticeAssets:
)

# NOTE: these are actually JSONable
deps_asset = save_asset(
lat.metadata["deps"],
ASSET_TYPES["deps"],
hooks_asset = save_asset(
lat.metadata["hooks"],
ASSET_TYPES["hooks"],
storage_path,
ASSET_FILENAME_MAP["deps"],
)
call_before_asset = save_asset(
lat.metadata["call_before"],
ASSET_TYPES["call_before"],
storage_path,
ASSET_FILENAME_MAP["call_before"],
)
call_after_asset = save_asset(
lat.metadata["call_after"],
ASSET_TYPES["call_after"],
storage_path,
ASSET_FILENAME_MAP["call_after"],
ASSET_FILENAME_MAP["hooks"],
)

return LatticeAssets(
Expand All @@ -169,9 +155,7 @@ def _serialize_lattice_assets(lat, storage_path: str) -> LatticeAssets:
named_kwargs=named_kwargs_asset,
cova_imports=cova_imports_asset,
lattice_imports=lattice_imports_asset,
deps=deps_asset,
call_before=call_before_asset,
call_after=call_after_asset,
hooks=hooks_asset,
)


Expand All @@ -186,9 +170,7 @@ def _deserialize_lattice_assets(assets: LatticeAssets) -> dict:
named_kwargs = load_asset(assets.named_kwargs, ASSET_TYPES["named_kwargs"])
cova_imports = load_asset(assets.cova_imports, ASSET_TYPES["cova_imports"])
lattice_imports = load_asset(assets.lattice_imports, ASSET_TYPES["lattice_imports"])
deps = load_asset(assets.deps, ASSET_TYPES["deps"])
call_before = load_asset(assets.call_before, ASSET_TYPES["call_before"])
call_after = load_asset(assets.call_after, ASSET_TYPES["call_after"])
hooks = load_asset(assets.hooks, ASSET_TYPES["hooks"])
return {
"workflow_function": workflow_function,
"workflow_function_string": workflow_function_string,
Expand All @@ -199,9 +181,7 @@ def _deserialize_lattice_assets(assets: LatticeAssets) -> dict:
"cova_imports": cova_imports,
"lattice_imports": lattice_imports,
"metadata": {
"deps": deps,
"call_before": call_before,
"call_after": call_after,
"hooks": hooks,
},
}

Expand Down
7 changes: 2 additions & 5 deletions covalent/_shared_files/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
"""Create custom sentinels and defaults for Covalent"""

import os
from builtins import list
from dataclasses import dataclass, field
from typing import Dict, List
from typing import Dict

import dask.system

Expand Down Expand Up @@ -199,8 +198,6 @@ class DefaultConfig:
class DefaultMetadataValues:
executor: str = field(default_factory=get_default_executor)
executor_data: Dict = field(default_factory=dict)
deps: Dict = field(default_factory=dict)
call_before: List = field(default_factory=list)
call_after: List = field(default_factory=list)
hooks: Dict = field(default_factory=dict)
workflow_executor: str = field(default_factory=get_default_executor)
workflow_executor_data: Dict = field(default_factory=dict)
14 changes: 4 additions & 10 deletions covalent/_shared_files/schemas/electron.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@
"stderr",
"qelectron_db",
# user dependent assets
"deps",
"call_before",
"call_after",
"hooks",
}

ELECTRON_FUNCTION_FILENAME = "function.tobj"
Expand All @@ -61,7 +59,7 @@
)
ELECTRON_ERROR_FILENAME = "error.log"
ELECTRON_RESULTS_FILENAME = "results.tobj"
ELECTRON_DEPS_FILENAME = "deps.json"
ELECTRON_HOOKS_FILENAME = "hooks.json"
ELECTRON_CALL_BEFORE_FILENAME = "call_before.json"
ELECTRON_CALL_AFTER_FILENAME = "call_after.json"
ELECTRON_STORAGE_TYPE = "file"
Expand All @@ -76,9 +74,7 @@
"stderr": ELECTRON_STDERR_FILENAME,
"qelectron_db": ELECTRON_QELECTRON_DB_FILENAME,
"error": ELECTRON_ERROR_FILENAME,
"deps": ELECTRON_DEPS_FILENAME,
"call_before": ELECTRON_CALL_BEFORE_FILENAME,
"call_after": ELECTRON_CALL_AFTER_FILENAME,
"hooks": ELECTRON_HOOKS_FILENAME,
}


Expand All @@ -93,9 +89,7 @@ class ElectronAssets(BaseModel):
qelectron_db: AssetSchema

# user dependent assets
deps: AssetSchema
call_before: AssetSchema
call_after: AssetSchema
hooks: AssetSchema


class ElectronMetadata(BaseModel):
Expand Down
14 changes: 4 additions & 10 deletions covalent/_shared_files/schemas/lattice.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@
"cova_imports",
"lattice_imports",
# user dependent assets
"deps",
"call_before",
"call_after",
"hooks",
}

LATTICE_FUNCTION_FILENAME = "function.tobj"
Expand All @@ -59,7 +57,7 @@
LATTICE_NAMED_ARGS_FILENAME = "named_args.tobj"
LATTICE_NAMED_KWARGS_FILENAME = "named_kwargs.tobj"
LATTICE_RESULTS_FILENAME = "results.tobj"
LATTICE_DEPS_FILENAME = "deps.json"
LATTICE_HOOKS_FILENAME = "hooks.json"
LATTICE_CALL_BEFORE_FILENAME = "call_before.json"
LATTICE_CALL_AFTER_FILENAME = "call_after.json"
LATTICE_COVA_IMPORTS_FILENAME = "cova_imports.json"
Expand All @@ -76,9 +74,7 @@
"named_kwargs": LATTICE_NAMED_KWARGS_FILENAME,
"cova_imports": LATTICE_COVA_IMPORTS_FILENAME,
"lattice_imports": LATTICE_LATTICE_IMPORTS_FILENAME,
"deps": LATTICE_DEPS_FILENAME,
"call_before": LATTICE_CALL_BEFORE_FILENAME,
"call_after": LATTICE_CALL_AFTER_FILENAME,
"hooks": LATTICE_HOOKS_FILENAME,
}


Expand All @@ -93,9 +89,7 @@ class LatticeAssets(BaseModel):
lattice_imports: AssetSchema

# lattice.metadata
deps: AssetSchema
call_before: AssetSchema
call_after: AssetSchema
hooks: AssetSchema


class LatticeMetadata(BaseModel):
Expand Down
37 changes: 26 additions & 11 deletions covalent/_workflow/electron.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,11 @@ def get_item(e, key):
iterable_metadata = self.metadata.copy()

filtered_call_before = []
for elem in iterable_metadata["call_before"]:
if elem["attributes"]["retval_keyword"] != "files":
filtered_call_before.append(elem)
iterable_metadata["call_before"] = filtered_call_before
if "call_before" in iterable_metadata["hooks"]:
for elem in iterable_metadata["hooks"]["call_before"]:
if elem["attributes"]["retval_keyword"] != "files":
filtered_call_before.append(elem)
iterable_metadata["hooks"]["call_before"] = filtered_call_before

# Pack with main electron unless it is a sublattice.
name = active_lattice.transport_graph.get_node_value(self.node_id, "name")
Expand Down Expand Up @@ -452,6 +453,7 @@ def __call__(self, *args, **kwargs) -> Union[Any, "Electron"]:

# Add a node to the transport graph of the active lattice. Electrons bound to nodes will never be packed with the
# 'master' Electron. # Add non-sublattice node to the transport graph of the active lattice.

self.node_id = active_lattice.transport_graph.add_node(
name=self.function.__name__,
function=self.function,
Expand All @@ -474,15 +476,20 @@ def __call__(self, *args, **kwargs) -> Union[Any, "Electron"]:

# For keyword arguments
# Filter out kwargs to be injected by call_before call_deps during execution.
call_before = self.metadata["call_before"]
retval_keywords = {item["attributes"]["retval_keyword"]: None for item in call_before}

retval_keywords = {}
if "call_before" in self.metadata["hooks"]:
call_before = self.metadata["hooks"]["call_before"]
retval_keywords = {
item["attributes"]["retval_keyword"]: None for item in call_before
}

for key, value in named_kwargs.items():
if key in retval_keywords:
app_log.debug(
f"kwarg {key} for function {self.function.__name__} to be injected at runtime"
)
continue

self.connect_node_with_others(
self.node_id, key, value, "kwarg", None, active_lattice.transport_graph
)
Expand Down Expand Up @@ -771,13 +778,21 @@ def electron(
if call_after:
call_after_final.extend(call_after)

if deps is None and call_before_final is None and call_after_final is None:
hooks = None
else:
hooks = {}
if deps is not None:
hooks["deps"] = deps
if call_before_final is not None:
hooks["call_before"] = call_before_final
if call_after_final is not None:
hooks["call_after"] = call_after_final

constraints = {
"executor": executor,
"deps": deps,
"call_before": call_before_final,
"call_after": call_after_final,
"hooks": hooks,
}

constraints = encode_metadata(constraints)

def decorator_electron(func=None):
Expand Down
Loading

0 comments on commit 127e4bd

Please sign in to comment.