Skip to content

Commit

Permalink
__wip: Add a first "monitoring" tool
Browse files Browse the repository at this point in the history
We are trying to get better at counting, alerting and reporting the
quality of our data. In the end, an alerting system that takes into
account the past quality (freshness, number of lines, empty columns,
geolocation scores, etc) should be available for us to be warned when a
serious data quality issue appears.

On top of that, an always-available dashboard that immediately tells
us about the current data quality would be a must.

(data-inclusion) [victor@W]<~/dev/data-inclusion/pipeline> python dags/monitor.py
! ERROR query='SELECT COUNT(*) FROM public_intermediate.int_annuaire_du_service_public__structures;' failed, table does not exist
! ERROR query='SELECT COUNT(*) FROM fredo.structures;' failed, table does not exist
! ERROR query='SELECT COUNT(*) FROM public_intermediate.int_fredo__structures;' failed, table does not exist
! ERROR query='SELECT COUNT(*) FROM public_intermediate.int_un_jeune_une_solution__services;' failed, table does not exist
! ERROR query='SELECT COUNT(*) FROM public_intermediate.int_un_jeune_une_solution__structures;' failed, table does not exist
source                                  stream                     raw   staging   intermediate     marts
action_logement                         services                    26        23           2760         0
action_logement                         structures                 123       120            120         0
agefiph                                 services                    31        31             27        25
annuaire_du_service_public              etablissements           68500     68500              0         0
cd35                                    organisations             3538      3538           3538      3546
cd72                                    services                   474         0              0         0
cd72                                    structures                 217       474            474       457
data_inclusion                          services                    47        44             44        50
data_inclusion                          structures                  22        19             19        20
dora                                    services                 17635     11619          11619     10042
dora                                    structures                8507      8507           8507      7786
emplois_de_linclusion                   organisations             8575      8575          15811     15802
emplois_de_linclusion                   siaes                     7236      7236          15811     15802
finess                                  etablissements           95746     95746          15934         0
france_travail                          agences                    888       888            888       888
france_travail                          services                    28        25          22200     20424
fredo                                   structures                   0         0              0         0
mediation_numerique                     services                 20845     19463          19463     18830
mediation_numerique                     structures               20845     19463          19463     18830
mes_aides                               aides                      684       684            959       957
mes_aides                               garages                    907       907            869       868
reseau_alpha                            formations                 388       388            388       757
reseau_alpha                            structures                 757       757            757       729
siao                                    etablissements           19422     19422          19422         0
soliguide                               lieux                    22223     22223          22028     21041
un_jeune_une_solution                   benefits                   933       933              0         0
un_jeune_une_solution                   institutions               713       713              0         0
  • Loading branch information
vperron committed Jul 15, 2024
1 parent 16da257 commit 3def016
Showing 1 changed file with 104 additions and 0 deletions.
104 changes: 104 additions & 0 deletions pipeline/dags/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import os
from collections import defaultdict

import psycopg2

from dag_utils import sources

# Per source, maps a raw/staging stream name to the stream name used when
# querying the intermediate layer (``public_intermediate.int_<source>__<name>``).
# Streams that are not listed here keep their own name at that layer.
STREAM_NAME_MAP = {
    "mes_aides": {"garages": "structures", "aides": "services"},
    "siao": {"etablissements": "structures"},
    "finess": {"etablissements": "structures"},
    "cd35": {"organisations": "structures"},
    "emplois_de_linclusion": {
        "siaes": "structures",
        "organisations": "structures",
    },
    "un_jeune_une_solution": {
        "benefits": "services",
        "institutions": "structures",
    },
    "annuaire_du_service_public": {"etablissements": "structures"},
    "soliguide": {"lieux": "structures"},
    "reseau_alpha": {"formations": "services"},
    "france_travail": {"agences": "agences"},
}

# Database connection string, read from the environment (None when the
# AIRFLOW_CONN_PG variable is not set).
CONN_STRING = os.environ.get("AIRFLOW_CONN_PG")

# COUNTS_BY_SOURCE[source][stream] holds the per-layer counters of one stream;
# a layer that was never filled in reads as 0. Sources and streams are
# inserted in sorted order, and dashes in source names become underscores.
COUNTS_BY_SOURCE = {
    raw_source.replace("-", "_"): {
        stream: defaultdict(int) for stream in sorted(conf["streams"])
    }
    for raw_source, conf in sorted(sources.SOURCES_CONFIGS.items())
}

# Fill COUNTS_BY_SOURCE with the row count of every source/stream at each
# layer of the pipeline: raw, staging, intermediate and marts.
#
# psycopg2's ``with connection`` block only commits or rolls back the current
# transaction; it does not close the connection. The connection is therefore
# closed explicitly in the ``finally`` clause, and the cursor by its own
# context manager.
conn = psycopg2.connect(CONN_STRING)
try:
    with conn, conn.cursor() as cursor:

        def safe_execute(query):
            """Run a query returning a single value and return that value.

            When the queried table does not exist, an error line is printed,
            the aborted transaction is rolled back so that later queries can
            run, and 0 is returned.
            """
            try:
                cursor.execute(query)
                return cursor.fetchall()[0][0]
            except psycopg2.errors.UndefinedTable:
                print(f"! ERROR {query=} failed, table does not exist")
                # The failed statement left the transaction in an aborted
                # state; roll it back through the connection API.
                conn.rollback()
                return 0

        for source, streams in COUNTS_BY_SOURCE.items():
            # The marts are filtered on the dash-separated source name.
            src_canonical = source.replace("_", "-")

            for stream, counts in streams.items():
                counts["raw"] = safe_execute(
                    f"SELECT COUNT(*) FROM {source}.{stream};",
                )
                counts["staging"] = safe_execute(
                    f"SELECT COUNT(*) FROM public_staging.stg_{source}__{stream};",
                )

                # Streams without an explicit mapping keep their own name.
                int_stream_name = STREAM_NAME_MAP.get(source, {}).get(stream, stream)
                counts["intermediate"] = safe_execute(
                    "SELECT COUNT(*) FROM "
                    f"public_intermediate.int_{source}__{int_stream_name};",
                )

                # FIXME(vperron): This 'agences' should be fixed, it's the only
                # exception
                stream_kind = (
                    "structure"
                    if int_stream_name in ("structures", "agences")
                    else "service"
                )
                counts["marts"] = safe_execute(
                    "SELECT COUNT(*) FROM "
                    f"public.{stream_kind} WHERE source = '{src_canonical}';",
                )
finally:
    conn.close()

# Print a fixed-width report: one header line, then one line per
# (source, stream) pair with the count recorded for each pipeline layer.
print(
    f"{'source':<40}{'stream':<20}{'raw':>10}{'staging':>10}"
    f"{'intermediate':>15}{'marts':>10}"
)
for source, streams in COUNTS_BY_SOURCE.items():
    for stream, counts in streams.items():
        # Single quotes inside the replacement fields: reusing the enclosing
        # double quotes is only valid syntax from Python 3.12 onwards.
        print(
            f"{source:<40}{stream:<20}{counts['raw']:>10}{counts['staging']:>10}"
            f"{counts['intermediate']:>15}{counts['marts']:>10}"
        )

0 comments on commit 3def016

Please sign in to comment.