Skip to content

Commit

Permalink
Feature/multipart data upload (#57)
Browse files Browse the repository at this point in the history
* tested

* preload trained model

* fix returning data

* tested

* fix typing issue

* [2-feature/trained model selection] - Add trained model selection (#56)

* update small0 and version

* tested

* inference for pytorch is any

* remove indirection

* update type for qcogsync

* fix typing issue

* [0-feature/select model by name] - Add model selection call (#53)

* tested

* preload trained model

* inference for pytorch is any

* remove indirection

---------

Co-authored-by: Sebastien Roy <[email protected]>

* adding value error

* merging branch

* noqa import order

* remove unused import

* update

* format

* test update

* update batch size

---------

Co-authored-by: Sebastien Roy <[email protected]>
  • Loading branch information
vertefra and sebastroy authored Sep 27, 2024
1 parent 0027158 commit 2671bf0
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 41 deletions.
152 changes: 141 additions & 11 deletions examples/train_test.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
"""Example of training a model."""

import os
import sys
import time

import numpy as np
import pandas
from pandas import DataFrame

from qcog_python_client import AsyncQcogClient, QcogClient
from qcog_python_client.schema import GradOptimizationParameters, GradStateParameters
from qcog_python_client.schema.generated_schema.models import (
AnalyticOptimizationParameters,
LOBPCGFastStateParameters,
)

API_TOKEN = os.environ["API_TOKEN"]

Expand All @@ -18,6 +26,43 @@
learning_rate=0.01,
)

# Connection and resource presets; each can be overridden through the
# environment variable of the same name (QCOG_HOST / QCOG_PORT for the first two).
HOST = os.getenv("QCOG_HOST", "dev.qognitive.io")
# Environment values are always strings, so convert explicitly: PORT is an
# int whether it comes from QCOG_PORT or from the default.
PORT = int(os.getenv("QCOG_PORT", "443"))
DATASET_ID = os.getenv("DATASET_ID")
MODEL_ID = os.getenv("MODEL_ID")
DATASET_NAME = os.getenv("DATASET_NAME", "test-1234")

# Echo the effective presets so a run can be traced back to its settings.
print("------------ Presets ------------")
print("HOST: ", HOST)
print("PORT: ", PORT)
print("DATASET_ID: ", DATASET_ID)
print("MODEL_ID: ", MODEL_ID)
print("DATASET_NAME: ", DATASET_NAME)
print("--------------------------------")


def _get_test_df(size_mb: int) -> tuple[DataFrame, int]:
# Estimate the number of rows needed to reach the desired size
# This is an approximation and may need adjustment
row_size_bytes = 160
num_rows = (size_mb * 1024 * 1024) // row_size_bytes

# Create the DataFrame
df = DataFrame(
{
"X": np.random.choice(["0", "1"], size=num_rows),
"Y": np.random.choice(["0", "1"], size=num_rows),
"Z": np.random.choice(["0", "1"], size=num_rows),
}
)

# Check the actual size and adjust if necessary
actual_size_mb = df.memory_usage(deep=True).sum() / (1024 * 1024)
print(f"Actual DataFrame size: {actual_size_mb:.2f} MB")
print("Dataframe length: ", len(df))

return df, len(df)


def main():
"""Run training."""
Expand Down Expand Up @@ -71,17 +116,102 @@ async def async_main():
return hsm.trained_model["guid"]


async def big_data_test() -> None:
    """Upload a large generated dataset (or reuse one) and train on it.

    When ``DATASET_ID`` is unset, the generated DataFrame is uploaded under
    ``DATASET_NAME`` and the upload is timed. Otherwise the dataset with that
    id is loaded instead. A Pauli model is then trained, its status is
    requested, and the trained model is printed.
    """
    client = await AsyncQcogClient.create(
        token=API_TOKEN,
        hostname=HOST,
        port=PORT,
    )
    # NOTE: the frame is generated even when an existing dataset is used,
    # because its row count drives the training batch size below.
    big_df, batch_size = _get_test_df(10000)

    if DATASET_ID is None:
        size = big_df.memory_usage(deep=True).sum() / 1024**2
        print("Testing Size of big_df MB: ", size)

        print("Testing upload_data")

        # perf_counter is monotonic, so the measured interval cannot be
        # skewed by system clock adjustments during a long upload.
        start = time.perf_counter()
        await client.upload_data(big_df, DATASET_NAME)
        elapsed = time.perf_counter() - start
        print(f"`upload_data` Time taken to upload {size} MB of data: ", elapsed)
    else:
        print("Using existing dataset")
        await client.preloaded_data(DATASET_ID)

    print("Test Model Training")
    client.pauli(
        operators=["X", "Y", "Z"],
    )

    await client.train(
        batch_size=batch_size // 3,
        num_passes=1,
        weight_optimization=GradOptimizationParameters(
            iterations=10,
            learning_rate=1e-3,
        ),
        get_states_extra=states_extra,
    )

    await client.status()
    print(client.trained_model)


async def check_status() -> None:
    """Print the training progress and loss of the model named by ``MODEL_ID``.

    Raises:
        ValueError: If ``MODEL_ID`` is not set.
    """
    # Validate the preset first so a missing id fails before any client
    # is created.
    if MODEL_ID is None:
        raise ValueError("MODEL_ID is not set")

    client = await AsyncQcogClient.create(
        token=API_TOKEN,
        hostname=HOST,
        port=PORT,
    )

    await client.preloaded_model(MODEL_ID)
    status = await client.progress()
    print(status)
    loss = await client.get_loss()
    print(loss)


async def case_ensemble() -> None:
    """Train an ensemble model on a fixed, previously uploaded dataset."""
    ensemble_dataset_guid = "ab1aae7c-28d7-37eb-a251-1479f61818ab"

    client = await AsyncQcogClient.create(token=API_TOKEN, hostname=HOST, port=PORT)

    # Select the model first, then attach the dataset it will train on.
    client.ensemble(
        operators=["X", "Y", "Z"],
        dim=4,
        num_axes=4,
        seed=1,
    )
    await client.preloaded_data(ensemble_dataset_guid)

    weight_params = AnalyticOptimizationParameters()
    state_params = LOBPCGFastStateParameters(iterations=1, tol=0.05)
    await client.train(
        batch_size=1000,
        num_passes=1,
        weight_optimization=weight_params,
        get_states_extra=state_params,
    )

    print(client.trained_model)


if __name__ == "__main__":
print("################################")
print("# SYNC #")
print("################################")
guid = main()
print("################################")
print("# ASYNC #")
print("################################")
import asyncio

asyncio.run(async_main())
print("done")

print(f"\nexport TRAINED_MODEL={guid}")
# Pick the scenario to run from the first command-line argument.
if len(sys.argv) < 2:
    raise ValueError(
        "Missing command: expected one of "
        "train, train_async, big_data, status, case_ensemble"
    )

cmd = sys.argv[1]
if cmd == "train":
    # main() is a regular (synchronous) function, so it is called directly;
    # passing its return value to asyncio.run() would raise.
    main()
elif cmd == "train_async":
    asyncio.run(async_main())
elif cmd == "big_data":
    asyncio.run(big_data_test())
elif cmd == "status":
    asyncio.run(check_status())
elif cmd == "case_ensemble":
    asyncio.run(case_ensemble())
else:
    raise ValueError(f"Invalid command: {cmd}")
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies = [
"typing_extensions",
"pydantic-settings",
"anyio",
"wandb>=0.17.7"
]

dynamic = ["version"]
Expand All @@ -38,14 +39,12 @@ dev = [
"datamodel-code-generator",
"pytest-asyncio"
]

examples = [
"scikit-learn",
"torch",
"pillow",
]
wandb = [
"wandb>=0.17.7",
]

[tool.setuptools_scm]
version_file = "qcog_python_client/__version__.py"
Expand Down
9 changes: 7 additions & 2 deletions qcog_python_client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
"""Qcog Python Client package."""

from .qcog import AsyncQcogClient, QcogClient
from .schema import (
from pydantic import BaseModel

# Import before everything else in order to patch the BaseModel class
BaseModel.model_config = {"protected_namespaces": ()}

from .qcog import AsyncQcogClient, QcogClient # noqa: E402
from .schema import ( # noqa: E402
Model,
ModelEnsembleParameters,
ModelGeneralParameters,
Expand Down
56 changes: 48 additions & 8 deletions qcog_python_client/qcog/_baseclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
from qcog_python_client.log import qcoglogger
from qcog_python_client.qcog._base64utils import base642dataframe
from qcog_python_client.qcog._data_uploader import encode_base64
from qcog_python_client.qcog._interfaces import ABCDataClient, ABCRequestClient
from qcog_python_client.qcog._interfaces import (
IDataClient,
IRequestClient,
)
from qcog_python_client.qcog._jsonable_parameters import (
jsonable_inference_parameters,
jsonable_train_parameters,
Expand Down Expand Up @@ -50,15 +53,16 @@
| ModelPytorchParameters
)


logger = qcoglogger.getChild(__name__)


class BaseQcogClient:
"""Base Qcog Client."""

def __init__(self) -> None: # noqa: D107
self.http_client: ABCRequestClient
self.data_client: ABCDataClient
self.http_client: IRequestClient
self.data_client: IDataClient
self._version: str = DEFAULT_QCOG_VERSION
self._model: TrainingModel | None = None
self._project: dict | None = None
Expand Down Expand Up @@ -182,7 +186,7 @@ def trained_model(self) -> dict:
@trained_model.setter
def trained_model(self, value: dict) -> None:
"""Set and validate the trained model."""
if self.model.model_name == Model.pytorch.value:
if self._model and self._model.model_name == Model.pytorch.value:
self._trained_model = (
AppSchemasPytorchModelPytorchTrainedModelPayloadResponse.model_validate(
value
Expand Down Expand Up @@ -299,6 +303,13 @@ async def _data(self, data: pd.DataFrame) -> BaseQcogClient:
self.dataset = await self.data_client.upload_data(data)
return self

async def _upload_data(self, data: pd.DataFrame, dataset_id: str) -> BaseQcogClient:
"""Upload Data."""
# Delegating the upload function to the data client
# So any change in the logic or service will not affect the client
self.dataset = await self.data_client.stream_data(data, dataset_id=dataset_id)
return self

async def _preloaded_data(self, guid: str) -> BaseQcogClient:
"""Async method to retrieve a dataset that was previously uploaded from guid."""
self.dataset = await self.http_client.get(f"dataset/{guid}")
Expand Down Expand Up @@ -340,7 +351,7 @@ async def _preloaded_model(
if pytorch_model_name:
await self._preloaded_pt_model(pytorch_model_name)

if self.model.model_name == Model.pytorch.value:
if self._model and self._model.model_name == Model.pytorch.value:
await self._preload_trained_pt_model(
guid=guid,
force_reload=force_reload,
Expand Down Expand Up @@ -425,6 +436,35 @@ async def _preload_trained_qcog_model(
) -> BaseQcogClient:
"""Retrieve a trained model by guid."""
self.trained_model = await self.http_client.get(f"model/{guid}")
# Load the training parameters.
# `_model` does not exist in the database on its own: it is only
# a part of the training parameters. The trained model itself
# carries no information about which kind of model was used,
# so the training parameters must be loaded in order to set
# `_model`. This is necessary because most of the client's
# properties need to know the kind of model in order to call
# the correct API.
await self._preloaded_training_parameters(
self.trained_model["training_parameters_guid"]
)

model_name = Model(self.training_parameters["model"])

model_params_validator: (
type[ModelPauliParameters] | type[ModelEnsembleParameters] | None
) = None
if model_name == Model.pauli:
model_params_validator = ModelPauliParameters
elif model_name == Model.ensemble:
model_params_validator = ModelEnsembleParameters
else:
raise ValueError(f"Model {model_name} not found")

model_parameters = self.training_parameters["parameters"]["model"]

self._model = model_params_validator.model_validate(model_parameters)

return self

async def _preloaded_pt_model(self, model_name: str) -> BaseQcogClient:
Expand Down Expand Up @@ -560,7 +600,7 @@ async def _progress(self) -> dict:
`status` : TrainingStatus
"""
if self.model.model_name == Model.pytorch.value:
if self._model and self.model.model_name == Model.pytorch.value:
logger.warning("Progress is not available for PyTorch models.")
return {}

Expand All @@ -576,15 +616,15 @@ async def _progress(self) -> dict:

async def _load_trained_model(self) -> None:
"""Load the status of the current trained model."""
if self.model.model_name == Model.pytorch.value:
if self._model and self._model.model_name == Model.pytorch.value:
raise ValueError("Load trained model is not available for PyTorch models.")

self.trained_model = await self.http_client.get(
f"model/{self.trained_model['guid']}"
)

async def _status(self) -> TrainingStatus:
if self.model.model_name == Model.pytorch.value:
if self._model and self._model.model_name == Model.pytorch.value:
return await self._get_pt_trained_model_status()
return await self._get_trained_model_status()

Expand Down
Loading

0 comments on commit 2671bf0

Please sign in to comment.