diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cb6ad43e..f9c6b8e2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -126,6 +126,7 @@ jobs: pip install ./ingest cd ./ingest && python3 api/generate_standard_name.py + - name: Copy Protobuf file to api directory and build run: | mkdir ./ingest/protobuf @@ -135,7 +136,8 @@ jobs: - name: Run Tests run: | cd ingest - python -m pytest -v --timeout=60 + mkdir -p /tmp/metrics + PROMETHEUS_MULTIPROC_DIR=/tmp/metrics python -m pytest -v --timeout=60 publish-test-results: needs: test-datastore diff --git a/api/Dockerfile b/api/Dockerfile index 7be5016c..840e9bfd 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -28,6 +28,9 @@ RUN python -m grpc_tools.protoc \ COPY "." "${DOCKER_PATH}/" +# Create folder for metrics +ENV PROMETHEUS_MULTIPROC_DIR=/tmp/metrics +RUN mkdir -p /tmp/metrics WORKDIR "${DOCKER_PATH}" CMD ["gunicorn", "main:app", "--worker-class=uvicorn.workers.UvicornWorker"] diff --git a/api/dev_requirements.txt b/api/dev_requirements.txt index 53a9b083..06df5bfb 100644 --- a/api/dev_requirements.txt +++ b/api/dev_requirements.txt @@ -40,13 +40,13 @@ edr-pydantic==0.4.0 # via -r requirements.txt fastapi==0.110.3 # via -r requirements.txt -geojson-pydantic==1.1.0 +geojson-pydantic==1.1.1 # via -r requirements.txt -grpcio==1.66.0 +grpcio==1.66.1 # via # -r requirements.txt # grpcio-tools -grpcio-tools==1.66.0 +grpcio-tools==1.66.1 # via -r requirements.txt gunicorn==22.0.0 # via -r requirements.txt @@ -63,7 +63,7 @@ httptools==0.6.1 # uvicorn httpx==0.27.0 # via -r dev_requirements.in -idna==3.7 +idna==3.10 # via # -r requirements.txt # anyio @@ -78,7 +78,7 @@ markupsafe==2.1.5 # via # -r requirements.txt # jinja2 -numpy==2.1.0 +numpy==2.1.1 # via # -r requirements.txt # shapely @@ -91,18 +91,24 @@ packaging==24.1 # pytest pluggy==1.5.0 # via pytest -protobuf==5.27.3 +prometheus-client==0.20.0 + # via + # -r requirements.txt + # prometheus-fastapi-instrumentator 
+prometheus-fastapi-instrumentator==7.0.0 + # via -r requirements.txt +protobuf==5.28.1 # via # -r requirements.txt # grpcio-tools -pydantic==2.8.2 +pydantic==2.9.1 # via # -r requirements.txt # covjson-pydantic # edr-pydantic # fastapi # geojson-pydantic -pydantic-core==2.20.1 +pydantic-core==2.23.3 # via # -r requirements.txt # pydantic @@ -139,6 +145,7 @@ starlette==0.37.2 # -r requirements.txt # brotli-asgi # fastapi + # prometheus-fastapi-instrumentator typing-extensions==4.12.2 # via # -r requirements.txt @@ -151,11 +158,11 @@ uvloop==0.20.0 # via # -r requirements.txt # uvicorn -watchfiles==0.23.0 +watchfiles==0.24.0 # via # -r requirements.txt # uvicorn -websockets==13.0 +websockets==13.0.1 # via # -r requirements.txt # uvicorn diff --git a/api/export_metrics.py b/api/export_metrics.py new file mode 100644 index 00000000..542c40ca --- /dev/null +++ b/api/export_metrics.py @@ -0,0 +1,13 @@ +from fastapi import FastAPI +from prometheus_client import CollectorRegistry +from prometheus_client import multiprocess +from prometheus_fastapi_instrumentator import Instrumentator + + +def add_metrics(app: FastAPI): + registry = CollectorRegistry() + multiprocess.MultiProcessCollector(registry) + + instrumentator = Instrumentator(excluded_handlers=[".*admin.*", "/metrics", "/health"]) + instrumentator.add() + instrumentator.instrument(app).expose(app, include_in_schema=False) diff --git a/api/formatters/geojson.py b/api/formatters/geojson.py index 39ebbf1a..bce49b51 100644 --- a/api/formatters/geojson.py +++ b/api/formatters/geojson.py @@ -3,7 +3,9 @@ from geojson_pydantic import FeatureCollection from geojson_pydantic import Point -from utilities import seconds_to_iso_8601_duration, convert_cm_to_m +from utilities import seconds_to_iso_8601_duration +from utilities import convert_cm_to_m + def _make_properties(ts): diff --git a/api/main.py b/api/main.py index 49328c61..25490db4 100644 --- a/api/main.py +++ b/api/main.py @@ -14,6 +14,8 @@ from routers import feature 
from utilities import create_url_from_request +from export_metrics import add_metrics + def setup_logging(): logger = logging.getLogger() @@ -30,6 +32,7 @@ def setup_logging(): app = FastAPI(swagger_ui_parameters={"tryItOutEnabled": True}) app.add_middleware(BrotliMiddleware) +add_metrics(app) @app.get( diff --git a/api/requirements.in b/api/requirements.in index a17a8bdd..c882648c 100644 --- a/api/requirements.in +++ b/api/requirements.in @@ -15,3 +15,4 @@ geojson-pydantic~=1.0 aiocached~=0.3.0 jinja2~=3.1 isodate~=0.6.1 +prometheus-fastapi-instrumentator~=7.0.0 diff --git a/api/requirements.txt b/api/requirements.txt index 2137bfca..e6b38f35 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile with Python 3.11 # by the following command: # -# pip-compile --no-emit-index-url +# pip-compile --no-emit-index-url --output-file=requirements.txt # aiocached==0.3 # via -r requirements.in @@ -24,11 +24,11 @@ edr-pydantic==0.4.0 # via -r requirements.in fastapi==0.110.3 # via -r requirements.in -geojson-pydantic==1.1.0 +geojson-pydantic==1.1.1 # via -r requirements.in -grpcio==1.66.0 +grpcio==1.66.1 # via grpcio-tools -grpcio-tools==1.66.0 +grpcio-tools==1.66.1 # via -r requirements.in gunicorn==22.0.0 # via -r requirements.in @@ -36,7 +36,7 @@ h11==0.14.0 # via uvicorn httptools==0.6.1 # via uvicorn -idna==3.7 +idna==3.10 # via anyio isodate==0.6.1 # via -r requirements.in @@ -44,19 +44,23 @@ jinja2==3.1.4 # via -r requirements.in markupsafe==2.1.5 # via jinja2 -numpy==2.1.0 +numpy==2.1.1 # via shapely packaging==24.1 # via gunicorn -protobuf==5.27.3 +prometheus-client==0.20.0 + # via prometheus-fastapi-instrumentator +prometheus-fastapi-instrumentator==7.0.0 + # via -r requirements.in +protobuf==5.28.1 # via grpcio-tools -pydantic==2.8.2 +pydantic==2.9.1 # via # covjson-pydantic # edr-pydantic # fastapi # geojson-pydantic -pydantic-core==2.20.1 +pydantic-core==2.23.3 # via pydantic python-dotenv==1.0.1 # 
via uvicorn @@ -72,6 +76,7 @@ starlette==0.37.2 # via # brotli-asgi # fastapi + # prometheus-fastapi-instrumentator typing-extensions==4.12.2 # via # fastapi @@ -81,9 +86,9 @@ uvicorn[standard]==0.29.0 # via -r requirements.in uvloop==0.20.0 # via uvicorn -watchfiles==0.23.0 +watchfiles==0.24.0 # via uvicorn -websockets==13.0 +websockets==13.0.1 # via uvicorn # The following packages are considered to be unsafe in a requirements file: diff --git a/api/templates/dataset_metadata_template.j2 b/api/templates/dataset_metadata_template.j2 index 85f12bab..e2cd47c9 100644 --- a/api/templates/dataset_metadata_template.j2 +++ b/api/templates/dataset_metadata_template.j2 @@ -1,5 +1,5 @@ { - "id": "what is E-SOH data set ID?", + "id": "urn:wmo:md:eu-eumetnet:weather.observations.swob-realtime", "conformsTo": ["http://wis.wmo.int/spec/wcmp/2/conf/core"], "type": "Feature", "geometry": { @@ -11,26 +11,18 @@ "resolution": "PT10M" }, "properties": { - "title": "Additional and sub-hourly observations from European area, including synoptic observations distributed over GTS", - "description": "collection of observations from weather stations situated in European countries. Data from synoptic stations and additional stations from 3rd party station holders is distributed with best possible time resolution. All parameters resolved in the CF-standards could be distributed. The number of parameters differs pr station. Observations from the last 24hours is available for download from EDR-api. Timestamp for each observations is based on observation time, or the end of a aggregation period. Depending om the method used to produce the parameter. All timestamps is given as UTC. Timeliness of the data depends on when it is distributed from the NMS. Timeliness within the system is 1 minute after reception time at the E-soh ingestor. Observations in E-soh is initial received at one of the NWS'es. The NMS is responsible for checking the quality of the observations. 
The NMS is also responsible for collection and distribution of metadata compliant to E-soh and WIS-2.0. E-soh also include all global observations distributed over the Global Transition System (GTS) that is considered as open and free for use. E-soh aims to serve all data according to FAIR principles. For questions or technical documentation, you can go to https://?????????", + "title": "Land surface weather observations", + "description": "Land surface observations measured at automatic and manual stations of EUMETNET Members (last 24 hours)", "themes": [ { - "concepts": [{"id": "meteorology"}], - "scheme": "http://wis.wmo.int/2012/codelists/WMOCodeLists#WMO_CategoryCode" + "concepts": [{"id": "weather"}], + "scheme": "https://codes.wmo.int/wis/topic-hierarchy/earth-system-discipline" }, { "concepts": [ - {"id": "Surface observations"} + {"id": "surface-based-observations"} ], - "scheme": "https://github.com/wmo-im/topic-hierarchy/earth-system-discipline/weather/surface-based-observations/index.csv" - }, - { - "concepts": [{"id": "weather"}], - "scheme": "https://github.com/wmo-im/wis2-topic-hierarchy/blob/main/topic-hierarchy/earth-system-discipline/index.csv" - }, - { - "concepts": [{"id": "continual"}], - "scheme": "https://standards.iso.org/iso/19139/resources/gmxCodelists.xml#MD_FrequencyCode" + "scheme": "https://codes.wmo.int/wis/topic-hierarchy/earth-system-discipline/_weather" } ] }, @@ -59,10 +51,16 @@ "title": "E-SOH API documentation", "type": "application/json" }, + { + "rel": "about", + "href": "https://www.eumetnet.eu/wp-content/uploads/2018/03/List-of-current-Members-as-pdf.pdf", + "title": "EUMETNET Members", + "type": "text/html" + }, { "rel": "license", - "href": "need to agree on a license", - "title": "need to agree on license", + "href": "https://creativecommons.org/licenses/by/4.0/", + "title": "Creative Commons BY 4.0 licence", "type": "text/html" } ] diff --git a/api/unit.Dockerfile b/api/unit.Dockerfile index f80573b8..29426e68 100644 --- 
a/api/unit.Dockerfile +++ b/api/unit.Dockerfile @@ -29,13 +29,17 @@ RUN python -m grpc_tools.protoc \ COPY "." "${DOCKER_PATH}/" +# Create folder for metrics +ENV PROMETHEUS_MULTIPROC_DIR=/tmp/metrics +RUN mkdir -p /tmp/metrics + WORKDIR "${DOCKER_PATH}" CMD ["/bin/sh", "-c", "{ python -m pytest \ - --timeout=60 \ - --junitxml=./output/pytest.xml \ - --cov-report=term-missing \ - --cov=. \ - --cov-config=./test/.coveragerc 2>&1; \ - echo $? > ./output/exit-code; } | \ - tee ./output/pytest-coverage.txt; \ - exit $(cat ./output/exit-code)"] + --timeout=60 \ + --junitxml=./output/pytest.xml \ + --cov-report=term-missing \ + --cov=. \ + --cov-config=./test/.coveragerc 2>&1; \ + echo $? > ./output/exit-code; } | \ + tee ./output/pytest-coverage.txt; \ + exit $(cat ./output/exit-code)"] diff --git a/datastore/migrate/data/migrations/1726758806_obstime_index.down.sql b/datastore/migrate/data/migrations/1726758806_obstime_index.down.sql new file mode 100644 index 00000000..476e91e3 --- /dev/null +++ b/datastore/migrate/data/migrations/1726758806_obstime_index.down.sql @@ -0,0 +1 @@ +DROP INDEX CONCURRENTLY observation_obstime_instant; diff --git a/datastore/migrate/data/migrations/1726758806_obstime_index.up.sql b/datastore/migrate/data/migrations/1726758806_obstime_index.up.sql new file mode 100644 index 00000000..d3b7425f --- /dev/null +++ b/datastore/migrate/data/migrations/1726758806_obstime_index.up.sql @@ -0,0 +1 @@ +CREATE INDEX CONCURRENTLY observation_obstime_instant ON observation (obstime_instant); diff --git a/docker-compose.yml b/docker-compose.yml index 52231289..3e970870 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,6 +15,7 @@ services: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=mysecretpassword - POSTGRES_DB=data + shm_size: 312m restart: on-failure healthcheck: # HACK Due to the installation of Postgis extension the database is restarted, the healthcheck checks if the database is up for longer than specified time.
diff --git a/ingest/Dockerfile b/ingest/Dockerfile index a4519d72..470d24f9 100644 --- a/ingest/Dockerfile +++ b/ingest/Dockerfile @@ -40,5 +40,9 @@ RUN python "api/generate_standard_name.py" # hadolint ignore=DL3013 RUN pip install --no-cache-dir --upgrade pip \ - && pip install --no-cache-dir --upgrade "${DOCKER_PATH}/" + && pip install --no-cache-dir --upgrade "${DOCKER_PATH}/" \ + && mkdir -p /tmp/metrics + +ENV PROMETHEUS_MULTIPROC_DIR=/tmp/metrics + CMD ["gunicorn", "api.main:app", "--worker-class=uvicorn.workers.UvicornWorker"] diff --git a/ingest/README.md b/ingest/README.md index 0d0cd2b3..f1542a9c 100644 --- a/ingest/README.md +++ b/ingest/README.md @@ -1,13 +1,18 @@ # e-soh-event-queue ## Enviornment variables -| Variable | Type | Default | Explenation | -| ---------| ---- | ------- | ----------- | -| MQTT_HOST | String | None | Set host for MQTT broker | -| MQTT_USERNAME | String | None | Set username for MQTT | -| MQTT_PASSWORD | String | None | Set password for MQTT | -| MQTT_TLS | Bool | False | Enable TLS for MQTT connection | -| MQTT_PORT | Int | 8883 | Set port for MQTT broker | -| FASTAPI_ROOT_PATH | String | "" | If this api is behind proxy, this need to be set to the root path | + +| Variable | Default Value | Description | +|----------------------------|----------------------------|-----------------------------------------------------------------------------| +| `DSHOST` | `store` | Host address for the data store. Defaults to `store` if not set. | +| `DSPORT` | `50050` | Port for the data store connection. Defaults to `50050` if not set. | +| `MQTT_HOST` | | Host address for the MQTT broker. | +| `MQTT_USERNAME` | | Username for authentication with the MQTT broker. | +| `MQTT_PASSWORD` | | Password for authentication with the MQTT broker. | +| `MQTT_TLS` | `True` | Whether to use TLS (True/False) for the MQTT connection. Defaults to `True`.| +| `PROMETHEUS_MULTIPROC_DIR` | `/tmp/metrics` | Directory for Prometheus multiprocess mode metrics. 
Defaults to `/tmp/metrics`. | +| `INGEST_LOGLEVEL` | | Logging level for the ingestion process. | +| `GUNICORN_CMD_ARGS` | | Command-line arguments for configuring Gunicorn, a Python WSGI HTTP Server. | +| `FASTAPI_ROOT_PATH` | | If this api is behind proxy, this need to be set to the root path | ## Dev install diff --git a/ingest/api/api_metrics.py b/ingest/api/api_metrics.py new file mode 100644 index 00000000..ed98971d --- /dev/null +++ b/ingest/api/api_metrics.py @@ -0,0 +1,12 @@ +from fastapi import FastAPI +from prometheus_client import CollectorRegistry +from prometheus_client import multiprocess +from prometheus_fastapi_instrumentator import Instrumentator + + +def add_metrics(app: FastAPI): + registry = CollectorRegistry() + multiprocess.MultiProcessCollector(registry) + + instrumentator = Instrumentator() + instrumentator.instrument(app).expose(app, include_in_schema=False) diff --git a/ingest/api/datastore.py b/ingest/api/datastore.py index c6c90ba0..442f046e 100644 --- a/ingest/api/datastore.py +++ b/ingest/api/datastore.py @@ -35,7 +35,6 @@ def build_grpc_messages(msg: str) -> None: function = ts_metadata.function standard_name = ts_metadata.standard_name parameter_name = hashlib.md5(":".join([standard_name, level, function, period]).encode()).hexdigest() - setattr(ts_metadata, "parameter_name", parameter_name) observation_data = dstore.ObsMetadata() diff --git a/ingest/api/ingest.py b/ingest/api/ingest.py index ed4202c8..564a6b71 100644 --- a/ingest/api/ingest.py +++ b/ingest/api/ingest.py @@ -1,7 +1,9 @@ import logging from typing import Union +import isodate import grpc +import json from fastapi import HTTPException @@ -12,7 +14,6 @@ import datastore_pb2 as dstore - logger = logging.getLogger(__name__) @@ -36,6 +37,25 @@ def __init__( logger.error("Failed to establish connection to mqtt, " + "\n" + str(e)) raise e + def seconds_to_iso_8601_duration(self, seconds: int) -> str: + duration = isodate.Duration(seconds=seconds) + iso_duration = 
isodate.duration_isoformat(duration) + + # TODO: find a better way to format these + # Use PT24H instead of P1D + if iso_duration == "P1D": + iso_duration = "PT24H" + + # iso_duration defaults to P0D when seconds is 0 + if iso_duration == "P0D": + iso_duration = "PT0S" + + return iso_duration + + def convert_to_meter(self, level: int) -> str: + + return str(float(level) / 100) + async def ingest(self, message: Union[str, object]): """ This method will interpret call all methods for deciding input type, build the mqtt messages, and @@ -68,8 +88,14 @@ async def publish_messages(self, messages: list): + "/" + msg["properties"]["content"]["standard_name"] ) + + # modify the period back to iso format and level back to meter + period_iso = self.seconds_to_iso_8601_duration(msg["properties"]["period"]) + level_string = self.convert_to_meter(msg["properties"]["level"]) + msg["properties"]["level"] = level_string + msg["properties"]["period"] = period_iso try: - send_message(topic, msg, self.client) + send_message(topic, json.dumps(msg), self.client) logger.info("Succesfully published to mqtt") except Exception as e: logger.error("Failed to publish to mqtt, " + "\n" + str(e)) diff --git a/ingest/api/main.py b/ingest/api/main.py index d9bd3fda..435c4ccf 100644 --- a/ingest/api/main.py +++ b/ingest/api/main.py @@ -9,6 +9,7 @@ from api.ingest import IngestToPipeline from api.model import JsonMessageSchema from api.messages import build_json_payload +from api.api_metrics import add_metrics log_level = os.environ.get("INGEST_LOGLEVEL", "INFO") @@ -39,6 +40,7 @@ class Response(BaseModel): ingester = IngestToPipeline(mqtt_conf=mqtt_configuration, uuid_prefix="uuid") app = FastAPI(root_path=os.getenv("FASTAPI_ROOT_PATH", "")) +add_metrics(app) @app.post("/bufr") diff --git a/ingest/api/messages.py b/ingest/api/messages.py index 10e73ac5..8dbf497d 100644 --- a/ingest/api/messages.py +++ b/ingest/api/messages.py @@ -26,10 +26,11 @@ def build_messages(message: object, uuid_prefix:
str): # Set message publication time in RFC3339 format # Create UUID for the message, and state message format version for json_msg in message: + period = json_msg["properties"]["period_int"] message_uuid = f"{uuid_prefix}:{str(uuid.uuid4())}" json_msg["id"] = message_uuid json_msg["properties"]["data_id"] = message_uuid - + json_msg["properties"]["period"] = period # MD5 hash of a join on naming_authority, platform, standard_name, level,function and period. timeseries_id_string = ( json_msg["properties"]["naming_authority"] diff --git a/ingest/api/model.py b/ingest/api/model.py index e841098b..8fc0b9c5 100644 --- a/ingest/api/model.py +++ b/ingest/api/model.py @@ -205,11 +205,12 @@ class Properties(BaseModel): platform: str = Field( ..., description=( - "Name of the platform(s) that supported the sensor data used to create this data set or product. " + "WIGOS-ID of the platform(s) that supported the sensor data used to create this data set or product. " "Platforms can be of any type, including satellite, ship, station, aircraft or other. " "Indicate controlled vocabulary used in platform_vocabulary." 
), ) + platform_name: Optional[str] = Field(None, description=("Human readable name for the platform.")) platform_vocabulary: Optional[str] = Field( None, description="Controlled vocabulary for the names used in the 'platform' attribute.", @@ -236,6 +237,7 @@ class Properties(BaseModel): "https://www.iso.org/iso-8601-date-and-time-format.html" ), ) + period_int: int = Field(None, exclude_from_schema=True) function: Literal[ "point", "sum", @@ -343,7 +345,7 @@ def transform_period_to_seconds(self): else: raise ValueError("Duration not convertable to seconds.") - self.period = int(total_seconds) + self.period_int = int(total_seconds) return self diff --git a/ingest/api/send_mqtt.py b/ingest/api/send_mqtt.py index c0a2b4dd..5baacee9 100644 --- a/ingest/api/send_mqtt.py +++ b/ingest/api/send_mqtt.py @@ -1,4 +1,5 @@ import logging +import json from paho.mqtt import client as mqtt_client from fastapi import HTTPException @@ -35,8 +36,13 @@ def send_message(topic: str, message: str, client: object): if len(topic) != 0: mqtt_topic = topic try: - client.publish(mqtt_topic, message) + if isinstance(message, dict): + client.publish(mqtt_topic, json.dumps(message)) + elif isinstance(message, (str, bytes)): + client.publish(mqtt_topic, message) + else: + raise TypeError("Mqtt message of unknown type") except Exception as e: - logger.critical(str(e)) + logger.critical("%s %s %s", str(e), message, type(message)) raise HTTPException(status_code=500, detail="Failed to publish to mqtt") diff --git a/ingest/requirements.in b/ingest/requirements.in index 49e0f2d9..5c8e40fa 100644 --- a/ingest/requirements.in +++ b/ingest/requirements.in @@ -15,3 +15,4 @@ uvicorn==0.25.0 python-multipart~=0.0.9 requests == 2.31.0 isodate~=0.6.1 +prometheus-fastapi-instrumentator diff --git a/ingest/requirements.txt b/ingest/requirements.txt index 87953fab..3b1ae959 100644 --- a/ingest/requirements.txt +++ b/ingest/requirements.txt @@ -32,7 +32,7 @@ gunicorn==22.0.0 # via -r requirements.in h11==0.14.0 # via
uvicorn -idna==3.8 +idna==3.10 # via # anyio # requests @@ -48,6 +48,10 @@ packaging==24.1 # via gunicorn paho-mqtt==2.1.0 # via -r requirements.in +prometheus-client==0.20.0 + # via prometheus-fastapi-instrumentator +prometheus-fastapi-instrumentator==7.0.0 + # via -r requirements.in protobuf==4.25.4 # via # -r requirements.in @@ -73,7 +77,9 @@ six==1.16.0 sniffio==1.3.1 # via anyio starlette==0.32.0.post1 - # via fastapi + # via + # fastapi + # prometheus-fastapi-instrumentator typing-extensions==4.12.2 # via # anyio @@ -81,7 +87,7 @@ typing-extensions==4.12.2 # pydantic # pydantic-core # uvicorn -urllib3==2.2.2 +urllib3==2.2.3 # via requests uvicorn==0.25.0 # via -r requirements.in