
Memory improvements (1/3): Introduce new data access layer and schemas #1728

Merged Oct 13, 2023 · 33 commits (changes shown from 30 commits)
39a1598
Mem (1/3): introduce new DAL and core Pydantic models
cjao Jun 23, 2023
9389b3b
Mem (1/3): Fix schemas
cjao Jul 11, 2023
23761a7
Mem (1/3): DAL PR: temporarily redirect core dispatcher tests
cjao Jun 23, 2023
69def08
Mem (1/3): DAL PR: fix tests
cjao Jun 28, 2023
f942531
Mem (1/3): Fix requirements workflow
cjao Jul 12, 2023
4693404
Mem (1/3): Uncomment boilerplate in disabled unit tests
cjao Jul 14, 2023
8bb0adb
Mem (1/3): Add unit test for format_server_url
cjao Jul 14, 2023
b18db33
Mem (1/3): defer copy_file_locally to next PR
cjao Jul 14, 2023
f792432
Mem (1/3): update changelog
cjao Jun 30, 2023
cb8ae1d
Mem (1/3): Core DAL improvements
cjao Sep 22, 2023
7f74c63
syncing with develop
kessler-frost Oct 3, 2023
249578d
updated license of files
kessler-frost Oct 3, 2023
5f3586e
updated license of files
kessler-frost Oct 3, 2023
5ee94d6
fixed alembic migrations order
kessler-frost Oct 3, 2023
e0efd49
fixing tests
kessler-frost Oct 3, 2023
e1e9447
fixing tests
kessler-frost Oct 3, 2023
0745b19
fixing tests
kessler-frost Oct 3, 2023
801842b
Merge branch 'develop' into memory-improvements-pr-1
kessler-frost Oct 3, 2023
0109e36
fixing ui backend tests
kessler-frost Oct 4, 2023
35076e8
Merge branch 'memory-improvements-pr-1' of github.com:AgnostiqHQ/cova…
kessler-frost Oct 4, 2023
b1ceb26
addressed most comments, still some left
kessler-frost Oct 4, 2023
6912aec
implementing final set of suggestions
kessler-frost Oct 5, 2023
f02c68f
Merge branch 'develop' into memory-improvements-pr-1
kessler-frost Oct 5, 2023
acf39f1
updated changelog
kessler-frost Oct 5, 2023
c3be087
fixed changelog
kessler-frost Oct 5, 2023
1c7aa36
implemented suggestions
kessler-frost Oct 12, 2023
4b3f46d
Merge branch 'develop' into memory-improvements-pr-1
kessler-frost Oct 12, 2023
a64a1a6
Merge branch 'develop' into memory-improvements-pr-1
kessler-frost Oct 12, 2023
f49860f
Added qelectron_data_exists to the ElectronMetadata schema
kessler-frost Oct 13, 2023
fa73fee
Added qelectron_data_exists to the electron metadata
kessler-frost Oct 13, 2023
f085136
fixed typo
kessler-frost Oct 13, 2023
3262dfb
fixing test
kessler-frost Oct 13, 2023
b487f58
fixing test
kessler-frost Oct 13, 2023
3 changes: 3 additions & 0 deletions .gitignore
@@ -88,3 +88,6 @@ node_modules/
!.yarn/releases
!.yarn/sdks
!.yarn/versions

# Ignore mock database
**/*.sqlite
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- [Significant Changes] Improving memory management part 1/3
- Removed strict version pins on `lmdbm`, `mpire`, `orjson`, and `pennylane`
- Changed license to Apache

@@ -146,6 +147,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- File transfer strategy for GCP storage
- Add CLI status for zombie, stopped process.
- Fix for double locking file in configurations.
- Introduced new data access layer
- Introduced Shutil file transfer strategy for local file transfers

### Docs

3 changes: 2 additions & 1 deletion covalent/_file_transfer/enums.py
@@ -34,6 +34,7 @@ class FileSchemes(str, enum.Enum):


 class FileTransferStrategyTypes(str, enum.Enum):
+    Shutil = "Shutil"
     Rsync = "rsync"
     HTTP = "http"
     S3 = "s3"
@@ -43,7 +44,7 @@ class FileTransferStrategyTypes(str, enum.Enum):


 SchemeToStrategyMap = {
-    "file": FileTransferStrategyTypes.Rsync,
+    "file": FileTransferStrategyTypes.Shutil,
     "http": FileTransferStrategyTypes.HTTP,
     "https": FileTransferStrategyTypes.HTTP,
     "s3": FileTransferStrategyTypes.S3,
8 changes: 4 additions & 4 deletions covalent/_file_transfer/file_transfer.py
@@ -19,7 +19,7 @@
 from .enums import FileTransferStrategyTypes, FtCallDepReturnValue, Order
 from .file import File
 from .strategies.http_strategy import HTTP
-from .strategies.rsync_strategy import Rsync
+from .strategies.shutil_strategy import Shutil
 from .strategies.transfer_strategy_base import FileTransferStrategy


@@ -59,10 +59,10 @@ def __init__(
         if strategy:
             self.strategy = strategy
         elif (
-            from_file.mapped_strategy_type == FileTransferStrategyTypes.Rsync
-            and to_file.mapped_strategy_type == FileTransferStrategyTypes.Rsync
+            from_file.mapped_strategy_type == FileTransferStrategyTypes.Shutil
+            and to_file.mapped_strategy_type == FileTransferStrategyTypes.Shutil
         ):
-            self.strategy = Rsync()
+            self.strategy = Shutil()
         elif from_file.mapped_strategy_type == FileTransferStrategyTypes.HTTP:
             self.strategy = HTTP()
         else:
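The selection logic above now defaults to `Shutil` when both endpoints map to the local-file strategy. A minimal stdlib sketch of that dispatch, independent of Covalent's classes (the enum members and the fallback `AttributeError` branch are illustrative, not taken from the diff):

```python
import enum


class StrategyType(str, enum.Enum):
    SHUTIL = "Shutil"
    HTTP = "http"


def pick_strategy(from_type: StrategyType, to_type: StrategyType) -> str:
    # Mirrors the branch order in FileTransfer.__init__: local-to-local
    # transfers use Shutil, HTTP sources use HTTP, anything else is rejected.
    if from_type == StrategyType.SHUTIL and to_type == StrategyType.SHUTIL:
        return "Shutil"
    elif from_type == StrategyType.HTTP:
        return "HTTP"
    raise AttributeError("Unsupported file transfer strategy.")


print(pick_strategy(StrategyType.SHUTIL, StrategyType.SHUTIL))  # → Shutil
```

Note that the old code routed local-to-local transfers through `Rsync`, which shells out to an external binary; the new default keeps the copy in-process.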
59 changes: 59 additions & 0 deletions covalent/_file_transfer/strategies/shutil_strategy.py
@@ -0,0 +1,59 @@
# Copyright 2021 Agnostiq Inc.
#
# This file is part of Covalent.
#
# Licensed under the Apache License 2.0 (the "License"). A copy of the
# License may be obtained with this software package or at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Use of this file is prohibited except in compliance with the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import shutil

from .. import File
from .transfer_strategy_base import FileTransferStrategy


class Shutil(FileTransferStrategy):
"""
Implements Base FileTransferStrategy class to copy files locally

The copying is done in-process using shutil.copyfile.
"""

def __init__(
self,
):
pass

# return callable to copy files in the local file system
def cp(self, from_file: File, to_file: File = File()) -> None:
"""
Get a callable that copies a file from one location to another locally

Args:
from_file: File to copy from
to_file: File to copy to. Defaults to File().

Returns:
A callable that copies a file from one location to another locally
"""

def callable():
shutil.copyfile(from_file.filepath, to_file.filepath)

return callable

# Local file operations only
def upload(self, from_file: File, to_file: File = File()) -> File:
raise NotImplementedError

# Local file operations only
def download(self, from_file: File, to_file: File) -> File:
raise NotImplementedError
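`Shutil.cp` above does not copy anything itself; it returns a deferred callable that performs the copy when invoked. A self-contained stdlib sketch of that same pattern, without Covalent's `File` class (`make_copier` and the temp-file names are illustrative):

```python
import shutil
import tempfile
from pathlib import Path


def make_copier(from_path: str, to_path: str):
    """Return a zero-argument callable that performs the copy when invoked."""

    def _copy() -> None:
        shutil.copyfile(from_path, to_path)

    return _copy


# Build the callable first; the copy happens only when it is called.
workdir = Path(tempfile.mkdtemp())
src, dst = workdir / "src.txt", workdir / "dst.txt"
src.write_text("hello")

copier = make_copier(str(src), str(dst))
assert not dst.exists()  # nothing has been copied yet
copier()
print(dst.read_text())  # → hello
```

Deferring the copy this way lets the dispatcher schedule the transfer as a pre- or post-hook rather than executing it at graph-construction time.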
41 changes: 28 additions & 13 deletions covalent/_results_manager/result.py
@@ -16,6 +16,7 @@

"""Result object."""
import os
import re
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Set, Union

@@ -62,6 +63,9 @@ class Result:
"""

NEW_OBJ = RESULT_STATUS.NEW_OBJECT
PENDING_REUSE = (
RESULT_STATUS.PENDING_REUSE
) # Facilitates reuse of previous electrons in the new dispatcher design
COMPLETED = RESULT_STATUS.COMPLETED
POSTPROCESSING = RESULT_STATUS.POSTPROCESSING
PENDING_POSTPROCESSING = RESULT_STATUS.PENDING_POSTPROCESSING
@@ -92,19 +96,26 @@ def __init__(self, lattice: Lattice, dispatch_id: str = "") -> None:

self._num_nodes = -1

self._inputs = {"args": [], "kwargs": {}}
if lattice.args:
self._inputs["args"] = lattice.args
if lattice.kwargs:
self._inputs["kwargs"] = lattice.kwargs

self._error = None
self._error = ""

def __str__(self):
"""String representation of the result object"""

arg_str_repr = [e.object_string for e in self.inputs["args"]]
kwarg_str_repr = {key: value.object_string for key, value in self.inputs["kwargs"].items()}
if isinstance(self.inputs, TransportableObject):
input_string = self.inputs.object_string

regex = r"^\{'args': \((.*)\), 'kwargs': \{(.*)\}\}$"
pattern = re.compile(regex)
m = pattern.match(input_string)
if m:
arg_str_repr = m[1].rstrip(",")
kwarg_str_repr = m[2]
else:
arg_str_repr = str(None)
kwarg_str_repr = str(None)
else:
arg_str_repr = str(None)
kwarg_str_repr = str(None)

show_result_str = f"""
Lattice Result
@@ -200,15 +211,18 @@ def result(self) -> Union[int, float, list, dict]:
Final result of current dispatch.
"""

return self._result.get_deserialized()
if self._result is not None:
return self._result.get_deserialized()
else:
return None

@property
def inputs(self) -> dict:
"""
Inputs sent to the "Lattice" function for dispatching.
"""

return self._inputs
return self.lattice.inputs

@property
def error(self) -> str:
@@ -327,8 +341,9 @@ def post_process(self) -> Any:
with active_lattice_manager.claim(lattice):
lattice.post_processing = True
lattice.electron_outputs = ordered_node_outputs
args = [arg.get_deserialized() for arg in lattice.args]
kwargs = {k: v.get_deserialized() for k, v in lattice.kwargs.items()}
inputs = self.lattice.inputs.get_deserialized()
args = inputs["args"]
kwargs = inputs["kwargs"]
workflow_function = lattice.workflow_function.get_deserialized()
result = workflow_function(*args, **kwargs)
lattice.post_processing = False
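The new `Result.__str__` no longer stores inputs on the `Result` itself; it recovers arg/kwarg previews by matching the string form of the serialized inputs against a regex. A self-contained sketch of that parse (the sample input string below is illustrative):

```python
import re

# The same pattern used in Result.__str__ to split the inputs preview.
INPUTS_RE = re.compile(r"^\{'args': \((.*)\), 'kwargs': \{(.*)\}\}$")


def split_inputs_preview(input_string: str):
    """Split a "{'args': (...), 'kwargs': {...}}" preview into two strings."""
    m = INPUTS_RE.match(input_string)
    if m:
        # Trailing comma appears for 1-tuples, e.g. "(1,)".
        return m[1].rstrip(","), m[2]
    return str(None), str(None)


args_repr, kwargs_repr = split_inputs_preview("{'args': (1, 2), 'kwargs': {'x': 3}}")
print(args_repr)    # → 1, 2
print(kwargs_repr)  # → 'x': 3
```

If the string does not match the expected shape, both previews fall back to `"None"`, mirroring the `else` branches in the diff.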
15 changes: 15 additions & 0 deletions covalent/_serialize/__init__.py
@@ -0,0 +1,15 @@
# Copyright 2021 Agnostiq Inc.
#
# This file is part of Covalent.
#
# Licensed under the Apache License 2.0 (the "License"). A copy of the
# License may be obtained with this software package or at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Use of this file is prohibited except in compliance with the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
167 changes: 167 additions & 0 deletions covalent/_serialize/common.py
@@ -0,0 +1,167 @@
# Copyright 2021 Agnostiq Inc.
#
# This file is part of Covalent.
#
# Licensed under the Apache License 2.0 (the "License"). A copy of the
# License may be obtained with this software package or at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Use of this file is prohibited except in compliance with the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" Serialization/Deserialization methods for Assets """

import hashlib
import json
from enum import Enum
from pathlib import Path
from typing import Any

import cloudpickle

from .._shared_files.schemas.asset import AssetSchema
from .._workflow.transportable_object import TransportableObject

__all__ = [
"AssetType",
"save_asset",
"load_asset",
]


CHECKSUM_ALGORITHM = "sha"


class AssetType(Enum):
"""
Enum for the type of Asset data

"""

OBJECT = 0 # Fallback to cloudpickling
TRANSPORTABLE = 1 # Custom TO serialization
JSONABLE = 2
TEXT = 3 # Mainly for stdout, stderr, docstrings, etc.


def serialize_asset(data: Any, data_type: AssetType) -> bytes:
"""
Serialize the asset data

Args:
data: Data to serialize
data_type: Type of the Asset data to serialize

Returns:
Serialized data as bytes

"""

if data_type == AssetType.OBJECT:
return cloudpickle.dumps(data)
elif data_type == AssetType.TRANSPORTABLE:
return data.serialize()
elif data_type == AssetType.JSONABLE:
return json.dumps(data).encode("utf-8")
elif data_type == AssetType.TEXT:
return data.encode("utf-8")
else:
raise TypeError(f"Unsupported data type {type(data)}")


def deserialize_asset(data: bytes, data_type: AssetType) -> Any:
"""
Deserialize the asset data

Args:
data: Data to deserialize
data_type: Type of the Asset data to deserialize

Returns:
Deserialized data

"""

if data_type == AssetType.OBJECT:
return cloudpickle.loads(data)
elif data_type == AssetType.TRANSPORTABLE:
return TransportableObject.deserialize(data)
elif data_type == AssetType.JSONABLE:
return json.loads(data.decode("utf-8"))
elif data_type == AssetType.TEXT:
return data.decode("utf-8")
else:
raise TypeError("Unsupported data type")


def _sha1_asset(data: bytes) -> str:
"""
Compute the sha1 checksum of the asset data

Args:
data: Data to compute checksum for

Returns:
sha1 checksum of the data

"""

return hashlib.sha1(data).hexdigest()


def save_asset(data: Any, data_type: AssetType, storage_path: str, filename: str) -> AssetSchema:
"""
Save the asset data to the storage path

Args:
data: Data to save
data_type: Type of the Asset data to save
storage_path: Path to save the data to
filename: Name of the file to save the data to

Returns:
AssetSchema object containing metadata about the saved data

"""

scheme = "file"

serialized = serialize_asset(data, data_type)
digest = _sha1_asset(serialized)
path = Path(storage_path) / filename
path = path.resolve()
with open(path, "wb") as f:
f.write(serialized)
uri = f"{scheme}://{path}"
return AssetSchema(digest_alg=CHECKSUM_ALGORITHM, digest=digest, size=len(serialized), uri=uri)


def load_asset(asset_meta: AssetSchema, data_type: AssetType) -> Any:
"""
Load the asset data from the storage path

Args:
asset_meta: Metadata about the asset to load
data_type: Type of the Asset data to load

Returns:
Asset data

"""

scheme_prefix = "file://"
uri = asset_meta.uri

if not uri:
return None

path = uri[len(scheme_prefix) :] if uri.startswith(scheme_prefix) else uri

with open(path, "rb") as f:
data = f.read()
return deserialize_asset(data, data_type)
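The `save_asset`/`load_asset` pair above serializes an asset, checksums it, writes it under `storage_path`, and records a `file://` URI in an `AssetSchema`. A minimal stdlib-only sketch of that round trip for the `TEXT` asset type (no `cloudpickle` or Pydantic; a plain dict stands in for `AssetSchema`, and the file names are illustrative):

```python
import hashlib
import tempfile
from pathlib import Path


def save_text_asset(data: str, storage_path: str, filename: str) -> dict:
    """Serialize, checksum, and write a text asset; return its metadata."""
    serialized = data.encode("utf-8")
    digest = hashlib.sha1(serialized).hexdigest()
    path = (Path(storage_path) / filename).resolve()
    path.write_bytes(serialized)
    # Mirrors AssetSchema(digest_alg=..., digest=..., size=..., uri=...)
    return {
        "digest_alg": "sha",
        "digest": digest,
        "size": len(serialized),
        "uri": f"file://{path}",
    }


def load_text_asset(meta: dict) -> str:
    """Strip the file:// scheme prefix and read the asset back."""
    prefix = "file://"
    uri = meta["uri"]
    path = uri[len(prefix):] if uri.startswith(prefix) else uri
    return Path(path).read_bytes().decode("utf-8")


storage = tempfile.mkdtemp()
meta = save_text_asset("stdout from node 0", storage, "stdout.txt")
assert load_text_asset(meta) == "stdout from node 0"  # round trip
```

Recording the digest and size alongside the URI lets the dispatcher detect stale or corrupted asset files without deserializing them.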