-
Notifications
You must be signed in to change notification settings - Fork 1
/
importers.py
309 lines (241 loc) · 8.62 KB
/
importers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
import gzip
from matplotlib.pyplot import imread
import numpy as np
try:
import h5py
H5PY_IMPORTED = True
except ImportError:
H5PY_IMPORTED = False
try:
import netCDF4
NETCDF4_IMPORTED = True
except ImportError:
NETCDF4_IMPORTED = False
try:
import pyproj
PYPROJ_IMPORTED = True
except ImportError:
PYPROJ_IMPORTED = False
try:
import xarray as xr
XARRAY_IMPORTED = True
except ImportError:
XARRAY_IMPORTED = False
def get_method(name):
"""Return the given importer method. The currently implemented options are
'netcdf', 'odim_hdf5' and 'pgm'."""
if name == "netcdf":
return import_netcdf
elif name == "odim_hdf5":
return import_opera_odim_hdf5
elif name == "pgm":
return import_pgm
else:
raise NotImplementedError(f"importer {name} not implemented")
def import_netcdf(filename, corr_refl=True, **kwargs):
"""Import a NetCDF file produced by radar_composite_generator."""
if not XARRAY_IMPORTED:
raise ModuleNotFoundError("xarray is required but not installed")
if not NETCDF4_IMPORTED:
raise ModuleNotFoundError("netCDF4 is required but not installed")
ds = xr.load_dataset(filename)
qty = "DBZHC" if corr_refl else "DBZH"
refl = np.array(ds[qty][0])
precip = 10.0 ** (refl / 10.0)
precip = (precip / 223.0) ** (1.0 / 1.53)
return precip, dict()
def import_pgm(filename, gzipped=True, **kwargs):
"""
Import a 8-bit PGM radar reflectivity composite from the FMI archive and
convert it to precipitation rate (mm/h).
Parameters
----------
filename: str
Name of the file to read.
gzipped: bool
If True, the input file is treated as a compressed gzip file.
Raises
------
ModuleNotFoundError
If pyproj was not found.
Returns
-------
out: tuple
A two-element tuple containing the reflectivity composite in dBZ and
the associated metadata.
Notes
-----
Reading georeferencing metadata is supported only for stereographic
projection. For other projections, the keys related to georeferencing are
not set.
"""
if not PYPROJ_IMPORTED:
raise ModuleNotFoundError("pyproj is required but not installed")
if gzipped is False:
precip = imread(filename)
else:
precip = imread(gzip.open(filename, "r"))
pgm_metadata = _import_fmi_pgm_metadata(filename, gzipped=gzipped)
geodata = _import_fmi_pgm_geodata(pgm_metadata)
mask = precip == pgm_metadata["missingval"]
precip = precip.astype(float)
precip[mask] = np.nan
precip = (precip - 64.0) / 2.0
# convert from dBZ to rain rate (mm/h)
precip = 10.0 ** (precip / 10.0)
precip = (precip / 223.0) ** (1.0 / 1.53)
metadata = geodata
metadata["institution"] = "Finnish Meteorological Institute"
metadata["accutime"] = 5.0
metadata["unit"] = "dBZ"
metadata["transform"] = "dB"
metadata["zerovalue"] = np.nanmin(precip)
metadata["threshold"] = _get_threshold_value(precip)
metadata["zr_a"] = 223.0
metadata["zr_b"] = 1.53
return precip, metadata
def import_opera_odim_hdf5(filename, quantity="DBZH", **kwargs):
"""Read a composite from a OPERA ODIM HDF5 file.
Parameters
----------
filename : str
Name of the file to read.
quantity : the quantity to read
ACRR: hourly accumulated rainfall (mm)
DBZH: reflectivity (dBZ)
RATE: rainfall rate (mm/h)
Raises
------
KeyError
If the requested quantity was not found.
ModuleNotFoundError
If h5py or pyproj was not found.
OSError
If the input file could not be read.
Returns
-------
out : tuple
A two-element tuple containing the radar composite in a numpy array and
the metadata dictionary.
"""
if not H5PY_IMPORTED:
raise ModuleNotFoundError("h5py required for reading HDF5 files but not found")
if not PYPROJ_IMPORTED:
raise ModuleNotFoundError("pyproj is required but not installed")
f = h5py.File(filename, "r")
data_found = False
for k in f.keys():
if "dataset" in k:
qty = f[k]["what"].attrs["quantity"]
if qty.decode() == quantity:
data_found = True
data = f[k]["data1"]["data"][...]
nodata_mask = data == f[k]["what"].attrs["nodata"]
undetect_mask = data == f[k]["what"].attrs["undetect"]
radar_composite = data.astype(np.float32)
gain = f[k]["what"].attrs["gain"]
offset = f[k]["what"].attrs["offset"]
radar_composite = radar_composite * gain + offset
radar_composite[nodata_mask] = np.nan
if quantity != "DBZH":
radar_composite[undetect_mask] = offset
else:
radar_composite[undetect_mask] = -30.0
metadata = {}
projection = f["where"].attrs["projdef"].decode()
metadata["projection"] = projection
ll_lon = f["where"].attrs["LL_lon"]
ll_lat = f["where"].attrs["LL_lat"]
ur_lon = f["where"].attrs["UR_lon"]
ur_lat = f["where"].attrs["UR_lat"]
pr = pyproj.Proj(projection)
ll_x, ll_y = pr(ll_lon, ll_lat)
ur_x, ur_y = pr(ur_lon, ur_lat)
metadata["ll_x"] = ll_x
metadata["ll_y"] = ll_y
metadata["ur_x"] = ur_x
metadata["ur_y"] = ur_y
xpixelsize = f["where"].attrs["xscale"]
ypixelsize = f["where"].attrs["yscale"]
metadata["xpixelsize"] = xpixelsize
metadata["ypixelsize"] = ypixelsize
metadata["institution"] = "Finnish Meteorological Institute"
metadata["timestep"] = 5
if quantity == "ACRR":
metadata["unit"] = "mm"
elif quantity == "DBZH":
metadata["unit"] = "dBZ"
elif quantity == "RATE":
metadata["unit"] = "mm/h"
break
f.close()
if not data_found:
raise KeyError(f"no composite for quantity '{quantity}' found from {filename}")
else:
return radar_composite, metadata
def _import_fmi_pgm_geodata(metadata):
geodata = {}
projdef = ""
if "type" in metadata.keys() and metadata["type"][0] == "stereographic":
projdef += "+proj=stere "
projdef += " +lon_0=" + metadata["centrallongitude"][0] + "E"
projdef += " +lat_0=" + metadata["centrallatitude"][0] + "N"
projdef += " +lat_ts=" + metadata["truelatitude"][0]
# These are hard-coded because the projection definition
# is missing from the PGM files.
projdef += " +a=6371288"
projdef += " +x_0=380886.310"
projdef += " +y_0=3395677.920"
projdef += " +no_defs"
#
geodata["projection"] = projdef
ll_lon, ll_lat = [float(v) for v in metadata["bottomleft"]]
ur_lon, ur_lat = [float(v) for v in metadata["topright"]]
pr = pyproj.Proj(projdef)
x1, y1 = pr(ll_lon, ll_lat)
x2, y2 = pr(ur_lon, ur_lat)
geodata["x1"] = x1
geodata["y1"] = y1
geodata["x2"] = x2
geodata["y2"] = y2
geodata["cartesian_unit"] = "m"
geodata["xpixelsize"] = float(metadata["metersperpixel_x"][0])
geodata["ypixelsize"] = float(metadata["metersperpixel_y"][0])
geodata["yorigin"] = "upper"
return geodata
def _get_threshold_value(precip):
valid_mask = np.isfinite(precip)
if valid_mask.any():
_precip = precip[valid_mask]
min_precip = _precip.min()
above_min_mask = _precip > min_precip
if above_min_mask.any():
return np.min(_precip[above_min_mask])
else:
return min_precip
else:
return np.nan
def _import_fmi_pgm_metadata(filename, gzipped=False):
metadata = {}
if not gzipped:
f = open(filename, "rb")
else:
f = gzip.open(filename, "rb")
file_line = f.readline()
while not file_line.startswith(b"#"):
file_line = f.readline()
while file_line.startswith(b"#"):
x = file_line.decode()
x = x[1:].strip().split(" ")
if len(x) >= 2:
k = x[0]
v = x[1:]
metadata[k] = v
else:
file_line = f.readline()
continue
file_line = f.readline()
file_line = f.readline().decode()
metadata["missingval"] = int(file_line)
f.close()
return metadata