
Shuffle Service with Scheduler Logic #6007

Merged: 134 commits into dask:main on May 5, 2022

Conversation

mrocklin (Member):

Builds off of #5976, which in turn builds off of #5957.

Commit message of the first novel commit:

Previously we assigned a partition to a worker based on a simple
formula. This was good, but also error prone in a few ways:

  1. We would send data around needlessly if the task wasn't desired
  2. Conflicting shuffles could assign tasks to conflicting workers

Now we ask the scheduler to manage this process, and all of the workers
check in with it to get assignments. Currently the actual logic is the
same, but now things get to live in a single location where we can make
smarter decisions to avoid conflicts.

This removes the previous shuffle setup steps on the worker, simplifying
the code a bit (subjectively anyway). It makes the compute steps a bit
worse because now we have a small pandas join in the middle of things.
It should already avoid excess labor. It does not yet avoid restriction
conflicts.
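For illustration, here is a minimal sketch of the scheme described above. Every name in it (ShuffleSchedulerExtensionSketch, get_worker_for, the round-robin assignment) is an assumption for illustration, not the PR's actual code: the scheduler-side extension owns the partition-to-worker mapping, and workers ask it for assignments instead of recomputing them locally, so overlapping shuffles cannot hand the same partition to different workers.

class ShuffleSchedulerExtensionSketch:
    def __init__(self) -> None:
        # shuffle id -> {output partition -> worker address}
        self.worker_for: dict[str, dict[int, str]] = {}

    def get_worker_for(
        self, id: str, npartitions: int, workers: list[str]
    ) -> dict[int, str]:
        # The first request for a given shuffle fixes the assignment; every
        # later request, from any worker, sees exactly the same mapping.
        if id not in self.worker_for:
            self.worker_for[id] = {
                i: workers[i % len(workers)] for i in range(npartitions)
            }
        return self.worker_for[id]


# All workers converge on one assignment for a given shuffle id.
ext = ShuffleSchedulerExtensionSketch()
first = ext.get_worker_for("shuffle-1", npartitions=4, workers=["tcp://a", "tcp://b"])
assert first == ext.get_worker_for("shuffle-1", 4, ["tcp://a", "tcp://b"])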

Performance is good; we still need to track down memory usage.
This manages memory more smoothly.

We still have issues though in that we're still passing around slices
of arrow tables, which hold onto large references
This helps to reduce lots of extra unmanaged memory

This flows pretty well right now.  I'm finding that it's useful to blend
between the disk and comm buffer sizes.

The abstractions in multi_file and multi_comm are getting a little bit
worn down (it would be awkward to shift back to pandas), but maybe that's ok.
We don't need a lot of comm buffer, and we also don't want more connections
than machines (too much sitting in buffers).

We also improve some printing
To enable better diagnostics, it would be useful to allow worker
extensions to piggy-back on the standard heartbeat.  This adds an
optional "heartbeat" method to extensions, and, if present, calls a
custom method that gets sent to the scheduler and processed by an
extension of the same name.

This also starts to store the extensions on the worker in a named
dictionary.  Previously this was a list, but I'm not sure that it was
actually used anywhere.  This is a breaking change without deprecation,
but in a space that I suspect no one will care about.  I'm happy to
provide a fallback if desired.
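A hedged sketch of the heartbeat piggy-backing described above. The attribute names worker.extensions and scheduler.extensions follow the description in this PR; the helper function names are invented for illustration:

def collect_extension_heartbeats(worker) -> dict:
    # Worker side: extensions now live in a named dict; any extension that
    # defines an optional heartbeat() method contributes a payload.
    payloads = {}
    for name, extension in worker.extensions.items():
        heartbeat = getattr(extension, "heartbeat", None)
        if heartbeat is not None:
            payloads[name] = heartbeat()
    return payloads


def dispatch_extension_heartbeats(scheduler, payloads: dict) -> None:
    # Scheduler side: each payload is routed to the extension of the same name.
    for name, data in payloads.items():
        extension = scheduler.extensions.get(name)
        if extension is not None and hasattr(extension, "heartbeat"):
            extension.heartbeat(data)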
Tests are failing.  I can't reproduce locally.  This is just blind hope
that it fixes the problem.  It should be innocuous.
mrocklin (Member Author):

Tests are green(ish). This could use some review and some help in breaking things.

I haven't gotten too creative in trying to break this, but so far it holds up pretty well (better than what's currently in main) in situations where we don't lose any workers.

mrocklin (Member Author):

This is at a point where, I think, it could be merged. It does not succeed in cases where workers fail, or where outputs shift during execution (although neither does the solution in main), but it feels pretty solid in the common case. There is a proposed plan for a next step in #6105.

This PR hasn't had deep review yet (no one is keeping me honest around docstrings, code cleanliness, and so on), but it's been about a month so far without that review, and I do plan to keep working on this. I'm inclined to strip out the link to my mrocklin/dask fork, merge that in, and then merge this in. I won't do this solo (this is big enough that that doesn't seem right), but I'll probably start pestering people to get this in by early next week.

mrocklin (Member Author):

Merging in one week if there are no further comments.

gjoseph92 (Collaborator) left a comment:

I've made it mostly through shuffle_extension.py so far. These are some preliminary comments. Some are nits/tweaks/asks for documentation, but there are at least 2-3 that I think are serious issues (generally around not handling errors in concurrency #6201) that could result in deadlocks or silently incorrect results.

(resolved thread on .github/workflows/tests.yaml)
ShuffleWorkerExtension,
)

__all__ = [
gjoseph92 (Collaborator):

__all__ usually makes flake8 and typecheckers happier?

mrocklin (Member Author):

I'm curious about this. My understanding is that there isn't anything defined in this module that isn't listed in __all__. Why would __all__ be informative in this case?
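For context on the flake8 point being discussed: in an __init__.py that only re-exports names, flake8 reports the imports as unused (F401) unless they appear in __all__ or carry a noqa comment, and some type checkers only treat listed names as re-exported. A minimal illustration; only ShuffleWorkerExtension from the excerpt above is shown:

# Re-export-only module: without __all__ (or a per-import "# noqa: F401"),
# flake8 flags the import as unused.
from distributed.shuffle.shuffle_extension import ShuffleWorkerExtension

__all__ = [
    "ShuffleWorkerExtension",
]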

(resolved thread on distributed/shuffle/arrow.py)
Comment on lines +20 to +21
if file.tell() == 0:
    file.write(schema.serialize())
gjoseph92 (Collaborator):

This doesn't seem thread-safe. Probably good to mention.

mrocklin (Member Author):

Sure. Done.
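For illustration, one way to make the check-and-write above atomic is to guard it with a per-file lock; the lock and function below are a sketch, not the code added in the PR:

import threading

# Hypothetical guard: two threads appending to the same partition file can
# otherwise both observe tell() == 0 and write the schema header twice.
write_lock = threading.Lock()


def append_with_header(file, schema, batches) -> None:
    with write_lock:
        if file.tell() == 0:
            file.write(schema.serialize())  # first writer emits the header
        for batch in batches:
            file.write(batch)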

bio = io.BytesIO()
bio.write(schema.serialize())
for batch in data:
    bio.write(batch)
gjoseph92 (Collaborator):

This seems like a lot more memory copying than I would expect with Arrow. I assume improving this isn't important here, and would just be in scope for future performance tuning?

mrocklin (Member Author):

You and me both. Yeah, Arrow is great if you use all Arrow primitives, but if you want to compose it with other things it's not that great because of all of the views and the lack of a deserialize function. If we stick with Arrow (maybe?) then we should upstream a bunch of issues for this use case so that it gets cleaner in future releases.

I actually did a bit of performance tuning here. Memory copies aren't significant yet, but I've reduced them to the extent that I can.
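For the future tuning mentioned here, and only under the assumption that the incoming shards were still pyarrow RecordBatch objects rather than pre-serialized bytes, Arrow's IPC stream writer could produce the buffer directly instead of hand-concatenating in a BytesIO; a sketch:

import pyarrow as pa


def serialize_batches(schema: pa.Schema, batches) -> bytes:
    # The IPC stream writer emits the schema header and each record batch
    # straight into the output stream, avoiding extra intermediate copies.
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, schema) as writer:
        for batch in batches:
            writer.write_batch(batch)
    return sink.getvalue().to_pybytes()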


    def shuffle_inputs_done(self, comm: object, shuffle_id: ShuffleId) -> None:
        shuffle = await self._get_shuffle(shuffle_id)
        future = asyncio.ensure_future(shuffle.receive(data))
gjoseph92 (Collaborator):

What if this task fails, but nothing's awaiting it? receive can raise (and even intentionally raises) exceptions. Something should be done to keep track of it. I think that piece of data would just be silently lost, producing incorrect output.

This can also allow memory to build up in the receive tasks, before they've started writing to the multi_file. I don't think there's any limitation on concurrent receive calls? At the beginning of a shuffle with 1000s of peer workers, couldn't you even start backing up behind the offload threadpool?

Those two issues make me wonder if we should always be awaiting something here. Both because if the data can't be successfully written to disk, that should probably be an error that's propagated back to the sender, and because it would give more useful backpressure.

I get not wanting to block sends on data fully writing to disk. That would probably slow us down a bit, especially at the beginning of a large shuffle. But as I understand it, performance tuning is a separate, later step, and not always awaiting shuffle.receive here feels like a tricky optimization that currently risks both incorrectness and blowing up memory.

mrocklin (Member Author):

Yeah, that's a valid concern. I don't have a great answer to this yet. We can raise the exception here early if we want to (that avoids hiding the exceptions that we're intentionally raising, but doesn't handle the unknown ones).

If you're cool with it, I'd like to put in a TODO here so that we don't forget, and then defer this to future work.
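For reference, a hedged sketch of the kind of bookkeeping this thread is asking for: keep references to the receive tasks, surface their exceptions instead of dropping them, and bound how many run at once for backpressure. All names below are illustrative, not the PR's code:

import asyncio


class ReceiveTrackerSketch:
    def __init__(self, max_concurrent_receives: int = 10) -> None:
        self._tasks: set[asyncio.Task] = set()
        self._semaphore = asyncio.Semaphore(max_concurrent_receives)
        self._errors: list[BaseException] = []

    def submit(self, coro) -> None:
        # Keep a reference so the task is neither garbage collected nor
        # silently forgotten; errors are recorded rather than lost.
        task = asyncio.ensure_future(self._run(coro))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _run(self, coro) -> None:
        async with self._semaphore:  # backpressure on concurrent receives
            try:
                await coro
            except Exception as exc:
                self._errors.append(exc)

    async def flush(self) -> None:
        # Wait for everything in flight and re-raise the first failure.
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)
        if self._errors:
            raise self._errors[0]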

(resolved thread on distributed/shuffle/shuffle_extension.py)
    )

    def close(self):
        self.executor.shutdown()
gjoseph92 (Collaborator):

I'd think there's more to do than this? Stop any active shuffles? Clean up disk?

mrocklin (Member Author):

Good thought. I've changed this to close all shuffles (which in turn clean up multi_files (which in turn clean up disk)).

There is now also a test for closing workers mid-shuffle. (with a couple of TODOs as well for the next round)
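A minimal sketch of the cleanup order described here, with all attribute and class names assumed for illustration: close the active shuffles first (each of which closes its MultiFile and removes its on-disk data), and only then shut down the executor.

from concurrent.futures import ThreadPoolExecutor


class ShuffleWorkerExtensionSketch:
    def __init__(self) -> None:
        self.shuffles = {}  # shuffle id -> shuffle instance
        self.executor = ThreadPoolExecutor(1)

    def close(self) -> None:
        # Shuffles clean up their MultiFile buffers, which clean up disk.
        for shuffle in list(self.shuffles.values()):
            shuffle.close()
        self.shuffles.clear()
        # Tear down the thread pool only after the shuffles are gone.
        self.executor.shutdown()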

(resolved thread on distributed/shuffle/shuffle_extension.py)
except KeyError:
    queue = asyncio.Queue()
    for _ in range(MultiComm.max_connections):
        queue.put_nowait(None)
gjoseph92 (Collaborator):

It took me a bit to understand that this queue works the opposite way you'd expect—it's not pushing data in, it's pushing permission to write more back out. Sort of a form of CapacityLimiter? I think this would help to document.

mrocklin (Member Author):

I've added a note in the docstring

Member:

I agree with the comment. Took me a bit to understand this myself.

I think factoring this out should not be in scope for this PR, but I could see value in doing it regardless.

I think this is a case for a follow up ticket. @gjoseph92 would you mind opening one?
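As context for the follow-up ticket discussed here, the queue above works as a pool of send permits rather than a data queue. Below is a sketch of the same pattern factored out as the CapacityLimiter-style helper mentioned in this thread; the class and its interface are illustrative, not the PR's code:

import asyncio


class CapacityLimiterSketch:
    def __init__(self, max_connections: int) -> None:
        # The queue holds permission tokens, not data: take one to start a
        # send, put it back when finished, so at most max_connections sends
        # are in flight at any time.
        self._tokens = asyncio.Queue()
        for _ in range(max_connections):
            self._tokens.put_nowait(None)

    async def __aenter__(self) -> "CapacityLimiterSketch":
        await self._tokens.get()  # wait for a free slot
        return self

    async def __aexit__(self, *exc) -> None:
        self._tokens.put_nowait(None)  # hand the slot back


async def send_with_limit(limiter: CapacityLimiterSketch, send) -> None:
    async with limiter:
        await send()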

mrocklin (Member Author) commented May 2, 2022:

@gjoseph92 I know that you're travelling, so no pressure, but if you have time for another pass that would be welcome.

(resolved threads on distributed/shuffle/shuffle_extension.py)

from distributed.utils import log_errors


class MultiFile:
Member:

I agree with both. I'm not entirely sure if the classes themselves should be the same or if some common functionality can be shared (e.g. factoring out the queue stuff as a CapacityLimiter, xref https://github.com/dask/distributed/pull/6007/files#r862239876).

mrocklin (Member Author) commented May 3, 2022:

Good to go?

fjetter merged commit bc3c891 into dask:main on May 5, 2022.
mrocklin deleted the p2p-shuffle-scheduler branch on August 17, 2022.