[WIP][POC] Add ResourceBarrier expression to change resources within an expression graph #1116
base: main
Conversation
Shouldn't this be the other way round?
This is actually the way I tried to implement it at first. However, I realized that we would need to add an additional mechanism to set the resources for "root" IO expressions. We cannot rely on global annotations, because we may not want all dependencies to use the same resources. To put it another way: The current implementation pushes down resources until there are no more dependencies, or another resource barrier is reached. If we want resources to be inherited in the other direction (which seems very reasonable to me), we will need to figure out how to deal with root IO tasks. Possible solution: We add an optional [...] Any other ideas?
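(A minimal sketch of the push-down described above, using toy stand-in classes rather than the actual dask-expr implementation.)

```python
class Expr:
    def __init__(self, *dependencies):
        self.dependencies = list(dependencies)
        self.resources = None  # filled in by the push-down pass

class ResourceBarrier(Expr):
    def __init__(self, child, resources):
        super().__init__(child)
        self.barrier_resources = resources

def push_down_resources(expr, resources=None):
    """Walk from the graph output toward its dependencies, attaching the
    most recent barrier's resources until another barrier overrides them."""
    if isinstance(expr, ResourceBarrier):
        # A barrier (re)defines the resources for everything "before" it
        resources = expr.barrier_resources
    elif resources is not None:
        expr.resources = resources
    for dep in expr.dependencies:
        push_down_resources(dep, resources)

# Example: io <- barrier(GPU) <- final; the io expression inherits {"GPU": 1}
io = Expr()
final = Expr(ResourceBarrier(io, {"GPU": 1}))
push_down_resources(final)
assert io.resources == {"GPU": 1}
```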
I have to admit that I don't fully understand the problem you are describing. What is a "root IO expression"?
I find the behaviour that a resource barrier defines the resource up until that point (i.e. for the past) very unintuitive. One potential solution is that an IO root can consume a resource barrier if it is stacked directly on top of it, but I am not sure if I like this any better tbh.
Sorry, I'm sort of making up language as I go to fill in the gaps... Calling [...]
I completely agree with this. I do like the idea of using an expression to define the resource barrier, because this makes optimization behavior much easier to define. Perhaps we can make this work if we can come up with an intuitive (and maintainable) way to define the resources for an IO expression. I think we are in agreement about the direction of resource inheritance. I'll keep thinking about the root-IO problem.
Update: I revised the direction of resource inheritance to be more intuitive. I also decided that it would be relatively easy to add a simple [...]
I'm not entirely sold on the "resources as expr" approach.
Another question I have is whether the Expression or the graph itself is even the best (or even just a good) place to define behavior like this. In the end you want...

```python
# The OR clauses are just different flavors of where and how this could be defined
if task.is_io_task() or isinstance(expr, IO) or isinstance(group, IOTask) or (stuff := collection.get_io_things()):
    dear_scheduler_please_try_to_run_this_on_CPUs(task, expr, group, stuff)
elif task.uses_gpu():  # ...
    this_actually_requires_a_GPU(task, expr, group, collection)
# ...
```

We're used to thinking about this problem on a per-task or per-Layer basis and implemented the earlier version as part of the HLG infrastructure. We're now trying to move this to the expressions, but I wonder if this isn't the wrong approach. The CPU vs GPU question especially doesn't feel like a "graph or expression problem" at all, but rather a "what function are we actually running" problem, so dask/dask#9969 or something similar might be a better choice. I'm not saying that we should use task/runspecs to encode this knowledge, but I would like us to at least consider options that don't necessarily involve expressions and the optimizer
One thing I've not seen mentioned here is repartitioning. A common use case we are keen to improve is reading data from S3 and performing heavy compute on it. Cloud object stores are typically most performant when you chunk the data into partitions of the order of 100MB. GPU compute is most performant when operating on larger partitions that can utilize GPU memory. So to get the best throughput we want to have the CPU reading lots of small partitions from S3 and then concatenating them into larger partitions on the GPU ready for compute. I wonder if the repartition step is a common place where resource barriers could be added?
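(A sketch of this pattern using the API proposed in this PR; the barrier placement, paths, and sizes are illustrative.)

```python
import dask.dataframe as dd

# Read many ~100MB partitions in parallel on CPU workers (good S3 throughput);
# the `resources` argument is the one proposed in this PR
df = dd.read_parquet("s3://bucket/data/", resources={"CPU": 1})

# Cross the barrier first, so the repartition/concatenation itself runs on
# GPU workers, producing ~1GB partitions ready for heavy compute
df = df.resource_barrier({"GPU": 1}).repartition(partition_size="1GB")
```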
FWIW dask-expr supports concatenating multiple partitions directly in the read-parquet step; this gets rid of a lot of network transfer that would be necessary if you use repartition, and it also hides the complexity from the scheduler. You can configure this with [...]
Thanks @phofl. If my parquet is stored as 100MB partitions and I want to group them into 1GB partitions on the GPU, does each native partition get read in parallel, and then concatenated in the next step? Or are they read sequentially?
They are read sequentially in a single task. So using this configuration doesn't make sense if you have nr-gpu-partitions < threads-on-cluster. If nr-of-gpu-partitions > threads-on-cluster (by quite a bit), then the option will be a lot better than repartitioning.
So this is something we want to avoid, because it will not saturate S3 particularly well when using large GPU partitions. We want all the CPU cores to read the native partition sizes in parallel, then concatenate them onto the GPU from main memory.
Thanks for the feedback here!

@fjetter - You are skeptical that we should use an expression (or graph) to define resource requirements. I like your idea of defining this kind of relationship at the function/task level instead. I also agree that a general resource specification describes more so "how" things should be computed than "what" should be computed. With that said, CPU<->GPU movement is typically initiated with a [...] Given this perspective, maybe it is only the backend movement that should be defined at the expression level (similar to #1115)?

It is not currently possible to [...]
@jacobtomlinson - The optimization @phofl is describing improves overall ELT performance for TPC-H using the "pandas" backend, because they still have many threads reading from S3 at once (even though each thread is also iterating over multiple files to avoid network transfer after the fact). For dask-cudf, we only have one thread per GPU, and so we get lower S3 throughput. We are currently looking into a multi-threaded C++-level solution for this in RAPIDS, but using distinct resources in Dask also seems like a very practical solution to me.
dask#10937

Proposal
We should make it possible to define explicit resource barriers within a query.
Version 2 (Current)
For example:
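(A minimal sketch of the intended usage; the `resources` argument and `resource_barrier` method are the ones described below, while the path and resource values here are illustrative.)

```python
import dask.dataframe as dd

# IO tasks carry an explicit resource constraint (proposed `resources` argument)
df = dd.read_parquet("s3://bucket/data/", resources={"CPU": 1})

# All expressions after this point require a GPU worker (proposed API)
df = df.resource_barrier({"GPU": 1})

result = df.groupby("key").mean().compute()
```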
Here, we add a `resources` argument to IO operations (like `read_parquet`), and add a distinct `resource_barrier` API to apply resource-constraint changes after a collection has already been created.

Under the hood, we use a `ResourceBarrier` expression to ensure that the resource constraints won't get lost or broken during optimization. Also, we assume:

- `ResourceBarrier` expressions should not be fused with adjacent expressions

If this general approach is palatable, the `to_backend` API can be updated to cover the common case that the backend and resource constraints are often changed at the same time. E.g.:
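(A sketch; the combined signature below is hypothetical.)

```python
# Hypothetical combined call: switch the collection to the cudf backend and
# constrain all downstream tasks to GPU workers at the same time
df = df.to_backend("cudf", resources={"GPU": 1})
```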
Version 1 (August 1, 2024)
For example:
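(A sketch of the version-1 usage; names, paths, and values are illustrative.)

```python
import dask.dataframe as dd

df = dd.read_parquet("s3://bucket/data/")

# Version-1 semantics: the barrier applies to everything *before* this point,
# so the read above inherits the CPU constraint
df = df.resource_barrier({"CPU": 1})

result = df.sum().compute()
```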
In this case, calling `resource_barrier` will add an explicit `ResourceBarrier` expression to the expression graph. Then, when `persist`/`compute` is called, the corresponding resource requirement will be attached to all expressions before that point (stopping only at other `ResourceBarrier` expressions). This makes it possible to add arbitrary resource barriers within a query without adding any special resource/annotation code to any `Expr` classes.

The underlying `ResourceBarrier` expression also allows column-projection and filter operations to pass through without preventing the final resource annotations from being "optimized away."
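(A minimal sketch of what "passing through" could look like during optimization, using hypothetical stand-in classes rather than this PR's actual code.)

```python
from dataclasses import dataclass

# Toy stand-ins for expression classes (hypothetical, for illustration only)
@dataclass
class Expr:
    pass

@dataclass
class IO(Expr):
    path: str

@dataclass
class Projection(Expr):
    frame: Expr
    columns: list

@dataclass
class ResourceBarrier(Expr):
    frame: Expr
    resources: dict

def push_projection_through_barrier(expr):
    # Projection(ResourceBarrier(x)) -> ResourceBarrier(Projection(x)):
    # the projection keeps moving toward the IO layer, while the barrier
    # (and its resource annotation) stays in place above it.
    if isinstance(expr, Projection) and isinstance(expr.frame, ResourceBarrier):
        barrier = expr.frame
        return ResourceBarrier(Projection(barrier.frame, expr.columns), barrier.resources)
    return expr
```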