From 9c3364649b6a1af51e94e51f17e2a6effb88d26a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Victor=20H=2E=20Garc=C3=ADa?= Date: Tue, 10 Sep 2024 11:42:51 -0600 Subject: [PATCH 1/2] Sigfox IET10 Trackers Webhook v1.0 --- README.md | 407 +-------------------------------- app/webhooks/configurations.py | 16 ++ app/webhooks/handlers.py | 75 ++++++ requirements.txt | 143 ++++++++++++ 4 files changed, 236 insertions(+), 405 deletions(-) create mode 100644 requirements.txt diff --git a/README.md b/README.md index 555d834..f61ddbb 100644 --- a/README.md +++ b/README.md @@ -1,405 +1,2 @@ -# gundi-integration-action-runner -Template repo for integration in Gundi v2. - -## Usage -- Fork this repo -- Implement your own actions in `actions/handlers.py` -- Define configurations needed for your actions in `action/configurations.py` -- Or implement a webhooks handler in `webhooks/handlers.py` -- and define configurations needed for your webhooks in `webhooks/configurations.py` -- Optionally, add the `@activity_logger()` decorator in actions to log common events which you can later see in the portal: - - Action execution started - - Action execution complete - - Error occurred during action execution -- Optionally, add the `@webhook_activity_logger()` decorator in the webhook handler to log common events which you can later see in the portal: - - Webhook execution started - - Webhook execution complete - - Error occurred during webhook execution -- Optionally, use `log_action_activity()` or `log_webhook_activity()` to log custom messages which you can later see in the portal - - -## Action Examples: - -```python -# actions/configurations.py -from .core import PullActionConfiguration - - -class PullObservationsConfiguration(PullActionConfiguration): - lookback_days: int = 10 - - -``` - -```python -# actions/handlers.py -from app.services.activity_logger import activity_logger, log_activity -from app.services.gundi import send_observations_to_gundi -from gundi_core.events import LogLevel -from .configurations import PullObservationsConfiguration - - -@activity_logger() -async def action_pull_observations(integration, action_config: PullObservationsConfiguration): - - # Add your business logic to extract data here... - - # Optionally, log a custom messages to be shown in the portal - await log_activity( - integration_id=integration.id, - action_id="pull_observations", - level=LogLevel.INFO, - title="Extracting observations with filter..", - data={"start_date": "2024-01-01", "end_date": "2024-01-31"}, - config_data=action_config.dict() - ) - - # Normalize the extracted data into a list of observations following to the Gundi schema: - observations = [ - { - "source": "collar-xy123", - "type": "tracking-device", - "subject_type": "puma", - "recorded_at": "2024-01-24 09:03:00-0300", - "location": { - "lat": -51.748, - "lon": -72.720 - }, - "additional": { - "speed_kmph": 10 - } - } - ] - - # Send the extracted data to Gundi - await send_observations_to_gundi(observations=observations, integration_id=integration.id) - - # The result will be recorded in the portal if using the activity_logger decorator - return {"observations_extracted": 10} -``` - - -## Webhooks Usage: -This framework provides a way to handle incoming webhooks from external services. You can define a handler function in `webhooks/handlers.py` and define the expected payload schema and configurations in `webhooks/configurations.py`. Several base classes are provided in `webhooks/core.py` to help you define the expected schema and configurations. - - -### Fixed Payload Schema -If you expect to receive data with a fixed schema, you can define a Pydantic model for the payload and configurations. These models will be used for validating and parsing the incoming data. -```python -# webhooks/configurations.py -import pydantic -from .core import WebhookPayload, WebhookConfiguration - - -class MyWebhookPayload(WebhookPayload): - device_id: str - timestamp: str - lat: float - lon: float - speed_kmph: float - - -class MyWebhookConfig(WebhookConfiguration): - custom_setting: str - another_custom_setting: bool - -``` -### Webhook Handler -Your webhook handler function must be named webhook_handler and it must accept the payload and config as arguments. The payload will be validated and parsed using the annotated Pydantic model. The config will be validated and parsed using the annotated Pydantic model. You can then implement your business logic to extract the data and send it to Gundi. -```python -# webhooks/handlers.py -from app.services.activity_logger import webhook_activity_logger -from app.services.gundi import send_observations_to_gundi -from .configurations import MyWebhookPayload, MyWebhookConfig - - -@webhook_activity_logger() -async def webhook_handler(payload: MyWebhookPayload, integration=None, webhook_config: MyWebhookConfig = None): - # Implement your custom logic to process the payload here... - - # If the request is related to an integration, you can use the integration object to access the integration's data - - # Normalize the extracted data into a list of observations following to the Gundi schema: - transformed_data = [ - { - "source": payload.device_id, - "type": "tracking-device", - "recorded_at": payload.timestamp, - "location": { - "lat": payload.lat, - "lon": payload.lon - }, - "additional": { - "speed_kmph": payload.speed_kmph - } - } - ] - await send_observations_to_gundi( - observations=transformed_data, - integration_id=integration.id - ) - - return {"observations_extracted": 1} -``` - -### Dynamic Payload Schema -If you expect to receive data with different schemas, you can define a schema per integration using JSON schema. To do that, annotate the payload arg with the `GenericJsonPayload` model, and annotate the webhook_config arg with the `DynamicSchemaConfig` model or a subclass. Then you can define the schema in the Gundi portal, and the framework will build the Pydantic model on runtime based on that schema, to validate and parse the incoming data. -```python -# webhooks/configurations.py -import pydantic -from .core import DynamicSchemaConfig - - -class MyWebhookConfig(DynamicSchemaConfig): - custom_setting: str - another_custom_setting: bool - -``` -```python -# webhooks/handlers.py -from app.services.activity_logger import webhook_activity_logger -from .core import GenericJsonPayload -from .configurations import MyWebhookConfig - - -@webhook_activity_logger() -async def webhook_handler(payload: GenericJsonPayload, integration=None, webhook_config: MyWebhookConfig = None): - # Implement your custom logic to process the payload here... - return {"observations_extracted": 1} -``` - - -### Simple JSON Transformations -For simple JSON to JSON transformations, you can use the [JQ language](https://jqlang.github.io/jq/manual/#basic-filters) to transform the incoming data. To do that, annotate the webhook_config arg with the `GenericJsonTransformConfig` model or a subclass. Then you can specify the `jq_filter` and the `output_type` (`ev` for event or `obv` for observation) in Gundi. -```python -# webhooks/configurations.py -import pydantic -from .core import WebhookPayload, GenericJsonTransformConfig - - -class MyWebhookPayload(WebhookPayload): - device_id: str - timestamp: str - lat: float - lon: float - speed_kmph: float - - -class MyWebhookConfig(GenericJsonTransformConfig): - custom_setting: str - another_custom_setting: bool - - -``` -```python -# webhooks/handlers.py -import json -import pyjq -from app.services.activity_logger import webhook_activity_logger -from app.services.gundi import send_observations_to_gundi -from .configurations import MyWebhookPayload, MyWebhookConfig - - -@webhook_activity_logger() -async def webhook_handler(payload: MyWebhookPayload, integration=None, webhook_config: MyWebhookConfig = None): - # Sample implementation using the JQ language to transform the incoming data - input_data = json.loads(payload.json()) - transformation_rules = webhook_config.jq_filter - transformed_data = pyjq.all(transformation_rules, input_data) - print(f"Transformed Data:\n: {transformed_data}") - # webhook_config.output_type == "obv": - response = await send_observations_to_gundi( - observations=transformed_data, - integration_id=integration.id - ) - data_points_qty = len(transformed_data) if isinstance(transformed_data, list) else 1 - print(f"{data_points_qty} data point(s) sent to Gundi.") - return {"data_points_qty": data_points_qty} -``` - - -### Dynamic Payload Schema with JSON Transformations -You can combine the dynamic schema and JSON transformations by annotating the payload arg with the `GenericJsonPayload` model, and annotating the webhook_config arg with the `GenericJsonTransformConfig` models or their subclasses. Then you can define the schema and the JQ filter in the Gundi portal, and the framework will build the Pydantic model on runtime based on that schema, to validate and parse the incoming data, and apply a [JQ filter](https://jqlang.github.io/jq/manual/#basic-filters) to transform the data. -```python -# webhooks/handlers.py -import json -import pyjq -from app.services.activity_logger import webhook_activity_logger -from app.services.gundi import send_observations_to_gundi -from .core import GenericJsonPayload, GenericJsonTransformConfig - - -@webhook_activity_logger() -async def webhook_handler(payload: GenericJsonPayload, integration=None, webhook_config: GenericJsonTransformConfig = None): - # Sample implementation using the JQ language to transform the incoming data - input_data = json.loads(payload.json()) - filter_expression = webhook_config.jq_filter.replace("\n", ""). replace(" ", "") - transformed_data = pyjq.all(filter_expression, input_data) - print(f"Transformed Data:\n: {transformed_data}") - # webhook_config.output_type == "obv": - response = await send_observations_to_gundi( - observations=transformed_data, - integration_id=integration.id - ) - data_points_qty = len(transformed_data) if isinstance(transformed_data, list) else 1 - print(f"{data_points_qty} data point(s) sent to Gundi.") - return {"data_points_qty": data_points_qty} -``` - - -### Hex string payloads -If you expect to receive payloads containing binary data encoded as hex strings (e.g. ), you can use StructHexString, HexStringPayload and HexStringConfig which facilitate validation and parsing of hex strings. The user will define the name of the field containing the hex string and will define the structure of the data in the hex string, using Gundi. -The fields are defined in the hex_format attribute of the configuration, following the [struct module format string syntax](https://docs.python.org/3/library/struct.html#format-strings). The fields will be extracted from the hex string and made available as sub-fields in the data field of the payload. THey will be extracted in the order they are defined in the hex_format attribute. -```python -# webhooks/configurations.py -from app.services.utils import StructHexString -from .core import HexStringConfig, WebhookConfiguration - - -# Expected data: {"device": "BF170A","data": "6881631900003c20020000c3", "time": "1638201313", "type": "bove"} -class MyWebhookPayload(HexStringPayload, WebhookPayload): - device: str - time: str - type: str - data: StructHexString - - -class MyWebhookConfig(HexStringConfig, WebhookConfiguration): - custom_setting: str - another_custom_setting: bool - -""" -Sample configuration in Gundi: -{ - "hex_data_field": "data", - "hex_format": { - "byte_order": ">", - "fields": [ - { - "name": "start_bit", - "format": "B", - "output_type": "int" - }, - { - "name": "v", - "format": "I" - }, - { - "name": "interval", - "format": "H", - "output_type": "int" - }, - { - "name": "meter_state_1", - "format": "B" - }, - { - "name": "meter_state_2", - "format": "B", - "bit_fields": [ - { - "name": "meter_batter_alarm", - "end_bit": 0, - "start_bit": 0, - "output_type": "bool" - }, - { - "name": "empty_pipe_alarm", - "end_bit": 1, - "start_bit": 1, - "output_type": "bool" - }, - { - "name": "reverse_flow_alarm", - "end_bit": 2, - "start_bit": 2, - "output_type": "bool" - }, - { - "name": "over_range_alarm", - "end_bit": 3, - "start_bit": 3, - "output_type": "bool" - }, - { - "name": "temp_alarm", - "end_bit": 4, - "start_bit": 4, - "output_type": "bool" - }, - { - "name": "ee_error", - "end_bit": 5, - "start_bit": 5, - "output_type": "bool" - }, - { - "name": "transduce_in_error", - "end_bit": 6, - "start_bit": 6, - "output_type": "bool" - }, - { - "name": "transduce_out_error", - "end_bit": 7, - "start_bit": 7, - "output_type": "bool" - }, - { - "name": "transduce_out_error", - "end_bit": 7, - "start_bit": 7, - "output_type": "bool" - } - ] - }, - { - "name": "r1", - "format": "B", - "output_type": "int" - }, - { - "name": "r2", - "format": "B", - "output_type": "int" - }, - { - "name": "crc", - "format": "B" - } - ] - } -} -""" -# The data extracted from the hex string will be made available as new sub-fields as follows: -""" -{ - "device": "AB1234", - "time": "1638201313", - "type": "bove", - "data": { - "value": "6881631900003c20020000c3", - "format_spec": ">BIHBBBBB", - "unpacked_data": { - "start_bit": 104, - "v": 1663873, - "interval": 15360, - "meter_state_1": 32, - "meter_state_2": 2, - "r1": 0, - "r2": 0, - "crc": 195, - "meter_batter_alarm": True, - "empty_pipe_alarm": True, - "reverse_flow_alarm": False, - "over_range_alarm": False, - "temp_alarm": False, - "ee_error": False, - "transduce_in_error": False, - "transduce_out_error": False - } - } -} -""" -``` -Notice: This can also be combined with Dynamic Schema and JSON Transformations. In that case the hex string will be parsed first, adn then the JQ filter can be applied to the extracted data. +# gundi-integration-sigfox-iet10 +Template repo for Sigfox IET10 integration in Gundi v2. diff --git a/app/webhooks/configurations.py b/app/webhooks/configurations.py index e69de29..059ba9e 100644 --- a/app/webhooks/configurations.py +++ b/app/webhooks/configurations.py @@ -0,0 +1,16 @@ +from datetime import datetime + +from .core import GenericJsonTransformConfig, GenericJsonPayload + + +class BinaryWebhookPayload(GenericJsonPayload): + device: str + time: datetime + data: str + seqNumber: int + ack: bool + + + +class BinaryWebhookConfig(GenericJsonTransformConfig): + pass diff --git a/app/webhooks/handlers.py b/app/webhooks/handlers.py index e69de29..ffc5616 100644 --- a/app/webhooks/handlers.py +++ b/app/webhooks/handlers.py @@ -0,0 +1,75 @@ +import json +import pyjq +import logging +from app.services.gundi import send_observations_to_gundi, send_events_to_gundi +from app.services.activity_logger import webhook_activity_logger +from .configurations import BinaryWebhookPayload, BinaryWebhookConfig + + +logger = logging.getLogger(__name__) + + +# @webhook_activity_logger() +async def webhook_handler(payload: BinaryWebhookPayload, integration=None, webhook_config: BinaryWebhookConfig = None): + logger.info(f"Webhook handler executed with integration: '{integration}'.") + logger.info(f"Payload: '{payload}'.") + logger.info(f"Config: '{webhook_config}'.") + if isinstance(payload, list): + input_data = [json.loads(i.json()) for i in payload] + else: + input_data = json.loads(payload.json()) + + filter_expression = webhook_config.jq_filter.replace("\n", "") + transformed_data = pyjq.all(filter_expression, input_data) + logger.info(f"Transformed Data: {transformed_data}") + # Check if a filter is present in the filtered data + for data in transformed_data: + status = data.get("status", "OK") + if status != "OK": + logger.info(f"'{data}' point received was filtered") + transformed_data.remove(data) + if transformed_data: + # Make binary operations (Sigfox Trackers for now) + # TODO: make this more generic + for data in transformed_data: + # Latitude operations + latitude_sign = data["location"].get("latitude_sign") + if latitude_sign == "8": # Negative indicator (Provided by sigfox) + latitude_sign = -1 + else: + latitude_sign = 1 + latitude_to_decimal = (round(((int(data["location"].get("latitude"), 16)) / 1000000), 3)) * latitude_sign + + # Longitude operations + longitude_sign = data["location"].get("longitude_sign") + if longitude_sign == "8": # Negative indicator (Provided by sigfox) + longitude_sign = -1 + else: + longitude_sign = 1 + longitude_to_decimal = (round(((int(data["location"].get("longitude"), 16)) / 1000000), 3)) * longitude_sign + + # Replace location data + data["location"] = dict(lat=latitude_to_decimal, lon=longitude_to_decimal) + + # Calculate battery voltage in 'additional' + data["additional"]["battery"] = int(data["additional"].get("battery"), 16) / 10 + + + if webhook_config.output_type == "obv": # ToDo: Use an enum? + response = await send_observations_to_gundi( + observations=transformed_data, + integration_id=integration.id + ) + elif webhook_config.output_type == "ev": + response = await send_events_to_gundi( + events=transformed_data, + integration_id=integration.id + ) + else: + raise ValueError(f"Invalid output type: {webhook_config.output_type}. Please review the configuration.") + data_points_qty = len(response) + logger.info(f"'{data_points_qty}' data point(s) sent to Gundi.") + return {"data_points_qty": data_points_qty} + else: + logger.info(f"No data point(s) sent to Gundi.") + return {"data_points_qty": 0} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..1d38f8b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,143 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile --output-file=requirements.txt requirements-base.in requirements-dev.in requirements.in +# +aiohappyeyeballs==2.4.0 + # via aiohttp +aiohttp==3.10.5 + # via gcloud-aio-auth +aiosignal==1.3.1 + # via aiohttp +anyio==3.7.1 + # via + # fastapi + # httpcore + # starlette +async-timeout==4.0.3 + # via + # aiohttp + # redis +attrs==24.2.0 + # via aiohttp +backoff==2.2.1 + # via gcloud-aio-auth +certifi==2024.8.30 + # via + # httpcore + # httpx +cffi==1.17.1 + # via cryptography +chardet==5.2.0 + # via gcloud-aio-auth +click==8.1.7 + # via + # -r requirements-base.in + # uvicorn +cryptography==43.0.1 + # via gcloud-aio-auth +environs==9.5.0 + # via + # -r requirements-base.in + # gundi-client-v2 +exceptiongroup==1.2.2 + # via + # anyio + # pytest +fastapi==0.103.2 + # via -r requirements-base.in +frozenlist==1.4.1 + # via + # aiohttp + # aiosignal +gcloud-aio-auth==5.3.2 + # via gcloud-aio-pubsub +gcloud-aio-pubsub==6.0.1 + # via -r requirements-base.in +gundi-client-v2==2.3.5 + # via -r requirements-base.in +gundi-core==1.5.9 + # via + # -r requirements-base.in + # gundi-client-v2 +h11==0.14.0 + # via + # httpcore + # uvicorn +httpcore==0.17.3 + # via httpx +httpx==0.24.1 + # via + # gundi-client-v2 + # respx +idna==3.8 + # via + # anyio + # httpx + # yarl +iniconfig==2.0.0 + # via pytest +marshmallow==3.22.0 + # via environs +multidict==6.0.5 + # via + # aiohttp + # yarl +packaging==24.1 + # via + # marshmallow + # pytest +pluggy==1.5.0 + # via pytest +prometheus-client==0.20.0 + # via gcloud-aio-pubsub +pycparser==2.22 + # via cffi +pydantic==1.10.18 + # via + # -r requirements-base.in + # fastapi + # gundi-client-v2 + # gundi-core +pyjq==2.6.0 + # via -r requirements-base.in +pyjwt==2.9.0 + # via gcloud-aio-auth +pytest==7.4.4 + # via + # -r requirements-dev.in + # pytest-asyncio + # pytest-mock +pytest-asyncio==0.21.2 + # via -r requirements-dev.in +pytest-mock==3.12.0 + # via -r requirements-dev.in +python-dotenv==1.0.1 + # via environs +redis==5.0.8 + # via -r requirements-base.in +respx==0.21.1 + # via gundi-client-v2 +sniffio==1.3.1 + # via + # anyio + # httpcore + # httpx +stamina==23.2.0 + # via -r requirements-base.in +starlette==0.27.0 + # via fastapi +tenacity==9.0.0 + # via stamina +tomli==2.0.1 + # via pytest +typing-extensions==4.12.2 + # via + # fastapi + # pydantic + # uvicorn +uvicorn==0.23.2 + # via -r requirements-base.in +yarl==1.11.1 + # via aiohttp From 92a31d1f72eb3a2a167d5df4abb27a60bb0df79d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Victor=20H=2E=20Garc=C3=ADa?= Date: Tue, 10 Sep 2024 11:53:42 -0600 Subject: [PATCH 2/2] Enable activity logs --- app/webhooks/handlers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/webhooks/handlers.py b/app/webhooks/handlers.py index ffc5616..4d5d40f 100644 --- a/app/webhooks/handlers.py +++ b/app/webhooks/handlers.py @@ -9,7 +9,7 @@ logger = logging.getLogger(__name__) -# @webhook_activity_logger() +@webhook_activity_logger() async def webhook_handler(payload: BinaryWebhookPayload, integration=None, webhook_config: BinaryWebhookConfig = None): logger.info(f"Webhook handler executed with integration: '{integration}'.") logger.info(f"Payload: '{payload}'.")