# Recovered from git patch cb237d1e9e6f0c71071ee350cf685475d8a3ad74
# ("Add initial dataframe builder class", Sam Greenbury,
# Tue, 7 Nov 2023 17:20:04 +0000), which creates python/uatk_spc/builder.py.
"""Fluent builder that assembles a flat, one-row-per-person DataFrame."""

from typing import Dict, List, Self

import pandas as pd
import polars as pl
from uatk_spc.reader import DataFrame, SPCReader, backend_error


class Builder(SPCReader):
    """
    A class for building a flat dataset starting from people per row and combining
    additional population fields.

    Every mutating method updates ``data`` in place and returns ``self`` so
    calls can be chained and finished with :meth:`build`.

    Attributes:
        data (DataFrame | None): DataFrame that is being built.
    """

    data: DataFrame | None

    def __init__(
        self,
        path: str,
        region: str,
        input_type: str = "parquet",
        backend: str = "polars",
    ):
        """Load the population through ``SPCReader`` and seed ``data`` with people.

        Args:
            path: Forwarded unchanged to ``SPCReader.__init__``.
            region: Forwarded unchanged to ``SPCReader.__init__``.
            input_type: Forwarded unchanged to ``SPCReader.__init__``.
            backend: DataFrame backend; the methods of this class dispatch on
                ``"polars"`` and ``"pandas"``.
        """
        super().__init__(path, region, input_type, backend)
        # The people table is the starting point of the build.
        self.data = self.people

    @staticmethod
    def _unnest_pandas(frame: pd.DataFrame, column: str) -> pd.DataFrame:
        """Replace a dict-valued pandas ``column`` with one column per key."""
        expanded = pd.json_normalize(frame[column].tolist())
        # json_normalize of a list yields a fresh positional index; realign it
        # so the join below matches rows even when ``frame`` is not 0..n-1.
        expanded.index = frame.index
        return frame.drop(columns=[column]).join(expanded)

    def add_households(self) -> Self:
        """Unnest ``identifiers`` and left-join the households table.

        The join matches the ``household`` field (exposed by unnesting
        ``identifiers``) against the households ``id`` column.

        Returns:
            This builder, to allow chaining.

        Raises:
            The exception built by ``backend_error`` for an unrecognised backend.
        """
        if self.backend == "polars":
            self.data = self.data.unnest("identifiers").join(
                self.households, left_on="household", right_on="id", how="left"
            )
        elif self.backend == "pandas":
            # TODO: handle duplicate column names ("id"): both frames carry an
            # "id" column, so pandas' merge suffixes them instead of keeping one.
            # Normalise the frame being built (not ``self.people``) so that the
            # unnested fields always line up with the rows they are joined to.
            self.data = self._unnest_pandas(self.data, "identifiers").merge(
                self.households, left_on="household", right_on="id", how="left"
            )
        else:
            raise backend_error(self.backend)
        return self

    def add_time_use_diaries(
        self, features: Dict[str, List[str]], diary_type: str = "weekday_diaries"
    ) -> Self:
        """Explode each person's diary references and join the diary rows.

        ``data`` is reduced to ``id``, ``household``, the requested feature
        fields and ``diary_type``; any other column built so far is dropped.
        ``household`` must therefore already be a column (presumably via
        :meth:`add_households`) or be exposed by unnesting a key of ``features``.

        Args:
            features: Maps each struct column to unnest to the list of its
                fields to keep.
            diary_type: Name of the list column whose entries are row positions
                in ``time_use_diaries``.

        Returns:
            This builder, to allow chaining.

        Raises:
            NotImplementedError: If the backend is ``"pandas"``.
            The exception built by ``backend_error`` for an unrecognised backend.
        """
        if self.backend == "pandas":
            raise NotImplementedError(
                "add_time_use_diaries is only implemented for the polars backend"
            )
        if self.backend != "polars":
            raise backend_error(self.backend)

        # Flatten the per-struct field lists without shadowing ``features``.
        feature_columns = [
            column for columns in features.values() for column in columns
        ]
        people = (
            self.data.unnest(list(features))
            .select(["id", "household"] + feature_columns + [diary_type])
            # One output row per (person, diary reference).
            .explode(diary_type)
        )
        # Diary references are row positions, so materialise the row number of
        # each diary as an explicit "index" column to join against.
        time_use_diaries_with_idx = pl.concat(
            [
                self.time_use_diaries,
                pl.int_range(0, self.time_use_diaries.shape[0], eager=True)
                .rename("index")
                .cast(pl.UInt64)
                .to_frame(),
            ],
            how="horizontal",
        )
        self.data = people.join(
            time_use_diaries_with_idx, left_on=diary_type, right_on="index"
        )
        return self

    def unnest(self, features: List[str]) -> Self:
        """Expand the given object/struct columns into one column per field.

        Args:
            features: Names of the struct (polars) or dict-valued (pandas)
                columns of ``data`` to expand.

        Returns:
            This builder, to allow chaining.

        Raises:
            The exception built by ``backend_error`` for an unrecognised backend.
        """
        if self.backend == "polars":
            self.data = self.data.unnest(features)
        elif self.backend == "pandas":
            for feature in features:
                self.data = self._unnest_pandas(self.data, feature)
        else:
            raise backend_error(self.backend)
        return self

    def select(self, features: List[str]) -> Self:
        """Select column subset of features from the data being built.

        Args:
            features: Names of the columns to keep, in the desired order.

        Returns:
            This builder, to allow chaining.

        Raises:
            The exception built by ``backend_error`` for an unrecognised backend.
        """
        if self.backend == "polars":
            self.data = self.data.select(features)
        elif self.backend == "pandas":
            self.data = self.data.loc[:, features]
        else:
            raise backend_error(self.backend)
        return self

    def build(self) -> DataFrame:
        """Returns the final built DataFrame."""
        return self.data