diff --git a/bin/decumulate b/bin/decumulate new file mode 100644 index 0000000..80a7bd7 --- /dev/null +++ b/bin/decumulate @@ -0,0 +1,14 @@ +#!python + +import os +import sys + +current_dir = os.path.dirname(os.path.abspath(__file__)) +src_dir = os.path.normpath(os.path.join(current_dir, '../src/')) +if os.path.exists(src_dir): + sys.path.append(src_dir) + +from lisfloodutilities.gridding.decumulate_daily_grids import main_script + +if __name__ == '__main__': + main_script() diff --git a/setup.py b/setup.py index d669cb3..af0a1df 100644 --- a/setup.py +++ b/setup.py @@ -130,6 +130,7 @@ def run(self): 'compare: compare two set of netCDF files; ' 'thresholds: compute discharge return period thresholds; ' 'gridding: interpolate meteo variables observations; ' + 'decumulate: decumulate daily grids into 6 hourly grids in kiwis format; ' 'ncextract: extract values from netCDF files; ' 'catchstats: calculates catchment statistics; ', long_description=long_description, @@ -143,12 +144,12 @@ def run(self): # 'GDAL=={}'.format(gdal_version), 'netCDF4>=1.5.3', 'toolz', 'xarray>=0.15.1', 'dask', 'pandas>=0.25.1', 'nine', 'pyg2p'], - author="Valerio Lorini, Stefania Grimaldi, Carlo Russo, Domenico Nappo, Lorenzo Alfieri", - author_email="valerio.lorini@ec.europa.eu,stefania.grimaldi@ec.europa.eu,carlo.russo@ext.ec.europa.eu,domenico.nappo@gmail.com,lorenzo.alfieri@ec.europa.eu", + author="Valerio Lorini, Stefania Grimaldi, Carlo Russo, Goncalo Gomes, Domenico Nappo, Lorenzo Alfieri", + author_email="valerio.lorini@ec.europa.eu,stefania.grimaldi@ec.europa.eu,carlo.russo@ext.ec.europa.eu,goncalo.ramos-gomes@ext.ec.europa.eu,domenico.nappo@gmail.com,lorenzo.alfieri@ec.europa.eu", keywords=['netCDF4', 'PCRaster', 'mapstack', 'lisflood', 'efas', 'glofas', 'ecmwf', 'copernicus'], license='EUPL 1.2', url='https://github.com/ec-jrc/lisflood-utilities', - scripts=['bin/pcr2nc', 'bin/cutmaps', 'bin/compare', 'bin/nc2pcr', 'bin/thresholds', 'bin/gridding', 'bin/cddmap', 
'bin/ncextract','bin/catchstats',], + scripts=['bin/pcr2nc', 'bin/cutmaps', 'bin/compare', 'bin/nc2pcr', 'bin/thresholds', 'bin/gridding', 'bin/decumulate', 'bin/cddmap', 'bin/ncextract','bin/catchstats',], zip_safe=True, classifiers=[ # complete classifier list: http://pypi.python.org/pypi?%3Aaction=list_classifiers diff --git a/src/lisfloodutilities/gridding/configuration/1arcmin/config_pr.txt b/src/lisfloodutilities/gridding/configuration/1arcmin/config_pr.txt index 7a658b5..7f5daf4 100755 --- a/src/lisfloodutilities/gridding/configuration/1arcmin/config_pr.txt +++ b/src/lisfloodutilities/gridding/configuration/1arcmin/config_pr.txt @@ -11,10 +11,20 @@ VALUE_OFFSET = 0.0 DATA_TYPE_PACKED = i2 STANDARD_NAME = precipitation_amount LONG_NAME = Daily Accumulated Precipitation + +# 1280 - IMGW +# 1295 - MARS +# 1302 - CarpatClim # 1303 - ERAinterim +# 1304 - EURO4M-APGD +# 1310 - HNMS # 1329 - ERA5-land KIWIS_FILTER_PLUGIN_CLASSES = {'ObservationsKiwisFilter': {'1303': 100.0, '1329': 100.0}} +# 1310/HNMS until 31/12/2000, 1295/MARS until 01/01/2008, 1304/EURO4M-APGD until 31/12/2008 and 1302/CarpatClim until 31/12/2010 +KIWIS_FILTER_DECUMULATION_CONFIG = [{'START': '1989-12-31 00:00:00', 'END': '2000-12-31 23:59:59', 'PROVIDERS_RADIUS': {'1295': 1.5, '1302': 1.5, '1304': 1.5, '1310': 1.5}},{'START': '2001-01-01 00:00:00', 'END': '2007-12-31 23:59:59', 'PROVIDERS_RADIUS': {'1295': 1.5, '1302': 1.5, '1304': 1.5}},{'START': '2008-01-01 00:00:00', 'END': '2008-12-31 23:59:59', 'PROVIDERS_RADIUS': {'1302': 1.5, '1304': 1.5}},{'START': '2009-01-01 00:00:00', 'END': '2010-12-31 23:59:59', 'PROVIDERS_RADIUS': {'1302': 1.5}}] + + [VAR_TIME] UNIT = days since 1990-01-02 06:00:00.0 diff --git a/src/lisfloodutilities/gridding/configuration/1arcmin/config_pr6.txt b/src/lisfloodutilities/gridding/configuration/1arcmin/config_pr6.txt index 05e15ac..0f7f211 100755 --- a/src/lisfloodutilities/gridding/configuration/1arcmin/config_pr6.txt +++ 
b/src/lisfloodutilities/gridding/configuration/1arcmin/config_pr6.txt @@ -11,10 +11,13 @@ VALUE_OFFSET = 0.0 DATA_TYPE_PACKED = i2 STANDARD_NAME = precipitation_amount LONG_NAME = 6 Hourly Accumulated Precipitation -# 1304 - EURO4M-APGD -# 1302 - CarpatClim + +# 1280 - IMGW # 1295 - MARS +# 1302 - CarpatClim # 1303 - ERAinterim +# 1304 - EURO4M-APGD +# 1310 - HNMS # 1329 - ERA5-land KIWIS_FILTER_PLUGIN_CLASSES = {'DowgradedObservationsKiwisFilter': {'1304': 1.0, '1302': 1.0, '1295': 1.0}, 'ObservationsKiwisFilter': {'1303': 100.0, '1329': 100.0}} diff --git a/src/lisfloodutilities/gridding/configuration/1arcmin/default.txt b/src/lisfloodutilities/gridding/configuration/1arcmin/default.txt index d162ebd..786bed5 100755 --- a/src/lisfloodutilities/gridding/configuration/1arcmin/default.txt +++ b/src/lisfloodutilities/gridding/configuration/1arcmin/default.txt @@ -32,10 +32,21 @@ VALUE_OFFSET = 0.0 DATA_TYPE_PACKED = i2 STANDARD_NAME = DUMMY_STANDARD_NAME LONG_NAME = DUMMY LONG NAME + KIWIS_FILTER_COLUMNS = {'COL_LAT': 'station_local_y', 'COL_LON': 'station_local_x', 'COL_IS_IN_DOMAIN': 'EFAS_ADDATTR_ISINNEWDOMAIN'} # KIWIS_FILTER_COLUMNS = {} KIWIS_FILTER_PLUGIN_CLASSES = {'KiwisFilter': {}} +# 1280 - IMGW +# 1295 - MARS +# 1302 - CarpatClim +# 1303 - ERAinterim +# 1304 - EURO4M-APGD +# 1310 - HNMS +# 1329 - ERA5-land +# KIWIS_FILTER_DECUMULATION_CONFIG = [{'START': '1989-12-31 00:00:00', 'END': '2000-12-31 23:59:59', 'PROVIDERS_RADIUS': {'1295': 1.5, '1302': 1.5}}] +KIWIS_FILTER_DECUMULATION_CONFIG = [] + [DIMENSION] TOP = 72.25000000000000000000 diff --git a/src/lisfloodutilities/gridding/configuration/5x5km/default.txt b/src/lisfloodutilities/gridding/configuration/5x5km/default.txt index cace2c0..57fcb63 100755 --- a/src/lisfloodutilities/gridding/configuration/5x5km/default.txt +++ b/src/lisfloodutilities/gridding/configuration/5x5km/default.txt @@ -33,8 +33,18 @@ VALUE_OFFSET = 0.0 DATA_TYPE_PACKED = i2 STANDARD_NAME = DUMMY_STANDARD_NAME LONG_NAME = DUMMY LONG 
NAME + KIWIS_FILTER_COLUMNS = {'COL_LAT': 'station_local_y', 'COL_LON': 'station_local_x', 'COL_IS_IN_DOMAIN': 'EFAS_ADDATTR_ISINNEWDOMAIN'} KIWIS_FILTER_PLUGIN_CLASSES = {'KiwisFilter': {}} +# 1280 - IMGW +# 1295 - MARS +# 1302 - CarpatClim +# 1303 - ERAinterim +# 1304 - EURO4M-APGD +# 1310 - HNMS +# 1329 - ERA5-land +# KIWIS_FILTER_DECUMULATION_CONFIG = [{'START': '1989-12-31 00:00:00', 'END': '2000-12-31 23:59:59', 'PROVIDERS_RADIUS': {'1295': 1.5, '1302': 1.5}}] +KIWIS_FILTER_DECUMULATION_CONFIG = [] [DIMENSION] diff --git a/src/lisfloodutilities/gridding/decumulate_daily_grids.py b/src/lisfloodutilities/gridding/decumulate_daily_grids.py new file mode 100644 index 0000000..00c2339 --- /dev/null +++ b/src/lisfloodutilities/gridding/decumulate_daily_grids.py @@ -0,0 +1,297 @@ +__author__="Goncalo Gomes" +__date__="$May 14, 2024 12:01:00$" +__version__="0.1" +__updated__="$Mar 14, 2024 16:01:00$" + +""" +Copyright 2019-2020 European Union +Licensed under the EUPL, Version 1.2 or as soon they will be approved by the European Commission subsequent versions of the EUPL (the "Licence"); +You may not use this work except in compliance with the Licence. +You may obtain a copy of the Licence at: +https://joinup.ec.europa.eu/sites/default/files/inline-files/EUPL%20v1_2%20EN(1).txt +Unless required by applicable law or agreed to in writing, software distributed under the Licence is distributed on an "AS IS" basis, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the Licence for the specific language governing permissions and limitations under the Licence. 
+ +""" + +import sys +import os +import csv +from typing import List, Tuple +from pathlib import Path +from argparse import ArgumentParser, ArgumentTypeError +from datetime import datetime, timedelta +import numpy as np +import numpy.ma as ma +import pandas as pd +from collections import OrderedDict +from scipy.spatial import cKDTree +from lisfloodutilities.gridding.lib.utils import Config, FileUtils +from lisfloodutilities.gridding.lib.filters import KiwisFilter, DowgradedDailyTo6HourlyObservationsKiwisFilter + +# Disable the SettingWithCopyWarning +pd.options.mode.chained_assignment = None + +quiet_mode = False + + +def print_msg(msg: str = ''): + global quiet_mode + if not quiet_mode: + print(msg) + + +def get_filter_class(conf: Config) -> KiwisFilter: + # Allow us to get the output columns out of the dataframe + plugins_columns_def = conf.get_config_field('PROPERTIES', 'KIWIS_FILTER_COLUMNS') + plugins_columns_dic = eval(plugins_columns_def) + plugins_columns = OrderedDict(plugins_columns_dic) + filter_class = KiwisFilter(filter_columns=plugins_columns) + return filter_class + + +def get_timestamp_from_filename(conf: Config, filename: Path) -> datetime: + file_timestamp = datetime.strptime(filename.name, conf.input_timestamp_pattern) + if conf.force_time is not None: + new_time = datetime.strptime(conf.force_time, "%H%M").time() + file_timestamp = datetime.combine(file_timestamp.date(), new_time) + return file_timestamp + + +def get_dataframes(conf: Config, kiwis_files: List[Path]) -> Tuple[List[pd.DataFrame], List[datetime], List[str]]: + filter_class = get_filter_class(conf) + print_msg(f'Filter class: {filter_class.get_class_description()}') + filepath_kiwis = [] + kiwis_timestamps = [] + kiwis_str_timestamps = [] + for filename_kiwis in kiwis_files: + kiwis_timestamp = get_timestamp_from_filename(conf, filename_kiwis) + kiwis_timestamp_str = kiwis_timestamp.strftime(FileUtils.DATE_PATTERN_CONDENSED_SHORT) + filepath_kiwis.append(filename_kiwis) + 
kiwis_str_timestamps.append(kiwis_timestamp_str) + kiwis_timestamps.append(kiwis_timestamp) + df_kiwis_array = filter_class.filter(filepath_kiwis, kiwis_str_timestamps, []) + return df_kiwis_array, kiwis_timestamps, kiwis_str_timestamps + +def get_providers_radius(conf: Config, kiwis_timestamp_24h: datetime) -> dict: + """ + Get the configuration of providers and radius correspondent to the current daily date + """ + TIMESTAMP_PATTERN = '%Y-%m-%d %H:%M:%S' + filter_args = {} + plugin_decumulation_def = conf.get_config_field('PROPERTIES', 'KIWIS_FILTER_DECUMULATION_CONFIG') + plugin_decumulation_config = eval(plugin_decumulation_def) + + for cur_config in plugin_decumulation_config: + try: + start_timestamp = datetime.strptime(cur_config['START'], TIMESTAMP_PATTERN) + except Exception as e: + start_timestamp = None + try: + end_timestamp = datetime.strptime(cur_config['END'], TIMESTAMP_PATTERN) + except Exception as e: + end_timestamp = None + providers_radius = cur_config['PROVIDERS_RADIUS'] + if ((start_timestamp is None or kiwis_timestamp_24h >= start_timestamp) and + (end_timestamp is None or kiwis_timestamp_24h <= end_timestamp)): + filter_args = providers_radius + return filter_args + +def get_decumulated_dataframes(conf: Config, kiwis_filepaths: List[Path], kiwis_str_timestamps: List[str], + kiwis_dataframes: List[pd.DataFrame], kiwis_timestamp_24h: datetime) -> List[pd.DataFrame]: + """ + Applies the decumulation filter to the dataframes. The first dataframe in the array should be the Daily + one and then the remaining 4 dataframes will be the 6 hourly ones. 
+ """ + + filter_args = get_providers_radius(conf, kiwis_timestamp_24h) + + # If there was no data interval defined for the current daily date, then no need to decumulate + if not filter_args: + return kiwis_dataframes + + plugins_columns_def = conf.get_config_field('PROPERTIES', 'KIWIS_FILTER_COLUMNS') + plugins_columns_dic = eval(plugins_columns_def) + filter_columns = OrderedDict(plugins_columns_dic) + filter_class = DowgradedDailyTo6HourlyObservationsKiwisFilter(filter_columns, filter_args) + df_kiwis_array = filter_class.filter(kiwis_filepaths, kiwis_str_timestamps, kiwis_dataframes) + return df_kiwis_array + + +def run(config_filename_24h: str, config_filename_6h: str, file_utils_24h: FileUtils, file_utils_6h: FileUtils, + kiwis_24h_06am_path: Path, kiwis_6h_12pm_path: Path, kiwis_6h_18pm_path: Path, kiwis_6h_12am_path: Path, kiwis_6h_06am_path: Path, + output_path: Path = None): + """ + Interpolate text files containing (x, y, value) using inverse distance interpolation. + Produces as output, either a netCDF file containing all the grids or one TIFF file per grid. + 1. Read the config files + 2. Get the ordered list of files + 3. Interpolate the grids performing height correction on some variables + 4. 
Write the resulting grids + """ + global quiet_mode + + interpolation_mode = 'adw' + memory_save_mode = 0 + start_date = None + end_date = None + conf_24h = Config(config_filename_24h, start_date, end_date, quiet_mode, interpolation_mode, memory_save_mode) + conf_6h = Config(config_filename_6h, start_date, end_date, quiet_mode, interpolation_mode, memory_save_mode) + + print_msg('Start reading files') + df_kiwis_array_24h, kiwis_timestamps_24h, kiwis_str_timestamps_24h = get_dataframes(conf_24h, [kiwis_24h_06am_path]) + df_kiwis_array_6h, kiwis_timestamps_6h, kiwis_str_timestamps_6h = get_dataframes(conf_6h, [kiwis_6h_12pm_path, kiwis_6h_18pm_path, + kiwis_6h_12am_path, kiwis_6h_06am_path]) + # Check timestamps are correct + if not (kiwis_timestamps_24h[0] == kiwis_timestamps_6h[3] and + (kiwis_timestamps_24h[0] - timedelta(hours=6)) == kiwis_timestamps_6h[2] and + (kiwis_timestamps_24h[0] - timedelta(hours=12)) == kiwis_timestamps_6h[1] and + (kiwis_timestamps_24h[0] - timedelta(hours=18)) == kiwis_timestamps_6h[0]): + raise ArgumentTypeError("The input kiwis do not respect the expected timestamps.") + + kiwis_dataframes = df_kiwis_array_24h + kiwis_dataframes.extend(df_kiwis_array_6h) + kiwis_timestamps = kiwis_str_timestamps_24h + kiwis_timestamps.extend(kiwis_str_timestamps_6h) + kiwis_filepaths = [kiwis_24h_06am_path, kiwis_6h_12pm_path, kiwis_6h_18pm_path, kiwis_6h_12am_path, kiwis_6h_06am_path] + + df_kiwis_array = get_decumulated_dataframes(conf_24h, kiwis_filepaths, kiwis_timestamps, kiwis_dataframes, kiwis_timestamps_24h[0]) + + i = 0 + for kiwis_filepath in kiwis_filepaths[1:]: + i += 1 + if output_path is not None: + filepath = Path.joinpath(output_path, kiwis_filepath.name) + else: + filepath = kiwis_filepath + df_kiwis_array[i].to_csv(filepath, index=False, header=True, sep="\t") + print_msg(f'Wrote file: {filepath}') + print_msg('Finished writing files') + +def get_existing_file_path(parser: ArgumentParser, input_file_path: str) -> Path: + file_path 
= Path(input_file_path) + if not file_path.is_file(): + parser.error(f'Input file {file_path} does not exist or cannot be opened.') + return file_path + +def main(argv): + '''Command line options.''' + global quiet_mode + + program_name = os.path.basename(sys.argv[0]) + program_path = os.path.dirname(os.path.realpath(sys.argv[0])) + program_version = "v%s" % __version__ + program_build_date = "%s" % __updated__ + + program_version_string = 'version %s (%s)\n' % (program_version, program_build_date) + program_longdesc = ''' + This script performs the decumulation of daily kiwis files into the respective 6hourly precipitation kiwis files in order to increase the station density or to fill in 6hourly gaps. + JIRA Issue: EMDCC-1484 + ''' + program_license = """ + Copyright 2019-2020 European Union + Licensed under the EUPL, Version 1.2 or as soon they will be approved by the European Commission subsequent versions of the EUPL (the "Licence"); + You may not use this work except in compliance with the Licence. + You may obtain a copy of the Licence at: + https://joinup.ec.europa.eu/sites/default/files/inline-files/EUPL%20v1_2%20EN(1).txt + Unless required by applicable law or agreed to in writing, software distributed under the Licence is distributed on an "AS IS" basis, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the Licence for the specific language governing permissions and limitations under the Licence. 
+ """ + + try: + # setup option parser + parser = ArgumentParser(epilog=program_license, description=program_version_string+program_longdesc) + + # set defaults + parser.set_defaults(quiet=False, + out_folder=None) + + parser.add_argument("-d", "--pr24h", dest="kiwis_24h_06am_path", required=True, type=FileUtils.file_type, + help="Set the input kiwis file containing daily precipitation for a given day.", + metavar="/path/to/pr200102150600_all.kiwis") + parser.add_argument("-1", "--pr6h12pm", dest="kiwis_6h_12pm_path", required=True, type=FileUtils.file_type, + help="Set the input kiwis file containing the first 6hourly timestep of precipitation 12:00 of the previous day.", + metavar="/path/to/pr6200102141200_all.kiwis") + parser.add_argument("-2", "--pr6h18pm", dest="kiwis_6h_18pm_path", required=True, type=FileUtils.file_type, + help="Set the input kiwis file containing the second 6hourly timestep of precipitation 18:00 of the previous day.", + metavar="/path/to/pr6200102141800_all.kiwis") + parser.add_argument("-3", "--pr6h12am", dest="kiwis_6h_12am_path", required=True, type=FileUtils.file_type, + help="Set the input kiwis file containing the first 6hourly timestep of precipitation 00:00 of the daily precipitation day.", + metavar="/path/to/pr6200102150000_all.kiwis") + parser.add_argument("-4", "--pr6h06am", dest="kiwis_6h_06am_path", required=True, type=FileUtils.file_type, + help="Set the input kiwis file containing the first 6hourly timestep of precipitation 06:00 of the daily precipitation day.", + metavar="/path/to/pr6200102150600_all.kiwis") + parser.add_argument("-o", "--out", dest="output_folder", required=False, type=FileUtils.folder_type, + help="Set the output folder where the resulting 6h kiwis will be stored. 
If this folder is not set, it will change the input kiwis.", + metavar="/path/to/output/folder") + parser.add_argument("-c", "--conf", dest="config_type", required=True, + help="Set the grid configuration type to use.", + metavar="{5x5km, 1arcmin,...}") + parser.add_argument("-p", "--pathconf", dest="config_base_path", required=False, type=FileUtils.folder_type, + help="Overrides the base path where the configurations are stored.", + metavar="/path/to/config") + parser.add_argument("-v", "--var24h", dest="variable_code_24h", required=True, + help="Set the daily variable to be processed.", + metavar="{pr,ta,...}") + parser.add_argument("-6", "--var6h", dest="variable_code_6h", required=True, + help="Set the 6hourly variable to be processed.", + metavar="{pr6,ta6,...}") + parser.add_argument("-q", "--quiet", dest="quiet", action="store_true", help="Set script output into quiet mode [default: %(default)s]") + + # process options + args = parser.parse_args(argv) + quiet_mode = args.quiet + + configuration_base_folder = os.path.join(program_path, '../src/lisfloodutilities/gridding/configuration') + + if args.config_base_path is not None and len(args.config_base_path) > 0: + configuration_base_folder = args.config_base_path + + file_utils_24h = FileUtils(args.variable_code_24h, quiet_mode) + config_type_path_24h = file_utils_24h.get_config_type_path(configuration_base_folder, args.config_type) + config_filename_24h = file_utils_24h.get_config_file(config_type_path_24h) + + file_utils_6h = FileUtils(args.variable_code_6h, quiet_mode) + config_type_path_6h = file_utils_6h.get_config_type_path(configuration_base_folder, args.config_type) + config_filename_6h = file_utils_6h.get_config_file(config_type_path_6h) + + if args.output_folder is not None and len(args.output_folder) > 0: + output_path = Path(args.output_folder) + print_msg(f"Output Folder: {output_path}") + else: + output_path = None + print_msg(f"Output Folder: 6hourly kiwis files will be overwritten") + + 
kiwis_24h_06am_path = get_existing_file_path(parser, args.kiwis_24h_06am_path) + kiwis_6h_12pm_path = get_existing_file_path(parser, args.kiwis_6h_12pm_path) + kiwis_6h_18pm_path = get_existing_file_path(parser, args.kiwis_6h_18pm_path) + kiwis_6h_12am_path = get_existing_file_path(parser, args.kiwis_6h_12am_path) + kiwis_6h_06am_path = get_existing_file_path(parser, args.kiwis_6h_06am_path) + + print_msg(f"Daily PR kiwis file: {args.kiwis_24h_06am_path}") + print_msg(f"6hourly PR kiwis file 12:00: {args.kiwis_6h_12pm_path}") + print_msg(f"6hourly PR kiwis file 18:00: {args.kiwis_6h_18pm_path}") + print_msg(f"6hourly PR kiwis file 00:00: {args.kiwis_6h_12am_path}") + print_msg(f"6hourly PR kiwis file 06:00: {args.kiwis_6h_06am_path}") + print_msg(f"Config File 24h: {config_filename_24h}") + print_msg(f"Config File 6h: {config_filename_6h}") + + run(config_filename_24h, config_filename_6h, file_utils_24h, file_utils_6h, + kiwis_24h_06am_path, kiwis_6h_12pm_path, kiwis_6h_18pm_path, + kiwis_6h_12am_path, kiwis_6h_06am_path, output_path=output_path) + except Exception as e: + indent = len(program_name) * " " + sys.stderr.write(program_name + ": " + repr(e) + "\n") + sys.stderr.write(indent + " for help use --help") + return 2 + + +def main_script(): + sys.exit(main(sys.argv[1:])) + + +if __name__ == "__main__": + main_script() + diff --git a/src/lisfloodutilities/gridding/lib/filters.py b/src/lisfloodutilities/gridding/lib/filters.py index 0cf4aeb..a8082e7 100644 --- a/src/lisfloodutilities/gridding/lib/filters.py +++ b/src/lisfloodutilities/gridding/lib/filters.py @@ -22,6 +22,7 @@ def __init__(self, filter_columns: dict = {}, filter_args: dict = {}): self.cur_timestamp = '' self.setup_column_names() self.OUTPUT_COLUMNS = [self.COL_LON, self.COL_LAT, self.COL_VALUE] + self.INTERNAL_COLUMNS = [f'{self.COL_STATION_DIARY_STATUS}_INTERNAL', f'{self.COL_INACTIVE_HISTORIC}_INTERNAL'] def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_data_frames: 
List[pd.DataFrame]) -> List[pd.DataFrame]: """ @@ -136,6 +137,16 @@ class ObservationsKiwisFilter(KiwisFilter): CLUSTER_COLLAPSE_RADIUS = 0.011582073434000193 # decimal degrees (1287 m) + def __init__(self, filter_columns: dict = {}, filter_args: dict = {}): + super().__init__(filter_columns, filter_args) + self.INTERNAL_COLUMNS.append('has_neighbor_within_radius') + # Calculating the radius in decimal degrees + self.provider_radius = {} + for provider_id in self.args: + radius_km = self.args[provider_id] + radius = self.kilometers2degrees(radius_km) + self.provider_radius[provider_id] = radius + @staticmethod def kilometers2degrees(km: float) -> float: # Convert km to degrees of latitude @@ -145,9 +156,8 @@ def kilometers2degrees(km: float) -> float: def apply_filter(self, df: pd.DataFrame) -> pd.DataFrame: df = super().apply_filter(df) # Apply filtering rules for each provider - for provider_id in self.args: - radius_km = self.args[provider_id] - radius = self.kilometers2degrees(radius_km) + for provider_id in self.provider_radius: + radius = self.provider_radius[provider_id] other_providers = df[(df[self.COL_PROVIDER_ID] != provider_id)] tree = cKDTree(other_providers[[self.COL_LON, self.COL_LAT]]) df['has_neighbor_within_radius'] = df.apply(self.has_neighbor_within_radius_from_other_providers, @@ -198,9 +208,8 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat def apply_filter(self, df: pd.DataFrame) -> pd.DataFrame: df = super().apply_filter(df) # Apply filtering rules for each provider - for provider_id in self.args: - radius_km = self.args[provider_id] - radius = self.kilometers2degrees(radius_km) + for provider_id in self.provider_radius: + radius = self.provider_radius[provider_id] other_providers = df[(df[self.COL_PROVIDER_ID] != provider_id)] tree = cKDTree(other_providers[[self.COL_LON, self.COL_LAT]]) df['has_neighbor_within_radius'] = df.apply(self.has_neighbor_within_radius_from_other_providers, @@ -214,3 +223,199 @@ 
def set_filtered_stations(self, df: pd.DataFrame): df_filtered = df.loc[(df['has_neighbor_within_radius'])] for station_id in df_filtered[self.COL_STATION_ID].values: self.filtered_station_ids[station_id] = '' + + +class DowgradedDailyTo6HourlyObservationsKiwisFilter(ObservationsKiwisFilter): + """ + Class to filter Kiwis files metadata for stations whose daily data was down graded to 6hourly data + by dividing the daily value by 4. + Expects to have in filter_args a dictionary containing the provider ID whose stations we want to + filter (as key) and the radius to find the real 6h observations from other providers (as value). + It will filter the station form all the 4 files if and only if at least one station in one of the + files contains a real observation from other providers in the indicated radius. + In the presence of a nearby real 6h station, this filter will remove all instances of the + downgraded station from all the dataframes. + """ + + def __init__(self, filter_columns: dict = {}, filter_args: dict = {}): + super().__init__(filter_columns, filter_args) + self.INTERNAL_COLUMNS.append('has_neighbor_within_radius_complete_6h') + self.kiwis_24h_dataframe = None + self.df_24h_without_neighbors = None + self.df_24h_with_neighbors = None + self.all_6h_df = None + self.num_6h_slots = 0 + self.PROVIDER_DWD_SYNOP = '1121' + self.PROVIDER_MARS = '1295' + + def get_all_6hourly_station_values_df(self, kiwis_data_frames: List[pd.DataFrame]) -> pd.DataFrame: + merged_df = pd.concat(kiwis_data_frames) + merged_df = merged_df[[self.COL_LON, self.COL_LAT, self.COL_PROVIDER_ID, self.COL_STATION_NUM, self.COL_STATION_ID, self.COL_VALUE]] + merged_df.reset_index(drop=True, inplace=True) + result_df = merged_df.astype({self.COL_VALUE: 'float'}).groupby([self.COL_LON, self.COL_LAT, + self.COL_PROVIDER_ID, + self.COL_STATION_NUM, + self.COL_STATION_ID])[self.COL_VALUE].agg(['sum','count']).reset_index() + result_df.columns = [self.COL_LON, self.COL_LAT, 
self.COL_PROVIDER_ID, self.COL_STATION_NUM, self.COL_STATION_ID, 'sum_6h_values', 'count_6h_slots'] + result_df.reset_index(drop=True, inplace=True) + return result_df + + def format_dwd_synop_wmo_num(self, station_num: str) -> str: + """ + Used to get the DWD Synop stations with WMO Number to be formatted like the ones from MARS to allow comparison + """ + try: + station_num_float = float(station_num) + return f'{station_num_float:.1f}' + except: + return station_num + + def get_decumulated_24h_value_for_missing_6h_values(self, row: pd.Series, tree: cKDTree = None, provider_id: int = 0, + radius: float = ObservationsKiwisFilter.CLUSTER_COLLAPSE_RADIUS, + stations_6h_df: pd.DataFrame = None) -> float: + """ + DECUMULATED_PR = (PR - Sum(PR6)) / (number of missing values) + If there are more than one 6h station in the radius, select according to the following rules by order: + 1. get the one with the highest number of slots + 2. get the one with the lowest positive difference to the 24h value + 3. 
get the one on the top + """ + cur_provider_id = row[self.COL_PROVIDER_ID] + cur_24h_value = row[self.COL_VALUE] + decumulated_value = cur_24h_value + if cur_provider_id == provider_id: + location = (row[self.COL_LON], row[self.COL_LAT]) + nearby_points = tree.query_ball_point(location, radius) + stations_6h = stations_6h_df.loc[stations_6h_df.index.isin(nearby_points)][['sum_6h_values', 'count_6h_slots']] + if stations_6h.empty: + return decumulated_value + sum_6h_values, count_6h_slots = stations_6h.iloc[0][['sum_6h_values', 'count_6h_slots']] + + if len(nearby_points) > 1: + stations_6h = stations_6h.assign(diff_to_24h=(cur_24h_value - stations_6h['sum_6h_values'])) + stations_6h = stations_6h.sort_values(by=['count_6h_slots', 'diff_to_24h'], ascending=[False, True]).reset_index(drop=True) + stations_6h = stations_6h[stations_6h['diff_to_24h'] >= 0] + if not stations_6h.empty: + stations_6h.reset_index(drop=True, inplace=True) + sum_6h_values, count_6h_slots = stations_6h.iloc[0][['sum_6h_values', 'count_6h_slots']] + + missing_6h_slots = self.num_6h_slots - count_6h_slots + if missing_6h_slots > 0: + decumulated_value = (cur_24h_value - sum_6h_values) / missing_6h_slots + else: + decumulated_value = cur_24h_value / self.num_6h_slots + return decumulated_value + + def update_column_if_provider_stations_are_in_radius(self, df: pd.DataFrame, tree: cKDTree, column_to_update: str = 'has_neighbor_within_radius') -> pd.DataFrame: + for provider_id in self.provider_radius: + radius = self.provider_radius[provider_id] + df.loc[df[self.COL_PROVIDER_ID] == provider_id, column_to_update] = df.apply(self.has_neighbor_within_radius_from_other_providers, + axis=1, tree=tree, provider_id=provider_id, radius=radius) + return df + + def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_data_frames: List[pd.DataFrame]) -> List[pd.DataFrame]: + """ + Filter all kiwis files in the list and returns a list of the corresponding filtered pandas data frames. 
+ If the kiwis_data_frames is not empty then filter the kiwis dataframes instead of the kiwis_files + """ + # Guarantee datatype of value column + for i in range(len(kiwis_data_frames)): + kiwis_data_frames[i] = kiwis_data_frames[i].astype({ + # self.COL_LON: 'float', + # self.COL_LAT: 'float', + # self.COL_PROVIDER_ID: 'int', + # self.COL_STATION_ID: 'int', + self.COL_VALUE: 'float'}) + + self.kiwis_24h_dataframe = kiwis_data_frames[0] + kiwis_6h_dataframes = kiwis_data_frames[1:] + self.num_6h_slots = len(kiwis_6h_dataframes) + providers_ids = list(self.provider_radius.keys()) + + # Filter all the dataframes + filtered_data_frames = super().filter(kiwis_files[1:], kiwis_timestamps[1:], kiwis_6h_dataframes) + + # Get all 6hourly slots aggregated + self.all_6h_df = self.get_all_6hourly_station_values_df(filtered_data_frames) + +# # TODO: Delete +# # self.all_6h_df.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/all_6h_df.tsv', index=True, header=True, sep="\t") + + # Select the daily stations from the providers that do not contain a 6hourly station within radius and decumulate + # its value dividing by the number of 6hourly slots. 
These will be inserted directly in all the 6hourly slots + tree_all_6h = cKDTree(self.all_6h_df[[self.COL_LON, self.COL_LAT]]) + + self.kiwis_24h_dataframe['has_neighbor_within_radius'] = False + self.kiwis_24h_dataframe = self.update_column_if_provider_stations_are_in_radius(df=self.kiwis_24h_dataframe, tree=tree_all_6h) + df_24h_from_providers = self.kiwis_24h_dataframe[self.kiwis_24h_dataframe[self.COL_PROVIDER_ID].isin(providers_ids)] + self.df_24h_without_neighbors = df_24h_from_providers[df_24h_from_providers['has_neighbor_within_radius'] == False] + +# # TODO: Delete +# self.df_24h_without_neighbors.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/df_24h_without_neighbors1.tsv', index=True, header=True, sep="\t") + + # From the station without neighbors remove MARS stations that have a DWDSynop with the same WMO Number because + # they are actually the same station even though they might not be exactly within the defined radius. + wmo_nums_of_6h_dwd_stations = self.all_6h_df.loc[self.all_6h_df[self.COL_PROVIDER_ID] == self.PROVIDER_DWD_SYNOP, self.COL_STATION_NUM].values + # correct wmo num from DWD Synop into MARS format + wmo_nums_of_6h_dwd_stations = map(self.format_dwd_synop_wmo_num, wmo_nums_of_6h_dwd_stations) + # Remove MARS stations that do not have neighbors but have a 6h DWDSynop station with the same wmo num + mask = ((self.df_24h_without_neighbors[self.COL_PROVIDER_ID] == self.PROVIDER_MARS) & + (self.df_24h_without_neighbors[self.COL_STATION_NUM].isin(wmo_nums_of_6h_dwd_stations))) + self.df_24h_without_neighbors = self.df_24h_without_neighbors[~mask] + + # Decumulate the values of stations without neighbors + self.df_24h_without_neighbors[self.COL_VALUE] /= self.num_6h_slots + +# # TODO: Delete +# self.df_24h_without_neighbors.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/df_24h_without_neighbors2_and_without_dwdSynop_with_same_WMO_decumulated.tsv', index=True, header=True, sep="\t") + + # Remove the daily rows that 
have a complete 6hourly station in the radius because there is no need to decumulate in these cases + complete_6h_df = self.all_6h_df.loc[self.all_6h_df['count_6h_slots'] == self.num_6h_slots] + self.df_24h_with_neighbors = df_24h_from_providers[df_24h_from_providers['has_neighbor_within_radius'] == True] + tree_complete_6h = cKDTree(complete_6h_df[[self.COL_LON, self.COL_LAT]]) + self.df_24h_with_neighbors['has_neighbor_within_radius_complete_6h'] = False + self.df_24h_with_neighbors = self.update_column_if_provider_stations_are_in_radius(df=self.df_24h_with_neighbors, tree=tree_complete_6h, + column_to_update='has_neighbor_within_radius_complete_6h') + self.df_24h_with_neighbors = self.df_24h_with_neighbors[self.df_24h_with_neighbors['has_neighbor_within_radius_complete_6h'] == False] + +# # TODO: Delete +# self.df_24h_with_neighbors.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/df_24h_with_neighbors1_and_incomplete_6h_slots.tsv', index=True, header=True, sep="\t") + + # Change the 24h stations corresponding with the 6h stations containing missing values by changing its value using the formula + # (PR - Sum(PR6)) / (number of missing values), if and only if the resulting value is positive (>=0). 
+ missing_6h_slots_df = self.all_6h_df.loc[self.all_6h_df['count_6h_slots'] < self.num_6h_slots] + missing_6h_slots_df.reset_index(drop=True, inplace=True) + tree_missing_6h = cKDTree(missing_6h_slots_df[[self.COL_LON, self.COL_LAT]]) + for provider_id in self.provider_radius: + radius = self.provider_radius[provider_id] + self.df_24h_with_neighbors.loc[self.df_24h_with_neighbors[self.COL_PROVIDER_ID] == provider_id, + self.COL_VALUE] = self.df_24h_with_neighbors.apply(self.get_decumulated_24h_value_for_missing_6h_values, + axis=1, tree=tree_missing_6h, provider_id=provider_id, + radius=radius, stations_6h_df=missing_6h_slots_df) + self.df_24h_with_neighbors = self.df_24h_with_neighbors.loc[self.df_24h_with_neighbors[self.COL_VALUE] >= 0.0] + +# # TODO: Delete +# self.df_24h_with_neighbors.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/df_24h_with_neighbors2_and_incomplete_6h_slots_decumulated.tsv', index=True, header=True, sep="\t") + + # Clean the dataframes of internal columns before merging them + self.df_24h_without_neighbors = self.df_24h_without_neighbors.drop(columns=self.INTERNAL_COLUMNS, errors='ignore') + + # Insert the decumulated stations in the respective 6h slots + return_data_frames = [self.kiwis_24h_dataframe] + for df in filtered_data_frames: + df = df.drop(columns=self.INTERNAL_COLUMNS, errors='ignore') + # Now we need to eliminate the stations that have neighbors on this 6h slot, + # which means the slot of 6h have already a 6h value in the radius and no + # decumulation is needed in this slot. 
+ df_decumulated_24h = self.df_24h_with_neighbors + df_decumulated_24h['has_neighbor_within_radius'] = False + tree = cKDTree(df[[self.COL_LON, self.COL_LAT]]) + df_decumulated_24h = self.update_column_if_provider_stations_are_in_radius(df=df_decumulated_24h, tree=tree) + df_decumulated_24h = df_decumulated_24h[df_decumulated_24h['has_neighbor_within_radius'] == False] + df_decumulated_24h.drop(columns=self.INTERNAL_COLUMNS, errors='ignore') + # Append both decumulated 24h dataframes to the 6h slot + df_filtered = df.append(self.df_24h_without_neighbors) + df_filtered = df_filtered.append(df_decumulated_24h) + return_data_frames.append(df_filtered) + return return_data_frames +