From e3170aef09428a3ca92c2b19baceee4fff03aac6 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 19 Nov 2021 15:02:13 -0700 Subject: [PATCH] Add responses to comments from 9b9a68b --- distributed/shuffle/shuffle_worker.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/distributed/shuffle/shuffle_worker.py b/distributed/shuffle/shuffle_worker.py index ab35c4b7f5..9ded0a294e 100644 --- a/distributed/shuffle/shuffle_worker.py +++ b/distributed/shuffle/shuffle_worker.py @@ -112,8 +112,7 @@ async def shuffle_inputs_done(self, comm: object, id: ShuffleId) -> None: state.barrier_reached = True if not state.out_parts_left: - # No output partitions, remove shuffle it now: - # `get_output_partition` will never be called. + # No output partitions, remove shuffle now: `get_output_partition` will never be called. # This happens when there are fewer output partitions than workers. self.remove(id) @@ -255,9 +254,11 @@ async def send_partition( ) -> None: "Split up an input partition and send its parts to peers." tasks = [] - # TODO grouping is blocking, should it be offloaded to a thread? - # It mostly doesn't release the GIL though, so may not make much difference. + # NOTE: `groupby` blocks the event loop, but it also holds the GIL, + # so we don't bother offloading to a thread. See bpo-7946. for output_partition, data in data.groupby(column): + # NOTE: `column` must refer to an integer column, which is the output partition number for the row. + # This is always `_partitions`, added by `dask/dataframe/shuffle.py::shuffle`. addr = worker_for(int(output_partition), npartitions, workers) task = asyncio.create_task( self.worker.rpc(addr).shuffle_receive( @@ -268,7 +269,9 @@ async def send_partition( ) tasks.append(task) - # TODO handle errors and cancellation here + # TODO Once RerunGroup logic exists (https://github.com/dask/distributed/issues/5403), + # handle errors and cancellation here in a way that lets other workers cancel & clean up their shuffles. + # Without it, letting errors kill the task is all we can do. await asyncio.gather(*tasks) @property