Commit

Add support for batch processing on DataType/dtype.
EdwardSafford-NOAA committed Dec 21, 2023
1 parent 4cfc04d commit c566ccb
Showing 6 changed files with 140 additions and 51 deletions.
35 changes: 30 additions & 5 deletions src/eva/data/data_collections.py
@@ -170,7 +170,7 @@ def add_variable_to_collection(self, collection_name, group_name, variable_name,
# ----------------------------------------------------------------------------------------------

def get_variable_data_array(self, collection_name, group_name, variable_name,
channels=None, levels=None):
channels=None, levels=None, dtypes=None):

"""
Retrieve a specific variable (as a DataArray) from a collection.
@@ -181,18 +181,21 @@ def get_variable_data_array(self, collection_name, group_name, variable_name,
variable_name (str): Name of the variable.
channels (int or list[int]): Indices of channels to select (optional).
levels (int or list[int]): Indices of levels to select (optional).
dtypes (str or list[str]): Names of data types to select (optional).
Returns:
DataArray: The selected variable as an xarray DataArray.
Raises:
ValueError: If channels are provided but the 'Channel' dimension is missing.
ValueError: If channels, levels, or dtypes are provided but the
corresponding 'Channel', 'Level', or 'DataType' dimension
is missing.
"""

group_variable_name = group_name + '::' + variable_name
data_array = self._collections[collection_name][group_variable_name]

if channels is None and levels is None:
if channels is None and levels is None and dtypes is None:
return data_array

if channels is not None:
@@ -201,6 +204,9 @@ def get_variable_data_array(self, collection_name, group_name, variable_name,
if 'Channel' not in list(self._collections[collection_name].dims):
self.logger.abort(f'In get_variable_data_array channels is provided but ' +
f'Channel is not a dimension in Dataset')

available_values = data_array['Channel'].values

# Make sure it is a list
channels_sel = []
channels_sel.append(channels)
@@ -229,10 +235,28 @@ def get_variable_data_array(self, collection_name, group_name, variable_name,
self.logger.abort('In get_variable_data_array levels is neither none ' +
'nor a list of integers')

elif dtypes is not None:
if isinstance(dtypes, str) or all(isinstance(dt, str) for dt in dtypes):
# DataType must be a dimension if it will be used for selection
if 'DataType' not in list(self._collections[collection_name].dims):
self.logger.abort(f'In get_variable_data_array dtypes is provided but ' +
f'DataType is not a dimension in Dataset')
# Make sure it is a list
dtypes_sel = dtypes if isinstance(dtypes, list) else [dtypes]
available_values = data_array['DataType'].values

# Create a new DataArray with the requested dtypes
data_array_dtypes = data_array.sel(DataType=dtypes_sel)
return data_array_dtypes
else:
self.logger.abort('In get_variable_data_array dtypes is neither None ' +
'nor a str or list of strings')
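For illustration, a minimal usage sketch of the new dtypes selector, assuming data_collections is a populated instance of this class. The collection, group, variable, and data-type names below are hypothetical; only the method signature comes from this change.

# Hypothetical names; dtypes may be a single string or a list of strings.
da_one = data_collections.get_variable_data_array('conmon', 'GsiIeee', 'count',
                                                  dtypes='ps120_0')
da_many = data_collections.get_variable_data_array('conmon', 'GsiIeee', 'count',
                                                   dtypes=['ps120_0', 'ps180_0'])
# Both calls return an xarray DataArray restricted to the requested DataType values.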

# ----------------------------------------------------------------------------------------------

def get_variable_data(self, collection_name, group_name, variable_name,
channels=None, levels=None):
channels=None, levels=None, dtypes=None):

"""
Retrieve the data of a specific variable from a collection.
@@ -243,13 +267,14 @@ def get_variable_data(self, collection_name, group_name, variable_name,
variable_name (str): Name of the variable.
channels (int or list[int]): Indices of channels to select (optional).
levels (int or list[int]): Indices of levels to select (optional).
dtypes (str or list[str]): Names of data types to select (optional).
Returns:
ndarray: The selected variable data as a NumPy array.
"""

variable_array = self.get_variable_data_array(collection_name, group_name, variable_name,
channels, levels)
channels, levels, dtypes)

# Extract the actual data array
variable_data = variable_array.data
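A companion sketch, under the same hypothetical names and instance as above: get_variable_data simply forwards channels, levels, and dtypes to get_variable_data_array and hands back the underlying NumPy array.

counts = data_collections.get_variable_data('conmon', 'GsiIeee', 'count',
                                            dtypes=['ps120_0'])
# counts is a numpy.ndarray, i.e. the .data of the DataArray selected above.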
116 changes: 81 additions & 35 deletions src/eva/data/mon_data_space.py
@@ -30,6 +30,15 @@ class MonDataSpace(EvaDatasetBase):
A class for handling MonDataSpace dataset configuration and processing.
"""

# index values for specific control files
level_iuse_ozn = 7
channel_iuse_rad = 7
channel_num_rad = 4
type_con = 1
dtype_con = 5
subtype_con = 7
regions_rad_ang = 5

def execute(self, dataset_config, data_collections, timing):

"""
@@ -63,7 +72,7 @@ def execute(self, dataset_config, data_collections, timing):
dims_arr = ['xdef', 'ydef', 'zdef']
stn_data = True
else:
coords, dims, attribs, nvars, vars, scanpo, levs_dict, chans_dict = (
coords, dims, attribs, nvars, vars, scanpo, levs_dict, chans_dict, dtype_dict = (
self.get_ctl_dict(control_file[0]))
ndims_used, dims_arr = self.get_ndims_used(dims)

@@ -90,7 +99,8 @@ def execute(self, dataset_config, data_collections, timing):
# Set coordinate ranges
# ---------------------
channo = chans_dict["chan_nums"] if chans_dict is not None else None
x_range, y_range, z_range = self.get_dim_ranges(coords, dims, channo)
datatypes = dtype_dict["dtype"] if dtype_dict is not None else None
x_range, y_range, z_range = self.get_dim_ranges(coords, dims, channo, datatypes)

# Get missing value threshold
# ---------------------------
@@ -121,7 +131,7 @@ def execute(self, dataset_config, data_collections, timing):
# create dataset from file contents
timestep_ds = None
timestep_ds = self.load_dset(vars, nvars, coords, darr, dims, ndims_used,
dims_arr, x_range, y_range, z_range, channo, cyc_darr)
dims_arr, x_range, y_range, z_range, cyc_darr)

if attribs['sat']:
timestep_ds.attrs['satellite'] = attribs['sat']
@@ -165,7 +175,8 @@ def execute(self, dataset_config, data_collections, timing):
# Conditionally add channel, level, scan, and iteration related variables
# -----------------------------------------------------------------------
iterations = x_range if 'Iteration' in coords.values() else None
ds = self.loadConditionalItems(ds, chans_dict, levs_dict, scanpo, iterations)
ds = self.loadConditionalItems(ds, chans_dict, levs_dict,
dtype_dict, scanpo, iterations)

# Rename variables with group
rename_dict = {}
@@ -318,13 +329,18 @@ def get_ctl_dict(self, control_file):
levs = []
level_assim = []
level_nassim = []
dtype_dict = None
dtype = []
dtype_assim = []

coord_dict = {
'channel': 'Channel',
'scan': 'Scan',
'pressure': 'Level',
'vertical': 'Level',
'region': 'Region',
'iter': 'Iteration'
'iter': 'Iteration',
'data type': 'DataType'
}

with open(control_file, 'r') as fp:
@@ -334,6 +350,7 @@ def get_ctl_dict(self, control_file):
# Locate the coordinates using coord_dict. There will be 1-3
# coordinates specified as XDEF, YDEF, and ZDEF.
for item in list(coord_dict.keys()):

if 'DEF' in line and item in line:
coord_list.append(coord_dict[item])

@@ -378,6 +395,12 @@ def get_ctl_dict(self, control_file):
if line.find('dtype station') != -1 or line.find('DTYPE station') != -1:
attribs['dtype'] = 'station'

if line.find('subtype') != -1:
strs = line.split()
dtype.append(strs[self.type_con] + strs[self.dtype_con] + '_' +
strs[self.subtype_con])
dtype_assim.append(strs[9])
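To make the index constants concrete, here is a hedged walk-through of how a data-type key is assembled. The control-file line below is invented purely to place tokens at positions type_con (1), dtype_con (5), subtype_con (7), and 9 for the assim flag; real ConMon control files may look different.

# Hypothetical 'subtype' line; only the token positions mirror the parsing above.
line = 'type= ps count= 6 dtype= 120 subtype= 0 iuse= 1'
strs = line.split()
key = strs[1] + strs[5] + '_' + strs[7]   # 'ps120_0'
assim_flag = strs[9]                      # '1'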

# Note we need to extract the actual channel numbers. We have the
# number of channels via the xdef line, but they are not necessarily
# ordered consecutively.
@@ -388,29 +411,35 @@ def get_ctl_dict(self, control_file):
if line.find('channel=') != -1:
strs = line.split()
if strs[4].isdigit():
channo.append(int(strs[4]))
if strs[7] == '1':
chan_assim.append(int(strs[4]))
if strs[7] == '-1':
chan_nassim.append(int(strs[4]))
channo.append(int(strs[self.channel_num_rad]))
if strs[self.channel_iuse_rad] == '1':
chan_assim.append(int(strs[self.channel_num_rad]))
if strs[self.channel_iuse_rad] == '-1':
chan_nassim.append(int(strs[self.channel_num_rad]))

if line.find('level=') != -1:

strs = line.split()
tlev = strs[2].replace(',', '')
if tlev.isdigit():
levs.append(int(tlev))
if strs[7] == '1':
level_assim.append(int(tlev))
if strs[7] == '-1':
level_nassim.append(int(tlev))

# Ozn data control files include the assim flag on the Level definition
# lines. Con data control files also use level lines, but the assim flag
# is carried on the dtype (data type) lines, not the Level lines.
#
if len(strs) > self.level_iuse_ozn:
if strs[self.level_iuse_ozn] == '1':
level_assim.append(int(tlev))
if strs[self.level_iuse_ozn] == '-1':
level_nassim.append(int(tlev))

# The list of variables is at the end of the file between the lines
# "vars" and "end vars".
start = len(lines) - (nvars + 1)
for x in range(start, start + nvars):
strs = lines[x].split()
vars.append(strs[-1])
vars.append(strs[0])

# Ignore any coordinates in the control file that have a value of 1.
used = 0
@@ -442,15 +471,22 @@ def get_ctl_dict(self, control_file):
'chans_nassim': chan_nassim}

if 'Level' in coords.values():
for x in range(len(level_assim), len(levs)):
level_assim.append(0)
for x in range(len(level_nassim), len(levs)):
level_nassim.append(0)
levs_dict = {'levels': levs,
'levels_assim': level_assim,
'levels_nassim': level_nassim}
levs_dict = {'levels': levs}

if len(level_assim) > 0 or len(level_nassim) > 0:
for x in range(len(level_assim), len(levs)):
level_assim.append(0)
for x in range(len(level_nassim), len(levs)):
level_nassim.append(0)
levs_dict['levels_assim'] = level_assim
levs_dict['levels_nassim'] = level_nassim

if 'DataType' in coords.values():
dtype_dict = {'dtype': dtype,
'assim': dtype_assim}

fp.close()
return coords, dims, attribs, nvars, vars, scanpo, levs_dict, chans_dict
return coords, dims, attribs, nvars, vars, scanpo, levs_dict, chans_dict, dtype_dict

# ----------------------------------------------------------------------------------------------

@@ -609,7 +645,7 @@ def read_ieee(self, file_name, coords, dims, ndims_used, dims_arr, nvars, vars,
zarray = np.zeros((dims[dims_arr[0]], dims[dims_arr[1]]), float)
dimensions = [dims[dims_arr[1]], dims[dims_arr[0]]]

if ndims_used == 3: # RadMon angle
if ndims_used == 3: # RadMon angle, ConMon time/vert
rtn_array = np.empty((0, dims[dims_arr[0]], dims[dims_arr[1]],
dims[dims_arr[2]]), float)
zarray = np.zeros((dims[dims_arr[0]], dims[dims_arr[1]],
@@ -627,14 +663,15 @@ def read_ieee(self, file_name, coords, dims, ndims_used, dims_arr, nvars, vars,
tarr = zarray
else:
mylist = []
for z in range(5):
for z in range(dims['zdef']):
arr = f.read_reals(dtype=np.dtype('>f4')).reshape(dims['ydef'],
dims['xdef'])
mylist.append(np.transpose(arr))
tarr = np.dstack(mylist)

else:
tarr = zarray

rtn_array = np.append(rtn_array, [tarr], axis=0)

else: # ndims_used == 1|2
@@ -768,7 +805,7 @@ def var_to_np_array(self, dims, ndims_used, dims_arr, var):

# ----------------------------------------------------------------------------------------------

def get_dim_ranges(self, coords, dims, channo):
def get_dim_ranges(self, coords, dims, channo, datatypes):

"""
Get the valid ranges for each dimension based on the specified coordinates and channel
@@ -778,6 +815,7 @@ def get_dim_ranges(self, coords, dims, channo):
coords (dict): Dictionary of coordinates.
dims (dict): Dictionary of dimension sizes.
channo (list): List of channel numbers.
datatypes (list): List of data types.
Returns:
numpy.ndarray or None: Valid x coordinate range or None.
@@ -795,7 +833,12 @@ def get_dim_ranges(self, coords, dims, channo):
# - The z coordinate is never used for channel.

if dims['xdef'] > 1:
x_range = channo if coords['xdef'] == 'Channel' else np.arange(1, dims['xdef']+1)
if coords['xdef'] == 'Channel':
x_range = channo
elif coords['xdef'] == 'DataType':
x_range = datatypes
else:
x_range = np.arange(1, dims['xdef']+1)
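A small illustrative call under invented inputs, assuming mds is an existing MonDataSpace instance; the coords and dims dictionaries below are made up for the example.

coords = {'xdef': 'DataType', 'ydef': 'Region', 'zdef': 'Level'}
dims = {'xdef': 3, 'ydef': 1, 'zdef': 1}
datatypes = ['ps120_0', 'ps180_0', 'ps181_0']
x_range, y_range, z_range = mds.get_dim_ranges(coords, dims, None, datatypes)
# With coords['xdef'] == 'DataType', x_range is the datatypes list itself
# rather than a 1..N np.arange.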

if dims['ydef'] > 1:
y_range = channo if coords['ydef'] == 'Channel' else np.arange(1, dims['ydef']+1)
@@ -842,7 +885,7 @@ def get_ndims_used(self, dims):
# ----------------------------------------------------------------------------------------------

def load_dset(self, vars, nvars, coords, darr, dims, ndims_used,
dims_arr, x_range, y_range, z_range, channo, cyc_darr=None):
dims_arr, x_range, y_range, z_range, cyc_darr=None):

"""
Create a dataset from various components.
@@ -859,7 +902,6 @@ def load_dset(self, vars, nvars, coords, darr, dims, ndims_used,
y_range (numpy.ndarray or None): Valid y coordinate range.
z_range (numpy.ndarray or None): Valid z coordinate range.
cyc_darr (numpy.ndarray): Numpy array of cycle data.
channo (list): List of channel numbers.
Returns:
xarray.Dataset: Created dataset.
@@ -904,9 +946,6 @@ def load_dset(self, vars, nvars, coords, darr, dims, ndims_used,
coords[dims_arr[2]]: z_range
}

if 'Channel' in coords.values():
d.update({"Channel": {"dims": ("Channel"), "data": channo}})

new_ds = Dataset.from_dict(d)
rtn_ds = new_ds if rtn_ds is None else rtn_ds.merge(new_ds)

@@ -948,7 +987,8 @@ def load_dset(self, vars, nvars, coords, darr, dims, ndims_used,

# ----------------------------------------------------------------------------------------------

def loadConditionalItems(self, dataset, chans_dict, levs_dict, scanpo, iterations=None):
def loadConditionalItems(self, dataset, chans_dict, levs_dict, dtype_dict,
scanpo, iterations=None):

"""
Add channel, level, scan, and data-type related variables to the dataset.
@@ -976,10 +1016,16 @@ def loadConditionalItems(self, dataset, chans_dict, levs_dict, scanpo, iteration
dataset['level'] = (['Level'], levs_dict["levels"])
dataset['level_yaxis_z'] = (['Level'], [0.0]*len(levs_dict["levels"]))

if len(levs_dict["levels_assim"]) > 0:
if 'levels_assim' in levs_dict:
dataset['level_assim'] = (['Level'], levs_dict["levels_assim"])
if len(levs_dict["levels_nassim"]) > 0:
if 'levels_nassim' in levs_dict:
dataset['level_nassim'] = (['Level'], levs_dict["levels_nassim"])
if 'levels_value' in levs_dict:
dataset['level_value'] = (['Level'], levs_dict["levels_value"])

if dtype_dict is not None:
dataset['dtype'] = (['DataType'], dtype_dict['dtype'])
dataset['dtype_assim'] = (['DataType'], dtype_dict['assim'])
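As an isolated sketch of what the new entries look like (values invented), dtype and dtype_assim are simply 1-D variables on the DataType dimension:

import xarray as xr

# Invented dtype_dict, mirroring the structure built in get_ctl_dict
dtype_dict = {'dtype': ['ps120_0', 'ps180_0', 'ps181_0'], 'assim': ['1', '1', '-1']}

ds = xr.Dataset()
ds['dtype'] = (['DataType'], dtype_dict['dtype'])
ds['dtype_assim'] = (['DataType'], dtype_dict['assim'])
# ds now carries a 3-element DataType dimension with both variables on it.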

if scanpo is not None:
nscan = dataset.dims.get('Scan')
13 changes: 9 additions & 4 deletions src/eva/plotting/batch/base/diagnostics/line_plot.py
@@ -58,16 +58,21 @@ def __init__(self, config, logger, dataobj):
logger.abort('In Scatter comparison the first variable \'var1\' does not appear to ' +
'be in the required format of collection::group::variable.')

# Optionally get the channel|level to plot
# Optionally get the channel|level|dtype to plot
channel = None
if 'channel' in config:
channel = config.get('channel')
level = None
if 'level' in config:
level = config.get('level')

xdata = dataobj.get_variable_data(var0_cgv[0], var0_cgv[1], var0_cgv[2], channel, level)
ydata = dataobj.get_variable_data(var1_cgv[0], var1_cgv[1], var1_cgv[2], channel, level)
dtype = None
if 'dtype' in config:
dtype = config.get('dtype')

xdata = dataobj.get_variable_data(var0_cgv[0], var0_cgv[1],
var0_cgv[2], channel, level, dtype)
ydata = dataobj.get_variable_data(var1_cgv[0], var1_cgv[1],
var1_cgv[2], channel, level, dtype)

# see if we need to slice data
xdata = slice_var_from_str(config['x'], xdata, logger)
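Finally, a hedged sketch of how the optional selector just added here flows from a plot configuration into the data request. The config fragment is invented for illustration; only the handling of the 'dtype' key and the extended get_variable_data call come from this change.

# Hypothetical fragment of a line plot configuration.
config = {'dtype': 'ps120_0'}   # 'channel' and 'level' are supplied the same way
dtype = None
if 'dtype' in config:
    dtype = config.get('dtype')
# dtype ('ps120_0') is then passed straight through to dataobj.get_variable_data.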