From 2671bf0b757b455f3c73342c2f63763b15b4e91e Mon Sep 17 00:00:00 2001
From: Francesco Vertemati <63065831+vertefra@users.noreply.github.com>
Date: Fri, 27 Sep 2024 13:23:56 -0400
Subject: [PATCH] Feature/multipart data upload (#57)

* tested
* preload trained model
* fix returning data
* tested
* fix typing issue
* [2-feature/trained model selection] - Add trained model selection (#56)
* update small0 and version
* tested
* inference for pytorch is any
* remove indirection
* update type for qcogsync
* fix typing issue
* [0-feature/select model by name] - Add model selection call (#53)
* tested
* preload trained model
* inference for pytorch is any
* remove indirection

---------

Co-authored-by: Sebastien Roy

* adding value error
* merging branch
* noqa import order
* remove unused import
* update
* format
* test update
* update batch size

---------

Co-authored-by: Sebastien Roy
---
 examples/train_test.py                    | 152 ++++++++++++++++++++--
 pyproject.toml                            |   5 +-
 qcog_python_client/__init__.py            |   9 +-
 qcog_python_client/qcog/_baseclient.py    |  56 ++++++--
 qcog_python_client/qcog/_data_uploader.py |  52 +++++++-
 qcog_python_client/qcog/_httpclient.py    |  16 ++-
 qcog_python_client/qcog/_interfaces.py    |  21 ++-
 qcog_python_client/qcog/qcogasync.py      |  13 +-
 qcog_python_client/qcog/qcogsync.py       |   8 +-
 9 files changed, 291 insertions(+), 41 deletions(-)

diff --git a/examples/train_test.py b/examples/train_test.py
index 38ee455..9c827f9 100644
--- a/examples/train_test.py
+++ b/examples/train_test.py
@@ -1,11 +1,19 @@
 """Example of training a model."""

 import os
+import sys
+import time

+import numpy as np
 import pandas
+from pandas import DataFrame

 from qcog_python_client import AsyncQcogClient, QcogClient
 from qcog_python_client.schema import GradOptimizationParameters, GradStateParameters
+from qcog_python_client.schema.generated_schema.models import (
+    AnalyticOptimizationParameters,
+    LOBPCGFastStateParameters,
+)

 API_TOKEN = os.environ["API_TOKEN"]

@@ -18,6 +26,43 @@
     learning_rate=0.01,
 )

+HOST = os.getenv("QCOG_HOST", "dev.qognitive.io")
+PORT = os.getenv("QCOG_PORT", 443)
+DATASET_ID = os.getenv("DATASET_ID", None)
+MODEL_ID = os.getenv("MODEL_ID", None)
+DATASET_NAME = os.getenv("DATASET_NAME", "test-1234")
+
+print("------------ Presets ------------")
+print("HOST: ", HOST)
+print("PORT: ", PORT)
+print("DATASET_ID: ", DATASET_ID)
+print("MODEL_ID: ", MODEL_ID)
+print("DATASET_NAME: ", DATASET_NAME)
+print("--------------------------------")
+
+
+def _get_test_df(size_mb: int) -> tuple[DataFrame, int]:
+    # Estimate the number of rows needed to reach the desired size
+    # This is an approximation and may need adjustment
+    row_size_bytes = 160
+    num_rows = (size_mb * 1024 * 1024) // row_size_bytes
+
+    # Create the DataFrame
+    df = DataFrame(
+        {
+            "X": np.random.choice(["0", "1"], size=num_rows),
+            "Y": np.random.choice(["0", "1"], size=num_rows),
+            "Z": np.random.choice(["0", "1"], size=num_rows),
+        }
+    )
+
+    # Check the actual size and adjust if necessary
+    actual_size_mb = df.memory_usage(deep=True).sum() / (1024 * 1024)
+    print(f"Actual DataFrame size: {actual_size_mb:.2f} MB")
+    print("Dataframe length: ", len(df))
+
+    return df, len(df)
+

 def main():
     """Run training."""
@@ -71,17 +116,102 @@
     return hsm.trained_model["guid"]

+
+async def big_data_test() -> None:
+    """Upload data as a stream."""
+    client = await AsyncQcogClient.create(
+        token=API_TOKEN,
+        hostname=HOST,
+        port=PORT,
+    )
+    big_df, batch_size = _get_test_df(10000)
+
+    if DATASET_ID is None:
+        size = big_df.memory_usage(deep=True).sum() / 1024**2
+        print("Testing Size of big_df MB: ", size)
+
+        print("Testing upload_data")
+
+        start = time.time()
+        await client.upload_data(big_df, DATASET_NAME)
+        end = time.time()
+        print(f"`upload_data` Time taken to upload {size} MB of data: ", end - start)
+    else:
+        print("Using existing dataset")
+        await client.preloaded_data(DATASET_ID)
+
+    print("Test Model Training")
+    client.pauli(
+        operators=["X", "Y", "Z"],
+    )
+
+    await client.train(
+        batch_size=batch_size // 3,
+        num_passes=1,
+        weight_optimization=GradOptimizationParameters(
+            iterations=10,
+            learning_rate=1e-3,
+        ),
+        get_states_extra=states_extra,
+    )
+
+    await client.status()
+    print(client.trained_model)
+
+
+async def check_status() -> None:
+    """Check status."""
+    client = await AsyncQcogClient.create(
+        token=API_TOKEN,
+        hostname=HOST,
+        port=PORT,
+    )
+    if MODEL_ID is None:
+        raise ValueError("MODEL_GUID is not set")
+
+    await client.preloaded_model(MODEL_ID)
+    status = await client.progress()
+    print(status)
+    loss = await client.get_loss()
+    print(loss)
+
+
+async def case_ensemble() -> None:
+    """Test case ensemble."""
+    client = await AsyncQcogClient.create(
+        token=API_TOKEN,
+        hostname=HOST,
+        port=PORT,
+    )
+
+    dataset_id = "ab1aae7c-28d7-37eb-a251-1479f61818ab"
+
+    client.ensemble(operators=["X", "Y", "Z"], dim=4, num_axes=4, seed=1)
+
+    await client.preloaded_data(dataset_id)
+
+    await client.train(
+        batch_size=1000,
+        num_passes=1,
+        weight_optimization=AnalyticOptimizationParameters(),
+        get_states_extra=LOBPCGFastStateParameters(iterations=1, tol=0.05),
+    )
+
+    print(client.trained_model)
+
+
 if __name__ == "__main__":
-    print("################################")
-    print("# SYNC #")
-    print("################################")
-    guid = main()
-    print("################################")
-    print("# ASYNC #")
-    print("################################")
     import asyncio

-    asyncio.run(async_main())
-    print("done")
-
-    print(f"\nexport TRAINED_MODEL={guid}")
+    cmd = sys.argv[1]
+    if cmd == "train":
+        asyncio.run(main())
+    elif cmd == "train_async":
+        asyncio.run(async_main())
+    elif cmd == "big_data":
+        asyncio.run(big_data_test())
+    elif cmd == "status":
+        asyncio.run(check_status())
+    elif cmd == "case_ensemble":
+        asyncio.run(case_ensemble())
+    else:
+        raise ValueError(f"Invalid command: {cmd}")
diff --git a/pyproject.toml b/pyproject.toml
index 1452b06..0da6d83 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,6 +19,7 @@ dependencies = [
     "typing_extensions",
     "pydantic-settings",
     "anyio",
+    "wandb>=0.17.7"
 ]

 dynamic = ["version"]
@@ -38,14 +39,12 @@ dev = [
     "datamodel-code-generator",
     "pytest-asyncio"
 ]
+
 examples = [
     "scikit-learn",
     "torch",
     "pillow",
 ]
-wandb = [
-    "wandb>=0.17.7",
-]

 [tool.setuptools_scm]
 version_file = "qcog_python_client/__version__.py"
diff --git a/qcog_python_client/__init__.py b/qcog_python_client/__init__.py
index 4b5518f..6126d44 100644
--- a/qcog_python_client/__init__.py
+++ b/qcog_python_client/__init__.py
@@ -1,7 +1,12 @@
 """Qcog Python Client package."""

-from .qcog import AsyncQcogClient, QcogClient
-from .schema import (
+from pydantic import BaseModel
+
+# Import before everything else in order to patch the BaseModel class
+BaseModel.model_config = {"protected_namespaces": ()}
+
+from .qcog import AsyncQcogClient, QcogClient  # noqa: E402
+from .schema import (  # noqa: E402
     Model,
     ModelEnsembleParameters,
     ModelGeneralParameters,
diff --git a/qcog_python_client/qcog/_baseclient.py b/qcog_python_client/qcog/_baseclient.py
index ee6e2ca..e551f46 100644
--- a/qcog_python_client/qcog/_baseclient.py
+++ b/qcog_python_client/qcog/_baseclient.py
@@ -10,7 +10,10 @@
 from qcog_python_client.log import qcoglogger
 from qcog_python_client.qcog._base64utils import base642dataframe
 from qcog_python_client.qcog._data_uploader import encode_base64
-from qcog_python_client.qcog._interfaces import ABCDataClient, ABCRequestClient
+from qcog_python_client.qcog._interfaces import (
+    IDataClient,
+    IRequestClient,
+)
 from qcog_python_client.qcog._jsonable_parameters import (
     jsonable_inference_parameters,
     jsonable_train_parameters,
@@ -50,6 +53,7 @@
     | ModelPytorchParameters
 )

+
 logger = qcoglogger.getChild(__name__)


@@ -57,8 +61,8 @@ class BaseQcogClient:
     """Base Qcog Client."""

     def __init__(self) -> None:  # noqa: D107
-        self.http_client: ABCRequestClient
-        self.data_client: ABCDataClient
+        self.http_client: IRequestClient
+        self.data_client: IDataClient
         self._version: str = DEFAULT_QCOG_VERSION
         self._model: TrainingModel | None = None
         self._project: dict | None = None
@@ -182,7 +186,7 @@ def trained_model(self) -> dict:
     @trained_model.setter
     def trained_model(self, value: dict) -> None:
         """Set and validate the trained model."""
-        if self.model.model_name == Model.pytorch.value:
+        if self._model and self._model.model_name == Model.pytorch.value:
             self._trained_model = (
                 AppSchemasPytorchModelPytorchTrainedModelPayloadResponse.model_validate(
                     value
@@ -299,6 +303,13 @@ async def _data(self, data: pd.DataFrame) -> BaseQcogClient:
         self.dataset = await self.data_client.upload_data(data)
         return self

+    async def _upload_data(self, data: pd.DataFrame, dataset_id: str) -> BaseQcogClient:
+        """Upload data."""
+        # Delegate the upload to the data client, so any change in the
+        # upload logic or service will not affect the client.
+        self.dataset = await self.data_client.stream_data(data, dataset_id=dataset_id)
+        return self
+
     async def _preloaded_data(self, guid: str) -> BaseQcogClient:
         """Async method to retrieve a dataset that was previously uploaded from guid."""
         self.dataset = await self.http_client.get(f"dataset/{guid}")
@@ -340,7 +351,7 @@ async def _preloaded_model(
         if pytorch_model_name:
             await self._preloaded_pt_model(pytorch_model_name)

-        if self.model.model_name == Model.pytorch.value:
+        if self._model and self._model.model_name == Model.pytorch.value:
             await self._preload_trained_pt_model(
                 guid=guid,
                 force_reload=force_reload,
@@ -425,6 +436,35 @@ async def _preload_trained_qcog_model(
     ) -> BaseQcogClient:
         """Retrieve a trained model by guid."""
         self.trained_model = await self.http_client.get(f"model/{guid}")
+        # Load the training parameters.
+        # _model does not exist as its own record in the database; it is
+        # only a part of the training parameters. We need to load the
+        # training parameters to set _model, because the trained model
+        # itself carries no information about what kind of model was
+        # used, and most of the client properties need to know the model
+        # type in order to call the correct API.
+        await self._preloaded_training_parameters(
+            self.trained_model["training_parameters_guid"]
+        )
+
+        model_name = Model(self.training_parameters["model"])
+
+        model_params_validator: (
+            type[ModelPauliParameters] | type[ModelEnsembleParameters] | None
+        ) = None
+        if model_name == Model.pauli:
+            model_params_validator = ModelPauliParameters
+        elif model_name == Model.ensemble:
+            model_params_validator = ModelEnsembleParameters
+        else:
+            raise ValueError(f"Model {model_name} not found")
+
+        model_parameters = self.training_parameters["parameters"]["model"]
+
+        self._model = model_params_validator.model_validate(model_parameters)
+
         return self

     async def _preloaded_pt_model(self, model_name: str) -> BaseQcogClient:
@@ -560,7 +600,7 @@ async def _progress(self) -> dict:

         `status` : TrainingStatus
         """
-        if self.model.model_name == Model.pytorch.value:
+        if self._model and self.model.model_name == Model.pytorch.value:
             logger.warning("Progress is not available for PyTorch models.")
             return {}
@@ -576,7 +616,7 @@ async def _load_trained_model(self) -> None:
         """Load the status of the current trained model."""
-        if self.model.model_name == Model.pytorch.value:
+        if self._model and self._model.model_name == Model.pytorch.value:
             raise ValueError("Load trained model is not available for PyTorch models.")

         self.trained_model = await self.http_client.get(
@@ -584,7 +624,7 @@
         )

     async def _status(self) -> TrainingStatus:
-        if self.model.model_name == Model.pytorch.value:
+        if self._model and self._model.model_name == Model.pytorch.value:
             return await self._get_pt_trained_model_status()

         return await self._get_trained_model_status()
diff --git a/qcog_python_client/qcog/_data_uploader.py b/qcog_python_client/qcog/_data_uploader.py
index dbb03d7..cf5b41c 100644
--- a/qcog_python_client/qcog/_data_uploader.py
+++ b/qcog_python_client/qcog/_data_uploader.py
@@ -4,20 +4,23 @@
 to support multi part uploads or other types of uploads.
 """

+import gzip
+
+import aiohttp
 from pandas.core.api import DataFrame as DataFrame

 from qcog_python_client.qcog._base64utils import encode_base64
-from qcog_python_client.qcog._interfaces import ABCDataClient, ABCRequestClient
+from qcog_python_client.qcog._interfaces import IDataClient, IRequestClient
 from qcog_python_client.schema import DatasetPayload


-class DataClient(ABCDataClient):
+class DataClient(IDataClient):
     """Data Client Uploader.

     Current implementation that relies on a classic http post request.
     """

-    def __init__(self, http_client: ABCRequestClient) -> None:
+    def __init__(self, http_client: IRequestClient) -> None:
         self.http_client = http_client

     async def upload_data(self, data: DataFrame) -> dict:
@@ -29,3 +32,46 @@ async def upload_data(self, data: DataFrame) -> dict:
         ).model_dump()

         return await self.http_client.post("dataset", data_payload)
+
+    async def stream_data(
+        self,
+        data: DataFrame,
+        *,
+        dataset_id: str,
+        encoding: str = "gzipBase64",
+    ) -> dict:
+        """Stream data to the server.
+
+        This method will stream the data to the server in chunks.
+
+        Parameters
+        ----------
+        data : DataFrame
+            The data to stream to the server.
+        dataset_id : str
+            The ID of the dataset to stream the data to.
+            This should be unique for each Dataset.
+        encoding : str
+            The encoding of the data.
+
+        """
+        headers = self.http_client.headers
+        base_url = self.http_client.base_url
+        url = f"{base_url}/dataset/upload?dataset_id={dataset_id}&format=dataframe&source=client&encoding={encoding}"  # noqa: E501
+
+        # Gzip-compress the base64-encoded data
+        zip_data = gzip.compress(encode_base64(data).encode())
+
+        form = aiohttp.FormData()
+        form.add_field(
+            "file", zip_data, filename="data.csv", content_type="application/gzip"
+        )
+
+        try:
+            async with aiohttp.ClientSession() as session:
+                async with session.post(url, headers=headers, data=form) as response:
+                    response.raise_for_status()
+                    data_: dict = await response.json()
+                    return data_
+        except Exception as e:
+            raise e
diff --git a/qcog_python_client/qcog/_httpclient.py b/qcog_python_client/qcog/_httpclient.py
index b091cb3..3ef4da5 100644
--- a/qcog_python_client/qcog/_httpclient.py
+++ b/qcog_python_client/qcog/_httpclient.py
@@ -12,7 +12,7 @@
 import urllib3.util

 from qcog_python_client.log import qcoglogger as logger
-from qcog_python_client.qcog._interfaces import ABCRequestClient
+from qcog_python_client.qcog._interfaces import IRequestClient


 class HttpMethod(Enum):
@@ -63,19 +63,29 @@ def __init__(
         self.port: int = port
         self.api_version: str = api_version

-        self.headers = {"Authorization": f"Bearer {self.token}"}
+        self._headers = {"Authorization": f"Bearer {self.token}"}

         protocol = "http" if hostname in {"localhost", "127.0.0.1"} else "https"
         base_url: str = f"{protocol}://{self.hostname}:{self.port}"
         self.url: str = f"{base_url}/api/{self.api_version}"
         self.retries: int = retries


-class RequestClient(_HTTPClient, ABCRequestClient):
+class RequestClient(_HTTPClient, IRequestClient):
     """Async API client.

     This class is the async implementation of the API client
     """

+    @property
+    def headers(self) -> dict:
+        """Get the headers."""
+        return self._headers
+
+    @property
+    def base_url(self) -> str:
+        """Get the base url of the request."""
+        return self.url
+
     async def _request_retry(
         self,
         uri: str,
diff --git a/qcog_python_client/qcog/_interfaces.py b/qcog_python_client/qcog/_interfaces.py
index aebda7f..ad57826 100644
--- a/qcog_python_client/qcog/_interfaces.py
+++ b/qcog_python_client/qcog/_interfaces.py
@@ -1,11 +1,11 @@
 from abc import ABC, abstractmethod
-from typing import overload
+from typing import Any, overload

 import aiohttp
 import pandas as pd


-class ABCRequestClient(ABC):
+class IRequestClient(ABC):
     """Interface for a request client."""

     @abstractmethod
@@ -37,11 +37,26 @@ async def post(
         """Execute a post request."""
         ...

+    @property
+    def headers(self) -> dict:
+        """Get the headers."""
+        raise NotImplementedError

-class ABCDataClient(ABC):
+    @property
+    def base_url(self) -> str:
+        """Get the base url of the request."""
+        raise NotImplementedError
+
+
+class IDataClient(ABC):
     """Interface for a data client."""

     @abstractmethod
     async def upload_data(self, data: pd.DataFrame) -> dict:
         """Upload a dataframe."""
         ...
+
+    @abstractmethod
+    async def stream_data(self, data: Any, *, dataset_id: str) -> dict:
+        """Stream a dataframe to the server."""
+        ...
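Note on the encoding used by DataClient.stream_data above: the dataframe is run through encode_base64, gzip-compressed, and posted as a multipart "file" field with encoding=gzipBase64. Below is a minimal decoding sketch for that payload; it assumes encode_base64 yields a base64-encoded CSV string, and the helper name decode_gzip_base64_payload is illustrative only, not part of the client.

import base64
import gzip
from io import StringIO

import pandas as pd


def decode_gzip_base64_payload(zip_data: bytes) -> pd.DataFrame:
    """Reverse the gzipBase64 encoding produced by DataClient.stream_data.

    Assumes the uploaded bytes are gzip(base64(csv)); adjust the last step if
    encode_base64 serializes the dataframe differently.
    """
    b64_text = gzip.decompress(zip_data).decode()   # undo gzip.compress(...)
    csv_text = base64.b64decode(b64_text).decode()  # undo the base64 encoding
    return pd.read_csv(StringIO(csv_text))          # rebuild the dataframe
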
diff --git a/qcog_python_client/qcog/qcogasync.py b/qcog_python_client/qcog/qcogasync.py
index bfd3765..3f616a9 100644
--- a/qcog_python_client/qcog/qcogasync.py
+++ b/qcog_python_client/qcog/qcogasync.py
@@ -12,8 +12,8 @@
 from qcog_python_client.qcog._data_uploader import DataClient
 from qcog_python_client.qcog._httpclient import RequestClient
 from qcog_python_client.qcog._interfaces import (
-    ABCDataClient,
-    ABCRequestClient,
+    IDataClient,
+    IRequestClient,
 )
 from qcog_python_client.qcog._version import DEFAULT_QCOG_VERSION
 from qcog_python_client.schema.common import (
@@ -39,8 +39,8 @@ async def create(
         api_version: str = "v1",
         safe_mode: bool = False,
         version: str = DEFAULT_QCOG_VERSION,
-        httpclient: ABCRequestClient | None = None,
-        dataclient: ABCDataClient | None = None,
+        httpclient: IRequestClient | None = None,
+        dataclient: IDataClient | None = None,
     ) -> AsyncQcogClient:
         """Instantiate a new Qcog client.
@@ -102,6 +102,11 @@ async def data(self, data: pd.DataFrame) -> AsyncQcogClient:
         await self._data(data)
         return self

+    async def upload_data(self, data: pd.DataFrame, dataset_id: str) -> AsyncQcogClient:
+        """Upload data as a stream."""
+        await self._upload_data(data, dataset_id)
+        return self
+
     async def preloaded_data(self, guid: str) -> AsyncQcogClient:
         """Retrieve a dataset that was previously uploaded from guid.

diff --git a/qcog_python_client/qcog/qcogsync.py b/qcog_python_client/qcog/qcogsync.py
index 743539f..53f473e 100644
--- a/qcog_python_client/qcog/qcogsync.py
+++ b/qcog_python_client/qcog/qcogsync.py
@@ -11,8 +11,8 @@
 from qcog_python_client.qcog._data_uploader import DataClient
 from qcog_python_client.qcog._httpclient import RequestClient
 from qcog_python_client.qcog._interfaces import (
-    ABCDataClient,
-    ABCRequestClient,
+    IDataClient,
+    IRequestClient,
 )
 from qcog_python_client.qcog._version import DEFAULT_QCOG_VERSION
 from qcog_python_client.schema.common import (
@@ -39,8 +39,8 @@ def create(
         api_version: str = "v1",
         safe_mode: bool = False,
         version: str = DEFAULT_QCOG_VERSION,
-        httpclient: ABCRequestClient | None = None,
-        dataclient: ABCDataClient | None = None,
+        httpclient: IRequestClient | None = None,
+        dataclient: IDataClient | None = None,
     ) -> QcogClient:
         """Instantiate a new Qcog client.
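For context, a minimal usage sketch of the streaming upload path added by this patch, mirroring big_data_test in examples/train_test.py; the host, port, and dataset name below are placeholders, and error handling is omitted.

import asyncio
import os

import numpy as np
from pandas import DataFrame

from qcog_python_client import AsyncQcogClient


async def upload_example() -> None:
    # Small synthetic dataframe, as in _get_test_df from the example script.
    df = DataFrame({
        "X": np.random.choice(["0", "1"], size=1000),
        "Y": np.random.choice(["0", "1"], size=1000),
        "Z": np.random.choice(["0", "1"], size=1000),
    })

    client = await AsyncQcogClient.create(
        token=os.environ["API_TOKEN"],
        hostname=os.getenv("QCOG_HOST", "dev.qognitive.io"),
        port=int(os.getenv("QCOG_PORT", "443")),
    )

    # New in this patch: upload_data delegates to DataClient.stream_data
    # (gzip + multipart POST) instead of the single base64 POST used by data().
    # The created dataset record is kept on the client (client.dataset).
    await client.upload_data(df, "example-dataset")


if __name__ == "__main__":
    asyncio.run(upload_example())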