From c6c993790af8fc623a6736a615e658d99c003e35 Mon Sep 17 00:00:00 2001
From: Valentin Matton
Date: Mon, 19 Aug 2024 08:50:11 +0200
Subject: [PATCH] chore(pipeline): use pydantic validation

Validation currently differs between the pipeline and the api: the api
relies on the pydantic-based schema validation, while the pipeline has
its own SQL-based validation. This can lead to inconsistencies between
the two and duplicates code.

This commit leverages plpython to reuse the schema validation in the
pipeline. Validation is done at the source level.

It also materializes validation errors in a dedicated model, rather
than surfacing them through dbt data tests, which are less convenient
for downstream products to consume.
---
 datawarehouse/processings/pyproject.toml      |  1 +
 .../processings/scripts/create_udfs.sh        | 36 ++++++++++++++++
 pipeline/dbt/models/intermediate/_models.yml  | 28 +++++++++++++
 .../dbt/models/intermediate/int__erreurs.sql  | 41 +++++++++++++++++++
 .../sources/dora/_dora__models.yml            |  8 ----
 5 files changed, 106 insertions(+), 8 deletions(-)
 create mode 100644 pipeline/dbt/models/intermediate/int__erreurs.sql

diff --git a/datawarehouse/processings/pyproject.toml b/datawarehouse/processings/pyproject.toml
index abc61c85..3bbb22f9 100644
--- a/datawarehouse/processings/pyproject.toml
+++ b/datawarehouse/processings/pyproject.toml
@@ -6,6 +6,7 @@ requires = ["setuptools", "wheel"]
 name = "data-inclusion-processings"
 version = "0.1.0"
 dependencies = [
+    "data-inclusion-schema==0.16.0",
     "numpy~=2.0",
     "pandas~=2.2",
     "requests~=2.31",
diff --git a/datawarehouse/processings/scripts/create_udfs.sh b/datawarehouse/processings/scripts/create_udfs.sh
index 6cb9ff78..a29fc0ef 100755
--- a/datawarehouse/processings/scripts/create_udfs.sh
+++ b/datawarehouse/processings/scripts/create_udfs.sh
@@ -42,3 +42,39 @@ return (
 
 $$ LANGUAGE plpython3u;
 EOSQL
+
+
+psql --dbname="$POSTGRES_DB" <<- 'EOSQL'
+DROP FUNCTION IF EXISTS validate;
+DROP TYPE IF EXISTS pydantic_error;
+DROP TYPE IF EXISTS resource_type;
+
+CREATE TYPE resource_type AS ENUM ('structure', 'service');
+
+CREATE TYPE pydantic_error AS (
+    type TEXT,
+    loc TEXT[],
+    msg TEXT,
+    input TEXT
+);
+
+CREATE OR REPLACE FUNCTION validate(resource_type resource_type, data JSONB)
+RETURNS SETOF pydantic_error AS $$
+
+import json
+
+import pydantic
+
+from data_inclusion import schema
+
+model = schema.Structure if resource_type == "structure" else schema.Service
+
+try:
+    model.model_validate_json(data)
+except pydantic.ValidationError as e:
+    return e.errors()
+
+return []
+
+$$ LANGUAGE plpython3u;
+EOSQL
diff --git a/pipeline/dbt/models/intermediate/_models.yml b/pipeline/dbt/models/intermediate/_models.yml
index 7a645981..2197c19d 100644
--- a/pipeline/dbt/models/intermediate/_models.yml
+++ b/pipeline/dbt/models/intermediate/_models.yml
@@ -118,3 +118,31 @@ models:
             - street
             - locality
             - municipality
+
+  - name: int__erreurs
+    data_tests:
+      - dbt_utils.expression_is_true:
+          expression: "service_id IS NOT NULL OR structure_id IS NOT NULL"
+    columns:
+      - name: service_id
+        data_tests:
+          - relationships:
+              to: ref('int__union_services')
+              field: _di_surrogate_id
+      - name: structure_id
+        data_tests:
+          - relationships:
+              to: ref('int__union_structures')
+              field: _di_surrogate_id
+      - name: type
+        data_tests:
+          - not_null
+          - dbt_utils.not_empty_string
+      - name: loc
+        data_tests:
+          - not_null
+      - name: msg
+        data_tests:
+          - not_null
+          - dbt_utils.not_empty_string
+      - name: input
diff --git a/pipeline/dbt/models/intermediate/int__erreurs.sql b/pipeline/dbt/models/intermediate/int__erreurs.sql
new file mode 100644
index 00000000..453c9fec
--- /dev/null
+++ b/pipeline/dbt/models/intermediate/int__erreurs.sql
@@ -0,0 +1,41 @@
+WITH services AS (
+    SELECT * FROM {{ ref('int__union_services') }}
+),
+
+structures AS (
+    SELECT * FROM {{ ref('int__union_structures') }}
+),
+
+services_errors AS (
+    SELECT
+        services._di_surrogate_id AS "service_id",
+        NULL AS "structure_id",
+        errors.type AS "type",
+        errors.loc AS "loc",
+        errors.msg AS "msg",
+        errors.input AS "input"
+    FROM
+        services,
+        LATERAL (SELECT * FROM VALIDATE('service', TO_JSONB(services))) AS errors
+),
+
+structures_errors AS (
+    SELECT
+        NULL AS "service_id",
+        structures._di_surrogate_id AS "structure_id",
+        errors.type AS "type",
+        errors.loc AS "loc",
+        errors.msg AS "msg",
+        errors.input AS "input"
+    FROM
+        structures,
+        LATERAL (SELECT * FROM VALIDATE('structure', TO_JSONB(structures))) AS errors
+),
+
+final AS (
+    SELECT * FROM services_errors
+    UNION ALL
+    SELECT * FROM structures_errors
+)
+
+SELECT * FROM final
diff --git a/pipeline/dbt/models/intermediate/sources/dora/_dora__models.yml b/pipeline/dbt/models/intermediate/sources/dora/_dora__models.yml
index df17232e..79126411 100644
--- a/pipeline/dbt/models/intermediate/sources/dora/_dora__models.yml
+++ b/pipeline/dbt/models/intermediate/sources/dora/_dora__models.yml
@@ -24,10 +24,6 @@ models:
           - dbt_utils.not_empty_string
 
   - name: int_dora__services
-    data_tests:
-      - check_service:
-          config:
-            severity: warn
     columns:
       - name: id
         data_tests:
@@ -48,10 +44,6 @@ models:
               field: id
 
   - name: int_dora__structures
-    data_tests:
-      - check_structure:
-          config:
-            severity: warn
     columns:
       - name: id
         data_tests:
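
A usage sketch, not part of the diff above: once create_udfs.sh has run,
the new function can be smoke-tested directly from psql. The payload below
is a made-up, deliberately incomplete document; each returned row is one
entry of pydantic's ValidationError.errors() mapped onto the pydantic_error
composite type, and an empty result set means the document is valid:

    SELECT type, loc, msg, input
    FROM validate('structure', '{"id": "foo"}'::JSONB);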
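
And a sketch of how a downstream consumer might read the materialized
errors, assuming dbt builds the model as a relation named int__erreurs
(the relation name and schema are assumptions here), e.g. to rank the
most frequent error types per resource kind:

    SELECT
        CASE WHEN service_id IS NOT NULL THEN 'service' ELSE 'structure' END AS resource_type,
        type,
        COUNT(*) AS nb_errors
    FROM int__erreurs
    GROUP BY 1, 2
    ORDER BY nb_errors DESC;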