From 386ddd94455fd82cd4692829ad8805054f92d424 Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Fri, 22 Sep 2023 13:37:45 +0200 Subject: [PATCH 01/17] Add python linting and pre-commit configuration. --- .github/workflows/ci.yml | 41 ++++++++++++++++++++++++++++++++ .pre-commit-config.yaml | 50 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 .pre-commit-config.yaml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d1b8f0d..e4acddf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,7 +7,48 @@ on: tags: - '[0-9]+.[0-9]+.[0-9]+' +env: + LINE_LENGTH: 99 # TODO: would we like to increase this to 120? + jobs: + python-lint-black: + runs-on: ubuntu-latest + steps: + - name: Checkout Source + uses: actions/checkout@v3 + + - name: Python Setup + uses: actions/setup-python@v4 + with: + python-version: '3.11' + architecture: x64 + + - name: Lint Python with Black + uses: psf/black@stable + with: + options: "--check --verbose --line-length=$LINE_LENGTH" + + python-lint-flake8: + runs-on: ubuntu-latest + steps: + - name: Python Setup + uses: actions/setup-python@v4 + with: + python-version: '3.11' + architecture: x64 + + - name: Checkout Source + uses: actions/checkout@v3 + + - name: Install flake8 + run: pip install flake8 + + - name: Syntax Error Check + run: flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + + - name: Code Style Check + run: flake8 . --count --max-line-length=$LINE_LENGTH --ignore=W503 --show-source --statistics + test: runs-on: ubuntu-latest steps: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..a381097 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,50 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.5.0 + hooks: + # Formatting + - id: end-of-file-fixer # Makes sure files end in a newline and only a newline. + - id: pretty-format-json + args: ["--autofix", "--indent=4", "--no-ensure-ascii", "--no-sort-keys"] # Formats and sorts your JSON files. + - id: trailing-whitespace # Trims trailing whitespace. + # Checks + - id: check-json # Attempts to load all json files to verify syntax. + - id: check-merge-conflict # Check for files that contain merge conflict strings. + - id: check-shebang-scripts-are-executable # Checks that scripts with shebangs are executable. + - id: check-yaml + # only checks syntax not load the yaml: + # https://stackoverflow.com/questions/59413979/how-exclude-ref-tag-from-check-yaml-git-hook + args: ["--unsafe"] # Parse the yaml files for syntax. + + # reorder-python-imports ~ sort python imports + - repo: https://github.com/asottile/reorder_python_imports + rev: v3.12.0 + hooks: + - id: reorder-python-imports + + # black ~ Formats Python code + - repo: https://github.com/psf/black + rev: 23.10.0 + hooks: + - id: black + args: ["--line-length=99"] + + # flake8 ~ Enforces the Python PEP8 style guide + - repo: https://github.com/pycqa/flake8 + rev: 6.1.0 + hooks: + - id: flake8 + args: + [ + "--ignore=W503", + "--max-line-length=99", + ] + + # hadolint ~ Docker linter + - repo: https://github.com/hadolint/hadolint + rev: v2.12.0 + hooks: + - id: hadolint-docker + args: [ + "--ignore=DL3008", # Pin versions in apt get install. + ] From 20b10ad1e7a9508d17fef261636dc8d11e3a3707 Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Fri, 22 Sep 2023 17:19:43 +0200 Subject: [PATCH 02/17] To use environment variable change to install with pip. 
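The psf/black action receives its "options" input as a literal string, so the $LINE_LENGTH reference was presumably never expanded there. Installing Black with pip and invoking it from a plain "run" step lets the shell substitute ${LINE_LENGTH} (currently 99). A minimal local equivalent of the CI check, assuming the same line length:

    pip install black
    black . --check --line-length=99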
--- .github/workflows/ci.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e4acddf..020544e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,10 +23,11 @@ jobs: python-version: '3.11' architecture: x64 + - name: Install Black + run: pip install black + - name: Lint Python with Black - uses: psf/black@stable - with: - options: "--check --verbose --line-length=$LINE_LENGTH" + run: black . --check --line-length=${LINE_LENGTH} python-lint-flake8: runs-on: ubuntu-latest From 757fbf67eb019e6be7b246b74922e15a91841797 Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Fri, 20 Oct 2023 16:28:28 +0200 Subject: [PATCH 03/17] Update the readme with pre-commit information. --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index 6c45bd9..6c1878a 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,10 @@ # e-soh-datastore-poc E-SOH datastore PoCs + + +## Pre-commit +To update the pre-commit hook run: `pre-commit autoupdate` + +To use the pre-commit hook reinitialize the repository with `git init` and install the pre-commit hook with `pre-commit install`. + +To run the pre-commit for every file in the repository run `pre-commit run --config './.pre-commit-config.yaml' --all-files` From 1bbdd828bd231bb151f0e81261815aceb5b8a7bc Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Fri, 20 Oct 2023 17:11:09 +0200 Subject: [PATCH 04/17] Run the pre-commit hook for the Python files. --- api/locustfile.py | 26 +++- api/main.py | 159 +++++++++++++++-------- data-loader/client_fmi_station.py | 59 ++++----- data-loader/client_knmi_station.py | 37 ++++-- data-loader/parameters.py | 51 +++++++- data-loader/requirements.in | 2 +- examples/clients/python/client.py | 91 ++++++------- integration-test/discover.py | 13 +- integration-test/test_delete.py | 24 +++- integration-test/test_knmi.py | 30 +++-- load-test/grpc_user.py | 5 +- load-test/locustfile.py | 39 ++++-- tstester/README.md | 1 - tstester/common.py | 48 ++++--- tstester/config.json | 20 +-- tstester/main.py | 31 ++--- tstester/netcdf.py | 94 +++++++------- tstester/netcdfsbe_tsmdatainpostgis.py | 28 ++-- tstester/pgconnectioninfo.py | 4 +- tstester/pgopbackend.py | 66 ++++++---- tstester/postgissbe.py | 156 +++++++++++++--------- tstester/storagebackend.py | 11 +- tstester/timescaledbsbe.py | 161 ++++++++++++++--------- tstester/timeseries.py | 7 +- tstester/tstester.py | 171 ++++++++++++++----------- 25 files changed, 807 insertions(+), 527 deletions(-) mode change 100644 => 100755 data-loader/client_fmi_station.py diff --git a/api/locustfile.py b/api/locustfile.py index 5c170a3..9e55120 100644 --- a/api/locustfile.py +++ b/api/locustfile.py @@ -1,11 +1,18 @@ import random -from locust import HttpUser, between, task +from locust import HttpUser +from locust import task parameters = ["ff", "dd", "rh", "pp", "tn"] -stations = ["06203", "06204", "06205", "06207", "06208", "06211", "06214", "06215", "06235", "06239", "06242", "06251", "06260", "06269", "06270", "06275", "06279", "06280", "06290", "06310", "06317", "06319", "06323", "06330", "06340", "06344", "06348", "06350", "06356", "06370", "06375", "06380", "78871", "78873"] - +# fmt: off +stations = [ + "06203", "06204", "06205", "06207", "06208", "06211", "06214", "06215", "06235", "06239", + "06242", "06251", "06260", "06269", "06270", "06275", "06279", "06280", "06290", "06310", + "06317", "06319", "06323", "06330", "06340", "06344", "06348", "06350", 
"06356", "06370", + "06375", "06380", "78871", "78873", +] +# fmt: on headers = {"Accept-Encoding": "br"} @@ -14,8 +21,17 @@ class WebsiteUser(HttpUser): def get_data_single_station_single_parameter(self): parameter = random.choice(parameters) station_id = random.choice(stations) - self.client.get(f"/collections/observations/locations/{station_id}?parameter-name={parameter}", name=f"single station {parameter}", headers=headers) + self.client.get( + f"/collections/observations/locations/{station_id}?parameter-name={parameter}", + name=f"single station {parameter}", + headers=headers, + ) @task def get_data_bbox_three_parameters(self): - self.client.get(f"/collections/observations/area?parameter-name=dd,ff,rh&coords=POLYGON((5.0 52.0,6.0 52.0,6.0 52.1,5.0 52.1,5.0 52.0))", name=f"bbox", headers=headers) + self.client.get( + "/collections/observations/area?parameter-name=dd,ff,rh&" + "coords=POLYGON((5.0 52.0,6.0 52.0,6.0 52.1,5.0 52.1,5.0 52.0))", + name="bbox", + headers=headers, + ) diff --git a/api/main.py b/api/main.py index 69c7119..31ea8aa 100644 --- a/api/main.py +++ b/api/main.py @@ -2,31 +2,34 @@ # For developing: uvicorn main:app --reload import itertools import os -from datetime import datetime from datetime import timezone from itertools import groupby +import datastore_pb2 as dstore +import datastore_pb2_grpc as dstore_grpc +import grpc from brotli_asgi import BrotliMiddleware - +from covjson_pydantic.coverage import Coverage +from covjson_pydantic.coverage import CoverageCollection +from covjson_pydantic.domain import Axes +from covjson_pydantic.domain import Domain +from covjson_pydantic.domain import DomainType +from covjson_pydantic.domain import ValuesAxis from covjson_pydantic.ndarray import NdArray from covjson_pydantic.observed_property import ObservedProperty from covjson_pydantic.parameter import Parameter -from covjson_pydantic.reference_system import ReferenceSystemConnectionObject, ReferenceSystem +from covjson_pydantic.reference_system import ReferenceSystem +from covjson_pydantic.reference_system import ReferenceSystemConnectionObject from fastapi import FastAPI -from fastapi import Query, Path -from geojson_pydantic import FeatureCollection, Feature, Point - -from google.protobuf.timestamp_pb2 import Timestamp +from fastapi import Path +from fastapi import Query +from geojson_pydantic import Feature +from geojson_pydantic import FeatureCollection +from geojson_pydantic import Point from pydantic import AwareDatetime - -import datastore_pb2 as dstore -import datastore_pb2_grpc as dstore_grpc -import grpc - -from covjson_pydantic.coverage import Coverage, CoverageCollection -from covjson_pydantic.domain import Domain, DomainType, Axes, ValuesAxis - -from shapely import wkt, buffer, geometry +from shapely import buffer +from shapely import geometry +from shapely import wkt # TODO: Order in CoverageJSON dictionaries (parameters, ranges) is not fixed! 
@@ -37,7 +40,9 @@ def collect_data(ts_mdata, obs_mdata): lat = obs_mdata[0].geo_point.lat # HACK: For now assume they all have the same position lon = obs_mdata[0].geo_point.lon - tuples = ((o.obstime_instant.ToDatetime(tzinfo=timezone.utc), float(o.value)) for o in obs_mdata) # HACK: str -> float + tuples = ( + (o.obstime_instant.ToDatetime(tzinfo=timezone.utc), float(o.value)) for o in obs_mdata + ) # HACK: str -> float (times, values) = zip(*tuples) param_id = ts_mdata.instrument @@ -45,7 +50,9 @@ def collect_data(ts_mdata, obs_mdata): def get_data_for_time_series(get_obs_request): - with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel: + with grpc.insecure_channel( + f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}" + ) as channel: grpc_stub = dstore_grpc.DatastoreStub(channel) response = grpc_stub.GetObservations(get_obs_request) @@ -55,51 +62,84 @@ def get_data_for_time_series(get_obs_request): # Need to sort before using groupBy data.sort(key=lambda x: x[0]) - # The multiple coverage logic is not needed for this endpoint, but we want to share this code between endpoints + # The multiple coverage logic is not needed for this endpoint, + # but we want to share this code between endpoints for (lat, lon, times), group in groupby(data, lambda x: x[0]): referencing = [ - ReferenceSystemConnectionObject(coordinates=["y", "x"], - system=ReferenceSystem(type="GeographicCRS", id="http://www.opengis.net/def/crs/EPSG/0/4326")), - ReferenceSystemConnectionObject(coordinates=["z"], - system=ReferenceSystem(type="TemporalRS", calendar="Gregorian")), + ReferenceSystemConnectionObject( + coordinates=["y", "x"], + system=ReferenceSystem( + type="GeographicCRS", id="http://www.opengis.net/def/crs/EPSG/0/4326" + ), + ), + ReferenceSystemConnectionObject( + coordinates=["z"], + system=ReferenceSystem(type="TemporalRS", calendar="Gregorian"), + ), ] - domain = Domain(domainType=DomainType.point_series, - axes=Axes(x=ValuesAxis[float](values=[lon]), - y=ValuesAxis[float](values=[lat]), - t=ValuesAxis[AwareDatetime](values=times)), - referencing=referencing) + domain = Domain( + domainType=DomainType.point_series, + axes=Axes( + x=ValuesAxis[float](values=[lon]), + y=ValuesAxis[float](values=[lat]), + t=ValuesAxis[AwareDatetime](values=times), + ), + referencing=referencing, + ) group1, group2 = itertools.tee(group, 2) # Want to use generator twice - parameters = {param_id: Parameter(observedProperty=ObservedProperty(label={"en": param_id})) - for ((_, _, _), param_id, values) in group1} - ranges = {param_id: NdArray(values=values, axisNames=["t", "y", "x"], shape=[len(values), 1, 1]) - for ((_, _, _), param_id, values) in group2} + parameters = { + param_id: Parameter(observedProperty=ObservedProperty(label={"en": param_id})) + for ((_, _, _), param_id, values) in group1 + } + ranges = { + param_id: NdArray( + values=values, axisNames=["t", "y", "x"], shape=[len(values), 1, 1] + ) + for ((_, _, _), param_id, values) in group2 + } coverages.append(Coverage(domain=domain, parameters=parameters, ranges=ranges)) if len(coverages) == 1: return coverages[0] else: - return CoverageCollection(coverages=coverages, parameters=coverages[0].parameters) # HACK to take parameters from first one + return CoverageCollection( + coverages=coverages, parameters=coverages[0].parameters + ) # HACK to take parameters from first one @app.get( "/collections/observations/locations", response_model=FeatureCollection, - response_model_exclude_none=True, ) 
-def get_locations(bbox: str = Query(..., example="5.0,52.0,6.0,52.1")) -> FeatureCollection: # Hack to use string + response_model_exclude_none=True, +) +def get_locations( + bbox: str = Query(..., example="5.0,52.0,6.0,52.1") +) -> FeatureCollection: # Hack to use string left, bottom, right, top = map(str.strip, bbox.split(",")) poly = geometry.Polygon([(left, bottom), (right, bottom), (right, top), (left, top)]) - with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel: + with grpc.insecure_channel( + f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}" + ) as channel: grpc_stub = dstore_grpc.DatastoreStub(channel) ts_request = dstore.GetObsRequest( instruments=["tn"], # Hack - inside=dstore.Polygon(points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords]) + inside=dstore.Polygon( + points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords] + ), ) ts_response = grpc_stub.GetObservations(ts_request) features = [ - Feature(type="Feature", id=ts.ts_mdata.platform, properties=None, - geometry=Point(type="Point", coordinates=(ts.obs_mdata[0].geo_point.lon, ts.obs_mdata[0].geo_point.lat))) # HACK: Assume loc the same + Feature( + type="Feature", + id=ts.ts_mdata.platform, + properties=None, + geometry=Point( + type="Point", + coordinates=(ts.obs_mdata[0].geo_point.lon, ts.obs_mdata[0].geo_point.lat), + ), + ) # HACK: Assume loc the same for ts in ts_response.observations ] return FeatureCollection(features=features, type="FeatureCollection") @@ -108,17 +148,20 @@ def get_locations(bbox: str = Query(..., example="5.0,52.0,6.0,52.1")) -> Featur @app.get( "/collections/observations/locations/{location_id}", response_model=Coverage, - response_model_exclude_none=True, ) -def get_data_location_id(location_id: str = Path(..., example="06260"), - parameter_name: str = Query(..., alias="parameter-name", example="dd,ff,rh,pp,tn")): - # TODO: There is no error handling of any kind at the moment! This is just a quick and dirty demo + response_model_exclude_none=True, +) +def get_data_location_id( + location_id: str = Path(..., example="06260"), + parameter_name: str = Query(..., alias="parameter-name", example="dd,ff,rh,pp,tn"), +): + # TODO: There is no error handling of any kind at the moment! 
+ # This is just a quick and dirty demo # TODO: Code does not handle nan when serialising to JSON # TODO: Get time interval from request (example to create protobuf timestamp: # from_time = Timestamp() # from_time.FromDatetime(datetime(2022, 12, 31)) get_obs_request = dstore.GetObsRequest( - platforms=[location_id], - instruments=list(map(str.strip, parameter_name.split(","))) + platforms=[location_id], instruments=list(map(str.strip, parameter_name.split(","))) ) return get_data_for_time_series(get_obs_request) @@ -126,11 +169,14 @@ def get_data_location_id(location_id: str = Path(..., example="06260"), @app.get( "/collections/observations/position", response_model=Coverage | CoverageCollection, - response_model_exclude_none=True, ) -def get_data_position(coords: str = Query(..., example="POINT(5.179705 52.0988218)"), - parameter_name: str = Query(..., alias="parameter-name", example="dd,ff,rh,pp,tn")): + response_model_exclude_none=True, +) +def get_data_position( + coords: str = Query(..., example="POINT(5.179705 52.0988218)"), + parameter_name: str = Query(..., alias="parameter-name", example="dd,ff,rh,pp,tn"), +): point = wkt.loads(coords) - assert(point.geom_type == "Point") + assert point.geom_type == "Point" poly = buffer(point, 0.0001, quad_segs=1) # Roughly 10 meters around the point return get_data_area(poly.wkt, parameter_name) @@ -138,13 +184,18 @@ def get_data_position(coords: str = Query(..., example="POINT(5.179705 52.098821 @app.get( "/collections/observations/area", response_model=Coverage | CoverageCollection, - response_model_exclude_none=True, ) -def get_data_area(coords: str = Query(..., example="POLYGON((5.0 52.0, 6.0 52.0,6.0 52.1,5.0 52.1, 5.0 52.0))"), - parameter_name: str = Query(..., alias="parameter-name", example="dd,ff,rh,pp,tn")): + response_model_exclude_none=True, +) +def get_data_area( + coords: str = Query(..., example="POLYGON((5.0 52.0, 6.0 52.0,6.0 52.1,5.0 52.1, 5.0 52.0))"), + parameter_name: str = Query(..., alias="parameter-name", example="dd,ff,rh,pp,tn"), +): poly = wkt.loads(coords) - assert(poly.geom_type == "Polygon") + assert poly.geom_type == "Polygon" get_obs_request = dstore.GetObsRequest( instruments=list(map(str.strip, parameter_name.split(","))), - inside=dstore.Polygon(points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords]) + inside=dstore.Polygon( + points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords] + ), ) return get_data_for_time_series(get_obs_request) diff --git a/data-loader/client_fmi_station.py b/data-loader/client_fmi_station.py old mode 100644 new mode 100755 index df7ac4a..f8fb426 --- a/data-loader/client_fmi_station.py +++ b/data-loader/client_fmi_station.py @@ -3,6 +3,7 @@ import concurrent import os import uuid +from datetime import datetime from multiprocessing import cpu_count from pathlib import Path from time import perf_counter @@ -14,57 +15,55 @@ import grpc import pandas as pd from google.protobuf.timestamp_pb2 import Timestamp -from datetime import datetime def csv_file_to_requests(file_path: Path | str) -> Tuple[List, List]: - time_format = "%Y%m%d %H:%M:%S" observation_request_messages = [] ts_mdata = None obs_mdata = None - + # Read the CSV file into a pandas DataFrame - df = pd.read_csv(file_path, encoding = 'iso-8859-15', encoding_errors='replace') - df.dropna(subset=['DATA_VALUE'], inplace=True) - df = df.drop_duplicates(subset=['STATION_ID','MEASURAND_CODE','DATA_TIME']) - df.fillna('None', inplace=True) - df = df.groupby('STATION_ID') + df = 
pd.read_csv(file_path, encoding="iso-8859-15", encoding_errors="replace") + df.dropna(subset=["DATA_VALUE"], inplace=True) + df = df.drop_duplicates(subset=["STATION_ID", "MEASURAND_CODE", "DATA_TIME"]) + df.fillna("None", inplace=True) + df = df.groupby("STATION_ID") for i, r in df: observations = [] for i, r in r.iterrows(): ts_mdata = dstore.TSMetadata( - platform=str(r['STATION_ID']), - instrument=str(r['MEASURAND_CODE']), - title='FMI test data', - standard_name= str(r['MEASURAND_CODE']), - unit= r['MEASURAND_UNIT'] + platform=str(r["STATION_ID"]), + instrument=str(r["MEASURAND_CODE"]), + title="FMI test data", + standard_name=str(r["MEASURAND_CODE"]), + unit=r["MEASURAND_UNIT"], ) - + ts = Timestamp() - ts.FromDatetime(datetime.strptime(r['DATA_TIME'], time_format)) - + ts.FromDatetime(datetime.strptime(r["DATA_TIME"], time_format)) + obs_mdata = dstore.ObsMetadata( - id=str(uuid.uuid4()), - geo_point=dstore.Point( - lat=r['LATITUDE'], - lon=r['LONGITUDE'] - ), - obstime_instant=ts, - value=str(r['DATA_VALUE']), # TODO: Store float in DB - ) - + id=str(uuid.uuid4()), + geo_point=dstore.Point(lat=r["LATITUDE"], lon=r["LONGITUDE"]), + obstime_instant=ts, + value=str(r["DATA_VALUE"]), # TODO: Store float in DB + ) + observations.append(dstore.Metadata1(ts_mdata=ts_mdata, obs_mdata=obs_mdata)) - observation_request_messages.append(dstore.PutObsRequest(observations=observations)) + observation_request_messages.append(dstore.PutObsRequest(observations=observations)) return observation_request_messages + def insert_data(observation_request_messages: List): workers = int(cpu_count()) - with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel: + with grpc.insecure_channel( + f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}" + ) as channel: client = dstore_grpc.DatastoreStub(channel=channel) print(f"Inserting {len(observation_request_messages)} bulk observations requests.") obs_insert_start = perf_counter() @@ -82,11 +81,13 @@ def insert_data(observation_request_messages: List): file_path = Path(Path(__file__).parents[2] / "test-data" / "FMI" / "20221231.csv") print(file_path) observation_request_messages = csv_file_to_requests(file_path=file_path) - print(f"Finished creating the time series and observation requests {perf_counter() - create_requests_start}.") + print( + "Finished creating the time series and observation requests " + f"{perf_counter() - create_requests_start}." 
+ ) insert_data( observation_request_messages=observation_request_messages, ) print(f"Finished, total time elapsed: {perf_counter() - total_time_start}") - diff --git a/data-loader/client_knmi_station.py b/data-loader/client_knmi_station.py index 18f331d..4be921f 100755 --- a/data-loader/client_knmi_station.py +++ b/data-loader/client_knmi_station.py @@ -14,16 +14,21 @@ import grpc import pandas as pd import xarray as xr -from parameters import knmi_parameter_names from google.protobuf.timestamp_pb2 import Timestamp +from parameters import knmi_parameter_names def netcdf_file_to_requests(file_path: Path | str) -> Tuple[List, List]: observation_request_messages = [] - with xr.open_dataset(file_path, engine="netcdf4", chunks=None) as file: # chunks=None to disable dask + with xr.open_dataset( + file_path, engine="netcdf4", chunks=None + ) as file: # chunks=None to disable dask for station_id, latitude, longitude, height in zip( - file["station"].values, file["lat"].values[0], file["lon"].values[0], file["height"].values[0] + file["station"].values, + file["lat"].values[0], + file["lon"].values[0], + file["height"].values[0], ): observations = [] station_slice = file.sel(station=station_id) @@ -35,21 +40,22 @@ def netcdf_file_to_requests(file_path: Path | str) -> Tuple[List, List]: platform=station_id, instrument=param_id, title=param_file.long_name, - standard_name=param_file.standard_name if 'standard_name' in param_file.attrs else None, - unit=param_file.units if 'units' in param_file.attrs else None + standard_name=param_file.standard_name + if "standard_name" in param_file.attrs + else None, + unit=param_file.units if "units" in param_file.attrs else None, ) - for time, obs_value in zip(pd.to_datetime(param_file["time"].data).to_pydatetime(), param_file.data): + for time, obs_value in zip( + pd.to_datetime(param_file["time"].data).to_pydatetime(), param_file.data + ): ts = Timestamp() ts.FromDatetime(time) obs_mdata = dstore.ObsMetadata( id=str(uuid.uuid4()), - geo_point=dstore.Point( - lat=latitude, - lon=longitude - ), + geo_point=dstore.Point(lat=latitude, lon=longitude), obstime_instant=ts, - value=str(obs_value), # TODO: Store float in DB + value=str(obs_value), # TODO: Store float in DB ) observations.append(dstore.Metadata1(ts_mdata=ts_mdata, obs_mdata=obs_mdata)) @@ -62,7 +68,9 @@ def netcdf_file_to_requests(file_path: Path | str) -> Tuple[List, List]: def insert_data(observation_request_messages: List): workers = int(cpu_count()) - with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel: + with grpc.insecure_channel( + f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}" + ) as channel: client = dstore_grpc.DatastoreStub(channel=channel) print(f"Inserting {len(observation_request_messages)} bulk observations requests.") @@ -80,7 +88,10 @@ def insert_data(observation_request_messages: List): create_requests_start = perf_counter() file_path = Path(Path(__file__).parents[2] / "test-data" / "KNMI" / "20221231.nc") observation_request_messages = netcdf_file_to_requests(file_path=file_path) - print(f"Finished creating the time series and observation requests {perf_counter() - create_requests_start}.") + print( + "Finished creating the time series and observation requests " + f"{perf_counter() - create_requests_start}." 
+ ) insert_data( observation_request_messages=observation_request_messages, diff --git a/data-loader/parameters.py b/data-loader/parameters.py index 24eba56..14af459 100644 --- a/data-loader/parameters.py +++ b/data-loader/parameters.py @@ -1,7 +1,46 @@ knmi_parameter_names = ( - 'hc3', 'nc2', 'zm', 'R1H', 'hc', 'tgn', 'Tn12', 'pr', 'pg', 'tn', 'rg', - 'hc1', 'nc1', 'ts1', 'nc3', 'ts2', 'qg', 'ff', 'ww', 'gff', 'dd', - 'td', 'ww-10', 'Tgn12', 'ss', 'Tn6', 'dr', 'rh', 'hc2', 'Tgn6', - 'R12H', 'R24H', 'Tx6', 'Tx24', 'Tx12', 'Tgn14', 'D1H', 'R6H', 'pwc', - 'tx', 'nc', 'pp', 'Tn14', 'ta' -) \ No newline at end of file + "hc3", + "nc2", + "zm", + "R1H", + "hc", + "tgn", + "Tn12", + "pr", + "pg", + "tn", + "rg", + "hc1", + "nc1", + "ts1", + "nc3", + "ts2", + "qg", + "ff", + "ww", + "gff", + "dd", + "td", + "ww-10", + "Tgn12", + "ss", + "Tn6", + "dr", + "rh", + "hc2", + "Tgn6", + "R12H", + "R24H", + "Tx6", + "Tx24", + "Tx12", + "Tgn14", + "D1H", + "R6H", + "pwc", + "tx", + "nc", + "pp", + "Tn14", + "ta", +) diff --git a/data-loader/requirements.in b/data-loader/requirements.in index 5f399e0..5968fc7 100644 --- a/data-loader/requirements.in +++ b/data-loader/requirements.in @@ -5,4 +5,4 @@ grpcio-tools~=1.56 netCDF4~=1.6 -xarray~=2023.7 \ No newline at end of file +xarray~=2023.7 diff --git a/examples/clients/python/client.py b/examples/clients/python/client.py index 4116895..9e359c7 100755 --- a/examples/clients/python/client.py +++ b/examples/clients/python/client.py @@ -1,16 +1,15 @@ #!/usr/bin/env python3 # tested with Python 3.11 # Generate protobuf code with following command from top level directory: -# python -m grpc_tools.protoc --proto_path=datastore/protobuf datastore.proto --python_out=examples/clients/python --grpc_python_out=examples/clients/python - +# python -m grpc_tools.protoc --proto_path=datastore/protobuf datastore.proto --python_out=examples/clients/python --grpc_python_out=examples/clients/python # noqa: E501 import os -from datetime import datetime, timezone - -from google.protobuf.timestamp_pb2 import Timestamp +from datetime import datetime +from datetime import timezone import datastore_pb2 as dstore import datastore_pb2_grpc as dstore_grpc import grpc +from google.protobuf.timestamp_pb2 import Timestamp def dtime2tstamp(dtime): @@ -20,33 +19,33 @@ def dtime2tstamp(dtime): # callPutObs demonstrates how to insert observations in the datastore. -def callPutObs(stub, version, type, standard_name, unit, value): +def call_put_obs(stub, version, type, standard_name, unit, value): ts_mdata = dstore.TSMetadata( - version = version, - type = type, - standard_name = standard_name, - unit = unit, + version=version, + type=type, + standard_name=standard_name, + unit=unit, # add more attributes as required ... ) obs_mdata = dstore.ObsMetadata( - id = 'id_dummy', - geo_point = dstore.Point( - lat = 59.91, - lon = 10.75, + id="id_dummy", + geo_point=dstore.Point( + lat=59.91, + lon=10.75, ), - pubtime = dtime2tstamp(datetime(2023, 1, 1, 0, 0, 10, 0, tzinfo = timezone.utc)), - data_id = 'data_id_dummy', - obstime_instant = dtime2tstamp(datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo = timezone.utc)), - value = value, + pubtime=dtime2tstamp(datetime(2023, 1, 1, 0, 0, 10, 0, tzinfo=timezone.utc)), + data_id="data_id_dummy", + obstime_instant=dtime2tstamp(datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc)), + value=value, # add more attributes as required ... 
) request = dstore.PutObsRequest( - observations = [ # insert only a single observation for now + observations=[ # insert only a single observation for now dstore.Metadata1( - ts_mdata = ts_mdata, - obs_mdata = obs_mdata, + ts_mdata=ts_mdata, + obs_mdata=obs_mdata, ) ], ) @@ -57,11 +56,11 @@ def callPutObs(stub, version, type, standard_name, unit, value): # callGetObsInTimeRange demonstrates how to retrieve from the datastore all observations in an # obs time range. -def callGetObsInTimeRange(stub): +def call_get_obs_in_time_range(stub): request = dstore.GetObsRequest( - interval = dstore.TimeInterval( - start = dtime2tstamp(datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo = timezone.utc)), - end = dtime2tstamp(datetime(2023, 1, 2, 0, 0, 0, 0, tzinfo = timezone.utc)), + interval=dstore.TimeInterval( + start=dtime2tstamp(datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc)), + end=dtime2tstamp(datetime(2023, 1, 2, 0, 0, 0, 0, tzinfo=timezone.utc)), ) ) response = stub.GetObservations(request) @@ -71,37 +70,39 @@ def callGetObsInTimeRange(stub): # callGetObsInPolygon demonstrates how to retrieve from the datastore all observations in a # polygon. -def callGetObsInPolygon(stub): +def call_get_obs_in_polygon(stub): points = [] - points.append(dstore.Point(lat = 59.90, lon = 10.70)) - points.append(dstore.Point(lat = 59.90, lon = 10.80)) - points.append(dstore.Point(lat = 60, lon = 10.80)) - points.append(dstore.Point(lat = 60, lon = 10.70)) + points.append(dstore.Point(lat=59.90, lon=10.70)) + points.append(dstore.Point(lat=59.90, lon=10.80)) + points.append(dstore.Point(lat=60, lon=10.80)) + points.append(dstore.Point(lat=60, lon=10.70)) - request = dstore.GetObsRequest(inside =dstore.Polygon(points = points)) + request = dstore.GetObsRequest(inside=dstore.Polygon(points=points)) response = stub.GetObservations(request) return response -if __name__ == '__main__': - with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel: +if __name__ == "__main__": + with grpc.insecure_channel( + f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}" + ) as channel: stub = dstore_grpc.DatastoreStub(channel) - version = 'version_dummy' - type = 'type_dummy' - standard_name = 'air_temperature' - unit = 'celsius' - value = '12.7' + version = "version_dummy" + type = "type_dummy" + standard_name = "air_temperature" + unit = "celsius" + value = "12.7" - response = callPutObs(stub, version, type, standard_name, unit, value) - print('response from callPutObs: {}'.format(response)) + response = call_put_obs(stub, version, type, standard_name, unit, value) + print("response from callPutObs: {}".format(response)) - response = callGetObsInTimeRange(stub) - print('response from callGetObsInTimeRange: {}'.format(response)) + response = call_get_obs_in_time_range(stub) + print("response from callGetObsInTimeRange: {}".format(response)) - response = callGetObsInPolygon(stub) - print('response from callGetObsInPolygon: {}'.format(response)) + response = call_get_obs_in_polygon(stub) + print("response from callGetObsInPolygon: {}".format(response)) assert len(response.observations) == 1 obs0 = response.observations[0] @@ -115,4 +116,4 @@ def callGetObsInPolygon(stub): obs_mdata = obs0.obs_mdata assert len(obs_mdata) == 1 obs_mdata0 = obs_mdata[0] - assert obs_mdata0.value == value + assert obs_mdata0.value == value diff --git a/integration-test/discover.py b/integration-test/discover.py index 7e46ff7..68e4c8f 100644 --- a/integration-test/discover.py +++ 
b/integration-test/discover.py @@ -1,17 +1,16 @@ # This code was used to double-check the values tested in test_knmi.py from pathlib import Path - -import pandas as pd import xarray as xr file_path = Path(Path(__file__).parents[1] / "test-data" / "KNMI" / "20221231.nc") -with xr.open_dataset(file_path, engine="netcdf4", chunks=None) as ds: # chunks=None to disable dask +with xr.open_dataset( + file_path, engine="netcdf4", chunks=None +) as ds: # chunks=None to disable dask # print(ds) - print(ds.sel(station='06260').isel(time=0).lat.values) - print(ds.sel(station='06260').isel(time=0).lon.values) + print(ds.sel(station="06260").isel(time=0).lat.values) + print(ds.sel(station="06260").isel(time=0).lon.values) print(ds.dims) - print(ds.sel(station='06260').rh.values) - + print(ds.sel(station="06260").rh.values) diff --git a/integration-test/test_delete.py b/integration-test/test_delete.py index 6c66745..3d7f84b 100644 --- a/integration-test/test_delete.py +++ b/integration-test/test_delete.py @@ -16,7 +16,9 @@ # # @pytest.fixture(scope="session") # def grpc_stub(): -# with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel: +# with grpc.insecure_channel( +# f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}" +# ) as channel: # yield dstore_grpc.DatastoreStub(channel) # # @@ -46,11 +48,17 @@ # # obs_metadata = dstore.ObsMetadata(field1="test_value1", field2="test_value2") # time_1 = Timestamp() -# time_1.FromDatetime(datetime(year=1999, month=9, day=9, hour=9, minute=9, tzinfo=timezone.utc)) +# time_1.FromDatetime( +# datetime(year=1999, month=9, day=9, hour=9, minute=9, tzinfo=timezone.utc) +# ) # time_2 = Timestamp() -# time_2.FromDatetime(datetime(year=1999, month=9, day=9, hour=9, minute=10, tzinfo=timezone.utc)) +# time_2.FromDatetime( +# datetime(year=1999, month=9, day=9, hour=9, minute=10, tzinfo=timezone.utc) +# ) # time_3 = Timestamp() -# time_3.FromDatetime(datetime(year=1999, month=9, day=9, hour=9, minute=11, tzinfo=timezone.utc)) +# time_3.FromDatetime( +# datetime(year=1999, month=9, day=9, hour=9, minute=11, tzinfo=timezone.utc) +# ) # obs = [ # dstore.Observation(time=time_1, value=1111.1111, metadata=obs_metadata), # dstore.Observation(time=time_2, value=2222.2222, metadata=obs_metadata), @@ -62,7 +70,9 @@ # return obs_put_request # # -# def test_delete_timeseries(grpc_stub, dummy_timeseries_for_delete, dummy_observations_for_delete): +# def test_delete_timeseries( +# grpc_stub, dummy_timeseries_for_delete, dummy_observations_for_delete +# ): # grpc_stub.AddTimeSeries(dummy_timeseries_for_delete) # # grpc_stub.PutObservations(dummy_observations_for_delete) @@ -76,7 +86,9 @@ # assert ts_find_response.tseries[0].id == dummy_timeseries_for_delete.id # # to_time = Timestamp() -# to_time.FromDatetime(datetime(year=1999, month=9, day=9, hour=9, minute=11, second=1, tzinfo=timezone.utc)) +# to_time.FromDatetime( +# datetime(year=1999, month=9, day=9, hour=9, minute=11, second=1, tzinfo=timezone.utc) +# ) # obs_get_request = dstore.GetObsRequest( # tsids=[dummy_timeseries_for_delete.id], # fromtime=dummy_observations_for_delete.tsobs[0].obs[0].time, diff --git a/integration-test/test_knmi.py b/integration-test/test_knmi.py index 4b82ca2..da27fac 100644 --- a/integration-test/test_knmi.py +++ b/integration-test/test_knmi.py @@ -1,12 +1,10 @@ # Note that this assumes that the KNMI test data is loader (using loader container) import os -from datetime import datetime import datastore_pb2 as dstore import 
datastore_pb2_grpc as dstore_grpc import grpc import pytest -from google.protobuf.timestamp_pb2 import Timestamp NUMBER_OF_PARAMETERS = 44 @@ -15,7 +13,9 @@ @pytest.fixture(scope="session") def grpc_stub(): - with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel: + with grpc.insecure_channel( + f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}" + ) as channel: yield dstore_grpc.DatastoreStub(channel) @@ -92,14 +92,23 @@ def test_get_values_single_station_single_parameters(grpc_stub): ["06260"], ), ( - # Middle bottom, should fall outside since polygon is curved because the earth is round (postgres geography). + # Middle bottom, should fall outside since polygon is curved, + # because the earth is round (postgres geography). ((52.1, 4.17), (52.1, 6.18), (52.0989, 6.18), (52.0989, 4.17)), ["rh"], [], ), ( # Complex polygon - ((51.45, 3.47), (51.39, 3.67), (51.39, 4.28), (51.52, 4.96), (51.89, 5.46), (52.18, 5.30), (51.75, 3.68)), + ( + (51.45, 3.47), + (51.39, 3.67), + (51.39, 4.28), + (51.52, 4.96), + (51.89, 5.46), + (52.18, 5.30), + (51.75, 3.68), + ), ["rh"], ["06260", "06310", "06323", "06340", "06343", "06348", "06350", "06356"], ), @@ -109,11 +118,12 @@ def test_get_values_single_station_single_parameters(grpc_stub): ["rh"], # fmt: off [ - "06201", "06203", "06204", "06205", "06207", "06208", "06211", "06214", "06215", "06225", "06229", - "06235", "06239", "06240", "06242", "06248", "06249", "06251", "06252", "06257", "06258", "06260", - "06267", "06269", "06270", "06273", "06275", "06277", "06278", "06279", "06280", "06283", "06286", - "06290", "06310", "06317", "06319", "06320", "06321", "06323", "06330", "06340", "06343", "06344", - "06348", "06350", "06356", "06370", "06375", "06377", "06380", "06391" + "06201", "06203", "06204", "06205", "06207", "06208", "06211", "06214", "06215", + "06225", "06229", "06235", "06239", "06240", "06242", "06248", "06249", "06251", + "06252", "06257", "06258", "06260", "06267", "06269", "06270", "06273", "06275", + "06277", "06278", "06279", "06280", "06283", "06286", "06290", "06310", "06317", + "06319", "06320", "06321", "06323", "06330", "06340", "06343", "06344", "06348", + "06350", "06356", "06370", "06375", "06377", "06380", "06391" ], # fmt: on ), diff --git a/load-test/grpc_user.py b/load-test/grpc_user.py index 708ebfd..ddaadfa 100644 --- a/load-test/grpc_user.py +++ b/load-test/grpc_user.py @@ -1,5 +1,7 @@ import time -from typing import Any, Callable +from typing import Any +from typing import Callable + import grpc import grpc.experimental.gevent as grpc_gevent from grpc_interceptor import ClientInterceptor @@ -59,4 +61,3 @@ def __init__(self, environment): self._channel = grpc.intercept_channel(self._channel, interceptor) self.stub = self.stub_class(self._channel) - diff --git a/load-test/locustfile.py b/load-test/locustfile.py index dba7355..b277d15 100644 --- a/load-test/locustfile.py +++ b/load-test/locustfile.py @@ -1,23 +1,34 @@ -# Use the following command to generate the python protobuf stuff in the correct place (from the root of the repository) -# python -m grpc_tools.protoc --proto_path=datastore/protobuf datastore.proto --python_out=load-test --grpc_python_out=load-test - +# Use the following command to generate the python protobuf stuff in +# the correct place (from the root of the repository) +# python -m grpc_tools.protoc --proto_path=datastore/protobuf datastore.proto --python_out=load-test --grpc_python_out=load-test # noqa: E501 import random from 
datetime import datetime -from shapely import wkt, buffer - -import grpc_user import datastore_pb2 as dstore import datastore_pb2_grpc as dstore_grpc -from locust import task - +import grpc_user from google.protobuf.timestamp_pb2 import Timestamp +from locust import task +from shapely import buffer +from shapely import wkt parameters = ["ff", "dd", "rh", "pp", "tn"] -stations = ["06203", "06204", "06205", "06207", "06208", "06211", "06214", "06215", "06235", "06239", "06242", "06251", "06260", "06269", "06270", "06275", "06279", "06280", "06290", "06310", "06317", "06319", "06323", "06330", "06340", "06344", "06348", "06350", "06356", "06370", "06375", "06380", "78871", "78873"] -points = ["POINT(5.179705 52.0988218)", "POINT(3.3416666666667 52.36)", "POINT(2.9452777777778 53.824130555556)", - "POINT(4.7811453228565 52.926865008825)", "POINT(4.342014 51.447744494043)"] +# fmt: off +stations = [ + "06203", "06204", "06205", "06207", "06208", "06211", "06214", "06215", "06235", "06239", + "06242", "06251", "06260", "06269", "06270", "06275", "06279", "06280", "06290", "06310", + "06317", "06319", "06323", "06330", "06340", "06344", "06348", "06350", "06356", "06370", + "06375", "06380", "78871", "78873", +] +# fmt: on +points = [ + "POINT(5.179705 52.0988218)", + "POINT(3.3416666666667 52.36)", + "POINT(2.9452777777778 53.824130555556)", + "POINT(4.7811453228565 52.926865008825)", + "POINT(4.342014 51.447744494043)", +] class StoreGrpcUser(grpc_user.GrpcUser): @@ -35,7 +46,6 @@ def get_data_for_single_timeserie(self): interval=dstore.TimeInterval(start=from_time, end=to_time), platforms=[random.choice(stations)], instruments=[random.choice(parameters)], - ) response = self.stub.GetObservations(request) assert len(response.observations) == 1 @@ -54,8 +64,9 @@ def get_data_single_station_through_bbox(self): request = dstore.GetObsRequest( interval=dstore.TimeInterval(start=from_time, end=to_time), instruments=[random.choice(parameters)], - inside=dstore.Polygon(points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords]) - + inside=dstore.Polygon( + points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords] + ), ) response = self.stub.GetObservations(request) assert len(response.observations) == 1 diff --git a/tstester/README.md b/tstester/README.md index 1e2234b..740e7e6 100644 --- a/tstester/README.md +++ b/tstester/README.md @@ -120,4 +120,3 @@ CONTAINER ID IMAGE COMMAND CREATED STATUS ## Running TimescaleDB in docker container on local machine TODO! - diff --git a/tstester/common.py b/tstester/common.py index e89100b..1fc0342 100644 --- a/tstester/common.py +++ b/tstester/common.py @@ -1,23 +1,24 @@ -import random -import time import os +import random import subprocess +import time def select_weighted_value(x): """Select a random value based on probability weights. - x is of the form [(v_1, w_1), (v_2, w_2), ..., (v_n, w_n)]. - Returns v_i with a probability of w_i / (w_1 + w_2 + ... + w_n). + x is of the form [(v_1, w_1), (v_2, w_2), ..., (v_n, w_n)]. + Returns v_i with a probability of w_i / (w_1 + w_2 + ... + w_n). 
""" # check preconditions if len(x) == 0: - raise Exception('can\'t select from empty list') + raise Exception("can't select from empty list") for item in x: if item[1] <= 0: - raise Exception('non-positive weight not allowed (value: {}, weight: {})'.format( - item[0], item[1])) + raise Exception( + "non-positive weight not allowed (value: {}, weight: {})".format(item[0], item[1]) + ) w_sum_n = sum([z[1] for z in x]) # get total weight sum r = random.random() * w_sum_n # get random value within total weight sum @@ -43,22 +44,23 @@ def elapsed_secs(start_secs): return now_secs() - start_secs -def get_env_var(name, default_value='', fail_on_empty=True): +def get_env_var(name, default_value="", fail_on_empty=True): """Get environment variable.""" v = os.getenv(name, default_value) - if (v == '') and fail_on_empty: - raise Exception('environment variable {} empty or undefined'.format(name)) + if (v == "") and fail_on_empty: + raise Exception("environment variable {} empty or undefined".format(name)) return v def exec_command(cmd): - """Execute a command, returning stdout on success, raising an error on failure. - """ + """Execute a command, returning stdout on success, raising an error on failure.""" p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if p.returncode != 0: raise Exception( - '\n\'{}\' failed:\n EXIT CODE: {}\n STDOUT: \'{}\'\n STDERR: \'{}\'\n'.format( - cmd, p.returncode, p.stdout.strip(), p.stderr.strip())) + "\n'{}' failed:\n EXIT CODE: {}\n STDOUT: '{}'\n STDERR: '{}'\n".format( + cmd, p.returncode, p.stdout.strip(), p.stderr.strip() + ) + ) return p.stdout @@ -91,18 +93,24 @@ def validate_precondition(): v = (v1, v2) for j in [0, 1]: if len(t[j]) != len(v[j]): - raise Exception('precondition failed: len(t[{}]) ({}) != len(v[{}]) ({})'.format( - j, len(t[j]), j, len(v[j]))) + raise Exception( + "precondition failed: len(t[{}]) ({}) != len(v[{}]) ({})".format( + j, len(t[j]), j, len(v[j]) + ) + ) if len(t[j]) > 0: if t[j][-1] >= sentinel_obs_time: - raise Exception('precondition failed: t[{}][-1] >= {}'.format( - j, sentinel_obs_time)) + raise Exception( + "precondition failed: t[{}][-1] >= {}".format(j, sentinel_obs_time) + ) if len(t[j]) > 1: for i in range(1, len(t[j])): if t[j][i - 1] >= t[j][i]: raise Exception( - 'precondition failed: t[{}][{}] ({}) >= t[{}][{}] ({})'.format( - j, i - 1, t[j][i - 1], j, i, t[j][i])) + "precondition failed: t[{}][{}] ({}) >= t[{}][{}] ({})".format( + j, i - 1, t[j][i - 1], j, i, t[j][i] + ) + ) validate_precondition() diff --git a/tstester/config.json b/tstester/config.json index 97b8482..de372ec 100644 --- a/tstester/config.json +++ b/tstester/config.json @@ -1,39 +1,31 @@ { "max_age": 86400, - "_comment": "max age in secs relative to current time (older observations are inaccessible)", - + "_comment": "extra secs values to try with the AddNewObs test", "nstations": 3, - "_comment": "number of stations to generate", - "bbox": { "min_lat": -60.5, "max_lat": 62.5, "min_lon": -10.5, "max_lon": 12.5 }, - "_comment": "bounding box for randomly generated station locations (no two stations will have the same location)", - "params": { "min": 1, "max": 3 }, - "_comment": "minimum and maximum number of randomly generated params for a station", - "time_res": { "60": 0.2, "600": 0.3, "3600": 0.5 }, - "_comment": "probability weights of time series resolutions (around 20% will have time res 60 secs, around 30% will have time res 600 secs, and so on)", - - "extra_secs": [60, 600, 3600], - "_comment": "extra secs values to try 
with the AddNewObs test", - + "extra_secs": [ + 60, + 600, + 3600 + ], "ts_other_metadata": { "sensor_location_quality": 9, "sensor_performance_quality": 9 }, - "obs_metadata": { "quality": 9 } diff --git a/tstester/main.py b/tstester/main.py index e5a2441..bc3732a 100755 --- a/tstester/main.py +++ b/tstester/main.py @@ -1,15 +1,13 @@ #!/usr/bin/env python3 - # tested with Python 3.9 - # Usage: ./main - -import sys import argparse import json import pathlib -from traceback import format_exc import random +import sys +from traceback import format_exc + from tstester import TsTester @@ -17,21 +15,20 @@ def parse_args(args): """Parse and return command-line arguments.""" parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter, - description='Test different data storage solutions for time series of observations.', - exit_on_error=False) - parser.add_argument( - '-v', '--verbose', action='store_true', help='Enable logging to stdout.') + description="Test different data storage solutions for time series of observations.", + exit_on_error=False, + ) + parser.add_argument("-v", "--verbose", action="store_true", help="Enable logging to stdout.") parser.add_argument( - '-c', '--cfg_file', default='config.json', type=pathlib.Path, help='Config file.') - parser.add_argument( - '-s', '--random_seed', type=int, default=-1, help='Random seed.') + "-c", "--cfg_file", default="config.json", type=pathlib.Path, help="Config file." + ) + parser.add_argument("-s", "--random_seed", type=int, default=-1, help="Random seed.") pres = parser.parse_args(args) return pres.verbose, pres.cfg_file, pres.random_seed -if __name__ == '__main__': - +if __name__ == "__main__": try: verbose, cfg_file, random_seed = parse_args(sys.argv[1:]) if random_seed >= 0: @@ -39,10 +36,10 @@ def parse_args(args): config = json.load(open(cfg_file)) TsTester(verbose, config).execute() except argparse.ArgumentError as e: - print('failed to parse command-line arguments: {}'.format(e), file=sys.stderr) + print("failed to parse command-line arguments: {}".format(e), file=sys.stderr) sys.exit(1) except SystemExit: sys.exit(1) # don't print stack trace in this case (e.g. 
when --help option is given) - except: - sys.stderr.write('error: {}'.format(format_exc())) + except Exception: + sys.stderr.write("error: {}".format(format_exc())) sys.exit(1) diff --git a/tstester/netcdf.py b/tstester/netcdf.py index a770591..a4abc4d 100644 --- a/tstester/netcdf.py +++ b/tstester/netcdf.py @@ -1,7 +1,8 @@ -import netCDF4 as nc import datetime as dt -import numpy as np + import common +import netCDF4 as nc # noqa: N813 +import numpy as np class NetCDF: @@ -13,52 +14,53 @@ def __init__(self, verbose): def create_initial_file(self, path, ts): """Create the initial file for a time series.""" - with nc.Dataset(path, 'w') as dset: - - dset.setncatts({ - 'station_id': ts.station_id(), - 'param_id': ts.param_id(), - 'spatial_representation': 'point', - 'geospatial_lat_min': ts.lat(), - 'geospatial_lat_max': ts.lat(), - 'geospatial_lon_min': ts.lon(), - 'geospatial_lon_max': ts.lon(), - }) - - vlat = dset.createVariable('latitude', 'f') - vlat.standard_name = 'latitude' - vlat.long_name = 'station latitude' - vlat.units = 'degrees_north' + with nc.Dataset(path, "w") as dset: + dset.setncatts( + { + "station_id": ts.station_id(), + "param_id": ts.param_id(), + "spatial_representation": "point", + "geospatial_lat_min": ts.lat(), + "geospatial_lat_max": ts.lat(), + "geospatial_lon_min": ts.lon(), + "geospatial_lon_max": ts.lon(), + } + ) + + vlat = dset.createVariable("latitude", "f") + vlat.standard_name = "latitude" + vlat.long_name = "station latitude" + vlat.units = "degrees_north" vlat[:] = ts.lat() - vlon = dset.createVariable('longitude', 'f') - vlon.standard_name = 'longitude' - vlon.long_name = 'station longitude' - vlon.units = 'degrees_east' + vlon = dset.createVariable("longitude", "f") + vlon.standard_name = "longitude" + vlon.long_name = "station longitude" + vlon.units = "degrees_east" vlon[:] = ts.lon() - dset.createDimension('time', 0) # create time as an unlimited dimension + dset.createDimension("time", 0) # create time as an unlimited dimension - v = dset.createVariable('time', 'i4', ('time',)) - v.standard_name = 'time' - v.long_name = 'Time of measurement' - v.calendar = 'standard' - ref_dt = dt.datetime.strptime('1970-01-01', '%Y-%m-%d').replace(tzinfo=dt.timezone.utc) + v = dset.createVariable("time", "i4", ("time",)) + v.standard_name = "time" + v.long_name = "Time of measurement" + v.calendar = "standard" + ref_dt = dt.datetime.strptime("1970-01-01", "%Y-%m-%d").replace(tzinfo=dt.timezone.utc) v.units = f"seconds since {ref_dt.strftime('%Y-%m-%d %H:%M:%S')}" - v.axis = 'T' + v.axis = "T" - v = dset.createVariable('value', 'f4', ['time']) + v = dset.createVariable("value", "f4", ["time"]) v.standard_name = ts.param_id() # for now - v.long_name = '{} (long name)'.format(ts.param_id()) # for now - v.coordinates = 'time latitude longitude' - v.coverage_content_type = 'physicalMeasurement' + v.long_name = "{} (long name)".format(ts.param_id()) # for now + v.coordinates = "time latitude longitude" + v.coverage_content_type = "physicalMeasurement" def replace_times_and_values(self, path, times, values): """Replace contents of 'time' and 'value' variables in file.""" - with nc.Dataset(path, 'a') as dset: - dset['time'][:] = times - dset['value'][:] = values + with nc.Dataset(path, "a") as dset: + dset["time"][:] = times + dset["value"][:] = values def add_times_and_values(self, path, times, values, oldest_time=None): """Add new or replace/remove observations in file. 
@@ -73,19 +75,19 @@ def add_times_and_values(self, path, times, values, oldest_time=None): # times_add = np.array(times) # values_add = np.array(values) - with nc.Dataset(path, 'a') as dset: - + with nc.Dataset(path, "a") as dset: # retrieve file variables - ftimes = dset['time'][:] - fvalues = dset['value'][:] + ftimes = dset["time"][:] + fvalues = dset["value"][:] # merge mtimes, mvalues = common.ts_merge( - ftimes.tolist(), fvalues.tolist(), times, values, oldest_time) + ftimes.tolist(), fvalues.tolist(), times, values, oldest_time + ) # replace file variables with merged arrays - dset['time'][:] = mtimes - dset['value'][:] = mvalues + dset["time"][:] = mtimes + dset["value"][:] = mvalues def get_times_and_values(self, path, from_time, to_time): """Retrieve contents of 'time' and 'value' variables from file within [from_time, to_time>. @@ -94,7 +96,7 @@ def get_times_and_values(self, path, from_time, to_time): StorageBackend.set_obs()) """ - with nc.Dataset(path, 'r') as dset: - time_var = dset.variables['time'] + with nc.Dataset(path, "r") as dset: + time_var = dset.variables["time"] indices = np.where((time_var[:] >= from_time) & (time_var[:] < to_time)) - return list(time_var[indices]), list(dset.variables['value'][indices]) + return list(time_var[indices]), list(dset.variables["value"][indices]) diff --git a/tstester/netcdfsbe_tsmdatainpostgis.py b/tstester/netcdfsbe_tsmdatainpostgis.py index 45c6ced..7f3defc 100644 --- a/tstester/netcdfsbe_tsmdatainpostgis.py +++ b/tstester/netcdfsbe_tsmdatainpostgis.py @@ -1,12 +1,13 @@ -from storagebackend import StorageBackend -from postgissbe import PostGISSBE import shutil +import sys from pathlib import Path + from netcdf import NetCDF -import sys +from postgissbe import PostGISSBE +from storagebackend import StorageBackend -class NetCDFSBE_TSMDataInPostGIS(StorageBackend): +class NetCDFSBE_TSMDataInPostGIS(StorageBackend): # noqa: N801 """A storage backend that uses netCDF files on the local file system for storage of observations and per observation metadata, and a PostGIS database for keeping per time series metadata. 
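Each (station, parameter) time series gets its own file under the backend's netCDF directory; the path convention used by set_obs() and add_obs() below resolves to, for example (station 06260 and parameter rh are illustrative IDs only):

    <nc_dir>/06260/rh/data.nc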
@@ -30,19 +31,20 @@ class NetCDFSBE_TSMDataInPostGIS(StorageBackend): """ def __init__(self, verbose, pg_conn_info, nc_dir): - super().__init__(verbose, 'netCDF (time series metadata in PostGIS)') + super().__init__(verbose, "netCDF (time series metadata in PostGIS)") self._pgsbe = PostGISSBE(verbose, pg_conn_info) # for keeping per time series metadata self._nc_dir = nc_dir # directory under which to keep the netCDF files self._netcdf = NetCDF(verbose) - self._nc_fname = 'data.nc' + self._nc_fname = "data.nc" def reset(self, tss): """See documentation in base class.""" if self._verbose: print( - 'resetting NetCDFSBE_TSMDataInPostGIS with {} time series'.format(len(tss)), - file=sys.stderr) + "resetting NetCDFSBE_TSMDataInPostGIS with {} time series".format(len(tss)), + file=sys.stderr, + ) self._pgsbe.reset(tss) @@ -52,22 +54,22 @@ def reset(self, tss): # create files with all per time series metadata, but with no observations for ts in tss: # create directory - target_dir = '{}/{}/{}'.format(self._nc_dir, ts.station_id(), ts.param_id()) + target_dir = "{}/{}/{}".format(self._nc_dir, ts.station_id(), ts.param_id()) Path(target_dir).mkdir(parents=True, exist_ok=True) # create initial file - self._netcdf.create_initial_file('{}/{}'.format(target_dir, self._nc_fname), ts) + self._netcdf.create_initial_file("{}/{}".format(target_dir, self._nc_fname), ts) def set_obs(self, ts, times, values): """See documentation in base class.""" - path = '{}/{}/{}/{}'.format(self._nc_dir, ts.station_id(), ts.param_id(), self._nc_fname) + path = "{}/{}/{}/{}".format(self._nc_dir, ts.station_id(), ts.param_id(), self._nc_fname) self._netcdf.replace_times_and_values(path, times, values) def add_obs(self, ts, times, values, oldest_time=None): """See documentation in base class.""" - path = '{}/{}/{}/{}'.format(self._nc_dir, ts.station_id(), ts.param_id(), self._nc_fname) + path = "{}/{}/{}/{}".format(self._nc_dir, ts.station_id(), ts.param_id(), self._nc_fname) self._netcdf.add_times_and_values(path, times, values, oldest_time) def get_obs(self, ts_ids, from_time, to_time): @@ -82,7 +84,7 @@ def get_obs_all(self, from_time, to_time): res = [] for ts_id in ts_ids: station_id, param_id = self._pgsbe.get_station_and_param(ts_id) - path = '{}/{}/{}/{}'.format(self._nc_dir, station_id, param_id, self._nc_fname) + path = "{}/{}/{}/{}".format(self._nc_dir, station_id, param_id, self._nc_fname) times, values = self._netcdf.get_times_and_values(path, from_time, to_time) res.append((ts_id, times[:], values[:])) diff --git a/tstester/pgconnectioninfo.py b/tstester/pgconnectioninfo.py index 123e926..21748ae 100644 --- a/tstester/pgconnectioninfo.py +++ b/tstester/pgconnectioninfo.py @@ -21,8 +21,8 @@ def password(self): return self._password def set_dbname(self, dbname): - if dbname.strip().lower() == 'postgres': - raise Exception('database name must be different from \'postgres\'') + if dbname.strip().lower() == "postgres": + raise Exception("database name must be different from 'postgres'") self._dbname = dbname def dbname(self): diff --git a/tstester/pgopbackend.py b/tstester/pgopbackend.py index de3d229..920b3df 100644 --- a/tstester/pgopbackend.py +++ b/tstester/pgopbackend.py @@ -1,12 +1,14 @@ -from abc import ABC, abstractmethod +import sys +from abc import ABC +from abc import abstractmethod + import common import psycopg2 -import sys class PGOpBackend(ABC): """The base class / interface for an executor backend for a Postgres database operation - (query or command). + (query or command). 
""" def __init__(self, verbose, descr): @@ -34,29 +36,36 @@ class Psycopg2BE(PGOpBackend): def __init__(self, verbose, conn_info): if verbose: - print('using psycopg2 adapter for PostGIS operations', file=sys.stderr) - super().__init__(verbose, 'psycopg2 adapter') + print("using psycopg2 adapter for PostGIS operations", file=sys.stderr) + super().__init__(verbose, "psycopg2 adapter") self._conn = self.__connect(conn_info) self._cur = self._conn.cursor() def __connect(self, conn_info): - """"Connect to the database server. + """ "Connect to the database server. Returns connection. """ if self._verbose: start = common.now_secs() - print('connecting to PostGIS ... ', file=sys.stderr, end='', flush=True) + print("connecting to PostGIS ... ", file=sys.stderr, end="", flush=True) # WARNING: the call to connect() may take very long; up to 15-20 secs! - conn = psycopg2.connect('host={} port={} user={} password={} dbname={}'.format( - conn_info.host(), conn_info.port(), conn_info.user(), conn_info.password(), - conn_info.dbname() - )) + conn = psycopg2.connect( + "host={} port={} user={} password={} dbname={}".format( + conn_info.host(), + conn_info.port(), + conn_info.user(), + conn_info.password(), + conn_info.dbname(), + ) + ) if self._verbose: print( - 'done (after {0:.4f} secs)'.format(common.now_secs() - start), file=sys.stderr, - flush=True) + "done (after {0:.4f} secs)".format(common.now_secs() - start), + file=sys.stderr, + flush=True, + ) return conn def execute(self, op, commit=True): @@ -68,7 +77,7 @@ def execute(self, op, commit=True): try: return self._cur.fetchall() - except: + except Exception: return [] # nothing to fetch def commit(self): @@ -82,20 +91,33 @@ class PsqlBE(PGOpBackend): def __init__(self, verbose, conn_info): if verbose: - print('using psql command for PostGIS operations', file=sys.stderr) - super().__init__(verbose, 'psql command') + print("using psql command for PostGIS operations", file=sys.stderr) + super().__init__(verbose, "psql command") self._conn_info = conn_info def execute(self, op, commit=False): """See documentation in base class.""" _ = commit # n/a - res = common.exec_command([ - 'psql', '-t', '-h', self._conn_info.host(), '-p', self._conn_info.port(), - '-U', self._conn_info.user(), '-d', self._conn_info.dbname(), '-c', op]) - - res = [x for x in res.decode('utf-8').split('\n') if len(x) > 0] - return list(map(lambda x: tuple([z.strip() for z in x.split('|')]), res)) + res = common.exec_command( + [ + "psql", + "-t", + "-h", + self._conn_info.host(), + "-p", + self._conn_info.port(), + "-U", + self._conn_info.user(), + "-d", + self._conn_info.dbname(), + "-c", + op, + ] + ) + + res = [x for x in res.decode("utf-8").split("\n") if len(x) > 0] + return list(map(lambda x: tuple([z.strip() for z in x.split("|")]), res)) def commit(self): """See documentation in base class.""" diff --git a/tstester/postgissbe.py b/tstester/postgissbe.py index 0ce06b4..452d226 100644 --- a/tstester/postgissbe.py +++ b/tstester/postgissbe.py @@ -1,9 +1,11 @@ -from storagebackend import StorageBackend -import common -from pgopbackend import Psycopg2BE, PsqlBE import json import sys +import common +from pgopbackend import PsqlBE +from pgopbackend import Psycopg2BE +from storagebackend import StorageBackend + # NOTE: we assume that the risk of SQL injection is zero in this context @@ -12,7 +14,7 @@ class PostGISSBE(StorageBackend): per observation metadata, and per time series metadata.""" def __init__(self, verbose, pg_conn_info): - super().__init__(verbose, 'PostGIS') + 
super().__init__(verbose, "PostGIS") self._conn_info = pg_conn_info @@ -21,7 +23,7 @@ def __init__(self, verbose, pg_conn_info): self.__create_database() # create a database operation executor backend - if common.get_env_var('PGOPBACKEND', 'psycopg2') == 'psycopg2': + if common.get_env_var("PGOPBACKEND", "psycopg2") == "psycopg2": self._pgopbe = Psycopg2BE(verbose, self._conn_info) else: self._pgopbe = PsqlBE(verbose, self._conn_info) @@ -33,12 +35,12 @@ def pg_config(self): """Returns Postgres config.""" return { - 'operation backend': self._pgopbe.descr(), - 'host': self._conn_info.host(), - 'port': self._conn_info.port(), - 'user': self._conn_info.user(), - 'password': '(not shown)', - 'dbname': self._conn_info.dbname() + "operation backend": self._pgopbe.descr(), + "host": self._conn_info.host(), + "port": self._conn_info.port(), + "user": self._conn_info.user(), + "password": "(not shown)", + "dbname": self._conn_info.dbname(), } def __drop_database(self): @@ -46,40 +48,68 @@ def __drop_database(self): if self._verbose: print( - 'dropping database {} ... '.format(self._conn_info.dbname()), file=sys.stderr, - end='', flush=True) - common.exec_command([ - 'dropdb', '-w', '-f', '--if-exists', '-h', self._conn_info.host(), - '-p', self._conn_info.port(), '-U', self._conn_info.user(), self._conn_info.dbname()]) + "dropping database {} ... ".format(self._conn_info.dbname()), + file=sys.stderr, + end="", + flush=True, + ) + common.exec_command( + [ + "dropdb", + "-w", + "-f", + "--if-exists", + "-h", + self._conn_info.host(), + "-p", + self._conn_info.port(), + "-U", + self._conn_info.user(), + self._conn_info.dbname(), + ] + ) if self._verbose: - print('done', file=sys.stderr, flush=True) + print("done", file=sys.stderr, flush=True) def __create_database(self): """Create database named self._conn_info.dbname().""" if self._verbose: print( - 'creating database {} ... '.format(self._conn_info.dbname()), file=sys.stderr, - end='', flush=True) - common.exec_command([ - 'createdb', '-w', '-h', self._conn_info.host(), '-p', self._conn_info.port(), - '-U', self._conn_info.user(), self._conn_info.dbname()]) + "creating database {} ... ".format(self._conn_info.dbname()), + file=sys.stderr, + end="", + flush=True, + ) + common.exec_command( + [ + "createdb", + "-w", + "-h", + self._conn_info.host(), + "-p", + self._conn_info.port(), + "-U", + self._conn_info.user(), + self._conn_info.dbname(), + ] + ) if self._verbose: - print('done', file=sys.stderr, flush=True) + print("done", file=sys.stderr, flush=True) def __install_postgis_extension(self): """Install the PostGIS extension.""" if self._verbose: - print('installing PostGIS extension ... ', file=sys.stderr, end='', flush=True) - self._pgopbe.execute('CREATE EXTENSION postgis') + print("installing PostGIS extension ... ", file=sys.stderr, end="", flush=True) + self._pgopbe.execute("CREATE EXTENSION postgis") if self._verbose: - print('done', file=sys.stderr, flush=True) + print("done", file=sys.stderr, flush=True) def __get_ts_id(self, station_id, param_id): """Get time series ID from station_id and param_id""" - query = 'SELECT id FROM time_series WHERE station_id = \'{}\' AND param_id = \'{}\'' + query = "SELECT id FROM time_series WHERE station_id = '{}' AND param_id = '{}'" rows = self._pgopbe.execute(query.format(station_id, param_id)) return int(rows[0][0]) # assuming for now this always works (i.e. 
don't handle any error) @@ -88,23 +118,23 @@ def __create_insert_values(self, ts_id, times, values): ivalues = [] for to in zip(times, values): - ivalues.append('({},to_timestamp({}),\'{}\')'.format(ts_id, to[0], to[1])) + ivalues.append("({},to_timestamp({}),'{}')".format(ts_id, to[0], to[1])) return ivalues def reset(self, tss): """See documentation in base class.""" if self._verbose: - print('resetting PostGISSBE with {} time series'.format(len(tss)), file=sys.stderr) + print("resetting PostGISSBE with {} time series".format(len(tss)), file=sys.stderr) # assume at this point that self._conn_info.dbname() exists, but not that it is # empty, so first step is to drop schema (all tables and indexes): - self._pgopbe.execute('DROP TABLE IF EXISTS ts') + self._pgopbe.execute("DROP TABLE IF EXISTS ts") # self._pgopbe.execute('DROP INDEX IF EXISTS ...') # TODO? # create time series table self._pgopbe.execute( - ''' + """ CREATE TABLE time_series ( id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY, station_id TEXT NOT NULL, @@ -113,20 +143,21 @@ def reset(self, tss): pos GEOGRAPHY(Point) NOT NULL, other_metadata JSONB NOT NULL ) - ''') + """ + ) # insert rows in time series table for ts in tss: - - cmd = ''' + cmd = """ INSERT INTO time_series (station_id, param_id, pos, other_metadata) VALUES ('{}', '{}', '{}', '{}'::jsonb) - ''' + """ self._pgopbe.execute( - ' '.join(cmd.split()).format( - ts.station_id(), ts.param_id(), - 'POINT({} {})'.format(ts.lon(), ts.lat()), - json.dumps(ts.other_mdata()) + " ".join(cmd.split()).format( + ts.station_id(), + ts.param_id(), + "POINT({} {})".format(ts.lon(), ts.lat()), + json.dumps(ts.other_mdata()), ) ) @@ -135,32 +166,37 @@ def reset(self, tss): # create observations table self._pgopbe.execute( - ''' + """ CREATE TABLE observations ( ts_id integer REFERENCES time_series(id) ON DELETE CASCADE, tstamp timestamp, -- obs time (NOT NULL, but implied by being part of PK) value double precision, -- obs value PRIMARY KEY (ts_id, tstamp) ) - ''') + """ + ) def set_obs(self, ts, times, values): """See documentation in base class.""" if self._verbose: - print('setting observations in PostGIS SBE for time series >>>', file=sys.stderr) - print(' ts: {}\n times: (size: {}), values: (size: {})'.format( - ts.__dict__, len(times), len(values)), file=sys.stderr) + print("setting observations in PostGIS SBE for time series >>>", file=sys.stderr) + print( + " ts: {}\n times: (size: {}), values: (size: {})".format( + ts.__dict__, len(times), len(values) + ), + file=sys.stderr, + ) ts_id = self.__get_ts_id(ts.station_id(), ts.param_id()) # replace all rows in observations table for this time series - cmd = 'DELETE FROM observations WHERE ts_id = {}'.format(ts_id) + cmd = "DELETE FROM observations WHERE ts_id = {}".format(ts_id) self._pgopbe.execute(cmd) ivalues = self.__create_insert_values(ts_id, times, values) - cmd = 'INSERT INTO observations (ts_id, tstamp, value) VALUES {}'.format(','.join(ivalues)) + cmd = "INSERT INTO observations (ts_id, tstamp, value) VALUES {}".format(",".join(ivalues)) self._pgopbe.execute(cmd) def add_obs(self, ts, times, values, oldest_time=None): @@ -171,16 +207,20 @@ def add_obs(self, ts, times, values, oldest_time=None): # insert or update (i.e. 
"upsert") rows in observations table for this time series ivalues = self.__create_insert_values(ts_id, times, values) - cmd = ''' + cmd = """ INSERT INTO observations (ts_id, tstamp, value) VALUES {} ON CONFLICT ON CONSTRAINT observations_pkey DO UPDATE SET value = EXCLUDED.value - '''.format(','.join(ivalues)) + """.format( + ",".join(ivalues) + ) self._pgopbe.execute(cmd) if oldest_time is not None: # delete observations that are too old - cmd = ''' + cmd = """ DELETE FROM observations WHERE ts_id = {} AND EXTRACT(EPOCH FROM tstamp) < {} - '''.format(ts_id, oldest_time) + """.format( + ts_id, oldest_time + ) self._pgopbe.execute(cmd) def get_obs(self, ts_ids, from_time, to_time): @@ -190,13 +230,13 @@ def get_obs(self, ts_ids, from_time, to_time): def get_obs_all(self, from_time, to_time): """See documentation in base class.""" - query = ''' + query = """ SELECT ts_id, array_agg(CAST(EXTRACT(EPOCH FROM tstamp) AS int) ORDER BY tstamp), array_agg(value ORDER BY tstamp) FROM observations WHERE tstamp >= to_timestamp({}) AND tstamp < to_timestamp({}) GROUP BY ts_id - ''' + """ rows = self._pgopbe.execute(query.format(from_time, to_time)) @@ -206,8 +246,8 @@ def get_obs_all(self, from_time, to_time): if isinstance(times, str): # the case for PsqlBE # convert '{ITEM1, ITEM2, ..., ITEMN}' to # [convert(ITEM1), convert(ITEM2), ..., convert(ITEMN)] - times = [int(x) for x in times.strip()[1:-1].split(',')] - values = [float(x) for x in values.strip()[1:-1].split(',')] + times = [int(x) for x in times.strip()[1:-1].split(",")] + values = [float(x) for x in values.strip()[1:-1].split(",")] # assert(isinstance(times, list)) # assert(isinstance(values, list)) res.append((ts_id, times, values)) @@ -217,22 +257,22 @@ def get_obs_all(self, from_time, to_time): def get_station_and_param(self, ts_id): """See documentation in base class.""" - query = 'SELECT station_id, param_id FROM time_series WHERE id = {}' + query = "SELECT station_id, param_id FROM time_series WHERE id = {}" rows = self._pgopbe.execute(query.format(ts_id)) return rows[0][0], rows[0][1] def get_ts_ids_all(self): """See documentation in base class.""" - rows = self._pgopbe.execute('SELECT id FROM time_series') + rows = self._pgopbe.execute("SELECT id FROM time_series") return [int(row[0]) for row in rows] def get_ts_ids_in_circle(self, lat, lon, radius): """See documentation in base class.""" - query = ''' + query = """ SELECT id FROM time_series WHERE ST_Distance('SRID=4326;POINT({} {})'::geography, pos) < {} - ''' + """ rows = self._pgopbe.execute(query.format(lon, lat, radius)) return [int(row[0]) for row in rows] diff --git a/tstester/storagebackend.py b/tstester/storagebackend.py index be3aac7..c8d731d 100644 --- a/tstester/storagebackend.py +++ b/tstester/storagebackend.py @@ -1,4 +1,5 @@ -from abc import ABC, abstractmethod +from abc import ABC +from abc import abstractmethod class StorageBackend(ABC): @@ -23,8 +24,9 @@ def reset(self, tss): def set_obs(self, ts, times, values): """Replace any existing observations in time series ts with times/values. 
- - times are assumed to be a list of unique UNIX timestamps (secs since 1970-01-01T00:00:00Z) - in strictly increasing order, but not necessesarily uniformly distributed + - times are assumed to be a list of + unique UNIX timestamps (secs since 1970-01-01T00:00:00Z) + in strictly increasing order, but not necessarily uniformly distributed - values are assumed to be a list of floats - len(times) and len(values) are assumed to be equal """ @@ -34,7 +36,8 @@ def add_obs(self, ts, times, values, oldest_time): """Adds observations to time series ts. - times/values: same as in set_obs() - - if oldest_time is not None, observations older than this time are removed from the storage + - if oldest_time is not None, observations older than this time are removed + from the storage Observations at already existing times will be replaced. """ diff --git a/tstester/timescaledbsbe.py b/tstester/timescaledbsbe.py index 91614b9..13ef533 100644 --- a/tstester/timescaledbsbe.py +++ b/tstester/timescaledbsbe.py @@ -1,9 +1,11 @@ -from storagebackend import StorageBackend -import common -from pgopbackend import Psycopg2BE, PsqlBE import json import sys +import common +from pgopbackend import PsqlBE +from pgopbackend import Psycopg2BE +from storagebackend import StorageBackend + # NOTE: we assume that the risk of SQL injection is zero in this context @@ -12,7 +14,7 @@ class TimescaleDBSBE(StorageBackend): per observation metadata, and per time series metadata.""" def __init__(self, verbose, pg_conn_info): - super().__init__(verbose, 'TimescaleDB') + super().__init__(verbose, "TimescaleDB") self._conn_info = pg_conn_info @@ -21,7 +23,7 @@ def __init__(self, verbose, pg_conn_info): self.__create_database() # create a database operation executor backend - if common.get_env_var('PGOPBACKEND', 'psycopg2') == 'psycopg2': + if common.get_env_var("PGOPBACKEND", "psycopg2") == "psycopg2": self._pgopbe = Psycopg2BE(verbose, self._conn_info) else: self._pgopbe = PsqlBE(verbose, self._conn_info) @@ -33,12 +35,12 @@ def pg_config(self): """Returns TimescaleDB config.""" return { - 'operation backend': self._pgopbe.descr(), - 'host': self._conn_info.host(), - 'port': self._conn_info.port(), - 'user': self._conn_info.user(), - 'password': '(not shown)', - 'dbname': self._conn_info.dbname() + "operation backend": self._pgopbe.descr(), + "host": self._conn_info.host(), + "port": self._conn_info.port(), + "user": self._conn_info.user(), + "password": "(not shown)", + "dbname": self._conn_info.dbname(), } def __drop_database(self): @@ -46,40 +48,68 @@ def __drop_database(self): if self._verbose: print( - 'dropping database {} ... '.format(self._conn_info.dbname()), file=sys.stderr, - end='', flush=True) - common.exec_command([ - 'dropdb', '-w', '-f', '--if-exists', '-h', self._conn_info.host(), - '-p', self._conn_info.port(), '-U', self._conn_info.user(), self._conn_info.dbname()]) + "dropping database {} ... ".format(self._conn_info.dbname()), + file=sys.stderr, + end="", + flush=True, + ) + common.exec_command( + [ + "dropdb", + "-w", + "-f", + "--if-exists", + "-h", + self._conn_info.host(), + "-p", + self._conn_info.port(), + "-U", + self._conn_info.user(), + self._conn_info.dbname(), + ] + ) if self._verbose: - print('done', file=sys.stderr, flush=True) + print("done", file=sys.stderr, flush=True) def __create_database(self): """Create database named self._conn_info.dbname().""" if self._verbose: print( - 'creating database {} ... 
'.format(self._conn_info.dbname()), file=sys.stderr, - end='', flush=True) - common.exec_command([ - 'createdb', '-w', '-h', self._conn_info.host(), '-p', self._conn_info.port(), - '-U', self._conn_info.user(), self._conn_info.dbname()]) + "creating database {} ... ".format(self._conn_info.dbname()), + file=sys.stderr, + end="", + flush=True, + ) + common.exec_command( + [ + "createdb", + "-w", + "-h", + self._conn_info.host(), + "-p", + self._conn_info.port(), + "-U", + self._conn_info.user(), + self._conn_info.dbname(), + ] + ) if self._verbose: - print('done', file=sys.stderr, flush=True) + print("done", file=sys.stderr, flush=True) def __install_postgis_extension(self): """Install the PostGIS extension.""" if self._verbose: - print('installing PostGIS extension ... ', file=sys.stderr, end='', flush=True) - self._pgopbe.execute('CREATE EXTENSION postgis') + print("installing PostGIS extension ... ", file=sys.stderr, end="", flush=True) + self._pgopbe.execute("CREATE EXTENSION postgis") if self._verbose: - print('done', file=sys.stderr, flush=True) + print("done", file=sys.stderr, flush=True) def __get_ts_id(self, station_id, param_id): """Get time series ID from station_id and param_id""" - query = 'SELECT id FROM time_series WHERE station_id = \'{}\' AND param_id = \'{}\'' + query = "SELECT id FROM time_series WHERE station_id = '{}' AND param_id = '{}'" rows = self._pgopbe.execute(query.format(station_id, param_id)) return int(rows[0][0]) # assuming for now this always works (i.e. don't handle any error) @@ -88,23 +118,23 @@ def __create_insert_values(self, ts_id, times, values): ivalues = [] for to in zip(times, values): - ivalues.append('({},to_timestamp({}),\'{}\')'.format(ts_id, to[0], to[1])) + ivalues.append("({},to_timestamp({}),'{}')".format(ts_id, to[0], to[1])) return ivalues def reset(self, tss): """See documentation in base class.""" if self._verbose: - print('resetting PostGISSBE with {} time series'.format(len(tss)), file=sys.stderr) + print("resetting PostGISSBE with {} time series".format(len(tss)), file=sys.stderr) # assume at this point that self._conn_info.dbname() exists, but not that it is # empty, so first step is to drop schema (all tables and indexes): - self._pgopbe.execute('DROP TABLE IF EXISTS ts') + self._pgopbe.execute("DROP TABLE IF EXISTS ts") # self._pgopbe.execute('DROP INDEX IF EXISTS ...') # TODO? 
# create time series table self._pgopbe.execute( - ''' + """ CREATE TABLE time_series ( id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY, station_id TEXT NOT NULL, @@ -113,20 +143,21 @@ def reset(self, tss): pos GEOGRAPHY(Point) NOT NULL, other_metadata JSONB NOT NULL ) - ''') + """ + ) # insert rows in time series table for ts in tss: - - cmd = ''' + cmd = """ INSERT INTO time_series (station_id, param_id, pos, other_metadata) VALUES ('{}', '{}', '{}', '{}'::jsonb) - ''' + """ self._pgopbe.execute( - ' '.join(cmd.split()).format( - ts.station_id(), ts.param_id(), - 'POINT({} {})'.format(ts.lon(), ts.lat()), - json.dumps(ts.other_mdata()) + " ".join(cmd.split()).format( + ts.station_id(), + ts.param_id(), + "POINT({} {})".format(ts.lon(), ts.lat()), + json.dumps(ts.other_mdata()), ) ) @@ -135,40 +166,46 @@ def reset(self, tss): # create observations table self._pgopbe.execute( - ''' + """ CREATE TABLE observations ( ts_id integer REFERENCES time_series(id) ON DELETE CASCADE, tstamp timestamp, -- obs time (NOT NULL, but implied by being part of PK) value double precision, -- obs value PRIMARY KEY (ts_id, tstamp) ) - ''') + """ + ) # create hypertable self._pgopbe.execute( - ''' + """ SELECT create_hypertable( 'observations', 'tstamp', chunk_time_interval => INTERVAL '1 hour' ) - ''') + """ + ) def set_obs(self, ts, times, values): """See documentation in base class.""" if self._verbose: - print('setting observations in PostGIS SBE for time series >>>', file=sys.stderr) - print(' ts: {}\n times: (size: {}), values: (size: {})'.format( - ts.__dict__, len(times), len(values)), file=sys.stderr) + print("setting observations in PostGIS SBE for time series >>>", file=sys.stderr) + print( + " ts: {}\n times: (size: {}), values: (size: {})".format( + ts.__dict__, len(times), len(values) + ), + file=sys.stderr, + ) ts_id = self.__get_ts_id(ts.station_id(), ts.param_id()) # replace all rows in observations table for this time series - cmd = 'DELETE FROM observations WHERE ts_id = {}'.format(ts_id) + cmd = "DELETE FROM observations WHERE ts_id = {}".format(ts_id) self._pgopbe.execute(cmd) ivalues = self.__create_insert_values(ts_id, times, values) - cmd = 'INSERT INTO observations (ts_id, tstamp, value) VALUES {}'.format(','.join(ivalues)) + cmd = "INSERT INTO observations (ts_id, tstamp, value) VALUES {}".format(",".join(ivalues)) self._pgopbe.execute(cmd) def add_obs(self, ts, times, values, oldest_time=None): @@ -179,16 +216,20 @@ def add_obs(self, ts, times, values, oldest_time=None): # insert or update (i.e. 
"upsert") rows in observations table for this time series ivalues = self.__create_insert_values(ts_id, times, values) - cmd = ''' + cmd = """ INSERT INTO observations (ts_id, tstamp, value) VALUES {} ON CONFLICT ON CONSTRAINT observations_pkey DO UPDATE SET value = EXCLUDED.value - '''.format(','.join(ivalues)) + """.format( + ",".join(ivalues) + ) self._pgopbe.execute(cmd) if oldest_time is not None: # delete observations that are too old - cmd = ''' + cmd = """ DELETE FROM observations WHERE ts_id = {} AND EXTRACT(EPOCH FROM tstamp) < {} - '''.format(ts_id, oldest_time) + """.format( + ts_id, oldest_time + ) self._pgopbe.execute(cmd) def get_obs(self, ts_ids, from_time, to_time): @@ -198,13 +239,13 @@ def get_obs(self, ts_ids, from_time, to_time): def get_obs_all(self, from_time, to_time): """See documentation in base class.""" - query = ''' + query = """ SELECT ts_id, array_agg(CAST(EXTRACT(EPOCH FROM tstamp) AS int) ORDER BY tstamp), array_agg(value ORDER BY tstamp) FROM observations WHERE tstamp >= to_timestamp({}) AND tstamp < to_timestamp({}) GROUP BY ts_id - ''' + """ rows = self._pgopbe.execute(query.format(from_time, to_time)) @@ -214,8 +255,8 @@ def get_obs_all(self, from_time, to_time): if isinstance(times, str): # the case for PsqlBE # convert '{ITEM1, ITEM2, ..., ITEMN}' to # [convert(ITEM1), convert(ITEM2), ..., convert(ITEMN)] - times = [int(x) for x in times.strip()[1:-1].split(',')] - values = [float(x) for x in values.strip()[1:-1].split(',')] + times = [int(x) for x in times.strip()[1:-1].split(",")] + values = [float(x) for x in values.strip()[1:-1].split(",")] # assert(isinstance(times, list)) # assert(isinstance(values, list)) res.append((ts_id, times, values)) @@ -225,22 +266,22 @@ def get_obs_all(self, from_time, to_time): def get_station_and_param(self, ts_id): """See documentation in base class.""" - query = 'SELECT station_id, param_id FROM time_series WHERE id = {}' + query = "SELECT station_id, param_id FROM time_series WHERE id = {}" rows = self._pgopbe.execute(query.format(ts_id)) return rows[0][0], rows[0][1] def get_ts_ids_all(self): """See documentation in base class.""" - rows = self._pgopbe.execute('SELECT id FROM time_series') + rows = self._pgopbe.execute("SELECT id FROM time_series") return [int(row[0]) for row in rows] def get_ts_ids_in_circle(self, lat, lon, radius): """See documentation in base class.""" - query = ''' + query = """ SELECT id FROM time_series WHERE ST_Distance('SRID=4326;POINT({} {})'::geography, pos) < {} - ''' + """ rows = self._pgopbe.execute(query.format(lon, lat, radius)) return [int(row[0]) for row in rows] diff --git a/tstester/timeseries.py b/tstester/timeseries.py index 6a12d13..9dfefbc 100644 --- a/tstester/timeseries.py +++ b/tstester/timeseries.py @@ -7,8 +7,7 @@ class TimeSeries: The time series is uniquely identified by the (station_id, param_id) combo. """ - def __init__( - self, verbose, station_id, lat, lon, param_id, time_res, other_mdata, obs_mdata): + def __init__(self, verbose, station_id, lat, lon, param_id, time_res, other_mdata, obs_mdata): self._verbose = verbose self._station_id = station_id @@ -17,7 +16,7 @@ def __init__( self._param_id = param_id if time_res < 1: - raise Exception('non-positive time resolution not allowed: {}'.format(time_res)) + raise Exception("non-positive time resolution not allowed: {}".format(time_res)) self._time_res = time_res # time resolution, i.e. seconds between observations self._other_mdata = other_mdata # other metadata (e.g. 
quality of sensor) @@ -57,7 +56,7 @@ def create_observations(self, t0, t1): """ if t0 >= t1: - raise Exception('invalid obs time range: [{}, {}]'.format(t0, t1)) + raise Exception("invalid obs time range: [{}, {}]".format(t0, t1)) times = [] values = [] diff --git a/tstester/tstester.py b/tstester/tstester.py index 7ba2e8b..39dee2b 100644 --- a/tstester/tstester.py +++ b/tstester/tstester.py @@ -1,15 +1,17 @@ -import common +import copy +import datetime +import json import random -from timescaledbsbe import TimescaleDBSBE -from postgissbe import PostGISSBE +import sys +from abc import ABC +from abc import abstractmethod + +import common from netcdfsbe_tsmdatainpostgis import NetCDFSBE_TSMDataInPostGIS -from timeseries import TimeSeries -from abc import ABC, abstractmethod from pgconnectioninfo import PGConnectionInfo -import json -import sys -import copy -import datetime +from postgissbe import PostGISSBE +from timescaledbsbe import TimescaleDBSBE +from timeseries import TimeSeries class TestBase(ABC): @@ -43,13 +45,13 @@ def __init__(self, verbose, config, storage_backends, tss): self._tss = tss def descr(self): - return 'reset storage backends with {} time series'.format(len(self._tss)) + return "reset storage backends with {} time series".format(len(self._tss)) def _execute(self): for sbe in self._storage_backends: start_secs = common.now_secs() sbe.reset(self._tss) - self._reg_stats(sbe, 'total secs', common.elapsed_secs(start_secs)) + self._reg_stats(sbe, "total secs", common.elapsed_secs(start_secs)) class FillStorage(TestBase): @@ -59,13 +61,13 @@ def __init__(self, verbose, config, storage_backends, tss, curr_time): self._curr_time = curr_time def descr(self): - return 'fill storage with observations' + return "fill storage with observations" def _execute(self): # fill each time series with observations using the entire accessible capacity # ([curr_time - max_age, curr_time]) ts_data = [] - from_time, to_time = self._curr_time - self._config['max_age'], self._curr_time + from_time, to_time = self._curr_time - self._config["max_age"], self._curr_time for ts in self._tss: times, values = ts.create_observations(from_time, to_time) ts_data.append((ts, times, values)) @@ -75,7 +77,7 @@ def _execute(self): start_secs = common.now_secs() for td in ts_data: sbe.set_obs(td[0], td[1], td[2]) - self._reg_stats(sbe, 'total secs', common.elapsed_secs(start_secs)) + self._reg_stats(sbe, "total secs", common.elapsed_secs(start_secs)) class AddNewObs(TestBase): @@ -85,17 +87,16 @@ def __init__(self, verbose, config, storage_backends, tss, curr_time): self._curr_time = curr_time def descr(self): - return 'add new observations to the storage' + return "add new observations to the storage" def _execute(self): - curr_time = self._curr_time - for extra_secs in self._config['extra_secs']: + for extra_secs in self._config["extra_secs"]: # add new observations to each time series in interval # [curr_time, curr_time + extra_secs]) ts_data = [] from_time, to_time = curr_time, curr_time + extra_secs - oldest_time = to_time - self._config['max_age'] # remove oldest observations + oldest_time = to_time - self._config["max_age"] # remove oldest observations for ts in self._tss: times, values = ts.create_observations(from_time, to_time) ts_data.append((ts, times, values, oldest_time)) @@ -106,8 +107,10 @@ def _execute(self): for td in ts_data: sbe.add_obs(td[0], td[1], td[2], td[3]) self._reg_stats( - sbe, 'total secs (extra secs = {})'.format(extra_secs), - common.elapsed_secs(start_secs)) + sbe, + "total secs 
(extra secs = {})".format(extra_secs), + common.elapsed_secs(start_secs), + ) curr_time += extra_secs @@ -118,18 +121,18 @@ def __init__(self, verbose, config, storage_backends, curr_time): self._curr_time = curr_time def descr(self): - return 'get all observations in the storage' + return "get all observations in the storage" def _execute(self): # retrieve all observations for all time series in time range # [curr_time - max_age, curr_time] # retrieve from each backend - from_time, to_time = self._curr_time - self._config['max_age'], self._curr_time + from_time, to_time = self._curr_time - self._config["max_age"], self._curr_time for sbe in self._storage_backends: start_secs = common.now_secs() sbe.get_obs_all(from_time, to_time) # don't use return value - self._reg_stats(sbe, 'total secs', common.elapsed_secs(start_secs)) + self._reg_stats(sbe, "total secs", common.elapsed_secs(start_secs)) class GetObsInCircle(TestBase): @@ -138,7 +141,7 @@ def __init__(self, verbose, config, storage_backends, curr_time): self._curr_time = curr_time def descr(self): - return 'get observations within a circle' + return "get observations within a circle" def _execute(self): # retrieve observations for all time series in time range @@ -149,12 +152,12 @@ def _execute(self): radius = 0 # distance in km (50% of self._config['bbox'] min. width ?) # retrieve from each backend - from_time, to_time = self._curr_time - self._config['max_age'], self._curr_time + from_time, to_time = self._curr_time - self._config["max_age"], self._curr_time for sbe in self._storage_backends: start_secs = common.now_secs() ts_ids = sbe.get_ts_ids_in_circle(lat, lon, radius) sbe.get_obs(ts_ids, from_time, to_time) # don't use return value - self._reg_stats(sbe, 'total secs', common.elapsed_secs(start_secs)) + self._reg_stats(sbe, "total secs", common.elapsed_secs(start_secs)) class TsTester: @@ -164,37 +167,46 @@ def __init__(self, verbose, config): self._verbose = verbose self._config = config - tsdb_host = common.get_env_var('TSDBHOST', 'localhost') - tsdb_port = common.get_env_var('TSDBPORT', '5433') - tsdb_user = common.get_env_var('TSDBUSER', 'postgres') - tsdb_password = common.get_env_var('TSDBPASSWORD', 'mysecretpassword') + tsdb_host = common.get_env_var("TSDBHOST", "localhost") + tsdb_port = common.get_env_var("TSDBPORT", "5433") + tsdb_user = common.get_env_var("TSDBUSER", "postgres") + tsdb_password = common.get_env_var("TSDBPASSWORD", "mysecretpassword") self._timescaledb_sbe = TimescaleDBSBE( verbose, PGConnectionInfo( - tsdb_host, tsdb_port, tsdb_user, tsdb_password, - common.get_env_var('TSDBDBNAME', 'esoh') - ) + tsdb_host, + tsdb_port, + tsdb_user, + tsdb_password, + common.get_env_var("TSDBDBNAME", "esoh"), + ), ) - pg_host = common.get_env_var('PGHOST', 'localhost') - pg_port = common.get_env_var('PGPORT', '5432') - pg_user = common.get_env_var('PGUSER', 'postgres') - pg_password = common.get_env_var('PGPASSWORD', 'mysecretpassword') + pg_host = common.get_env_var("PGHOST", "localhost") + pg_port = common.get_env_var("PGPORT", "5432") + pg_user = common.get_env_var("PGUSER", "postgres") + pg_password = common.get_env_var("PGPASSWORD", "mysecretpassword") self._postgis_sbe = PostGISSBE( verbose, PGConnectionInfo( - pg_host, pg_port, pg_user, pg_password, - common.get_env_var('PGDBNAME_POSTGIS', 'esoh_postgis') - ) + pg_host, + pg_port, + pg_user, + pg_password, + common.get_env_var("PGDBNAME_POSTGIS", "esoh_postgis"), + ), ) self._nc_sbe_tsmdata_in_postgis = NetCDFSBE_TSMDataInPostGIS( verbose, PGConnectionInfo( - 
pg_host, pg_port, pg_user, pg_password, - common.get_env_var('PGDBNAME_NETCDF', 'esoh_netcdf') + pg_host, + pg_port, + pg_user, + pg_password, + common.get_env_var("PGDBNAME_NETCDF", "esoh_netcdf"), ), - common.get_env_var('NCDIR', 'ncdir') + common.get_env_var("NCDIR", "ncdir"), ) self._storage_backends = [ # storage backends to test/compare @@ -217,16 +229,20 @@ def execute(self): curr_time = int(common.now_secs()) FillStorage(self._verbose, self._config, self._storage_backends, tss, curr_time).execute( - test_stats) + test_stats + ) AddNewObs(self._verbose, self._config, self._storage_backends, tss, curr_time).execute( - test_stats) + test_stats + ) GetObsAll(self._verbose, self._config, self._storage_backends, curr_time).execute( - test_stats) + test_stats + ) GetObsInCircle(self._verbose, self._config, self._storage_backends, curr_time).execute( - test_stats) + test_stats + ) # TODO: more tests (subclasses of TestBase): # - GetObsInPolygon @@ -236,14 +252,14 @@ def execute(self): # - ... cfg = copy.deepcopy(self._config) - cfg.pop('_comment', None) + cfg.pop("_comment", None) stats = { - 'start': datetime.datetime.utcfromtimestamp(start_secs).strftime('%Y-%m-%d %H:%M:%SZ'), - 'total secs': common.elapsed_secs(start_secs), - 'config': cfg, - 'timescaledb_config': self._timescaledb_sbe.pg_config(), - 'postgres_config': self._postgis_sbe.pg_config(), - 'tests': test_stats, + "start": datetime.datetime.utcfromtimestamp(start_secs).strftime("%Y-%m-%d %H:%M:%SZ"), + "total secs": common.elapsed_secs(start_secs), + "config": cfg, + "timescaledb_config": self._timescaledb_sbe.pg_config(), + "postgres_config": self._postgis_sbe.pg_config(), + "tests": test_stats, } print(json.dumps(stats, indent=4)) @@ -258,27 +274,29 @@ def create_time_series(verbose, config): Returns a list of TimeSeries objects. """ - nstations = config['nstations'] + nstations = config["nstations"] - time_res = config['time_res'] + time_res = config["time_res"] time_res = list({int(k): v for k, v in time_res.items()}.items()) - min_params = config['params']['min'] - max_params = config['params']['max'] + min_params = config["params"]["min"] + max_params = config["params"]["max"] - param_ids = list(map(lambda i: 'param_{}'.format(i), [i for i in range(max_params)])) + param_ids = list(map(lambda i: "param_{}".format(i), [i for i in range(max_params)])) - min_lat = config['bbox']['min_lat'] - max_lat = config['bbox']['max_lat'] + min_lat = config["bbox"]["min_lat"] + max_lat = config["bbox"]["max_lat"] if (min_lat < -90) or (min_lat >= max_lat) or (max_lat > 90): - raise Exception('invalid latitude range in bounding box: [{}, {}]'.format( - min_lat, max_lat)) + raise Exception( + "invalid latitude range in bounding box: [{}, {}]".format(min_lat, max_lat) + ) - min_lon = config['bbox']['min_lon'] - max_lon = config['bbox']['max_lon'] + min_lon = config["bbox"]["min_lon"] + max_lon = config["bbox"]["max_lon"] if (min_lon < -180) or (min_lon >= max_lon) or (max_lon > 180): - raise Exception('invalid longitude range in bounding box: [{}, {}]'.format( - min_lon, max_lon)) + raise Exception( + "invalid longitude range in bounding box: [{}, {}]".format(min_lon, max_lon) + ) tss = [] used_locs = set([]) # lat,lon locations used so far @@ -297,15 +315,15 @@ def create_new_loc(): def create_ts_other_metadata(): """Return dict of per time series metadata.""" - return config['ts_other_metadata'] # ### for now; eventually randomize? + return config["ts_other_metadata"] # ### for now; eventually randomize? 
def create_obs_metadata(): """Return dict of per observation metadata.""" - return config['obs_metadata'] # ### for now; eventually randomize? + return config["obs_metadata"] # ### for now; eventually randomize? for s in range(nstations): if verbose: - print('next station: {}'.format(s), file=sys.stderr) + print("next station: {}".format(s), file=sys.stderr) lat, lon = create_new_loc() random.shuffle(param_ids) @@ -315,12 +333,17 @@ def create_obs_metadata(): obs_mdata = create_obs_metadata() ts = TimeSeries( - verbose, 'station_{}'.format(s), lat, lon, - param_ids[p], common.select_weighted_value(time_res), - ts_other_mdata, obs_mdata + verbose, + "station_{}".format(s), + lat, + lon, + param_ids[p], + common.select_weighted_value(time_res), + ts_other_mdata, + obs_mdata, ) if verbose: - print('new ts (s = {}, p = {}): {}'.format(s, p, vars(ts)), file=sys.stderr) + print("new ts (s = {}, p = {}): {}".format(s, p, vars(ts)), file=sys.stderr) tss.append(ts) From 41dc76cfa2eadc8f08d33ba60ec5ee5b24996897 Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Fri, 20 Oct 2023 17:45:56 +0200 Subject: [PATCH 05/17] Run the pre-commit hook for the Docker files. --- api/Dockerfile | 2 +- data-loader/Dockerfile | 4 ++-- datastore/Dockerfile | 2 +- examples/clients/python/Dockerfile | 2 +- integration-test/Dockerfile | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/api/Dockerfile b/api/Dockerfile index 504f864..9a85c44 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -6,7 +6,6 @@ ENV PROJECT_DATASTORE_PATH="datastore" ENV PROJECT_PYTHON_PATH="api" ENV DOCKER_PATH="/app" -# hadolint ignore=DL3013 RUN apt-get update \ && apt-get -y upgrade \ && apt-get install -y --no-install-recommends git \ @@ -19,6 +18,7 @@ RUN apt-get update \ COPY "${PROJECT_DATASTORE_PATH}/protobuf/datastore.proto" "/protobuf/datastore.proto" COPY "${PROJECT_PYTHON_PATH}/requirements.txt" "${DOCKER_PATH}/requirements.txt" +# hadolint ignore=DL3013 RUN pip install --no-cache-dir --upgrade pip \ && pip install --no-cache-dir --upgrade -r "${DOCKER_PATH}/requirements.txt" diff --git a/data-loader/Dockerfile b/data-loader/Dockerfile index e429866..812a635 100644 --- a/data-loader/Dockerfile +++ b/data-loader/Dockerfile @@ -8,10 +8,9 @@ ENV DOCKER_PATH="/clients/python" COPY "test-data/KNMI/20221231.nc" "/test-data/KNMI/20221231.nc" -# hadolint ignore=DL3013 RUN apt-get update \ && apt-get -y upgrade \ - && apt-get -y install build-essential \ + && apt-get -y install --no-install-recommends build-essential \ # Cleanup && rm -rf /usr/tmp \ && apt-get autoremove -y \ @@ -21,6 +20,7 @@ RUN apt-get update \ COPY "${PROJECT_DATASTORE_PATH}/protobuf/datastore.proto" "/protobuf/datastore.proto" COPY "${PROJECT_PYTHON_PATH}/requirements.txt" "${DOCKER_PATH}/requirements.txt" +# hadolint ignore=DL3013 RUN pip install --no-cache-dir --upgrade pip \ && pip install --no-cache-dir --upgrade -r "${DOCKER_PATH}/requirements.txt" diff --git a/datastore/Dockerfile b/datastore/Dockerfile index 33feb62..b5e1e71 100644 --- a/datastore/Dockerfile +++ b/datastore/Dockerfile @@ -5,7 +5,7 @@ # docker image rm -f main # ... 
-FROM golang:latest +FROM golang:1.21-bookworm ARG VERSION_GRPC_HEALTH_PROBE=v0.4.19 diff --git a/examples/clients/python/Dockerfile b/examples/clients/python/Dockerfile index c584c2b..2d4b486 100644 --- a/examples/clients/python/Dockerfile +++ b/examples/clients/python/Dockerfile @@ -6,7 +6,6 @@ ENV PROJECT_DATASTORE_PATH="datastore" ENV PROJECT_PYTHON_PATH="examples/clients/python" ENV DOCKER_PATH="/clients/python" -# hadolint ignore=DL3013 RUN apt-get update \ && apt-get -y upgrade \ # Cleanup @@ -18,6 +17,7 @@ RUN apt-get update \ COPY "${PROJECT_DATASTORE_PATH}/protobuf/datastore.proto" "/protobuf/datastore.proto" COPY "${PROJECT_PYTHON_PATH}/requirements.txt" "${DOCKER_PATH}/requirements.txt" +# hadolint ignore=DL3013 RUN pip install --no-cache-dir --upgrade pip \ && pip install --no-cache-dir --upgrade -r "${DOCKER_PATH}/requirements.txt" diff --git a/integration-test/Dockerfile b/integration-test/Dockerfile index 7f56e04..39a424a 100644 --- a/integration-test/Dockerfile +++ b/integration-test/Dockerfile @@ -6,7 +6,6 @@ ENV PROJECT_DATASTORE_PATH="datastore" ENV PROJECT_PYTHON_PATH="integration-test" ENV DOCKER_PATH="/clients/python" -# hadolint ignore=DL3013 RUN apt-get update \ && apt-get -y upgrade \ # Cleanup @@ -18,6 +17,7 @@ RUN apt-get update \ COPY "${PROJECT_DATASTORE_PATH}/protobuf/datastore.proto" "/protobuf/datastore.proto" COPY "${PROJECT_PYTHON_PATH}/requirements.txt" "${DOCKER_PATH}/requirements.txt" +# hadolint ignore=DL3013 RUN pip install --no-cache-dir --upgrade pip \ && pip install --no-cache-dir --upgrade -r "${DOCKER_PATH}/requirements.txt" From 85f9081fbeac703c81aa4c6684b6649fbe91e570 Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Fri, 22 Sep 2023 17:29:14 +0200 Subject: [PATCH 06/17] Add Docker linting. --- .github/workflows/ci.yml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 020544e..ab4c914 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,6 +11,15 @@ env: LINE_LENGTH: 99 # TODO: would we like to increase this to 120? jobs: + docker-lint-hadolint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: hadolint/hadolint-action@v3.1.0 + with: + ignore: DL3008 + recursive: true + python-lint-black: runs-on: ubuntu-latest steps: From a17e00e7e62a2e2d3ea5c174dacdf43bfc65d63b Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Wed, 27 Sep 2023 08:45:00 +0200 Subject: [PATCH 07/17] Set permissions to read in the workflow. --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ab4c914..fb9c573 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,6 +7,9 @@ on: tags: - '[0-9]+.[0-9]+.[0-9]+' +permissions: + contents: read + env: LINE_LENGTH: 99 # TODO: would we like to increase this to 120? From 7b3ca07b1244b220828fad7944f18a12408335de Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Fri, 20 Oct 2023 18:03:48 +0200 Subject: [PATCH 08/17] Add GoLang linting. 
--- .github/workflows/ci.yml | 18 ++++++++++++++++++ ci/go/go-fmt.sh | 12 ++++++++++++ ci/go/go-vet.sh | 12 ++++++++++++ 3 files changed, 42 insertions(+) create mode 100755 ci/go/go-fmt.sh create mode 100755 ci/go/go-vet.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fb9c573..edae252 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,6 +23,24 @@ jobs: ignore: DL3008 recursive: true + go-fmt: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Run gofmt + run: ./ci/go/go-fmt.sh + shell: bash + + go-vet: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Run gofmt + run: ./ci/go/go-vet.sh + shell: bash + python-lint-black: runs-on: ubuntu-latest steps: diff --git a/ci/go/go-fmt.sh b/ci/go/go-fmt.sh new file mode 100755 index 0000000..35552e8 --- /dev/null +++ b/ci/go/go-fmt.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +set -euo pipefail + +output=$(gofmt -d -e -l -s .) + +if [ -n "${output}" ]; then + echo "${output}" + exit 1 +else + echo "All files are in the proper format." +fi diff --git a/ci/go/go-vet.sh b/ci/go/go-vet.sh new file mode 100755 index 0000000..458f7f9 --- /dev/null +++ b/ci/go/go-vet.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +set -uo pipefail + +output=$(cd ./datastore && go vet ./... 2>&1) + +if [ -n "${output}" ]; then + echo "${output}" +# exit 1 +else + echo "No subtle issues found in the code. All is working as intended." +fi From 1a83420453bd531ab9a00f0fc83da26472394040 Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Fri, 20 Oct 2023 18:16:25 +0200 Subject: [PATCH 09/17] Apply go format. --- datastore/storagebackend/postgresql/getobservations.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datastore/storagebackend/postgresql/getobservations.go b/datastore/storagebackend/postgresql/getobservations.go index bedff21..1bb7555 100644 --- a/datastore/storagebackend/postgresql/getobservations.go +++ b/datastore/storagebackend/postgresql/getobservations.go @@ -180,7 +180,7 @@ func getMdataFilter(filterInfos []filterInfo, phVals *[]interface{}) string { // Returns expression. func getGeoFilter(inside *datastore.Polygon, phVals *[]interface{}) (string, error) { whereExpr := "TRUE" // by default, don't filter - if inside != nil { // get all points + if inside != nil { // get all points points := inside.Points equal := func(p1, p2 *datastore.Point) bool { From 95679b359a021688accb17896d3220a366e30df3 Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Mon, 23 Oct 2023 11:52:58 +0200 Subject: [PATCH 10/17] Update readme with better instructions on the pre-commit hook. --- README.md | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 6c1878a..e73722f 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,13 @@ E-SOH datastore PoCs ## Pre-commit -To update the pre-commit hook run: `pre-commit autoupdate` - -To use the pre-commit hook reinitialize the repository with `git init` and install the pre-commit hook with `pre-commit install`. - -To run the pre-commit for every file in the repository run `pre-commit run --config './.pre-commit-config.yaml' --all-files` +### Setup +1. Go to the root of the repository. +2. Install the python pre-commit package with `pip install pre-commit`. +3. Reinitialize there repository with `git init`. +4. Install the hooks defined in `.pre-commit-config.yaml` with `pre-commit install`. 
+ +### Useful Commands +- To update the pre-commit hooks in `.pre-commit-config.yaml` run: `pre-commit autoupdate`. +- To run the pre-commit for every file in the repository run `pre-commit run --config './.pre-commit-config.yaml' --all-files`. +- To commit without the pre-commit hook `git commit -m "Some message" --no-verify` From 6078f50f307c72c242c42c43bed670ed123148e7 Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Mon, 23 Oct 2023 12:04:39 +0200 Subject: [PATCH 11/17] Add Go to the local pre-commit hook configuration. --- .pre-commit-config.yaml | 17 +++++++++++++++++ ci/go/go-vet.sh | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a381097..7a3c98a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,4 +1,21 @@ repos: + - repo: local + hooks: + # go-fmt ~ Enforces Go standard formatting (whitespace, indentation, et cetera) + - id: go-fmt + name: go-fmt + description: "Enforces Go standard formatting (whitespace, indentation, et cetera)." + entry: ./ci/go/go-fmt.sh + language: script + pass_filenames: false + # go-vet ~ Finds subtle issues in Go where your code may not work as intended + - id: go-vet + name: go-vet + description: "Finds subtle issues in Go where your code may not work as intended." + entry: ./ci/go/go-vet.sh + language: script + pass_filenames: false + - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.5.0 hooks: diff --git a/ci/go/go-vet.sh b/ci/go/go-vet.sh index 458f7f9..a60407f 100755 --- a/ci/go/go-vet.sh +++ b/ci/go/go-vet.sh @@ -6,7 +6,7 @@ output=$(cd ./datastore && go vet ./... 2>&1) if [ -n "${output}" ]; then echo "${output}" -# exit 1 + exit 1 else echo "No subtle issues found in the code. All is working as intended." fi From 329114576ae5606dda9bc41200f7c1ef01576df6 Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Mon, 23 Oct 2023 12:26:16 +0200 Subject: [PATCH 12/17] Add ShellCheck to the pre-commit hook and git actions. --- .github/workflows/ci.yml | 8 ++++++++ .pre-commit-config.yaml | 6 ++++++ 2 files changed, 14 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index edae252..87f2c29 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -80,6 +80,14 @@ jobs: - name: Code Style Check run: flake8 . --count --max-line-length=$LINE_LENGTH --ignore=W503 --show-source --statistics + shellcheck: + name: Shellcheck + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Run ShellCheck + uses: ludeeus/action-shellcheck@master + test: runs-on: ubuntu-latest steps: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7a3c98a..4f047c7 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -65,3 +65,9 @@ repos: args: [ "--ignore=DL3008", # Pin versions in apt get install. ] + + # ShellCheck ~ Gives warnings and suggestions for bash/sh shell scripts + - repo: https://github.com/koalaman/shellcheck-precommit + rev: v0.9.0 + hooks: + - id: shellcheck From 823d9a9cf80f9f1dd98749230e777857577c6d99 Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Mon, 23 Oct 2023 14:26:44 +0200 Subject: [PATCH 13/17] Disable go vet as it requires the protoc files to be generated. This can be annoying in a pre-commit hook. In the pipeline we might want to have it after building and publishing the image. 
--- .github/workflows/ci.yml | 16 ++++++++-------- .pre-commit-config.yaml | 14 +++++++------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 87f2c29..d1508b3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,14 +32,14 @@ jobs: run: ./ci/go/go-fmt.sh shell: bash - go-vet: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - - name: Run gofmt - run: ./ci/go/go-vet.sh - shell: bash +# go-vet: +# runs-on: ubuntu-latest +# steps: +# - uses: actions/checkout@v3 +# +# - name: Run gofmt +# run: ./ci/go/go-vet.sh +# shell: bash python-lint-black: runs-on: ubuntu-latest diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4f047c7..2a0aea9 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -8,13 +8,13 @@ repos: entry: ./ci/go/go-fmt.sh language: script pass_filenames: false - # go-vet ~ Finds subtle issues in Go where your code may not work as intended - - id: go-vet - name: go-vet - description: "Finds subtle issues in Go where your code may not work as intended." - entry: ./ci/go/go-vet.sh - language: script - pass_filenames: false +# # go-vet ~ Finds subtle issues in Go where your code may not work as intended +# - id: go-vet +# name: go-vet +# description: "Finds subtle issues in Go where your code may not work as intended." +# entry: ./ci/go/go-vet.sh +# language: script +# pass_filenames: false - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.5.0 From 36887a2772ceaaaaa0b4db79e9f8ba6d6e4ad3f9 Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Tue, 24 Oct 2023 12:19:57 +0200 Subject: [PATCH 14/17] Increase line length to 120. --- .github/workflows/ci.yml | 2 +- .pre-commit-config.yaml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d1508b3..47c2520 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,7 +11,7 @@ permissions: contents: read env: - LINE_LENGTH: 99 # TODO: would we like to increase this to 120? + LINE_LENGTH: 120 jobs: docker-lint-hadolint: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2a0aea9..71f2305 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -44,7 +44,7 @@ repos: rev: 23.10.0 hooks: - id: black - args: ["--line-length=99"] + args: ["--line-length=120"] # flake8 ~ Enforces the Python PEP8 style guide - repo: https://github.com/pycqa/flake8 @@ -54,7 +54,7 @@ repos: args: [ "--ignore=W503", - "--max-line-length=99", + "--max-line-length=120", ] # hadolint ~ Docker linter From 76b56c26d85be3ca532d11e7a20d5349443d7337 Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Tue, 24 Oct 2023 12:20:58 +0200 Subject: [PATCH 15/17] Apply pre-commit hook with line-length of 120. 
--- api/main.py | 28 +++++++--------------------- data-loader/client_fmi_station.py | 9 ++------- data-loader/client_knmi_station.py | 21 +++++---------------- examples/clients/python/client.py | 4 +--- integration-test/discover.py | 4 +--- integration-test/test_knmi.py | 4 +--- load-test/locustfile.py | 4 +--- tstester/common.py | 12 +++--------- tstester/main.py | 4 +--- tstester/netcdf.py | 4 +--- tstester/postgissbe.py | 4 +--- tstester/timescaledbsbe.py | 4 +--- tstester/tstester.py | 24 ++++++------------------ 13 files changed, 31 insertions(+), 95 deletions(-) diff --git a/api/main.py b/api/main.py index 31ea8aa..ea19ebf 100644 --- a/api/main.py +++ b/api/main.py @@ -50,9 +50,7 @@ def collect_data(ts_mdata, obs_mdata): def get_data_for_time_series(get_obs_request): - with grpc.insecure_channel( - f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}" - ) as channel: + with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel: grpc_stub = dstore_grpc.DatastoreStub(channel) response = grpc_stub.GetObservations(get_obs_request) @@ -68,9 +66,7 @@ def get_data_for_time_series(get_obs_request): referencing = [ ReferenceSystemConnectionObject( coordinates=["y", "x"], - system=ReferenceSystem( - type="GeographicCRS", id="http://www.opengis.net/def/crs/EPSG/0/4326" - ), + system=ReferenceSystem(type="GeographicCRS", id="http://www.opengis.net/def/crs/EPSG/0/4326"), ), ReferenceSystemConnectionObject( coordinates=["z"], @@ -92,9 +88,7 @@ def get_data_for_time_series(get_obs_request): for ((_, _, _), param_id, values) in group1 } ranges = { - param_id: NdArray( - values=values, axisNames=["t", "y", "x"], shape=[len(values), 1, 1] - ) + param_id: NdArray(values=values, axisNames=["t", "y", "x"], shape=[len(values), 1, 1]) for ((_, _, _), param_id, values) in group2 } @@ -113,20 +107,14 @@ def get_data_for_time_series(get_obs_request): response_model=FeatureCollection, response_model_exclude_none=True, ) -def get_locations( - bbox: str = Query(..., example="5.0,52.0,6.0,52.1") -) -> FeatureCollection: # Hack to use string +def get_locations(bbox: str = Query(..., example="5.0,52.0,6.0,52.1")) -> FeatureCollection: # Hack to use string left, bottom, right, top = map(str.strip, bbox.split(",")) poly = geometry.Polygon([(left, bottom), (right, bottom), (right, top), (left, top)]) - with grpc.insecure_channel( - f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}" - ) as channel: + with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel: grpc_stub = dstore_grpc.DatastoreStub(channel) ts_request = dstore.GetObsRequest( instruments=["tn"], # Hack - inside=dstore.Polygon( - points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords] - ), + inside=dstore.Polygon(points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords]), ) ts_response = grpc_stub.GetObservations(ts_request) @@ -194,8 +182,6 @@ def get_data_area( assert poly.geom_type == "Polygon" get_obs_request = dstore.GetObsRequest( instruments=list(map(str.strip, parameter_name.split(","))), - inside=dstore.Polygon( - points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords] - ), + inside=dstore.Polygon(points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords]), ) return get_data_for_time_series(get_obs_request) diff --git a/data-loader/client_fmi_station.py b/data-loader/client_fmi_station.py index f8fb426..7bf9215 
100755 --- a/data-loader/client_fmi_station.py +++ b/data-loader/client_fmi_station.py @@ -61,9 +61,7 @@ def csv_file_to_requests(file_path: Path | str) -> Tuple[List, List]: def insert_data(observation_request_messages: List): workers = int(cpu_count()) - with grpc.insecure_channel( - f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}" - ) as channel: + with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel: client = dstore_grpc.DatastoreStub(channel=channel) print(f"Inserting {len(observation_request_messages)} bulk observations requests.") obs_insert_start = perf_counter() @@ -81,10 +79,7 @@ def insert_data(observation_request_messages: List): file_path = Path(Path(__file__).parents[2] / "test-data" / "FMI" / "20221231.csv") print(file_path) observation_request_messages = csv_file_to_requests(file_path=file_path) - print( - "Finished creating the time series and observation requests " - f"{perf_counter() - create_requests_start}." - ) + print("Finished creating the time series and observation requests " f"{perf_counter() - create_requests_start}.") insert_data( observation_request_messages=observation_request_messages, diff --git a/data-loader/client_knmi_station.py b/data-loader/client_knmi_station.py index 4be921f..de949e7 100755 --- a/data-loader/client_knmi_station.py +++ b/data-loader/client_knmi_station.py @@ -21,9 +21,7 @@ def netcdf_file_to_requests(file_path: Path | str) -> Tuple[List, List]: observation_request_messages = [] - with xr.open_dataset( - file_path, engine="netcdf4", chunks=None - ) as file: # chunks=None to disable dask + with xr.open_dataset(file_path, engine="netcdf4", chunks=None) as file: # chunks=None to disable dask for station_id, latitude, longitude, height in zip( file["station"].values, file["lat"].values[0], @@ -40,15 +38,11 @@ def netcdf_file_to_requests(file_path: Path | str) -> Tuple[List, List]: platform=station_id, instrument=param_id, title=param_file.long_name, - standard_name=param_file.standard_name - if "standard_name" in param_file.attrs - else None, + standard_name=param_file.standard_name if "standard_name" in param_file.attrs else None, unit=param_file.units if "units" in param_file.attrs else None, ) - for time, obs_value in zip( - pd.to_datetime(param_file["time"].data).to_pydatetime(), param_file.data - ): + for time, obs_value in zip(pd.to_datetime(param_file["time"].data).to_pydatetime(), param_file.data): ts = Timestamp() ts.FromDatetime(time) obs_mdata = dstore.ObsMetadata( @@ -68,9 +62,7 @@ def netcdf_file_to_requests(file_path: Path | str) -> Tuple[List, List]: def insert_data(observation_request_messages: List): workers = int(cpu_count()) - with grpc.insecure_channel( - f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}" - ) as channel: + with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel: client = dstore_grpc.DatastoreStub(channel=channel) print(f"Inserting {len(observation_request_messages)} bulk observations requests.") @@ -88,10 +80,7 @@ def insert_data(observation_request_messages: List): create_requests_start = perf_counter() file_path = Path(Path(__file__).parents[2] / "test-data" / "KNMI" / "20221231.nc") observation_request_messages = netcdf_file_to_requests(file_path=file_path) - print( - "Finished creating the time series and observation requests " - f"{perf_counter() - create_requests_start}." 
- ) + print("Finished creating the time series and observation requests " f"{perf_counter() - create_requests_start}.") insert_data( observation_request_messages=observation_request_messages, diff --git a/examples/clients/python/client.py b/examples/clients/python/client.py index 9e359c7..9f05d2c 100755 --- a/examples/clients/python/client.py +++ b/examples/clients/python/client.py @@ -84,9 +84,7 @@ def call_get_obs_in_polygon(stub): if __name__ == "__main__": - with grpc.insecure_channel( - f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}" - ) as channel: + with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel: stub = dstore_grpc.DatastoreStub(channel) version = "version_dummy" diff --git a/integration-test/discover.py b/integration-test/discover.py index 68e4c8f..056e981 100644 --- a/integration-test/discover.py +++ b/integration-test/discover.py @@ -4,9 +4,7 @@ import xarray as xr file_path = Path(Path(__file__).parents[1] / "test-data" / "KNMI" / "20221231.nc") -with xr.open_dataset( - file_path, engine="netcdf4", chunks=None -) as ds: # chunks=None to disable dask +with xr.open_dataset(file_path, engine="netcdf4", chunks=None) as ds: # chunks=None to disable dask # print(ds) print(ds.sel(station="06260").isel(time=0).lat.values) print(ds.sel(station="06260").isel(time=0).lon.values) diff --git a/integration-test/test_knmi.py b/integration-test/test_knmi.py index da27fac..6d81eec 100644 --- a/integration-test/test_knmi.py +++ b/integration-test/test_knmi.py @@ -13,9 +13,7 @@ @pytest.fixture(scope="session") def grpc_stub(): - with grpc.insecure_channel( - f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}" - ) as channel: + with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel: yield dstore_grpc.DatastoreStub(channel) diff --git a/load-test/locustfile.py b/load-test/locustfile.py index b277d15..81e966f 100644 --- a/load-test/locustfile.py +++ b/load-test/locustfile.py @@ -64,9 +64,7 @@ def get_data_single_station_through_bbox(self): request = dstore.GetObsRequest( interval=dstore.TimeInterval(start=from_time, end=to_time), instruments=[random.choice(parameters)], - inside=dstore.Polygon( - points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords] - ), + inside=dstore.Polygon(points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords]), ) response = self.stub.GetObservations(request) assert len(response.observations) == 1 diff --git a/tstester/common.py b/tstester/common.py index 1fc0342..cc318ed 100644 --- a/tstester/common.py +++ b/tstester/common.py @@ -16,9 +16,7 @@ def select_weighted_value(x): raise Exception("can't select from empty list") for item in x: if item[1] <= 0: - raise Exception( - "non-positive weight not allowed (value: {}, weight: {})".format(item[0], item[1]) - ) + raise Exception("non-positive weight not allowed (value: {}, weight: {})".format(item[0], item[1])) w_sum_n = sum([z[1] for z in x]) # get total weight sum r = random.random() * w_sum_n # get random value within total weight sum @@ -94,15 +92,11 @@ def validate_precondition(): for j in [0, 1]: if len(t[j]) != len(v[j]): raise Exception( - "precondition failed: len(t[{}]) ({}) != len(v[{}]) ({})".format( - j, len(t[j]), j, len(v[j]) - ) + "precondition failed: len(t[{}]) ({}) != len(v[{}]) ({})".format(j, len(t[j]), j, len(v[j])) ) if len(t[j]) > 0: if t[j][-1] >= sentinel_obs_time: - raise Exception( - 
"precondition failed: t[{}][-1] >= {}".format(j, sentinel_obs_time) - ) + raise Exception("precondition failed: t[{}][-1] >= {}".format(j, sentinel_obs_time)) if len(t[j]) > 1: for i in range(1, len(t[j])): if t[j][i - 1] >= t[j][i]: diff --git a/tstester/main.py b/tstester/main.py index bc3732a..e608c20 100755 --- a/tstester/main.py +++ b/tstester/main.py @@ -19,9 +19,7 @@ def parse_args(args): exit_on_error=False, ) parser.add_argument("-v", "--verbose", action="store_true", help="Enable logging to stdout.") - parser.add_argument( - "-c", "--cfg_file", default="config.json", type=pathlib.Path, help="Config file." - ) + parser.add_argument("-c", "--cfg_file", default="config.json", type=pathlib.Path, help="Config file.") parser.add_argument("-s", "--random_seed", type=int, default=-1, help="Random seed.") pres = parser.parse_args(args) diff --git a/tstester/netcdf.py b/tstester/netcdf.py index a4abc4d..06fe32d 100644 --- a/tstester/netcdf.py +++ b/tstester/netcdf.py @@ -81,9 +81,7 @@ def add_times_and_values(self, path, times, values, oldest_time=None): fvalues = dset["value"][:] # merge - mtimes, mvalues = common.ts_merge( - ftimes.tolist(), fvalues.tolist(), times, values, oldest_time - ) + mtimes, mvalues = common.ts_merge(ftimes.tolist(), fvalues.tolist(), times, values, oldest_time) # replace file variables with merged arrays dset["time"][:] = mtimes diff --git a/tstester/postgissbe.py b/tstester/postgissbe.py index 452d226..37b6ee5 100644 --- a/tstester/postgissbe.py +++ b/tstester/postgissbe.py @@ -182,9 +182,7 @@ def set_obs(self, ts, times, values): if self._verbose: print("setting observations in PostGIS SBE for time series >>>", file=sys.stderr) print( - " ts: {}\n times: (size: {}), values: (size: {})".format( - ts.__dict__, len(times), len(values) - ), + " ts: {}\n times: (size: {}), values: (size: {})".format(ts.__dict__, len(times), len(values)), file=sys.stderr, ) diff --git a/tstester/timescaledbsbe.py b/tstester/timescaledbsbe.py index 13ef533..60f9ca8 100644 --- a/tstester/timescaledbsbe.py +++ b/tstester/timescaledbsbe.py @@ -191,9 +191,7 @@ def set_obs(self, ts, times, values): if self._verbose: print("setting observations in PostGIS SBE for time series >>>", file=sys.stderr) print( - " ts: {}\n times: (size: {}), values: (size: {})".format( - ts.__dict__, len(times), len(values) - ), + " ts: {}\n times: (size: {}), values: (size: {})".format(ts.__dict__, len(times), len(values)), file=sys.stderr, ) diff --git a/tstester/tstester.py b/tstester/tstester.py index 39dee2b..ae2fa10 100644 --- a/tstester/tstester.py +++ b/tstester/tstester.py @@ -228,21 +228,13 @@ def execute(self): curr_time = int(common.now_secs()) - FillStorage(self._verbose, self._config, self._storage_backends, tss, curr_time).execute( - test_stats - ) + FillStorage(self._verbose, self._config, self._storage_backends, tss, curr_time).execute(test_stats) - AddNewObs(self._verbose, self._config, self._storage_backends, tss, curr_time).execute( - test_stats - ) + AddNewObs(self._verbose, self._config, self._storage_backends, tss, curr_time).execute(test_stats) - GetObsAll(self._verbose, self._config, self._storage_backends, curr_time).execute( - test_stats - ) + GetObsAll(self._verbose, self._config, self._storage_backends, curr_time).execute(test_stats) - GetObsInCircle(self._verbose, self._config, self._storage_backends, curr_time).execute( - test_stats - ) + GetObsInCircle(self._verbose, self._config, self._storage_backends, curr_time).execute(test_stats) # TODO: more tests (subclasses of TestBase): # 
- GetObsInPolygon @@ -287,16 +279,12 @@ def create_time_series(verbose, config): min_lat = config["bbox"]["min_lat"] max_lat = config["bbox"]["max_lat"] if (min_lat < -90) or (min_lat >= max_lat) or (max_lat > 90): - raise Exception( - "invalid latitude range in bounding box: [{}, {}]".format(min_lat, max_lat) - ) + raise Exception("invalid latitude range in bounding box: [{}, {}]".format(min_lat, max_lat)) min_lon = config["bbox"]["min_lon"] max_lon = config["bbox"]["max_lon"] if (min_lon < -180) or (min_lon >= max_lon) or (max_lon > 180): - raise Exception( - "invalid longitude range in bounding box: [{}, {}]".format(min_lon, max_lon) - ) + raise Exception("invalid longitude range in bounding box: [{}, {}]".format(min_lon, max_lon)) tss = [] used_locs = set([]) # lat,lon locations used so far From dd1449f3903a36b4cb1328896d9b01519957e5b1 Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Tue, 24 Oct 2023 13:51:51 +0200 Subject: [PATCH 16/17] Replace git actions with pre-commit hook. Did not use the pre-commit action as it is in maintenance only mode. --- .github/workflows/ci.yml | 90 +++++++--------------------------------- 1 file changed, 16 insertions(+), 74 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 47c2520..a0cc22c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,83 +10,25 @@ on: permissions: contents: read -env: - LINE_LENGTH: 120 - jobs: - docker-lint-hadolint: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - uses: hadolint/hadolint-action@v3.1.0 - with: - ignore: DL3008 - recursive: true - - go-fmt: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - - name: Run gofmt - run: ./ci/go/go-fmt.sh - shell: bash - -# go-vet: -# runs-on: ubuntu-latest -# steps: -# - uses: actions/checkout@v3 -# -# - name: Run gofmt -# run: ./ci/go/go-vet.sh -# shell: bash - - python-lint-black: - runs-on: ubuntu-latest - steps: - - name: Checkout Source - uses: actions/checkout@v3 - - - name: Python Setup - uses: actions/setup-python@v4 - with: - python-version: '3.11' - architecture: x64 - - - name: Install Black - run: pip install black - - - name: Lint Python with Black - run: black . --check --line-length=${LINE_LENGTH} - - python-lint-flake8: - runs-on: ubuntu-latest - steps: - - name: Python Setup - uses: actions/setup-python@v4 - with: - python-version: '3.11' - architecture: x64 - - - name: Checkout Source - uses: actions/checkout@v3 - - - name: Install flake8 - run: pip install flake8 - - - name: Syntax Error Check - run: flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics - - - name: Code Style Check - run: flake8 . 
--count --max-line-length=$LINE_LENGTH --ignore=W503 --show-source --statistics - - shellcheck: - name: Shellcheck + pre-commit-hook: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - name: Run ShellCheck - uses: ludeeus/action-shellcheck@master + - uses: actions/checkout@v3 + - uses: actions/setup-python@v3 + - name: Install pre-commit + run: python -m pip install pre-commit + shell: bash + - name: Show pre-commit requirements + run: python -m pip freeze --local + shell: bash + - uses: actions/cache@v3 + with: + path: ~/.cache/pre-commit + key: pre-commit-3|${{ env.pythonLocation }}|${{ hashFiles('.pre-commit-config.yaml') }} + - name: Run pre-commit for all files + run: pre-commit run --config './.pre-commit-config.yaml' --all-files --color=always --show-diff-on-failure + shell: bash test: runs-on: ubuntu-latest From 83bb2125c19258939aacea3224ae06c98c7940a8 Mon Sep 17 00:00:00 2001 From: Jeffrey Vervoort Date: Tue, 24 Oct 2023 14:36:30 +0200 Subject: [PATCH 17/17] Add a number to comments so that they are not removed by the pre-commit hook. --- tstester/config.json | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tstester/config.json b/tstester/config.json index de372ec..f2f90ea 100644 --- a/tstester/config.json +++ b/tstester/config.json @@ -1,27 +1,32 @@ { "max_age": 86400, - "_comment": "extra secs values to try with the AddNewObs test", + "_comment_1": "max age in secs relative to current time (older observations are inaccessible)", "nstations": 3, + "_comment_2": "number of stations to generate", "bbox": { "min_lat": -60.5, "max_lat": 62.5, "min_lon": -10.5, "max_lon": 12.5 }, + "_comment_3": "bounding box for randomly generated station locations (no two stations will have the same location)", "params": { "min": 1, "max": 3 }, + "_comment_4": "minimum and maximum number of randomly generated params for a station", "time_res": { "60": 0.2, "600": 0.3, "3600": 0.5 }, + "_comment_5": "probability weights of time series resolutions (around 20% will have time res 60 secs, around 30% will have time res 600 secs, and so on)", "extra_secs": [ 60, 600, 3600 ], + "_comment_6": "extra secs values to try with the AddNewObs test", "ts_other_metadata": { "sensor_location_quality": 9, "sensor_performance_quality": 9
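
Background for the numbered comment keys above: the pretty-format-json hook parses each JSON file and writes it back, and most JSON parsers, including Python's json module, keep only the last value when a key repeats, so bare duplicate "_comment" keys collapse to a single entry after --autofix. A minimal sketch of that round trip in Python (plain json as a stand-in for what the hook's --autofix pass effectively does; the literal below is illustrative, reusing the max_age and nstations values from tstester/config.json):

    import json

    # Duplicate "_comment" keys are legal JSON text, but a parse/serialize
    # round trip -- effectively what pretty-format-json --autofix does --
    # keeps only the last occurrence of each repeated key.
    raw = (
        '{"_comment": "max age in secs", "max_age": 86400,'
        ' "_comment": "number of stations", "nstations": 3}'
    )
    print(json.dumps(json.loads(raw), indent=4))
    # Only one "_comment" survives the round trip, which is why the config
    # switches to unique keys: "_comment_1", "_comment_2", ...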