Skip to content

Commit

Permalink
add dag
Browse files Browse the repository at this point in the history
  • Loading branch information
Tburm committed Aug 23, 2024
1 parent 6d4eb5e commit dce4d90
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 2 deletions.
87 changes: 87 additions & 0 deletions scheduler/dags/gov_etl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import os
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.operators.latest_only import LatestOnlyOperator
from datetime import datetime, timedelta
from docker.types import Mount

# environment variables
WORKING_DIR = os.getenv("WORKING_DIR")
NETWORK_RPCS = {
"snax_testnet": "NETWORK_13001_RPC",
}

default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2024, 8, 1),
"retries": 3,
"retry_delay": timedelta(minutes=1),
"catchup": False,
}


def create_docker_operator(dag, task_id, config_file, image, command, network_env_var):
return DockerOperator(
task_id=task_id,
command=f"python main.py {config_file}" if command is None else command,
image=image,
api_version="auto",
auto_remove=True,
docker_url="unix://var/run/docker.sock",
network_mode="data_data",
mounts=[
Mount(
source=f"{WORKING_DIR}/parquet-data",
target="/parquet-data",
type="bind",
)
],
environment={
"WORKING_DIR": WORKING_DIR,
"PG_PASSWORD": os.getenv("PG_PASSWORD"),
network_env_var: os.getenv(network_env_var),
},
dag=dag,
)


def create_dag(network, rpc_var, target="dev"):
version = f"{network}_{target}"

dag = DAG(
f"v3_etl_{version}",
default_args=default_args,
description=f"ETL pipeline for {version}",
schedule_interval="0 16 * * *",
)

latest_only_task = LatestOnlyOperator(task_id=f"latest_only_{version}", dag=dag)

transform_task_id = f"transform_{version}"
transform_task = create_docker_operator(
dag=dag,
task_id=transform_task_id,
config_file=None,
image="data-transformer",
command=f"dbt run --target prod --select tag:{network} --profiles-dir profiles --profile synthetix",
network_env_var=rpc_var,
)

test_task_id = f"test_{version}"
test_task = create_docker_operator(
dag=dag,
task_id=test_task_id,
config_file=None,
image="data-transformer",
command=f"dbt test --target prod --select tag:{network} --profiles-dir profiles --profile synthetix",
network_env_var=rpc_var,
)

latest_only_task >> transform_task >> test_task
return dag


for network, rpc_var in NETWORK_RPCS.items():
for target in ["dev", "prod"]:
globals()[f"gov_etl_{network}_{target}"] = create_dag(network, rpc_var, target)
4 changes: 2 additions & 2 deletions transformers/synthetix/models/raw/snax/testnet/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ models:
- dbt_utils.recency:
datepart: hour
field: block_timestamp
interval: 24
interval: 1
columns:
- name: id
description: Unique identifier for the event
Expand Down Expand Up @@ -85,7 +85,7 @@ models:
- dbt_utils.recency:
datepart: hour
field: block_timestamp
interval: 24 # Checking for events in the last 24 hours
interval: 1
columns:
- name: id
description: Unique identifier for the vote withdrawal event
Expand Down

0 comments on commit dce4d90

Please sign in to comment.