-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
309ffd5
commit d0f9633
Showing
3 changed files
with
137 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import importlib | ||
import pkgutil | ||
|
||
import formatters | ||
|
||
|
||
def get_EDR_formatters() -> dict: | ||
""" | ||
This method should grab all available formatters and make them reachable in a dict | ||
This way we can dynamicly grab all available formats and skip configuring this. | ||
Should aliases be made available, and how do one make formatters present in openapi doc? | ||
""" | ||
available_formatters = {} | ||
|
||
formatter_plugins = [importlib.import_module("formatters."+i.name) for i in pkgutil.iter_modules( | ||
formatters.__path__) if i.name != "base_formatter"] | ||
print(formatter_plugins) | ||
for formatter_module in formatter_plugins: | ||
# Make instance of formatter and save | ||
available_formatters[formatter_module.__name__.split(".")[-1]] = getattr( | ||
formatter_module, formatter_module.formatter_name) | ||
|
||
# Should also setup dict for alias discovery | ||
|
||
return available_formatters |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
from abc import ABC, abstractmethod | ||
|
||
|
||
class EDR_formatter(ABC): | ||
""" | ||
This is the abstract class for implementing a formatter in the E-SOH EDR formatter | ||
Name of class should represent expected output format. | ||
""" | ||
pass | ||
|
||
@abstractmethod | ||
def convert(self, datastore_reply): | ||
""" | ||
Main method for converting protobuf object to given format. | ||
""" | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
import math | ||
|
||
from datetime import timezone | ||
from itertools import groupby | ||
|
||
from covjson_pydantic.coverage import Coverage | ||
from covjson_pydantic.coverage import CoverageCollection | ||
from covjson_pydantic.domain import Axes | ||
from covjson_pydantic.domain import Domain | ||
from covjson_pydantic.domain import DomainType | ||
from covjson_pydantic.domain import ValuesAxis | ||
from covjson_pydantic.ndarray import NdArray | ||
from covjson_pydantic.observed_property import ObservedProperty | ||
from covjson_pydantic.parameter import Parameter | ||
from covjson_pydantic.reference_system import ReferenceSystem | ||
from covjson_pydantic.reference_system import ReferenceSystemConnectionObject | ||
from covjson_pydantic.unit import Unit | ||
|
||
from pydantic import AwareDatetime | ||
|
||
from formatters.base_formatter import EDR_formatter | ||
|
||
|
||
# Requierd for pugin discovery | ||
formatter_name = "Covjson" | ||
|
||
|
||
class Covjson(EDR_formatter): | ||
""" | ||
Class for converting protobuf object to coverage json | ||
""" | ||
|
||
def __init__(self): | ||
self.alias = ["covjson", "coveragejson"] | ||
self.mime_type = "application/json" # find the type for covjson | ||
|
||
def convert(self, response): | ||
# Collect data | ||
coverages = [] | ||
data = [self._collect_data(md.ts_mdata, md.obs_mdata) for md in response.observations] | ||
|
||
# Need to sort before using groupBy. Also sort on param_id to get consistently sorted output | ||
data.sort(key=lambda x: (x[0], x[1])) | ||
# The multiple coverage logic is not needed for this endpoint, | ||
# but we want to share this code between endpoints | ||
for (lat, lon, times), group in groupby(data, lambda x: x[0]): | ||
referencing = [ | ||
ReferenceSystemConnectionObject( | ||
coordinates=["y", "x"], | ||
system=ReferenceSystem(type="GeographicCRS", | ||
id="http://www.opengis.net/def/crs/EPSG/0/4326"), | ||
), | ||
ReferenceSystemConnectionObject( | ||
coordinates=["z"], | ||
system=ReferenceSystem(type="TemporalRS", calendar="Gregorian"), | ||
), | ||
] | ||
domain = Domain( | ||
domainType=DomainType.point_series, | ||
axes=Axes( | ||
x=ValuesAxis[float](values=[lon]), | ||
y=ValuesAxis[float](values=[lat]), | ||
t=ValuesAxis[AwareDatetime](values=times), | ||
), | ||
referencing=referencing, | ||
) | ||
|
||
parameters = {} | ||
ranges = {} | ||
for (_, _, _), param_id, unit, values in group: | ||
if all(math.isnan(v) for v in values): | ||
continue # Drop ranges if completely nan. | ||
# TODO: Drop the whole coverage if it becomes empty? | ||
values_no_nan = [v if not math.isnan(v) else None for v in values] | ||
# TODO: Improve this based on "standard name", etc. | ||
parameters[param_id] = Parameter( | ||
observedProperty=ObservedProperty(label={"en": param_id}), unit=Unit(label={"en": unit}) | ||
) # TODO: Also fill symbol? | ||
ranges[param_id] = NdArray( | ||
values=values_no_nan, axisNames=["t", "y", "x"], shape=[len(values_no_nan), 1, 1] | ||
) | ||
|
||
coverages.append(Coverage(domain=domain, parameters=parameters, ranges=ranges)) | ||
return CoverageCollection(coverages=coverages) | ||
|
||
def _collect_data(ts_mdata, obs_mdata): | ||
lat = obs_mdata[0].geo_point.lat # HACK: For now assume they all have the same position | ||
lon = obs_mdata[0].geo_point.lon | ||
tuples = ( | ||
(o.obstime_instant.ToDatetime(tzinfo=timezone.utc), float(o.value)) for o in obs_mdata | ||
) # HACK: str -> float | ||
(times, values) = zip(*tuples) | ||
param_id = ts_mdata.instrument | ||
unit = ts_mdata.unit | ||
|
||
return (lat, lon, times), param_id, unit, values |