[DNM] Peer-to-peer shuffle design #5435 (Draft)

Wants to merge 14 commits into `main`.

Binary file added: `distributed/shuffle/graph.png`

New file: `distributed/shuffle/shuffle-design.md` (+217 lines)

# Peer-to-peer DataFrame shuffling

This is a proposal for the high-level design of an extension built into distributed for shuffling very large DataFrames reliably and performantly. It does so by transferring data between workers out-of-band (not managed by the scheduler) using stateful worker extensions. This significantly reduces the size of the graph and eliminates the scheduler as a bottleneck, compared to the current task-based shuffle.

This work builds off the proof-of-concept in https://github.com/dask/dask/pull/8223.

## Motivation

Shuffles are an integral part of most DataFrame workloads, whether as part of a `merge`, `set_index`, or `groupby().apply()`. Shuffling is a poor fit for centralized graph-based scheduling: the graph is all-to-all (O(N²) in size), yet the logic is simple enough that it gains little from centralized coordination while suffering significant overhead from it. With task-based shuffles, the amount of data we can shuffle effectively (before workers run out of memory, or the scheduler crashes, or both) is severely limited. By allowing workers to autonomously exchange data with their peers, and manage disk and memory usage in a more fine-grained way, that limit becomes significantly higher.

See https://coiled.io/blog/better-shuffling-in-dask-a-proof-of-concept/ for more background.

## Goals and non-goals

End goals:
* Can reliably shuffle orders-of-magnitude larger datasets (in total size and number of partitions) than the current task-based shuffle
* Maintainable, thoroughly tested code using (or adding) public APIs
* Can shuffle larger-than-memory datasets by spilling to disk
* Constant, predictable memory footprint per worker, which scales linearly with partition size, not total number of partitions
* Just works, without users needing to tune parameters (buffer sizes, etc.)
* Graceful restarting when possible, and quick failure when not
* All state is cleaned up on success, failure, or cancellation
* Shuffle performance is IO-bound (network, disk)

Non-goals:
* Utilize new workers that enter the cluster midway through a shuffle
* Resilience via data duplication (a shuffle can continue through losing some number of workers)
* Worker loss only requires partial re-transfer of data

## Plan

The implementation will be completed in multiple stages (order TBD after #1):
1. Establish the patterns for how this out-of-band system will interact with distributed, in the simplest possible implementation with no optimizations.
1. Retries (shuffle restarts automatically if a worker leaves)
1. Improve performance with concurrency if possible
1. Spill-to-disk
1. Backpressure
1. Performance

## Design

Here we'll discuss the highest-level architectural components of the shuffling system.

_Note: unlike the POC PR, we propose keeping this code in `dask/distributed`, not `dask/dask`. The implementation will be somewhat dependent on worker internals, so internal changes in `distributed` are far more likely to break things than changes in `dask`. We'd like tests to fail (and fixes to happen) in the same repo as the changes that break things. Plus, the code is very `distributed`-specific anyway._

### Task graph

The whole point of a peer-to-peer shuffle using a separate worker extension is to not have to capture the full operation in a dask graph.
Therefore, the primary purpose of the graph is to mediate between dask-managed data and out-of-band processing:
- Hand off dask-managed data to be processed by the extension
- Bring data produced out-of-band back into dask
- Clean up when keys depending on the shuffle are cancelled

The graph also has a secondary synchronization benefit, letting us bypass some difficult distributed problems (exactly-once initialization and cleanup tasks, determining when all peer-to-peer transfers are complete) by leaning on the scheduler.

![diagram of graph](graph.png)
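
To make that shape concrete, here is a minimal sketch of how such a graph could be assembled; all helper names here are illustrative stand-ins, not the actual implementation:

```python
# Hypothetical helper tasks (stubs for illustration only):
def shuffle_setup(shuffle_id):                  # peer discovery; returns worker addresses
    ...

def shuffle_transfer(partition, peers):         # hand one input partition to the extension
    ...

def shuffle_barrier(tokens):                    # depends on every transfer task
    ...

def shuffle_unpack(shuffle_id, i, barrier):     # pull output partition i back into dask
    ...


def build_shuffle_graph(input_keys, n_out, shuffle_id):
    """Sketch of the graph shape: transfers -> barrier -> unpacks."""
    dsk = {}
    setup_key = f"shuffle-setup-{shuffle_id}"
    dsk[setup_key] = (shuffle_setup, shuffle_id)

    transfer_keys = []
    for i, input_key in enumerate(input_keys):
        key = f"shuffle-transfer-{shuffle_id}-{i}"
        # Hands the dask-managed partition off to the extension (out-of-band).
        dsk[key] = (shuffle_transfer, input_key, setup_key)
        transfer_keys.append(key)

    barrier_key = f"shuffle-barrier-{shuffle_id}"
    dsk[barrier_key] = (shuffle_barrier, transfer_keys)  # all transfers finished

    for i in range(n_out):
        # Brings output partition i back into dask-managed memory.
        dsk[f"shuffle-unpack-{shuffle_id}-{i}"] = (shuffle_unpack, shuffle_id, i, barrier_key)
    return dsk
```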

### `ShuffleExtension`

Problems to solve:
* Holding per-worker out-of-band state for an in-progress shuffle
* Adding new handlers in an organized way for workers to transfer shuffle data
* Doing the above cleanly with multiple concurrent shuffles (`merge`)
* Coordinating multiple concurrent shuffles which may need to share limited resources (memory, threads, etc.)
* Getting metrics back to the scheduler/dashboard, like managed memory & bytes spilled to disk (eventually)

The `ShuffleExtension` will be built into distributed and added to workers automatically (like the `PubSubWorkerExtension`). It'll add a route to the worker; something like:

```python
def shuffle_receive(comm, shuffle_id: str, data: DataFrame) -> None:
    """
    Receive an incoming shard of data from a peer worker.

    Using an unknown ``shuffle_id`` will first initialize any state needed for that new shuffle.
    """
    ...
```

The `ShuffleExtension` will hold all per-shuffle state and buffers specific to that worker. For example (the details will vary depending on the stage of the implementation):
- a buffer of received shards
- a buffer of outgoing shards (waiting to accumulate enough to be worth sending to a worker)
- a datastore that transparently spills received shards to disk
- locks or synchronization mechanisms

The `ShuffleExtension` will also hold any global state shared by all shuffles, such as worker threads or coroutines.
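
Purely for illustration, the extension might be organized roughly like this; the attribute names, the `"shuffle_receive"` registration mechanism, and the lazy-initialization details are all assumptions:

```python
from collections import defaultdict


class ShuffleExtension:
    """Sketch only: per-shuffle state plus resources shared by all shuffles."""

    def __init__(self, worker):
        self.worker = worker
        # Register the new route on the worker (exact registration mechanism TBD).
        worker.handlers["shuffle_receive"] = self.shuffle_receive
        # Per-shuffle state: received/outgoing shard buffers, spill stores, locks.
        self.shuffles: dict[str, dict] = defaultdict(dict)

    async def shuffle_receive(self, comm, shuffle_id: str, data) -> None:
        # Seeing an unknown shuffle_id lazily initializes state for that shuffle.
        state = self.shuffles[shuffle_id]
        state.setdefault("received", []).append(data)
```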

Most of the implementation-specific logic will happen in the `ShuffleExtension`. As we improve performance, add concurrency, etc., this code will change the most (though the interface will likely not).

The `transfer` tasks will pass their input partitions into the `ShuffleExtension`, blocking the task until the extension is ready to receive another input. Internally, the extension will do whatever it needs to do to transfer the data, using worker comms to call the `shuffle_receive` RPC on peer workers. Simultaneously, it'll handle any incoming data from other workers.
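
A `transfer` task might then look roughly like this sketch; `add_partition` and the `"shuffle"` extension key are hypothetical names for the hand-off described above:

```python
from distributed import get_worker


def shuffle_transfer(df, shuffle_id, worker_addresses):
    # Runs inside a worker thread. Blocks until the extension has capacity
    # for another input partition; the extension splits `df` into shards and
    # sends them to peers via the `shuffle_receive` RPC.
    ext = get_worker().extensions["shuffle"]
    ext.add_partition(df, shuffle_id, worker_addresses)  # hypothetical method
    return shuffle_id  # the barrier task just needs *some* token to depend on
```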

### Retries and cancellation

Problems to solve:
* Causing all tasks in the shuffle to rerun when a worker leaves
* Cleaning up out-of-band state when a user cancels a shuffle, or it errors

Because most of the tasks in the shuffle graph are impure and run for their side effects, restarting an in-progress shuffle requires rerunning _every_ task involved, even ones that appear to have successfully transitioned to `memory` and whose "results" are stored on not-yet-dead workers.

Additionally, cleanly stopping a running shuffle takes more than just releasing the shuffle tasks from memory: since there's out-of-band processing going on, the `ShuffleExtension` has to be informed in some way that it needs to stop doing whatever it's doing in the background, and clear out its buffers. Also, executing tasks may be blocking on the `ShuffleExtension` doing something; without a way to tell the extension to shut down, those tasks might block forever, deadlocking the cluster.

> **Review comment (Member):** It depends what "releasing" means. Currently, releasing a task on the worker means:
>
> * Ensure the task is not executed and no coroutine is running which is trying to fetch that task's result/key/data.
> * Reset any `TaskState` attributes to a neutral value.
> * Remove the `TaskState.key` from `Worker.data` if it is in there.
>
> Whatever the extension does, it might be coupled to any of these mechanisms, and a "release task" action would be sufficient for cleanup.

> **Review comment (Member):** There could be a mechanism to release the input buffer immediately, and we could get rid of data in memory/disk, but the output buffer is actually very difficult.

> **Reply (Collaborator, author):** Regarding "Remove the `TaskState.key` from `Worker.data` if it is in there": in the POC we used this as the signal to clean up state (the `ShuffleService` object was a dependency of every task, got copied into every `Worker.data`, and had a `__del__` method to clean things up when it was removed). But this is unreliable in pathological shuffle cases, for example when some workers either don't send any data or don't receive any data in the shuffle. Also, there are weird questions around what happens when that object itself is spilled to disk.
>
> Regarding "a 'release task' action would be sufficient for cleanup": I agree that this would be nice, but I don't really like the idea of hooking into `gather_dep` / `TaskState` attribute changes to detect when this has happened. It feels brittle and requires really careful reasoning about what signals the scheduler is going to send in which cases. Having an explicit mechanism built into `RerunGroup` seems both easy and reliable. In the spirit of what I said in the blog post ("It may be worth it to adjust the system so that bypassing task graphs isn't a hack, but a supported (and tested and maintained) feature." (https://coiled.io/blog/better-shuffling-in-dask-a-proof-of-concept/)), I just prefer the idea of making a clear path for how to do this in the API that doesn't require deep understanding of the current implementation of the worker and scheduler.

Therefore, we propose adding a `RerunGroup` (`ImpureGroup`? `CoExecutionGroup`? `RestartGroup`? `OutOfBandGroup`? name TBD) structure to the scheduler which intertwines the fates of all tasks within it: if any one task is to be rescheduled (due to its worker leaving), all tasks are restarted; if any one is to be prematurely released (due to cancellation), all are released.

Membership in a `RerunGroup` is implemented via task annotations, where each task gives the name of the `RerunGroup` it belongs to. A task can belong to at most one `RerunGroup`. TBD if we will enforce any structural restrictions on `RerunGroup`s to prevent odd/invalid states from emerging—we probably should, such as not allowing disjoint parts of the graph to be in the same `RerunGroup`, etc.
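
For illustration, membership could be expressed through the existing `dask.annotate` context manager; the `rerun_group` annotation key is hypothetical:

```python
import dask
import dask.dataframe as dd
import pandas as pd

df = dd.from_pandas(pd.DataFrame({"id": range(100), "x": 1.0}), npartitions=10)

# All tasks created inside this block carry the same (hypothetical) annotation
# and would therefore share the fate of the whole shuffle on restart/cancellation.
with dask.annotate(rerun_group="shuffle-abc123"):
    shuffled = df.shuffle("id")
```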

> **Review comment (Member):** I'm inclined to say this is out of scope and the responsibility of the user.

> **Review comment (Member):** How are annotations impacted by graph optimizations?

> **Review comment (Member):** Question: what happens if you fuse an annotated blockwise with an unannotated blockwise? The intended behaviour is currently unclear. The rerunning might actually be a special case, since this specific case is "inheritable", i.e. fused tasks should both be rerun. The current default behaviour according to Jim is that annotated tasks are not fused, but we might be able to special-case this.

> **Reply (Collaborator, author):** Regarding "out of scope and the responsibility of the user": I think this could create all sorts of headaches for us down the line in the scheduler, where we have to bend ourselves into knots to handle weird cases that are technically possible but nobody should actually do. A more restrictive API up front means less work for us!

> **Reply (Collaborator, author):** Regarding fusing an annotated blockwise with an unannotated one: there's an open issue around annotations getting lost during optimization (dask/dask#7036), but I don't think it fully recognizes the problem (we don't have a clear policy on annotation propagation during optimization). Fixing this would also be necessary work here; I've mentioned it in the doc.


Additionally, the scheduler informs workers whenever a `RerunGroup` is restarted or cancelled. Workers will have a way to pass this information on to any interested out-of-band operations. This could be something like:
- Workers have a named `threading.Event` for each `RerunGroup` that any of their current tasks belong to. When the scheduler tells workers about a restart/cancellation, they `set()` the corresponding event so that some background thread can respond accordingly.
- A `register_cancellation_handler(rerun_group: str, async_callback: Callable)` method on workers that registers an async function to be run when that group is cancelled/restarted (sketched below). A potential upside (and potential deadlock) is that the scheduler's `cancel_rerun_group` RPC to workers could block on this callback completing, meaning the scheduler wouldn't treat the `RerunGroup` as successfully cancelled until every callback on every worker succeeded. That could give us some nice synchronization guarantees (which we may or may not actually need) ensuring a shuffle doesn't try to restart while it's also trying to shut down.
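
A sketch of the second option, using the proposed `register_cancellation_handler` API and a hypothetical `teardown` method on the extension:

```python
from distributed import get_worker


async def cleanup_shuffle():
    # Stop background sends, drop buffers, and delete any spilled files
    # for this shuffle (`teardown` is a hypothetical extension method).
    ext = get_worker().extensions["shuffle"]
    await ext.teardown("abc123")


# Registered when the shuffle starts on this worker; the callback is awaited
# when the RerunGroup "shuffle-abc123" is cancelled or restarted.
get_worker().register_cancellation_handler("shuffle-abc123", cleanup_shuffle)
```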

### Peer discovery and initialization

Problems to solve:
* Workers need to all have the same list of peers participating in the shuffle (otherwise data could end up in two different places!)
* Scheduler needs to be told where to run the `unpack` tasks which bring data back in-band

We'll run a single `shuffle-setup` task before all the transfers to do some initialization.

First, it will ask the scheduler for the addresses of all workers that should participate in the shuffle (taking into account worker or resource restrictions). How this will be implemented is TBD.

Next, it will set worker restrictions on the `unpack` tasks, so each task will run on the worker that will receive that output partition. (This is computed just by binning the number of output partitions into the number of workers.) Note that we could also do this step in the barrier task; it just seems nicer, and potentially a tiny bit less overhead, to do all scheduler comms in one place.

It'll return the list of worker addresses. This will be input to all the `transfer` tasks, which use the same binning logic to decide where to send a given row of each input partition.
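
The binning could be as simple as the following sketch (the exact scheme is an implementation detail):

```python
def worker_for(output_partition: int, workers: list[str], n_out: int) -> str:
    """Map an output partition index to the worker that will hold it."""
    # Contiguous binning: output partitions are split evenly across the
    # (sorted) list of participating worker addresses.
    return workers[output_partition * len(workers) // n_out]
```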

Since this `shuffle-setup` task will be part of the `RerunGroup`, every time the shuffle is restarted, we'll recompute these peer addresses (accounting for any lost or gained workers) and reset any now-invalid worker restrictions on the `unpack` tasks (preventing deadlocks from waiting to schedule a task on a worker that doesn't exist).

Also possibly (TBD) `shuffle-setup` could call an RPC on all the other workers informing them that a shuffle is about to happen. This is most likely unnecessary, but in case there are some resources that need to be initialized before any data moves around, this would give us an easy way to do it.

### Spilling to disk

1. Communication is initiated by the data producer, i.e. every participating Worker will receive incoming connections trying to submit data.
2. Every participating Worker will receive incoming requests to accept data.
3. Receiving and submitting data must happen concurrently.
4. Submitting data requires GIL-holding compute (the groupby/split) in the `transfer` task.
5. Received and submitted data `shards` are expected to be much smaller than the final output partitions. The system must be capable of handling small reads/writes.
6. There may be multiple shuffles happening and we need to be able to distinguish data accordingly when storing it.
7. Write to disk must happen without blocking the main event loop.
8. There is no clear correlation between number of incoming data connections and number of workers or number of input partitions. [1]
9. All incoming data combined will build M output partitions. M may be unequal to N.
10. Any kind of regrouping, if necessary, must not block the main event loop.
11. Every worker must be able to handle more incoming data than it has memory. Therefore, some spill-to-disk capabilities must be supported.
12. Spilling may not be possible without bounds, see https://github.com/dask/distributed/issues/5364. It should be possible to buffer data in memory.
13. Assuming there is sufficient disk space available and individual shards are below `X` times the worker's memory limit [2], a worker must never run out of memory.
14. It should be possible to notify the worker instrumentation about (un)managed memory and data spilled to disk, so these metrics stay up to date.
15. If a shuffle `Y` is aborted or retried, all data belonging to `Y` needs to be deleted from the worker, regardless of where it is stored (memory, disk, ...).
16. The implementation is sufficiently well encapsulated to be unit-testable.

[1] The data producer is encouraged to implement data buffering to avoid comm overhead.
[2] `0 < X < 1`, accounting for data copies, buffer sizes, serialization, and interpreter overhead.


#### Data submission protocol

Relevant requirements: 4, 6, 9, 10.

The data producer will perform a groupby on [M, W], where M is the number of output partitions and W is the number of participating workers.

If the producer already splits by output partition, we will not need to regroup on the receiving side. The concat could happen in the `unpack` task, relieving us of any GIL/thread problems.

```python
# Does not need to be a *dict*, but we'll need some kind of mapping, either by encoding or by data structure.
{
    "W1": {
        # Every element is a pandas dataframe / arrow table
        "M1": ["W1-M1-N1", "W1-M1-N2", "W1-M1-N3"],
        "M2": ["W1-M2-N1", "W1-M2-N2", "W1-M2-N3"],
    },
    "W2": {
        "M1": ["W2-M1-N1", "W2-M1-N2", "W2-M1-N3"],
        "M2": ["W2-M2-N1", "W2-M2-N2", "W2-M2-N3"],
    },
    ...
}
```
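
For illustration, a producer-side split that could produce a mapping of this shape might look like the following sketch; the hash-based assignment of rows to output partitions and the helper name are assumptions:

```python
import pandas as pd


def split_partition(df: pd.DataFrame, column: str, n_out: int, workers: list[str]):
    """Split one input partition by destination worker W, then output partition M."""
    out: dict[str, dict[int, pd.DataFrame]] = {}
    # Assign each row to an output partition by hashing the shuffle column.
    part = pd.util.hash_pandas_object(df[column], index=False) % n_out
    for m, shard in df.groupby(part):
        w = workers[int(m) * len(workers) // n_out]  # same binning as unpack placement
        out.setdefault(w, {})[int(m)] = shard
    return out  # the extension accumulates these per (W, M) across input partitions
```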

On the receiving end:

```python
# This is disk stuff!
from threading import Thread


class ShardCollection:
    """Buffers received shards in memory and spills them to disk when the buffer fills up."""

    def __init__(self):
        self._buffer = []  # or a memoryview, whatever
        self._thread = Thread()  # likely need a thread for async disk I/O

    async def append(self, shard):
        self._buffer.append(shard)
        if self._buffer_full():  # placeholder: some size/byte threshold
            await self._dump_buffer_to_disk()  # placeholder: write out and clear the buffer

    async def put(self):
        pass

    async def get(self):
        await self._load_data_to_buffer()  # placeholder: read spilled shards back into memory
        return self._buffer

    def __del__(self):
        self._delete_disk()  # placeholder: remove any spilled files
        self._buffer = None


# Illustration: the collection would live in the worker's data under some key.
Worker.data[key] = ShardCollection()
```

#### Asynchronous operations

Required due to requirements 1 and 2: every worker must accept incoming data from peers while simultaneously submitting its own outgoing shards, so these operations must run concurrently without blocking the worker's event loop.

#### Backpressure

The prototype implementation in #XXX does not introduce backpressure to deal with requirement 11.
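
Purely as an illustration of one possible direction (not part of the prototype), bounded buffers would let the receiving side slow down senders, and thus the `transfer` tasks feeding them:

```python
import asyncio


class BoundedShardBuffer:
    """Illustrative only: `put` awaits when full, propagating backpressure upstream."""

    def __init__(self, max_shards: int = 1024):
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_shards)

    async def put(self, shard) -> None:
        await self._queue.put(shard)  # blocks (asynchronously) while the buffer is full

    async def get(self):
        return await self._queue.get()
```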

## Appendix

### Data-transfer diagrams

Some pretty pictures of the principle behind the data transfer:

![diagram of transfers](transfer.png)