Skip to content

Commit

Permalink
Implemented plugin loader
Browse files Browse the repository at this point in the history
  • Loading branch information
Teddy-1000 committed Jan 12, 2024
1 parent 5da3c2d commit 33e3be9
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 93 deletions.
22 changes: 20 additions & 2 deletions api/formatter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
import importlib
import glob

from base_formatter import EDR_formatter

def get_formatters():

def get_EDR_formatters() -> dict:
    """
    Discover the formatter plugins in this package and return them in a dict.

    Every ``.py`` file located next to this module whose name does not start
    with ``_`` is imported. A module provides a formatter when it defines a
    module-level ``formatter_name`` that names an ``EDR_formatter`` subclass
    defined in that same module. One instance of that class is created and
    stored under ``formatter_name``, and also under every entry of the
    instance's ``alias`` list when it has one.

    This way we can dynamically grab all available formats and skip
    configuring this.

    TODO: how do one make formatters present in openapi doc?

    Returns:
        dict mapping formatter name (and aliases) to a formatter instance.
    """
    # Local import keeps this block self-contained.
    from pathlib import Path

    # Look in the package directory, not the current working directory, so
    # unrelated scripts (e.g. the application entry point) are never imported.
    package_dir = Path(__file__).resolve().parent
    # ``Path.stem`` removes only the ".py" suffix; ``str.strip(".py")`` would
    # also eat leading/trailing "p", "y" and "." characters of the name.
    module_names = sorted(path.stem for path in package_dir.glob("[!_]*.py"))

    available_formatters = {}

    for module_name in module_names:
        # NOTE(review): modules are imported by bare name, matching the
        # ``from base_formatter import ...`` style used by the formatters;
        # this needs the formatter directory on sys.path -- confirm.
        module = importlib.import_module(module_name)

        class_name = getattr(module, "formatter_name", None)
        if class_name is None:
            continue  # Not a formatter plugin (e.g. base_formatter itself).

        obj = getattr(module, class_name, None)
        # issubclass() raises TypeError for non-classes, so check that first.
        if not (isinstance(obj, type) and issubclass(obj, EDR_formatter)):
            continue

        # Make instance of formatter and save
        instance = obj()
        available_formatters[class_name] = instance
        # Aliases never overwrite a formatter registered under its real name.
        for alias in getattr(instance, "alias", ()):
            available_formatters.setdefault(alias, instance)

    return available_formatters
3 changes: 2 additions & 1 deletion api/formatter/base_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
class EDR_formatter(ABC):
"""
This is the abstract class for implementing a formatter in the E-SOH EDR formatter
Name of class should represent expected output format.
"""
pass

@abstractmethod
def convert():
def convert(self, datastore_reply):
"""
Main method for converting protobuf object to given format.
"""
Expand Down
95 changes: 95 additions & 0 deletions api/formatter/covjson.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import math

from datetime import timezone
from itertools import groupby

from covjson_pydantic.coverage import Coverage
from covjson_pydantic.coverage import CoverageCollection
from covjson_pydantic.domain import Axes
from covjson_pydantic.domain import Domain
from covjson_pydantic.domain import DomainType
from covjson_pydantic.domain import ValuesAxis
from covjson_pydantic.ndarray import NdArray
from covjson_pydantic.observed_property import ObservedProperty
from covjson_pydantic.parameter import Parameter
from covjson_pydantic.reference_system import ReferenceSystem
from covjson_pydantic.reference_system import ReferenceSystemConnectionObject
from covjson_pydantic.unit import Unit

from pydantic import AwareDatetime

from base_formatter import EDR_formatter


# Required for the plugin loader: name of the formatter class defined in this module
formatter_name = "Covjson"


