Skip to content

Commit

Permalink
Expose more metrics in Scarf
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana committed Dec 19, 2024
1 parent bc5f630 commit d6ab6b7
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 23 deletions.
2 changes: 1 addition & 1 deletion cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,6 @@ def _missing_value_(cls, value): # type: ignore

DBT_COMPILE_TASK_ID = "dbt_compile"

TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{cosmos_task_count}"
TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{is_cosmos_dag}/{cosmos_task_groups_count}/{task_count}/{cosmos_task_count}"
TELEMETRY_VERSION = "v1"
TELEMETRY_TIMEOUT = 5.0
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

import functools
import time
from contextlib import contextmanager
from typing import Generator

from airflow.listeners import hookimpl
from airflow.models.dag import DAG
Expand All @@ -9,6 +12,17 @@
from cosmos import telemetry
from cosmos.airflow.dag import DbtDag
from cosmos.airflow.task_group import DbtTaskGroup
from cosmos.log import get_logger

logger = get_logger(__name__)


@contextmanager
def measure_time() -> Generator[None, None, None]:
start = time.perf_counter()
yield
end = time.perf_counter()
logger.info(f"DAG listener metrics collection time: {end - start:.6f} seconds")


class EventStatus:
Expand Down Expand Up @@ -54,16 +68,19 @@ def uses_cosmos(dag: DAG) -> bool:
@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str) -> None:
dag = dag_run.get_dag()

if not uses_cosmos(dag):
return
additional_telemetry_metrics = {
"dag_hash": dag_run.dag_hash,
"status": EventStatus.SUCCESS,
"task_count": len(dag.task_ids),
"cosmos_task_count": total_cosmos_tasks(dag),
"cosmos_task_groups_count": total_cosmos_task_groups(dag),
"is_cosmos_dag": is_cosmos_dag(dag),
}

with measure_time():
additional_telemetry_metrics = {
"dag_hash": dag_run.dag_hash,
"status": EventStatus.SUCCESS,
"task_count": len(dag.task_ids),
"cosmos_task_count": total_cosmos_tasks(dag),
"cosmos_task_groups_count": total_cosmos_task_groups(dag),
"is_cosmos_dag": is_cosmos_dag(dag),
}

telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics)

Expand All @@ -73,13 +90,15 @@ def on_dag_run_failed(dag_run: DagRun, msg: str) -> None:
dag = dag_run.get_dag()
if not uses_cosmos(dag):
return
additional_telemetry_metrics = {
"dag_hash": dag_run.dag_hash,
"status": EventStatus.FAILED,
"task_count": len(dag.task_ids),
"cosmos_task_count": total_cosmos_tasks(dag),
"cosmos_task_groups_count": total_cosmos_task_groups(dag),
"is_cosmos_dag": is_cosmos_dag(dag),
}

with measure_time():
additional_telemetry_metrics = {
"dag_hash": dag_run.dag_hash,
"status": EventStatus.FAILED,
"task_count": len(dag.task_ids),
"cosmos_task_count": total_cosmos_tasks(dag),
"cosmos_task_groups_count": total_cosmos_task_groups(dag),
"is_cosmos_dag": is_cosmos_dag(dag),
}

telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics)
4 changes: 2 additions & 2 deletions cosmos/plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from flask import abort, url_for
from flask_appbuilder import AppBuilder, expose

from cosmos.listeners import dag_run
from cosmos.listeners import dag_run_listener
from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir, dbt_docs_index_file_name, in_astro_cloud

if in_astro_cloud:
Expand Down Expand Up @@ -270,4 +270,4 @@ class CosmosPlugin(AirflowPlugin):
"href": conf.get("webserver", "base_url") + "/cosmos/dbt_docs",
}
appbuilder_views = [item]
listeners = [dag_run]
listeners = [dag_run_listener]
14 changes: 10 additions & 4 deletions tests/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,19 @@ def test_emit_usage_metrics_fails(mock_httpx_get, caplog):
"event_type": "dag_run",
"status": "success",
"dag_hash": "d151d1fa2f03270ea116cc7494f2c591",
"is_cosmos_dag": True,
"cosmos_task_groups_count": 0,
"task_count": 3,
"cosmos_task_count": 3,
}
is_success = telemetry.emit_usage_metrics(sample_metrics)
mock_httpx_get.assert_called_once_with(
f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3""",
f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/True/0/3/3""",
timeout=5.0,
follow_redirects=True,
)
assert not is_success
log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3. Status code: 404. Message: Non existent URL"""
log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/True/0/3/3. Status code: 404. Message: Non existent URL"""
assert caplog.text.startswith("WARNING")
assert log_msg in caplog.text

Expand All @@ -80,8 +83,11 @@ def test_emit_usage_metrics_succeeds(caplog):
"platform_machine": "amd64",
"event_type": "dag_run",
"status": "success",
"dag_hash": "d151d1fa2f03270ea116cc7494f2c591",
"cosmos_task_count": 3,
"dag_hash": "dag-hash-ci",
"is_cosmos_dag": False,
"cosmos_task_groups_count": 1,
"task_count": 33,
"cosmos_task_count": 33,
}
is_success = telemetry.emit_usage_metrics(sample_metrics)
assert is_success
Expand Down

0 comments on commit d6ab6b7

Please sign in to comment.