Commit

Apply linting to Python files.
Jeffrey-Vervoort-KNMI committed Sep 22, 2023
1 parent 012b735 commit 5331455
Showing 9 changed files with 135 additions and 85 deletions.
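The changes in these files are consistent with black's default style (double quotes, magic trailing commas, long call sites wrapped one argument per line) plus isort-style import ordering; the commit message does not name the tools, so treat that as an assumption. As a sketch that is not part of the diff, roughly the same rewrapping can be reproduced with black's Python API:

import black

# Assumption: black with default settings produced the formatting shown below.
snippet = (
    "tsMData = dstore.TSMetadata(station_id=station_id, param_id=param_id, "
    "pos=dstore.Point(lat=latitude, lon=longitude), other1=param_file.name)\n"
)
# format_str parses and reprints the code; this line exceeds the default
# 88-character limit, so each keyword argument ends up on its own line.
print(black.format_str(snippet, mode=black.Mode()))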
32 changes: 21 additions & 11 deletions data-loader/client_knmi_param.py
@@ -13,30 +13,32 @@
import grpc
import pandas as pd
import xarray as xr
from parameters import knmi_parameter_names
from google.protobuf.timestamp_pb2 import Timestamp
from parameters import knmi_parameter_names


def netcdf_file_to_requests(file_path: Path | str) -> Tuple[List, List]:
time_series_request_messages = []
observation_request_messages = []
ts_id = 1

with xr.open_dataset(file_path, engine="netcdf4", chunks=None) as file: # chunks=None to disable dask
with xr.open_dataset(
file_path, engine="netcdf4", chunks=None
) as file: # chunks=None to disable dask
for param_id in knmi_parameter_names:
ts_observations = []

param_file = file[param_id]
for station_id, latitude, longitude, height in zip(
file["station"].values, file["lat"].values[0], file["lon"].values[0], file["height"].values[0]
file["station"].values,
file["lat"].values[0],
file["lon"].values[0],
file["height"].values[0],
):
tsMData = dstore.TSMetadata(
station_id=station_id,
param_id=param_id,
pos=dstore.Point(
lat=latitude,
lon=longitude
),
pos=dstore.Point(lat=latitude, lon=longitude),
other1=param_file.name,
other2=param_file.long_name,
other3="value3",
@@ -60,7 +62,9 @@ def netcdf_file_to_requests(file_path: Path | str) -> Tuple[List, List]:
dstore.Observation(
time=ts,
value=obs_value,
metadata=dstore.ObsMetadata(field1="KNMI", field2="Royal Dutch Meteorological Institute"),
metadata=dstore.ObsMetadata(
field1="KNMI", field2="Royal Dutch Meteorological Institute"
),
)
)

@@ -76,7 +80,9 @@ def netcdf_file_to_requests(file_path: Path | str) -> Tuple[List, List]:
def insert_data(time_series_request_messages: List, observation_request_messages: List):
workers = int(cpu_count())

with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel:
with grpc.insecure_channel(
f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}"
) as channel:
client = dstore_grpc.DatastoreStub(channel=channel)

print(f"Inserting {len(time_series_request_messages)} time series requests.")
@@ -102,8 +108,12 @@ def insert_data(time_series_request_messages, observation_request_messages
print("Starting with creating the time series and observations requests.")
create_requests_start = perf_counter()
file_path = Path(Path(__file__).parents[2] / "test-data" / "KNMI" / "20221231.nc")
time_series_request_messages, observation_request_messages = netcdf_file_to_requests(file_path=file_path)
print(f"Finished creating the time series and observation requests {perf_counter() - create_requests_start}.")
time_series_request_messages, observation_request_messages = netcdf_file_to_requests(
file_path=file_path
)
print(
f"Finished creating the time series and observation requests {perf_counter() - create_requests_start}."
)

insert_data(
time_series_request_messages=time_series_request_messages,
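For context on the insert_data() hunks above: the loader opens a single insecure gRPC channel whose address comes from the DSHOST and DSPORT environment variables (defaulting to localhost:50050) and sizes its worker count from cpu_count(); the pool that uses those workers sits outside the visible hunks. A minimal sketch of the channel setup, not part of the diff:

import os
from multiprocessing import cpu_count

import grpc

# Same convention as the loader: host and port from env vars with local defaults.
address = f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}"
workers = int(cpu_count())

with grpc.insecure_channel(address) as channel:
    # In the real loader a DatastoreStub is created here and the request
    # messages are sent, presumably fanned out over `workers` workers
    # (assumed detail; the executor is not shown in this diff).
    pass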
40 changes: 27 additions & 13 deletions data-loader/client_knmi_station.py
@@ -13,18 +13,23 @@
import grpc
import pandas as pd
import xarray as xr
from parameters import knmi_parameter_names
from google.protobuf.timestamp_pb2 import Timestamp
from parameters import knmi_parameter_names


def netcdf_file_to_requests(file_path: Path | str) -> Tuple[List, List]:
time_series_request_messages = []
observation_request_messages = []
ts_id = 1

with xr.open_dataset(file_path, engine="netcdf4", chunks=None) as file: # chunks=None to disable dask
with xr.open_dataset(
file_path, engine="netcdf4", chunks=None
) as file: # chunks=None to disable dask
for station_id, latitude, longitude, height in zip(
file["station"].values, file["lat"].values[0], file["lon"].values[0], file["height"].values[0]
file["station"].values,
file["lat"].values[0],
file["lon"].values[0],
file["height"].values[0],
):
ts_observations = []
station_slice = file.sel(station=station_id)
@@ -34,25 +39,28 @@ def netcdf_file_to_requests(file_path: Path | str) -> Tuple[List, List]:
tsMData = dstore.TSMetadata(
station_id=station_id,
param_id=param_id,
pos=dstore.Point(
lat=latitude,
lon=longitude
),
pos=dstore.Point(lat=latitude, lon=longitude),
other1=param_file.name,
other2=param_file.long_name,
other3="value3",
)
time_series_request_messages.append(dstore.AddTSRequest(id=ts_id, metadata=tsMData))
time_series_request_messages.append(
dstore.AddTSRequest(id=ts_id, metadata=tsMData)
)

observations = []
for time, obs_value in zip(pd.to_datetime(param_file["time"].data).to_pydatetime(), param_file.data):
for time, obs_value in zip(
pd.to_datetime(param_file["time"].data).to_pydatetime(), param_file.data
):
ts = Timestamp()
ts.FromDatetime(time)
observations.append(
dstore.Observation(
time=ts,
value=obs_value,
metadata=dstore.ObsMetadata(field1="KNMI", field2="Royal Dutch Meteorological Institute"),
metadata=dstore.ObsMetadata(
field1="KNMI", field2="Royal Dutch Meteorological Institute"
),
)
)

@@ -67,7 +75,9 @@ def netcdf_file_to_requests(file_path: Path | str) -> Tuple[List, List]:
def insert_data(time_series_request_messages: List, observation_request_messages: List):
workers = int(cpu_count())

with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel:
with grpc.insecure_channel(
f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}"
) as channel:
client = dstore_grpc.DatastoreStub(channel=channel)

print(f"Inserting {len(time_series_request_messages)} time series requests.")
@@ -93,8 +103,12 @@ def insert_data(time_series_request_messages, observation_request_messages
print("Starting with creating the time series and observations requests.")
create_requests_start = perf_counter()
file_path = Path(Path(__file__).parents[2] / "test-data" / "KNMI" / "20221231.nc")
time_series_request_messages, observation_request_messages = netcdf_file_to_requests(file_path=file_path)
print(f"Finished creating the time series and observation requests {perf_counter() - create_requests_start}.")
time_series_request_messages, observation_request_messages = netcdf_file_to_requests(
file_path=file_path
)
print(
f"Finished creating the time series and observation requests {perf_counter() - create_requests_start}."
)

insert_data(
time_series_request_messages=time_series_request_messages,
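The reformatted loop above also shows how each observation time is carried over gRPC: a pandas datetime is converted to a protobuf Timestamp before being attached to a dstore.Observation. A self-contained sketch of that conversion, not part of the diff (the sample times are invented for illustration):

import pandas as pd
from google.protobuf.timestamp_pb2 import Timestamp

# Invented sample times; the real loader reads them from the netCDF file.
times = pd.to_datetime(["2022-12-31T00:00:00", "2022-12-31T00:10:00"]).to_pydatetime()
for t in times:
    ts = Timestamp()
    ts.FromDatetime(t)  # copies the datetime into protobuf seconds and nanos
    print(ts.ToJsonString())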
51 changes: 45 additions & 6 deletions data-loader/parameters.py
@@ -1,7 +1,46 @@
knmi_parameter_names = (
'hc3', 'nc2', 'zm', 'R1H', 'hc', 'tgn', 'Tn12', 'pr', 'pg', 'tn', 'rg',
'hc1', 'nc1', 'ts1', 'nc3', 'ts2', 'qg', 'ff', 'ww', 'gff', 'dd',
'td', 'ww-10', 'Tgn12', 'ss', 'Tn6', 'dr', 'rh', 'hc2', 'Tgn6',
'R12H', 'R24H', 'Tx6', 'Tx24', 'Tx12', 'Tgn14', 'D1H', 'R6H', 'pwc',
'tx', 'nc', 'pp', 'Tn14', 'ta'
)
"hc3",
"nc2",
"zm",
"R1H",
"hc",
"tgn",
"Tn12",
"pr",
"pg",
"tn",
"rg",
"hc1",
"nc1",
"ts1",
"nc3",
"ts2",
"qg",
"ff",
"ww",
"gff",
"dd",
"td",
"ww-10",
"Tgn12",
"ss",
"Tn6",
"dr",
"rh",
"hc2",
"Tgn6",
"R12H",
"R24H",
"Tx6",
"Tx24",
"Tx12",
"Tgn14",
"D1H",
"R6H",
"pwc",
"tx",
"nc",
"pp",
"Tn14",
"ta",
)
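The parameter tuple now ends with a trailing comma and keeps one entry per line. Assuming black is the formatter, this layout is stable across future runs because of black's magic trailing comma: a collection that already ends in a comma stays exploded even when it would fit on one line. A tiny sketch, not part of the diff:

import black

# With the trailing comma the tuple stays one element per line;
# without it, black collapses the short tuple back onto a single line.
print(black.format_str('names = ("hc3", "nc2", "zm",)\n', mode=black.Mode()))
print(black.format_str('names = ("hc3", "nc2", "zm")\n', mode=black.Mode()))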
2 changes: 1 addition & 1 deletion data-loader/requirements.in
@@ -5,4 +5,4 @@

grpcio-tools~=1.56
netCDF4~=1.6
xarray~=2023.7
xarray~=2023.7
39 changes: 20 additions & 19 deletions examples/clients/python/client.py
@@ -5,41 +5,41 @@
import os
from datetime import datetime

from google.protobuf.timestamp_pb2 import Timestamp

import datastore_pb2 as dstore
import datastore_pb2_grpc as dstore_grpc
import grpc
from google.protobuf.timestamp_pb2 import Timestamp

MAGIC_ID = 1234567890
MAGIC_VALUE = 123.456


def callAddTimeSeries(stub):
print('calling AddTimeSeries() ...')
print("calling AddTimeSeries() ...")
tsMData = dstore.TSMetadata(
station_id='18700',
param_id='211',
station_id="18700",
param_id="211",
pos=dstore.Point(
lat=59.91,
lon=10.75,
),
other1='value1',
other2='value2',
other3='value3',
other1="value1",
other2="value2",
other3="value3",
)
request = dstore.AddTSRequest(
id=MAGIC_ID,
metadata=tsMData,
)
response = stub.AddTimeSeries(request)
print(' response: {}'.format(response))
print(" response: {}".format(response))


def callPutObservations(stub):
print('calling PutObservations() ...')
print("calling PutObservations() ...")
obsMData = dstore.ObsMetadata(
field1='value1',
field2='value2',
field1="value1",
field2="value2",
)
timestamp = Timestamp()
timestamp.FromDatetime(datetime.now())
@@ -59,11 +59,11 @@ def callPutObservations(stub):
],
)
response = stub.PutObservations(request)
print(' response: {}'.format(response))
print(" response: {}".format(response))


def callGetObservations(stub):
print('calling GetObservations() ...')
print("calling GetObservations() ...")
from_time = Timestamp()
from_time.FromDatetime(datetime(2023, 1, 1))
to_time = Timestamp()
@@ -75,14 +75,15 @@ def callGetObservations(stub):
totime=to_time,
)
response = stub.GetObservations(request)
print(' response: {}'.format(response))
print(" response: {}".format(response))

return response


if __name__ == '__main__':

with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel:
if __name__ == "__main__":
with grpc.insecure_channel(
f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}"
) as channel:
stub = dstore_grpc.DatastoreStub(channel)

callAddTimeSeries(stub)
@@ -94,6 +95,6 @@ def callGetObservations(stub):
for r in response.tsobs:
if r.tsid == MAGIC_ID:
for o in r.obs:
assert(o.value == MAGIC_VALUE)
assert o.value == MAGIC_VALUE
found_at_least_one = True
assert found_at_least_one
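The last hunk also rewrites assert(o.value == MAGIC_VALUE) as assert o.value == MAGIC_VALUE. The parentheses were harmless here, but style checkers flag the call-like form because assert is a statement, not a function, and parenthesizing its arguments invites the classic always-true tuple mistake. A small illustration, not from the repository:

x = 0
# assert is a statement: parentheses around a single expression are redundant.
assert x == 0
# The trap the style rule guards against: a non-empty tuple is always truthy,
# so this "assertion" can never fail (recent Pythons emit a SyntaxWarning).
assert (x == 1, "this message is never shown")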
12 changes: 6 additions & 6 deletions integration-test/discover.py
@@ -1,17 +1,17 @@
# This code was used to double-check the values tested in test_knmi.py
from pathlib import Path


import pandas as pd
import xarray as xr

file_path = Path(Path(__file__).parents[1] / "test-data" / "KNMI" / "20221231.nc")
with xr.open_dataset(file_path, engine="netcdf4", chunks=None) as ds: # chunks=None to disable dask
with xr.open_dataset(
file_path, engine="netcdf4", chunks=None
) as ds: # chunks=None to disable dask
# print(ds)
print(ds.sel(station='06260').isel(time=0).lat.values)
print(ds.sel(station='06260').isel(time=0).lon.values)
print(ds.sel(station="06260").isel(time=0).lat.values)
print(ds.sel(station="06260").isel(time=0).lon.values)

print(ds.dims)

print(ds.sel(station='06260').rh.values)

print(ds.sel(station="06260").rh.values)