From d1dbe13a534ec514e033c62478f6ae97f6b252b2 Mon Sep 17 00:00:00 2001
From: Callum Rollo
Date: Fri, 29 Nov 2024 09:56:17 +0100
Subject: [PATCH] voto space save

---
 pyglider/ncprocess.py   |  2 +-
 pyglider/seaexplorer.py | 70 ++++++++++++++++++++++++++++++-----------
 2 files changed, 52 insertions(+), 20 deletions(-)

diff --git a/pyglider/ncprocess.py b/pyglider/ncprocess.py
index 4587941..b330dc0 100644
--- a/pyglider/ncprocess.py
+++ b/pyglider/ncprocess.py
@@ -47,7 +47,7 @@ def extract_timeseries_profiles(inname, outdir, deploymentyaml):
     for p in profiles:
         ind = np.where(ds.profile_index == p)[0]
         dss = ds.isel(time=ind)
-        outname = outdir + '/' + utils.get_file_id(dss) + '.nc'
+        outname = outdir + '/' + 'mission_grid.nc'
         _log.info('Checking %s', outname)
         if not os.path.exists(outname):
             # this is the id for the whole file, not just this profile..
diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py
index 3ea583d..e86212b 100644
--- a/pyglider/seaexplorer.py
+++ b/pyglider/seaexplorer.py
@@ -97,6 +97,9 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
         os.mkdir(outdir)
     except FileExistsError:
         pass
+    with open(deploymentyaml) as fin:
+        deployment = yaml.safe_load(fin)
+    ncvar = deployment['netcdf_variables']

     for ftype in ['gli', 'pld1']:
         goodfiles = []
@@ -128,7 +131,7 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
            # Try to read the file with polars. If the file is corrupted (rare), file read will fail and file
            # is appended to badfiles
            try:
-                out = pl.read_csv(f, separator=';')
+                out = pl.read_csv(f, separator=';', ignore_errors=True)
            except Exception as e:
                _log.warning(f'Exception reading {f}: {e}')
                _log.warning(f'Could not read {f}')
@@ -152,20 +155,55 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
                out = out.with_columns(
                    pl.col('AD2CP_TIME').str.strptime(pl.Datetime, format="%m%d%y %H:%M:%S", strict=False))
+            if "" in out.columns:
+                out = out.drop("")
+            out = _remove_fill_values(out, fill_value=9999)
+            out = _remove_fill_values(out, fill_value=-9999)
+            out = drop_pre_1971_samples(out)

            # subsetting for heavily oversampled raw data:
            if rawsub == 'raw' and dropna_subset is not None:
                # This check is the polars equivalent of pandas dropna. See docstring note on dropna
                out = out.with_columns(out.select(pl.col(dropna_subset).is_null().cast(pl.Int64))
-                                       .sum(axis=1).alias("null_count")).filter(
+                                       .sum_horizontal().alias("null_count")).filter(
                    pl.col("null_count") <= dropna_thresh) \
                    .drop("null_count")
-
            if ftype == 'gli':
                out = out.with_columns([(pl.col("NavState") * 0 + int(filenum)).alias("fnum")])
+                out_special = out.select(
+                    pl.col('time'),
+                    pl.col("^(NavState)$").cast(pl.UInt8),
+                    pl.col("^(fnum)$").cast(pl.UInt16),
+                    pl.col("^(DeadReckoning|)$").cast(pl.Int8),
+                    pl.col("^(DesiredH|)$").cast(pl.Int16),
+                    pl.col("^(SecurityLevel)$").cast(pl.UInt32),
+                )
+                remaining_columns = set(out.columns).difference(set(out_special.columns))
+                out_remainder = out.select(remaining_columns).cast(pl.Float32)
+                out = pl.concat((out_special, out_remainder), how='horizontal')
+
                out.write_parquet(fnout)
                goodfiles.append(f)
            else:
                if out.select("time").shape[0] > min_samples_in_file:
+                    # Drop rows that have no data from the key variables
+                    if 'keep_variables' in ncvar.keys():
+                        source_keepers = [ncvar[name]['source'] for name in ncvar['keep_variables']]
+                        dropna_thresh = len(source_keepers)
+                        out = out.with_columns(out.select(pl.col(source_keepers).is_null().cast(pl.Int64))
+                                               .sum_horizontal().alias("null_count")).filter(
+                            pl.col("null_count") < dropna_thresh) \
+                            .drop("null_count")
+                    out_special = out.select(
+                        pl.col("^.*AD2CP_TIME$").cast(pl.Datetime),
+                        pl.col("^.*time$").cast(pl.Datetime),
+                        pl.col("^(NAV_RESOURCE)$").cast(pl.UInt8),
+                        pl.col("^.*COUNT$").cast(pl.Int32),
+                    )
+                    remaining_columns = set(out.columns).difference(set(out_special.columns))
+                    out_remainder = out.select(remaining_columns).cast(pl.Float32)
+                    out = pl.concat((out_special, out_remainder), how='horizontal')
+
                    out.write_parquet(fnout)
                    goodfiles.append(f)
                else:
@@ -176,6 +214,8 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
        _log.warning('Some files could not be parsed:')
        for fn in badfiles:
            _log.warning('%s', fn)
+        from votoutils.utilities.utilities import mailer
+        mailer("pyglider-error", f"failed to parse files in {indir}: {badfiles}")
    if not goodfiles:
        _log.warning(f'No valid unprocessed seaexplorer files found in'f'{indir}')
        return False
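The hunk above carries the actual space saving: a handful of low-cardinality status columns are pinned to narrow integer dtypes, and every remaining column is downcast from the Float64 that polars infers to Float32 before the parquet is written, roughly halving the payload size on disk. A minimal sketch of that select/cast/horizontal-concat pattern, assuming a recent polars; the column names are illustrative, not the real SeaExplorer schema:

```python
import polars as pl

df = pl.DataFrame({
    "time": [1.0, 2.0, 3.0],              # stand-in for the parsed timestamp column
    "NavState": [105, 110, 115],          # small integer status code
    "LEGATO_TEMPERATURE": [8.1, 8.2, 8.3],
    "LEGATO_SALINITY": [34.9, 35.0, 35.1],
})

# Pin known status columns to narrow integer dtypes...
special = df.select(
    pl.col("time"),
    pl.col("NavState").cast(pl.UInt8),
)
# ...and push every remaining column down to Float32 (half the bytes of Float64).
remainder = df.select(
    sorted(set(df.columns) - set(special.columns))
).cast(pl.Float32)
df = pl.concat((special, remainder), how="horizontal")
print(df.schema)  # NavState: UInt8, sensor columns: Float32
```

Float32 keeps about seven significant digits, which is ample for glider sensor readings but worth checking before applying the same downcast to anything like a raw epoch timestamp.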
@@ -196,11 +236,7 @@ def drop_pre_1971_samples(df):
    Returns:
        df: polars.DataFrame
    """
-    dt_1971 = datetime.datetime(1971, 1, 1)
-    # If all dates before or after 1971-01-01, return the dataset
-    pre_1971 = df.filter(pl.col("time") < dt_1971)
-    if len(pre_1971) == len(df):
-        return pre_1971
+    dt_1971 = datetime.datetime(2020, 1, 1)
    post_1971 = df.filter(pl.col("time") > dt_1971)
    if len(post_1971) == len(df):
        return post_1971
@@ -245,17 +281,17 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
        _log.warning(f'No *gli*.parquet files found in {indir}')
        return False

    gli = pl.read_parquet(indir + '/*.gli.sub.*.parquet')
-    gli = drop_pre_1971_samples(gli)
    gli.write_parquet(outgli)
    _log.info(f'Done writing {outgli}')
-
    _log.info('Opening *.pld.sub.*.parquet multi-file dataset')
    files = sorted(glob.glob(indir + '/*.pld1.' + kind + '.*.parquet'))
    if not files:
        _log.warning(f'No *{kind}*.parquet files found in {indir}')
        return False
-    pld = pl.read_parquet(indir + '/*.pld1.' + kind + '.*.parquet')
-    pld = drop_pre_1971_samples(pld)
+    plds = glob.glob(indir + '/*.pld1.' + kind + '.*.parquet')
+    plds.sort()
+    first = pl.read_parquet(plds[0])
+    pld = pl.read_parquet(indir + '/*.pld1.' + kind + '.*.parquet', columns=first.columns, allow_missing_columns=True)
    pld.write_parquet(outpld)
    _log.info(f'Done writing {outpld}')
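Reading a parquet glob fails when individual files carry different column sets (for example a payload sensor that only reports in some files). The rewrite above pins the merge to the first file's schema instead. A hedged sketch of the same idea, with an illustrative path; `allow_missing_columns` requires a recent polars (roughly 1.x) and fills absent columns with nulls:

```python
import glob

import polars as pl

pattern = "rawnc/*.pld1.raw.*.parquet"  # illustrative path
files = sorted(glob.glob(pattern))

# Use the first file's columns as the reference schema for the whole glob.
first = pl.read_parquet(files[0])
pld = pl.read_parquet(
    pattern,
    columns=first.columns,
    allow_missing_columns=True,  # files lacking a column yield nulls instead of erroring
)
```

Note that pinning to the first file's columns assumes that file contains every variable of interest; a mission whose first payload file predates a sensor being switched on would silently drop that sensor's column from the merge.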
@@ -288,7 +324,7 @@ def _interp_pld_to_pld(pld, ds, val, indctd):
    return val


-def _remove_fill_values(df, fill_value=9999):
+def _remove_fill_values(df, fill_value=-9999):
    """
    For input dataframe df, this function converts all Float values
    equaling fill_values to null. Columns of other datatypes are not affected.
@@ -318,7 +354,6 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
    gli = pl.read_parquet(f'{indir}/{id}-rawgli.parquet')
    _log.info(f'Opening combined payload file {indir}/{id}-{kind}pld.parquet')
    sensor = pl.read_parquet(f'{indir}/{id}-{kind}pld.parquet')
-    sensor = _remove_fill_values(sensor)

    # build a new data set based on info in `deploymentyaml.`
    # We will use ctd as the interpolant
@@ -390,7 +425,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
                val = np.interp(time_timebase.astype(float),
                                time_var.astype(float), var_non_nan)
                # interpolate only over those gaps that are smaller than 'maxgap'
-                tg_ind = utils.find_gaps(time_var.astype(float),time_timebase.astype(float),maxgap)
+                tg_ind = utils.find_gaps(time_var.astype(float), time_timebase.astype(float), maxgap)
                val[tg_ind] = np.nan
            else:
                val = val[indctd]
@@ -430,9 +465,6 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
    # some derived variables:
    ds = utils.get_glider_depth(ds)
    ds = utils.get_distance_over_ground(ds)
-    # ds = utils.get_profiles(ds)
-    ds = utils.get_profiles_new(ds, filt_time=profile_filt_time,
-                                profile_min_time=profile_min_time)
    ds = utils.get_derived_eos_raw(ds)

    # somehow this comes out unsorted:
@@ -481,7 +513,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
        os.mkdir(outdir)
    except:
        pass
-    id0 = ds.attrs['deployment_name']
+    id0 = "mission_timeseries"
    outname = outdir + id0 + fnamesuffix + '.nc'
    _log.info('writing %s', outname)
    if 'units' in ds.time.attrs.keys():
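For context on the `_remove_fill_values` changes: the scrub now runs twice during raw conversion, once per sentinel (9999 and -9999), instead of once at the timeseries stage, and the default sentinel flips to -9999. A hedged sketch of what such a helper can look like in polars (the actual pyglider implementation may differ in detail); it nulls sentinel values in float columns only, assuming a recent polars with expression expansion and `Expr.name.keep()`:

```python
import polars as pl


def remove_fill_values(df: pl.DataFrame, fill_value: float = -9999) -> pl.DataFrame:
    """Convert float entries equal to fill_value to null; other dtypes untouched."""
    return df.with_columns(
        # pl.col with dtypes expands to every Float32/Float64 column
        pl.when(pl.col(pl.Float32, pl.Float64) == fill_value)
        .then(None)
        .otherwise(pl.col(pl.Float32, pl.Float64))
        .name.keep()
    )


df = pl.DataFrame({"temp": [8.5, -9999.0, 9.1], "flag": [1, 2, 3]})
print(remove_fill_values(df))  # temp: [8.5, null, 9.1]; integer flag column untouched
```

Nulling the sentinels before the Float32 downcast and parquet write also lets parquet's compression collapse the missing values, which fits the patch's space-saving goal.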