# Easy Scalable Production ETL

**This repository is a lightweight, scalable example pipeline that runs large Python jobs on a schedule in the cloud. We hope this example is easy to copy and modify for your own needs.**

It runs regular jobs on a schedule, at scale, on the cloud using a variety of technologies:

- **Prefect:** for job scheduling and workflow management
- **Coiled:** for cloud hardware provisioning
- **Dask:** for parallel processing
- **Parquet** and **Delta Lake:** for data storage
- **XGBoost:** for model training
- **Streamlit** and **FastAPI:** for dashboarding and model hosting
## Background

It's common to run regular large-scale Python jobs on the cloud as part of production data pipelines. Modern workflow orchestration systems like Prefect, Dagster, Airflow, and Argo all work well for running jobs on a regular cadence, but we often see groups struggle with the complexity of cloud infrastructure and a lack of scalability.

In this example, raw data flows in every minute and is transformed through several stages at different scales and cadences:

- **Data Generation:** (*every 30 seconds*) new JSON files appear
- **Data Preprocessing:** (*every minute*) JSON gets transformed into Parquet / Delta
- **Data Compaction:** (*every 30 minutes*) small Parquet files get repartitioned into larger ones
- **Business Queries:** (*every hour*) large-scale multi-table analysis queries run that feed dashboards
- **Model Training:** (*every six hours*) an XGBoost model is retrained

Additionally, we keep Streamlit and FastAPI servers up and running.

This repository contains a scalable data pipeline that runs these jobs on the cloud with [Coiled](https://docs.coiled.io/user_guide/index.html) and Prefect. This approach is:
- **Easy to deploy** on the cloud
- **Scalable** across many cloud machines
- **Cheap to run** on ephemeral VMs and a small always-on VM

It looks kinda like this:

![ETL Pipeline](images/excalidraw.png)
## How this works

### Concurrent Flows

The file [workflow.py](workflow.py) defines several Prefect flows operating at different cadences:

```python
# workflow.py
...

generate = generate_json.to_deployment(
    name="generate_data",
    interval=timedelta(seconds=30),
)
preprocess = json_to_parquet.to_deployment(
    name="preprocess",
    interval=timedelta(minutes=1),
)
train = update_model.to_deployment(
    name="train",
    interval=timedelta(hours=6),
)

prefect.serve(
    generate,
    preprocess,
    train,
    ...
)
```
### Coiled Functions + Prefect Tasks

These flows are defined in files like [pipeline/preprocess.py](pipeline/preprocess.py), which combine Prefect tasks either with Coiled functions for remote execution, like the following:

```python
# pipeline/preprocess.py

import coiled
from prefect import task, flow
import pandas as pd


@task
@coiled.function(region="us-east-2", memory="64 GiB")
def json_to_parquet(filename):
    df = pd.read_json(filename)
    df.to_parquet(OUTFILE / (filename.split(".")[-2] + ".parquet"))


@flow
def convert_all_files():
    files = list_files()
    json_to_parquet.map(files)
```
### Dask Clusters for larger jobs

Or in files like [pipeline/reduce.py](pipeline/reduce.py), which create larger clusters on-demand:

```python
# pipeline/reduce.py

import coiled
from prefect import task, flow
import dask.dataframe as dd


@task
def query_for_bad_accounts(bucket):
    with coiled.Cluster(n_workers=20, region="us-east-2") as cluster:
        with cluster.get_client() as client:
            df = dd.read_parquet(bucket)

            ...  # complex query

            result.to_parquet(...)
```
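The model-training job (`update_model`, run every six hours) follows the same pattern: a Prefect task pushed onto cloud hardware with Coiled. The sketch below is hypothetical (the real training code lives in the pipeline and differs; the `fit_model` name, `target` column, and data path are made up), but it shows the shape of such a task:

```python
# Hypothetical sketch: the real update_model task in this repository differs.
import coiled
import pandas as pd
import xgboost as xgb
from prefect import flow, task


@task
@coiled.function(region="us-east-2", memory="64 GiB")
def fit_model(path):
    # Assumes a processed Parquet dataset with a "target" column (illustrative names).
    df = pd.read_parquet(path)
    X, y = df.drop(columns=["target"]), df["target"]
    model = xgb.XGBRegressor()
    model.fit(X, y)
    return model


@flow
def update_model():
    model = fit_model("data/processed/")
    ...  # persist the model somewhere serve_model.py can load it
```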
## Data Generation

In the background we're generating the data ourselves. This data is taken from the TPC-H benchmark suite: a bunch of tables that simulate orders, customers, and suppliers, with a schema that looks like this:

![TPC-H Schema](https://docs.snowflake.com/en/_images/sample-data-tpch-schema.png)

This gives the system a sense of realism, while still letting us tune the scale up or down and run the whole thing easily as an example.
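For intuition, here is a rough, hypothetical sketch of the kind of job the `generate_json` flow runs every 30 seconds. The real generator draws from the TPC-H suite, so the directory, field names, and logic below are illustrative only:

```python
# Hypothetical sketch: the real generate_json flow writes TPC-H-style tables and differs.
import json
import random
import uuid
from datetime import datetime, timezone
from pathlib import Path

STAGING_DIR = Path("data/staged")  # illustrative landing directory


def generate_json(n_records=1000):
    """Write a small batch of fake order records as a new JSON file."""
    STAGING_DIR.mkdir(parents=True, exist_ok=True)
    records = [
        {
            "order_id": str(uuid.uuid4()),
            "customer_id": random.randint(1, 10_000),
            "amount": round(random.uniform(1.0, 500.0), 2),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        for _ in range(n_records)
    ]
    path = STAGING_DIR / f"orders-{uuid.uuid4().hex}.json"
    path.write_text(json.dumps(records))
```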
## How to run

_You can run this pipeline yourself, either locally or on the cloud._

Make sure you have a [Prefect cloud](https://www.prefect.io/cloud) account and have authenticated your local machine.
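If you haven't authenticated yet, that is typically done from your terminal with:

```bash
prefect cloud login
```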
Clone this repository and install dependencies:

```bash
git clone https://github.com/coiled/etl-tpch
cd etl-tpch
mamba env create -f environment.yml
mamba activate etl-tpch
```

### Local

Run each command below in a separate terminal window:

```bash
python workflow.py          # Run data pipeline locally
```
```bash
python serve_model.py       # Serve ML model
```
```bash
streamlit run dashboard.py  # Serve dashboard
```

Watch things run, both in your terminals and on Prefect cloud, and then ctrl-C to shut everything down.
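If you're curious what the model-serving piece looks like, a minimal FastAPI service along the lines of `serve_model.py` might be sketched like this. This is hypothetical: the real file, model format, and endpoints differ, and `model.json` is just an assumed artifact from the training job:

```python
# Hypothetical sketch: the real serve_model.py in this repository differs.
import pandas as pd
import xgboost as xgb
from fastapi import FastAPI

app = FastAPI()

model = xgb.XGBRegressor()
model.load_model("model.json")  # assumes the training job saved a model file here


@app.post("/predict")
def predict(record: dict):
    """Score a single record sent as a JSON object of feature values."""
    X = pd.DataFrame([record])
    return {"prediction": float(model.predict(X)[0])}
```

A service like this would typically be started with something like `uvicorn serve_model:app`, though the real script may start its own server.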
### Cloud

If you haven't already, create a Coiled account and follow the setup guide at [coiled.io/start](https://coiled.io/start).

Next, adjust the [`pipeline/config.yml`](pipeline/config.yml) configuration file by setting `local: false` and `data-dir` to an S3 bucket where you would like data assets to be stored.
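For example, assuming a bucket you control (only the two settings mentioned above are shown; the bucket name is illustrative):

```yaml
# pipeline/config.yml (excerpt)
local: false
data-dir: s3://<your-bucket>/etl-data
```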
Set the `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` and `AWS_REGION` environment variables, which enable access to your S3 bucket and specify the region the bucket is in, respectively:

```bash
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export AWS_REGION=...
```

Finally, run the command below in your terminal. It serves the workflow from a small, always-on `t3.medium` VM in the same region as your data, includes the dashboard and pipeline files, and forwards your AWS credentials so the VM can reach your S3 bucket:

```bash
coiled prefect serve \
    --vm-type t3.medium \
    --region $AWS_REGION \
    -f dashboard.py -f pipeline \
    -e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
    -e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
    workflow.py
```