diff --git a/ingest/src/app.py b/ingest/src/app.py
new file mode 100644
index 00000000..524a7441
--- /dev/null
+++ b/ingest/src/app.py
@@ -0,0 +1,45 @@
+import io
+
+import xarray as xr
+from esoh.ingest.main import IngestToPipeline
+from fastapi import FastAPI
+from fastapi import UploadFile
+from model import JsonMessageSchema
+
+app = FastAPI()
+
+# Define configuration parameters
+mqtt_configuration = {
+    "broker_url": "mqtt://your_mqtt_broker",
+    "username": "your_username",
+    "password": "your_password",
+    # ... other MQTT configuration options
+}
+
+datastore_configuration = {
+    "host": "your_datastore_host",
+    "port": 1234,
+    "username": "your_datastore_username",
+    "password": "your_datastore_password",
+    # ... other datastore configuration options
+}
+
+
+@app.post("/uploadfile/")
+async def create_upload_file(files: UploadFile, input_type: str):
+    ingester = IngestToPipeline(
+        mqtt_conf=mqtt_configuration, dstore_conn=datastore_configuration, uuid_prefix="uuid", testing=True
+    )
+    contents = await files.read()
+    ds = xr.open_dataset(io.BytesIO(contents))
+    response = ingester.ingest(ds, input_type)
+    return response
+
+
+@app.post("/json")
+async def post_json(request: JsonMessageSchema, input_type: str):
+    ingester = IngestToPipeline(
+        mqtt_conf=mqtt_configuration, dstore_conn=datastore_configuration, uuid_prefix="uuid", testing=True
+    )
+    response = ingester.ingest(request.dict(exclude_none=True), input_type)
+    return response
diff --git a/ingest/src/esoh/ingest/main.py b/ingest/src/esoh/ingest/main.py
index 5ae94786..8c08364e 100644
--- a/ingest/src/esoh/ingest/main.py
+++ b/ingest/src/esoh/ingest/main.py
@@ -5,6 +5,7 @@
 import grpc
 import pkg_resources
+from typing import Union
 from esoh.ingest.datastore import DatastoreConnection
 from esoh.ingest.messages import messages
 from esoh.ingest.send_mqtt import MQTTConnection
@@ -55,7 +56,7 @@ def __init__(
         else:
             self.mqtt = MQTTConnection(mqtt_conf["host"], mqtt_conf["topic"])
 
-    def ingest(self, message: [str, object], input_type: str = None):
+    def ingest(self, message: Union[str, object], input_type: str = None):
         """
         Method designed to be main interaction point with this package.
         Will interpret call all methods for deciding input type, build the mqtt messages, and
@@ -63,7 +64,7 @@
         """
         if not input_type:
-            input_type = self.decide_input_type(message)
+            input_type = self._decide_input_type(message)
 
         self.publish_messages(self._build_messages(message, input_type))
 
@@ -99,7 +100,7 @@ def _decide_input_type(self, message) -> str:
             logger.critical(f"Unknown filetype provided. Got {message.split('.')[-1]}")
             raise ValueError(f"Unknown filetype provided. Got {message.split('.')[-1]}")
 
-    def _build_messages(self, message: [str, object], input_type: str = None) -> list:
+    def _build_messages(self, message: Union[str, object], input_type: str = None) -> list:
         """
         Internal method for calling the message building.
         """
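# --- Example (editor's sketch, not part of the patch) ------------------------
# How the Union[str, object] signature on ingest() above is meant to be used:
# a str is treated as a path and the input type is inferred from the file
# extension, while an in-memory object requires an explicit input_type.
# The configuration dicts are the placeholders from app.py; "observations.nc"
# is a hypothetical file. With testing=True no real MQTT connection is opened,
# which is why placeholder broker settings suffice here.
import xarray as xr

from esoh.ingest.main import IngestToPipeline

mqtt_configuration = {"broker_url": "mqtt://your_mqtt_broker", "username": "your_username", "password": "your_password"}
datastore_configuration = {"host": "your_datastore_host", "port": 1234, "username": "your_datastore_username", "password": "your_datastore_password"}

ingester = IngestToPipeline(
    mqtt_conf=mqtt_configuration, dstore_conn=datastore_configuration, uuid_prefix="uuid", testing=True
)
ingester.ingest("observations.nc")  # str: _decide_input_type() infers the type from the ".nc" extension
ingester.ingest(xr.open_dataset("observations.nc"), input_type="netCDF")  # object: type must be given explicitly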
""" @@ -110,3 +111,4 @@ def _build_messages(self, message: [str, object], input_type: str = None) -> lis logger.critical("Illegal usage, not allowed to input" + "objects without specifying input type") raise TypeError("Illegal usage, not allowed to input" + "objects without specifying input type") return messages(message, input_type, self.uuid_prefix, self.schema_path, self.schema_validator) + diff --git a/ingest/src/esoh/ingest/messages.py b/ingest/src/esoh/ingest/messages.py index a7eade61..8f5553a8 100644 --- a/ingest/src/esoh/ingest/messages.py +++ b/ingest/src/esoh/ingest/messages.py @@ -7,16 +7,21 @@ from esoh.ingest.bufr.create_mqtt_message_from_bufr import build_all_json_payloads_from_bufr from esoh.ingest.netCDF.extract_metadata_netcdf import build_all_json_payloads_from_netcdf from jsonschema import ValidationError +import json logger = logging.getLogger(__name__) -def build_message(file: [object], input_type: str, uuid_prefix: str, schema_path: str, validator: object): +def build_message(file: object, input_type: str, uuid_prefix: str, schema_path: str, validator: object): match input_type: case "netCDF": unfinished_messages = build_all_json_payloads_from_netcdf(file, schema_path=schema_path) case "bufr": unfinished_messages = build_all_json_payloads_from_bufr(file) + case "json": + unfinished_messages = [] + unfinished_messages.append(file) + # Set message publication time in RFC3339 format # Create UUID for the message, and state message format version @@ -51,6 +56,10 @@ def messages(message, input_type, uuid_prefix, schema_path, validator): return build_message( message, input_type=input_type, uuid_prefix=uuid_prefix, schema_path=schema_path, validator=validator ) + elif input_type == "json": + return build_message( + message, input_type=input_type, uuid_prefix=uuid_prefix, schema_path=schema_path, validator=validator + ) if isinstance(message, str): return load_files(message, input_type=input_type, uuid_prefix=uuid_prefix) elif isinstance(message, xr.Dataset): diff --git a/ingest/src/esoh/ingest/netCDF/extract_metadata_netcdf.py b/ingest/src/esoh/ingest/netCDF/extract_metadata_netcdf.py index 60951c97..cb1d5e55 100644 --- a/ingest/src/esoh/ingest/netCDF/extract_metadata_netcdf.py +++ b/ingest/src/esoh/ingest/netCDF/extract_metadata_netcdf.py @@ -63,7 +63,7 @@ def get_metadata_dict(ds: xr.Dataset, json_message_target: dict, sub_map: dict) def populate_json_message(json_message_target: dict, netcdf_metadata: dict, current_sub_dict: dict) -> dict: """ - This function contains the loop for actually assing values in to the json_message_target. + This function contains the loop for actually adding values in to the json_message_target. Keyword arguents: json_message_target (dict) -- This dict is where all extracted metadata is stored. 
diff --git a/ingest/src/esoh/schemas/e-soh-message-spec.json b/ingest/src/esoh/schemas/e-soh-message-spec.json
index e80a9e22..9133e385 100644
--- a/ingest/src/esoh/schemas/e-soh-message-spec.json
+++ b/ingest/src/esoh/schemas/e-soh-message-spec.json
@@ -36,11 +36,17 @@
           ]
         },
         "coordinates": {
-          "type": "array",
-          "minItems": 2,
-          "items": {
-            "type": "number"
+          "type": "object",
+          "properties": {
+            "lat": {
+              "type": "number",
+              "description": "Latitude"
+            },
+            "lon": {
+              "type": "number",
+              "description": "Longitude"
+            }
           }
         }
       }
     },
@@ -63,10 +69,16 @@
       "type": "array",
       "minItems": 4,
       "items": {
-        "type": "array",
-        "minItems": 2,
-        "items": {
-          "type": "number"
+        "type": "object",
+        "properties": {
+          "lat": {
+            "type": "number",
+            "description": "Latitude"
+          },
+          "lon": {
+            "type": "number",
+            "description": "Longitude"
+          }
         }
       }
     }
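# --- Example (editor's sketch, not part of the patch) ------------------------
# What the revised "coordinates" definition above accepts. The subschema is
# restated standalone so it can be checked with the jsonschema package the
# ingest code already depends on.
from jsonschema import ValidationError, validate

coordinates_schema = {
    "type": "object",
    "properties": {
        "lat": {"type": "number", "description": "Latitude"},
        "lon": {"type": "number", "description": "Longitude"},
    },
}

validate(instance={"lat": 70.9394, "lon": -8.669}, schema=coordinates_schema)  # passes
try:
    validate(instance={"lat": "70.9394", "lon": "-8.669"}, schema=coordinates_schema)
except ValidationError:
    print("lat/lon must be JSON numbers, not strings")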
diff --git a/ingest/src/model.py b/ingest/src/model.py
new file mode 100644
index 00000000..ed9dd44b
--- /dev/null
+++ b/ingest/src/model.py
@@ -0,0 +1,166 @@
+from __future__ import annotations
+
+from enum import Enum
+from typing import List, Optional, Union, Dict, Any
+
+from pydantic import BaseModel, Field
+
+
+class Type(Enum):
+    Feature = 'Feature'
+
+
+class Type1(Enum):
+    Point = 'Point'
+
+
+class Coordinate(BaseModel):
+    lat: float
+    lon: float
+
+
+class Geometry(BaseModel):
+    type: Type1
+    coordinates: Coordinate
+
+
+class Type2(Enum):
+    Polygon = 'Polygon'
+
+
+class Geometry1(BaseModel):
+    type: Type2
+    coordinates: List[Coordinate] = Field(..., min_items=4)
+
+
+class CreatorType(Enum):
+    person = 'person'
+    group = 'group'
+    institution = 'institution'
+    position = 'position'
+
+
+class Properties(BaseModel):
+    data_id: str = Field(
+        ...,
+        description='Unique identifier of the data as defined by the data producer.\nData producers SHOULD NOT use an opaque id, but something meaningful to support client side filtering.\n',
+    )
+    title: Optional[str] = Field(
+        None,
+        description='Short phrase or sentence describing the dataset. In many discovery systems, the title will be displayed in the results list from a search, and therefore should be human readable and reasonable to display in a list of such names. This attribute is also recommended by the NetCDF Users Guide and the CF conventions.',
+    )
+    summary: str = Field(
+        ...,
+        description='A paragraph describing the dataset, analogous to an abstract for a paper.',
+    )
+    keywords: Optional[str] = Field(
+        None,
+        description="A comma-separated list of key words and/or phrases. Keywords may be common words or phrases, terms from a controlled vocabulary (GCMD is often used), or URIs for terms from a controlled vocabulary (see also 'keywords_vocabulary' attribute).",
+    )
+    keywords_vocabulary: Optional[str] = Field(
+        None,
+        description="If you are using a controlled vocabulary for the words/phrases in your 'keywords' attribute, this is the unique name or identifier of the vocabulary from which keywords are taken. If more than one keyword vocabulary is used, each may be presented with a prefix and a following comma, so that keywords may optionally be prefixed with the controlled vocabulary key.",
+    )
+    license: str = Field(
+        ...,
+        description="Provide the URL to a standard or specific license, enter 'Freely Distributed' or 'None', or describe any restrictions to data access and distribution in free text.",
+    )
+    Conventions: str = Field(
+        ...,
+        description="A comma-separated list of the conventions that are followed by the dataset. For files that follow this version of ACDD, include the string 'ACDD-1.3'. (This attribute is described in the NetCDF Users Guide.)",
+    )
+    naming_authority: str = Field(
+        ...,
+        description="The organization that provides the initial id (see above) for the dataset. The naming authority should be uniquely specified by this attribute. We recommend using reverse-DNS naming for the naming authority; URIs are also acceptable. Example: 'edu.ucar.unidata'.",
+    )
+    creator_type: Optional[CreatorType] = Field(
+        None,
+        description="Specifies type of creator with one of the following: 'person', 'group', 'institution', or 'position'. If this attribute is not specified, the creator is assumed to be a person.",
+    )
+    creator_name: Optional[str] = Field(
+        None,
+        description='The name of the person (or other creator type specified by the creator_type attribute) principally responsible for creating this data.',
+    )
+    creator_email: Optional[str] = Field(
+        None,
+        description='The email address of the person (or other creator type specified by the creator_type attribute) principally responsible for creating this data.',
+    )
+    creator_url: Optional[str] = Field(
+        None,
+        description='The URL of the person (or other creator type specified by the creator_type attribute) principally responsible for creating this data.',
+    )
+    institution: Optional[str] = Field(
+        None,
+        description='The name of the institution principally responsible for originating this data. This attribute is recommended by the CF convention.',
+    )
+    project: Optional[str] = Field(
+        None,
+        description="The name of the project(s) principally responsible for originating this data. Multiple projects can be separated by commas, as described under Attribute Content Guidelines. Examples: 'PATMOS-X', 'Extended Continental Shelf Project'.",
+    )
+    source: Optional[str] = Field(
+        None,
+        description='The method of production of the original data. If it was model-generated, source should name the model and its version. If it is observational, source should characterize it. This attribute is defined in the CF Conventions.',
+    )
+    platform: Optional[str] = Field(
+        None,
+        description='Name of the platform(s) that supported the sensor data used to create this data set or product. Platforms can be of any type, including satellite, ship, station, aircraft or other. Indicate controlled vocabulary used in platform_vocabulary.',
+    )
+    platform_vocabulary: Optional[str] = Field(
+        None,
+        description="Controlled vocabulary for the names used in the 'platform' attribute.",
+    )
+    instrument: Optional[str] = Field(
+        None,
+        description="Name of the contributing instrument(s) or sensor(s) used to create this data set or product. Indicate controlled vocabulary used in instrument_vocabulary.",
+    )
+    instrument_vocabulary: Optional[str] = Field(
+        None,
+        description="Controlled vocabulary for the names used in the 'instrument' attribute.",
+    )
+    history: Optional[str] = Field(
+        None,
+        description="Provides an audit trail for modifications to the original data. This attribute is also in the NetCDF Users Guide: 'This is a character array with a line for each invocation of a program that has modified the dataset. Well-behaved generic netCDF applications should append a line containing: date, time of day, user name, program name and command arguments.' To include a more complete description you can append a reference to an ISO Lineage entity; see NOAA EDM ISO Lineage guidance.",
+    )
+    datetime: str = Field(
+        ...,
+        description='Identifies the date/time of the data being published, in RFC3339 format.',
+    )
+    start_datetime: Optional[str] = Field(
+        None,
+        description='Identifies the start date/time of the data being published, in RFC3339 format.',
+    )
+    end_datetime: Optional[str] = Field(
+        None,
+        description='Identifies the end date/time of the data being published, in RFC3339 format.',
+    )
+    processing_level: Optional[str] = Field(
+        None,
+        description='A textual description of the processing (or quality control) level of the data.',
+    )
+
+
+class Link(BaseModel):
+    href: str = Field(..., example='http://data.example.com/buildings/123')
+    rel: str = Field(..., example='alternate')
+    type: Optional[str] = Field(None, example='application/geo+json')
+    hreflang: Optional[str] = Field(None, example='en')
+    title: Optional[str] = Field(None, example='Trierer Strasse 70, 53115 Bonn')
+    length: Optional[int] = None
+
+
+class JsonMessageSchema(BaseModel):
+    type: Type
+    geometry: Union[Geometry, Geometry1]
+    properties: Properties
+    links: List[Link] = Field(..., min_items=1)
+    version: str
+
+    def dict(self, *args, **kwargs) -> Dict[str, Any]:
+        d = super().dict(*args, **kwargs)
+        d['type'] = self.type.value
+        d['geometry']['type'] = self.geometry.type.value
+        if isinstance(self.geometry, Geometry):
+            d['geometry']['coordinates'] = self.geometry.coordinates.dict()
+        elif isinstance(self.geometry, Geometry1):
+            d['geometry']['coordinates'] = [coord.dict() for coord in self.geometry.coordinates]
+        return d
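# --- Example (editor's sketch, not part of the patch) ------------------------
# Building a JsonMessageSchema and serialising it with the overridden dict(),
# which converts the enums and the nested Coordinate back into plain JSON
# types. All field values below are placeholders.
from model import JsonMessageSchema

msg = JsonMessageSchema(
    type="Feature",
    geometry={"type": "Point", "coordinates": {"lat": 59.4128, "lon": 6.0}},
    properties={
        "data_id": "obs-example-1",
        "summary": "Placeholder summary.",
        "license": "http://spdx.org/licenses/CC-BY-4.0",
        "Conventions": "CF-1.10, ACDD-1.3",
        "naming_authority": "no.met",
        "datetime": "2023-06-14T14:38:30+00:00",
    },
    links=[{"href": "https://example.com/dataset/1", "rel": "canonical"}],
    version="v4.0",
)
print(msg.dict(exclude_none=True))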
diff --git a/ingest/test/test_data/test1.json b/ingest/test/test_data/test1.json
new file mode 100644
index 00000000..76fc62a0
--- /dev/null
+++ b/ingest/test/test_data/test1.json
@@ -0,0 +1,29 @@
+{
+    "properties": {
+        "title": "Air temperature observations from weather station Gullingen, ROGALAND, Norge (WIGOS ID: 0-578-0-46220, station ID: 46220).",
+        "institution": "Norwegian Meteorological Institute (MET Norway)",
+        "Conventions": "CF-1.10, ACDD-1.3",
+        "history": "2023-06-14T14:38:30.363447+00:00: Created from Frost.",
+        "keywords": "GCMDSK:EARTH SCIENCE > ATMOSPHERE > ATMOSPHERIC TEMPERATURE > SURFACE TEMPERATURE > AIR TEMPERATURE, GCMDLOC:Geographic Region > Northern Hemisphere, GCMDPROV: Government Agencies-non-US > Norway > NO/MET > Norwegian Meteorological Institute, GEMET:Meteorological geographical features, GEMET:Atmospheric conditions, GEMET:Oceanographic geographical features, NORTHEMES:Weather and climate",
+        "summary": "Time series of Air temperature observations from the Norwegian weather station Gullingen (WIGOS ID 0-578-0-46220, station ID 46220). The observations have been through the data collection system of the Norwegian Meteorological institute which includes a number of automated and manual quality control routines. The number of available quality control routines is element dependent.",
+        "source": "In Situ Land-based station",
+        "creator_name": "Norwegian Meteorological Institute",
+        "creator_url": "https://www.met.no",
+        "creator_email": "observation_data_archive@met.no",
+        "license": "http://spdx.org/licenses/CC-BY-4.0",
+        "access_constraint": "Open"
+    },
+    "links": [
+        {
+            "href": "https://data.met.no/dataset/8a0744b7-f0dc-4929-8700-f1285e76df36",
+            "title": "Dataset landing page"
+        }
+    ],
+    "geometry": {
+        "type": "Point",
+        "coordinates": {
+            "lat": 59.4128
+        }
+    },
+    "type": "Feature"
+}
diff --git a/ingest/test/test_data/test_payload.json b/ingest/test/test_data/test_payload.json
index d4d164bf..daa8da50 100644
--- a/ingest/test/test_data/test_payload.json
+++ b/ingest/test/test_data/test_payload.json
@@ -35,12 +35,11 @@
         ],
         "geometry": {
             "type": "Point",
-            "coordinates": [
-                70.9394,
-                -8.669
-            ]
+            "coordinates": {
+                "lat": 70.9394,
+                "lon": -8.669
+            }
         },
         "type": "Feature",
-        "version": "v4.0",
-        "id": "urn:x-wmo:md:norway:no.met:bb8c10a1-ba20-4676-a193-1e1929de9d13"
+        "version": "v4.0"
 }
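# --- Example (editor's sketch, not part of the patch) ------------------------
# Exercising the two FastAPI routes added in app.py with the requests package.
# Host and port are hypothetical; input_type travels as a query parameter
# because it is declared as a bare str in both route signatures.
import json

import requests

base = "http://localhost:8000"

# POST a NetCDF file to /uploadfile/ (the multipart field name must be "files").
with open("observations.nc", "rb") as f:  # hypothetical file
    r = requests.post(f"{base}/uploadfile/", params={"input_type": "netCDF"}, files={"files": f})
print(r.status_code, r.text)

# POST a ready-made JSON message to /json.
with open("ingest/test/test_data/test_payload.json") as f:
    r = requests.post(f"{base}/json", params={"input_type": "json"}, json=json.load(f))
print(r.status_code, r.text)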