
Support using callable config in @ray.task #103

Merged 4 commits into main on Nov 29, 2024

Conversation

@tatiana (Collaborator) commented on Nov 29, 2024:

The Ray provider 0.2.1 only allowed users to define a hard-coded configuration to materialize the Kubernetes cluster. This PR enables users to define a function that receives the Airflow context and generates the configuration dynamically from context properties. This request came from an Astronomer customer.

There is an example DAG file illustrating how to use this feature. It has a parent DAG that triggers two child DAGs, which leverage the newly introduced @ray.task callable configuration.
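A minimal sketch of the idea (assuming the decorator accepts the callable through the same `config` argument previously used for the hard-coded dict; the keys mirror the config printed in the task logs further down, and the per-run YAML path is illustrative):

```python
from ray_provider.decorators import ray


def create_config_from_context(context, **kwargs):
    # Derive per-run values from the Airflow context instead of
    # hard-coding them; here the cluster YAML path embeds the run_id
    # purely for illustration.
    run_id = context["dag_run"].run_id
    return {
        "conn_id": "ray_conn",
        "runtime_env": {
            "working_dir": "/usr/local/airflow/dags/ray_scripts",
            "pip": ["numpy"],
        },
        "num_cpus": 1,
        "num_gpus": 0,
        "memory": 0,
        "poll_interval": 5,
        "ray_cluster_yaml": f"/usr/local/airflow/dags/scripts/{run_id}.yaml",
        "xcom_task_key": "dashboard",
    }


@ray.task(config=create_config_from_context)
def process_data_with_ray():
    ...
```

At runtime the provider invokes the callable with the task's Airflow context and uses the returned dict in place of the static configuration.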

Closes #81

How to test

The screenshots below show the DAGs succeeding when run locally, following the local development instructions with the Astro CLI.

Parent DAG, manually triggered:
[Screenshot 2024-11-29 at 12:15:13]

Child 1 DAG, triggered by parent DAG:
[Screenshot 2024-11-29 at 12:15:56]

Example logs illustrating that the RayCluster with the dynamic configuration was created and used in Kubernetes, with its own IP address:

(...)
[2024-11-29T12:14:52.276+0000] {standard_task_runner.py:104} INFO - Running: ['airflow', 'tasks', 'run', 'ray_dynamic_config_child_1', 'process_data_with_ray', 'manual__2024-11-29T12:14:50.273712+00:00', '--job-id', '773', '--raw', '--subdir', 'DAGS_FOLDER/ray_dynamic_config.py', '--cfg-path', '/tmp/tmpkggwlv23']
[2024-11-29T12:14:52.278+0000] {logging_mixin.py:190} WARNING - /usr/local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py:70 DeprecationWarning: This process (pid=238) is multi-threaded, use of fork() may lead to deadlocks in the child.
(...)
[2024-11-29T12:14:52.745+0000] {decorators.py:94} INFO - Using the following config {'conn_id': 'ray_conn', 'runtime_env': {'working_dir': '/usr/local/airflow/dags/ray_scripts', 'pip': ['numpy']}, 'num_cpus': 1, 'num_gpus': 0, 'memory': 0, 'poll_interval': 5, 'ray_cluster_yaml': '/usr/local/airflow/dags/scripts/first-254.yaml', 'xcom_task_key': 'dashboard'}
(...)
[2024-11-29T12:14:55.430+0000] {hooks.py:474} INFO - ::group::Create Ray Cluster
[2024-11-29T12:14:55.430+0000] {hooks.py:475} INFO - Loading yaml content for Ray cluster CRD...
[2024-11-29T12:14:55.451+0000] {hooks.py:410} INFO - Creating new Ray cluster: first-254
[2024-11-29T12:14:55.456+0000] {hooks.py:494} INFO - ::endgroup::
(...)
[2024-11-29T12:14:55.663+0000] {hooks.py:498} INFO - ::group::Setup Load Balancer service
[2024-11-29T12:14:55.663+0000] {hooks.py:334} INFO - Attempt 1: Checking LoadBalancer status...
[2024-11-29T12:14:55.669+0000] {hooks.py:278} ERROR - Error getting service first-254-head-svc: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': '81b07ac4-db3b-48a6-b336-f52ae93bee55', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '955e8bb0-08b1-4d45-a768-e49387a9767c', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'd5240328-288d-4366-b094-d8fd793c7431', 'Date': 'Fri, 29 Nov 2024 12:14:55 GMT', 'Content-Length': '212'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"services \"first-254-head-svc\" not found","reason":"NotFound","details":{"name":"first-254-head-svc","kind":"services"},"code":404}
[2024-11-29T12:14:55.669+0000] {hooks.py:355} INFO - LoadBalancer service is not available yet...
[2024-11-29T12:15:35.670+0000] {hooks.py:334} INFO - Attempt 2: Checking LoadBalancer status...
[2024-11-29T12:15:35.688+0000] {hooks.py:348} INFO - LoadBalancer is ready.
[2024-11-29T12:15:35.688+0000] {hooks.py:441} INFO - {'ip': '172.18.255.1', 'hostname': None, 'ports': [{'name': 'client', 'port': 10001}, {'name': 'dashboard', 'port': 8265}, {'name': 'gcs', 'port': 6379}, {'name': 'metrics', 'port': 8080}, {'name': 'serve', 'port': 8000}], 'working_address': '172.18.255.1'}

(...)

[2024-11-29T12:15:38.345+0000] {triggers.py:124} INFO - ::group:: Trigger 1/2: Checking the job status
[2024-11-29T12:15:38.345+0000] {triggers.py:125} INFO - Polling for job raysubmit_paxAkyLiKxEHPmwG every 5 seconds...
(...)
[2024-11-29T12:15:38.354+0000] {hooks.py:156} INFO - Dashboard URL is: http://172.18.255.1:8265
[2024-11-29T12:15:38.361+0000] {hooks.py:208} INFO - Job raysubmit_paxAkyLiKxEHPmwG status: PENDING
[2024-11-29T12:15:38.361+0000] {triggers.py:100} INFO - Status of job raysubmit_paxAkyLiKxEHPmwG is: PENDING
[2024-11-29T12:15:38.361+0000] {triggers.py:108} INFO - ::group::raysubmit_paxAkyLiKxEHPmwG logs
[2024-11-29T12:15:43.416+0000] {hooks.py:208} INFO - Job raysubmit_paxAkyLiKxEHPmwG status: RUNNING
[2024-11-29T12:15:43.416+0000] {triggers.py:100} INFO - Status of job raysubmit_paxAkyLiKxEHPmwG is: RUNNING
[2024-11-29T12:15:43.417+0000] {triggers.py:112} INFO - 2024-11-29 04:15:40,813	INFO worker.py:1429 -- Using address 10.244.0.140:6379 set in the environment variable RAY_ADDRESS
[2024-11-29T12:15:43.417+0000] {triggers.py:112} INFO - 2024-11-29 04:15:40,814	INFO worker.py:1564 -- Connecting to existing Ray cluster at address: 10.244.0.140:6379...
[2024-11-29T12:15:43.417+0000] {triggers.py:112} INFO - 2024-11-29 04:15:40,820	INFO worker.py:1740 -- Connected to Ray cluster. View the dashboard at 10.244.0.140:8265
[2024-11-29T12:15:48.430+0000] {hooks.py:208} INFO - Job raysubmit_paxAkyLiKxEHPmwG status: SUCCEEDED
[2024-11-29T12:15:48.430+0000] {triggers.py:112} INFO - Mean of this population is 12.0
[2024-11-29T12:15:48.430+0000] {triggers.py:112} INFO - (autoscaler +5s) Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.
[2024-11-29T12:15:48.430+0000] {triggers.py:112} INFO - (autoscaler +5s) Adding 1 node(s) of type small-group.
[2024-11-29T12:15:49.448+0000] {triggers.py:113} INFO - ::endgroup::
[2024-11-29T12:15:49.448+0000] {triggers.py:144} INFO - ::endgroup::
[2024-11-29T12:15:49.448+0000] {triggers.py:145} INFO - ::group:: Trigger 2/2: Job reached a terminal state
[2024-11-29T12:15:49.448+0000] {triggers.py:146} INFO - Status of completed job raysubmit_paxAkyLiKxEHPmwG is: SUCCEEDED
(...)

Child 2 DAG, also triggered by the parent DAG:
[Screenshot 2024-11-29 at 12:17:20]

Kubernetes RayClusters spun up:
[Screenshot 2024-11-29 at 12:15:37]

Limitations

The example DAGs are not currently being executed in the CI, but there is a dedicated ticket for this work:
#95

References

This PR was inspired by #67.

Several other changes were made as part of this work, but they were split into separate PRs to simplify the review process.

@tatiana added this to the Astro Ray Provider 0.3.0 milestone Nov 29, 2024
@tatiana requested a review from a team as a code owner November 29, 2024 12:28
@codecov-commenter commented:

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 98.52%. Comparing base (5b15f3a) to head (64d1b6a).

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #103      +/-   ##
==========================================
+ Coverage   97.63%   98.52%   +0.88%     
==========================================
  Files           7        7              
  Lines         593      610      +17     
==========================================
+ Hits          579      601      +22     
+ Misses         14        9       -5     


@tatiana merged commit 56387cc into main Nov 29, 2024
22 checks passed
Review thread on the diff:

return re.sub(r"[^\w\-\.]", "-", value).lower()


def create_config_from_context(context, **kwargs):
@tatiana (Collaborator, Author) commented:

The main drawback of this approach is that we are creating the YAML file dynamically and not deleting it. Ideally, we'd only materialise the YAML file during the cluster setup/teardown, and delete it as part of the task execution.
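One possible shape for that cleanup, as a hypothetical sketch (not part of this PR): materialise the spec into a temporary file only for as long as its consumer needs it, and remove it afterwards.

```python
import contextlib
import os
import tempfile

import yaml


@contextlib.contextmanager
def ephemeral_cluster_yaml(cluster_spec: dict):
    # Write the generated RayCluster spec to a temporary file, hand the
    # path to whatever performs the cluster setup/teardown, and delete
    # the file once that work is done, so no generated YAML lingers.
    fd, path = tempfile.mkstemp(suffix=".yaml")
    try:
        with os.fdopen(fd, "w") as fp:
            yaml.safe_dump(cluster_spec, fp)
        yield path
    finally:
        os.remove(path)
```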

tatiana added a commit that referenced this pull request Nov 29, 2024
**Breaking changes**

* Removal of ``SubmitRayJob.terminal_states``. The same values are now available at ``ray_provider.constants.TERMINAL_JOB_STATUSES``.
* Simplify the project structure and debugging by @tatiana in #93

To improve the development and troubleshooting of DAGs created with this provider, we introduced breaking changes
to the folder structure. It was flattened, and the import paths to the existing decorators, hooks, operators and triggers
changed, as documented in the table below (a migration sketch follows the table):
  | Type      | Previous import path                        | Current import path                     |
  |-----------|---------------------------------------------|-----------------------------------------|
  | Decorator | ray_provider.decorators.ray.ray             | ray_provider.decorators.ray             |
  | Hook      | ray_provider.hooks.ray.RayHook              | ray_provider.hooks.RayHook              |
  | Operator  | ray_provider.operators.ray.DeleteRayCluster | ray_provider.operators.DeleteRayCluster |
  | Operator  | ray_provider.operators.ray.SetupRayCluster  | ray_provider.operators.SetupRayCluster  |
  | Operator  | ray_provider.operators.ray.SubmitRayJob     | ray_provider.operators.SubmitRayJob     |
  | Trigger   | ray_provider.triggers.ray.RayJobTrigger     | ray_provider.triggers.RayJobTrigger     |
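For existing DAGs the migration is mechanical. A sketch covering both breaking changes (assuming ``terminal_states`` was previously read as a class attribute):

```python
# Before (0.2.x)
from ray_provider.decorators.ray import ray
from ray_provider.operators.ray import SubmitRayJob

terminal_states = SubmitRayJob.terminal_states

# After (0.3.0)
from ray_provider.decorators import ray
from ray_provider.operators import SubmitRayJob
from ray_provider.constants import TERMINAL_JOB_STATUSES

terminal_states = TERMINAL_JOB_STATUSES
```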


**Features**

* Support using callable ``config`` in ``@ray.task`` by @tatiana in #103
* Support running Ray jobs indefinitely without timing out by @venkatajagannath and @tatiana in #74

**Bug fixes**

* Fix integration test and bug in load balancer wait logic by @pankajastro in #85
* Bugfix: Better exception handling and cluster clean up by @venkatajagannath in #68
* Stop catching generic ``Exception`` in operators by @tatiana in #100
* Stop catching generic ``Exception`` in trigger by @tatiana in #99

**Docs**

* Add docs to deploy project on Astro Cloud by @pankajastro in #90
* Fix dead reference in docs index page by @pankajastro in #87
* Cloud Auth documentation update by @venkatajagannath in #58
* Improve main docs page by @TJaniF in #71

**Others**

Local development

* Fix the local development environment and update documentation by @tatiana in #92
* Enable secret detection precommit check by @pankajastro in #91
* Add astro cli project + kind Raycluster setup instruction by @pankajastro in #83
* Remove pytest durations from tests by @tatiana in #102
* Fix running make docker-run when there is a new version by @tatiana in #99 and #101
* Improve Astro CLI DAGs test so running hatch test-cov locally doesn't fail by @tatiana in #97

CI

* CI improvement by @venkatajagannath in #73
* CI fix related to broken coverage upload artifact by @pankajkoti in #60
* Allow tests to run for PRs from forked repos by @venkatajagannath in #72
* Update CODEOWNERS by @tatiana in #84
* Add Airflow 2.10 (released in August 2024) to tests by @tatiana in #96
Development

Successfully merging this pull request may close these issues:

[Feature] Support dynamic configuration based on Airflow context