Skip to content

Commit

Permalink
ENH: fix polars incompatibilities
Browse files Browse the repository at this point in the history
  • Loading branch information
jklymak committed Mar 25, 2024
1 parent 71c6e13 commit 95744cd
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 21 deletions.
33 changes: 17 additions & 16 deletions pyglider/seaexplorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,33 +128,34 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
# Try to read the file with polars. If the file is corrupted (rare), file read will fail and file
# is appended to badfiles
try:
out = pl.read_csv(f, sep=';')
except:
out = pl.read_csv(f, separator=';')
except Exception as e:
_log.warning(f'Exception reading {f}: {e}')
_log.warning(f'Could not read {f}')
badfiles.append(f)
continue
# Parse the datetime from nav files (called Timestamp) and pld1 files (called PLD_REALTIMECLOCK)
if "Timestamp" in out.columns:
out = out.with_column(
pl.col("Timestamp").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S"))
out = out.with_columns(
pl.col("Timestamp").str.strptime(pl.Datetime, format="%d/%m/%Y %H:%M:%S"))
out = out.rename({"Timestamp": "time"})
else:
out = out.with_column(
pl.col("PLD_REALTIMECLOCK").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S.%3f"))
out = out.with_columns(
pl.col("PLD_REALTIMECLOCK").str.strptime(pl.Datetime, format="%d/%m/%Y %H:%M:%S.%3f"))
out = out.rename({"PLD_REALTIMECLOCK": "time"})
for col_name in out.columns:
if "time" not in col_name.lower():
out = out.with_column(pl.col(col_name).cast(pl.Float64))
out = out.with_columns(pl.col(col_name).cast(pl.Float64))
# If AD2CP data present, convert timestamps to datetime
if 'AD2CP_TIME' in out.columns:
# Set datestamps with date 00000 to None
out = out.with_column(
pl.col('AD2CP_TIME').str.strptime(pl.Datetime, fmt="%m%d%y %H:%M:%S", strict=False))
out = out.with_columns(
pl.col('AD2CP_TIME').str.strptime(pl.Datetime, format="%m%d%y %H:%M:%S", strict=False))

# subsetting for heavily oversampled raw data:
if rawsub == 'raw' and dropna_subset is not None:
# This check is the polars equivalent of pandas dropna. See docstring note on dropna
out = out.with_column(out.select(pl.col(dropna_subset).is_null().cast(pl.Int64))
out = out.with_columns(out.select(pl.col(dropna_subset).is_null().cast(pl.Int64))
.sum(axis=1).alias("null_count")).filter(
pl.col("null_count") <= dropna_thresh) \
.drop("null_count")
Expand Down Expand Up @@ -265,7 +266,7 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
def _interp_gli_to_pld(gli, ds, val, indctd):
gli_ind = ~np.isnan(val)
# switch for if we are comparing two polars dataframes or a polars dataframe and a xarray dataset
if type(ds) is pl.internals.dataframe.frame.DataFrame:
if type(ds) is pl.DataFrame:
valout = np.interp(ds["time"],
gli.filter(gli_ind)["time"],
val[gli_ind])
Expand Down Expand Up @@ -296,7 +297,7 @@ def _remove_fill_values(df, fill_value=9999):
pl.when(pl.col(pl.Float64) == fill_value)
.then(None)
.otherwise(pl.col(pl.Float64))
.keep_name()
.name.keep()
)
return df

Expand Down Expand Up @@ -364,12 +365,12 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
coarse_ints = np.arange(0, len(sensor) / coarsen_time, 1 / coarsen_time).astype(int)
sensor_sub = sensor.with_columns(pl.lit(coarse_ints).alias("coarse_ints"))
# Subsample the variable data keeping only the samples from the coarsened timeseries
sensor_sub_grouped = sensor_sub.with_column(
sensor_sub_grouped = sensor_sub.with_columns(
pl.col('time').to_physical()
).groupby(
).group_by(
by=pl.col('coarse_ints'),
maintain_order=True
).mean().with_column(
).mean().with_columns(
pl.col('time').cast(pl.Datetime('ms'))
)[:-1, :]
val2 = sensor_sub_grouped.select(sensorname).to_numpy()[:, 0]
Expand All @@ -388,7 +389,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
else:
val = np.interp(time_timebase.astype(float), time_var.astype(float), var_non_nan)

# interpolate only over those gaps that are smaller than 'maxgap'
# interpolate only over those gaps that are smaller than 'maxgap'
tg_ind = utils.find_gaps(time_var.astype(float),time_timebase.astype(float),maxgap)
val[tg_ind] = np.nan
else:
Expand Down
12 changes: 7 additions & 5 deletions tests/test_seaexplorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,25 @@ def test_raw_to_rawnc():
# Default settings on a clean folder
result_default = seaexplorer.raw_to_rawnc('tests/data/realtime_raw/',
'tests/data/realtime_rawnc/', None)
assert result_default is True
# Test the reprocess flag works
result_reprocess = seaexplorer.raw_to_rawnc('tests/data/realtime_raw/',
'tests/data/realtime_rawnc/',
None, incremental=False)
assert result_reprocess is True

    # Check that reprocessing is not performed by default
result_no_new_files = seaexplorer.raw_to_rawnc('tests/data/realtime_raw/',
'tests/data/realtime_rawnc/',
None)
assert result_no_new_files is False

# Reject all payload files with fewer than 10000 lines
result_strict = seaexplorer.raw_to_rawnc('tests/data/realtime_raw/',
'tests/data/realtime_rawnc/',
None,
incremental=False,
min_samples_in_file=10000)
assert result_default is True
assert result_reprocess is True
assert result_no_new_files is False
assert result_strict is False


Expand All @@ -65,8 +67,8 @@ def test_merge_rawnc():
kind='sub')
assert result_default is False
assert result_sub is True


def test__remove_fill_values():
    # This should convert values equalling 9999 in the original df to nan
df_in = pl.read_parquet('tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.parquet')
Expand Down

0 comments on commit 95744cd

Please sign in to comment.