
Commit

Added json endpoint. Request body in JSON format is verified using pydantic models
shamlymajeed committed Feb 19, 2024
1 parent fe3ab46 commit 2ceda16
Showing 8 changed files with 286 additions and 19 deletions.
54 changes: 54 additions & 0 deletions ingest/src/app.py
@@ -0,0 +1,54 @@
import io
from typing import Union

import xarray as xr
from esoh.ingest.main import IngestToPipeline
from fastapi import FastAPI
from fastapi import UploadFile
from pydantic import BaseModel
from model import JsonMessageSchema


class Item(BaseModel):
name: str
id: int
description: str | None = None


app = FastAPI()

# Define configuration parameters
mqtt_configuration = {
"broker_url": "mqtt://your_mqtt_broker",
"username": "your_username",
"password": "your_password",
# ... other MQTT configuration options
}

datastore_configuration = {
"host": "your_datastore_host",
"port": 1234,
"username": "your_datastore_username",
"password": "your_datastore_password",
# ... other datastore configuration options
}


@app.post("/uploadfile/")
async def create_upload_file(files: UploadFile, input_type: str):
ingester = IngestToPipeline(
mqtt_conf=mqtt_configuration, dstore_conn=datastore_configuration, uuid_prefix="uuid", testing=True
)
contents = await files.read()
ds = xr.open_dataset(io.BytesIO(contents))
response = ingester.ingest(ds, input_type)
return response


@app.post("/json")
async def post_json(request: JsonMessageSchema, input_type: str):
ingester = IngestToPipeline(
mqtt_conf=mqtt_configuration, dstore_conn=datastore_configuration, uuid_prefix="uuid", testing=True
)
response = ingester.ingest(request.dict(exclude_none=True), input_type)
return response
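
For reference, the new /json endpoint can be exercised with FastAPI's test client. This is a minimal sketch, assuming it is run from ingest/src so that app.py and the esoh package are importable, and that the test payload satisfies JsonMessageSchema; the file path and query value are illustrative.

# Sketch: post a JSON message to the new /json endpoint via FastAPI's TestClient.
import json

from fastapi.testclient import TestClient

from app import app  # the FastAPI application defined above

client = TestClient(app)

# Assumed to hold a payload that validates against JsonMessageSchema.
with open("../test/test_data/test_payload.json") as f:
    payload = json.load(f)

# input_type is a query parameter; the request body is parsed into JsonMessageSchema.
response = client.post("/json", params={"input_type": "json"}, json=payload)
print(response.status_code, response.text)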
8 changes: 5 additions & 3 deletions ingest/src/esoh/ingest/main.py
@@ -5,6 +5,7 @@

import grpc
import pkg_resources
from typing import Union
from esoh.ingest.datastore import DatastoreConnection
from esoh.ingest.messages import messages
from esoh.ingest.send_mqtt import MQTTConnection
@@ -55,15 +56,15 @@ def __init__(
else:
self.mqtt = MQTTConnection(mqtt_conf["host"], mqtt_conf["topic"])

def ingest(self, message: [str, object], input_type: str = None):
def ingest(self, message: Union[str, object], input_type: str = None):
"""
Method designed to be main interaction point with this package.
Will call all methods for deciding input type, build the MQTT messages, and
publish them.
"""
if not input_type:
input_type = self.decide_input_type(message)
input_type = self._decide_input_type(message)

self.publish_messages(self._build_messages(message, input_type))

@@ -99,7 +100,7 @@ def _decide_input_type(self, message) -> str:
logger.critical(f"Unknown filetype provided. Got {message.split('.')[-1]}")
raise ValueError(f"Unknown filetype provided. Got {message.split('.')[-1]}")

def _build_messages(self, message: [str, object], input_type: str = None) -> list:
def _build_messages(self, message: Union[str, object], input_type: str = None) -> list:
"""
Internal method for calling the message building.
"""
@@ -110,3 +111,4 @@ def _build_messages(self, message: [str, object], input_type: str = None) -> list
logger.critical("Illegal usage, not allowed to input" + "objects without specifying input type")
raise TypeError("Illegal usage, not allowed to input" + "objects without specifying input type")
return messages(message, input_type, self.uuid_prefix, self.schema_path, self.schema_validator)

11 changes: 10 additions & 1 deletion ingest/src/esoh/ingest/messages.py
@@ -7,16 +7,21 @@
from esoh.ingest.bufr.create_mqtt_message_from_bufr import build_all_json_payloads_from_bufr
from esoh.ingest.netCDF.extract_metadata_netcdf import build_all_json_payloads_from_netcdf
from jsonschema import ValidationError
import json

logger = logging.getLogger(__name__)


def build_message(file: [object], input_type: str, uuid_prefix: str, schema_path: str, validator: object):
def build_message(file: object, input_type: str, uuid_prefix: str, schema_path: str, validator: object):
match input_type:
case "netCDF":
unfinished_messages = build_all_json_payloads_from_netcdf(file, schema_path=schema_path)
case "bufr":
unfinished_messages = build_all_json_payloads_from_bufr(file)
case "json":
unfinished_messages = []
unfinished_messages.append(file)


# Set message publication time in RFC3339 format
# Create UUID for the message, and state message format version
@@ -51,6 +56,10 @@ def messages(message, input_type, uuid_prefix, schema_path, validator):
return build_message(
message, input_type=input_type, uuid_prefix=uuid_prefix, schema_path=schema_path, validator=validator
)
elif input_type == "json":
return build_message(
message, input_type=input_type, uuid_prefix=uuid_prefix, schema_path=schema_path, validator=validator
)
if isinstance(message, str):
return load_files(message, input_type=input_type, uuid_prefix=uuid_prefix)
elif isinstance(message, xr.Dataset):
2 changes: 1 addition & 1 deletion ingest/src/esoh/ingest/netCDF/extract_metadata_netcdf.py
@@ -63,7 +63,7 @@ def get_metadata_dict(ds: xr.Dataset, json_message_target: dict, sub_map: dict)

def populate_json_message(json_message_target: dict, netcdf_metadata: dict, current_sub_dict: dict) -> dict:
"""
This function contains the loop for actually assing values in to the json_message_target.
This function contains the loop for actually adding values into the json_message_target.
Keyword arguments:
json_message_target (dict) -- This dict is where all extracted metadata is stored.
23 changes: 15 additions & 8 deletions ingest/src/esoh/schemas/e-soh-message-spec.json
@@ -36,11 +36,15 @@
]
},
"coordinates": {
"type": "array",
"minItems": 2,
"items": {
"type": "number"
"lat":{
"type": "number",
"description": "Latitude"
},
"lon":{
"type": "number",
"description": "Latitude"
}

}
}
},
@@ -63,10 +67,13 @@
"type": "array",
"minItems": 4,
"items": {
"type": "array",
"minItems": 2,
"items": {
"type": "number"
"lat":{
"type": "number",
"description": "Latitude"
},
"lon":{
"type": "number",
"description": "Latitude"
}
}
}
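
With this change, a point geometry carries named lat and lon members instead of a positional number array, and a polygon becomes a list of such objects. A minimal sketch of the expected shapes, with coordinate values taken from test_payload.json for illustration:

# Illustrative geometry fragments in the revised lat/lon layout.
point_geometry = {
    "type": "Point",
    "coordinates": {"lat": 70.9394, "lon": -8.669},
}

polygon_geometry = {
    "type": "Polygon",
    "coordinates": [  # at least four vertices, per the schema's minItems
        {"lat": 70.9, "lon": -8.7},
        {"lat": 71.0, "lon": -8.7},
        {"lat": 71.0, "lon": -8.6},
        {"lat": 70.9, "lon": -8.6},
    ],
}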
169 changes: 169 additions & 0 deletions ingest/src/model.py
@@ -0,0 +1,169 @@

from __future__ import annotations

from enum import Enum
from typing import List, Optional, Union, Dict, Any

import json
from pydantic import BaseModel, Field


class Type(Enum):
Feature = 'Feature'


class Type1(Enum):
Point = 'Point'


class Coordinate(BaseModel):
lat: float
lon: float


class Geometry(BaseModel):
type: Type1
coordinates: Coordinate


class Type2(Enum):
Polygon = 'Polygon'


class Geometry1(BaseModel):
type: Type2
coordinates: List[Coordinate] = Field(..., min_items=3)


class CreatorType(Enum):
person = 'person'
group = 'group'
institution = 'institution'
position = 'position'


class Properties(BaseModel):
data_id: str = Field(
...,
description='Unique identifier of the data as defined by the data producer.\nData producers SHOULD NOT use an opaque id, but something meaningful to support client side filtering.\n',
)
title: Optional[str] = Field(
None,
description='short phrase or sentence describing the dataset. In many discovery systems, the title will be displayed in the results list from a search, and therefore should be human readable and reasonable to display in a list of such names. This attribute is also recommended by the NetCDF Users Guide and the CF conventions.',
)
summary: str = Field(
...,
description='A paragraph describing the dataset, analogous to an abstract for a paper.',
)
keywords: Optional[str] = Field(
None,
description="A comma-separated list of key words and/or phrases. Keywords may be common words or phrases, terms from a controlled vocabulary (GCMD is often used), or URIs for terms from a controlled vocabulary (see also 'keywords_vocabulary' attribute).",
)
keywords_vocabulary: Optional[str] = Field(
...,
description="If you are using a controlled vocabulary for the words/phrases in your 'keywords' attribute, this is the unique name or identifier of the vocabulary from which keywords are taken. If more than one keyword vocabulary is used, each may be presented with a prefix and a following comma, so that keywords may optionally be prefixed with the controlled vocabulary key.",
)
license: str = Field(
...,
description="Provide the URL to a standard or specific license, enter 'Freely Distributed' or 'None', or describe any restrictions to data access and distribution in free text.",
)
Conventions: str = Field(
...,
description="A comma-separated list of the conventions that are followed by the dataset. For files that follow this version of ACDD, include the string 'ACDD-1.3'. (This attribute is described in the NetCDF Users Guide.)",
)
naming_authority: str = Field(
...,
description="The organization that provides the initial id (see above) for the dataset. The naming authority should be uniquely specified by this attribute. We recommend using reverse-DNS naming for the naming authority; URIs are also acceptable. Example: 'edu.ucar.unidata'.",
)
creator_type: Optional[CreatorType] = Field(
None,
description="Specifies type of creator with one of the following: 'person', 'group', 'institution', or 'position'. If this attribute is not specified, the creator is assumed to be a person.",
)
creator_name: Optional[str] = Field(
None,
description='The name of the person (or other creator type specified by the creator_type attribute) principally responsible for creating this data.',
)
creator_email: Optional[str] = Field(
None,
description='The email address of the person (or other creator type specified by the creator_type attribute) principally responsible for creating this data.',
)
creator_url: Optional[str] = Field(
None,
description='The URL of the person (or other creator type specified by the creator_type attribute) principally responsible for creating this data.',
)
institution: Optional[str] = Field(
None,
description='The name of the institution principally responsible for originating this data. This attribute is recommended by the CF convention.',
)
project: Optional[str] = Field(
None,
description="The name of the project(s) principally responsible for originating this data. Multiple projects can be separated by commas, as described under Attribute Content Guidelines. Examples: 'PATMOS-X', 'Extended Continental Shelf Project'.",
)
source: Optional[str] = Field(
None,
description='The method of production of the original data. If it was model-generated, source should name the model and its version. If it is observational, source should characterize it. This attribute is defined in the CF Conventions.',
)
platform: Optional[str] = Field(
None,
description='Name of the platform(s) that supported the sensor data used to create this data set or product. Platforms can be of any type, including satellite, ship, station, aircraft or other. Indicate controlled vocabulary used in platform_vocabulary.',
)
platform_vocabulary: Optional[str] = Field(
None,
description="Controlled vocabulary for the names used in the 'platform' attribute.",
)
instrument: Optional[str] = Field(
None,
description="Name of the contributing instrument(s) or sensor(s) used to create this data set or product. Indicate controlled vocabulary used in instrument_vocabulary.",
)
instrument_vocabulary: Optional[str] = Field(
None,
description="Controlled vocabulary for the names used in the 'instrument' attribute.",
)
history: Optional[str] = Field(
None,
description="Provides an audit trail for modifications to the original data. This attribute is also in the NetCDF Users Guide: 'This is a character array with a line for each invocation of a program that has modified the dataset. Well-behaved generic netCDF applications should append a line containing: date, time of day, user name, program name and command arguments.' To include a more complete description you can append a reference to an ISO Lineage entity; see NOAA EDM ISO Lineage guidance.",
)
datetime: str = Field(
...,
description='Identifies the date/time of the data being published, in RFC3339 format.',
)
start_datetime: Optional[str] = Field(
None,
description='Identifies the start date/time of the data being published, in RFC3339 format.',
)
end_datetime: Optional[str] = Field(
None,
description='Identifies the end date/time of the data being published, in RFC3339 format.',
)
processing_level: Optional[str] = Field(
None,
description='A textual description of the processing (or quality control) level of the data.',
)



class Link(BaseModel):
href: str = Field(..., example='http://data.example.com/buildings/123')
rel: str = Field(..., example='alternate')
type: Optional[str] = Field(None, example='application/geo+json')
hreflang: Optional[str] = Field(None, example='en')
title: Optional[str] = Field(None, example='Trierer Strasse 70, 53115 Bonn')
length: Optional[int] = None


class JsonMessageSchema(BaseModel):
type: Type
geometry: Union[Geometry, Geometry1]
properties: Properties
links: List[Link] = Field(..., min_items=1)
version: str

def dict(self, *args, **kwargs) -> Dict[str, Any]:
d = super().dict(*args, **kwargs)
d['type'] = self.type.value
d['geometry']['type'] = self.geometry.type.value
if isinstance(self.geometry, Geometry):
d['geometry']['coordinates'] = self.geometry.coordinates.dict()
elif isinstance(self.geometry, Geometry1):
d['geometry']['coordinates'] = [coord.dict() for coord in self.geometry.coordinates]
return d
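
A minimal sketch of exercising the model directly, assuming pydantic v1 (the Field(..., example=...) and .dict() usage above follows the v1 API) and that it is run from ingest/src so model.py is importable; every field value below is illustrative.

# Sketch: validate an illustrative payload with JsonMessageSchema and serialize it.
from model import JsonMessageSchema

payload = {
    "type": "Feature",
    "geometry": {"type": "Point", "coordinates": {"lat": 70.9394, "lon": -8.669}},
    "properties": {
        "data_id": "example-data-id",
        "summary": "Example observation message.",
        "keywords_vocabulary": None,  # declared with Field(...), so the key must be present
        "license": "http://spdx.org/licenses/CC-BY-4.0",
        "Conventions": "CF-1.10, ACDD-1.3",
        "naming_authority": "no.met",
        "datetime": "2024-02-19T00:00:00Z",
    },
    "links": [{"href": "https://example.org/dataset/123", "rel": "canonical"}],
    "version": "v4.0",
}

message = JsonMessageSchema(**payload)
# exclude_none mirrors the /json endpoint; the overridden dict() replaces the enum
# members and the nested coordinate model with plain values.
print(message.dict(exclude_none=True))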
26 changes: 26 additions & 0 deletions ingest/test/test_data/test1.json
@@ -0,0 +1,26 @@
{
"properties": {
"title": "Air temperature observations from weather station Gullingen , ROGALAND, Norge, (WIGOS ID: 0-578-0-46220, station ID: 46220).",
"institution": "Norwegian Meteorological Institute (MET Norway)",
"Conventions": "CF-1.10, ACDD-1.3",
"history": "2023-06-14T14:38:30.363447+00:00: Created from Frost.",
"keywords": "GCMDSK:EARTH SCIENCE > ATMOSPHERE > ATMOSPHERIC TEMPERATURE > SURFACE TEMPERATURE > AIR TEMPERATURE, GCMDLOC:Geographic Region > Northern Hemisphere, GCMDPROV: Government Agencies-non-US > Norway > NO/MET > Norwegian Meteorological Institute, GEMET:Meteorological geographical features, GEMET:Atmospheric conditions, GEMET:Oceanographic geographical features, NORTHEMES:Weather and climate",
"summary": "Time series of Air temperature observations from the Norwegian weather station Gullingen (WIGOS ID 0-578-0-46220, station ID 46220). The observations have been through the data collection system of the Norwegian Meteorological institute which includes a number of automated and manual quality control routines. The number of available quality control routines is element dependent.",
"source": "In Situ Land-based station",
"creator_name": "Norwegian Meteorological Institute",
"creator_url": "https://www.met.no",
"creator_email": "[email protected]",
"license": "http://spdx.org/licenses/CC-BY-4.0(CC-BY-4.0)",
"access_constraint": "Open"
},
"links": {
"href": "https://data.met.no/dataset/8a0744b7-f0dc-4929-8700-f1285e76df36(Dataset landing page)"
},
"geometry": {
"type": "Point",
"coordinates": [
"59.412800"
]
},
"type": "Feature"
}
12 changes: 6 additions & 6 deletions ingest/test/test_data/test_payload.json
@@ -35,12 +35,12 @@
],
"geometry": {
"type": "Point",
"coordinates": [
70.9394,
-8.669
]
"coordinates": {
"lat":"70.9394",
"lon":"-8.669"
}

},
"type": "Feature",
"version": "v4.0",
"id": "urn:x-wmo:md:norway:no.met:bb8c10a1-ba20-4676-a193-1e1929de9d13"
"version": "v4.0"
}
