P2P shuffle returns incorrect results if errors occur receiving data (also asyncio tasks are leaked) #6277

Closed
gjoseph92 opened this issue May 5, 2022 · 1 comment
Labels
bug Something is broken shuffle

Comments

@gjoseph92
Collaborator

If an error occurs while deserializing, regrouping, serializing, or writing data to disk, and the shuffle isn't in "backpressure mode", that data is silently lost and the shuffle still succeeds:

    shuffle = await self._get_shuffle(shuffle_id)
    task = asyncio.create_task(shuffle.receive(data))
    if (
        shuffle.multi_file.total_size + sum(map(len, data))
        > shuffle.multi_file.memory_limit
    ):
        await task  # backpressure

When task isn't awaited, the asyncio task is leaked. Any error that occurred in it is also lost (besides being logged).
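
To make the failure mode concrete, here is a minimal, self-contained sketch of the same pattern outside of distributed. The names `receive` and `shuffle_receive` are hypothetical stand-ins, not the real shuffle code; the only assumption is that the receiving coroutine can raise.

```python
import asyncio


async def receive(data):
    # Hypothetical stand-in for shuffle.receive: always fails.
    raise ValueError("failed to write shard to disk")


async def shuffle_receive(data, backpressure):
    # Same shape as the snippet above: the task is only awaited
    # when the (simulated) memory limit has been exceeded.
    task = asyncio.create_task(receive(data))
    if backpressure:
        await task
    return task


async def main():
    results = {}

    # No backpressure: the call returns normally although receive() failed.
    task = await shuffle_receive([b"abc"], backpressure=False)
    await asyncio.sleep(0)  # yield once so the background task can run
    results["silent"] = task.done() and isinstance(task.exception(), ValueError)

    # Backpressure: the task is awaited, so the error reaches the caller.
    try:
        await shuffle_receive([b"abc"], backpressure=True)
        results["raised"] = False
    except ValueError:
        results["raised"] = True
    return results


results = asyncio.run(main())
```

In the first call the exception only exists on the task object; unless something inspects or awaits that task, the caller never learns about it.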

@graingert and I tried to fix this, but we couldn't get something working:

  • The dumb way (keep leaking tasks, wrap all of receive in a try/except, set self._exception, raise self._exception in add_partition, get_output_partition, inputs_done) fails tests because of leaking tasks
  • The "proper" asyncio way causes a CancelledError to pop out in some unexpected place and seems to shut down the whole worker?
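
For illustration, a rough sketch of the first idea in isolation: record the first error and re-raise it from later entry points. As a variation on what is described above, it also keeps references to the tasks and waits for them in `inputs_done` instead of leaking them. `ShuffleSketch` and all of its members are hypothetical; this is not the distributed implementation and not the change that eventually closed this issue, and it does not address cancellation, which is where the second approach ran into trouble.

```python
import asyncio


class ShuffleSketch:
    """Hypothetical stand-in for a shuffle; not the distributed class."""

    def __init__(self):
        self._exception = None
        self._tasks = set()  # strong references so tasks are not leaked
        self.received = []

    async def _receive(self, data):
        try:
            if data is None:  # simulated deserialization/disk failure
                raise ValueError("bad shard")
            self.received.append(data)
        except Exception as e:
            # Remember only the first failure.
            if self._exception is None:
                self._exception = e

    def _raise_if_failed(self):
        if self._exception is not None:
            raise self._exception

    def receive(self, data):
        self._raise_if_failed()
        task = asyncio.create_task(self._receive(data))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def inputs_done(self):
        # Wait for every outstanding receive before declaring success.
        if self._tasks:
            await asyncio.gather(*self._tasks)
        self._raise_if_failed()


async def main():
    shuffle = ShuffleSketch()
    shuffle.receive(b"ok")
    shuffle.receive(None)  # this one fails in the background
    try:
        await shuffle.inputs_done()
    except ValueError as e:
        return str(e), shuffle.received, len(shuffle._tasks)


outcome = asyncio.run(main())
```

Here the background failure surfaces at `inputs_done` rather than disappearing, and no task is left pending when `main` returns. Whether the same structure behaves well inside a real worker is exactly what this issue leaves open.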

xref #6201

@fjetter
Member

fjetter commented Nov 11, 2022

Should be fixed after #7268.

@fjetter fjetter closed this as completed Nov 11, 2022