Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update master #25

Merged
merged 6 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rainforest/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ def read_task_file(task_file):
tasks_dic[int(line[0])] = line[1:]
return tasks_dic

def read_df(pattern, dbsystem = 'dask', sqlContext = None):
def read_df(pattern, dbsystem='dask', sqlContext=None):
"""
Reads a set of data contained in a folder as a spark or dask DataFrame

Expand All @@ -532,7 +532,7 @@ def read_df(pattern, dbsystem = 'dask', sqlContext = None):
dbsystem : str
Either "dask" if you want a Dask DataFrame or "spark" if you want a
spark dataframe
sqlContext : sqlContext instance
sqlContext : sqlContext instance, new: SparkSession instant
sqlContext to use, required only if dbystem = 'spark'

Returns
Expand Down
6 changes: 3 additions & 3 deletions rainforest/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext, DataFrame
from pyspark.sql import SparkSession, DataFrame

# This could benefit from some tweaks especially if the database becomes larger
conf = SparkConf()
Expand Down Expand Up @@ -96,7 +96,7 @@ def __init__(self, config_file = None):

"""
sparkContext = SparkContext(conf = conf)
self.sqlContext = SQLContext(sparkContext)
self.sqlContext = SparkSession(sparkContext)
self.tables = TableDict()
self.summaries = {}
if config_file:
Expand Down Expand Up @@ -888,7 +888,7 @@ def update_radar_data(self, gauge_table_name, output_folder,

for fn in job_files:
logging.info('Submitting job {}'.format(fn))
#subprocess.call('sbatch {:s}'.format(fn), shell = True)
subprocess.call('sbatch {:s}'.format(fn), shell = True)


def _compare_config(config1, config2, keys = None):
Expand Down
7 changes: 4 additions & 3 deletions rainforest/database/retrieve_radar_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,8 @@ def process_all_timesteps(self):

logging.info('Processing timestep '+str(tstep))
# Set t-start -5 minutes to get all the files between, e.g., H:01 and H:10 and log at H:10
tstart = datetime.datetime.utcfromtimestamp(float(tstep)) - datetime.timedelta(minutes=5)
tend= datetime.datetime.utcfromtimestamp(float(tstep))
tstart = datetime.datetime.fromtimestamp(float(tstep), tz=datetime.timezone.utc) - datetime.timedelta(minutes=5)
tend= datetime.datetime.fromtimestamp(float(tstep), tz=datetime.timezone.utc)

stations_to_get = self.tasks[tstep]
# Change to the timestep where the data is logged
Expand Down Expand Up @@ -502,7 +502,8 @@ def process_all_timesteps(self):

status_file = rad_files['status'][tstamp]

radar = Radar(r, rad_files['radar'][tstamp], status_file, vpr_file)
radar = Radar(r, rad_files['radar'][tstamp], status_file, vpr_file,
temp_ref=self.temp_ref)

# Add ISO0_HEIGHT and height_over_iso0 to radar object
if (self.temp_ref == "ISO0_HEIGHT") or ("ISO0_HEIGHT" in self.other_variables):
Expand Down
8 changes: 5 additions & 3 deletions rainforest/performance/eval_get_estimates.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,14 @@ def _get_data(self):

# Check if the RF-models are there
for model in self.modellist:
self.model_files = {}
path = self.qpefolder+'{}'.format(model)
if len(os.listdir(path)) == 0 :
if not os.path.exists(path) or (len(os.listdir(path)) == 0):
logging.info('Extracting {} files from archive'.format(model))
try:
path = self.qpefolder+'{}'.format(model)
self.ref_files[ref] = retrieve_prod(path + '/', self.tstart,
self.tend, ref)
self.model_files[model] = retrieve_prod(path + '/', self.tstart,
self.tend, model)
logging.info('Model data: {} taken from file archive!'.format(model))
except:
logging.error('No QPE maps available for {}, please check path or produce QPE maps'.format(model))
Expand Down
Loading