This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit 3853069: first commit
jkleinkauff committed Jan 24, 2022 (0 parents)
Showing 46 changed files with 1,622 additions and 0 deletions.
Binary file added .DS_Store
10 changes: 10 additions & 0 deletions .gitignore
@@ -0,0 +1,10 @@
/dags/__pycache__
/dags/data/*
/logs
/plugins
/venv
/terraform/.terraform
/terraform/.terraform.lock.hcl
/src/etl_data/parquets/*
/src/etl_data/parquets_processed/*
/src/etl_data/dags/*
3 changes: 3 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,3 @@
{
"python.formatting.provider": "black"
}
8 changes: 8 additions & 0 deletions Makefile
@@ -0,0 +1,8 @@
psql-connect:
	psql --host=localhost --port=5433 --username=postgres

build:
	docker build -t etl-csv -f docker/Dockerfile .

build-up:
	docker-compose -f docker-compose.yaml -f docker-compose-superset.yaml up
32 changes: 32 additions & 0 deletions README.md
@@ -0,0 +1,32 @@
## How to start the project

In your `docker/.env` file, add the following variables, pointing at your local clone of the repository:

```
DATA_DIR="/.../<cloned-dir>/src/etl_data"
DAGS_DIR="/.../<cloned-dir>/dags/data"
```

Then build the ETL image and start the services:

```
make build
make build-up
```

The following services should start:
```
➜ project git:(master) ✗ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
357b712341f4 apache/airflow:2.2.3 "/usr/bin/dumb-init …" 5 hours ago Up 5 hours 8080/tcp xxx_airflow-scheduler_1
372d440f1b6a apache/airflow:2.2.3 "/usr/bin/dumb-init …" 5 hours ago Up 5 hours (healthy) 0.0.0.0:5555->5555/tcp, 8080/tcp xxx_flower_1
77eadeb3c0aa apache/airflow:2.2.3 "/usr/bin/dumb-init …" 5 hours ago Up 5 hours (healthy) 0.0.0.0:8080->8080/tcp xxx_airflow-webserver_1
7f68845efbc3 apache/airflow:2.2.3 "/usr/bin/dumb-init …" 5 hours ago Up 5 hours 8080/tcp xxx_airflow-worker_1
af9fb24ad6ff postgres:13 "docker-entrypoint.s…" 5 hours ago Up 5 hours (healthy) 5432/tcp xxx_postgres_1
4dadbd44de13 postgres "docker-entrypoint.s…" 5 hours ago Up 5 hours 0.0.0.0:5433->5432/tcp xxx_db_app_1
432837c58485 redis:latest "docker-entrypoint.s…" 5 hours ago Up 5 hours (healthy) 0.0.0.0:6379->6379/tcp superset_cache
04ee1864109a apache/superset:latest-dev "/app/docker/docker-…" 14 hours ago Up 7 hours (unhealthy) 8088/tcp superset_worker
8254b5f3d984 apache/superset:latest-dev "/app/docker/docker-…" 14 hours ago Up 7 hours (unhealthy) 8088/tcp superset_worker_beat
b35323ed9f27 apache/superset:latest-dev "/app/docker/docker-…" 14 hours ago Up 7 hours (healthy) 0.0.0.0:8088->8088/tcp superset_app
aeefa7a99c4e postgres:10 "docker-entrypoint.s…" 14 hours ago Up 7 hours 5432/tcp superset_db
```

48 changes: 48 additions & 0 deletions dags/etl_create_dm_dll.py
@@ -0,0 +1,48 @@
import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

# create_pet_table, populate_pet_table, get_all_pets, and get_birth_date are examples of tasks created by
# instantiating the Postgres Operator

with DAG(
    dag_id="etl-dm-dll",
    start_date=datetime.datetime(2020, 2, 2),
    schedule_interval="@once",
    catchup=False,
) as dag:
    # create_pet_table = PostgresOperator(
    #     task_id="create_pet_table",
    #     sql="sql/pet_schema.sql",
    # )

    d_region = PostgresOperator(
        task_id="create_d_region",
        sql="sql/d_region.sql",
    )

    d_datasource = PostgresOperator(
        task_id="create_d_datasource",
        sql="sql/d_datasource.sql",
    )

    d_date = PostgresOperator(
        task_id="create_d_date",
        sql="sql/d_date.sql",
    )

    f_trips_staging = PostgresOperator(
        task_id="create_f_trips_staging",
        sql="sql/f_trips_staging.sql",
    )

    f_trips = PostgresOperator(
        task_id="create_f_trips",
        sql="sql/f_trips.sql",
    )

    d_region >> f_trips_staging >> f_trips
    d_datasource >> f_trips_staging >> f_trips
    d_date >> f_trips_staging >> f_trips
50 changes: 50 additions & 0 deletions dags/etl_ingest.py
@@ -0,0 +1,50 @@
from datetime import datetime, timedelta
import os
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.docker_operator import DockerOperator
from docker.types import Mount


default_args = {
    "owner": "airflow",
    "description": "Job to ingest CSV data and convert to parquet",
    "depends_on_past": False,
    "start_date": datetime(2021, 5, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

abc = os.getenv("LOCAL_ETL_DATA")

with DAG(
    "etl-ingestion-dag",
    default_args=default_args,
    schedule_interval="*/15 * * * *",
    catchup=False,
) as dag:
    start_dag = DummyOperator(task_id="start_dag")

    t_ingest = DockerOperator(
        task_id="task-ingest-convert-csv",
        image="etl-csv",
        # container_name="task_ingest_convert_csv",
        api_version="auto",
        auto_remove=True,
        # command="/bin/sleep 30",
        command=["python", "src/stream-csv-read-csv/read_convert.py"],
        # docker_url="tcp://docker-proxy:2375",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
        mounts=[
            Mount(
                source=os.getenv("DATA_DIR"),
                target="/app/src/etl_data",
                type="bind",
            )
        ],
    )

    start_dag >> t_ingest
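
The ingest task above runs `src/stream-csv-read-csv/read_convert.py` inside the `etl-csv` image, but that script is not among the files shown in this commit view. Purely as a hedged sketch of what such a CSV-to-parquet step could look like (the pandas-based approach, the flat file layout under the bind-mounted `/app/src/etl_data`, and the `parquets/` output folder taken from `.gitignore` are assumptions, not the author's code):

```
# Hypothetical sketch of a CSV -> parquet ingest step; the real
# src/stream-csv-read-csv/read_convert.py is not shown in this commit view.
import glob
import os

import pandas as pd  # assumes pandas + a parquet engine exist in the etl-csv image

DATA_DIR = "/app/src/etl_data"              # bind-mounted by the DockerOperator
PARQUET_DIR = os.path.join(DATA_DIR, "parquets")  # folder listed in .gitignore


def convert_csvs(data_dir: str = DATA_DIR, out_dir: str = PARQUET_DIR) -> None:
    os.makedirs(out_dir, exist_ok=True)
    for csv_path in glob.glob(os.path.join(data_dir, "*.csv")):
        name = os.path.splitext(os.path.basename(csv_path))[0]
        df = pd.read_csv(csv_path)
        # Write one parquet file per input CSV.
        df.to_parquet(os.path.join(out_dir, f"{name}.parquet"), index=False)


if __name__ == "__main__":
    convert_csvs()
```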
123 changes: 123 additions & 0 deletions dags/etl_load_dm.py
@@ -0,0 +1,123 @@
from datetime import datetime, timedelta
import os
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from docker.types import Mount

default_args = {
    "owner": "airflow",
    "description": "Job to load the dimensional model from the generated CSV data",
    "depends_on_past": False,
    "start_date": datetime(2021, 5, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "etl-load-dm-dag",
    default_args=default_args,
    schedule_interval="*/60 * * * *",
    catchup=False,
) as dag:
    start_dag = DummyOperator(task_id="start_dag")

    t_generate_data = DockerOperator(
        task_id="task-generate-dm-csv-data",
        image="etl-csv",
        # container_name="task_ingest_convert_csv",
        api_version="auto",
        auto_remove=True,
        command=["spark-submit", "src/fill-dm/generate-import.py"],
        # docker_url="tcp://docker-proxy:2375",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
        mounts=[
            Mount(
                source=os.getenv("DATA_DIR"),
                target="/app/src/etl_data",
                type="bind",
            ),
            Mount(
                source=os.getenv("DAGS_DIR"),
                target="/app/src/etl_data/dags",
                type="bind",
            ),
        ],
    )

    # Truncate everything before reloading. Certainly not for production; a real
    # pipeline would use an SCD / incremental load for dims and facts (see the
    # sketch after this file).
    t_truncate_all = PostgresOperator(
        task_id="truncate_all_tables_data",
        sql="""
            TRUNCATE f_trips_staging, f_trips, d_datasource, d_date, d_region RESTART IDENTITY;
        """,
    )

    t_load_d_datasource = BashOperator(
        task_id="load_d_datasource",
        bash_command="cd $AIRFLOW_HOME'/dags'"
        "&& ls"
        "&& f=$(ls data/d_datasource.csv/*.csv| head -1)"
        "&& PGPASSWORD=password psql --host host.docker.internal --port 5433 --username postgres -d postgres -c '''\copy d_datasource(datasource) FROM '$f' '' WITH csv; ' ",
    )

    t_load_d_region = BashOperator(
        task_id="load_d_region",
        bash_command="cd $AIRFLOW_HOME'/dags'"
        "&& ls"
        "&& f=$(ls data/d_region.csv/*.csv| head -1)"
        "&& PGPASSWORD=password psql --host host.docker.internal --port 5433 --username postgres -d postgres -c '''\copy d_region(region_name) FROM '$f' '' WITH csv; ' ",
    )

    t_load_d_date = BashOperator(
        task_id="load_d_date",
        bash_command="cd $AIRFLOW_HOME'/dags'"
        "&& ls"
        "&& f=$(ls data/d_date.csv/*.csv| head -1)"
        "&& PGPASSWORD=password psql --host host.docker.internal --port 5433 --username postgres -d postgres -c '''\copy d_date(date,year,month,day) FROM '$f' '' WITH csv; ' ",
    )

    t_load_f_trips_staging = BashOperator(
        task_id="load_f_trips_staging",
        bash_command="cd $AIRFLOW_HOME'/dags'"
        "&& ls"
        "&& f=$(ls data/f_trips_staging.csv/*.csv| head -1)"
        "&& PGPASSWORD=password psql --host host.docker.internal --port 5433 --username postgres -d postgres "
        " -c '''\copy f_trips_staging(origin_coord_x,origin_coord_y,destination_coord_x,destination_coord_y,date,region,datasource, business_key) FROM '$f' '' WITH csv; ' ",
    )

    t_merge_f_trips = PostgresOperator(
        task_id="merge_f_trips",
        sql="""
            insert into f_trips(origin, destination, sk_region, sk_datasource, sk_date, sk_business)
            select
                stg.origin_coord_point
                , stg.destination_coord_point
                , region.id
                , datasource.id
                , date.id
                , stg.business_key
            from f_trips_staging as stg
            join d_region as region
                on stg.region = region.region_name
            join d_datasource as datasource
                on stg.datasource = datasource.datasource
            join d_date as date
                on stg."date" = date.date
            where stg.business_key not in (
                select sk_business from f_trips
            )
        """,
    )

    start_dag >> t_truncate_all >> t_generate_data
    t_generate_data >> t_load_d_datasource >> t_load_f_trips_staging
    t_generate_data >> t_load_d_region >> t_load_f_trips_staging
    t_generate_data >> t_load_d_date >> t_load_f_trips_staging
    t_load_f_trips_staging >> t_merge_f_trips
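
The truncate-and-reload step above is flagged in the code itself as not production-ready. Purely as a hedged illustration of the incremental alternative hinted at there (not a full SCD, and not part of this commit), a dimension load that only inserts new members could look like the fragment below; it reuses the `d_region` and `f_trips_staging` tables defined in this repo, but deriving the dimension from the staging table is itself an assumption about the flow:

```
# Hypothetical fragment that would live inside the DAG above, replacing the
# TRUNCATE of d_region with an insert-only incremental load (a sketch, not
# the committed implementation).
from airflow.providers.postgres.operators.postgres import PostgresOperator

t_upsert_d_region = PostgresOperator(
    task_id="upsert_d_region",
    sql="""
        INSERT INTO d_region (region_name)
        SELECT DISTINCT stg.region
        FROM f_trips_staging AS stg
        WHERE stg.region IS NOT NULL
          AND NOT EXISTS (
              SELECT 1 FROM d_region AS r WHERE r.region_name = stg.region
          );
    """,
)
```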
47 changes: 47 additions & 0 deletions dags/etl_process.py
@@ -0,0 +1,47 @@
from datetime import datetime, timedelta
import os
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.docker_operator import DockerOperator
from docker.types import Mount

default_args = {
    "owner": "airflow",
    "description": "Job to process the ingested parquet data with Spark",
    "depends_on_past": False,
    "start_date": datetime(2021, 5, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "etl-process-ingested-dag",
    default_args=default_args,
    schedule_interval="*/30 * * * *",
    catchup=False,
) as dag:
    start_dag = DummyOperator(task_id="start_dag")

    t_data_process = DockerOperator(
        task_id="task-process-data-csv",
        image="etl-csv",
        # container_name="task_ingest_convert_csv",
        api_version="auto",
        auto_remove=True,
        # command="/bin/sleep 30",
        command=["spark-submit", "src/process/spark_process.py"],
        # docker_url="tcp://docker-proxy:2375",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
        mounts=[
            Mount(
                source=os.getenv("DATA_DIR"),
                target="/app/src/etl_data",
                type="bind",
            )
        ],
    )

    start_dag >> t_data_process
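
`src/process/spark_process.py`, invoked via `spark-submit` above, is likewise not shown in this commit view. A minimal PySpark sketch of what such a processing step could look like, assuming the `parquets/` and `parquets_processed/` folders listed in `.gitignore` and the bind mount used by the operator (the deduplication is only a placeholder transformation):

```
# Hypothetical sketch of the Spark processing step; the real spark_process.py
# is not shown in this commit view.
from pyspark.sql import SparkSession

DATA_DIR = "/app/src/etl_data"  # bind-mounted by the DockerOperator

spark = SparkSession.builder.appName("spark_process").getOrCreate()

# Read the parquet files produced by the ingest DAG.
df = spark.read.parquet(f"{DATA_DIR}/parquets")

# Placeholder transformation; the actual business logic is unknown.
processed = df.dropDuplicates()

# Write where the folder layout in .gitignore suggests processed data lives.
processed.write.mode("overwrite").parquet(f"{DATA_DIR}/parquets_processed")

spark.stop()
```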
6 changes: 6 additions & 0 deletions dags/sql/d_datasource.sql
@@ -0,0 +1,6 @@
DROP TABLE IF EXISTS d_datasource CASCADE;
CREATE TABLE d_datasource (
    id INT GENERATED ALWAYS AS IDENTITY,
    datasource VARCHAR(51),
    PRIMARY KEY(id)
);
9 changes: 9 additions & 0 deletions dags/sql/d_date.sql
@@ -0,0 +1,9 @@
DROP TABLE IF EXISTS d_date CASCADE;
CREATE TABLE d_date (
    id INT GENERATED ALWAYS AS IDENTITY,
    date date,
    year int,
    month int,
    day int,
    PRIMARY KEY(id)
);
6 changes: 6 additions & 0 deletions dags/sql/d_region.sql
@@ -0,0 +1,6 @@
DROP TABLE IF EXISTS d_region CASCADE;
CREATE TABLE d_region (
    id INT GENERATED ALWAYS AS IDENTITY,
    region_name VARCHAR(50),
    PRIMARY KEY(id)
);
15 changes: 15 additions & 0 deletions dags/sql/f_trips.sql
@@ -0,0 +1,15 @@
DROP TABLE IF EXISTS f_trips;

CREATE TABLE f_trips (
    id INT GENERATED ALWAYS AS IDENTITY,
    origin point,
    destination point,
    sk_region INT,
    sk_datasource INT,
    sk_date INT,
    sk_business UUID,
    PRIMARY KEY(id),
    FOREIGN KEY(sk_region) REFERENCES d_region(id),
    FOREIGN KEY(sk_datasource) REFERENCES d_datasource(id),
    FOREIGN KEY(sk_date) REFERENCES d_date(id)
);
15 changes: 15 additions & 0 deletions dags/sql/f_trips_staging.sql
@@ -0,0 +1,15 @@
DROP TABLE IF EXISTS f_trips_staging;
CREATE TABLE f_trips_staging (
    id INT GENERATED ALWAYS AS IDENTITY,
    origin_coord_x float,
    origin_coord_y float,
    origin_coord_point POINT GENERATED ALWAYS AS (point(origin_coord_x, origin_coord_y)) STORED,
    destination_coord_x float,
    destination_coord_y float,
    destination_coord_point POINT GENERATED ALWAYS AS (point(destination_coord_x, destination_coord_y)) STORED,
    date date,
    region VARCHAR(50),
    datasource VARCHAR(50),
    business_key UUID,
    PRIMARY KEY(id)
);
