Reduce the number of assets to upload during dispatch
Initialize unset assets to `None` and don't upload them.

When downloading assets during `get_result()`, ignore assets with zero
size.

Also require asset sizes when retrieving asset updates in
`Executor.receive()`.
cjao committed Dec 10, 2023
1 parent c78a1d8 commit f461ce6
Showing 29 changed files with 232 additions and 94 deletions.
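To summarize the contract these changes establish before the per-file diffs: an asset whose payload was never set serializes to a zero-size `AssetSchema`, and zero-size assets are skipped on both the upload and download paths. The sketch below is a self-contained illustration of that flow, not code from this commit; the `Asset` dataclass and helper names are placeholders standing in for Covalent's schema objects.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Asset:
    # Placeholder mirroring the fields used in the diffs below (not covalent's AssetSchema)
    uri: Optional[str] = None
    remote_uri: Optional[str] = None
    size: int = 0

def should_upload(asset: Asset) -> bool:
    # Dispatch side: skip assets that were never populated locally
    return bool(asset.remote_uri) and bool(asset.uri)

def should_download(asset: Asset) -> bool:
    # get_result() side: skip assets the server reports as empty
    return asset.size > 0

empty = Asset(uri=None, remote_uri="https://example.invalid/asset", size=0)
populated = Asset(uri="file:///tmp/out.pkl", remote_uri="https://example.invalid/asset", size=42)

assert not should_upload(empty) and not should_download(empty)
assert should_upload(populated) and should_download(populated)
```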
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [UNRELEASED]

### Changed

- SDK no longer uploads empty assets when submitting a dispatch.
- Results Manager avoids downloading assets with size 0.
- Local and Dask executor plugins now return accurate sizes of task
artifacts.
- Size (number of bytes) is now a required attribute whenever updating
asset metadata. The exact numerical value is not yet significant, but
whether the size is reported as zero or positive now determines whether
an asset is transferred.

### Operations

- Allow `cloudpickle` >= 3.0.0
8 changes: 6 additions & 2 deletions covalent/_dispatcher_plugins/local.py
@@ -593,15 +593,19 @@ def upload_assets(manifest: ResultSchema):
def _upload(assets: List[AssetSchema]):
local_scheme_prefix = "file://"
total = len(assets)
number_uploaded = 0
for i, asset in enumerate(assets):
- if not asset.remote_uri:
+ if not asset.remote_uri or not asset.uri:
app_log.debug(f"Skipping asset {i+1} out of {total}")
continue
if asset.remote_uri.startswith(local_scheme_prefix):
copy_file_locally(asset.uri, asset.remote_uri)
number_uploaded += 1

Check warning on line 603 in covalent/_dispatcher_plugins/local.py

View check run for this annotation

Codecov / codecov/patch

covalent/_dispatcher_plugins/local.py#L603

Added line #L603 was not covered by tests
else:
_upload_asset(asset.uri, asset.remote_uri)
app_log.debug(f"uploaded {i+1} out of {total} assets.")
number_uploaded += 1
app_log.debug(f"Uploaded asset {i+1} out of {total}.")
app_log.debug(f"uploaded {number_uploaded} assets.")


def _upload_asset(local_uri, remote_uri):
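A minimal, self-contained sketch of the reworked upload loop above, with the copy and upload helpers stubbed out, just to show which assets count toward `number_uploaded`; it is not the library code, and the stub names only echo the diff.

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class AssetStub:
    uri: Optional[str] = None
    remote_uri: Optional[str] = None

def upload_assets_sketch(assets: List[AssetStub]) -> int:
    local_scheme_prefix = "file://"
    number_uploaded = 0
    for i, asset in enumerate(assets):
        if not asset.remote_uri or not asset.uri:
            print(f"Skipping asset {i+1} out of {len(assets)}")
            continue
        if asset.remote_uri.startswith(local_scheme_prefix):
            pass  # copy_file_locally(asset.uri, asset.remote_uri) in the real code
        else:
            pass  # _upload_asset(asset.uri, asset.remote_uri) in the real code
        number_uploaded += 1
    return number_uploaded

assets = [
    AssetStub(),                                                  # unset -> skipped
    AssetStub(uri="file:///tmp/a", remote_uri="file:///srv/a"),   # local copy
    AssetStub(uri="file:///tmp/b", remote_uri="https://host/b"),  # remote upload
]
assert upload_assets_sketch(assets) == 2
```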
4 changes: 2 additions & 2 deletions covalent/_results_manager/result.py
@@ -90,11 +90,11 @@ def __init__(self, lattice: Lattice, dispatch_id: str = "") -> None:
self._task_failed = False
self._task_cancelled = False

- self._result = TransportableObject(None)
+ self._result = None

self._num_nodes = -1

self._error = ""
self._error = None

def __str__(self):
"""String representation of the result object"""
27 changes: 18 additions & 9 deletions covalent/_results_manager/results_manager.py
@@ -248,26 +248,35 @@ def download_asset(remote_uri: str, local_path: str, chunk_size: int = 1024 * 10

def _download_result_asset(manifest: dict, results_dir: str, key: str):
remote_uri = manifest["assets"][key]["remote_uri"]
- local_path = get_result_asset_path(results_dir, key)
- download_asset(remote_uri, local_path)
- manifest["assets"][key]["uri"] = f"file://{local_path}"
+ size = manifest["assets"][key]["size"]
+
+ if size > 0:
+ local_path = get_result_asset_path(results_dir, key)
+ download_asset(remote_uri, local_path)
+ manifest["assets"][key]["uri"] = f"file://{local_path}"


def _download_lattice_asset(manifest: dict, results_dir: str, key: str):
lattice_assets = manifest["lattice"]["assets"]
remote_uri = lattice_assets[key]["remote_uri"]
- local_path = get_lattice_asset_path(results_dir, key)
- download_asset(remote_uri, local_path)
- lattice_assets[key]["uri"] = f"file://{local_path}"
+ size = lattice_assets[key]["size"]
+
+ if size > 0:
+ local_path = get_lattice_asset_path(results_dir, key)
+ download_asset(remote_uri, local_path)
+ lattice_assets[key]["uri"] = f"file://{local_path}"


def _download_node_asset(manifest: dict, results_dir: str, node_id: int, key: str):
node = manifest["lattice"]["transport_graph"]["nodes"][node_id]
node_assets = node["assets"]
remote_uri = node_assets[key]["remote_uri"]
- local_path = get_node_asset_path(results_dir, node_id, key)
- download_asset(remote_uri, local_path)
- node_assets[key]["uri"] = f"file://{local_path}"
+ size = node_assets[key]["size"]
+
+ if size > 0:
+ local_path = get_node_asset_path(results_dir, node_id, key)
+ download_asset(remote_uri, local_path)
+ node_assets[key]["uri"] = f"file://{local_path}"


def _load_result_asset(manifest: dict, key: str):
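To see the effect of the size gate on a result manifest, here is a small self-contained example; the manifest shape follows the dictionary accesses above, the local path is made up, and the actual transfer is stubbed out rather than calling `download_asset`.

```python
def download_result_asset_sketch(manifest: dict, key: str, downloaded: list) -> None:
    # Same control flow as _download_result_asset, with the transfer stubbed out
    asset = manifest["assets"][key]
    if asset["size"] > 0:
        local_path = f"/tmp/results/{key}"        # get_result_asset_path(...) in the real code
        downloaded.append(asset["remote_uri"])    # download_asset(remote_uri, local_path)
        asset["uri"] = f"file://{local_path}"

manifest = {
    "assets": {
        "result": {"remote_uri": "https://host/result", "size": 128, "uri": None},
        "error": {"remote_uri": "https://host/error", "size": 0, "uri": None},
    }
}
downloaded: list = []
for key in manifest["assets"]:
    download_result_asset_sketch(manifest, key, downloaded)

assert downloaded == ["https://host/result"]
assert manifest["assets"]["error"]["uri"] is None  # zero-size asset left untouched
```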
3 changes: 3 additions & 0 deletions covalent/_serialize/common.py
@@ -136,6 +136,9 @@ def save_asset(data: Any, data_type: AssetType, storage_path: str, filename: str

scheme = "file"

if data is None:
return AssetSchema(size=0)

serialized = serialize_asset(data, data_type)
digest = _sha1_asset(serialized)
path = Path(storage_path) / filename
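The practical consequence of the early return above: unset attributes (which the electron serializer below now defaults to `None`) produce a zero-size asset record and no file on disk. A self-contained approximation, with the real serialization and `AssetSchema` replaced by stand-ins:

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Any

def save_asset_sketch(data: Any, storage_path: str, filename: str) -> dict:
    # Simplified stand-in for save_asset(); returns a dict instead of AssetSchema
    if data is None:
        return {"size": 0, "uri": None, "digest": None}
    serialized = repr(data).encode()               # serialize_asset(data, data_type) in the real code
    digest = hashlib.sha1(serialized).hexdigest()  # _sha1_asset(serialized)
    path = Path(storage_path) / filename
    path.write_bytes(serialized)
    return {"size": len(serialized), "uri": f"file://{path}", "digest": digest}

with tempfile.TemporaryDirectory() as d:
    assert save_asset_sketch(None, d, "stdout.txt")["size"] == 0    # nothing written
    assert save_asset_sketch("hello", d, "stdout.txt")["size"] > 0  # file written, size recorded
```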
17 changes: 8 additions & 9 deletions covalent/_serialize/electron.py
@@ -26,7 +26,6 @@
ElectronSchema,
)
from .._shared_files.util_classes import RESULT_STATUS, Status
- from .._workflow.transportable_object import TransportableObject
from .common import AssetType, load_asset, save_asset

__all__ = [
@@ -105,55 +104,55 @@ def _serialize_node_assets(node_attrs: dict, node_storage_path: str) -> Electron
ASSET_FILENAME_MAP["function"],
)

- function_string = node_attrs.get("function_string", "")
+ function_string = node_attrs.get("function_string", None)
function_string_asset = save_asset(
function_string,
ASSET_TYPES["function_string"],
node_storage_path,
ASSET_FILENAME_MAP["function_string"],
)

- node_value = node_attrs.get("value", TransportableObject(None))
+ node_value = node_attrs.get("value", None)
value_asset = save_asset(
node_value,
ASSET_TYPES["value"],
node_storage_path,
ASSET_FILENAME_MAP["value"],
)

- node_output = node_attrs.get("output", TransportableObject(None))
+ node_output = node_attrs.get("output", None)
output_asset = save_asset(
node_output,
ASSET_TYPES["output"],
node_storage_path,
ASSET_FILENAME_MAP["output"],
)

- node_stdout = node_attrs.get("stdout", "")
+ node_stdout = node_attrs.get("stdout", None)
stdout_asset = save_asset(
node_stdout,
ASSET_TYPES["stdout"],
node_storage_path,
ASSET_FILENAME_MAP["stdout"],
)

- node_stderr = node_attrs.get("stderr", "")
+ node_stderr = node_attrs.get("stderr", None)
stderr_asset = save_asset(
node_stderr,
ASSET_TYPES["stderr"],
node_storage_path,
ASSET_FILENAME_MAP["stderr"],
)

- qelectron_db = node_attrs.get("qelectron_db", bytes())
+ qelectron_db = node_attrs.get("qelectron_db", None)
qelectron_db_asset = save_asset(
qelectron_db,
ASSET_TYPES["qelectron_db"],
node_storage_path,
ASSET_FILENAME_MAP["qelectron_db"],
)

- node_error = node_attrs.get("error", "")
+ node_error = node_attrs.get("error", None)
error_asset = save_asset(
node_error,
ASSET_TYPES["error"],
@@ -230,7 +229,7 @@ def _deserialize_node_assets(ea: ElectronAssets) -> dict:

def _get_node_custom_assets(node_attrs: dict) -> Dict[str, AssetSchema]:
if "custom_asset_keys" in node_attrs["metadata"]:
- return {key: AssetSchema() for key in node_attrs["metadata"]["custom_asset_keys"]}
+ return {key: AssetSchema(size=0) for key in node_attrs["metadata"]["custom_asset_keys"]}


def serialize_node(node_id: int, node_attrs: dict, node_storage_path) -> ElectronSchema:
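Custom assets declared in node metadata are likewise registered as zero-size placeholders, presumably until real sizes are reported later. A small self-contained illustration of the dict comprehension above, with a plain dict standing in for `AssetSchema(size=0)`:

```python
def get_node_custom_assets_sketch(node_attrs: dict) -> dict:
    # Mirrors _get_node_custom_assets, using dicts instead of AssetSchema objects
    if "custom_asset_keys" in node_attrs["metadata"]:
        return {key: {"size": 0} for key in node_attrs["metadata"]["custom_asset_keys"]}
    return {}

node_attrs = {"metadata": {"custom_asset_keys": ["profile", "checkpoint"]}}
assert get_node_custom_assets_sketch(node_attrs) == {
    "profile": {"size": 0},
    "checkpoint": {"size": 0},
}
```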
4 changes: 2 additions & 2 deletions covalent/_serialize/lattice.py
@@ -102,7 +102,7 @@ def _serialize_lattice_assets(lat, storage_path: str) -> LatticeAssets:
ASSET_FILENAME_MAP["workflow_function_string"],
)

docstring = "" if lat.__doc__ is None else lat.__doc__
docstring = lat.__doc__
docstring_asset = save_asset(
docstring,
ASSET_TYPES["doc"],
@@ -208,7 +208,7 @@ def _deserialize_lattice_assets(assets: LatticeAssets) -> dict:

def _get_lattice_custom_assets(lat: Lattice) -> Dict[str, AssetSchema]:
if "custom_asset_keys" in lat.metadata:
- return {key: AssetSchema() for key in lat.metadata["custom_asset_keys"]}
+ return {key: AssetSchema(size=0) for key in lat.metadata["custom_asset_keys"]}


def serialize_lattice(lat, storage_path: str) -> LatticeSchema:
6 changes: 4 additions & 2 deletions covalent/_shared_files/schemas/asset.py
@@ -28,11 +28,13 @@ class AssetSchema(BaseModel):
remote_uri: Optional[str] = None

# Size of the asset in bytes
- size: Optional[int] = 0
+ size: int


class AssetUpdate(BaseModel):
remote_uri: Optional[str] = None
- size: Optional[int] = None
digest_alg: Optional[str] = None
digest: Optional[str] = None

+ # Size of the asset in bytes
+ size: int
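With `size` now required, an asset update that omits it fails validation. A quick check against an equivalent pydantic model (this mirrors the class above for illustration and assumes pydantic v2 semantics, consistent with the `model_validate` call used elsewhere in this commit):

```python
from typing import Optional
from pydantic import BaseModel, ValidationError

class AssetUpdate(BaseModel):
    # Mirror of the schema above, for illustration only
    remote_uri: Optional[str] = None
    digest_alg: Optional[str] = None
    digest: Optional[str] = None
    size: int  # required: zero vs. positive now drives transfer decisions

AssetUpdate(remote_uri="https://host/stdout", size=0)  # OK: explicitly empty
try:
    AssetUpdate(remote_uri="https://host/stdout")       # size missing
except ValidationError as e:
    print(e.errors()[0]["type"])                         # "missing"
```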
20 changes: 16 additions & 4 deletions covalent/executor/executor_plugins/dask.py
@@ -307,19 +307,27 @@ async def receive(self, task_group_metadata: Dict, data: Any) -> List[TaskUpdate

if terminal_status == RESULT_STATUS.CANCELLED:
output_uri = ""
output_size = 0
stdout_uri = ""
stdout_size = 0
stderr_uri = ""
stderr_size = 0
qelectron_db_uri = ""
qelectron_db_size = 0

else:
result_path = os.path.join(self.cache_dir, f"result-{dispatch_id}:{task_id}.json")
with open(result_path, "r") as f:
result_summary = json.load(f)
node_id = result_summary["node_id"]
- output_uri = result_summary["output_uri"]
- stdout_uri = result_summary["stdout_uri"]
- stderr_uri = result_summary["stderr_uri"]
- qelectron_db_uri = result_summary["qelectron_db_uri"]
+ output_uri = result_summary["output"]["uri"]
+ output_size = result_summary["output"]["size"]
+ stdout_uri = result_summary["stdout"]["uri"]
+ stdout_size = result_summary["stdout"]["size"]
+ stderr_uri = result_summary["stderr"]["uri"]
+ stderr_size = result_summary["stderr"]["size"]
+ qelectron_db_uri = result_summary["qelectron_db"]["uri"]
+ qelectron_db_size = result_summary["qelectron_db"]["size"]
exception_raised = result_summary["exception_occurred"]

terminal_status = (
@@ -333,15 +341,19 @@
"assets": {
"output": {
"remote_uri": output_uri,
"size": output_size,
},
"stdout": {
"remote_uri": stdout_uri,
"size": stdout_size,
},
"stderr": {
"remote_uri": stderr_uri,
"size": stderr_size,
},
"qelectron_db": {
"remote_uri": qelectron_db_uri,
"size": qelectron_db_size,
},
},
}
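The executor-side JSON that `receive()` now reads groups each artifact's `uri` with its `size`. Below is a sketch of such a summary file, matching the keys accessed above; the URIs, sizes, and dispatch ID are illustrative, not values produced by Covalent.

```python
import json
import os
import tempfile

# Shape implied by the result_summary accesses above; values are made up
result_summary = {
    "node_id": 0,
    "exception_occurred": False,
    "output": {"uri": "file:///tmp/node_0_output.tobj", "size": 2048},
    "stdout": {"uri": "file:///tmp/node_0_stdout.txt", "size": 17},
    "stderr": {"uri": "file:///tmp/node_0_stderr.txt", "size": 0},
    "qelectron_db": {"uri": "file:///tmp/node_0_qelectron.mdb", "size": 0},
}

cache_dir = tempfile.mkdtemp()
result_path = os.path.join(cache_dir, "result-dispatch:0.json")  # f"result-{dispatch_id}:{task_id}.json"
with open(result_path, "w") as f:
    json.dump(result_summary, f)

# receive() then forwards both fields into the TaskUpdate asset metadata
with open(result_path) as f:
    summary = json.load(f)
assert summary["stderr"]["size"] == 0  # empty stderr: nothing to download later
```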
17 changes: 3 additions & 14 deletions covalent/executor/executor_plugins/local.py
@@ -216,24 +216,13 @@ def _receive(self, task_group_metadata: Dict, data: Any) -> List[TaskUpdate]:
received = ReceiveModel.model_validate(data)
terminal_status = Status(received.status.value)

# Don't update any asset metadata since all assets will be
# pushed from the executor
task_result = {
"dispatch_id": dispatch_id,
"node_id": task_id,
"status": terminal_status,
"assets": {
"output": {
"remote_uri": "",
},
"stdout": {
"remote_uri": "",
},
"stderr": {
"remote_uri": "",
},
"qelectron_db": {
"remote_uri": "",
},
},
"assets": {},
}

task_results.append(TaskUpdate(**task_result))
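Because the local executor pushes its artifacts itself, its `_receive()` now reports no asset metadata at all; an empty `assets` mapping means "leave the existing records alone," whereas the Dask executor's explicit entries overwrite them. The merge helper below is a hypothetical illustration of that distinction, not part of Covalent.

```python
def apply_asset_updates(current: dict, updates: dict) -> dict:
    # Hypothetical illustration: only keys present in `updates` are touched
    merged = {k: dict(v) for k, v in current.items()}
    for key, update in updates.items():
        merged.setdefault(key, {}).update(update)
    return merged

server_side = {"stdout": {"remote_uri": "s3://bucket/stdout", "size": 17}}

# Explicit entries (as the Dask executor sends) overwrite the records
dask_update = {"stdout": {"remote_uri": "file:///tmp/stdout.txt", "size": 21}}
assert apply_asset_updates(server_side, dask_update)["stdout"]["size"] == 21

# An empty mapping (as the local executor now sends) changes nothing
assert apply_asset_updates(server_side, {}) == server_side
```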