class Covjson(EDR_formatter):
    """
    Class for converting protobuf object to coverage json
    """

    def __init__(self):
        # Alternative names for this output format.
        self.alias = ["covjson", "coveragejson"]

    def convert(self, response):
        """
        Convert the observations in ``response`` to a CoverageCollection.

        One Coverage is produced per distinct (lat, lon, times) key found in
        ``response.observations``. Within a coverage every parameter gets its
        own range; parameters whose values are all NaN are left out, and the
        remaining NaN values are replaced by None.

        Args:
            response: object with an ``observations`` sequence whose items
                expose ``ts_mdata`` and ``obs_mdata``.

        Returns:
            CoverageCollection holding the generated coverages (possibly none).
        """
        # Collect data
        coverages = []
        data = [self._collect_data(md.ts_mdata, md.obs_mdata) for md in response.observations]

        # Need to sort before using groupBy. Also sort on param_id to get consistently sorted output
        data.sort(key=lambda x: (x[0], x[1]))
        # The multiple coverage logic is not needed for this endpoint,
        # but we want to share this code between endpoints
        for (lat, lon, times), group in groupby(data, lambda x: x[0]):
            referencing = [
                ReferenceSystemConnectionObject(
                    coordinates=["y", "x"],
                    system=ReferenceSystem(type="GeographicCRS",
                                           id="http://www.opengis.net/def/crs/EPSG/0/4326"),
                ),
                # NOTE(review): the temporal system is attached to "z" while the
                # domain below only defines x, y and t axes -- confirm whether
                # this should be "t".
                ReferenceSystemConnectionObject(
                    coordinates=["z"],
                    system=ReferenceSystem(type="TemporalRS", calendar="Gregorian"),
                ),
            ]
            domain = Domain(
                domainType=DomainType.point_series,
                axes=Axes(
                    x=ValuesAxis[float](values=[lon]),
                    y=ValuesAxis[float](values=[lat]),
                    t=ValuesAxis[AwareDatetime](values=times),
                ),
                referencing=referencing,
            )

            parameters = {}
            ranges = {}
            for (_, _, _), param_id, unit, values in group:
                if all(math.isnan(v) for v in values):
                    continue  # Drop ranges if completely nan.
                    # TODO: Drop the whole coverage if it becomes empty?
                values_no_nan = [v if not math.isnan(v) else None for v in values]
                # TODO: Improve this based on "standard name", etc.
                parameters[param_id] = Parameter(
                    observedProperty=ObservedProperty(label={"en": param_id}), unit=Unit(label={"en": unit})
                )  # TODO: Also fill symbol?
                ranges[param_id] = NdArray(
                    values=values_no_nan, axisNames=["t", "y", "x"], shape=[len(values_no_nan), 1, 1]
                )

            coverages.append(Coverage(domain=domain, parameters=parameters, ranges=ranges))
        return CoverageCollection(coverages=coverages)

    # Static: the helper uses no instance state. Without this decorator the
    # call ``self._collect_data(a, b)`` passes three positional arguments to
    # a two-parameter function and raises TypeError.
    @staticmethod
    def _collect_data(ts_mdata, obs_mdata):
        """
        Flatten one time series into ``((lat, lon, times), param_id, unit, values)``.

        Args:
            ts_mdata: time series metadata; ``instrument`` and ``unit`` are read.
            obs_mdata: non-empty sequence of observations; ``geo_point``,
                ``obstime_instant`` and ``value`` are read from each item.

        Returns:
            Tuple of the (lat, lon, times) key, the parameter id, the unit and
            the tuple of float values, with ``times`` and ``values`` aligned.
        """
        lat = obs_mdata[0].geo_point.lat  # HACK: For now assume they all have the same position
        lon = obs_mdata[0].geo_point.lon
        tuples = (
            (o.obstime_instant.ToDatetime(tzinfo=timezone.utc), float(o.value)) for o in obs_mdata
        )  # HACK: str -> float
        (times, values) = zip(*tuples)
        param_id = ts_mdata.instrument
        unit = ts_mdata.unit

        return (lat, lon, times), param_id, unit, values
12 changes: 12 additions & 0 deletions api/grpc_getter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import grpc
import os

