A provider you can install into your Airflow environment to access custom Ray XCom backends, Ray Hooks, and Ray Operators.
This provider is an experimental alpha containing the components needed to orchestrate and schedule Ray tasks using Airflow. It is actively maintained and being developed to bring production-ready workflows to Ray using Airflow. This release contains everything needed to begin building these workflows using the Airflow TaskFlow API.
Current Release: 0.2.1
Visit the Ray Project page for more info on Ray.
⚠️ The server version and client version (build) of Ray MUST be the same.
- Python Version >= 3.7
- Airflow Version >= 2.0.0
- Ray Version == 1.3.0
- Filelock >= 3.0.0
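The version constraints above, and the rule that the Ray client and server builds must match, can be expressed as a small pre-flight check. This is a minimal sketch: `parse_version` and `check_ray_versions` are hypothetical helper names, not part of the provider.

```python
def parse_version(text):
    """Turn a dotted version string such as '1.3.0' into (1, 3, 0)."""
    return tuple(int(part) for part in text.split("."))


def check_ray_versions(client_version, server_version, required="1.3.0"):
    """Raise ValueError unless client and server match each other and the pin.

    Hypothetical helper for illustration; the provider does not ship it.
    """
    if client_version != server_version:
        raise ValueError(
            f"Ray client {client_version} != Ray server {server_version}"
        )
    if parse_version(client_version) != parse_version(required):
        raise ValueError(
            f"Ray {client_version} does not match required version {required}"
        )
    return True
```

In practice the client value could come from `ray.__version__`; how you obtain the server version depends on your cluster setup.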
- Ray XCom Backend: Custom XCom backend to assist operators in moving data between tasks using the Ray API with its internal Plasma store, thereby allowing for in-memory distributed processing and handling of large data objects.
- Ray Hook: Extension of the Http hook that uses the Ray client to provide connections to the Ray Server.
- Ray Decorator: Task decorator to be used with the TaskFlow API, wrapping the existing Airflow @task decorator with ray.remote functionality, thereby executing each task on the Ray cluster.
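The idea behind the XCom backend listed above is that tasks exchange a small reference to an object held in shared memory rather than a serialized copy of the object itself. The following is a conceptual sketch only: a plain dictionary stands in for Ray's object store, and `put`, `get`, and both task functions are hypothetical names, not the provider's implementation.

```python
import uuid

# Stand-in for a shared in-memory object store (an assumption for
# illustration; this is NOT Ray's Plasma store).
OBJECT_STORE = {}


def put(obj):
    """Store an object and return a short string key that refers to it."""
    key = uuid.uuid4().hex
    OBJECT_STORE[key] = obj
    return key


def get(key):
    """Resolve a key back to the stored object."""
    return OBJECT_STORE[key]


def upstream_task():
    big_payload = list(range(1_000_000))
    # Only the key is handed to the next task, not the payload.
    return put(big_payload)


def downstream_task(ref):
    return sum(get(ref))


ref = upstream_task()
total = downstream_task(ref)
```

The value passed between the two functions is a 32-character string regardless of how large the payload is, which is the property that makes large data objects practical to move between tasks.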
- Add the provider package wheel file to the root directory of your Airflow project.
- In your Airflow Dockerfile, add an environment variable that specifies your custom backend, along with the provider wheel install. Add the following:

      FROM quay.io/astronomer/ap-airflow:2.0.2-1-buster-onbuild
      USER root
      RUN pip uninstall astronomer-airflow-version-check -y
      USER astro
      ENV AIRFLOW__CORE__XCOM_BACKEND=ray_provider.xcom.ray_backend.RayBackend

  Check the ap-airflow version; if unsure, change to ap-airflow:latest-onbuild.
- We are using Ray 1.3.0 and Python 3.7. To get a bleeding-edge version of Ray, you can follow this format to build the wheel URL in your requirements.txt file:

      pip install airflow-provider-ray
- Configure Ray locally. To run Ray locally, you'll need a minimum of 6GB of free memory. To start, in your environment with Ray installed, run:

      (venv)$ ray start --num-cpus=8 --object-store-memory=7000000000 --head

  If you have extra resources, you can bump the memory up.

  You should now be able to open the Ray dashboard at http://127.0.0.1:8265/.
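The `--object-store-memory` flag takes a value in bytes, which is easy to mistype. As a rough sketch, a small helper can build the same `ray start` command from a size in gigabytes. The function name `ray_start_command` and its defaults are assumptions made for illustration; only the three flags come from the command above.

```python
def ray_start_command(num_cpus=8, object_store_gb=7, head=True):
    """Build the argument list for `ray start` from human-friendly values.

    Hypothetical helper; uses decimal gigabytes (1 GB = 10**9 bytes) to
    match the 7000000000 value shown in the command above.
    """
    object_store_bytes = int(object_store_gb * 10**9)
    args = [
        "ray",
        "start",
        f"--num-cpus={num_cpus}",
        f"--object-store-memory={object_store_bytes}",
    ]
    if head:
        args.append("--head")
    return args


command = " ".join(ray_start_command())
```

The resulting list can be passed to `subprocess.run` as-is, or joined into a string for pasting into a shell.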
- Start your Airflow environment and open the UI.
- In the Airflow UI, add an Airflow Pool with the following:

      Pool (name): ray_worker_pool
      Slots: 25
- In the Airflow UI, add an Airflow Connection with the following:

      Conn Id: ray_cluster_connection
      Conn Type: HTTP
      Host: Cluster IP Address, with basic Auth params if needed
      Port: 10001
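As an alternative to the UI, Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` whose value is a connection URI. The sketch below builds that name and URI for the connection described above. The helper `ray_connection_env` is hypothetical, the host used in the example is a placeholder, and whether an environment-variable connection suits your deployment is an assumption to verify.

```python
from urllib.parse import quote


def ray_connection_env(host, port=10001, conn_id="ray_cluster_connection",
                       login=None, password=None):
    """Return (env_var_name, uri) for an HTTP-type Airflow connection.

    Hypothetical helper for illustration. Credentials are percent-encoded
    so that characters such as '@' do not break the URI.
    """
    auth = ""
    if login is not None:
        auth = quote(login, safe="")
        if password is not None:
            auth += ":" + quote(password, safe="")
        auth += "@"
    name = "AIRFLOW_CONN_" + conn_id.upper()
    return name, f"http://{auth}{host}:{port}"


# 127.0.0.1 is a placeholder; substitute your cluster IP address.
env_name, env_value = ray_connection_env("127.0.0.1")
```

Exporting `env_name=env_value` in the Airflow environment before startup would then stand in for the manual UI step.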
- In your Airflow DAG Python file, you must include the following in your default_args dictionary:

      from ray_provider.xcom.ray_backend import RayBackend
      . . .
      default_args = {
          'on_success_callback': RayBackend.on_success_callback,
          'on_failure_callback': RayBackend.on_failure_callback,
          . . .
      }

      @dag(
          default_args=default_args,
          . .
      )
      def ray_example_dag():
          # do stuff
- Using the TaskFlow API, your Airflow task should now use the @ray_task decorator for any Ray task and pass the ray_conn_id parameter as task_args, like:

      import pandas as pd
      from airflow.decorators import dag
      from ray_provider.decorators import ray_task
      from ray_provider.xcom.ray_backend import RayBackend

      default_args = {
          'on_success_callback': RayBackend.on_success_callback,
          'on_failure_callback': RayBackend.on_failure_callback,
          . . .
      }
      task_args = {"ray_conn_id": "ray_cluster_connection"}
      . . .
      @dag(
          default_args=default_args,
          . .
      )
      def ray_example_dag():

          @ray_task(**task_args)
          def sum_cols(df: pd.DataFrame) -> pd.DataFrame:
              return pd.DataFrame(df.sum()).T
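To see what the `sum_cols` task body computes, it can be run as a plain function without Airflow or Ray. This sketch assumes only that pandas is installed; the sample DataFrame is made up for illustration.

```python
import pandas as pd


def sum_cols(df: pd.DataFrame) -> pd.DataFrame:
    # df.sum() yields a Series indexed by column name; wrapping it in a
    # DataFrame and transposing gives a single row with one total per column.
    return pd.DataFrame(df.sum()).T


# Made-up sample data for illustration.
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
result = sum_cols(df)
```

Here `result` has one row and the same columns as `df`, with each cell holding that column's sum. Testing the function body locally like this before adding the `@ray_task` decorator keeps logic errors separate from cluster or connection problems.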
This project is built in collaboration between Astronomer and Anyscale, with active contributions from:
This project is formatted via black:
pip install black
black .
TBD - [Info on building a connection to Ray]