Skip to content

Commit

Permalink
feat(pipeline): add scoring model
Browse files Browse the repository at this point in the history
This implementation uses a plpython udf that calls
the scoring api of the data inclusion schema.
  • Loading branch information
vmttn committed Sep 27, 2024
1 parent 988b07e commit c54975c
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 1 deletion.
1 change: 1 addition & 0 deletions pipeline/dags/dag_utils/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def get_intermediate_tasks():
# main since the beginning as it required intermediate data to be
# present ?
"path:models/intermediate/int__geocodages.sql",
"path:models/intermediate/int__criteres_qualite.sql",
"path:models/intermediate/int__union_contacts.sql",
"path:models/intermediate/int__union_adresses.sql",
"path:models/intermediate/int__union_services.sql",
Expand Down
1 change: 1 addition & 0 deletions pipeline/dbt/macros/create_udfs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Another way would be to use the `on-run-start` hook, but it does not play nicely
CREATE SCHEMA IF NOT EXISTS processings;

{{ udf__geocode() }}
{{ udf__score() }}

{{ create_udf_soliguide__new_hours_to_osm_opening_hours() }}
{{ create_udf__common_checks() }}
Expand Down
40 changes: 40 additions & 0 deletions pipeline/dbt/macros/udfs/udf__score.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{% macro udf__score() %}

-- Drop first: CREATE OR REPLACE cannot change a function's return type,
-- so the function must be removed before being (re)created with a
-- potentially different TABLE(...) signature.
DROP FUNCTION IF EXISTS processings.score;

-- Scores a data.inclusion service (passed as JSONB) against the quality
-- criteria defined by the data-inclusion-schema package.
-- Returns one row per criterion: the overall service score (score_ligne,
-- repeated on every row), the criterion name and the criterion score.
CREATE FUNCTION processings.score(data JSONB)
RETURNS
TABLE(
    score_ligne FLOAT,
    nom_critere TEXT,
    score_critere FLOAT
)
AS $$

import json

import pydantic

from data_inclusion.schema import Service, score_qualite


# NOTE: plpython3u hands JSONB over as a string, hence json.loads.
# TODO(vmttn): run score *after* pydantic validation then remove this try/except
try:
    service = Service(**json.loads(data))
except pydantic.ValidationError:
    # Services failing schema validation cannot be scored:
    # return no rows rather than aborting the whole query.
    return []

score, details = score_qualite.score(service)

return [
    {
        "score_ligne": score,
        "nom_critere": nom_critere,
        "score_critere": score_critere,
    }
    for nom_critere, score_critere in details.items()
]

$$ LANGUAGE plpython3u;

{% endmacro %}
27 changes: 27 additions & 0 deletions pipeline/dbt/models/intermediate/_models.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,33 @@ models:
- locality
- municipality

- name: int__criteres_qualite
description: |
Quality criteria scorings for services from all sources.
Each row holds a single criterion score for a service.
For a given service, there are as many rows as there are defined criteria.
A null score means the criterion is not applicable to the service.
Scoring is done by the data-inclusion-schema scoring API in PL/Python.
columns:
- name: service_id
data_tests:
- not_null
- relationships:
to: ref('int__union_services')
field: _di_surrogate_id
- name: nom_critere
description: Name of the criterion.
data_tests:
- not_null
- dbt_utils.not_empty_string
- name: score_critere
description: |
Score for the given criterion and the given service, between 0 and 1.
- name: score_ligne
data_tests:
- dbt_utils.not_constant


unit_tests:
- name: test_geocodages_full_refresh_mode
model: int__geocodages
Expand Down
16 changes: 16 additions & 0 deletions pipeline/dbt/models/intermediate/int__criteres_qualite.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- One row per (service, quality criterion).
-- Scoring is delegated to the data-inclusion-schema scoring API, exposed
-- as the set-returning PL/Python UDF `processings.score`. A NULL
-- score_critere means the criterion does not apply to the service.
WITH services AS (
    SELECT * FROM {{ ref('int__union_services__enhanced') }}
),

final AS (
    SELECT
        services._di_surrogate_id AS "service_id",
        scores.score_critere      AS "score_critere",
        scores.nom_critere        AS "nom_critere",
        scores.score_ligne        AS "score_ligne"
    FROM services
    -- explicit CROSS JOIN LATERAL instead of an implicit comma join;
    -- the UDF yields one row per criterion for each input service row
    CROSS JOIN LATERAL processings.score(TO_JSONB(services)) AS scores
)

SELECT * FROM final
6 changes: 6 additions & 0 deletions pipeline/dbt/models/marts/inclusion/_inclusion_models.yml
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ models:
data_type: text
- name: zone_diffusion_nom
data_type: text
- name: score_qualite
data_type: float
constraints:
- type: not_null
- type: check
expression: 'score_qualite BETWEEN 0 AND 1'

- name: marts_inclusion__services_frais
config:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ WITH services AS (
SELECT * FROM {{ ref('int__union_services__enhanced') }}
),

criteres AS (
SELECT * FROM {{ ref('int__criteres_qualite') }}
),

scores AS (
SELECT DISTINCT ON (1)
criteres.service_id AS "service_id",
criteres.score_ligne AS "score"
FROM criteres
),

final AS (
SELECT
{{
Expand All @@ -15,8 +26,15 @@ final AS (
'adresse_id',
]
)
}}
}},
scores.score AS "score_qualite"
FROM services
LEFT JOIN scores ON services._di_surrogate_id = scores.service_id
-- TODO(vmttn): services that pass SQL validation, but fail pydantic validation
-- don't have a score... scoring must be done on pydantic validated data
-- this filter is a temporary workaround until validation is done consistently
-- with pydantic
WHERE scores.score IS NOT NULL
)

SELECT * FROM final

0 comments on commit c54975c

Please sign in to comment.