Skip to content

Commit

Permalink
Add ClickHouse ingestion dag
Browse files (browse the repository at this point in the history)
  • Loading branch information
marcus-snx committed Oct 3, 2024
1 parent 6bbdb25 commit d16ad0c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
3 changes: 3 additions & 0 deletions scheduler/Dockerfile
Original file line number | Diff line number | Diff line change
Expand Up @@ -15,6 +15,9 @@ ENV PATH="/home/airflow/venv/bin:$PATH"
COPY transformers/requirements.txt /transformers_requirements.txt
RUN pip install --no-cache-dir -r /transformers_requirements.txt

COPY clickhouse/requirements.txt /clickhouse_requirements.txt
RUN pip install --no-cache-dir -r /clickhouse_requirements.txt

ENV AIRFLOW__CORE__LOAD_EXAMPLES=False
ENV AIRFLOW__WEBSERVER__RBAC=True

Expand Down
17 changes: 16 additions & 1 deletion scheduler/dags/v3_etl.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -111,6 +111,15 @@ def create_dag(network, rpc_var, target="dev"):
""",
)

clickhouse_import_task_id = f"clickhouse_import_{version}"
clickhouse_import_task = create_bash_operator(
dag=dag,
task_id=clickhouse_import_task_id,
command=f"""
source /home/airflow/venv/bin/activate && python /clickhouse/clickhouse_import.py --network {network}
""",
)

transform_task_id = f"transform_{version}"
transform_task = create_bash_operator(
dag=dag,
Expand Down Expand Up @@ -149,7 +158,13 @@ def create_dag(network, rpc_var, target="dev"):
>> test_task
)
else:
latest_only_task >> sync_repo_task >> transform_task >> test_task
(
latest_only_task
>> sync_repo_task
>> clickhouse_import_task
>> transform_task
>> test_task
)

return dag

Expand Down

0 comments on commit d16ad0c

Please sign in to comment.