Skip to content

Commit

Permalink
chore(pipeline): use pydantic validation
Browse files Browse the repository at this point in the history
Currently, validation differs between the pipeline and the API.
The API relies on schema validation using pydantic, whereas the
pipeline has its own SQL-based validation. This duplicates code and
can lead to inconsistencies between the pipeline and the API.

This commit leverages plpython to reuse the schema validation
in the pipeline.

Validation is done at the source level. This commit also materializes
validation errors in a dedicated model, rather than using dbt data
tests, which are less convenient for products to use.
  • Loading branch information
vmttn committed Aug 25, 2024
1 parent f7961f4 commit c6c9937
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 8 deletions.
1 change: 1 addition & 0 deletions datawarehouse/processings/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ requires = ["setuptools", "wheel"]
name = "data-inclusion-processings"
version = "0.1.0"
dependencies = [
"data-inclusion-schema==0.16.0",
"numpy~=2.0",
"pandas~=2.2",
"requests~=2.31",
Expand Down
36 changes: 36 additions & 0 deletions datawarehouse/processings/scripts/create_udfs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,39 @@ return (
$$ LANGUAGE plpython3u;
EOSQL


# Install the `validate` UDF, which runs the pydantic models from the
# `data_inclusion.schema` package inside PostgreSQL (plpython3u).
# The heredoc delimiter is quoted, so the shell expands nothing in the SQL.
psql --dbname="$POSTGRES_DB" <<- 'EOSQL'
-- Drop the function before the types that appear in its signature.
DROP FUNCTION IF EXISTS validate;
DROP TYPE IF EXISTS pydantic_error;
DROP TYPE IF EXISTS resource_type;

-- Kind of resource to validate; selects the pydantic model below.
CREATE TYPE resource_type AS ENUM ('structure', 'service');

-- One validation error. Field names match the keys read from each item
-- of pydantic's `ValidationError.errors()`.
CREATE TYPE pydantic_error AS (
    type TEXT,
    loc TEXT[],
    msg TEXT,
    input TEXT
);

-- validate(resource_type, data)
--   resource_type: 'structure' or 'service'.
--   data:          the resource as JSONB.
--                  NOTE(review): presumably received by PL/Python as JSON
--                  text (no jsonb transform in use) -- confirm.
--   Returns one pydantic_error row per validation error, and no rows when
--   the data is valid.
CREATE OR REPLACE FUNCTION validate(resource_type resource_type, data JSONB)
RETURNS SETOF pydantic_error AS $$
import pydantic
from data_inclusion import schema

# Anything other than "structure" is validated as a service.
model = schema.Structure if resource_type == "structure" else schema.Service

try:
    model.model_validate_json(data)
except pydantic.ValidationError as e:
    # Each error dict is mapped onto a pydantic_error row.
    return e.errors()

return []
$$ LANGUAGE plpython3u;
EOSQL
28 changes: 28 additions & 0 deletions pipeline/dbt/models/intermediate/_models.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,31 @@ models:
- street
- locality
- municipality

# Validation errors model (built by int__erreurs.sql).
- name: int__erreurs
  data_tests:
    # Every error row must reference a service or a structure.
    - dbt_utils.expression_is_true:
        expression: "service_id IS NOT NULL OR structure_id IS NOT NULL"
  columns:
    # References to the validated resource, checked against the union models.
    - name: service_id
      data_tests:
        - relationships:
            to: ref('int__union_services')
            field: _di_surrogate_id
    - name: structure_id
      data_tests:
        - relationships:
            to: ref('int__union_structures')
            field: _di_surrogate_id
    # Error details.
    - name: type
      data_tests:
        - not_null
        - dbt_utils.not_empty_string
    - name: loc
      data_tests:
        - not_null
    - name: msg
      data_tests:
        - not_null
        - dbt_utils.not_empty_string
    # No tests are declared on `input`.
    - name: input
41 changes: 41 additions & 0 deletions pipeline/dbt/models/intermediate/int__erreurs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- Validation errors for all services and structures, as returned by the
-- VALIDATE() UDF. Each row fills only one of service_id / structure_id;
-- the other is NULL.
WITH services AS (
    SELECT * FROM {{ ref('int__union_services') }}
),

structures AS (
    SELECT * FROM {{ ref('int__union_structures') }}
),

-- One row per error returned for a service. The whole row is serialised
-- to JSONB and handed to the validator.
services_errors AS (
    SELECT
        services._di_surrogate_id AS "service_id",
        -- Explicit type so the UNION ALL below does not depend on how an
        -- untyped NULL is resolved.
        -- NOTE(review): assumes _di_surrogate_id is TEXT -- confirm.
        CAST(NULL AS TEXT) AS "structure_id",
        errors.type AS "type",
        errors.loc AS "loc",
        errors.msg AS "msg",
        errors.input AS "input"
    FROM services
    CROSS JOIN LATERAL VALIDATE('service', TO_JSONB(services)) AS errors
),

-- One row per error returned for a structure.
structures_errors AS (
    SELECT
        CAST(NULL AS TEXT) AS "service_id",
        structures._di_surrogate_id AS "structure_id",
        errors.type AS "type",
        errors.loc AS "loc",
        errors.msg AS "msg",
        errors.input AS "input"
    FROM structures
    CROSS JOIN LATERAL VALIDATE('structure', TO_JSONB(structures)) AS errors
),

-- Columns are listed explicitly so the two branches are matched by name
-- in the text rather than by position through `*`.
final AS (
    SELECT
        "service_id",
        "structure_id",
        "type",
        "loc",
        "msg",
        "input"
    FROM services_errors
    UNION ALL
    SELECT
        "service_id",
        "structure_id",
        "type",
        "loc",
        "msg",
        "input"
    FROM structures_errors
)

SELECT * FROM final
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ models:
- dbt_utils.not_empty_string

- name: int_dora__services
data_tests:
- check_service:
config:
severity: warn
columns:
- name: id
data_tests:
Expand All @@ -48,10 +44,6 @@ models:
field: id

- name: int_dora__structures
data_tests:
- check_structure:
config:
severity: warn
columns:
- name: id
data_tests:
Expand Down

0 comments on commit c6c9937

Please sign in to comment.