
Shuffle Service #5976

Closed
wants to merge 101 commits into from

Conversation

mrocklin
Member

This extends the shuffle service in #5520 with disk support, relatively well-bounded memory use, and better performance. It does not yet handle issues like worker resilience, partial outputs, conflicting restrictions, or other topics, but I think that it's nearing a point where it could be useful.

This PR is large. There are a couple things that can help with that:

  1. I have a few PRs open, notably Allow worker extensions to piggy-back on heartbeat #5957 and Track Event Loop intervals in dashboard plot #5964, which, if merged, would reduce the diff here
  2. Folks can safely ignore the bokeh code for a while
  3. I made a screencast to help serve as a tour for the code as it is when opening this PR

There are a few more caveats that I'll mention in the diff below.

Performance is good; we still need to track down memory usage
This manages memory more smoothly.

We still have issues, though, in that we're still passing around slices
of Arrow tables, which hold references to the full table.
This helps to reduce a lot of extra unmanaged memory

This flows pretty well right now.  I'm finding that it's useful to blend
between the disk and comm buffer sizes.

The abstractions in multi_file and multi_comm are getting a little bit
worn down (it would be awkward to shift back to pandas), but maybe that's ok.
We don't need a large comm buffer, and we also don't want more connections
than machines (too much data sitting in buffers).

We also improve some printing
To enable better diagnostics, it would be useful to allow worker
extensions to piggy-back on the standard heartbeat.  This adds an
optional "heartbeat" method to extensions; if present, its result is
sent to the scheduler and processed by a scheduler extension of the
same name.

This also starts to store the extensions on the worker in a named
dictionary.  Previously this was a list, but I'm not sure that it was
actually used anywhere.  This is a breaking change without deprecation,
but in a space that I suspect no one will care about.  I'm happy to
provide a fallback if desired.
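As a minimal sketch of how such opt-in extension heartbeats could look (ShuffleWorkerExtension and gather_extension_heartbeats are illustrative names, not the actual implementation):

```python
# Hypothetical sketch of piggy-backed extension heartbeats.
# All names here are illustrative, not the real distributed API.
class ShuffleWorkerExtension:
    def __init__(self):
        self.bytes_sent = 0

    def heartbeat(self):
        # Called on each worker heartbeat; the returned dict would be sent
        # to the scheduler extension registered under the same name.
        return {"bytes_sent": self.bytes_sent}


def gather_extension_heartbeats(extensions: dict) -> dict:
    # Worker side: collect metrics from every extension that opts in,
    # skipping extensions without a heartbeat method.
    return {
        name: ext.heartbeat()
        for name, ext in extensions.items()
        if hasattr(ext, "heartbeat")
    }


extensions = {"shuffle": ShuffleWorkerExtension(), "other": object()}
print(gather_extension_heartbeats(extensions))  # {'shuffle': {'bytes_sent': 0}}
```

This also illustrates why a named dictionary of extensions is handy here: the scheduler needs the name to route each payload.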
Tests are failing.  I can't reproduce locally.  This is just blind
praying that it fixes the problem.  It should be innocuous.
Member

@fjetter fjetter left a comment


> I'm curious how intense you're looking for the multi_file and multi_comm tests to be. The recent commit does the smallest amount of work that one can do, but doesn't actually verify much of the internals. As we've spoken about before, I'm personally ok with this. I don't really view these systems as separate from the larger system.

I think it's particularly useful to test edge and error cases since they are much more difficult to reproduce in the larger system.
Below are a few suggestions of tests I would write. One can debate whether things like max_message_size are internals, but the overall behaviour ("put large stuff in and see if it works") is a reasonable black-box test. Same for concurrency and exception handling.

Tests I would like to see for MultiComm

  • What happens if we put messages in that are larger than max_message_size?
  • ... larger than memory_limit?
  • We have more data/shards than allowed connections
  • What happens if send raises an OSError / any other Error in any of the above situations?
  • It appears that we're just inserting stuff sequentially. In reality we're inserting stuff concurrently. This is particularly interesting with exceptions
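The send-failure case above could be sketched roughly like this (all names are illustrative stand-ins, not the real MultiComm API):

```python
import asyncio

# A failing send callback wired into a minimal MultiComm-like buffer.
# MiniComm and failing_send are hypothetical stand-ins for illustration.
async def failing_send(address, shards):
    raise OSError("connection lost")


class MiniComm:
    def __init__(self, send):
        self.send = send
        self._exception = None

    async def put(self, address, shards):
        # Once a send has failed, every subsequent put re-raises the error
        # instead of silently buffering forever.
        if self._exception:
            raise self._exception
        try:
            await self.send(address, shards)
        except Exception as e:
            self._exception = e
            raise


async def main():
    comm = MiniComm(failing_send)
    try:
        await comm.put("tcp://a:1234", [b"x"])
    except OSError as e:
        return str(e)


print(asyncio.run(main()))  # connection lost
```

The point of the test would be that the error surfaces to the caller rather than things silently grinding to a halt.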

For MultiFile similar things

  • What happens if we put messages in that are larger than memory_limit?
  • What happens if dump, load or sizeof raises an OSError / any other Error?
  • Put concurrency: something like asyncio.gather(*(mf.put(b"foo") for _ in range(n))) with many tasks (i.e. more than the limit)
  • concurrency, particularly with errors
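The concurrency test above could be sketched like this (FakeBuffer is a stand-in for MultiFile with a concurrency limit, not the real API):

```python
import asyncio

# Hypothetical stand-in for a MultiFile-like buffer with a write-concurrency
# limit; the names and the semaphore mechanism are illustrative.
class FakeBuffer:
    def __init__(self, limit: int):
        self.sem = asyncio.Semaphore(limit)
        self.received = []

    async def put(self, shard: bytes):
        async with self.sem:  # at most `limit` concurrent writers
            await asyncio.sleep(0)  # yield, as a real disk write would
            self.received.append(shard)


async def main():
    buf = FakeBuffer(limit=4)
    # Many more concurrent puts than the limit allows at once
    await asyncio.gather(*(buf.put(b"foo") for _ in range(100)))
    return len(buf.received)


print(asyncio.run(main()))  # 100
```

Even a toy version like this exercises the "more tasks than the limit" shape that is hard to hit deterministically in the full system.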

Comment on lines +81 to +84
- name: Install dask branch
  shell: bash -l {0}
  run: python -m pip install --no-deps git+https://github.com/mrocklin/dask@p2p-shuffle

Member


Is this already in?

Member Author


Not yet: dask/dask#8836

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import pyarrow as pa
Member


This PR introduces pyarrow as an optional dependency. We typically deal with optional imports by using a

try:
    import pyarrow as pa
except ImportError:
    ...  # raise a useful warning here

distributed/shuffle/multi_comm.py Show resolved Hide resolved
Comment on lines +125 to +126
if not self.shards:
    await asyncio.sleep(0.1)
Member


This looks like it should rather use an asyncio.Event, shouldn't it?

Member Author


Yeah, I used to use a condition/event. It was a little complex. I can bring it back though. In practice I don't think that this matters much. Polling is mildly less efficient, but far less tricky/error prone. I decided to sacrifice performance in favor of robust understanding.

Member Author


I suspect that your aversion to polling comes from reviewing tests, where I fully agree it is the bane of all humanity. I think that in this context it's actually a better choice though.

Member


There are a few reasons why I dislike this

  • The sleep is very artificial. Why 0.1? Why are we "waking" the event loop if nothing needs to be done?
  • It makes things slower in tests and introduces some artificial overhead. This overhead is very likely completely irrelevant in practice, but it is unnecessary. We're not introducing random sleeps in other places, are we? Events and Conditions were built for this.

> I decided to sacrifice performance in favor of robust understanding.

That's interesting, because I would consider this more difficult to understand and maintain. Apart from the performance cost, my actual concern is that this polling obscures the connection between communicate and put. If an Event is set, I know for a fact that something else is listening and I know this listener will do something on the next event tick. With the polling, neither is very clear.

Below is how I started to familiarize myself with the code. This is true, not a hypothetical scenario.

  • I started reading rearrange_by_column_p2p to understand high level what's happening, transfer, barrier, collect. Great, easy.
  • (browse through setup, unique IDs, register and initialize stuff, cool, move on)
  • ...
  • I look into shuffle_transfer which I follow to Shuffle.add_partition that calls MultiComm.put
  • Reading MultiComm.put tells me we're buffering something. We're not sending anything, yet. Not even if the buffer grows very large...
  • I read on and look into barrier because that's the next logical step, isn't it?
  • The barrier actually flushes the buffer. Is this where we are sending things now? What is flush actually doing?
  • No, the flush actually just waits for the buffer to become empty (again a poll) but how is it supposed to get emptied if we're never sending anything?
  • Let's read all the code and its individual methods and try to find what's going on (it's not that much code, I don't want to exaggerate)
  • ... aha there is a background task that is polling... I should start from the top with this in mind
  • ...

Of course, this is an individual journey and a subjective argument about simplicity.

In general, I try to avoid polling if possible. Apart from clarity, in my experience, debugging polling is a bit harder because if the poll never finishes, it's less clear what we're actually waiting on. If there is an Event, I can follow the Event.

Most of my concern is still about style and we do not have to agree on this. If you feel strongly about the benefits of polling, let's move on. I just wanted to provide a bit more context to this conversation because my concerns are not always about testing :)
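For contrast, here is a minimal sketch of the Event-based pattern described above (Buffer and its methods are illustrative, not the real MultiComm code): the producer sets an event when shards arrive, so the consumer wakes immediately instead of polling every 0.1s.

```python
import asyncio

# Illustrative sketch: an asyncio.Event replacing the polling loop.
class Buffer:
    def __init__(self):
        self.shards = []
        self._have_shards = asyncio.Event()

    def put(self, shard):
        self.shards.append(shard)
        self._have_shards.set()  # wake the communicate loop immediately

    async def communicate_once(self):
        await self._have_shards.wait()  # no sleep, no wasted wakeups
        out, self.shards = self.shards, []
        self._have_shards.clear()
        return out


async def main():
    buf = Buffer()
    task = asyncio.create_task(buf.communicate_once())
    await asyncio.sleep(0)  # let the consumer block on the event
    buf.put(b"x")
    return await task


print(asyncio.run(main()))  # [b'x']
```

The trade-off debated here is real in both directions: the Event makes the producer/consumer link explicit and debuggable, while the polling version has fewer moving parts to get wrong.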

distributed/shuffle/multi_comm.py Outdated Show resolved Hide resolved
dump_batch, schema=pa.Schema.from_pandas(self.metadata.empty)
),
load=load_arrow,
directory=os.path.join(self.worker.local_directory, str(self.metadata.id)),
Member


From an API perspective, I'm wondering if the MultiFile shouldn't be responsible for creating the directory. It is also responsible for cleaning it up.

Member Author


It does create the directory. It currently doesn't specify where it should be though. I prefer it having the same name as the shuffle ID. Currently the MultiFile doesn't know its id, which I like.

distributed/shuffle/shuffle_extension.py Outdated Show resolved Hide resolved
distributed/shuffle/shuffle_extension.py Outdated Show resolved Hide resolved
Comment on lines 46 to +49

def test_basic(client: Client):
    df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")
    df["name"] = df["name"].astype("string[python]")
Member


Suggested change
-def test_basic(client: Client):
-    df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")
-    df["name"] = df["name"].astype("string[python]")
+@pytest.mark.parametrize("cast_string", [True, False])
+def test_basic(client: Client, cast_string: bool):
+    df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")
+    if cast_string:
+        df["name"] = df["name"].astype("string[python]")

Member Author


Yeah, this is a live issue. Arrow likes converting things to strings if they are actually strings. I don't mind this. I've been avoiding this for now, but it's good that you brought it up so that we can address it explicitly.

I also haven't tested non-string object types at all. I should add a larger test for this behavior.

Comment on lines +12 to +13
async def send(address, shards):
    d[address].extend(shards)
Member


What happens if send raises an OSError? What happens if it raises any error?

Member Author


Why, things grind to a halt, of course. I actually ran into this on Coiled when I ran out of disk space. Things just hung.

I'd prefer to address this by creating a simple end-to-end test that tries to mimic that behavior. Objections? Or would you prefer something within each of multi_file and multi_comm? All of the above?

@mrocklin
Member Author

> What happens if we put messages in that are larger than max_message_size?

This is just a performance optimization. We happily pass large messages through. I don't think that a test here is super relevant.

> ... larger than memory_limit?

Same. These aren't strict. We also aren't promising that we'll handle giant things any better than the underlying system.

> We have more data/shards than allowed connections

That's already tested, and is the common case.

> What happens if send raises an OSError / any other Error in any of the above situations?

This one I think is interesting

> It appears that we're just inserting stuff sequentially. In reality we're inserting stuff concurrently. This is particularly interesting with exceptions

Rather than try to mock out a thing that looks like the shuffle service in terms of concurrency I'd rather use the existing system. How we treat this system in terms of concurrency will likely change over the next few weeks. I don't think that a test here is worth the effort. I'd rather continue relying on and investing in a more serious shuffling test suite.

For MultiFile:

> What happens if we put messages in that are larger than memory_limit?

Again, I don't actually care what happens here so much.

> What happens if dump, load or sizeof raises an OSError / any other Error?

This one I like. I'm less concerned about how exactly this error gets raised with MultiFile, there are many valid behaviors here I think, and I don't care to prescribe one of them. What I actually care about is how this error gets back out to the user in a Dask context. That I definitely want to invest in.

> Put concurrency, something like asyncio.gather(*(mf.put(b"foo") for _ in range(n))) with many tasks (i.e. more than the limit); concurrency, particularly with errors

Same as above. I can do this. I don't think it's worth the effort so much, but I'm happy to acquiesce on this one.

@mrocklin
Member Author

> What happens if dump, load or sizeof raises an OSError / any other Error?

I've added a couple of checks for

  1. Writing suddenly failing
  2. A worker/comm going down

In both cases we verify that we get a sensible error message. These tests aren't around MultiFile/MultiComm specifically, but instead around the whole system. I personally greatly prefer this approach.

crusaderky and others added 12 commits March 31, 2022 15:16
Add SchedulerPlugin to dump state on cluster close

This also adds a new method to SchedulerPlugins that runs directly before closing time
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
This was flaky due to cleaning up resources.

My experience is that making things async helps with this in general.
I don't have strong confidence that this will fix the issue, but I do
have mild confidence, and strong confidence that it won't hurt.
@crusaderky crusaderky assigned crusaderky and mrocklin and unassigned crusaderky Apr 4, 2022
self.send = send
self.shards = defaultdict(list)
self.sizes = defaultdict(int)
self.total_size = 0
Member


It's now a class attribute, but you initialize it here again to zero. Is this intended?
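For context, a tiny illustration of the class-attribute/instance-attribute interplay in question (Counter is a made-up stand-in):

```python
# A mutable class attribute is shared across instances, so re-initializing
# in __init__ creates a per-instance attribute that shadows the class one.
class Counter:
    total_size = 0  # class attribute: a shared default

    def __init__(self):
        self.total_size = 0  # instance attribute shadows the class attribute


a, b = Counter(), Counter()
a.total_size += 10
print(a.total_size, b.total_size, Counter.total_size)  # 10 0 0
```

For an immutable int the two spellings behave the same under `+=`, but the explicit re-initialization makes the per-instance intent unambiguous (and is required for mutable defaults like the defaultdicts above).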

try:
    with self.time("read"):
        with open(
            self.directory / str(id), mode="rb", buffering=100_000_000
Member


It depends on what you're doing with the file descriptor. For instance, if you just call fd.read() and read the entire file, this should not make a difference. If you are trying to perform a bunch of partial reads, it will matter, but I don't think we're doing this, are we? Our writer performs many small appends to the same file, so there it makes sense.


In the end, it depends on what Arrow is doing in pa.RecordBatchStreamReader.read_all. As I said, it shouldn't do any harm, so we can keep it.
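A small sketch of the two access patterns discussed: with a single read() of the whole file, the buffering value is largely irrelevant, which is the reviewer's point (paths and sizes here are illustrative):

```python
import os
import tempfile

# Write like the shuffle writer: many small appends to one file.
path = os.path.join(tempfile.mkdtemp(), "shard")
with open(path, "wb") as f:
    for _ in range(1000):
        f.write(b"x" * 100)

# Read it all back in one call; a huge buffer size changes little here.
with open(path, "rb", buffering=100_000_000) as f:
    data = f.read()

print(len(data))  # 100000
```

The large buffer would matter for many small partial reads, which is not the pattern on the read side.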

from distributed.utils_test import gen_cluster


@gen_cluster(client=True, timeout=1000000)
Member


We should remove these timeouts before merging

Comment on lines +187 to +188
while self.shards:
    await asyncio.sleep(0.05)
Member


I would've expected it to be sufficient to just await _communicate_future here.
I tried, but this is not possible, since communicate never completes if we do not set _done. However, if we set it too soon, we will not flush all shards. This feels a bit brittle, and I was wondering if we should distinguish "done because we flushed" from "done because an exception was raised", with something like

async def communicate(self):
    while not self._exception:
        if self._flushed and not self.shards:
            break
        if not self.shards:
            await asyncio.sleep(0.1)

I also receive a bunch of warnings when executing tests about pending tasks

Task was destroyed but it is pending!
task: <Task pending name='Task-3542' coro=<MultiComm.communicate() running at /Users/fjetter/workspace/distributed-main/distributed/shuffle/multi_comm.py:129> wait_for=<Future pending cb=[Task.task_wakeup()]>>

(same for the multi_file). The line points to the sleep / polling

empty[c] = empty[c].astype("string")  # TODO: we fail at non-string object dtypes
empty[column] = empty[column].astype("int64")  # TODO: this shouldn't be necessary
Member


I actually think we should have both: a bug report to get the issue fixed, and a test case that ensures we actually hotfixed this bug.

If I remove the int cast, test_basic actually fails, but I am having a very hard time figuring out what is going on. If we remove the string conversion, nothing fails. How would we know when it is safe to remove?

df = dask.datasets.timeseries(
    start="2000-01-01",
    end="2000-01-10",
    dtypes={"x": float, "y": float},
Member


I would suggest removing this constraint and just using the ordinary timeseries. If we include the name column, this covers at least the case where we're casting the meta column to string.

Comment on lines +110 to +121
async def put(self, data: dict[str, list[object]]):
"""
Writes many objects into the local buffers, blocks until ready for more

Parameters
----------
data: dict
A dictionary mapping destinations to lists of objects that should
be written to that destination
"""
if self._exception:
raise self._exception
Member


Not crucial, but I see that flush sets done to False. IIUC, once we call flush and the buffer is empty, communicate will stop. If another put happens after this, we'd just append to the buffer but would never know what happened to it.

How about raising an exception here if we're done, to ensure this object is not accidentally reused? Not reusing it is probably important in error cases as well, so this would serve two purposes at once.

Alternatively, upon closing we could set the exception attribute with an appropriate exception which would serve a similar purpose, I believe
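A hedged sketch of the reuse guard proposed above; MultiBuffer and its flags are illustrative stand-ins for the real MultiComm/MultiFile, not the PR's code:

```python
import asyncio

# Illustrative: reject put() after flush() so a closed buffer cannot be
# silently reused.
class MultiBuffer:
    def __init__(self):
        self.shards = []
        self._done = False
        self._exception = None

    async def put(self, data: bytes):
        if self._exception:
            raise self._exception
        if self._done:
            raise RuntimeError("put() after flush(): buffer already closed")
        self.shards.append(data)

    async def flush(self):
        self.shards.clear()  # stand-in for draining to comm/disk
        self._done = True


async def main():
    buf = MultiBuffer()
    await buf.put(b"x")
    await buf.flush()
    try:
        await buf.put(b"y")
    except RuntimeError:
        return "rejected"


print(asyncio.run(main()))  # rejected
```

Setting self._exception on close, as the alternative suggests, would funnel both cases through the same first check.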

@mrocklin
Member Author

@fjetter sorry to trouble you, but I've abandoned this PR and moved on to #6007

@fjetter fjetter closed this Apr 21, 2022