From 7ae9ea4912fae8d8759c357b99ad4fe977928e8a Mon Sep 17 00:00:00 2001 From: "Kate.Friedman" Date: Tue, 13 Aug 2024 17:53:05 +0000 Subject: [PATCH] Consolidate staging settings - Move cycle, rRUN, model_start_date_current_cycle, and member start/stop settings into new configure function and out of JJOB. - Address some reviewer feedback and make corrections. Refs #2475 --- jobs/JGLOBAL_STAGE_IC | 21 +------ parm/stage/stage.yaml.j2 | 60 +++++++++--------- scripts/exglobal_stage_ic.py | 8 ++- .../pygfs/task/{stage.py => stage_ic.py} | 63 ++++++++++++++----- 4 files changed, 84 insertions(+), 68 deletions(-) rename ush/python/pygfs/task/{stage.py => stage_ic.py} (55%) diff --git a/jobs/JGLOBAL_STAGE_IC b/jobs/JGLOBAL_STAGE_IC index 0113b6c224..5039c94eec 100755 --- a/jobs/JGLOBAL_STAGE_IC +++ b/jobs/JGLOBAL_STAGE_IC @@ -3,31 +3,12 @@ source "${HOMEgfs}/ush/preamble.sh" source "${HOMEgfs}/ush/jjob_header.sh" -e "stage_ic" -c "base stage_ic" -# Define significant cycles -# shellcheck disable=SC2153 -half_window=$(( assim_freq / 2 )) -current_cycle="${PDY}${cyc}" -previous_cycle=$(date --utc -d "${PDY} ${cyc} - ${assim_freq} hours" +%Y%m%d%H) -current_cycle_begin=$(date --utc -d "${PDY} ${cyc} - ${half_window} hours" +%Y%m%d%H) -current_cycle_end=$(date --utc -d "${PDY} ${cyc} + ${half_window} hours" +%Y%m%d%H) - -# Define model start date for current_cycle as the time the forecast will start -if [[ "${DOIAU:-NO}" == "YES" ]] && [[ "${MODE}" == "cycled" ]] ; then - model_start_date_current_cycle="${current_cycle_begin}" -else - if [[ "${REPLAY_ICS:-NO}" == "YES" ]]; then - model_start_date_current_cycle=${current_cycle_end} - else - model_start_date_current_cycle=${current_cycle} - fi -fi -export current_cycle previous_cycle model_start_date_current_cycle - # Initialize return code err=0 # Execute staging "${SCRgfs}/exglobal_stage_ic.py" +err=$? ############################################################### # Check for errors and exit if any of the above failed diff --git a/parm/stage/stage.yaml.j2 b/parm/stage/stage.yaml.j2 index 84e79da980..49415a3e17 100644 --- a/parm/stage/stage.yaml.j2 +++ b/parm/stage/stage.yaml.j2 @@ -23,8 +23,8 @@ {% set current_cycle_HH = current_cycle | strftime("%H") %} {% set previous_cycle_YMD = previous_cycle | to_YMD %} {% set previous_cycle_HH = previous_cycle | strftime("%H") %} -{% set m_prefix = model_start_date_current_cycle | to_YMD + "." + model_start_date_current_cycle | strftime("%H") + "0000" %} -{% set p_prefix = previous_cycle | to_YMD + "." + previous_cycle | strftime("%H") + "0000" %} +{% set m_prefix = model_start_date_current_cycle | strftime("%Y%m%d.%H0000") %} +{% set p_prefix = previous_cycle | strftime("%Y%m%d.%H0000") %} #################################################################### # Initial condition to stage @@ -37,26 +37,26 @@ {% else %} {% set mem_char = '' %} {% endif %} -{% set current_dict = ({ '${ROTDIR}':ROTDIR, - '${RUN}':RUN, - '${YMD}':current_cycle_YMD, - '${HH}':current_cycle_HH, - '${MEMDIR}': mem_char }) %} -{% set previous_dict = ({ '${ROTDIR}':ROTDIR, - '${RUN}':rRUN, - '${YMD}':previous_cycle_YMD, - '${HH}':previous_cycle_HH, - '${MEMDIR}': mem_char }) %} -{% set previous_run_dict = ({ '${ROTDIR}':ROTDIR, - '${RUN}':RUN, - '${YMD}':previous_cycle_YMD, - '${HH}':previous_cycle_HH, - '${MEMDIR}': mem_char }) %} +{% set current_cycle_dict = ({ '${ROTDIR}':ROTDIR, + '${RUN}':RUN, + '${YMD}':current_cycle_YMD, + '${HH}':current_cycle_HH, + '${MEMDIR}': mem_char }) %} +{% set previous_cycle_dict = ({ '${ROTDIR}':ROTDIR, + '${RUN}':rRUN, + '${YMD}':previous_cycle_YMD, + '${HH}':previous_cycle_HH, + '${MEMDIR}': mem_char }) %} +{% set previous_cycle_and_run_dict = ({ '${ROTDIR}':ROTDIR, + '${RUN}':RUN, + '${YMD}':previous_cycle_YMD, + '${HH}':previous_cycle_HH, + '${MEMDIR}': mem_char }) %} # Initial condition definitions {% if MODE == "cycled" and RUN == "gdas" %} -{% set COMOUT_ATMOS_ANALYSIS = COM_ATMOS_ANALYSIS_TMPL | replace_tmpl(current_dict) %} +{% set COMOUT_ATMOS_ANALYSIS = COM_ATMOS_ANALYSIS_TMPL | replace_tmpl(current_cycle_dict) %} analysis: mkdir: - "{{ COMOUT_ATMOS_ANALYSIS }}" @@ -69,7 +69,7 @@ analysis: {% endif %} {% if EXP_WARM_START == True %} -{% set COMOUT_ATMOS_RESTART_PREV = COM_ATMOS_RESTART_TMPL | replace_tmpl(previous_dict) %} +{% set COMOUT_ATMOS_RESTART_PREV = COM_ATMOS_RESTART_TMPL | replace_tmpl(previous_cycle_dict) %} atmosphere_warm: mkdir: - "{{ COMOUT_ATMOS_RESTART_PREV }}" @@ -88,7 +88,7 @@ atmosphere_warm: {% endif %} # path_exists {% endfor %} # ntile {% else %} # cold start -{% set COMOUT_ATMOS_INPUT = COM_ATMOS_INPUT_TMPL | replace_tmpl(current_dict) %} +{% set COMOUT_ATMOS_INPUT = COM_ATMOS_INPUT_TMPL | replace_tmpl(current_cycle_dict) %} atmosphere_cold: mkdir: - "{{ COMOUT_ATMOS_INPUT }}" @@ -102,7 +102,7 @@ atmosphere_cold: {% endif %} {% if REPLAY_ICS == True %} -{% set COMOUT_ATMOS_ANALYSIS = COM_ATMOS_ANALYSIS_TMPL | replace_tmpl(current_dict) %} +{% set COMOUT_ATMOS_ANALYSIS = COM_ATMOS_ANALYSIS_TMPL | replace_tmpl(current_cycle_dict) %} atmosphere_perturbation: mkdir: - "{{ COMOUT_ATMOS_ANALYSIS }}" @@ -114,7 +114,7 @@ atmosphere_perturbation: atmosphere_nest: {% set ntile = 7 %} {% if EXP_WARM_START == True %} - {% set COMOUT_ATMOS_RESTART_PREV = COM_ATMOS_RESTART_TMPL | replace_tmpl(previous_dict) %} + {% set COMOUT_ATMOS_RESTART_PREV = COM_ATMOS_RESTART_TMPL | replace_tmpl(previous_cycle_dict) %} mkdir: - "{{ COMOUT_ATMOS_RESTART_PREV }}" copy: @@ -122,7 +122,7 @@ atmosphere_nest: - ["{{ ICSDIR }}/{{ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) }}/{{ m_prefix }}.{{ ftype }}.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_RESTART_PREV }}/{{ m_prefix }}.{{ ftype }}.nest0{{ ntile-5 }}.tile{{ ntile }}.nc"] {% endfor %} {% else %} # cold start - {% set COMOUT_ATMOS_INPUT = COM_ATMOS_INPUT_TMPL | replace_tmpl(current_dict) %} + {% set COMOUT_ATMOS_INPUT = COM_ATMOS_INPUT_TMPL | replace_tmpl(current_cycle_dict) %} mkdir: - "{{ COMOUT_ATMOS_INPUT }}" copy: @@ -134,14 +134,14 @@ atmosphere_nest: {% if DO_ICE %} {% if DO_JEDIOCNVAR == True %} -{% set COMOUT_ICE_ANALYSIS = COM_ICE_ANALYSIS_TMPL | replace_tmpl(current_dict) %} +{% set COMOUT_ICE_ANALYSIS = COM_ICE_ANALYSIS_TMPL | replace_tmpl(current_cycle_dict) %} ice: mkdir: - "{{ COMOUT_ICE_ANALYSIS }}" copy: - ["{{ ICSDIR }}/{{ COMOUT_ICE_ANALYSIS | relpath(ROTDIR) }}/{{ m_prefix }}.cice_model_anl.res.nc", "{{ COMOUT_ICE_ANALYSIS }}"] {% else %} -{% set COMOUT_ICE_RESTART_PREV = COM_ICE_RESTART_TMPL | replace_tmpl(previous_dict) %} +{% set COMOUT_ICE_RESTART_PREV = COM_ICE_RESTART_TMPL | replace_tmpl(previous_cycle_dict) %} ice: mkdir: - "{{ COMOUT_ICE_RESTART_PREV }}" @@ -151,7 +151,7 @@ ice: {% endif %} {% if DO_OCN %} -{% set COMOUT_OCEAN_RESTART_PREV = COM_OCEAN_RESTART_TMPL | replace_tmpl(previous_dict) %} +{% set COMOUT_OCEAN_RESTART_PREV = COM_OCEAN_RESTART_TMPL | replace_tmpl(previous_cycle_dict) %} ocean: mkdir: - "{{ COMOUT_OCEAN_RESTART_PREV }}" @@ -164,7 +164,7 @@ ocean: {% endif %} {% if DO_JEDIOCNVAR == True %} -{% set COMOUT_OCEAN_ANALYSIS = COM_OCEAN_ANALYSIS_TMPL | replace_tmpl(current_dict) %} +{% set COMOUT_OCEAN_ANALYSIS = COM_OCEAN_ANALYSIS_TMPL | replace_tmpl(current_cycle_dict) %} rerun: mkdir: - "{{ COMOUT_OCEAN_ANALYSIS }}" @@ -173,7 +173,7 @@ rerun: {% endif %} {% if REPLAY_ICS == True %} -{% set COMOUT_OCEAN_ANALYSIS = COM_OCEAN_ANALYSIS_TMPL | replace_tmpl(current_dict) %} +{% set COMOUT_OCEAN_ANALYSIS = COM_OCEAN_ANALYSIS_TMPL | replace_tmpl(current_cycle_dict) %} replay: mkdir: - "{{ COMOUT_OCEAN_ANALYSIS }}" @@ -182,7 +182,7 @@ replay: {% endif %} {% if EXP_WARM_START == True %} -{% set COMOUT_MED_RESTART_PREV = COM_MED_RESTART_TMPL | replace_tmpl(previous_dict) %} +{% set COMOUT_MED_RESTART_PREV = COM_MED_RESTART_TMPL | replace_tmpl(previous_cycle_dict) %} {% if path_exists(ICSDIR ~ "/" ~ COMOUT_MED_RESTART_PREV | relpath(ROTDIR) ~ "/" ~ m_prefix ~ ".ufs.cpld.cpl.r.nc") %} mediator: mkdir: @@ -195,7 +195,7 @@ mediator: {% endif %} # DO_OCN=True {% if DO_WAVE %} -{% set COMOUT_WAVE_RESTART_PREV = COM_WAVE_RESTART_TMPL | replace_tmpl(previous_run_dict) %} +{% set COMOUT_WAVE_RESTART_PREV = COM_WAVE_RESTART_TMPL | replace_tmpl(previous_cycle_and_run_dict) %} wave: mkdir: - "{{ COMOUT_WAVE_RESTART_PREV }}" diff --git a/scripts/exglobal_stage_ic.py b/scripts/exglobal_stage_ic.py index 7757caf879..4a7b034efa 100755 --- a/scripts/exglobal_stage_ic.py +++ b/scripts/exglobal_stage_ic.py @@ -2,7 +2,7 @@ import os -from pygfs.task.stage import Stage +from pygfs.task.stage_ic import Stage from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit # Initialize root logger @@ -19,8 +19,7 @@ def main(): # Pull out all the configuration keys needed to run stage job keys = ['RUN', 'MODE', 'EXP_WARM_START', 'NMEM_ENS', - 'previous_cycle', 'current_cycle', - 'model_start_date_current_cycle', + 'current_cycle', 'previous_cycle', 'ROTDIR', 'ICSDIR', 'STAGE_IC_YAML_TMPL', 'OCNRES', 'waveGRD', 'ntiles', 'DO_JEDIOCNVAR', 'REPLAY_ICS', 'DO_WAVE', 'DO_OCN', 'DO_ICE', 'DO_NEST'] @@ -34,6 +33,9 @@ def main(): if key.startswith("COM"): stage_dict[key] = stage.task_config[key] + # Configure staging + stage_dict = stage.configure(stage_dict) + # Stage ICs stage.execute_stage(stage_dict) diff --git a/ush/python/pygfs/task/stage.py b/ush/python/pygfs/task/stage_ic.py similarity index 55% rename from ush/python/pygfs/task/stage.py rename to ush/python/pygfs/task/stage_ic.py index 377ce1faae..32bc7efdf1 100644 --- a/ush/python/pygfs/task/stage.py +++ b/ush/python/pygfs/task/stage_ic.py @@ -5,8 +5,8 @@ from typing import Any, Dict, List from wxflow import (AttrDict, FileHandler, Task, cast_strdict_as_dtypedict, - logit, parse_j2yaml, strftime, - to_YMD, to_YMDH, Template, TemplateConstants) + logit, parse_j2yaml, strftime, to_YMD, + add_to_datetime, to_timedelta, Template, TemplateConstants) logger = getLogger(__name__.split('.')[-1]) @@ -32,13 +32,11 @@ def __init__(self, config: Dict[str, Any]) -> None: """ super().__init__(config) - rotdir = self.task_config.ROTDIR + os.sep - self.task_config = AttrDict(**self.task_config) @logit(logger) - def execute_stage(self, stage_dict: Dict[str, Any]) -> None: - """Perform local staging of initial condition files. + def configure(self, stage_dict: Dict[str, Any]) -> (Dict[str, Any]): + """Determine stage settings based on configuration and runtime options. Parameters ---------- @@ -47,22 +45,38 @@ def execute_stage(self, stage_dict: Dict[str, Any]) -> None: Returns ------- - None + stage_dict : Dict[str, Any] + Configuration dictionary updated """ - if not os.path.isdir(stage_dict.ROTDIR): - raise FileNotFoundError(f"FATAL ERROR: The ROTDIR ({stage_dict.ROTDIR}) does not exist!") - # Add the os.path.exists function to the dict for yaml parsing + #------------------------------------------------------------- stage_dict['path_exists'] = os.path.exists + # Determine model start date + #--------------------------- + current_cycle_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config['assim_freq']}H") / 2) + current_cycle_end = add_to_datetime(self.task_config.current_cycle, to_timedelta(f"{self.task_config['assim_freq']}H") / 2) + + if self.task_config.DOIAU == True and self.task_config.MODE == "cycled": + model_start_date_current_cycle = current_cycle_begin + else: + if self.task_config.REPLAY_ICS == True: + model_start_date_current_cycle = current_cycle_end + else: + model_start_date_current_cycle = self.task_config.current_cycle + + stage_dict['model_start_date_current_cycle'] = model_start_date_current_cycle + # Determine restart RUN + #---------------------- rRUN = self.task_config.RUN if self.task_config.RUN == "gfs": rRUN = "gdas" stage_dict['rRUN'] = rRUN # Determine ensemble member settings + #----------------------------------- MEM_START = -1 # Deterministic default, no members if self.task_config.NMEM_ENS > 0: if self.task_config.RUN == "gefs": @@ -71,14 +85,33 @@ def execute_stage(self, stage_dict: Dict[str, Any]) -> None: MEM_START = 1 if MEM_START >= 0: # Ensemble RUN - first_mem = MEM_START - last_mem = self.task_config.NMEM_ENS + stage_dict['first_mem'] = MEM_START + stage_dict['last_mem'] = self.task_config.NMEM_ENS else: # Deteministic RUN - first_mem = MEM_START - last_mem = MEM_START + stage_dict['first_mem'] = MEM_START + stage_dict['last_mem'] = MEM_START + + return stage_dict + + @logit(logger) + def execute_stage(self, stage_dict: Dict[str, Any]) -> None: + """Perform local staging of initial condition files. + + Parameters + ---------- + stage_dict : Dict[str, Any] + Configuration dictionary + + Returns + ------- + None + """ + + if not os.path.isdir(stage_dict.ROTDIR): + raise FileNotFoundError(f"FATAL ERROR: The ROTDIR ({stage_dict.ROTDIR}) does not exist!") # Loop over members - for mem in range(first_mem, last_mem + 1): + for mem in range(stage_dict.first_mem, stage_dict.last_mem + 1): stage_dict['mem'] = mem stage_set = parse_j2yaml(self.task_config.STAGE_IC_YAML_TMPL, stage_dict, allow_missing=False) # Copy files to ROTDIR