diff --git a/setup.cfg b/setup.cfg index bc4dcb75e..f787c5adc 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,7 +32,7 @@ project_urls = packages = find: install_requires = colorlog - dspeed>=1.1 + dspeed>=1.2 h5py>=3.2 iminuit legend-daq2lh5>=1.0 diff --git a/src/pygama/flow/data_loader.py b/src/pygama/flow/data_loader.py index 7490271b0..b077c45a1 100644 --- a/src/pygama/flow/data_loader.py +++ b/src/pygama/flow/data_loader.py @@ -13,6 +13,7 @@ import numpy as np import pandas as pd +import polars as pl from dspeed.vis import WaveformBrowser from lgdo import Array, LH5Iterator, LH5Store, Struct, Table, lgdo_utils from lgdo.types.vectorofvectors import build_cl, explode_arrays, explode_cl @@ -572,37 +573,39 @@ def build_entry_list( ) else: tcm_tb = tcm_tables[0] - tcm_path = os.path.join( self.data_dir, self.filedb.tier_dirs[tcm_tier].lstrip("/"), self.filedb.df.iloc[file][f"{tcm_tier}_file"].lstrip("/"), ) - if not os.path.exists(tcm_path): raise FileNotFoundError(f"Can't find TCM file for {tcm_level}") tcm_table_name = self.filedb.get_table_name(tcm_tier, tcm_tb) - try: - tcm_lgdo, _ = sto.read_object(tcm_table_name, tcm_path) - except KeyError: - log.warning(f"Cannot find table {tcm_table_name} in file {tcm_path}") - continue + + tcm_lgdo, _ = sto.read_object(tcm_table_name, tcm_path) + # Have to do some hacky stuff until I get a get_dataframe() method tcm_lgdo[self.tcms[tcm_level]["tcm_cols"]["child_idx"]] = Array( nda=explode_cl(tcm_lgdo["cumulative_length"].nda) ) + tcm_lgdo["index"] = Array( + nda=np.arange( + len(tcm_lgdo[self.tcms[tcm_level]["tcm_cols"]["child_idx"]]) + ) + ) tcm_lgdo.pop("cumulative_length") tcm_tb = Table(col_dict=tcm_lgdo) - f_entries = tcm_tb.get_dataframe() + f_entries = pl.DataFrame(tcm_tb.get_dataframe()) renaming = { self.tcms[tcm_level]["tcm_cols"]["child_idx"]: f"{child}_idx", self.tcms[tcm_level]["tcm_cols"]["parent_tb"]: f"{parent}_table", self.tcms[tcm_level]["tcm_cols"]["parent_idx"]: f"{parent}_idx", } - f_entries.rename(columns=renaming, inplace=True) + f_entries = f_entries.rename(renaming) if self.merge_files: - f_entries["file"] = file + # f_entries.with_columns("file") = file + f_entries = f_entries.with_columns(pl.lit(file).alias("file")) # At this point, should have a list of all available hits/evts joined by tcm if mode == "any": @@ -615,7 +618,6 @@ def build_entry_list( if self.cuts is not None: if level in self.cuts.keys(): cut = self.cuts[level] - col_tiers = col_tiers_dict[level] # Tables in first tier of event should be the same for all tiers in one level @@ -630,21 +632,23 @@ def build_entry_list( # Cut any rows of TCM not relating to requested tables if level == parent: - f_entries.query(f"{level}_table in {tables}", inplace=True) + f_entries = f_entries.filter(pl.col(f"{level}_table").is_in(tables)) for tb in tables: tb_table = None if level == parent: - tcm_idx = f_entries.query(f"{level}_table == {tb}").index + f_entries_filtered = f_entries.filter( + pl.col(f"{level}_table") == tb + ) else: - tcm_idx = f_entries.index - idx_mask = f_entries.loc[tcm_idx, f"{level}_idx"] + f_entries_filtered = f_entries for tier in self.tiers[level]: tier_path = os.path.join( self.data_dir, self.filedb.tier_dirs[tier].lstrip("/"), self.filedb.df.loc[file, f"{tier}_file"].lstrip("/"), ) + # print(type(f_entries_filtered.select(f"{level}_idx")).to_series())#.to_list() if tier in col_tiers[file]["tables"].keys(): if tb in col_tiers[file]["tables"][tier]: table_name = self.filedb.get_table_name(tier, tb) @@ -653,7 +657,9 @@ def build_entry_list( table_name, tier_path, field_mask=cut_cols[level], - idx=idx_mask.tolist(), + idx=f_entries_filtered.select(f"{level}_idx") + .to_series() + .to_list(), ) except KeyError: log.warning( @@ -667,21 +673,31 @@ def build_entry_list( if tb_table is None: continue tb_df = tb_table.get_dataframe() - tb_df.query(cut, inplace=True) - idx_match = f_entries.query(f"{level}_idx in {list(tb_df.index)}") + tb_df.query(cut, inplace=True, engine="numexpr") + + idx_match = f_entries.filter( + pl.col(f"{level}_idx").is_in(list(tb_df.index)) + ) if level == parent: - idx_match = idx_match.query(f"{level}_table == {tb}") + idx_match = idx_match.filter(pl.col(f"{level}_table") == tb) if mode == "only": - keep_idx = idx_match.index drop_idx = set.symmetric_difference( - set(tcm_idx), list(keep_idx) + set(f_entries_filtered.select("index").to_series()), + list(idx_match.select("index").to_series()), + ) + # print(drop_idx) + f_entries = f_entries.filter( + pl.col("index").is_in(drop_idx).not_() ) - f_entries.drop(drop_idx, inplace=True) elif mode == "any": - evts = list(idx_match[f"{child}_idx"].unique()) - keep_idx = f_entries.query(f"{child}_idx in {evts}").index + evts = list( + idx_match.select(f"{child}_idx").to_series().unique() + ) + keep_idx = f_entries.query(f"{child}_idx in {evts}").select( + "index" + ) drop = set.symmetric_difference( - set(f_entries.index), list(keep_idx) + set(f_entries_filtered.select("index")), list(keep_idx) ) if drop_idx is None: drop_idx = drop @@ -695,14 +711,12 @@ def build_entry_list( if col in for_output: f_entries.loc[keep_idx, col] = tb_df[col].tolist() - if mode == "any": - if drop_idx is not None: - f_entries.drop(index=drop_idx, inplace=True) - - f_entries.reset_index(inplace=True, drop=True) + f_entries = f_entries.with_columns( + pl.Series(np.arange(len(f_entries))).alias("index") + ) if in_memory: - entries[file] = f_entries + entries[file] = f_entries.drop("index").to_pandas() if output_file: # Convert f_entries DataFrame to Struct f_dict = f_entries.to_dict("list")