import datastore_pb2_grpc as dstore_grpc


def get_obsrequest(get_obs_request):
    """
    Send ``get_obs_request`` to the datastore and return its reply.

    The datastore address is read from the ``DSHOST`` and ``DSPORT``
    environment variables, falling back to ``localhost`` and ``50050``.
    The insecure gRPC channel is closed again before returning.
    """
    host = os.getenv('DSHOST', 'localhost')
    port = os.getenv('DSPORT', '50050')
    target = f"{host}:{port}"

    with grpc.insecure_channel(target) as channel:
        stub = dstore_grpc.DatastoreStub(channel)
        return stub.GetObservations(get_obs_request)
114 changes: 24 additions & 90 deletions api/main.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,19 @@
# Run with:
# For developing: uvicorn main:app --reload
import math
import os
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from itertools import groupby
from typing import Tuple

import datastore_pb2 as dstore
import datastore_pb2_grpc as dstore_grpc
import grpc
import metadata_endpoints
from brotli_asgi import BrotliMiddleware

from covjson_pydantic.coverage import Coverage
from covjson_pydantic.coverage import CoverageCollection
from covjson_pydantic.domain import Axes
from covjson_pydantic.domain import Domain
from covjson_pydantic.domain import DomainType
from covjson_pydantic.domain import ValuesAxis
from covjson_pydantic.ndarray import NdArray
from covjson_pydantic.observed_property import ObservedProperty
from covjson_pydantic.parameter import Parameter
from covjson_pydantic.reference_system import ReferenceSystem
from covjson_pydantic.reference_system import ReferenceSystemConnectionObject
from covjson_pydantic.unit import Unit

from edr_pydantic.capabilities import LandingPageModel
from edr_pydantic.collections import Collection
from edr_pydantic.collections import Collections
Expand All @@ -44,82 +33,14 @@
from shapely import wkt


app = FastAPI()
app.add_middleware(BrotliMiddleware)


def collect_data(ts_mdata, obs_mdata):
    """
    Flatten one time series into ``((lat, lon, times), param_id, unit, values)``.

    ``times`` holds the UTC observation times and ``values`` the matching
    observation values converted to float, both as tuples in input order.
    """
    # HACK: For now assume they all have the same position
    latitude = obs_mdata[0].geo_point.lat
    longitude = obs_mdata[0].geo_point.lon

    # HACK: str -> float
    samples = [
        (obs.obstime_instant.ToDatetime(tzinfo=timezone.utc), float(obs.value))
        for obs in obs_mdata
    ]
    obs_times = tuple(stamp for stamp, _ in samples)
    obs_values = tuple(number for _, number in samples)

    position_key = (latitude, longitude, obs_times)
    return position_key, ts_mdata.instrument, ts_mdata.unit, obs_values


def get_data_for_time_series(get_obs_request):
with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel:
grpc_stub = dstore_grpc.DatastoreStub(channel)
response = grpc_stub.GetObservations(get_obs_request)
from grpc_getter import get_obsrequest

# Collect data
coverages = []
data = [collect_data(md.ts_mdata, md.obs_mdata) for md in response.observations]
from formatter import get_EDR_formatters

# Need to sort before using groupBy. Also sort on param_id to get consistently sorted output
data.sort(key=lambda x: (x[0], x[1]))
# The multiple coverage logic is not needed for this endpoint,
# but we want to share this code between endpoints
for (lat, lon, times), group in groupby(data, lambda x: x[0]):
referencing = [
ReferenceSystemConnectionObject(
coordinates=["y", "x"],
system=ReferenceSystem(type="GeographicCRS", id="http://www.opengis.net/def/crs/EPSG/0/4326"),
),
ReferenceSystemConnectionObject(
coordinates=["z"],
system=ReferenceSystem(type="TemporalRS", calendar="Gregorian"),
),
]
domain = Domain(
domainType=DomainType.point_series,
axes=Axes(
x=ValuesAxis[float](values=[lon]),
y=ValuesAxis[float](values=[lat]),
t=ValuesAxis[AwareDatetime](values=times),
),
referencing=referencing,
)

