It's a simple, minimalistic project to clean and deploy datasets in real time using Python. It's ideal for small and mid-sized organizations that want to put a data-processing solution into production with very few steps and at a low cost.
It supports batch and streaming processing, and it comes with tooling to manually test and unit-test your pipelines.
I was overwhelmed by all the steps and products needed to deploy a serious data pipeline in the cloud, so I created dataflow to do the same thing on a smaller scale, with barely any steps or tools.
- All pipelines are committed to a git repository, making it easy to collaborate, track versions, etc.
- Each pipeline transformation is a unit-tested Python file, making it easier to maintain and to catch errors in the future (see the sketch after this list).
- Deploy pipelines by pushing to git with continuous integration.
- Ingest or create datasets from any SQL database, Heroku, BigQuery, and CSV files stored in buckets.
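As a rough illustration, a transformation and its unit test could look like the sketch below. This is a minimal, hypothetical example: the `run()` signature, the pandas DataFrame in/out contract, and the file names are assumptions, so check the dataflow-project-template for the real conventions.

```python
# transformations/clean_emails.py -- hypothetical transformation file
import pandas as pd


def run(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize the email column and drop rows without an email."""
    df = df.copy()
    df["email"] = df["email"].str.strip().str.lower()
    return df[df["email"].notna() & (df["email"] != "")]


# tests/test_clean_emails.py -- unit test for the transformation above
def test_run_normalizes_and_drops_empty_emails():
    incoming = pd.DataFrame({"email": ["  John@Example.COM ", "", None]})
    result = run(incoming)
    assert list(result["email"]) == ["john@example.com"]
```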
Note: This product works and is being used to process thousands of records per day, but it's maintained by a small team and you may encounter some bugs along the way.
- First, install the project (only the first time; 4Geeks employees can skip this step).
- Create your first pipeline using the dataflow-project-template. This step represents 90% of the effort; the result should be a functional, fully tested pipeline ready to be deployed. Please read the documentation carefully and make sure to follow every step.
- Once your project and pipelines are ready to be deployed to production, publish the project as a GitHub repository.
- Create the project inside the dataflow Django admin interface at `/admin/dataflow/project/add/`. To do so you will need the GitHub repo URL you just created in the previous step.
- Pull the project code from GitHub.
- Make sure the pipeline has been properly listed in the Pipelines List.
- The pipeline transformations are also listed in the admin; make sure the "order" column matches what you specified in the project yml.
- Specify the input and output data sources to be used; you can choose CSV files, SQL databases, or Google BigQuery.
- Run your pipeline. If you get any errors while running it, please refer to the debugging section.
Dataflow can retrieve datasets from, and store them into, CSV files, SQL databases, and Google BigQuery. New source types will be added in the future.
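Purely for illustration, the kind of reads and writes involved looks like the sketch below. Dataflow does this I/O for you once the data sources are configured in the admin; the pandas/SQLAlchemy calls, URL, and connection string are assumptions, not Dataflow's API.

```python
import pandas as pd
from sqlalchemy import create_engine

# Read a CSV file stored in a bucket (illustrative URL)
df = pd.read_csv("https://bucket.example.com/leads.csv")

# Write the result into a SQL database (illustrative connection string)
engine = create_engine("postgresql://user:password@host:5432/dbname")
df.to_sql("clean_leads", engine, if_exists="replace", index=False)

# BigQuery is handled through its own client libraries in a similar read/write fashion.
```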
A pipeline is all the steps needed to clean an incoming source dataset and save it into another dataset.
- One pipeline is composed of one or many data transformations.
- One pipeline has one or more sources of information (in batch or streaming).
- One pipeline has one destination dataset.
By default, pipelines run in batch mode, which means that one (or more) entire datasets are sent to the transformation queue to be cleaned.
Sometimes you need to process a single incoming item; instead of cleaning the whole dataset again, you only want to clean that single item before adding it to the dataset (one at a time). This is what we call a stream.
Dataflow can provide a URL endpoint that can be called every time an incoming stream item arrives.
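For example, the system that produces the stream could POST each new record to that endpoint as it arrives. The URL and payload below are purely illustrative; use the actual endpoint Dataflow exposes for your pipeline.

```python
import requests

# Hypothetical stream endpoint for a pipeline; replace with the URL Dataflow gives you
STREAM_URL = "https://your-dataflow-host/v1/pipeline/<pipeline_slug>/stream"

record = {"email": "  John@Example.COM ", "signed_up_at": "2023-01-15"}

# Each call sends a single item through the pipeline's transformations
response = requests.post(STREAM_URL, json=record, timeout=10)
response.raise_for_status()
```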
You only need to install this project once, no matter how many pipelines or sub-projects you have. If you work at 4Geeks, you don't have to install it because it's already installed under BreatheCode Dataflow.
If you are not a 4Geeks employee and you want to install the dataflow core, follow these steps:
- This project must be deployed on Heroku (recommended) or any other cloud; you will need a Redis server for async processing and a Postgres database for the whole system.
- Once deployed, a cron job must be configured to run the command `python manage.py run_pipeline` every 10 min. This is the lowest time delta that can be used to run a batch pipeline; for example, you will not be able to run a batch pipeline every 9 min, only every 10 min or more (see the crontab sketch below).
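On a plain Linux server this could be a crontab entry like the one below (on Heroku, the Scheduler add-on can run the same command); the project path is illustrative.

```
# Run the batch pipelines every 10 minutes (path is illustrative)
*/10 * * * * cd /path/to/dataflow && python manage.py run_pipeline
```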
- Since pipelines are divided and atomized into transformations, it makes sense to start the debugging process by identifying which transformation failed.
- Check the list of transformations for rows whose `status` column has the value `ERROR`.
- Open the transformation and check its `stdout` value; this is the stdout buffer captured while running the transformation, so every `print` statement, error, or warning should show up here.
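Because that stdout buffer is captured, a quick way to narrow a problem down is to add temporary `print` statements to the failing transformation and re-run the pipeline. Continuing the hypothetical `run(df)` convention sketched earlier:

```python
import pandas as pd


def run(df: pd.DataFrame) -> pd.DataFrame:
    # Temporary debugging output: these lines show up in the transformation's stdout field
    print("incoming rows:", len(df))
    print("incoming columns:", list(df.columns))

    df = df.copy()
    df["email"] = df["email"].str.strip().str.lower()
    return df
```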
To delete all the records:
$ python manage.py clean_execution_history
To delete all the records of a specific pipeline:
$ python manage.py clean_execution_history --pipeline=<pipeline_slug>
To delete all the records older than a specific number of days:
$ python manage.py clean_execution_history --days_old=<days_old>
To delete all the records from a specific pipeline that are older than a specific number of days:
$ python manage.py clean_execution_history --pipeline=<pipeline_slug> --days_old=<days_old>