Commit 9b9a68b: Respond to comments

gjoseph92 committed Nov 19, 2021
1 parent 0e403ec commit 9b9a68b
Showing 1 changed file with 7 additions and 3 deletions.
distributed/shuffle/shuffle_extension.py (10 changes: 7 additions & 3 deletions)

@@ -82,9 +82,11 @@ def receive(self, output_partition: int, data: pd.DataFrame) -> None:
     async def add_partition(self, data: pd.DataFrame) -> None:
         assert not self.transferred, "`add_partition` called after barrier task"
         tasks = []
-        # TODO grouping is blocking, should it be offloaded to a thread?
-        # It mostly doesn't release the GIL though, so may not make much difference.
+        # NOTE: `groupby` blocks the event loop, but it also holds the GIL,
+        # so we don't bother offloading to a thread. See bpo-7946.
         for output_partition, data in data.groupby(self.metadata.column):
+            # NOTE: `column` must refer to an integer column, which is the output partition number for the row.
+            # This is always `_partitions`, added by `dask/dataframe/shuffle.py::shuffle`.
             addr = self.metadata.worker_for(int(output_partition))
             task = asyncio.create_task(
                 self.worker.rpc(addr).shuffle_receive(
@@ -95,7 +95,9 @@ async def add_partition(self, data: pd.DataFrame) -> None:
             )
             tasks.append(task)

-        # TODO handle errors and cancellation here
+        # TODO Once RerunGroup logic exists (https://github.com/dask/distributed/issues/5403),
+        # handle errors and cancellation here in a way that lets other workers cancel & clean up their shuffles.
+        # Without it, letting errors kill the task is all we can do.
         await asyncio.gather(*tasks)

     def get_output_partition(self, i: int) -> pd.DataFrame:
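
A note on the first replaced TODO: the new comment's claim is that pandas `groupby` holds the GIL for most of its work, so moving it off the event loop would not buy much (bpo-7946 is the GIL "convoy effect" that makes offloading GIL-bound work to a thread unhelpful or worse). For illustration only, a minimal sketch of the offload the comment decides against, assuming Python 3.9+ for `asyncio.to_thread`; the frame and column name are invented for the example:

import asyncio
import pandas as pd

async def split_in_thread(df: pd.DataFrame, column: str) -> list:
    # Offloading keeps the event loop free only while the work releases
    # the GIL; pandas groupby mostly does not, so the loop thread would
    # still stall on GIL contention (the bpo-7946 convoy effect).
    return await asyncio.to_thread(lambda: list(df.groupby(column)))

df = pd.DataFrame({"x": [1, 2, 3, 4], "_partitions": [0, 1, 0, 1]})
groups = asyncio.run(split_in_thread(df, "_partitions"))
print([int(key) for key, _ in groups])  # [0, 1]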

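The other new comment pins down what `metadata.column` is: an integer column, always `_partitions`, that `dask/dataframe/shuffle.py::shuffle` adds to tag each row with its output partition number. A toy sketch of the fan-out `add_partition` performs, with a hypothetical round-robin `worker_for` standing in for `self.metadata.worker_for`:

import pandas as pd

workers = ["tcp://w1:8786", "tcp://w2:8786"]  # hypothetical worker addresses

def worker_for(output_partition: int) -> str:
    # Stand-in for self.metadata.worker_for: assign output partitions
    # to workers round-robin.
    return workers[output_partition % len(workers)]

# shuffle() tags every row with its integer output partition number.
df = pd.DataFrame({"x": [10, 20, 30, 40], "_partitions": [0, 1, 0, 1]})

# groupby on `_partitions` yields one shard per output partition; each
# shard is sent to the worker that owns that partition.
for output_partition, shard in df.groupby("_partitions"):
    print(worker_for(int(output_partition)), list(shard["x"]))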

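Finally, the expanded TODO in the second hunk: until the RerunGroup machinery proposed in dask/distributed#5403 exists, an error in any `shuffle_receive` RPC simply propagates out of `asyncio.gather` and kills the task, as the comment says. A hedged sketch of the local half of the cleanup being deferred, written against plain asyncio rather than distributed's RPC layer; cross-worker cancellation is exactly the part that needs RerunGroup and is not attempted here:

import asyncio

async def send(i: int) -> None:
    # Stand-in for one shuffle_receive RPC; shard 2 fails.
    if i == 2:
        raise RuntimeError("peer rejected shard")
    await asyncio.sleep(0.1)

async def gather_or_cancel(tasks: list) -> None:
    # asyncio.gather propagates the first error but leaves the sibling
    # tasks running; cancel them explicitly so nothing keeps sending.
    try:
        await asyncio.gather(*tasks)
    except BaseException:
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise

async def main() -> None:
    tasks = [asyncio.create_task(send(i)) for i in range(4)]
    try:
        await gather_or_cancel(tasks)
    except RuntimeError as exc:
        print("shuffle failed:", exc)

asyncio.run(main())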