parameters = {}
ranges = {}
for (_, _, _), param_id, unit, values in group:
if all(math.isnan(v) for v in values):
continue # Drop ranges if completely nan.
# TODO: Drop the whole coverage if it becomes empty?
values_no_nan = [v if not math.isnan(v) else None for v in values]
# TODO: Improve this based on "standard name", etc.
parameters[param_id] = Parameter(
observedProperty=ObservedProperty(label={"en": param_id}), unit=Unit(label={"en": unit})
) # TODO: Also fill symbol?
ranges[param_id] = NdArray(
values=values_no_nan, axisNames=["t", "y", "x"], shape=[len(values_no_nan), 1, 1]
)

coverages.append(Coverage(domain=domain, parameters=parameters, ranges=ranges))
app = FastAPI()
app.add_middleware(BrotliMiddleware)

if len(coverages) == 0:
raise HTTPException(status_code=404, detail="No data found")
elif len(coverages) == 1:
return coverages[0]
else:
return CoverageCollection(
coverages=coverages, parameters=coverages[0].parameters
) # HACK to take parameters from first one
edr_formatter = get_EDR_formatters()


def get_datetime_range(datetime_string: str | None) -> Tuple[Timestamp, Timestamp] | None:
Expand All @@ -141,7 +62,8 @@ def get_datetime_range(datetime_string: str | None) -> Tuple[Timestamp, Timestam
start_datetime.FromDatetime(datetime.min)
if datetimes[1] != "..":
# HACK add one second so that the end_datetime is included in the interval.
end_datetime.FromDatetime(aware_datetime_type_adapter.validate_python(datetimes[1]) + timedelta(seconds=1))
end_datetime.FromDatetime(aware_datetime_type_adapter.validate_python(
datetimes[1]) + timedelta(seconds=1))
else:
end_datetime.FromDatetime(datetime.max)

Expand Down Expand Up @@ -193,7 +115,8 @@ def get_locations(bbox: str = Query(..., example="5.0,52.0,6.0,52.1")) -> Featur
grpc_stub = dstore_grpc.DatastoreStub(channel)
ts_request = dstore.GetObsRequest(
instruments=["tn"], # Hack
inside=dstore.Polygon(points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords]),
inside=dstore.Polygon(points=[dstore.Point(lat=coord[1], lon=coord[0])
for coord in poly.exterior.coords]),
)
ts_response = grpc_stub.GetObservations(ts_request)

Expand Down Expand Up @@ -261,13 +184,24 @@ def get_data_area(
coords: str = Query(..., example="POLYGON((5.0 52.0, 6.0 52.0,6.0 52.1,5.0 52.1, 5.0 52.0))"),
parameter_name: str = Query(..., alias="parameter-name", example="dd,ff,rh,pp,tn"),
datetime: str | None = None,
f: str | None = "covjson"
):
poly = wkt.loads(coords)
assert poly.geom_type == "Polygon"
range = get_datetime_range(datetime)
get_obs_request = dstore.GetObsRequest(
instruments=list(map(str.strip, parameter_name.split(","))),
inside=dstore.Polygon(points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords]),
inside=dstore.Polygon(points=[dstore.Point(lat=coord[1], lon=coord[0])
for coord in poly.exterior.coords]),
interval=dstore.TimeInterval(start=range[0], end=range[1]) if range else None,
)
return get_data_for_time_series(get_obs_request)
coverages = get_obsrequest(get_obs_request)
coverages = edr_formatter[f](coverages) # will need to handle new format request
if len(coverages) == 0:
raise HTTPException(status_code=404, detail="No data found")
elif len(coverages) == 1:
return coverages[0]
else:
return CoverageCollection(
coverages=coverages, parameters=coverages[0].parameters
) # HACK to take parameters from first one

0 comments on commit 33e3be9

Please sign in to comment.