From ae02f4d0b349f88959223a6294d0d5b41f703e12 Mon Sep 17 00:00:00 2001
From: Ed Safford <62339196+EdwardSafford-NOAA@users.noreply.github.com>
Date: Wed, 18 Oct 2023 10:22:15 -0400
Subject: [PATCH] Add support for DA monitor binary station data files (#160)

---
 src/eva/data/mon_data_space.py                | 341 +++++++++++++++---
 .../batch/base/plot_tools/dynamic_config.py   |   5 +-
 .../batch/emcpy/diagnostics/map_scatter.py    |  14 +-
 3 files changed, 310 insertions(+), 50 deletions(-)

diff --git a/src/eva/data/mon_data_space.py b/src/eva/data/mon_data_space.py
index b0e2c49f..b273a150 100644
--- a/src/eva/data/mon_data_space.py
+++ b/src/eva/data/mon_data_space.py
@@ -13,7 +13,7 @@
 import os
 import numpy as np
-from xarray import Dataset, concat
+from xarray import Dataset, concat, merge, align
 from scipy.io import FortranFile
 from datetime import datetime
 
@@ -41,16 +41,31 @@ def execute(self, dataset_config, data_collections, timing):
             timing (Timing): Timing object for tracking execution time.
         """
 
+        stn_data = False
+
         # Set the collection name
         # -----------------------
         collection_name = get(dataset_config, self.logger, 'name')
 
+        # Filenames to be read into this collection
+        # -----------------------------------------
+        filenames = get(dataset_config, self.logger, 'filenames')
+
         # Get control file and parse
         # --------------------------
         control_file = get(dataset_config, self.logger, 'control_file')
-        coords, dims, attribs, nvars, vars, scanpo, levs_dict, chans_dict = (
-            self.get_ctl_dict(control_file[0]))
-        ndims_used, dims_arr = self.get_ndims_used(dims)
+
+        dims_arr = []
+        if self.is_stn_data(control_file[0]):
+            coords, dims, attribs, nvars, vars, scanpo, levs_dict, chans_dict = (
+                self.get_stn_ctl_dict(control_file[0]))
+            ndims_used = 2
+            dims_arr = ['xdef', 'ydef', 'zdef']
+            stn_data = True
+        else:
+            coords, dims, attribs, nvars, vars, scanpo, levs_dict, chans_dict = (
+                self.get_ctl_dict(control_file[0]))
+            ndims_used, dims_arr = self.get_ndims_used(dims)
 
         # Get the groups to be read
         # -------------------------
@@ -77,41 +92,60 @@ def execute(self, dataset_config, data_collections, timing):
         channo = chans_dict["chan_nums"] if chans_dict is not None else None
         x_range, y_range, z_range = self.get_dim_ranges(coords, dims, channo)
 
-        # Filenames to be read into this collection
-        # -----------------------------------------
-        filenames = get(dataset_config, self.logger, 'filenames')
-        ds_list = []
-
         # Get missing value threshold
         # ---------------------------
         threshold = float(get(dataset_config, self.logger, 'missing_value_threshold', 1.0e30))
 
+        ds_list = []
         for filename in filenames:
 
-            # read data file
-            darr, cycle_tm = self.read_ieee(filename, coords, dims, ndims_used,
-                                            dims_arr, nvars, vars)
+            lat = []
+            lon = []
+            if stn_data:
+
+                # Read the station data file. Note that the variable dimensions
+                # will NOT be the same for different station data files.
+                darr, cycle_tm, dims, lat, lon = self.read_stn_ieee(filename, coords, dims,
+                                                                    ndims_used, dims_arr, nvars,
+                                                                    vars)
+                y_range = np.arange(1, dims['ydef']+1)
+
+            else:
+                # read data file
+                darr, cycle_tm = self.read_ieee(filename, coords, dims, ndims_used,
+                                                dims_arr, nvars, vars)
 
             # add cycle as a variable to data array
             cyc_darr = self.var_to_np_array(dims, ndims_used, dims_arr, cycle_tm)
 
             # create dataset from file contents
             timestep_ds = None
-            timestep_ds = self.load_dset(vars, nvars, coords, darr, dims, ndims_used,
-                                         dims_arr, x_range, y_range, z_range, cyc_darr, channo)
+            timestep_ds = self.load_dset(vars, nvars, coords, darr, dims, ndims_used,
+                                         dims_arr, x_range, y_range, z_range, channo, cyc_darr)
 
             if attribs['sat']:
                 timestep_ds.attrs['satellite'] = attribs['sat']
             if attribs['sensor']:
                 timestep_ds.attrs['sensor'] = attribs['sensor']
 
+            # Add lat and lon variables. This is done separately because they are
+            # single-dimension arrays, unlike the obs, which are 2D (level, nobs).
+            if stn_data and len(lat):
+                timestep_ds['lat'] = (['Nobs'], lat)
+            if stn_data and len(lon):
+                timestep_ds['lon'] = (['Nobs'], lon)
+
             # add cycle_tm dim for concat
             timestep_ds['Time'] = cycle_tm.strftime("%Y%m%d%H")
 
             # Add this dataset to ds_list
             ds_list.append(timestep_ds)
 
+        # Align all datasets. This syncs the dimensions and variables of all
+        # datasets in ds_list, filling missing data with NaN.
+        if stn_data:
+            ds_list = align(*ds_list, join='outer', exclude=[])
+
         # Concatenate datasets from ds_list into a single dataset
        ds = concat(ds_list, dim='Time')
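
The align-then-concat pattern in the hunk above is what makes multi-cycle station data work: each cycle generally has a different number of obs, so the per-cycle datasets must be padded to a common shape before concatenation. A minimal sketch with synthetic data (names and sizes are illustrative, not taken from eva):

    import numpy as np
    from xarray import Dataset, align, concat

    # two "cycles" with different Nobs sizes, as station data files produce
    cycle1 = Dataset({'obs': (['Nobs'], np.random.rand(3))}, coords={'Nobs': np.arange(1, 4)})
    cycle2 = Dataset({'obs': (['Nobs'], np.random.rand(5))}, coords={'Nobs': np.arange(1, 6)})

    # join='outer' expands each dataset to the union of coordinates, padding with NaN
    cycle1, cycle2 = align(cycle1, cycle2, join='outer', exclude=[])

    # the aligned datasets can now be stacked along a new Time dimension
    ds = concat([cycle1, cycle2], dim='Time')
    print(ds['obs'].shape)  # (2, 5); cycle1 is NaN-padded at Nobs 4-5
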
@@ -247,7 +281,6 @@ def subset_coordinate(self, ds, coordinate, requested_subset, chans_dict):
 
     # ----------------------------------------------------------------------------------------------
 
-    # Parse control file and return elements in dictionaries
     def get_ctl_dict(self, control_file):
 
         """
@@ -271,7 +304,7 @@ def get_ctl_dict(self, control_file):
         coord_list = []
         dims = {'xdef': 0, 'ydef': 0, 'zdef': 0}
         dim_list = []
-        attribs = {'sensor': None, 'sat': None}
+        attribs = {'sensor': None, 'sat': None, 'dtype': None}
         vars = []
         nvars = 0
         chans_dict = None
@@ -341,6 +374,9 @@ def get_ctl_dict(self, control_file):
                     attribs['sensor'] = strs[1]
                     attribs['sat'] = strs[2]
 
+                if line.find('dtype station') != -1 or line.find('DTYPE station') != -1:
+                    attribs['dtype'] = 'station'
+
                 # Note we need to extract the actual channel numbers. We have the
                 # number of channels via the xdef line, but they are not necessarily
                 # ordered consecutively.
@@ -358,6 +394,7 @@ def get_ctl_dict(self, control_file):
                         chan_nassim.append(int(strs[4]))
 
                 if line.find('level=') != -1:
+
                     strs = line.split()
                     tlev = strs[2].replace(',', '')
                     if tlev.isdigit():
@@ -377,7 +414,7 @@ def get_ctl_dict(self, control_file):
         # Ignore any coordinates in the control file that have a value of 1.
         used = 0
         mydef = ["xdef", "ydef", "zdef"]
-        for x in range(3):
+        for x in range(len(dim_list)):
             if dim_list[x] > 2:
                 coords[mydef[used]] = coord_list[x]
                 dims[mydef[used]] = dim_list[x]
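
The get_stn_ctl_dict() method added in the next hunk indexes fixed token positions (strs[2], strs[4], strs[7], strs[10]) on each 'level=' line of the control file. A worked example with a fabricated line whose token layout matches those indexes; real control-file syntax may differ:

    # fabricated 'level=' line; only the token positions match what the parser expects
    sample_line = '* level= 5, val 0.10 , iuse 1 , err 0.05'

    strs = sample_line.split()
    level = int(strs[2].replace(',', ''))                # 5
    lev_val, iuse, err_val = strs[4], strs[7], strs[10]  # '0.10', '1', '0.05'
    assimilated = iuse == '1'                            # True
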
@@ -411,9 +448,116 @@ def get_ctl_dict(self, control_file):
             levs_dict = {'levels': levs,
                          'levels_assim': level_assim,
                          'levels_nassim': level_nassim}
 
+        fp.close()
+
         return coords, dims, attribs, nvars, vars, scanpo, levs_dict, chans_dict
 
+    # ----------------------------------------------------------------------------------------------
+
+    def get_stn_ctl_dict(self, control_file):
+
+        """
+        Parse a station data control file and extract information into dictionaries.
+
+        Args:
+            control_file (str): Path to the control file.
+
+        Returns:
+            dict: Dictionary containing various coordinates and information.
+            dict: Dictionary containing dimension sizes.
+            dict: Dictionary containing sensor and satellite attributes.
+            int: Number of variables.
+            list: List of variable names.
+            list: List of scan positions.
+            dict: Dictionary containing level information.
+            dict: Dictionary containing channel information.
+        """
+
+        coords = {'xdef': 'Level', 'ydef': 'Nobs', 'zdef': None}
+        dims = {'xdef': 0, 'ydef': 0, 'zdef': 0}
+        dim_list = []
+        attribs = {'sensor': None, 'sat': None, 'dtype': 'station'}
+        vars = []
+        nvars = 0
+        chans_dict = None
+        scanpo = None
+        levs_dict = None
+        levs = []
+        lev_vals = []
+        level_assim = []
+        level_nassim = []
+
+        coord_dict = {
+            'channel': 'Channel',
+            'scan': 'Scan',
+            'pressure': 'Level',
+            'region': 'Region',
+            'iter': 'Iteration'
+        }
+
+        with open(control_file, 'r') as fp:
+            lines = fp.readlines()
+            for line in lines:
+
+                # Find level information
+                if line.find('level=') != -1:
+                    strs = line.split()
+
+                    lev_vals.append({'lev_val': strs[4], 'iuse': strs[7], 'err_val': strs[10]})
+                    lev_str = strs[2].split(',')
+                    levs.append(int(lev_str[0]))
+
+                    if strs[7] == '1':
+                        level_assim.append(int(lev_str[0]))
+                    else:
+                        level_nassim.append(int(lev_str[0]))
+
+                if line.find('vars') != -1:
+                    strs = line.split()
+                    for st in strs:
+                        if st.isdigit():
+                            nvars = int(st)
+
+                if line.find('title') != -1:
+                    if line.find('conventional') == -1 and line.find('gsistat') == -1:
+                        strs = line.split()
+                        attribs['sensor'] = strs[1]
+                        attribs['sat'] = strs[2]
+
+            # The list of variables is at the end of the file between the lines
+            # "vars" and "end vars". Note that for ozn station control files the
+            # var is repeated for every level (e.g. obs1, obs2, obs3, etc.). These
+            # need to be combined into single entries in the vars list, and nvars
+            # is then set to the final size of the vars list.
+            start = len(lines) - (nvars + 1)
+            for x in range(start, start + nvars):
+                strs = lines[x].split()
+                if strs[-1] not in vars:
+                    vars.append(strs[-1])
+            nvars = len(vars)
+
+        # set levels
+        dim_list.append(len(lev_vals))
+        dim_list.append(0)
+
+        if 'Level' in coords.values():
+            for x in range(len(level_assim), len(levs)):
+                level_assim.append(0)
+            for x in range(len(level_nassim), len(levs)):
+                level_nassim.append(0)
+            levs_dict = {'levels': levs,
+                         'levels_assim': level_assim,
+                         'levels_nassim': level_nassim}
+            dims['xdef'] = len(levs)
+
+        return coords, dims, attribs, nvars, vars, scanpo, levs_dict, chans_dict
+
+    # ----------------------------------------------------------------------------------------------
+
     def read_ieee(self, file_name, coords, dims, ndims_used, dims_arr, nvars, vars, file_path=None):
 
         """
@@ -501,6 +645,7 @@ def read_ieee(self, file_name, coords, dims, ndims_used, dims_arr, nvars, vars,
                     arr = np.transpose(f.read_reals(dtype=np.dtype('>f4')).reshape(dimensions))
                 else:
                     arr = zarray
+
                 rtn_array = np.append(rtn_array, [arr], axis=0)
 
         if load_data:
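
The read_stn_ieee() method added in the next hunk expects GrADS-style station records: a big-endian header (station id, lat, lon, time, nlev, flag) followed by an nvars x nlev data record, with a final header of nlev = 0 marking end of file. A small, self-contained round-trip sketch of that layout (file name and values are made up):

    import numpy as np
    from scipy.io import FortranFile

    nvars, nlev = 2, 3
    f = FortranFile('stn_sample.bin', 'w', '>u4')
    # header record: id, lat, lon, time, nlev, flag
    f.write_record(np.array([1], '>i8'), np.array([40.0], '>f4'),
                   np.array([255.0], '>f4'), np.array([0.0], '>f4'),
                   np.array([nlev], '>i4'), np.array([0], '>i4'))
    # data record: nvars x nlev values
    f.write_record(np.arange(nvars * nlev, dtype='>f4'))
    # trailing header with nlev = 0 signals end of file
    f.write_record(np.array([0], '>i8'), np.array([0.0], '>f4'),
                   np.array([0.0], '>f4'), np.array([0.0], '>f4'),
                   np.array([0], '>i4'), np.array([0], '>i4'))
    f.close()

    f = FortranFile('stn_sample.bin', 'r', '>u4')
    header = f.read_record('>i8', '>f4', '>f4', '>f4', '>i4', '>i4')
    data = f.read_record('>f4').reshape([nvars, int(header[4][0])])
    f.close()
    print(data.shape)  # (2, 3)
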
@@ -509,6 +654,82 @@ def read_ieee(self, file_name, coords, dims, ndims_used, dims_arr, nvars, vars,
 
     # ----------------------------------------------------------------------------------------------
 
+    def read_stn_ieee(self, file_name, coords, dims, ndims_used, dims_arr,
+                      nvars, vars, file_path=None):
+
+        """
+        Read station data from an IEEE file and arrange it into a numpy array.
+
+        Args:
+            file_name (str): Name of the IEEE station file to read.
+            coords (dict): Dictionary of coordinates.
+            dims (dict): Dictionary of dimension sizes.
+            ndims_used (int): Number of dimensions used.
+            dims_arr (list): List of dimension names used.
+            nvars (int): Number of variables.
+            vars (list): List of variable names.
+            file_path (str, optional): Path to the directory containing the file. Defaults to None.
+
+        Returns:
+            numpy.ndarray: Numpy array containing the read data.
+            datetime.datetime: Cycle time extracted from the filename.
+            dict: Updated dictionary of dimension sizes.
+            numpy.ndarray: Latitudes of the obs.
+            numpy.ndarray: Longitudes of the obs.
+        """
+
+        rtn_array = None
+
+        # find cycle time in filename and create cycle_tm as datetime object
+        cycle_tm = None
+        cycstrs = file_name.replace('/', '.').split('.')
+
+        for cycstr in cycstrs:
+            if cycstr.isnumeric() and len(cycstr) == 10:
+                cycle_tm = datetime(int(cycstr[0:4]), int(cycstr[4:6]),
+                                    int(cycstr[6:8]), int(cycstr[8:]))
+
+        filename = os.path.join(file_path, file_name) if file_path else file_name
+
+        load_data = True
+        if os.path.isfile(filename):
+            f = FortranFile(filename, 'r', '>u4')
+        else:
+            self.logger.info(f"WARNING: file {filename} is missing")
+            load_data = False
+
+        mylist = []
+        lat = []
+        lon = []
+        numobs = 0
+
+        if load_data:
+            nlev = 1
+            while nlev:
+
+                # Header components are stn id, lat, lon, time, nlev, flag.
+                # Data follows the header if nlev > 0; otherwise it's EOF.
+                record = f.read_record('>i8', '>f4', '>f4', '>f4', '>i4', '>i4')
+                nlev = record[4]
+                if nlev:
+                    lat.append(record[1])
+                    lon.append(record[2])
+                    data = f.read_record('>f4').reshape([nvars, dims[dims_arr[0]]])
+                    numobs += 1
+                    mylist.append(data)
+
+            # dimensions are nvar, nlev, numobs
+            rtn_array = np.dstack(mylist)
+            dims['ydef'] = numobs
+
+            f.close()
+
+        rtn_lat = np.asarray(lat).reshape(-1)
+        rtn_lon = np.asarray(lon).reshape(-1)
+
+        return rtn_array, cycle_tm, dims, rtn_lat, rtn_lon
+
+    # ----------------------------------------------------------------------------------------------
+
     def var_to_np_array(self, dims, ndims_used, dims_arr, var):
 
         """
@@ -617,7 +838,7 @@ def get_ndims_used(self, dims):
 
     # ----------------------------------------------------------------------------------------------
 
     def load_dset(self, vars, nvars, coords, darr, dims, ndims_used,
-                  dims_arr, x_range, y_range, z_range, cyc_darr, channo):
+                  dims_arr, x_range, y_range, z_range, channo, cyc_darr=None):
 
         """
         Create a dataset from various components.
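
The next hunk adds an assign_coords() step to load_dset() so that real coordinate values are attached to each dimension; align() matches datasets by coordinate labels, so dimensions without coordinates could not be outer-joined. A minimal illustration with stand-in names and sizes:

    import numpy as np
    from xarray import Dataset

    levels = np.arange(1, 4)   # stand-in for x_range
    nobs = np.arange(1, 6)     # stand-in for y_range

    ds = Dataset({'obs': (('Level', 'Nobs'), np.zeros((3, 5)))})

    # attach coordinate values, mirroring the new_coords block below
    ds = ds.assign_coords({'Level': levels, 'Nobs': nobs})
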
@@ -666,35 +887,46 @@ def load_dset(self, vars, nvars, coords, darr, dims, ndims_used,
             new_ds = Dataset.from_dict(d)
             rtn_ds = new_ds if rtn_ds is None else rtn_ds.merge(new_ds)
 
+        # Define new coordinates
+        new_coords = {
+            coords[dims_arr[0]]: x_range,
+            coords[dims_arr[1]]: y_range
+        }
+
+        # Add the new coordinates to the dataset
+        rtn_ds = rtn_ds.assign_coords(new_coords)
+
         # tack on 'cycle' as a variable
         # -----------------------------
-        if ndims_used == 1:
-            new_cyc = Dataset(
-                {
-                    'cycle': ((coords[dims_arr[0]]), cyc_darr),
-                },
-                coords={coords[dims_arr[0]]: x_range},
-            )
-        if ndims_used == 2:
-            new_cyc = Dataset(
-                {
-                    'cycle': ((coords[dims_arr[0]], coords[dims_arr[1]]), cyc_darr),
-                },
-                coords={coords[dims_arr[0]]: x_range,
-                        coords[dims_arr[1]]: y_range},
-            )
-        if ndims_used == 3:
-            new_cyc = Dataset(
-                {
-                    'cycle': ((coords[dims_arr[0]], coords[dims_arr[1]],
-                               coords[dims_arr[2]]), cyc_darr),
-                },
-                coords={coords[dims_arr[0]]: x_range,
-                        coords[dims_arr[1]]: y_range,
-                        coords[dims_arr[2]]: z_range},
-            )
-
-        rtn_ds = rtn_ds.merge(new_cyc)
+        if cyc_darr is not None:
+            if ndims_used == 1:
+                new_cyc = Dataset(
+                    {
+                        'cycle': ((coords[dims_arr[0]]), cyc_darr),
+                    },
+                    coords={coords[dims_arr[0]]: x_range},
+                )
+            if ndims_used == 2:
+                new_cyc = Dataset(
+                    {
+                        'cycle': ((coords[dims_arr[0]], coords[dims_arr[1]]), cyc_darr),
+                    },
+                    coords={coords[dims_arr[0]]: x_range,
+                            coords[dims_arr[1]]: y_range},
+                )
+            if ndims_used == 3:
+                new_cyc = Dataset(
+                    {
+                        'cycle': ((coords[dims_arr[0]], coords[dims_arr[1]],
+                                   coords[dims_arr[2]]), cyc_darr),
+                    },
+                    coords={coords[dims_arr[0]]: x_range,
+                            coords[dims_arr[1]]: y_range,
+                            coords[dims_arr[2]]: z_range},
+                )
+
+            rtn_ds = rtn_ds.merge(new_cyc)
+
         return rtn_ds
 
     # ----------------------------------------------------------------------------------------------
 
@@ -744,3 +976,22 @@ def loadConditionalItems(self, dataset, chans_dict, levs_dict, scanpo):
         return dataset
 
     # ----------------------------------------------------------------------------------------------
+
+    def is_stn_data(self, control_file):
+
+        """
+        Check for the station data indicator in a control file.
+
+        Args:
+            control_file (str): Control file name.
+
+        Returns:
+            bool: True if this is a station data control file.
+        """
+
+        with open(control_file, 'r') as fp:
+            content = fp.read()
+        is_stn = 'DTYPE station' in content or 'dtype station' in content
+
+        return is_stn
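
The two plotting changes below thread an optional 'level' argument through get_variable_data(), parallel to the existing 'channel'. The internals of get_variable_data() are not part of this patch; in xarray terms the selection presumably amounts to something like this sketch (synthetic data and names):

    import numpy as np
    from xarray import Dataset

    ds = Dataset({'obs': (('Level', 'Nobs'), np.random.rand(3, 10))},
                 coords={'Level': [1, 2, 3], 'Nobs': np.arange(1, 11)})

    level = 2
    datavar = ds['obs'].sel(Level=level)  # one level -> 1-D array over Nobs
    print(datavar.shape)                  # (10,)
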
diff --git a/src/eva/plotting/batch/base/plot_tools/dynamic_config.py b/src/eva/plotting/batch/base/plot_tools/dynamic_config.py
index 5421f118..ffe24508 100644
--- a/src/eva/plotting/batch/base/plot_tools/dynamic_config.py
+++ b/src/eva/plotting/batch/base/plot_tools/dynamic_config.py
@@ -62,14 +62,15 @@ def vminvmaxcmap(logger, option_dict, plots_dict, data_collections):
     # is kept
     percentage_capture = option_dict.get('percentage capture', 100)
 
-    # Optionally the data might have a channel.
+    # Optionally the data might have a channel or level.
     channel = option_dict.get('channel', None)
+    level = option_dict.get('level', None)
 
     # Get the data variable to use for determining cbar limits
     varname = option_dict.get('data variable')
     varname_cgv = varname.split('::')
     datavar = data_collections.get_variable_data(varname_cgv[0], varname_cgv[1],
-                                                 varname_cgv[2], channel)
+                                                 varname_cgv[2], channel, level)
 
     # Reorder the data array
     datavar = np.sort(datavar)
diff --git a/src/eva/plotting/batch/emcpy/diagnostics/map_scatter.py b/src/eva/plotting/batch/emcpy/diagnostics/map_scatter.py
index 9544aba1..b263d754 100644
--- a/src/eva/plotting/batch/emcpy/diagnostics/map_scatter.py
+++ b/src/eva/plotting/batch/emcpy/diagnostics/map_scatter.py
@@ -43,19 +43,26 @@ def __init__(self, config, logger, dataobj):
         """
 
         # prepare data based on config
-        # Optionally get the channel to plot
+        # Optionally get the channel or level to plot
         channel = None
         if 'channel' in config['data']:
             channel = config['data'].get('channel')
 
+        level = None
+        if 'level' in config:
+            level = config.get('level')
+
         lonvar_cgv = config['longitude']['variable'].split('::')
         lonvar = dataobj.get_variable_data(lonvar_cgv[0], lonvar_cgv[1], lonvar_cgv[2], None)
         lonvar = slice_var_from_str(config['longitude'], lonvar, logger)
         latvar_cgv = config['latitude']['variable'].split('::')
         latvar = dataobj.get_variable_data(latvar_cgv[0], latvar_cgv[1], latvar_cgv[2], None)
         latvar = slice_var_from_str(config['latitude'], latvar, logger)
+
         datavar_cgv = config['data']['variable'].split('::')
-        datavar = dataobj.get_variable_data(datavar_cgv[0], datavar_cgv[1], datavar_cgv[2], channel)
+        datavar = dataobj.get_variable_data(datavar_cgv[0], datavar_cgv[1],
+                                            datavar_cgv[2], channel, level)
         datavar = slice_var_from_str(config['data'], datavar, logger)
+
         # scatter data should be flattened
         lonvar = lonvar.flatten()
         latvar = latvar.flatten()
@@ -67,6 +74,7 @@ def __init__(self, config, logger, dataobj):
 
         # create declarative plotting MapScatter object
         self.plotobj = emcpy.plots.map_plots.MapScatter(latvar, lonvar, datavar)
+
         # get defaults from schema
         layer_schema = config.get("schema",
                                   os.path.join(return_eva_path(),
@@ -74,7 +82,7 @@ def __init__(self, config, logger, dataobj):
                                                'batch', 'emcpy', 'defaults',
                                                'map_scatter.yaml'))
         config = get_schema(layer_schema, config, logger)
-        delvars = ['longitude', 'latitude', 'data', 'type', 'schema']
+        delvars = ['longitude', 'latitude', 'data', 'type', 'schema', 'level']
         for d in delvars:
             config.pop(d, None)
         self.plotobj = update_object(self.plotobj, config, logger)
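
For reference, a hypothetical end-to-end use of the flattened arrays with emcpy's MapScatter, mirroring the constructor call visible in map_scatter.py above; the data here is synthetic, and only the MapScatter(lat, lon, data) signature is taken from this patch:

    import numpy as np
    import emcpy.plots.map_plots

    lat = np.random.uniform(-90, 90, 100)
    lon = np.random.uniform(0, 360, 100)
    data = np.random.rand(100)

    # scatter inputs are flattened, equal-length 1-D arrays
    plotobj = emcpy.plots.map_plots.MapScatter(lat.flatten(), lon.flatten(), data.flatten())
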