Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory improvements (1/3): Introduce new data access layer and schemas #1728

Merged
merged 33 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
39a1598
Mem (1/3): introduce new DAL and core Pydantic models
cjao Jun 23, 2023
9389b3b
Mem (1/3): Fix schemas
cjao Jul 11, 2023
23761a7
Mem (1/3): DAL PR: temporarily redirect core dispatcher tests
cjao Jun 23, 2023
69def08
Mem (1/3): DAL PR: fix tests
cjao Jun 28, 2023
f942531
Mem (1/3): Fix requirements workflow
cjao Jul 12, 2023
4693404
Mem (1/3): Uncomment boilerplate in disabled unit tests
cjao Jul 14, 2023
8bb0adb
Mem (1/3): Add unit test for format_server_url
cjao Jul 14, 2023
b18db33
Mem (1/3): defer copy_file_locally to next PR
cjao Jul 14, 2023
f792432
Mem (1/3): update changelog
cjao Jun 30, 2023
cb8ae1d
Mem (1/3): Core DAL improvements
cjao Sep 22, 2023
7f74c63
syncing with develop
kessler-frost Oct 3, 2023
249578d
updated license of files
kessler-frost Oct 3, 2023
5f3586e
updated license of files
kessler-frost Oct 3, 2023
5ee94d6
fixed alembic migrations order
kessler-frost Oct 3, 2023
e0efd49
fixing tests
kessler-frost Oct 3, 2023
e1e9447
fixing tests
kessler-frost Oct 3, 2023
0745b19
fixing tests
kessler-frost Oct 3, 2023
801842b
Merge branch 'develop' into memory-improvements-pr-1
kessler-frost Oct 3, 2023
0109e36
fixing ui backend tests
kessler-frost Oct 4, 2023
35076e8
Merge branch 'memory-improvements-pr-1' of github.com:AgnostiqHQ/cova…
kessler-frost Oct 4, 2023
b1ceb26
addressed most comments, still some left
kessler-frost Oct 4, 2023
6912aec
implementing final set of suggestions
kessler-frost Oct 5, 2023
f02c68f
Merge branch 'develop' into memory-improvements-pr-1
kessler-frost Oct 5, 2023
acf39f1
updated changelog
kessler-frost Oct 5, 2023
c3be087
fixed changelog
kessler-frost Oct 5, 2023
1c7aa36
implemented suggestions
kessler-frost Oct 12, 2023
4b3f46d
Merge branch 'develop' into memory-improvements-pr-1
kessler-frost Oct 12, 2023
a64a1a6
Merge branch 'develop' into memory-improvements-pr-1
kessler-frost Oct 12, 2023
f49860f
Added qelectron_data_exists to the ElectronMetadata schema
kessler-frost Oct 13, 2023
fa73fee
Added qelectron_data_exists to the electron metadata
kessler-frost Oct 13, 2023
f085136
fixed typo
kessler-frost Oct 13, 2023
3262dfb
fixing test
kessler-frost Oct 13, 2023
b487f58
fixing test
kessler-frost Oct 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions covalent/_file_transfer/strategies/shutil_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ def __init__(

# return callable to copy files in the local file system
def cp(self, from_file: File, to_file: File = File()) -> None:
"""
Get a callable that copies a file from one location to another locally

Args:
from_file: File to copy from
to_file: File to copy to. Defaults to File().

Returns:
A callable that copies a file from one location to another locally
"""

def callable():
shutil.copyfile(from_file.filepath, to_file.filepath)

Expand Down
8 changes: 5 additions & 3 deletions covalent/_results_manager/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@
"""

NEW_OBJ = RESULT_STATUS.NEW_OBJECT
PENDING_REUSE = RESULT_STATUS.PENDING_REUSE
PENDING_REUSE = (
RESULT_STATUS.PENDING_REUSE
) # Facilitates reuse of previous electrons in the new dispatcher design
COMPLETED = RESULT_STATUS.COMPLETED
POSTPROCESSING = RESULT_STATUS.POSTPROCESSING
PENDING_POSTPROCESSING = RESULT_STATUS.PENDING_POSTPROCESSING
Expand Down Expand Up @@ -106,14 +108,14 @@
pattern = re.compile(regex)
m = pattern.match(input_string)
if m:
arg_str_repr = m.group(1).rstrip(",")
kwarg_str_repr = m.group(2)
arg_str_repr = m[1].rstrip(",")
kwarg_str_repr = m[2]
else:
arg_str_repr = str(None)
kwarg_str_repr = str(None)

Check warning on line 115 in covalent/_results_manager/result.py

View check run for this annotation

Codecov / codecov/patch

covalent/_results_manager/result.py#L114-L115

Added lines #L114 - L115 were not covered by tests
else:
arg_str_repr = str(None)
kwarg_str_repr = str(None)

Check warning on line 118 in covalent/_results_manager/result.py

View check run for this annotation

Codecov / codecov/patch

covalent/_results_manager/result.py#L117-L118

Added lines #L117 - L118 were not covered by tests

show_result_str = f"""
Lattice Result
Expand Down Expand Up @@ -212,7 +214,7 @@
if self._result is not None:
return self._result.get_deserialized()
else:
return None

Check warning on line 217 in covalent/_results_manager/result.py

View check run for this annotation

Codecov / codecov/patch

covalent/_results_manager/result.py#L217

Added line #L217 was not covered by tests

@property
def inputs(self) -> dict:
Expand Down
86 changes: 79 additions & 7 deletions covalent/_serialize/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

kessler-frost marked this conversation as resolved.
Show resolved Hide resolved
""" Serialization/Deserialization methods for Assets """

import hashlib
import json
Expand All @@ -26,17 +27,41 @@
from .._shared_files.schemas.asset import AssetSchema
from .._workflow.transportable_object import TransportableObject

__all__ = [
"AssetType",
"save_asset",
"load_asset",
]


CHECKSUM_ALGORITHM = "sha"


class AssetType(Enum):
kessler-frost marked this conversation as resolved.
Show resolved Hide resolved
OBJECT = 0
TRANSPORTABLE = 1
"""
Enum for the type of Asset data

"""

OBJECT = 0 # Fallback to cloudpickling
TRANSPORTABLE = 1 # Custom TO serialization
JSONABLE = 2
TEXT = 3
TEXT = 3 # Mainly for stdout, stderr, docstrings, etc.


def serialize_asset(data: Any, data_type: AssetType) -> bytes:
kessler-frost marked this conversation as resolved.
Show resolved Hide resolved
"""
Serialize the asset data

Args:
data: Data to serialize
data_type: Type of the Asset data to serialize

Returns:
Serialized data as bytes

"""

if data_type == AssetType.OBJECT:
return cloudpickle.dumps(data)
elif data_type == AssetType.TRANSPORTABLE:
Expand All @@ -46,10 +71,22 @@
elif data_type == AssetType.TEXT:
return data.encode("utf-8")
else:
raise TypeError(f"Unsupported data type {type(data)}")

Check warning on line 74 in covalent/_serialize/common.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/common.py#L74

Added line #L74 was not covered by tests


def deserialize_asset(data: bytes, data_type: AssetType) -> Any:
kessler-frost marked this conversation as resolved.
Show resolved Hide resolved
"""
Deserialize the asset data

Args:
data: Data to deserialize
data_type: Type of the Asset data to deserialize

Returns:
Deserialized data

"""

if data_type == AssetType.OBJECT:
return cloudpickle.loads(data)
elif data_type == AssetType.TRANSPORTABLE:
Expand All @@ -59,14 +96,39 @@
elif data_type == AssetType.TEXT:
return data.decode("utf-8")
else:
raise TypeError("Unsupported data type")

Check warning on line 99 in covalent/_serialize/common.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/common.py#L99

Added line #L99 was not covered by tests


def _sha1_asset(data: bytes) -> str:
"""
Compute the sha1 checksum of the asset data

Args:
data: Data to compute checksum for

Returns:
sha1 checksum of the data

"""

return hashlib.sha1(data).hexdigest()


def save_asset(data: Any, data_type: AssetType, storage_path: str, filename: str) -> AssetSchema:
"""
Save the asset data to the storage path

Args:
data: Data to save
data_type: Type of the Asset data to save
storage_path: Path to save the data to
filename: Name of the file to save the data to

Returns:
AssetSchema object containing metadata about the saved data

"""

scheme = "file"

serialized = serialize_asset(data, data_type)
Expand All @@ -80,16 +142,26 @@


def load_asset(asset_meta: AssetSchema, data_type: AssetType) -> Any:
"""
Load the asset data from the storage path

Args:
asset_meta: Metadata about the asset to load
data_type: Type of the Asset data to load

Returns:
Asset data

"""

scheme_prefix = "file://"
uri = asset_meta.uri

if not uri:
return None

Check warning on line 161 in covalent/_serialize/common.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/common.py#L161

Added line #L161 was not covered by tests

if uri.startswith(scheme_prefix):
path = uri[len(scheme_prefix) :]
else:
path = uri
path = uri[len(scheme_prefix) :] if uri.startswith(scheme_prefix) else uri

with open(path, "rb") as f:
data = f.read()
return deserialize_asset(data, data_type)
10 changes: 8 additions & 2 deletions covalent/_serialize/electron.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
from .._workflow.transportable_object import TransportableObject
from .common import AssetType, load_asset, save_asset

__all__ = [
"serialize_node",
"deserialize_node",
]


ASSET_TYPES = {
"function": AssetType.TRANSPORTABLE,
"function_string": AssetType.TEXT,
Expand All @@ -52,13 +58,13 @@
# Optional
kessler-frost marked this conversation as resolved.
Show resolved Hide resolved
status = node_attrs.get("status", RESULT_STATUS.NEW_OBJECT)

start_time = node_attrs.get("start_time", None)
start_time = node_attrs.get("start_time")
if start_time:
start_time = start_time.isoformat()

Check warning on line 63 in covalent/_serialize/electron.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/electron.py#L63

Added line #L63 was not covered by tests

end_time = node_attrs.get("end_time", None)
end_time = node_attrs.get("end_time")
if end_time:
end_time = end_time.isoformat()

Check warning on line 67 in covalent/_serialize/electron.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/electron.py#L67

Added line #L67 was not covered by tests

return ElectronMetadata(
task_group_id=task_group_id,
Expand Down
6 changes: 6 additions & 0 deletions covalent/_serialize/lattice.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
from .common import AssetType, load_asset, save_asset
from .transport_graph import deserialize_transport_graph, serialize_transport_graph

__all__ = [
"serialize_lattice",
"deserialize_lattice",
]


ASSET_TYPES = {
"workflow_function": AssetType.TRANSPORTABLE,
"workflow_function_string": AssetType.TEXT,
Expand Down Expand Up @@ -87,8 +93,8 @@

try:
workflow_func_string = lat.workflow_function_string
except AttributeError:
workflow_func_string = ""

Check warning on line 97 in covalent/_serialize/lattice.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/lattice.py#L96-L97

Added lines #L96 - L97 were not covered by tests
workflow_func_str_asset = save_asset(
workflow_func_string,
ASSET_TYPES["workflow_function_string"],
Expand Down Expand Up @@ -218,7 +224,7 @@

def deserialize_lattice(model: LatticeSchema) -> Lattice:
def dummy_function(x):
return x

Check warning on line 227 in covalent/_serialize/lattice.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/lattice.py#L227

Added line #L227 was not covered by tests

lat = Lattice(dummy_function)

Expand Down
26 changes: 16 additions & 10 deletions covalent/_serialize/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@
from .common import AssetType, load_asset, save_asset
from .lattice import deserialize_lattice, serialize_lattice

__all__ = [
"serialize_result",
"deserialize_result",
"strip_local_uris",
"merge_response_manifest",
"extract_assets",
]


ASSET_TYPES = {
"error": AssetType.TEXT,
"result": AssetType.TRANSPORTABLE,
Expand Down Expand Up @@ -101,28 +110,28 @@

def strip_local_uris(res: ResultSchema) -> ResultSchema:
# Create a copy with the local uris removed for submission
manifest = res.copy(deep=True).dict()

Check warning on line 113 in covalent/_serialize/result.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/result.py#L113

Added line #L113 was not covered by tests

# Strip workflow asset uris:
dispatch_assets = manifest["assets"]
for _, asset in dispatch_assets.items():
asset["uri"] = ""

Check warning on line 118 in covalent/_serialize/result.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/result.py#L116-L118

Added lines #L116 - L118 were not covered by tests

lattice = manifest["lattice"]
lattice_assets = lattice["assets"]
for _, asset in lattice_assets.items():
asset["uri"] = ""

Check warning on line 123 in covalent/_serialize/result.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/result.py#L120-L123

Added lines #L120 - L123 were not covered by tests

# Node assets
tg = lattice["transport_graph"]

Check warning on line 126 in covalent/_serialize/result.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/result.py#L126

Added line #L126 was not covered by tests

nodes = tg["nodes"]
for node in nodes:
node_assets = node["assets"]
for _, asset in node_assets.items():
asset["uri"] = ""

Check warning on line 132 in covalent/_serialize/result.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/result.py#L128-L132

Added lines #L128 - L132 were not covered by tests

return ResultSchema.parse_obj(manifest)

Check warning on line 134 in covalent/_serialize/result.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/result.py#L134

Added line #L134 was not covered by tests


# Functions to postprocess response from dispatcher
Expand All @@ -136,6 +145,7 @@
response: The manifest returned from `/register`.
Returns:
A combined manifest with asset `remote_uri`s populated.

"""

manifest.metadata.dispatch_id = response.metadata.dispatch_id
Expand Down Expand Up @@ -170,32 +180,28 @@


def extract_assets(manifest: ResultSchema) -> List[AssetSchema]:
"""Extract all of the asset metadata from a manifest dictionary.
"""
Extract all of the asset metadata from a manifest dictionary.

Args:
manifest: A result manifest

Returns:
A list of assets
"""

assets = []
"""

# workflow-level assets
dispatch_assets = manifest.assets
for key, asset in dispatch_assets:
assets.append(asset)

assets = [asset for key, asset in dispatch_assets]
lattice = manifest.lattice
lattice_assets = lattice.assets
for key, asset in lattice_assets:
assets.append(asset)
assets.extend(asset for key, asset in lattice_assets)

# Node assets
tg = lattice.transport_graph
nodes = tg.nodes
for node in nodes:
node_assets = node.assets
for key, asset in node_assets:
assets.append(asset)
assets.extend(asset for key, asset in node_assets)
return assets
Loading
Loading