Skip to content

Commit

Permalink
feat: wip add tslong to tswide conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
sphamba authored and martibosch committed Jul 18, 2024
1 parent c541f40 commit 330b67a
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 38 deletions.
18 changes: 18 additions & 0 deletions tstore/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,21 @@ def _change_series_backend_from_pyarrow(
return series

raise ValueError(f"Unsupported backend: {new_backend}")


def cast_column_to_large_string(df: DataFrame, col: str) -> DataFrame:
    """Convert column `col` of `df` to a large-string type and return the DataFrame.

    The conversion depends on the type of `df`:

    - Dask / pandas: the column is re-assigned on the passed object with the
      ``"large_string[pyarrow]"`` dtype, and that same object is returned.
    - Polars: a new DataFrame with the column cast to ``pl.String`` is returned.
    - PyArrow: a new table cast to a schema in which the column's field is
      ``pa.large_string()`` is returned.

    Any other type of `df` is returned unchanged.

    Args:
        df (DataFrame): DataFrame holding the column to convert.
        col (str): Name of the column to convert.

    Returns:
        DataFrame: DataFrame with the converted column.
    """
    if isinstance(df, (DaskDataFrame, PandasDataFrame)):
        df[col] = df[col].astype("large_string[pyarrow]")
        return df

    if isinstance(df, PolarsDataFrame):
        return df.cast({col: pl.String})

    if isinstance(df, PyArrowDataFrame):
        # Swap the field in place (same position) so the column order is preserved
        position = df.schema.get_field_index(col)
        large_string_field = pa.field(col, pa.large_string())
        target_schema = df.schema.remove(position).insert(position, large_string_field)
        return df.cast(target_schema=target_schema)

    return df
28 changes: 28 additions & 0 deletions tstore/tests/test_tslong.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
from tstore.tslong.pandas import TSLongPandas
from tstore.tslong.polars import TSLongPolars
from tstore.tslong.pyarrow import TSLongPyArrow
from tstore.tswide.dask import TSWideDask
from tstore.tswide.pandas import TSWidePandas
from tstore.tswide.polars import TSWidePolars
from tstore.tswide.pyarrow import TSWidePyArrow

# Imported fixtures from conftest.py:
# - dask_long_dataframe
Expand All @@ -46,6 +50,13 @@
"pyarrow": TSDFPyArrow,
}

# Maps each backend name to the TSWide subclass expected for that backend;
# test_to_tswide looks up the expected result type here.
tswide_classes = {
"dask": TSWideDask,
"pandas": TSWidePandas,
"polars": TSWidePolars,
"pyarrow": TSWidePyArrow,
}


# Functions ####################################################################

