Fuse pipelines with different numbers of tasks #284
Comments
I think this makes sense - at a high level it's like what rechunker does with … I think there are a couple of tricky parts:

I wonder if the …

A couple of other things I noticed:

Strictly speaking we should check that …

They are used as inputs, but do not have any chunks written to storage. In the case of the …
Oh yeah of course it is! In that case they were trying to minimise the number of tasks, whereas in cubed we would be trying either to minimise the number of tasks or to set the number of tasks to whatever value allows for more fusion.
Maybe? I think it would be instructive to focus on the reduction case initially, because it would directly affect performance in many real workloads, and expose this "fan-in" question immediately. As an example:

```python
import cubed
import cubed.array_api as xp

spec = cubed.Spec(allowed_mem='80kB')
a = cubed.random.random((10000, 100), chunks=(10, 100), spec=spec)
avg = xp.max(a)
avg.visualize(optimize_graph=True)
print_num_tasks_per_pipeline(avg.plan, optimize_graph=True)  # helper presumably defined elsewhere in the thread
```
So you're saying there is a trade-off between opening many chunks in one task (which won't be parallel) and reading one chunk per task (which is embarrassingly parallel but requires waiting for disk IO). Does that mean the optimal "fan-in" is basically just "the number of chunks one task would have to attempt to read before reading them took longer than it takes to do an extra round of read/write IO to disk for one chunk"?
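A back-of-the-envelope version of that break-even point (a hypothetical sketch; the cost constants are made-up placeholders, not measured Cubed numbers):

```python
# Illustrative cost model for the fan-in break-even point.
# Reading fan_in chunks serially inside one task costs fan_in * t_read.
# The alternative is an extra reduction round, which costs roughly one
# write plus one read of an intermediate chunk on top of the original read.
t_read = 0.1   # assumed seconds to read one chunk from storage
t_write = 0.2  # assumed seconds to write one chunk to storage

# Break-even where fan_in * t_read == t_read + (t_write + t_read):
max_useful_fan_in = 1 + (t_write + t_read) / t_read
print(max_useful_fan_in)  # 4.0 with these made-up constants
```

Beyond that point, the serial reads inside one task cost more than simply writing an intermediate result and doing another round.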
That would be nice. It would make the DAG more intuitive I think.
Yes, I think so. At the moment the number of rounds is determined solely by the memory available, but what we're saying here is that there should also be some consideration of the number of chunks read (the fan-in).
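To make that concrete, here is a sketch of how fan-in and memory would jointly determine the number of rounds in a tree reduction (`allowed_mem`, `chunk_nbytes`, and the IO-based cap are all assumed inputs, not Cubed internals):

```python
import math

def reduction_rounds(n_chunks: int, fan_in: int) -> int:
    """Rounds needed to tree-reduce n_chunks when each task combines fan_in chunks."""
    return math.ceil(math.log(n_chunks, fan_in))

allowed_mem = 80_000   # bytes (assumed budget)
chunk_nbytes = 8_000   # bytes per chunk (assumed)

# Today: the fan-in is effectively capped only by memory.
mem_capped_fan_in = allowed_mem // chunk_nbytes   # 10

# Suggestion: also respect an IO-aware cap (e.g. the break-even estimate above).
io_capped_fan_in = 4
fan_in = min(mem_capped_fan_in, io_capped_fan_in)  # 4
print(reduction_rounds(1000, fan_in))              # 5 rounds
```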
Last week @tomwhite and @dcherian and I discussed possible future optimizations for Cubed - this is my attempt to elucidate what I was suggesting.
Motivation
The best-case scenario for a cubed computation is that all sequential operations get fused, because then no writing to intermediate stores is required. With no writing, every chunk moves through the whole calculation in parallel, despite multiple operations happening to it along the way. In general we can't guarantee zero intermediate stores, because we also want to guarantee predictable memory usage during a full shuffle, but we might nevertheless aspire to fuse everything else 😁
Idea
Currently Cubed's optimization pass fuses some blockwise operations together, but it can only fuse blockwise operations that have the same number of tasks. If we could find a way to fuse blockwise operations with different numbers of tasks then potentially anything up to a full shuffle (see #282) could be fused.
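To see why equal task counts make fusion straightforward, here is a minimal sketch (not Cubed's actual implementation; `f` and `g` stand for the per-chunk functions of two consecutive blockwise operations):

```python
def fuse(f, g):
    """Compose two per-chunk functions into a single task, skipping the
    intermediate store. This works directly only when both operations run
    over the same chunk grid, i.e. they have the same number of tasks and
    task i of g needs exactly output chunk i of f."""
    def fused(chunk):
        return g(f(chunk))
    return fused

double = lambda x: x * 2
add_one = lambda x: x + 1
task = fuse(double, add_one)
print(task(10))  # 21
```

When the task counts differ (e.g. across a reduction step), task i of the second operation needs several output chunks of the first, so simple composition no longer applies.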
Use cases
It's possible to construct cubed plans in which blockwise operations with different numbers of tasks occur sequentially. This can happen with `concat`:

(I don't really understand what all the side inputs are in these graphs - I hope they don't invalidate what I'm suggesting!)
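For instance, something like the following should build consecutive blockwise pipelines with different task counts around `concat` (a sketch; the exact plan Cubed produces may differ):

```python
import cubed
import cubed.array_api as xp

spec = cubed.Spec(allowed_mem='1GB')  # generous budget, just for the sketch
# a has 10 chunks and b has 5, so the two blockwise pipelines
# feeding the concat have different numbers of tasks.
a = xp.ones((1000, 100), chunks=(100, 100), spec=spec)
b = xp.ones((500, 100), chunks=(100, 100), spec=spec)
c = xp.concat([a + 1, b + 1], axis=0)
c.visualize()
```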
Or `matmul`:

Implementation ideas
By definition 1 task == processing one Cubed chunk, but Cubed also currently assumes that 1 Zarr chunk == 1 Cubed chunk. This is generally what sets the number of tasks in a stage, and hence which pipelines can be fused. To fuse other pipelines we have to generalize this relationship. We can't open multiple Cubed chunks per Zarr chunk because reading/writing to different parts of the same Zarr chunk would sacrifice idempotence.
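A toy illustration of the idempotence problem, using plain zarr rather than Cubed internals (a sketch; in-memory store, single process):

```python
import zarr

# A single Zarr chunk (chunks=(10,)) that two hypothetical tasks write into.
z = zarr.zeros(shape=(10,), chunks=(10,), dtype="i4")

# Each task owns half of the chunk. Writing a partial chunk is a
# read-modify-write of the whole stored chunk, so if the two tasks run
# concurrently, or one is retried after the other has written, an update
# can be silently lost. Writing only whole chunks avoids this: re-running
# a task rewrites exactly the same bytes, which is idempotent.
z[0:5] = 1   # "task A"
z[5:10] = 2  # "task B"
```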
However we could imagine opening multiple Zarr chunks for one Cubed chunk (as long as the total size of the Zarr chunks opened for one Cubed chunk is < `allowed_mem`). This would make the number of tasks for a pipeline choosable (within some range), and we could choose how many Zarr chunks to open such that the number of tasks matches between two consecutive pipelines.

Another way to think about this: if during your computation you have smaller chunks than your `allowed_mem` budget was set for, then since you still only load one chunk per container, you are potentially "wasting" all the extra RAM you requested. Opening more chunks per container uses that extra RAM in some cases, and if you can fit all the extra chunks you need to get from one pipeline to another, you could then just fuse those two pipelines.
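A sketch of what "choosing the number of tasks" could look like (hypothetical helper, not Cubed API; `chunk_nbytes` and `allowed_mem` are assumed inputs):

```python
import math

def choose_fan_in(n_source_chunks: int, n_target_tasks: int,
                  chunk_nbytes: int, allowed_mem: int) -> int | None:
    """How many Zarr chunks each Cubed chunk should open so that the upstream
    pipeline runs with n_target_tasks tasks, if the memory budget allows."""
    fan_in = math.ceil(n_source_chunks / n_target_tasks)
    if fan_in * chunk_nbytes >= allowed_mem:
        return None  # can't match the downstream task count within allowed_mem
    return fan_in

# 1000 source chunks, downstream pipeline has 250 tasks: open 4 chunks per
# task (4 * 8kB < 80kB), and the two pipelines now have matching task
# counts, so they could be fused.
print(choose_fan_in(1000, 250, chunk_nbytes=8_000, allowed_mem=80_000))  # 4
```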
Questions

… `allowed_mem >> chunksize`. But if "batches" can be submitted then it might work?