Skip to content

Commit

Permalink
[1-feature/add model inferences] - Add model inferences (#54)
Browse files Browse the repository at this point in the history
* tested

* preload trained model

* fix returning data

* tested

* fix typing issue

* [2-feature/trained model selection] - Add trained model selection (#56)

* update small0 and version

* tested

* inference for pytorch is any

* remove indirection

* update type for qcogsync

* fix typing issue

* [0-feature/select model by name] - Add model selection call (#53)

* tested

* preload trained model

* inference for pytorch is any

* remove indirection

---------

Co-authored-by: Sebastien Roy <[email protected]>

* adding value error

---------

Co-authored-by: Sebastien Roy <[email protected]>
  • Loading branch information
vertefra and sebastroy authored Sep 24, 2024
1 parent 2c45a8e commit 0027158
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 24 deletions.
153 changes: 147 additions & 6 deletions qcog_python_client/qcog/_baseclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
from typing import Any, TypeAlias
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

import aiohttp
import pandas as pd
Expand Down Expand Up @@ -67,9 +68,35 @@ def __init__(self) -> None: # noqa: D107
self._inference_result: dict | None = None
self._loss: Matrix | None = None
self._pytorch_model: dict | None = None
self._pytorch_trained_models: list[dict] | None = None
self.last_status: TrainingStatus | None = None
self.metrics: dict | None = None

@property
def pytorch_trained_models(self) -> list[dict]:
    """Return the list of Pytorch trained models.

    Lazily initializes the backing attribute to an empty list on the
    first read, so callers always receive a list (possibly empty).
    """
    cached = self._pytorch_trained_models
    if cached is None:
        cached = []
        self._pytorch_trained_models = cached
    return cached

@pytorch_trained_models.setter
def pytorch_trained_models(self, fetched: list[dict]) -> None:
    """Merge `fetched` trained-model payloads into the cached list.

    Each payload is validated against the trained-model response schema
    before being stored. Entries whose `guid` is already cached are
    skipped so the cache never holds duplicates.
    """
    if self._pytorch_trained_models is None:
        self._pytorch_trained_models = []

    # Guids already present in the cache.
    guids = {
        trained_model["guid"] for trained_model in self._pytorch_trained_models
    }

    for trained_model in fetched:
        if trained_model["guid"] in guids:
            continue
        # Validate the payload before caching it.
        validated = AppSchemasPytorchModelPytorchTrainedModelPayloadResponse.model_validate(  # noqa: E501
            trained_model
        )
        self._pytorch_trained_models.append(validated.model_dump())
        # Fix: track guids seen within this batch as well; previously a
        # guid duplicated inside `fetched` itself was appended twice.
        guids.add(trained_model["guid"])

@property
def pytorch_model(self) -> dict:
"""Return the Pytorch model."""
Expand Down Expand Up @@ -299,15 +326,105 @@ async def _preloaded_training_parameters(self, guid: str) -> BaseQcogClient:
)
return self

async def _preloaded_model(self, guid: str) -> BaseQcogClient:
async def _preloaded_model(
    self,
    guid: str | None = None,
    *,
    pytorch_model_name: str | None = None,
    force_reload: bool = False,
) -> BaseQcogClient:
    """Retrieve preexisting model payload.

    Parameters
    ----------
    guid : str | None
        trained model identifier. Required for Pauli and Ensemble
        models; optional for pytorch models, where the latest trained
        model is loaded when omitted.
    pytorch_model_name : str | None
        name of the pytorch model architecture to fetch first, when no
        model has been set on the client yet.
    force_reload : bool
        if True, refetch the list of trained models (pytorch only).

    Raises
    ------
    ValueError
        if `guid` is missing for a Pauli or Ensemble model.
    """
    # If a `pytorch_model_name` is provided, we can assume that we
    # don't have a `model` set yet, so we fetch the model first.
    if pytorch_model_name:
        await self._preloaded_pt_model(pytorch_model_name)

    if self.model.model_name == Model.pytorch.value:
        # NOTE(review): the original bound `self.pytorch_model["guid"]`
        # to an unused local here; `_preload_trained_pt_model` reads the
        # guid itself, so the dead assignment was removed.
        await self._preload_trained_pt_model(
            guid=guid,
            force_reload=force_reload,
        )
    else:
        if guid is None:
            raise ValueError(
                "Model guid is required for Pauli and Ensemble models."
            )
        await self._preload_trained_qcog_model(guid)

    return self

async def _preload_trained_pt_models(
    self,
    pytorch_model_guid: str,
    *,
    page: int = 0,
    limit: int = 100,
    training_status: TrainingStatus | None = None,
) -> BaseQcogClient:
    """Retrieve preexisting trained models for a PyTorch model."""
    query: dict = {"limit": limit, "page": page}

    if training_status:
        query["training_status"] = training_status.value

    # Compose URL with parameters
    # TODO: refactor methods to accept parameters dictionary
    # and move this logic to the http client
    base = f"pytorch_model/{pytorch_model_guid}/trained_model"
    parts = urlparse(base)
    merged = parse_qs(parts.query)
    merged.update(query)
    endpoint = urlunparse(
        parts._replace(query=urlencode(merged, doseq=True))
    )

    self.pytorch_trained_models = await self.http_client.get_many(endpoint)
    return self

async def _preload_trained_pt_model(
    self,
    *,
    guid: str | None = None,
    force_reload: bool = False,
) -> BaseQcogClient:
    """Load a trained pytorch model into `self.trained_model`.

    Parameters
    ----------
    guid : str | None
        trained model identifier; when provided, that trained model is
        fetched directly. Mutually exclusive with `force_reload`.
    force_reload : bool
        if True, refetch the list of completed trained models before
        selecting the most recent one.

    Raises
    ------
    ValueError
        if both `guid` and `force_reload` are provided, or if no
        trained models are found.
    """
    if guid and force_reload:
        raise ValueError("Cannot provide both guid and force_reload.")

    pytorch_model_guid = self.pytorch_model["guid"]

    # If a guid is provided, fetch that trained model directly and stop.
    # Fix: the previous version fell through to an unconditional
    # `return self` (leaving the force-reload path unreachable) and an
    # `else` branch that queried `model/None`.
    if guid:
        self.trained_model = await self.http_client.get(
            f"pytorch_model/{pytorch_model_guid}/trained_model/{guid}"
        )
        return self

    # Otherwise, check if we need to load the latest trained models.
    # Also fetch when the cache is empty, so a cold client can still
    # resolve the latest trained model without force_reload.
    if force_reload or not self.pytorch_trained_models:
        await self._preload_trained_pt_models(
            pytorch_model_guid,
            training_status=TrainingStatus.completed,
        )

    if not self.pytorch_trained_models:
        raise ValueError("No trained models found.")

    self.trained_model = self.pytorch_trained_models[0]
    return self

async def _preload_trained_qcog_model(
    self,
    guid: str,
) -> BaseQcogClient:
    """Retrieve a trained (Pauli/Ensemble) model by guid."""
    fetched = await self.http_client.get(f"model/{guid}")
    self.trained_model = fetched
    return self

async def _preloaded_pt_model(self, model_name: str) -> BaseQcogClient:
Expand Down Expand Up @@ -362,9 +479,17 @@ async def _train(
async def _inference(
self,
data: pd.DataFrame,
parameters: InferenceParameters,
) -> pd.DataFrame:
parameters: InferenceParameters | None = None,
) -> pd.DataFrame | Any:
"""From a trained model query an inference."""
if self.model.model_name == Model.pytorch.value:
return await self._pt_inference(data)

if parameters is None:
raise ValueError(
"Inference parameters are required for Pauli and Ensemble models."
)

inference_result = await self.http_client.post(
f"model/{self.trained_model['guid']}/inference",
{
Expand All @@ -377,6 +502,22 @@ async def _inference(
inference_result["response"]["data"],
)

async def _pt_inference(
    self,
    data: pd.DataFrame,
) -> Any:
    """Query an inference from the currently loaded trained pytorch model.

    The dataframe is base64-encoded and posted to the trained-model
    inference endpoint; the raw `data` field of the response is returned.
    """
    endpoint = (
        f"pytorch_model/{self.pytorch_model['guid']}"
        f"/trained_model/{self.trained_model['guid']}/inference"
    )
    payload = {"data": encode_base64(data)}

    response = await self.http_client.post(endpoint, payload)
    return response.get("data")

async def _train_pytorch(
self,
training_parameters: PytorchTrainingParameters,
Expand Down
44 changes: 40 additions & 4 deletions qcog_python_client/qcog/_httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async def _request_retry(
uri: str,
method: HttpMethod,
data: dict | aiohttp.FormData | None = None,
) -> dict:
) -> dict | list[dict]:
"""Execute the async get "aiohttp" by adding class-level settings.
Parameters
Expand Down Expand Up @@ -144,7 +144,7 @@ async def _request_retry(
else:
raise ValueError(f"Invalid Content Type found: {type(data)}")

retval: dict = await resp.json()
retval: dict | list[dict] = await resp.json()

return retval

Expand Down Expand Up @@ -174,11 +174,42 @@ async def get(self, endpoint: str) -> dict:
dict: unpacked json dict
"""
return await self._request_retry(
response_data = await self._request_retry(
f"{self.url}/{endpoint}",
HttpMethod.get,
)

if not isinstance(response_data, dict):
raise RuntimeError("Expected a single object, got a list")

return response_data

async def get_many(self, endpoint: str) -> list[dict]:
    """Execute a get request.

    Convenience wrapper around aiohttp.get (called via _get method)

    Parameters
    ----------
    endpoint: str
        a valid prefix to the orchestration API (including guid
        if applicable) and will add to the dns prefix

    Returns
    -------
    list[dict]: unpacked json dict
    """
    payload = await self._request_retry(
        f"{self.url}/{endpoint}",
        HttpMethod.get,
    )

    # Guard: this endpoint must return a JSON array, not an object.
    if isinstance(payload, list):
        return payload
    raise RuntimeError("Expected a list of objects, got a single object")

async def post(
self,
endpoint: str,
Expand All @@ -204,6 +235,11 @@ async def post(
dict: unpacked json dict
"""
return await self._request_retry(
response_data = await self._request_retry(
f"{self.url}/{endpoint}", HttpMethod.post, data
)

if not isinstance(response_data, dict):
raise RuntimeError("Expected a single object, got a list")

return response_data
7 changes: 6 additions & 1 deletion qcog_python_client/qcog/_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ class ABCRequestClient(ABC):

@abstractmethod
async def get(self, url: str) -> dict:
"""Execute a get request."""
"""Execute a get request. Returns a single object."""
...

@abstractmethod
async def get_many(self, url: str) -> list[dict]:
    """Execute a get request. Returns a list of objects.

    Counterpart of `get`, for endpoints whose response body is a JSON
    array rather than a single object.
    """
    ...

@overload
Expand Down
58 changes: 46 additions & 12 deletions qcog_python_client/qcog/qcogasync.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

from typing import Any
from typing import Any, overload

import pandas as pd

Expand Down Expand Up @@ -135,23 +135,57 @@ async def preloaded_training_parameters(self, guid: str) -> AsyncQcogClient:
await self._preloaded_training_parameters(guid)
return self

async def preloaded_model(self, guid: str) -> AsyncQcogClient:
"""Retrieve a preexisting trained model.
@overload
async def preloaded_model(self, guid: str) -> AsyncQcogClient: ...

@overload
async def preloaded_model(
    self,
    guid: str,
    *,
    pytorch_model_name: str | None = None,
) -> AsyncQcogClient: ...

@overload
async def preloaded_model(
    self, *, pytorch_model_name: str | None = None, force_reload: bool = False
) -> AsyncQcogClient: ...

async def preloaded_model(
    self,
    guid: str | None = None,
    *,
    pytorch_model_name: str | None = None,
    force_reload: bool = False,
) -> AsyncQcogClient:
    """Retrieve a preexisting trained model.

    Parameters
    ----------
    guid : str | None
        trained model identifier. If you are working on a pytorch model,
        you also need to run `preloaded_pt_model` to load the model
        architecture, or you can provide the `pytorch_model_name`
        parameter with the name of the model. If no `guid` is provided
        and you are working with a pytorch model, the client will try
        to load the latest trained model.
    pytorch_model_name : str | None
        the name of the PyTorch model. This is the identifier that you
        used when you uploaded the model using the `pytorch` method.
        It should be provided if no model architecture is loaded.
    force_reload : bool
        If true will fetch the latest models at every request.

    Returns
    -------
    AsyncQcogClient
    """
    # Delegate to the base client; if a guid is provided it fetches
    # that model directly, otherwise the latest trained model is used.
    await self._preloaded_model(
        guid,
        pytorch_model_name=pytorch_model_name,
        force_reload=force_reload,
    )
    return self

async def preloaded_pt_model(self, model_name: str) -> AsyncQcogClient:
Expand Down Expand Up @@ -202,8 +236,8 @@ async def train(
return self

async def inference(
self, data: pd.DataFrame, parameters: InferenceParameters
) -> pd.DataFrame:
self, data: pd.DataFrame, parameters: InferenceParameters | None
) -> pd.DataFrame | Any:
"""From a trained model query an inference.
Parameters
Expand All @@ -215,8 +249,8 @@ async def inference(
Returns
-------
pd.DataFrame
the predictions
pd.DataFrame | Any
the inference result
"""
return await self._inference(data, parameters)
Expand Down
2 changes: 1 addition & 1 deletion qcog_python_client/qcog/qcogsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def train(

def inference(
self, data: pd.DataFrame, parameters: InferenceParameters
) -> pd.DataFrame:
) -> pd.DataFrame | Any:
"""From a trained model query an inference.
Parameters
Expand Down

0 comments on commit 0027158

Please sign in to comment.