Skip to content

Commit

Permalink
Changed to async grpc requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Teddy-1000 committed Jan 18, 2024
1 parent d0f9633 commit 6c42e24
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 22 deletions.
12 changes: 11 additions & 1 deletion api/formatters/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from pydantic import BaseModel, validator
import importlib
import pkgutil

Expand All @@ -21,5 +22,14 @@ def get_EDR_formatters() -> dict:
formatter_module, formatter_module.formatter_name)

# Should also setup dict for alias discovery
class edr_formatters_model(BaseModel):
f: str

return available_formatters
@validator("f")
def f_must_be_available_formatter(cls, f):
named_formatters = list(available_formatters.keys())
if f not in named_formatters:
raise ValueError(f"f must be a provided formatter, one of {named_formatters}")
return f

return available_formatters, edr_formatters_model
1 change: 1 addition & 0 deletions api/formatters/covjson.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@


# Requierd for pugin discovery
# Need to be available at top level of formatter plugin
formatter_name = "Covjson"


Expand Down
13 changes: 9 additions & 4 deletions api/grpc_getter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
import datastore_pb2_grpc as dstore_grpc


def get_obsrequest(get_obs_request):
with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel:
grpc_stub = dstore_grpc.DatastoreStub(channel)
response = grpc_stub.GetObservations(get_obs_request)
# Functions in this file should be made async,
# These functions should be the only components that are
# dependent on external services.

async def getObsRequest(get_obs_request):
channel = grpc.aio.insecure_channel(
f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}")
grpc_stub = dstore_grpc.DatastoreStub(channel)
response = await grpc_stub.GetObservations(get_obs_request)

return response
29 changes: 12 additions & 17 deletions api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@
from shapely import wkt


from grpc_getter import get_obsrequest
from grpc_getter import getObsRequest

from formatter import get_EDR_formatters
import formatters

app = FastAPI()
app.add_middleware(BrotliMiddleware)

edr_formatter = get_EDR_formatters()
edr_formatter, edr_format_pydantic_model = formatters.get_EDR_formatters()


def get_datetime_range(datetime_string: str | None) -> Tuple[Timestamp, Timestamp] | None:
Expand Down Expand Up @@ -141,10 +141,11 @@ def get_locations(bbox: str = Query(..., example="5.0,52.0,6.0,52.1")) -> Featur
response_model=Coverage,
response_model_exclude_none=True,
)
def get_data_location_id(
async def get_data_location_id(
location_id: str = Path(..., example="06260"),
parameter_name: str = Query(..., alias="parameter-name", example="dd,ff,rh,pp,tn"),
datetime: str | None = None,
f: edr_format_pydantic_model | None = "covjson"
):
# TODO: There is no error handling of any kind at the moment!
# This is just a quick and dirty demo
Expand All @@ -154,7 +155,8 @@ def get_data_location_id(
instruments=list(map(str.strip, parameter_name.split(","))),
interval=dstore.TimeInterval(start=range[0], end=range[1]) if range else None,
)
return get_data_for_time_series(get_obs_request)
response = await getObsRequest(get_obs_request)
return edr_formatter[f](response)


@app.get(
Expand All @@ -180,11 +182,11 @@ def get_data_position(
response_model=Coverage | CoverageCollection,
response_model_exclude_none=True,
)
def get_data_area(
async def get_data_area(
coords: str = Query(..., example="POLYGON((5.0 52.0, 6.0 52.0,6.0 52.1,5.0 52.1, 5.0 52.0))"),
parameter_name: str = Query(..., alias="parameter-name", example="dd,ff,rh,pp,tn"),
datetime: str | None = None,
f: str | None = "covjson"
f: str = Query(default="covjson", alias="file", example="covjson")
):
poly = wkt.loads(coords)
assert poly.geom_type == "Polygon"
Expand All @@ -195,13 +197,6 @@ def get_data_area(
for coord in poly.exterior.coords]),
interval=dstore.TimeInterval(start=range[0], end=range[1]) if range else None,
)
coverages = get_obsrequest(get_obs_request)
coverages = edr_formatter[f](coverages) # will need to handle new format request
if len(coverages) == 0:
raise HTTPException(status_code=404, detail="No data found")
elif len(coverages) == 1:
return coverages[0]
else:
return CoverageCollection(
coverages=coverages, parameters=coverages[0].parameters
) # HACK to take parameters from first one
coverages = await getObsRequest(get_obs_request)
coverages = edr_formatter[f](coverages)
return coverages

0 comments on commit 6c42e24

Please sign in to comment.