Skip to content

Commit

Permalink
Merge branch 'develop' into pre-commit-ci-update-config
Browse files Browse the repository at this point in the history
  • Loading branch information
kessler-frost authored Nov 23, 2023
2 parents c7886db + 0e67dfa commit 34b7f73
Show file tree
Hide file tree
Showing 56 changed files with 897 additions and 514 deletions.
23 changes: 22 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [UNRELEASED]

### Changed

- Changed the azurebatch.rst banner from default covalent jpg to azure batch's svg file

### Fixed

- Lattice-default metadata attributes are now applied correctly
Expand All @@ -15,8 +19,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Removed unassigned variable names
- Contributing guidelines steps for installing for the first time
- Updated gitignore to ignore yarn files and folders for latest version of yarn
- Fixed the bug that caused ValueError error when using KEYWORD_ONLY parameter in electron func
- Fixed the bug that caused ValueError error when using KEYWORD_ONLY parameter in electron func
- Changed code at line 218 in covalent/_shared_files/utils.py
- Fixed usage of deprecated pydantic validation methods
- Fixed qelectron_db retrieval in result object
- Fixed editability of Qelectron on settings page - UI changes
- Certain pydantic v2 related updates
- Fixed lattice's metadata propagation to electron's metadata in case no metadata was provided to the electron

### Operations

Expand All @@ -36,12 +45,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Changed license to Apache
- Improved error handling in generate_docs.py
- [Significant Changes] Migrated core server-side code to new data access layer.
- Changed the way UI was accessing the qelectron database to access it directly from the mdb file in object store
- Update version of browserverify-sign
- Limiting cloudpickle version to less than 3.0 for now

### Added

- Documentation and test cases for database triggers.
- Added the `__pow__` method to the `Electron` class
- New Runner and executor API to bypass server-side memory when running tasks.
- Added qelectron_db as an asset to be transferred from executor's machine to covalent server
- New methods to qelectron_utils, replacing the old ones

### Docs

Expand All @@ -51,6 +65,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Tests

- Temporarily skipping the sqlite and database trigger functional tests
- Updated tests to accommodate the new qelectron fixes
- Added new tests for the Database class and qelectron_utils

### Removed

- Removed no longer needed methods in qelectron_utils
- Removed `dispatch-id` from generate_node_result method

## [0.229.0-rc.0] - 2023-09-22

Expand Down
148 changes: 72 additions & 76 deletions covalent/_dispatcher_plugins/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def get_redispatch_request_body_v2(
dispatcher_addr: str = None,
) -> ResultSchema:
rm = get_result_manager(dispatch_id, dispatcher_addr=dispatcher_addr, wait=True)
manifest = ResultSchema.parse_obj(rm._manifest)
manifest = ResultSchema.model_validate(rm._manifest)

# If no changes to inputs or electron, just retry the dispatch
if not new_args and not new_kwargs and not replace_electrons:
Expand Down Expand Up @@ -170,6 +170,69 @@ def wrapper(*args, **kwargs) -> str:

return wrapper

@staticmethod
def register(
orig_lattice: Lattice,
dispatcher_addr: str = None,
) -> Callable:
"""
Wrapping the dispatching functionality to allow input passing
and server address specification.
Afterwards, send the lattice to the dispatcher server and return
the assigned dispatch id.
Args:
orig_lattice: The lattice/workflow to send to the dispatcher server.
dispatcher_addr: The address of the dispatcher server. If None then then defaults to the address set in Covalent's config.
Returns:
Wrapper function which takes the inputs of the workflow as arguments
"""

if dispatcher_addr is None:
dispatcher_addr = format_server_url()

@wraps(orig_lattice)
def wrapper(*args, **kwargs) -> str:
"""
Send the lattice to the dispatcher server and return
the assigned dispatch id.
Args:
*args: The inputs of the workflow.
**kwargs: The keyword arguments of the workflow.
Returns:
The dispatch id of the workflow.
"""

