diff --git a/docs/getting-started-seaexplorer.md b/docs/getting-started-seaexplorer.md
index 33f239c..87ca7ae 100644
--- a/docs/getting-started-seaexplorer.md
+++ b/docs/getting-started-seaexplorer.md
@@ -30,7 +30,7 @@ There are four top-levels to the `deployment.yaml`
 
 The example script is relatively straight forward if there is no intermediate processing. See {ref}`ExProc`, below.
 
-Data comes from an input directory, and is translated to raw glider-dependent netCDF files and put in a new directory. These files are useful of their own right, but are not CF compliant. These files are then merged into a single monolithic netCDF file, and this is translated to a CF-compliant timeseries file. Finally individual profiles are saved and a 2-D 1-m grid in time-depth is saved.
+Data comes from an input directory, and is translated to raw glider-dependent parquet files and put in a new directory. These files are useful in their own right. Apache Parquet is a column-oriented format for storing tabular data. Parquet files take up less space than netCDF or csv and are much faster to read and write. These files can be opened with [polars.read_parquet](https://pola-rs.github.io/polars-book/user-guide/howcani/io/parquet.html) or [pandas.read_parquet](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_parquet.html). These files are then merged into a single monolithic parquet file, and this is translated to a CF-compliant timeseries netCDF file. Finally individual profiles are saved and a 2-D 1-m grid in time-depth is saved.
 
 It is likely that between these steps the user will want to add any screening steps, or adjustments to the calibrations. PyGlider does not provide those steps.
 
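For reference, the reading workflow the new docs paragraph describes looks like this from the user's side. This is a minimal sketch: the file name is hypothetical, and `pandas.read_parquet` additionally needs a parquet engine such as pyarrow installed.

```python
import polars as pl
import pandas as pd

# any per-dive file written by raw_to_rawnc works here; this name is made up
df = pl.read_parquet("rawnc/sea035.0012.pld1.sub.0036.parquet")
print(df.head())

# the same file through pandas (requires a parquet engine, e.g. pyarrow)
df_pd = pd.read_parquet("rawnc/sea035.0012.pld1.sub.0036.parquet")
```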
diff --git a/docs/index.md b/docs/index.md
index bad8fb1..e18d95f 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -10,7 +10,7 @@ by optional steps to create profiles or depth-time grids.
 
 ```python
 seaexplorer.raw_to_rawnc(rawdir, rawncdir, deploymentyaml)
-seaexplorer.merge_rawnc(rawncdir, rawncdir, deploymentyaml, kind='sub')
+seaexplorer.merge_parquet(rawncdir, rawncdir, deploymentyaml, kind='sub')
 outname = seaexplorer.raw_to_timeseries(rawncdir, tsdir, deploymentyaml, kind='sub')
 # optional...
 ncprocess.extract_timeseries_profiles(outname, profiledir, deploymentyaml)
diff --git a/environment.yml b/environment.yml
index f1d086e..7e13558 100644
--- a/environment.yml
+++ b/environment.yml
@@ -13,5 +13,6 @@ dependencies:
   - scipy
   - bitstring
   - pooch
+  - polars
   - pip:
     - dbdreader
diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py
index 59afcc0..2c98c41 100644
--- a/pyglider/seaexplorer.py
+++ b/pyglider/seaexplorer.py
@@ -9,8 +9,8 @@
 import xarray as xr
 import yaml
 import pyglider.utils as utils
-import pandas as pd
-
+import datetime
+import polars as pl
 
 _log = logging.getLogger(__name__)
@@ -25,7 +25,7 @@ def _outputname(f, outdir):
     for ff in fns:
         fnout += ff.lower() + '.'
     filenum = int(fns[4])
-    return outdir + fnout + 'nc', filenum
+    return outdir + fnout + 'parquet', filenum
 
 
 def _needsupdating(ftype, fin, fout):
@@ -41,7 +41,7 @@ def _sort(ds):
 def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
                  min_samples_in_file=5, dropna_subset=None, dropna_thresh=1):
     """
-    Convert seaexplorer text files to raw netcdf files.
+    Convert seaexplorer text files to raw parquet files.
 
     Parameters
    ----------
@@ -125,37 +125,52 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
             _log.info(f'{f} to {fnout}')
             if not incremental or _needsupdating(ftype, f, fnout):
                 _log.info(f'Doing: {f} to {fnout}')
-                out = pd.read_csv(f, header=0, delimiter=';',
-                                  parse_dates=True, index_col=0,
-                                  dayfirst=True)
+                # Try to read the file with polars. If the file is corrupted
+                # (rare), the read will fail and the file is appended to badfiles
+                try:
+                    out = pl.read_csv(f, sep=';')
+                except Exception:
+                    _log.warning(f'Could not read {f}')
+                    badfiles.append(f)
+                    continue
+                # Parse the datetime from nav files (called Timestamp) and pld1 files (called PLD_REALTIMECLOCK)
+                if "Timestamp" in out.columns:
+                    out = out.with_column(
+                        pl.col("Timestamp").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S"))
+                    out = out.rename({"Timestamp": "time"})
+                else:
+                    out = out.with_column(
+                        pl.col("PLD_REALTIMECLOCK").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S.%3f"))
+                    out = out.rename({"PLD_REALTIMECLOCK": "time"})
+                for col_name in out.columns:
+                    if "time" not in col_name.lower():
+                        out = out.with_column(pl.col(col_name).cast(pl.Float64))
                 # If AD2CP data present, convert timestamps to datetime
                 if 'AD2CP_TIME' in out.columns:
                     # Set datestamps with date 00000 to None
-                    out.loc[out.AD2CP_TIME.str[:6] == '000000',
-                            'AD2CP_TIME'] = None
-                    out['AD2CP_TIME'] = pd.to_datetime(out.AD2CP_TIME)
+                    out = out.with_column(
+                        pl.col('AD2CP_TIME').str.strptime(pl.Datetime, fmt="%m%d%y %H:%M:%S", strict=False))
 
                 # subsetting for heavily oversampled raw data:
-                if rawsub=='raw' and dropna_subset is not None:
-                    out = out.dropna(subset=dropna_subset, thresh=dropna_thresh)
-
-                with out.to_xarray() as outx:
-                    key = list(outx.coords.keys())[0]
-                    outx = outx.rename({key: 'time'})
-                    outx['fnum'] = ('time',
-                                    int(filenum)*np.ones(len(outx['time'])))
-                    if ftype == 'gli':
-                        outx.to_netcdf(fnout[:-3] + '.nc', 'w')
+                if rawsub == 'raw' and dropna_subset is not None:
+                    # This check is the polars equivalent of pandas dropna. See docstring note on dropna
+                    out = out.with_column(out.select(pl.col(dropna_subset).is_null().cast(pl.Int64))
+                                          .sum(axis=1).alias("null_count")).filter(
+                        pl.col("null_count") <= dropna_thresh) \
+                        .drop("null_count")
+
+                if ftype == 'gli':
+                    out = out.with_columns([(pl.col("NavState") * 0 + int(filenum)).alias("fnum")])
+                    out.write_parquet(fnout)
+                    goodfiles.append(f)
+                else:
+                    if out.select("time").shape[0] > min_samples_in_file:
+                        out.write_parquet(fnout)
                         goodfiles.append(f)
                     else:
-                        if outx.indexes["time"].size > min_samples_in_file:
-                            outx.to_netcdf(f'{fnout[:-3]}.nc', 'w',
-                                           unlimited_dims=['time'])
-                            goodfiles.append(f)
-                        else:
-                            _log.warning('Number of sensor data points'
-                                         'too small. Skipping nc write')
-                            badfiles.append(f)
+                        _log.warning('Number of sensor data points '
+                                     'too small. Skipping parquet write')
+                        badfiles.append(f)
     if len(badfiles) > 0:
         _log.warning('Some files could not be parsed:')
         for fn in badfiles:
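The null-count filter above is the translation of `out.dropna(subset=..., thresh=...)`; note that it counts nulls per row, where pandas' `thresh` counts non-null values, so the two thresholds are not interchangeable (hence the docstring note the comment points to). A toy frame run through the same calls the patch uses (the older polars API with `with_column` and `DataFrame.sum(axis=1)`):

```python
import polars as pl

out = pl.DataFrame({
    "a": [1.0, None, 3.0],
    "b": [None, None, 6.0],
    "c": [7.0, 8.0, 9.0],
})
dropna_subset, dropna_thresh = ["a", "b"], 1
# count nulls per row over the subset, keep rows with at most one null
out = out.with_column(
    out.select(pl.col(dropna_subset).is_null().cast(pl.Int64))
       .sum(axis=1).alias("null_count")
).filter(pl.col("null_count") <= dropna_thresh).drop("null_count")
print(out)  # rows 0 and 2 survive; row 1 (two nulls) is dropped
```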
@@ -163,11 +178,11 @@
     if not goodfiles:
         _log.warning(f'No valid unprocessed seaexplorer files found in'f'{indir}')
         return False
-    _log.info('All raw files converted to nc')
+    _log.info('All raw files converted to parquet')
     return True
 
 
-def drop_rogue_1970(ds):
+def drop_pre_1971_samples(df):
     """
-    If dates greater than 1971, 1, 1 are observed, drop any dates before 1971-01-01, from the datset and return it.
+    If dates greater than 1971, 1, 1 are observed, drop any dates before 1971-01-01, from the dataset and return it.
     This function removes 1970
@@ -175,19 +190,23 @@
     are < 1971-01-01, no action is taken
 
     Parameters:
-        ds: xarray.DataSet
-            dataset to check for pre-1971 dates
+        df: polars.DataFrame
+            dataframe to check for pre-1971 dates
 
     Returns:
-        ds: xarray.DataSet
+        df: polars.DataFrame
     """
-    dt_1971 = np.datetime64("1971-01-01")
+    dt_1971 = datetime.datetime(1971, 1, 1)
     # If all dates before or after 1971-01-01, return the dataset
-    if (ds.time > dt_1971).all() or (ds.time < dt_1971).all():
-        return ds
-    return ds.where(ds.time > dt_1971, drop=True)
+    pre_1971 = df.filter(pl.col("time") < dt_1971)
+    if len(pre_1971) == len(df):
+        return pre_1971
+    post_1971 = df.filter(pl.col("time") > dt_1971)
+    if len(post_1971) == len(df):
+        return post_1971
+    return df.filter(pl.col("time") > dt_1971)
 
 
-def merge_rawnc(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
+def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
     """
-    Merge all the raw netcdf files in indir. These are meant to be the raw flight and science files from the slocum.
+    Merge all the raw parquet files in indir. These are meant to be the raw flight and science files from the SeaExplorer.
@@ -216,39 +235,28 @@ def merge_rawnc(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
         deployment = yaml.safe_load(fin)
     metadata = deployment['metadata']
     id = metadata['glider_name']
-    outgli = outdir + '/' + id + '-rawgli.nc'
-    outpld = outdir + '/' + id + '-' + kind + 'pld.nc'
+    outgli = outdir + '/' + id + '-rawgli.parquet'
+    outpld = outdir + '/' + id + '-' + kind + 'pld.parquet'
 
-    _log.info('Opening *.gli.sub.*.nc multi-file dataset from %s', indir)
-    files = sorted(glob.glob(indir+'/*.gli.sub.*.nc'))
+    _log.info('Opening *.gli.sub.*.parquet multi-file dataset from %s', indir)
+    files = sorted(glob.glob(indir + '/*.gli.sub.*.parquet'))
     if not files:
-        _log.warning(f'No *gli*.nc files found in {indir}')
+        _log.warning(f'No *gli*.parquet files found in {indir}')
         return False
-    with xr.open_dataset(files[0]) as gli:
-        for fil in files[1:]:
-            try:
-                with xr.open_dataset(fil) as gli2:
-                    gli = xr.concat([gli, gli2], dim='time')
-            except:
-                pass
-        gli = drop_rogue_1970(gli)
-        gli.to_netcdf(outgli)
+    gli = pl.read_parquet(indir + '/*.gli.sub.*.parquet')
+    gli = drop_pre_1971_samples(gli)
+    gli.write_parquet(outgli)
     _log.info(f'Done writing {outgli}')
 
-    _log.info('Opening *.pld.sub.*.nc multi-file dataset')
-    files = sorted(glob.glob(indir+'/*.pld1.'+kind+'.*.nc'))
+    _log.info('Opening *.pld.sub.*.parquet multi-file dataset')
+    files = sorted(glob.glob(indir + '/*.pld1.' + kind + '.*.parquet'))
     if not files:
-        _log.warning(f'No *{kind}*.nc files found in {indir}')
+        _log.warning(f'No *{kind}*.parquet files found in {indir}')
         return False
-    with xr.open_dataset(files[0]) as pld:
-        for fil in files[1:]:
-            try:
-                with xr.open_dataset(fil) as pld2:
-                    pld = xr.concat([pld, pld2], dim='time')
-            except:
-                pass
-        pld = drop_rogue_1970(pld)
-        pld.to_netcdf(outpld)
+    pld = pl.read_parquet(indir + '/*.pld1.'
+                          + kind + '.*.parquet')
+    pld = drop_pre_1971_samples(pld)
+    pld.write_parquet(outpld)
+    _log.info(f'Done writing {outpld}')
-    _log.info('Done merge_rawnc')
+    _log.info('Done merge_parquet')
     return True
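`pl.read_parquet` expands glob patterns itself, which is what lets the hunk above replace the per-file xarray concat loop with a single call. A minimal sketch with hypothetical paths (simplified: `drop_pre_1971_samples` additionally leaves the frame untouched when every timestamp falls on one side of the cutoff):

```python
import datetime
import polars as pl

# one call reads every per-dive payload file matched by the glob
pld = pl.read_parquet("rawnc/*.pld1.sub.*.parquet")

# the same pre-1971 screen applied before the merged file is written
dt_1971 = datetime.datetime(1971, 1, 1)
pld = pld.filter(pl.col("time") > dt_1971)
pld.write_parquet("rawnc/sea035-subpld.parquet")
```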
@@ -256,9 +264,15 @@ def merge_rawnc(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
 
 def _interp_gli_to_pld(gli, ds, val, indctd):
     gli_ind = ~np.isnan(val)
-    valout = np.interp(ds['time'],
-                       gli['time'][gli_ind],
-                       val[gli_ind])
+    # switch for whether we are comparing two polars dataframes or a polars dataframe and an xarray dataset
+    if type(ds) is pl.internals.dataframe.frame.DataFrame:
+        valout = np.interp(ds["time"],
+                           gli.filter(gli_ind)["time"],
+                           val[gli_ind])
+    else:
+        valout = np.interp(ds['time'].astype(int),
+                           np.array(gli.filter(gli_ind)["time"].to_numpy().astype('datetime64[ns]')).astype(int),
+                           val[gli_ind])
     return valout
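The else-branch above works because `datetime64[ns]` values cast cleanly to integer nanoseconds, which `np.interp` can handle. A toy illustration with invented values:

```python
import numpy as np

# glider attitude known at two times, payload sampled in between
t_gli = np.array(['2021-01-01T00:00:00', '2021-01-01T00:01:00'],
                 dtype='datetime64[ns]').astype(int)
t_pld = np.array(['2021-01-01T00:00:30'],
                 dtype='datetime64[ns]').astype(int)
pitch = np.array([0.0, 10.0])
print(np.interp(t_pld, t_gli, pitch))  # [5.]
```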
@@ -286,9 +300,9 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
     device_data = deployment['glider_devices']
     id = metadata['glider_name']
-    _log.info(f'Opening combined nav file {indir}/{id}-rawgli.nc')
-    gli = xr.open_dataset(f'{indir}/{id}-rawgli.nc')
-    _log.info(f'Opening combined payload file {indir}/{id}-{kind}pld.nc')
-    sensor = xr.open_dataset(f'{indir}/{id}-{kind}pld.nc')
+    _log.info(f'Opening combined nav file {indir}/{id}-rawgli.parquet')
+    gli = pl.read_parquet(f'{indir}/{id}-rawgli.parquet')
+    _log.info(f'Opening combined payload file {indir}/{id}-{kind}pld.parquet')
+    sensor = pl.read_parquet(f'{indir}/{id}-{kind}pld.parquet')
 
     # build a new data set based on info in `deploymentyaml.`
     # We will use ctd as the interpolant
@@ -304,7 +318,8 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
     # It oversamples the nav data, but mildly undersamples the optics and
     # oxygen....
     if 'timebase' in ncvar:
-        indctd = np.where(~np.isnan(sensor[ncvar['timebase']['source']]))[0]
+        vals = sensor.select([ncvar['timebase']['source']]).to_numpy()[:, 0]
+        indctd = np.where(~np.isnan(vals))[0]
     elif 'GPCTD_TEMPERATURE' in list(sensor.variables):
         _log.warning('No timebase specified. Using GPCTD_TEMPERATURE as time'
                      'base')
@@ -317,35 +332,45 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
         _log.warning('No gpctd or legato data found. Using NAV_DEPTH as time'
                      'base')
         indctd = np.where(~np.isnan(sensor.NAV_DEPTH))[0]
-    ds['time'] = (('time'), sensor['time'].values[indctd], attr)
+    ds['time'] = (('time'), sensor.select('time').to_numpy()[indctd, 0], attr)
     thenames = list(ncvar.keys())
     for i in ['time', 'timebase', 'keep_variables']:
         if i in thenames:
             thenames.remove(i)
     for name in thenames:
         _log.info('interpolating ' + name)
-        if not('method' in ncvar[name].keys()):
+        if not ('method' in ncvar[name].keys()):
             # variables that are in the data set or can be interpolated from it
             if 'conversion' in ncvar[name].keys():
                 convert = getattr(utils, ncvar[name]['conversion'])
             else:
                 convert = utils._passthrough
             sensorname = ncvar[name]['source']
-            if sensorname in list(sensor.variables):
+            if sensorname in list(sensor.columns):
                 _log.debug('sensorname %s', sensorname)
-                val = convert(sensor[sensorname])
+                val = convert(sensor.select(sensorname).to_numpy()[:, 0])
                 if 'coarsen' in ncvar[name]:
-                    # smooth oxygen data as originally perscribed
+                    # coarsen oxygen data as originally prescribed
                     coarsen_time = ncvar[name]['coarsen']
-                    sensor_sub = sensor.coarsen(time=coarsen_time,
-                                                boundary='trim').mean()
-                    val2 = sensor_sub[sensorname]
-                    val = _interp_gli_to_pld(sensor_sub, sensor, val2, indctd)
+                    # label each sample with the integer index of its coarsened bin; samples sharing an index are averaged together
+                    coarse_ints = np.arange(0, len(sensor) / coarsen_time, 1 / coarsen_time).astype(int)
+                    sensor_sub = sensor.with_columns(pl.lit(coarse_ints).alias("coarse_ints"))
+                    # group on the bin index and take the mean, trimming the last (possibly partial) bin like boundary='trim'
+                    sensor_sub_grouped = sensor_sub.with_column(
+                        pl.col('time').to_physical()
+                    ).groupby(
+                        by=pl.col('coarse_ints'),
+                        maintain_order=True
+                    ).mean().with_column(
+                        pl.col('time').cast(pl.Datetime('ms'))
+                    )[:-1, :]
+                    val2 = sensor_sub_grouped.select(sensorname).to_numpy()[:, 0]
+                    val = _interp_gli_to_pld(sensor_sub_grouped, sensor, val2, indctd)
                 val = val[indctd]
                 ncvar['method'] = 'linear fill'
             else:
-                val = gli[sensorname]
+                val = gli.select(sensorname).to_numpy()[:, 0]
                 val = convert(val)
                 # Values from the glider netcdf must be interpolated to match
                 # the sensor netcdf
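The groupby above stands in for xarray's `coarsen(time=...).mean()`. A toy version with ten samples and `coarsen_time=5`, written against the same polars API the patch uses (the patch additionally trims the trailing, possibly partial, bin with `[:-1, :]`):

```python
import numpy as np
import polars as pl

sensor = pl.DataFrame({"oxygen": [float(i) for i in range(10)]})
coarsen_time = 5
# bin labels [0 0 0 0 0 1 1 1 1 1]
coarse_ints = np.arange(0, len(sensor) / coarsen_time, 1 / coarsen_time).astype(int)
sensor = sensor.with_columns(pl.lit(coarse_ints).alias("coarse_ints"))
means = sensor.groupby("coarse_ints", maintain_order=True).mean()
print(means["oxygen"].to_list())  # [2.0, 7.0]
```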
@@ -355,7 +380,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
             ncvar[name].pop('coordinates', None)
             attrs = ncvar[name]
             attrs = utils.fill_required_attrs(attrs)
-            ds[name] = (('time'), val.data, attrs)
+            ds[name] = (('time'), val, attrs)
 
     # fix lon and lat to be linearly interpolated between fixes
     good = np.where(np.abs(np.diff(ds.longitude)) +
@@ -397,7 +422,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
     ds = ds.isel(time=good)
     len_after_drop = len(ds.time)
     proportion_kept = len_after_drop / len_before_drop
-    loss_str = f"{100 * (1-proportion_kept)} % samples removed by timestamp deduplication."
+    loss_str = f"{100 * (1 - proportion_kept)} % samples removed by timestamp deduplication."
     if proportion_kept < 0.5:
         raise ValueError(f"{loss_str} Check input data for duplicate timestamps")
     elif proportion_kept < 0.999:
@@ -441,7 +466,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
         ds.ad2cp_time.attrs.pop('units')
     ds.to_netcdf(outname, 'w',
                  encoding={'time': {'units':
-                           'seconds since 1970-01-01T00:00:00Z'}})
+                            'seconds since 1970-01-01T00:00:00Z'}})
     return outname
@@ -496,4 +521,6 @@ def _parse_sensor_config(filename):
     return active_device_dicts
 
 
-__all__ = ['raw_to_rawnc', 'merge_rawnc', 'raw_to_timeseries']
+merge_rawnc = merge_parquet
+
+__all__ = ['raw_to_rawnc', 'merge_parquet', 'raw_to_timeseries']
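`merge_rawnc` is kept as a module-level alias, so existing pipelines keep working; only `__all__` advertises the new name. A quick check of what that buys downstream callers:

```python
from pyglider import seaexplorer

# the old entry point still resolves to the new implementation
assert seaexplorer.merge_rawnc is seaexplorer.merge_parquet
```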
diff --git a/tests/environment.yml b/tests/environment.yml
index f227236..70c658a 100644
--- a/tests/environment.yml
+++ b/tests/environment.yml
@@ -15,6 +15,7 @@ dependencies:
   - pytest
   - pooch
   - matplotlib
+  - polars
   - pip:
     - dbdreader
 
diff --git a/tests/example-data/example-seaexplorer-legato-flntu-arod-ad2cp/process_deploymentRealTime.py b/tests/example-data/example-seaexplorer-legato-flntu-arod-ad2cp/process_deploymentRealTime.py
index 81e1ba0..c76e44c 100644
--- a/tests/example-data/example-seaexplorer-legato-flntu-arod-ad2cp/process_deploymentRealTime.py
+++ b/tests/example-data/example-seaexplorer-legato-flntu-arod-ad2cp/process_deploymentRealTime.py
@@ -22,7 +22,7 @@
     # turn seaexplorer zipped csvs into nc files.
     seaexplorer.raw_to_rawnc(rawdir, rawncdir, deploymentyaml)
-    # merge individual netcdf files into single netcdf files *.gli*.nc and *.pld1*.nc
-    seaexplorer.merge_rawnc(rawncdir, rawncdir, deploymentyaml, kind='sub')
+    # merge individual parquet files into single parquet files *.gli*.parquet and *.pld1*.parquet
+    seaexplorer.merge_parquet(rawncdir, rawncdir, deploymentyaml, kind='sub')
     # Make level-0 timeseries netcdf file from the raw files...
     outname = seaexplorer.raw_to_timeseries(rawncdir, l0tsdir, deploymentyaml, kind='sub')
     ncprocess.extract_timeseries_profiles(outname, profiledir, deploymentyaml)
diff --git a/tests/example-data/example-seaexplorer/process_deploymentRealTime.py b/tests/example-data/example-seaexplorer/process_deploymentRealTime.py
index 3207dcc..8eef439 100644
--- a/tests/example-data/example-seaexplorer/process_deploymentRealTime.py
+++ b/tests/example-data/example-seaexplorer/process_deploymentRealTime.py
@@ -26,7 +26,7 @@
     # turn *.EBD and *.DBD into *.ebd.nc and *.dbd.nc netcdf files.
     seaexplorer.raw_to_rawnc(rawdir, rawncdir, deploymentyaml)
-    # merge individual neetcdf files into single netcdf files *.ebd.nc and *.dbd.nc
-    seaexplorer.merge_rawnc(rawncdir, rawncdir, deploymentyaml, kind='sub')
+    # merge individual parquet files into single parquet files *.gli*.parquet and *.pld1*.parquet
+    seaexplorer.merge_parquet(rawncdir, rawncdir, deploymentyaml, kind='sub')
     # Make level-1 timeseries netcdf file from th raw files...
     outname = seaexplorer.raw_to_timeseries(rawncdir, l0tsdir, deploymentyaml, kind='sub')
diff --git a/tests/test_pyglider.py b/tests/test_pyglider.py
index 6203281..80ac25f 100644
--- a/tests/test_pyglider.py
+++ b/tests/test_pyglider.py
@@ -16,7 +16,7 @@
 deploymentyaml = str(example_dir / 'example-seaexplorer/deploymentRealtime.yml')
 l0tsdir = str(example_dir / 'example-seaexplorer/L0-timeseries-test/') + '/'
 seaexplorer.raw_to_rawnc(rawdir, rawncdir, deploymentyaml)
-seaexplorer.merge_rawnc(rawncdir, rawncdir, deploymentyaml, kind='sub')
+seaexplorer.merge_parquet(rawncdir, rawncdir, deploymentyaml, kind='sub')
 outname = seaexplorer.raw_to_L0timeseries(rawncdir, l0tsdir, deploymentyaml,
                                           kind='sub')
 output = xr.open_dataset(outname)
@@ -39,7 +39,7 @@ def test_example_seaexplorer(var):
     # Test that each variable and its coordinates match
     assert output[var].attrs == test_data[var].attrs
     if var not in ['time']:
-        np.testing.assert_allclose(output[var].values, test_data[var].values)
+        np.testing.assert_allclose(output[var].values, test_data[var].values, rtol=1e-5)
     else:
         dt0 = output[var].values - np.datetime64('2000-01-01')
         dt1 = test_data[var].values - np.datetime64('2000-01-01')
@@ -98,7 +98,7 @@ def test_example_slocum(var):
     assert output_slocum[var].attrs == test_data_slocum[var].attrs
     if var not in ['time']:
         np.testing.assert_allclose(output_slocum[var].values,
-                                   test_data_slocum[var].values)
+                                   test_data_slocum[var].values, rtol=1e-6)
     else:
         dt0 = output_slocum[var].values - np.datetime64('2000-01-01')
         dt1 = test_data_slocum[var].values - np.datetime64('2000-01-01')
@@ -163,7 +163,7 @@ def test_example_slocum_littleendian(var):
     assert output_slocum_le[var].attrs == test_data_slocum_le[var].attrs
     if var not in ['time']:
         np.testing.assert_allclose(output_slocum_le[var].values,
-                                   test_data_slocum_le[var].values)
+                                   test_data_slocum_le[var].values, rtol=1e-6)
     else:
         dt0 = output_slocum_le[var].values - np.datetime64('2000-01-01')
         dt1 = test_data_slocum_le[var].values - np.datetime64('2000-01-01')
diff --git a/tests/test_seaexplorer.py b/tests/test_seaexplorer.py
index d24f4ce..d0ed977 100644
--- a/tests/test_seaexplorer.py
+++ b/tests/test_seaexplorer.py
@@ -1,7 +1,6 @@
-import xarray as xr
+import polars as pl
 import pytest
 from pathlib import Path
-import sys
 import os
 os.system('rm tests/data/realtime_rawnc/*')
 library_dir = Path(__file__).parent.parent.absolute()
@@ -13,7 +12,7 @@ def test__outputname():
     fnout, filenum = seaexplorer._outputname('tests/data/realtime_raw/sea035.12.pld1.sub.36',
                                              'tests/data/realtime_rawnc/')
-    assert fnout == 'tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.nc'
+    assert fnout == 'tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.parquet'
     assert filenum == 36
@@ -44,7 +43,7 @@ def test_raw_to_rawnc():
 def test__needsupdating():
     ftype = 'pld1'
     fin = 'tests/data/realtime_raw/sea035.12.pld1.sub.36'
-    fout = 'tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.nc'
+    fout = 'tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.parquet'
     result_badpath = seaexplorer._needsupdating(ftype, fin, 'baz')
     result_goodpath = seaexplorer._needsupdating(ftype, fin, fout)
     assert result_badpath is True
@@ -52,12 +51,12 @@
 
 def test_merge_rawnc():
-    result_default = seaexplorer.merge_rawnc(
+    result_default = seaexplorer.merge_parquet(
         'tests/data/realtime_rawnc/',
         'tests/data/realtime_rawnc/',
         example_dir / 'example-seaexplorer/deploymentRealtime.yml')
-    result_sub = seaexplorer.merge_rawnc(
+    result_sub = seaexplorer.merge_parquet(
         'tests/data/realtime_rawnc/',
         'tests/data/realtime_rawnc/',
         example_dir / 'example-seaexplorer/deploymentRealtime.yml',
         kind='sub')
@@ -68,11 +67,11 @@
 
 def test__interp_gli_to_pld():
     # function should interpolate values from the glider dataset to sampling frequency of payload dataset
-    glider = xr.open_dataset('tests/data/realtime_rawnc/sea035.0012.gli.sub.0036.nc')
-    ds = xr.open_dataset('tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.nc')
-    val = glider.Pitch
+    glider = pl.read_parquet('tests/data/realtime_rawnc/sea035.0012.gli.sub.0036.parquet')
+    ds = pl.read_parquet('tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.parquet')
+    val = glider.select("Pitch").to_numpy()[:, 0]
     pitch_interp = seaexplorer._interp_gli_to_pld(glider, ds, val, None)
-    assert len(pitch_interp) == len(ds.time)
+    assert len(pitch_interp) == ds.shape[0]
 
 
 def test_raw_to_timeseries():
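A small aside on the two polars idioms the updated test leans on, demonstrated with a throwaway frame:

```python
import polars as pl

df = pl.DataFrame({"Pitch": [1.0, 2.0, 3.0]})
# a column as a plain 1-D numpy array, as the test now builds `val`
pitch = df.select("Pitch").to_numpy()[:, 0]
# row count replaces len(ds.time) now that ds is a DataFrame, not a Dataset
assert df.shape[0] == 3
```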