Commit d1dbe13
voto space save
callumrollo committed Nov 29, 2024
1 parent 812e746 commit d1dbe13
Showing 2 changed files with 52 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pyglider/ncprocess.py
@@ -47,7 +47,7 @@ def extract_timeseries_profiles(inname, outdir, deploymentyaml):
for p in profiles:
ind = np.where(ds.profile_index == p)[0]
dss = ds.isel(time=ind)
outname = outdir + '/' + utils.get_file_id(dss) + '.nc'
outname = outdir + '/' + 'mission_grid.nc'
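# VOTO change: a single fixed-name grid file is written instead of one file per profile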
_log.info('Checking %s', outname)
if not os.path.exists(outname):
# this is the id for the whole file, not just this profile..
70 changes: 51 additions & 19 deletions pyglider/seaexplorer.py
@@ -97,6 +97,9 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
os.mkdir(outdir)
except FileExistsError:
pass
with open(deploymentyaml) as fin:
deployment = yaml.safe_load(fin)
ncvar = deployment['netcdf_variables']
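# the deployment yaml is read up front so that 'keep_variables' (if present
# under netcdf_variables) can be used below to drop empty payload rows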

for ftype in ['gli', 'pld1']:
goodfiles = []
@@ -128,7 +131,7 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
# Try to read the file with polars. If the file is corrupted (rare), the read
# fails and the file is appended to badfiles
try:
out = pl.read_csv(f, separator=';')
out = pl.read_csv(f, separator=';', ignore_errors=True)
except Exception as e:
_log.warning(f'Exception reading {f}: {e}')
_log.warning(f'Could not read {f}')
@@ -152,20 +155,55 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
out = out.with_columns(
pl.col('AD2CP_TIME').str.strptime(pl.Datetime, format="%m%d%y %H:%M:%S", strict=False))

if "" in out.columns:
out = out.drop("")
out = _remove_fill_values(out, fill_value=9999)
out = _remove_fill_values(out, fill_value=-9999)
out = drop_pre_1971_samples(out)

# subsetting for heavily oversampled raw data:
if rawsub == 'raw' and dropna_subset is not None:
# This check is the polars equivalent of pandas dropna. See docstring note on dropna
out = out.with_columns(out.select(pl.col(dropna_subset).is_null().cast(pl.Int64))
.sum(axis=1).alias("null_count")).filter(
.sum_horizontal().alias("null_count")).filter(
pl.col("null_count") <= dropna_thresh) \
.drop("null_count")

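# shrink the gli file before writing: known integer columns are cast to the
# narrowest dtype that fits, and every remaining column is cast to Float32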
if ftype == 'gli':
out = out.with_columns([(pl.col("NavState") * 0 + int(filenum)).alias("fnum")])
out_special = out.select(
pl.col('time'),
pl.col("^(NavState)$").cast(pl.UInt8),
pl.col("^(fnum)$").cast(pl.UInt16),
pl.col("^(DeadReckoning|)$").cast(pl.Int8),
pl.col("^(DesiredH|)$").cast(pl.Int16),
pl.col("^(SecurityLevel)$").cast(pl.UInt32),
)
remaining_columns = set(out.columns).difference(set(out_special.columns))
out_remainder = out.select(remaining_columns).cast(pl.Float32)
out = pl.concat((out_special, out_remainder), how='horizontal')

out.write_parquet(fnout)
goodfiles.append(f)
else:
if out.select("time").shape[0] > min_samples_in_file:
# Drop rows that have no data from the key variables
if 'keep_variables' in ncvar.keys():
source_keepers = [ncvar[name]['source'] for name in ncvar['keep_variables']]
dropna_thresh = len(source_keepers)
out = out.with_columns(out.select(pl.col(source_keepers).is_null().cast(pl.Int64))
.sum_horizontal().alias("null_count")).filter(
pl.col("null_count") < dropna_thresh) \
.drop("null_count")
out_special = out.select(
pl.col("^*AD2CP_TIME$").cast(pl.Datetime),
pl.col("^*time$").cast(pl.Datetime),
pl.col("^(NAV_RESOURCE)$").cast(pl.UInt8),
pl.col("^*COUNT$").cast(pl.Int32),
)
remaining_columns = set(out.columns).difference(set(out_special.columns))
out_remainder = out.select(remaining_columns).cast(pl.Float32)
out = pl.concat((out_special, out_remainder), how='horizontal')

out.write_parquet(fnout)
goodfiles.append(f)
else:
Expand All @@ -176,6 +214,8 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
_log.warning('Some files could not be parsed:')
for fn in badfiles:
_log.warning('%s', fn)
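# VOTO-specific alerting: email out the list of files that failed to parse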
from votoutils.utilities.utilities import mailer
mailer("pyglider-error", f"failed to parse files in {indir}: {badfiles}")
if not goodfiles:
_log.warning(f'No valid unprocessed seaexplorer files found in {indir}')
return False
@@ -196,11 +236,7 @@ def drop_pre_1971_samples(df):
Returns:
df: polars.DataFrame
"""
dt_1971 = datetime.datetime(1971, 1, 1)
# If all dates before or after 1971-01-01, return the dataset
pre_1971 = df.filter(pl.col("time") < dt_1971)
if len(pre_1971) == len(df):
return pre_1971
dt_1971 = datetime.datetime(2020, 1, 1)
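# NB: the cutoff is now 2020-01-01; the variable and function keep their historical names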
post_1971 = df.filter(pl.col("time") > dt_1971)
if len(post_1971) == len(df):
return post_1971
@@ -245,17 +281,17 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
_log.warning(f'No *gli*.parquet files found in {indir}')
return False
gli = pl.read_parquet(indir + '/*.gli.sub.*.parquet')
gli = drop_pre_1971_samples(gli)
gli.write_parquet(outgli)
_log.info(f'Done writing {outgli}')

_log.info('Opening *.pld.sub.*.parquet multi-file dataset')
files = sorted(glob.glob(indir + '/*.pld1.' + kind + '.*.parquet'))
if not files:
_log.warning(f'No *{kind}*.parquet files found in {indir}')
return False
pld = pl.read_parquet(indir + '/*.pld1.' + kind + '.*.parquet')
pld = drop_pre_1971_samples(pld)
plds = glob.glob(indir + '/*.pld1.' + kind + '.*.parquet')
plds.sort()
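# read every pld file against the first file's column set; allow_missing_columns
# lets files that lack some sensor columns merge instead of raising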
first = pl.read_parquet(plds[0])
pld = pl.read_parquet(indir + '/*.pld1.' + kind + '.*.parquet', columns=first.columns, allow_missing_columns=True)
pld.write_parquet(outpld)

_log.info(f'Done writing {outpld}')
@@ -288,7 +324,7 @@ def _interp_pld_to_pld(pld, ds, val, indctd):
return val


def _remove_fill_values(df, fill_value=9999):
def _remove_fill_values(df, fill_value=-9999):
"""
For an input dataframe df, convert all float values equal to fill_value to null. Columns of other
dtypes are not affected.
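Illustrative example, with a hypothetical column name:
    _remove_fill_values(pl.DataFrame({"temp": [9.81, -9999.0]}))
    # the Float64 value -9999.0 becomes null; integer columns are untouched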
Expand Down Expand Up @@ -318,7 +354,6 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
gli = pl.read_parquet(f'{indir}/{id}-rawgli.parquet')
_log.info(f'Opening combined payload file {indir}/{id}-{kind}pld.parquet')
sensor = pl.read_parquet(f'{indir}/{id}-{kind}pld.parquet')
sensor = _remove_fill_values(sensor)

# build a new data set based on info in `deploymentyaml.`
# We will use ctd as the interpolant
@@ -390,7 +425,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
val = np.interp(time_timebase.astype(float), time_var.astype(float), var_non_nan)

# interpolate only over those gaps that are smaller than 'maxgap'
tg_ind = utils.find_gaps(time_var.astype(float),time_timebase.astype(float),maxgap)
tg_ind = utils.find_gaps(time_var.astype(float), time_timebase.astype(float), maxgap)
val[tg_ind] = np.nan
else:
val = val[indctd]
@@ -430,9 +465,6 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
# some derived variables:
ds = utils.get_glider_depth(ds)
ds = utils.get_distance_over_ground(ds)
# ds = utils.get_profiles(ds)
ds = utils.get_profiles_new(ds, filt_time=profile_filt_time,
profile_min_time=profile_min_time)
ds = utils.get_derived_eos_raw(ds)

# somehow this comes out unsorted:
@@ -481,7 +513,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
os.mkdir(outdir)
except:
pass
id0 = ds.attrs['deployment_name']
id0 = "mission_timeseries"
outname = outdir + id0 + fnamesuffix + '.nc'
_log.info('writing %s', outname)
if 'units' in ds.time.attrs.keys():
