From 3def0164192111f7b78ff7c014e3c46cb597143e Mon Sep 17 00:00:00 2001 From: Victor Perron Date: Mon, 15 Jul 2024 17:42:03 +0200 Subject: [PATCH] __wip: Add a first "monitoring" tool We are trying to get better at counting, alerting and reporting the quality of our data. In the end, an alerting system that takes into account the past quality (freshness, number of lines, empty columns, geolocation scores, etc.) should be available for us to be warned when a serious data quality issue appears. On top of that, an always-available dashboard that immediately tells us about the current data quality would be a must. (data-inclusion) [victor@W]<~/dev/data-inclusion/pipeline> python dags/monitor.py ! ERROR query='SELECT COUNT(*) FROM public_intermediate.int_annuaire_du_service_public__structures;' failed, table does not exist ! ERROR query='SELECT COUNT(*) FROM fredo.structures;' failed, table does not exist ! ERROR query='SELECT COUNT(*) FROM public_intermediate.int_fredo__structures;' failed, table does not exist ! ERROR query='SELECT COUNT(*) FROM public_intermediate.int_un_jeune_une_solution__services;' failed, table does not exist ! 
ERROR query='SELECT COUNT(*) FROM public_intermediate.int_un_jeune_une_solution__structures;' failed, table does not exist
source stream raw staging intermediate marts
action_logement services 26 23 2760 0
action_logement structures 123 120 120 0
agefiph services 31 31 27 25
annuaire_du_service_public etablissements 68500 68500 0 0
cd35 organisations 3538 3538 3538 3546
cd72 services 474 0 0 0
cd72 structures 217 474 474 457
data_inclusion services 47 44 44 50
data_inclusion structures 22 19 19 20
dora services 17635 11619 11619 10042
dora structures 8507 8507 8507 7786
emplois_de_linclusion organisations 8575 8575 15811 15802
emplois_de_linclusion siaes 7236 7236 15811 15802
finess etablissements 95746 95746 15934 0
france_travail agences 888 888 888 888
france_travail services 28 25 22200 20424
fredo structures 0 0 0 0
mediation_numerique services 20845 19463 19463 18830
mediation_numerique structures 20845 19463 19463 18830
mes_aides aides 684 684 959 957
mes_aides garages 907 907 869 868
reseau_alpha formations 388 388 388 757
reseau_alpha structures 757 757 757 729
siao etablissements 19422 19422 19422 0
soliguide lieux 22223 22223 22028 21041
un_jeune_une_solution benefits 933 933 0 0
un_jeune_une_solution institutions 713 713 0 0
---
 pipeline/dags/monitor.py | 120 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 120 insertions(+)
 create mode 100644 pipeline/dags/monitor.py

diff --git a/pipeline/dags/monitor.py b/pipeline/dags/monitor.py
new file mode 100644
index 00000000..649a4fb4
--- /dev/null
+++ b/pipeline/dags/monitor.py
@@ -0,0 +1,120 @@
+"""Print per-source row counts at each stage of the data pipeline.
+
+For every stream declared in ``sources.SOURCES_CONFIGS``, count the rows of
+the raw, staging and intermediate tables, plus the matching rows in the marts,
+then print everything as a fixed-width table.
+"""
+
+import os
+from collections import defaultdict
+from contextlib import closing
+
+import psycopg2
+
+from dag_utils import sources
+
+# Raw stream name -> stream name used by the intermediate tables, per source.
+# Streams that are not listed keep their raw name.
+STREAM_NAME_MAP = {
+    "mes_aides": {
+        "garages": "structures",
+        "aides": "services",
+    },
+    "siao": {
+        "etablissements": "structures",
+    },
+    "finess": {
+        "etablissements": "structures",
+    },
+    "cd35": {
+        "organisations": "structures",
+    },
+    "emplois_de_linclusion": {
+        "siaes": "structures",
+        "organisations": "structures",
+    },
+    "un_jeune_une_solution": {
+        "benefits": "services",
+        "institutions": "structures",
+    },
+    "annuaire_du_service_public": {
+        "etablissements": "structures",
+    },
+    "soliguide": {
+        "lieux": "structures",
+    },
+    "reseau_alpha": {
+        "formations": "services",
+    },
+    "france_travail": {
+        "agences": "agences",
+    },
+}
+
+CONN_STRING = os.environ.get("AIRFLOW_CONN_PG")
+
+# One zero-initialised counter per (source, stream), sorted by name.
+COUNTS_BY_SOURCE = {}
+for raw_source, conf in sorted(sources.SOURCES_CONFIGS.items()):
+    source = raw_source.replace("-", "_")
+    COUNTS_BY_SOURCE[source] = {
+        stream: defaultdict(int) for stream in sorted(conf["streams"])
+    }
+
+if CONN_STRING is None:
+    raise RuntimeError("AIRFLOW_CONN_PG is not set, cannot reach the database")
+
+# psycopg2's own connection context manager only ends the transaction and
+# leaves the connection open, hence closing().
+with closing(psycopg2.connect(CONN_STRING)) as conn, conn.cursor() as cursor:
+
+    def safe_execute(query):
+        """Run a single-value query and return it, or 0 if the table is missing."""
+        try:
+            cursor.execute(query)
+        except psycopg2.errors.UndefinedTable:
+            print(f"! ERROR {query=} failed, table does not exist")
+            # The failed statement aborted the transaction: reset it so that
+            # the next queries can run.
+            conn.rollback()
+            return 0
+        return cursor.fetchall()[0][0]
+
+    for source, streams in COUNTS_BY_SOURCE.items():
+        for stream, counts in streams.items():
+            counts["raw"] = safe_execute(
+                f"SELECT COUNT(*) FROM {source}.{stream};",
+            )
+            counts["staging"] = safe_execute(
+                f"SELECT COUNT(*) FROM public_staging.stg_{source}__{stream};",
+            )
+
+            int_stream_name = STREAM_NAME_MAP.get(source, {}).get(stream, stream)
+            counts["intermediate"] = safe_execute(
+                "SELECT COUNT(*) FROM "
+                f"public_intermediate.int_{source}__{int_stream_name};",
+            )
+
+            # FIXME(vperron): This 'agences' should be fixed, it's the only exception
+            stream_kind = (
+                "structure"
+                if int_stream_name in ("structures", "agences")
+                else "service"
+            )
+            src_canonical = source.replace("_", "-")
+            counts["marts"] = safe_execute(
+                "SELECT COUNT(*) FROM "
+                f"public.{stream_kind} WHERE source = '{src_canonical}';",
+            )
+
+print(
+    f"{'source':<40}{'stream':<20}{'raw':>10}{'staging':>10}{'intermediate':>15}{'marts':>10}"
+)
+for source, streams in COUNTS_BY_SOURCE.items():
+    for stream, counts in streams.items():
+        # Keys are single-quoted: reusing the f-string's own quote character
+        # inside a replacement field is a syntax error before Python 3.12.
+        print(
+            f"{source:<40}{stream:<20}{counts['raw']:>10}{counts['staging']:>10}"
+            f"{counts['intermediate']:>15}{counts['marts']:>10}"
+        )