Expand Down Expand Up @@ -231,3 +242,20 @@ def test_to_tsdf(
np.testing.assert_array_equal(tsdf["tstore_id"], ["1", "2", "3", "4"])
np.testing.assert_array_equal(tsdf["static_var1"], ["A", "B", "C", "D"])
np.testing.assert_array_equal(tsdf["static_var2"], [1.0, 2.0, 3.0, 4.0])


@pytest.mark.parametrize("backend", ["dask", "pandas", "polars", "pyarrow"])
def test_to_tswide(
    backend: str,
    request,
) -> None:
    """Test the to_tswide function.

    For each backend, the `<backend>_tslong` fixture is converted with
    `to_tswide()` and the result is checked for the matching TSWide subclass
    and for unchanged id/time/ts/static variable metadata.
    """
    tslong_fixture_name = f"{backend}_tslong"
    tslong = request.getfixturevalue(tslong_fixture_name)
    tswide = tslong.to_tswide()

    # The conversion must stay within the same backend
    assert isinstance(tswide, tswide_classes[backend])

    # The metadata must be carried over from the TSLong object
    assert tswide._tstore_id_var == "tstore_id"
    assert tswide._tstore_time_var == "time"
    assert tswide._tstore_ts_vars == {"ts_var1": ["var1", "var2"], "ts_var2": ["var3", "var4"]}
    assert tswide._tstore_static_vars == ["static_var1", "static_var2"]
2 changes: 1 addition & 1 deletion tstore/tslong/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from tstore.tslong.tslong import TSLong


def open_tslong(base_dir: Union[str, Path], *args, backend: Backend = "pandas", **kwargs):
def open_tslong(base_dir: Union[str, Path], *args, backend: Backend = "dask", **kwargs):
"""Read a TStore file structure as a TSLong object."""
ts_long_classes = {
"dask": TSLongDask,
Expand Down
20 changes: 19 additions & 1 deletion tstore/tslong/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,22 @@ def _get_static_values(self) -> dict[str, list]:

def to_tswide(self) -> "TSWideDask":
    """Convert the wrapper into a TSWideDask object.

    The wrapped long-form DataFrame is pivoted so that the time variable
    becomes the index and there is one column per (variable, identifier)
    pair. The first value found for each pair is kept.

    Returns:
        TSWideDask: Wide-form wrapper carrying the same id, time, ts and static
            variable metadata as this object.
    """
    # Imported here rather than at module level (presumably to avoid a circular import)
    from tstore.tswide.dask import TSWideDask

    df = self._obj
    # Turn the index back into a regular column so it can be used as the pivot index
    df = df.reset_index()
    # NOTE(review): the identifier column is computed eagerly here, presumably so that
    # its categories are known to dask before pivoting - confirm
    df[self._tstore_id_var] = df[self._tstore_id_var].astype("category").compute()
    # NOTE(review): `values` only excludes the identifier column, so it appears to still
    # contain the time column that is also used as `index` - confirm this is intended
    df = df.pivot_table(
        index=self._tstore_time_var,
        columns=self._tstore_id_var,
        values=df.columns.difference([self._tstore_id_var]),
        aggfunc="first",
    )

    return TSWideDask(
        df,
        id_var=self._tstore_id_var,
        time_var=self._tstore_time_var,
        ts_vars=self._tstore_ts_vars,
        static_vars=self._tstore_static_vars,
    )
17 changes: 16 additions & 1 deletion tstore/tslong/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,19 @@ def from_tstore(

def to_tswide(self) -> "TSWidePandas":
    """Convert the wrapper into a TSWidePandas object.

    The wrapped long-form DataFrame is pivoted so that the time variable
    becomes the index and the identifier variable is spread across the
    columns. The first value found for each (time, identifier) pair is kept.

    Returns:
        TSWidePandas: Wide-form wrapper carrying the same id, time, ts and
            static variable metadata as this object.
    """
    # Imported here rather than at module level (presumably to avoid a circular import)
    from tstore.tswide.pandas import TSWidePandas

    df = self._obj
    # No `values` given: every remaining column is pivoted
    df = df.pivot_table(
        index=self._tstore_time_var,
        columns=self._tstore_id_var,
        aggfunc="first",
    )

    return TSWidePandas(
        df,
        id_var=self._tstore_id_var,
        time_var=self._tstore_time_var,
        ts_vars=self._tstore_ts_vars,
        static_vars=self._tstore_static_vars,
    )
6 changes: 1 addition & 5 deletions tstore/tslong/polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

if TYPE_CHECKING:
# To avoid circular imports
from tstore.tswide.polars import TSWidePolars
pass


class TSLongPolars(TSLong):
Expand Down Expand Up @@ -155,7 +155,3 @@ def from_tstore(

# Conversion to polars
return tslong_pyarrow.change_backend(new_backend="polars")

def to_tswide(self) -> "TSWidePolars":
"""Convert the wrapper into a TSWide object."""
raise NotImplementedError
6 changes: 1 addition & 5 deletions tstore/tslong/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

if TYPE_CHECKING:
# To avoid circular imports
from tstore.tswide.pyarrow import TSWidePyArrow
pass


class TSLongPyArrow(TSLong):
Expand Down Expand Up @@ -95,10 +95,6 @@ def from_tstore(
static_vars=static_vars,
)

def to_tswide(self) -> "TSWidePyArrow":
"""Convert the wrapper into a TSWide object."""
raise NotImplementedError


def _read_ts(
fpath,
Expand Down
23 changes: 6 additions & 17 deletions tstore/tslong/tslong.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
"""Module defining the TSLong abstract wrapper."""

from abc import abstractmethod
from typing import TYPE_CHECKING, Optional

import polars as pl
import pyarrow as pa

from tstore.backend import (
Backend,
DaskDataFrame,
DataFrame,
PandasDataFrame,
PolarsDataFrame,
PyArrowDataFrame,
cast_column_to_large_string,
change_backend,
re_set_dataframe_index,
)
Expand Down Expand Up @@ -45,18 +42,7 @@ def __init__(
Defaults to None, which will group all columns not in `static_vars` together.
static_vars (list[str]): List of column names that are static across time. Defaults to None.
"""
if isinstance(df, (DaskDataFrame, PandasDataFrame)):
df[id_var] = df[id_var].astype("large_string[pyarrow]")

elif isinstance(df, PolarsDataFrame):
df = df.cast({id_var: pl.String})

elif isinstance(df, PyArrowDataFrame):
schema = df.schema
field_index = schema.get_field_index(id_var)
schema = schema.remove(field_index)
schema = schema.insert(field_index, pa.field(id_var, pa.large_string()))
df = df.cast(target_schema=schema)
df = cast_column_to_large_string(df, id_var)

# Ensure correct index column
df = re_set_dataframe_index(df, index_var=time_var)
Expand Down Expand Up @@ -130,6 +116,9 @@ def to_tsdf(self) -> "TSDF":
tsdf = dask_tsdf.change_backend(new_backend=self.current_backend)
return tsdf

def to_tswide(self) -> "TSWide":
    """Convert the wrapper into a TSWide object.

    Default implementation: the object is converted to the dask backend,
    pivoted there, and the result is converted back to the backend of this
    object. It is deliberately not abstract, so subclasses without their own
    conversion can rely on it.

    NOTE(review): the dask subclass must provide its own `to_tswide`,
    otherwise this method would call itself - confirm for any new backend.

    Returns:
        TSWide: Wide-form wrapper using the same backend as this object.
    """
    dask_tslong = self.change_backend(new_backend="dask")
    dask_tswide = dask_tslong.to_tswide()
    tswide = dask_tswide.change_backend(new_backend=self.current_backend)
    return tswide
2 changes: 1 addition & 1 deletion tstore/tswide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from tstore.tswide.tswide import TSWide


def open_tswide(base_dir: Union[str, Path], *args, backend: Backend = "pandas", **kwargs):
def open_tswide(base_dir: Union[str, Path], *args, backend: Backend = "dask", **kwargs):
"""Read a TStore file structure as a TSWide object."""
ts_wide_classes = {
"dask": TSWideDask,
Expand Down
66 changes: 59 additions & 7 deletions tstore/tswide/tswide.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
"""Module defining the TSWide abstract wrapper."""

from abc import abstractmethod
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional

from tstore.backend import DaskDataFrame, DataFrame, PandasDataFrame, PolarsDataFrame, PyArrowDataFrame
from tstore.backend import (
DaskDataFrame,
DataFrame,
PandasDataFrame,
PolarsDataFrame,
PyArrowDataFrame,
)
from tstore.tswrapper.tswrapper import TSWrapper

if TYPE_CHECKING:
Expand All @@ -15,6 +21,52 @@
class TSWide(TSWrapper):
"""Abstract wrapper for a wide-form timeseries DataFrame."""

def __init__(
    self,
    df: DataFrame,
    id_var: str,
    time_var: str = "time",
    ts_vars: Optional[dict[str, list[str]]] = None,
    static_vars: Optional[list[str]] = None,
) -> None:
    """Wrap a wide-form timeseries DataFrame as a TSWide object.

    Args:
        df (DataFrame): DataFrame to wrap.
        id_var (str): Name of the column containing the identifier variable.
        time_var (str): Name of the column containing the time variable. Defaults to "time".
        ts_vars (dict[str, list[str]]): Dictionary of named groups of column names.
            Defaults to None, which will group all columns not in `static_vars` together.
        static_vars (list[str]): List of column names that are static across time. Defaults to None.
    """
    # TODO: Cast id_var to large string, e.g. with cast_column_to_large_string(df, id_var)

    # TODO: Ensure correct index column, e.g. with re_set_dataframe_index(df, index_var=time_var)

    super().__init__(df)

    if static_vars is None:
        static_vars = []

    if ts_vars is None:
        # A pyarrow Table exposes its column names as `column_names`; its `columns`
        # attribute holds the data arrays, which must not end up in `ts_vars`
        column_names = df.column_names if isinstance(df, PyArrowDataFrame) else df.columns
        ts_vars = {
            "ts_variable": [
                col for col in column_names if col != id_var and col != time_var and col not in static_vars
            ],
        }

    # Set attributes using __dict__ to not trigger __setattr__
    self.__dict__.update(
        {
            "_tstore_id_var": id_var,
            "_tstore_time_var": time_var,
            "_tstore_ts_vars": ts_vars,
            "_tstore_static_vars": static_vars,
        },
    )

def __new__(cls, *args, **kwargs) -> "TSWide":
"""When calling TSWide() directly, return the appropriate subclass."""
if cls is TSWide:
Expand All @@ -24,7 +76,7 @@ def __new__(cls, *args, **kwargs) -> "TSWide":
return super().__new__(cls)

@staticmethod
def wrap(df: DataFrame) -> "TSWide":
def wrap(df: DataFrame, *args, **kwargs) -> "TSWide":
"""Wrap a DataFrame in the appropriate TSWide subclass."""
# Lazy import to avoid circular imports
from tstore.tswide.dask import TSWideDask
Expand All @@ -33,16 +85,16 @@ def wrap(df: DataFrame) -> "TSWide":
from tstore.tswide.pyarrow import TSWidePyArrow

if isinstance(df, DaskDataFrame):
return TSWideDask(df)
return TSWideDask(df, *args, **kwargs)

if isinstance(df, PandasDataFrame):
return TSWidePandas(df)
return TSWidePandas(df, *args, **kwargs)

if isinstance(df, PolarsDataFrame):
return TSWidePolars(df)
return TSWidePolars(df, *args, **kwargs)

if isinstance(df, PyArrowDataFrame):
return TSWidePyArrow(df)
return TSWidePyArrow(df, *args, **kwargs)

type_path = f"{type(df).__module__}.{type(df).__qualname__}"
raise TypeError(f"Cannot wrap type {type_path} as a TSWide object.")
Expand Down

0 comments on commit 330b67a

Please sign in to comment.