diff --git a/pipeline/dags/dag_utils/dbt.py b/pipeline/dags/dag_utils/dbt.py index 17453f04..5a0f8656 100644 --- a/pipeline/dags/dag_utils/dbt.py +++ b/pipeline/dags/dag_utils/dbt.py @@ -109,6 +109,7 @@ def get_intermediate_tasks(): # main since the beginning as it required intermediate data to be # present ? "path:models/intermediate/int__geocodages.sql", + "path:models/intermediate/int__criteres_qualite.sql", "path:models/intermediate/int__union_contacts.sql", "path:models/intermediate/int__union_adresses.sql", "path:models/intermediate/int__union_services.sql", diff --git a/pipeline/dbt/macros/create_udfs.sql b/pipeline/dbt/macros/create_udfs.sql index 4276c0f9..7fb0f041 100644 --- a/pipeline/dbt/macros/create_udfs.sql +++ b/pipeline/dbt/macros/create_udfs.sql @@ -12,6 +12,7 @@ Another way would be to use the `on-run-start` hook, but it does not play nicely CREATE SCHEMA IF NOT EXISTS processings; {{ udf__geocode() }} +{{ udf__score() }} {{ create_udf_soliguide__new_hours_to_osm_opening_hours() }} {{ create_udf__common_checks() }} diff --git a/pipeline/dbt/macros/udfs/udf__score.sql b/pipeline/dbt/macros/udfs/udf__score.sql new file mode 100644 index 00000000..6dde4097 --- /dev/null +++ b/pipeline/dbt/macros/udfs/udf__score.sql @@ -0,0 +1,40 @@ +{% macro udf__score() %} + +DROP FUNCTION IF EXISTS processings.score; + +CREATE OR REPLACE FUNCTION processings.score(data JSONB) +RETURNS + TABLE( + score_ligne FLOAT, + nom_critere TEXT, + score_critere FLOAT + ) +AS $$ + +import json + +import pydantic + +from data_inclusion.schema import Service, score_qualite + + +# TODO(vmttn): run score *after* pydantic validation then remove this try/except +try: + service = Service(**json.loads(data)) +except pydantic.ValidationError as exc: + return [] + +score, details = score_qualite.score(service) + +return [ + { + "score_ligne": score, + "nom_critere": nom_critere, + "score_critere": score_critere, + } + for nom_critere, score_critere in details.items() +] + +$$ LANGUAGE 
plpython3u; + +{% endmacro %} \ No newline at end of file diff --git a/pipeline/dbt/models/intermediate/_models.yml b/pipeline/dbt/models/intermediate/_models.yml index 58723ce5..a741fc6a 100644 --- a/pipeline/dbt/models/intermediate/_models.yml +++ b/pipeline/dbt/models/intermediate/_models.yml @@ -129,6 +129,33 @@ models: - locality - municipality + + - name: int__criteres_qualite + description: | + Quality criteria scorings for services from all sources. + Each row holds a single criterion score for a service. + For a given service, there are as many rows as defined criteria. + A null score means the criterion is not applicable to the service. + Scoring is done by data-inclusion-schema scoring API in PL/Python. + columns: + - name: service_id + data_tests: + - not_null + - relationships: + to: ref('int__union_services') + field: _di_surrogate_id + - name: nom_critere + description: Name of the criterion. + data_tests: + - not_null + - dbt_utils.not_empty_string + - name: score_critere + description: | + Score for the given criterion and the given service, between 0 and 1. 
+ - name: score_ligne + data_tests: + - dbt_utils.not_constant + + unit_tests: - name: test_geocodages_full_refresh_mode model: int__geocodages diff --git a/pipeline/dbt/models/intermediate/int__criteres_qualite.sql b/pipeline/dbt/models/intermediate/int__criteres_qualite.sql new file mode 100644 index 00000000..03b88ded --- /dev/null +++ b/pipeline/dbt/models/intermediate/int__criteres_qualite.sql @@ -0,0 +1,16 @@ +WITH services AS ( + SELECT * FROM {{ ref('int__union_services__enhanced') }} +), + +final AS ( + SELECT + services._di_surrogate_id AS "service_id", + scores.score_critere AS "score_critere", + scores.nom_critere AS "nom_critere", + scores.score_ligne AS "score_ligne" + FROM + services, + LATERAL (SELECT * FROM processings.score(TO_JSONB(services))) AS scores +) + +SELECT * FROM final diff --git a/pipeline/dbt/models/marts/inclusion/_inclusion_models.yml b/pipeline/dbt/models/marts/inclusion/_inclusion_models.yml index dd1be2e0..dd3ed895 100644 --- a/pipeline/dbt/models/marts/inclusion/_inclusion_models.yml +++ b/pipeline/dbt/models/marts/inclusion/_inclusion_models.yml @@ -229,6 +229,12 @@ models: data_type: text - name: zone_diffusion_nom data_type: text + - name: score_qualite + data_type: float + constraints: + - type: not_null + - type: check + expression: 'score_qualite BETWEEN 0 AND 1' - name: marts_inclusion__services_frais config: diff --git a/pipeline/dbt/models/marts/inclusion/marts_inclusion__services.sql b/pipeline/dbt/models/marts/inclusion/marts_inclusion__services.sql index d20e37db..1bc2f080 100644 --- a/pipeline/dbt/models/marts/inclusion/marts_inclusion__services.sql +++ b/pipeline/dbt/models/marts/inclusion/marts_inclusion__services.sql @@ -4,6 +4,17 @@ WITH services AS ( SELECT * FROM {{ ref('int__union_services__enhanced') }} ), +criteres AS ( + SELECT * FROM {{ ref('int__criteres_qualite') }} +), + +scores AS ( + SELECT DISTINCT ON (1) + criteres.service_id AS "service_id", + criteres.score_ligne AS "score" + FROM criteres +), + 
final AS ( SELECT {{ @@ -15,8 +26,15 @@ final AS ( 'adresse_id', ] ) - }} + }}, + scores.score AS "score_qualite" FROM services + LEFT JOIN scores ON services._di_surrogate_id = scores.service_id + -- TODO(vmttn): services that pass SQL validation, but fail pydantic validation + -- don't have a score... scoring must be done on pydantic validated data + -- this filter is a temporary workaround until validation is done consistently + -- with pydantic + WHERE scores.score IS NOT NULL ) SELECT * FROM final