use polars for seaexplorer data file load #120

Merged 15 commits on Nov 4, 2022
2 changes: 1 addition & 1 deletion docs/getting-started-seaexplorer.md
@@ -30,7 +30,7 @@ There are four top-levels to the `deployment.yaml`

The example script is relatively straightforward if there is no intermediate processing. See {ref}`ExProc`, below.

Data comes from an input directory, and is translated to raw glider-dependent netCDF files and put in a new directory. These files are useful of their own right, but are not CF compliant. These files are then merged into a single monolithic netCDF file, and this is translated to a CF-compliant timeseries file. Finally individual profiles are saved and a 2-D 1-m grid in time-depth is saved.
Data comes from an input directory, and is translated to raw glider-dependent parquet files and put in a new directory. These files are useful in their own right. Apache Parquet is a column-oriented format for storing tabular data. Parquet files take up less space than netCDF or csv and are much faster to read and write. These files can be opened with [polars.read_parquet](https://pola-rs.github.io/polars-book/user-guide/howcani/io/parquet.html) or [pandas.read_parquet](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_parquet.html). These files are then merged into a single monolithic parquet file, and this is translated to a CF-compliant timeseries netCDF file. Finally, individual profiles are saved and a 2-D 1-m grid in time-depth is saved.

It is likely that between these steps the user will want to add any screening steps, or adjustments to the calibrations. PyGlider does not provide those steps.

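The paragraph above notes that the raw parquet files can be opened with polars.read_parquet or pandas.read_parquet. A minimal sketch, assuming polars is installed; the file name is illustrative and follows the naming used elsewhere in this PR:

```python
import polars as pl
# import pandas as pd  # pandas.read_parquet opens the same files

# Any file written to the raw output directory will do; this name is an example.
pld = pl.read_parquet('realtime_rawnc/sea035.0012.pld1.sub.0036.parquet')
print(pld.head())
```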
1 change: 1 addition & 0 deletions environment.yml
@@ -13,5 +13,6 @@ dependencies:
- scipy
- bitstring
- pooch
- polars
- pip:
- dbdreader
176 changes: 97 additions & 79 deletions pyglider/seaexplorer.py
@@ -9,8 +9,8 @@
import xarray as xr
import yaml
import pyglider.utils as utils
import pandas as pd

import datetime
import polars as pl

_log = logging.getLogger(__name__)

@@ -25,7 +25,7 @@ def _outputname(f, outdir):
for ff in fns:
fnout += ff.lower() + '.'
filenum = int(fns[4])
return outdir + fnout + 'nc', filenum
return outdir + fnout + 'parquet', filenum


def _needsupdating(ftype, fin, fout):
@@ -41,7 +41,7 @@ def _sort(ds):
def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
min_samples_in_file=5, dropna_subset=None, dropna_thresh=1):
"""
Convert seaexplorer text files to raw netcdf files.
Convert seaexplorer text files to raw parquet files.

Parameters
----------
@@ -125,49 +125,59 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
_log.info(f'{f} to {fnout}')
if not incremental or _needsupdating(ftype, f, fnout):
_log.info(f'Doing: {f} to {fnout}')
out = pd.read_csv(f, header=0, delimiter=';',
parse_dates=True, index_col=0,
dayfirst=True)
try:
out = pl.read_csv(f, sep=';')
except:
_log.warning(f'Could not read {f}')
Review comment (Member): Do we need to add the badfiles here?

Reply (Collaborator Author): badfiles added

continue
if "Timestamp" in out.columns:
Review comment (Member): Can this get a comment? Why do we need this check?

Reply (Collaborator Author): This check is for corrupted files. I encountered this once in ~50 missions I've processed so far. I've added a comment.

out = out.with_column(
pl.col("Timestamp").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S"))
out = out.rename({"Timestamp": "time"})
else:
out = out.with_column(
pl.col("PLD_REALTIMECLOCK").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S.%3f"))
out = out.rename({"PLD_REALTIMECLOCK": "time"})
for col_name in out.columns:
if "time" not in col_name.lower():
out = out.with_column(pl.col(col_name).cast(pl.Float64))
# If AD2CP data present, convert timestamps to datetime
if 'AD2CP_TIME' in out.columns:
# Set datestamps with date 00000 to None
out.loc[out.AD2CP_TIME.str[:6] == '000000',
'AD2CP_TIME'] = None
out['AD2CP_TIME'] = pd.to_datetime(out.AD2CP_TIME)
out = out.with_column(
pl.col('AD2CP_TIME').str.strptime(pl.Datetime, fmt="%m%d%y %H:%M:%S", strict=False))

# subsetting for heavily oversampled raw data:
if rawsub=='raw' and dropna_subset is not None:
out = out.dropna(subset=dropna_subset, thresh=dropna_thresh)

with out.to_xarray() as outx:
key = list(outx.coords.keys())[0]
outx = outx.rename({key: 'time'})
outx['fnum'] = ('time',
int(filenum)*np.ones(len(outx['time'])))
if ftype == 'gli':
outx.to_netcdf(fnout[:-3] + '.nc', 'w')
if rawsub == 'raw' and dropna_subset is not None:
out = out.with_column(out.select(pl.col(dropna_subset).is_null().cast(pl.Int64))
.sum(axis=1).alias("null_count")).filter(
pl.col("null_count") <= dropna_thresh) \
.drop("null_count")

Review comment (Member): This needs a comment, and if it doesn't cause a slowdown or extra writes maybe would benefit from unpacking into separate calls. Looks like you are dropping repeats, but it's too many steps in one line for me to follow without spending too much time ;-)

Reply (Collaborator Author): I've added a comment on this. It's a bit convoluted looking, but it's just the polars equivalent of pandas.dropna. I didn't want to change the functionality of the dropna option in this PR. We can factor it out as a separate PR though?
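As the reply above says, the new expression mirrors the pandas dropna call it replaces: it counts nulls per row across the subset columns, keeps rows whose null count stays at or below dropna_thresh, then drops the helper column. A standalone sketch with made-up column names, assuming a reasonably recent polars:

```python
from functools import reduce
from operator import add

import polars as pl

df = pl.DataFrame({
    "time": [1, 2, 3, 4],
    "LEGATO_TEMPERATURE": [10.1, None, None, 10.4],
    "LEGATO_CONDUCTIVITY": [3.2, None, 3.3, None],
})
dropna_subset = ["LEGATO_TEMPERATURE", "LEGATO_CONDUCTIVITY"]
dropna_thresh = 1  # maximum number of nulls tolerated across the subset

# Per-row null count over the subset columns, then a threshold filter.
null_count = reduce(add, [pl.col(c).is_null().cast(pl.Int64) for c in dropna_subset]).alias("null_count")
kept = (df.with_columns(null_count)
          .filter(pl.col("null_count") <= dropna_thresh)
          .drop("null_count"))
print(kept)
```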

if ftype == 'gli':
out = out.with_columns([(pl.col("NavState") * 0 + int(filenum)).alias("fnum")])
out.write_parquet(fnout)
goodfiles.append(f)
else:
if out.select("time").shape[0] > min_samples_in_file:
out.write_parquet(fnout)
goodfiles.append(f)
else:
if outx.indexes["time"].size > min_samples_in_file:
outx.to_netcdf(f'{fnout[:-3]}.nc', 'w',
unlimited_dims=['time'])
goodfiles.append(f)
else:
_log.warning('Number of sensor data points'
'too small. Skipping nc write')
badfiles.append(f)
_log.warning('Number of sensor data points'
'too small. Skipping parquet write')
badfiles.append(f)
if len(badfiles) > 0:
_log.warning('Some files could not be parsed:')
for fn in badfiles:
_log.warning('%s', fn)
if not goodfiles:
_log.warning(f'No valid unprocessed seaexplorer files found in'f'{indir}')
return False
_log.info('All raw files converted to nc')
_log.info('All raw files converted to parquet')
return True
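For orientation, a minimal call of the converter defined above might look like the following; the paths are illustrative and the remaining keyword arguments keep their defaults:

```python
import pyglider.seaexplorer as seaexplorer

# indir holds the raw SeaExplorer text files; outdir receives the per-dive parquet files.
seaexplorer.raw_to_rawnc('realtime_raw/', 'realtime_rawnc/', 'deployment.yaml')
```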


def drop_rogue_1970(ds):
def drop_rogue_1970(df):
"""
If dates greater than 1971-01-01 are observed, drop any dates before
1971-01-01 from the dataset and return it. This function removes 1970
@@ -180,11 +190,15 @@ def drop_rogue_1970(ds):
Returns:
ds: xarray.DataSet
"""
dt_1971 = np.datetime64("1971-01-01")
dt_1971 = datetime.datetime(1971, 1, 1)
# If all dates before or after 1971-01-01, return the dataset
if (ds.time > dt_1971).all() or (ds.time < dt_1971).all():
return ds
return ds.where(ds.time > dt_1971, drop=True)
pre_1971 = df.filter(pl.col("time") < dt_1971)
if len(pre_1971) == len(df):
return pre_1971
post_1971 = df.filter(pl.col("time") > dt_1971)
if len(post_1971) == len(df):
return post_1971
return df.filter(pl.col("time") > dt_1971)


def merge_rawnc(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
Review comment (Member): We should probably rename this?

Reply (Collaborator Author): I've renamed this to the more descriptive drop_pre_1971_samples.

Review comment (Member): I meant rename merge_rawnc.... Though that screws with older scripts, but I'd rename this merge_parquet or whatever, and then alias merge_rawnc so old scripts work.
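A sketch of the backward-compatibility aliasing the thread settles on; the body here is a placeholder, not the real merge logic:

```python
def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
    """Merge per-dive .gli and .pld parquet files into combined nav and payload files."""
    ...  # real implementation lives in pyglider.seaexplorer

# Older scripts that call merge_rawnc keep working:
merge_rawnc = merge_parquet
```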

@@ -216,49 +230,44 @@ def merge_rawnc(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
deployment = yaml.safe_load(fin)
metadata = deployment['metadata']
id = metadata['glider_name']
outgli = outdir + '/' + id + '-rawgli.nc'
outpld = outdir + '/' + id + '-' + kind + 'pld.nc'
outgli = outdir + '/' + id + '-rawgli.parquet'
outpld = outdir + '/' + id + '-' + kind + 'pld.parquet'

_log.info('Opening *.gli.sub.*.nc multi-file dataset from %s', indir)
files = sorted(glob.glob(indir+'/*.gli.sub.*.nc'))
_log.info('Opening *.gli.sub.*.parquet multi-file dataset from %s', indir)
files = sorted(glob.glob(indir + '/*.gli.sub.*.parquet'))
if not files:
_log.warning(f'No *gli*.nc files found in {indir}')
_log.warning(f'No *gli*.parquet files found in {indir}')
return False
with xr.open_dataset(files[0]) as gli:
for fil in files[1:]:
try:
with xr.open_dataset(fil) as gli2:
gli = xr.concat([gli, gli2], dim='time')
except:
pass
gli = drop_rogue_1970(gli)
gli.to_netcdf(outgli)
gli = pl.read_parquet(indir + '/*.gli.sub.*.parquet')
gli = drop_rogue_1970(gli)
gli.write_parquet(outgli)
_log.info(f'Done writing {outgli}')

_log.info('Opening *.pld.sub.*.nc multi-file dataset')
files = sorted(glob.glob(indir+'/*.pld1.'+kind+'.*.nc'))
_log.info('Opening *.pld.sub.*.parquet multi-file dataset')
files = sorted(glob.glob(indir + '/*.pld1.' + kind + '.*.parquet'))
if not files:
_log.warning(f'No *{kind}*.nc files found in {indir}')
_log.warning(f'No *{kind}*.parquet files found in {indir}')
return False
with xr.open_dataset(files[0]) as pld:
for fil in files[1:]:
try:
with xr.open_dataset(fil) as pld2:
pld = xr.concat([pld, pld2], dim='time')
except:
pass
pld = drop_rogue_1970(pld)
pld.to_netcdf(outpld)
pld = pl.read_parquet(indir + '/*.pld1.' + kind + '.*.parquet')
pld = drop_rogue_1970(pld)
pld.write_parquet(outpld)

_log.info(f'Done writing {outpld}')
_log.info('Done merge_rawnc')
return True
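The per-file xr.open_dataset/xr.concat loop is replaced by a single pl.read_parquet call over a glob pattern, which polars expands itself. A small sketch; the directory and pattern are illustrative:

```python
import polars as pl

# Reads every matching per-dive file into one DataFrame in a single call.
pld = pl.read_parquet('realtime_rawnc/*.pld1.raw.*.parquet')
pld = pld.sort('time')  # optional: enforce time ordering after the multi-file read
print(pld.shape)
```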


def _interp_gli_to_pld(gli, ds, val, indctd):
gli_ind = ~np.isnan(val)
valout = np.interp(ds['time'],
gli['time'][gli_ind],
val[gli_ind])
# switch for if we are comparing two polars dataframes or a polars dataframe and a xarray dataset
if type(ds) is pl.internals.dataframe.frame.DataFrame:
valout = np.interp(ds["time"],
gli.filter(gli_ind)["time"],
val[gli_ind])
else:
valout = np.interp(ds['time'].astype(int),
np.array(gli.filter(gli_ind)["time"].to_numpy().astype('datetime64[ns]')).astype(int),
val[gli_ind])
return valout
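The xarray branch casts the datetime64 time coordinates to integers because np.interp only accepts numeric x-coordinates. A toy illustration with made-up values:

```python
import numpy as np

gli_time = np.array(['2022-01-01T00:00:00', '2022-01-01T00:00:10',
                     '2022-01-01T00:00:20'], dtype='datetime64[ns]')
pitch = np.array([1.0, 3.0, 5.0])                 # nav value sampled on gli_time
pld_time = np.array(['2022-01-01T00:00:05',
                     '2022-01-01T00:00:15'], dtype='datetime64[ns]')

# datetime64[ns] -> integer nanoseconds since the epoch, then ordinary interpolation.
pitch_on_pld = np.interp(pld_time.astype(np.int64),
                         gli_time.astype(np.int64),
                         pitch)
print(pitch_on_pld)  # [2. 4.]
```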


@@ -286,9 +295,9 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
device_data = deployment['glider_devices']
id = metadata['glider_name']
_log.info(f'Opening combined nav file {indir}/{id}-rawgli.nc')
gli = xr.open_dataset(f'{indir}/{id}-rawgli.nc')
_log.info(f'Opening combined payload file {indir}/{id}-{kind}pld.nc')
sensor = xr.open_dataset(f'{indir}/{id}-{kind}pld.nc')
gli = pl.read_parquet(f'{indir}/{id}-rawgli.parquet')
_log.info(f'Opening combined payload file {indir}/{id}-{kind}pld.parquet')
sensor = pl.read_parquet(f'{indir}/{id}-{kind}pld.parquet')

# build a new data set based on info in `deploymentyaml.`
# We will use ctd as the interpolant
@@ -304,7 +313,8 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
# It oversamples the nav data, but mildly undersamples the optics and
# oxygen....
if 'timebase' in ncvar:
indctd = np.where(~np.isnan(sensor[ncvar['timebase']['source']]))[0]
vals = sensor.select([ncvar['timebase']['source']]).to_numpy()[:, 0]
indctd = np.where(~np.isnan(vals))[0]
elif 'GPCTD_TEMPERATURE' in list(sensor.variables):
_log.warning('No timebase specified. Using GPCTD_TEMPERATURE as time'
'base')
@@ -317,35 +327,43 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
_log.warning('No gpctd or legato data found. Using NAV_DEPTH as time'
'base')
indctd = np.where(~np.isnan(sensor.NAV_DEPTH))[0]
ds['time'] = (('time'), sensor['time'].values[indctd], attr)
ds['time'] = (('time'), sensor.select('time').to_numpy()[indctd, 0], attr)
thenames = list(ncvar.keys())
for i in ['time', 'timebase', 'keep_variables']:
if i in thenames:
thenames.remove(i)
for name in thenames:
_log.info('interpolating ' + name)
if not('method' in ncvar[name].keys()):
if not ('method' in ncvar[name].keys()):
# variables that are in the data set or can be interpolated from it
if 'conversion' in ncvar[name].keys():
convert = getattr(utils, ncvar[name]['conversion'])
else:
convert = utils._passthrough
sensorname = ncvar[name]['source']
if sensorname in list(sensor.variables):
if sensorname in list(sensor.columns):
_log.debug('sensorname %s', sensorname)
val = convert(sensor[sensorname])
val = convert(sensor.select(sensorname).to_numpy()[:, 0])
if 'coarsen' in ncvar[name]:
# smooth oxygen data as originally prescribed
coarsen_time = ncvar[name]['coarsen']
sensor_sub = sensor.coarsen(time=coarsen_time,
boundary='trim').mean()
val2 = sensor_sub[sensorname]
val = _interp_gli_to_pld(sensor_sub, sensor, val2, indctd)
coarse_ints = np.arange(0, len(sensor)/coarsen_time, 1/coarsen_time).astype(int)
sensor_sub = sensor.with_columns(pl.lit(coarse_ints).alias("coarse_ints"))
sensor_sub_grouped = sensor_sub.with_column(
pl.col('time').to_physical()
).groupby(
by=pl.col('coarse_ints'),
maintain_order=True
).mean().with_column(
pl.col('time').cast(pl.Datetime('ms'))
)[:-1, :]

Review comment (Member): This block needs a comment as well... It says "smooth" oxygen above, but I'm not following this dance with coarse_ints etc.

Reply (Collaborator Author): Ah that makes more sense! I've renamed to merge_parquet and added merge_rawnc = merge_parquet so hopefully older scripts will still work. I've also added more comments to the variables coarsening.
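A simplified sketch of the coarse_ints averaging discussed above: consecutive blocks of coarsen_time samples get the same integer label, and a grouped mean over that label block-averages the data. Column names are made up, and the groupby spelling matches the polars versions current when this PR was written (newer releases call it group_by); the real code additionally works on the time column's physical integer representation, casts it back to Datetime afterwards, and drops the final partial block with [:-1, :]:

```python
import numpy as np
import polars as pl

n = 10
df = pl.DataFrame({
    "time": np.arange(n, dtype=np.int64),    # integer stand-in for timestamps
    "oxygen": np.linspace(200.0, 210.0, n),  # stand-in for the sensor being smoothed
})
coarsen_time = 3  # average every 3 consecutive samples

# Block labels 0,0,0,1,1,1,... (the PR builds essentially the same labels with
# np.arange(0, len(sensor)/coarsen_time, 1/coarsen_time).astype(int)).
coarse_ints = np.arange(n) // coarsen_time

df_coarse = (df.with_columns(pl.Series("coarse_ints", coarse_ints))
               .groupby("coarse_ints", maintain_order=True)
               .mean())
print(df_coarse)
```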
val2 = sensor_sub_grouped.select(sensorname).to_numpy()[:, 0]
val = _interp_gli_to_pld(sensor_sub_grouped, sensor, val2, indctd)
val = val[indctd]

ncvar['method'] = 'linear fill'
else:
val = gli[sensorname]
val = gli.select(sensorname).to_numpy()[:, 0]
val = convert(val)
# Values from the glider netcdf must be interpolated to match
# the sensor netcdf
@@ -355,7 +373,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
ncvar[name].pop('coordinates', None)
attrs = ncvar[name]
attrs = utils.fill_required_attrs(attrs)
ds[name] = (('time'), val.data, attrs)
ds[name] = (('time'), val, attrs)

# fix lon and lat to be linearly interpolated between fixes
good = np.where(np.abs(np.diff(ds.longitude)) +
@@ -397,7 +415,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
ds = ds.isel(time=good)
len_after_drop = len(ds.time)
proportion_kept = len_after_drop / len_before_drop
loss_str = f"{100 * (1-proportion_kept)} % samples removed by timestamp deduplication."
loss_str = f"{100 * (1 - proportion_kept)} % samples removed by timestamp deduplication."
if proportion_kept < 0.5:
raise ValueError(f"{loss_str} Check input data for duplicate timestamps")
elif proportion_kept < 0.999:
@@ -441,7 +459,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
ds.ad2cp_time.attrs.pop('units')
ds.to_netcdf(outname, 'w',
encoding={'time': {'units':
'seconds since 1970-01-01T00:00:00Z'}})
'seconds since 1970-01-01T00:00:00Z'}})
return outname


1 change: 1 addition & 0 deletions tests/environment.yml
@@ -15,6 +15,7 @@ dependencies:
- pytest
- pooch
- matplotlib
- polars
- pip:
- dbdreader

2 changes: 1 addition & 1 deletion tests/test_pyglider.py
@@ -39,7 +39,7 @@ def test_example_seaexplorer(var):
# Test that each variable and its coordinates match
assert output[var].attrs == test_data[var].attrs
if var not in ['time']:
np.testing.assert_allclose(output[var].values, test_data[var].values)
np.testing.assert_allclose(output[var].values, test_data[var].values, rtol=1e-5)
else:
dt0 = output[var].values - np.datetime64('2000-01-01')
dt1 = test_data[var].values - np.datetime64('2000-01-01')
15 changes: 7 additions & 8 deletions tests/test_seaexplorer.py
@@ -1,7 +1,6 @@
import xarray as xr
import polars as pl
import pytest
from pathlib import Path
import sys
import os
os.system('rm tests/data/realtime_rawnc/*')
library_dir = Path(__file__).parent.parent.absolute()
@@ -13,7 +12,7 @@
def test__outputname():
fnout, filenum = seaexplorer._outputname('tests/data/realtime_raw/sea035.12.pld1.sub.36',
'tests/data/realtime_rawnc/')
assert fnout == 'tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.nc'
assert fnout == 'tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.parquet'
assert filenum == 36


@@ -44,7 +43,7 @@ def test_raw_to_rawnc():
def test__needsupdating():
ftype = 'pld1'
fin = 'tests/data/realtime_raw/sea035.12.pld1.sub.36'
fout = 'tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.nc'
fout = 'tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.parquet'
result_badpath = seaexplorer._needsupdating(ftype, fin, 'baz')
result_goodpath = seaexplorer._needsupdating(ftype, fin, fout)
assert result_badpath is True
@@ -68,11 +67,11 @@ def test_merge_rawnc():

def test__interp_gli_to_pld():
# function should interpolate values from the glider dataset to sampling frequency of payload dataset
glider = xr.open_dataset('tests/data/realtime_rawnc/sea035.0012.gli.sub.0036.nc')
ds = xr.open_dataset('tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.nc')
val = glider.Pitch
glider = pl.read_parquet('tests/data/realtime_rawnc/sea035.0012.gli.sub.0036.parquet')
ds = pl.read_parquet('tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.parquet')
val = glider.select("Pitch").to_numpy()[:, 0]
pitch_interp = seaexplorer._interp_gli_to_pld(glider, ds, val, None)
assert len(pitch_interp) == len(ds.time)
assert len(pitch_interp) == ds.shape[0]


def test_raw_to_timeseries():