if not isinstance(orig_lattice, Lattice):
message = f"Dispatcher expected a Lattice, received {type(orig_lattice)} instead."
app_log.error(message)
raise TypeError(message)

lattice = deepcopy(orig_lattice)

lattice.build_graph(*args, **kwargs)

with tempfile.TemporaryDirectory() as tmp_dir:
manifest = LocalDispatcher.prepare_manifest(lattice, tmp_dir)
LocalDispatcher.register_manifest(manifest, dispatcher_addr)

dispatch_id = manifest.metadata.dispatch_id

path = dispatch_cache_dir / f"{dispatch_id}"

with open(path, "w") as f:
f.write(manifest.model_dump_json())

LocalDispatcher.upload_assets(manifest)

return dispatch_id

return wrapper

@staticmethod
def submit(
orig_lattice: Lattice,
Expand Down Expand Up @@ -373,8 +436,6 @@ def stop_triggers(
+ str(get_config("dispatcher.port"))
)

stop_triggers_url = f"{triggers_server_addr}/api/triggers/stop_observe"

if isinstance(dispatch_ids, str):
dispatch_ids = [dispatch_ids]

Expand All @@ -386,69 +447,6 @@ def stop_triggers(
for d_id in dispatch_ids:
app_log.debug(d_id)

@staticmethod
def register(
orig_lattice: Lattice,
dispatcher_addr: str = None,
) -> Callable:
"""
Wrapping the dispatching functionality to allow input passing
and server address specification.
Afterwards, send the lattice to the dispatcher server and return
the assigned dispatch id.
Args:
orig_lattice: The lattice/workflow to send to the dispatcher server.
dispatcher_addr: The address of the dispatcher server. If None then then defaults to the address set in Covalent's config.
Returns:
Wrapper function which takes the inputs of the workflow as arguments
"""

if dispatcher_addr is None:
dispatcher_addr = format_server_url()

@wraps(orig_lattice)
def wrapper(*args, **kwargs) -> str:
"""
Send the lattice to the dispatcher server and return
the assigned dispatch id.
Args:
*args: The inputs of the workflow.
**kwargs: The keyword arguments of the workflow.
Returns:
The dispatch id of the workflow.
"""

if not isinstance(orig_lattice, Lattice):
message = f"Dispatcher expected a Lattice, received {type(orig_lattice)} instead."
app_log.error(message)
raise TypeError(message)

lattice = deepcopy(orig_lattice)

lattice.build_graph(*args, **kwargs)

with tempfile.TemporaryDirectory() as tmp_dir:
manifest = LocalDispatcher.prepare_manifest(lattice, tmp_dir)
LocalDispatcher.register_manifest(manifest, dispatcher_addr)

dispatch_id = manifest.metadata.dispatch_id

path = dispatch_cache_dir / f"{dispatch_id}"

with open(path, "w") as f:
f.write(manifest.json())

LocalDispatcher.upload_assets(manifest)

return dispatch_id

return wrapper

@staticmethod
def register_redispatch(
dispatch_id: str,
Expand Down Expand Up @@ -506,7 +504,7 @@ def func(*new_args, **new_kwargs):
path = dispatch_cache_dir / f"{redispatch_id}"

with open(path, "w") as f:
f.write(manifest.json())
f.write(manifest.model_dump_json())

LocalDispatcher.upload_assets(manifest)

Expand Down Expand Up @@ -541,20 +539,16 @@ def register_manifest(
if dispatcher_addr is None:
dispatcher_addr = format_server_url()

if push_assets:
stripped = strip_local_uris(manifest)
else:
stripped = manifest

stripped = strip_local_uris(manifest) if push_assets else manifest
endpoint = "/api/v2/dispatches"

if parent_dispatch_id:
endpoint = f"{endpoint}/{parent_dispatch_id}/subdispatches"

r = APIClient(dispatcher_addr).post(endpoint, data=stripped.json())
r = APIClient(dispatcher_addr).post(endpoint, data=stripped.model_dump_json())
r.raise_for_status()

parsed_resp = ResultSchema.parse_obj(r.json())
parsed_resp = ResultSchema.model_validate(r.json())

return merge_response_manifest(manifest, parsed_resp)

Expand All @@ -581,10 +575,12 @@ def register_derived_manifest(
endpoint = f"/api/v2/dispatches/{dispatch_id}/redispatches"

params = {"reuse_previous_results": reuse_previous_results}
r = APIClient(dispatcher_addr).post(endpoint, data=stripped.json(), params=params)
r = APIClient(dispatcher_addr).post(
endpoint, data=stripped.model_dump_json(), params=params
)
r.raise_for_status()

parsed_resp = ResultSchema.parse_obj(r.json())
parsed_resp = ResultSchema.model_validate(r.json())

return merge_response_manifest(manifest, parsed_resp)

Expand Down
31 changes: 13 additions & 18 deletions covalent/_results_manager/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@
from .._shared_files.config import get_config
from .._shared_files.context_managers import active_lattice_manager
from .._shared_files.defaults import postprocess_prefix, prefix_separator, sublattice_prefix
from .._shared_files.qelectron_utils import QE_DB_DIRNAME
from .._shared_files.util_classes import RESULT_STATUS, Status
from .._workflow.lattice import Lattice
from .._workflow.transport import TransportableObject
from ..quantum.qserver import database as qe_db

if TYPE_CHECKING:
from .._shared_files.util_classes import Status
Expand Down Expand Up @@ -211,10 +209,7 @@ def result(self) -> Union[int, float, list, dict]:
Final result of current dispatch.
"""

if self._result is not None:
return self._result.get_deserialized()
else:
return None
return self._result.get_deserialized() if self._result is not None else None

@property
def inputs(self) -> dict:
Expand Down Expand Up @@ -275,7 +270,7 @@ def get_node_result(self, node_id: int) -> dict:
"end_time": self.lattice.transport_graph.get_node_value(node_id, "end_time"),
"status": self._get_node_status(node_id),
"output": self._get_node_output(node_id),
"qelectron": self._get_node_qelectron_data(node_id),
"qelectron": self._get_node_qelectron_db_bytes(node_id),
"error": self.lattice.transport_graph.get_node_value(node_id, "error"),
"sublattice_result": self.lattice.transport_graph.get_node_value(
node_id, "sublattice_result"
Expand Down Expand Up @@ -387,26 +382,26 @@ def _get_node_output(self, node_id: int) -> Any:
"""
return self._lattice.transport_graph.get_node_value(node_id, "output")

def _get_node_qelectron_data(self, node_id: int) -> dict:
def _get_node_qelectron_db_bytes(self, node_id: int) -> dict:
"""
Return all QElectron data associated with a node.
Return the entire QRlectron DB in bytes, associated with a node.
In order to use this db, the user needs to save this in an data.mdb file and
then use the `Database` class, found in `covalent.quantum.qserver.database` to access the data,
i.e.: database = Database("dirpath/containing/data.mdb")
database.get_db_dict(dispatch_id="dispatch_id", node_id="node_id", direct_path=True)
Args:
node_id: The node id.
Returns:
The QElectron data of said node. Will return None if no data exists.
The QElectron db of said node. Will return an empty byte string if it doesn't exist.
"""

try:
# Checks existence of QElectron data.
self._lattice.transport_graph.get_node_value(node_id, "qelectron_data_exists")
return self._lattice.transport_graph.get_node_value(node_id, "qelectron_db")
except KeyError:
return None

results_dir = get_config("dispatcher")["results_dir"]
db_dir = os.path.join(results_dir, self.dispatch_id, QE_DB_DIRNAME)

return qe_db.Database(db_dir).get_db(dispatch_id=self.dispatch_id, node_id=node_id)
return bytes()

def _get_node_error(self, node_id: int) -> Union[None, str]:
"""
Expand Down
Loading

0 comments on commit 34b7f73

Please sign in to comment.