Race conditions in implicit creation of worker clients when serializing futures resulting in `distributed.CancelledError`s
#7498
Comments
I am a bit disappointed seeing that …
cc @madsbk (for viz)
FWIW I believe the original

```python
def __dask_postpersist__(self):
    return Array, (self.name, self.chunks, self.dtype)
```

does not contain any references to the instance; this is basically a type and some literals. Nowadays, it looks like

```python
def __dask_postpersist__(self):
    return self._rebuild, ()

def _rebuild(self, dsk, *, rename=None):
    name = self._name
    if rename:
        name = rename.get(name, name)
    return Array(dsk, name, self.chunks, self.dtype, self._meta)
```

which directly references the instance via the bound method `self._rebuild`.
Looks like this self-reference was introduced in dask/dask#7142.
FWIW, to avoid the race above it is not necessary to forbid serialization entirely. We just have to disallow initialization of a new client.
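The "allow reuse, forbid creation" idea can be sketched with a stdlib-only toy model. `ToyFuture`, `ToyClient`, and the `_current_client` registry are hypothetical stand-ins for illustration, not distributed's actual API:

```python
import pickle

# Toy model of the suggested fix: on deserialization a future may
# attach itself to an already-existing client, but must never
# implicitly create a new one.
_current_client = None  # stands in for distributed's client registry


class ToyClient:
    pass


class ToyFuture:
    def __init__(self, key, client):
        self.key = key
        self.client = client

    def __getstate__(self):
        # Only the key travels over the wire; the owning client does not.
        return {"key": self.key}

    def __setstate__(self, state):
        self.key = state["key"]
        if _current_client is None:
            # Proposed behavior: fail loudly instead of spinning up a
            # new, never-awaited client in the background.
            raise RuntimeError(f"no client available to own {self.key!r}")
        self.client = _current_client


client = ToyClient()
blob = pickle.dumps(ToyFuture("x-1", client))

try:
    pickle.loads(blob)          # no client registered -> refuse to deserialize
except RuntimeError as exc:
    print(exc)                  # no client available to own 'x-1'

_current_client = ToyClient()   # reusing an existing client is fine
restored = pickle.loads(blob)
print(restored.key)             # x-1
```

Failing at deserialization time keeps cause and effect in the same place, instead of surfacing much later as a `CancelledError`.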
There is a similar pattern with scatter, e.g.

```python
ddf = dd.from_pandas(df, npartitions=2).persist()
future = await c.scatter(ddf)
```

I don't fully understand what this is intended to do. Why would we want to scatter the futures / a persisted collection?
Looks like this functionality was introduced in #1201. This was a major PR in the ancient past that changed how clients were initialized or reused. From the discussion there I gather these tests (…)
I opened #7500, which would remove the possibility of passing collections of futures as arguments by removing the implicit client instantiation.
I'm not sure how widely this is used. @mrocklin, are you aware of other use cases where persisted collections are used as inputs to a task?

It does look a bit off. We don't support dask-in-dask well today, so it seems strange that we are intentionally supporting it in this one case (where the collection has been persisted).
TLDR
Serializing futures as part of a dask collection can cause race conditions leading to spurious `distributed.CancelledError`s or other, entirely unrelated exceptions. These can be particularly hard to debug since their cause and effect are decorrelated, i.e. task X can raise a `CancelledError` even though task Y was lost, and the relation between X and Y is not apparent from the task graph.

I see a certain appeal to this feature, but I am wondering how widely it is used. I am tempted to remove this feature (at least temporarily) from the code base entirely.
This issue came up during a closer review of #6028. Besides the consistency issues this feature currently has, it also adds complexity to how we serialize graphs.

cc @jrbourbeau @mrocklin @rjzamora: do any of you have more context about how commonly this feature is used and whether there are other cases I am missing? Feel free to skip the detailed explanation; I hope the example is sufficient to understand what I am talking about.
Example
(See test_client.py::test_serialize_collections for runnable code)
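The runnable test itself is not reproduced here, but the pattern it exercises can be sketched as follows. This is a hedged reconstruction, not the literal test code; it assumes dask and distributed are installed and uses an in-process cluster so everything stays in one Python process:

```python
import dask.array as da
from distributed import Client

# In-process cluster: scheduler and worker live in this process.
client = Client(processes=False, dashboard_address=None)

# Persisting materializes the graph as a mapping from key to Future.
x = da.arange(10, chunks=5).persist()

def on_worker(arr):
    # The collection was rebuilt on the worker from serialized
    # futures, which had to be re-owned by some client during
    # deserialization -- the machinery this issue is about.
    return type(arr).__name__

result = client.submit(on_worker, x).result()
print(result)  # Array
client.close()
```

Passing the persisted collection `x` as a `submit` argument is what triggers the serialization of its embedded futures.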
Alternative
This code can be replaced with the `Client.{publish|get}_dataset` functionality. This would require the user to take explicit ownership of the futures/dataset and the client and to manage their lifecycles, thereby avoiding all the problems introduced by the current magic.

Detailed explanation
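The `publish_dataset`/`get_dataset` alternative mentioned above might look like the following hedged sketch (assumes dask and distributed are installed; the dataset name "my-array" is arbitrary):

```python
import dask.array as da
from distributed import Client, get_client

client = Client(processes=False, dashboard_address=None)

x = da.arange(10, chunks=5).persist()

# Explicit ownership: register the persisted collection under a name
# instead of smuggling its futures through serialization.
client.publish_dataset(x, name="my-array")

def on_worker():
    # Explicitly look up the dataset through the worker's client.
    arr = get_client().get_dataset("my-array")
    return type(arr).__name__

result = client.submit(on_worker).result()
print(result)  # Array
client.unpublish_dataset("my-array")
client.close()
```

Here the user decides when the dataset is published and unpublished, so the lifetime of the underlying futures is never left to deserialization-time magic.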
Dask allows the serialization of `client.Future` objects iff they are embedded in a collection [1]. An example of this is `test_client.py::test_serialize_collections`.

First, a collection is persisted. This causes the array collection's task mapping to be materialized as a mapping from key to futures.
If we submit this as an argument to a task, the futures are serialized.
Since a future is owned by a client, we need to find a new owner for such a future upon deserialization. This is currently handled in `__setstate__`; see `distributed/client.py`, lines 464 to 478 (at 0161991).
This deserialization code attempts to use a "current" client and falls back to `worker.get_client`, which attempts to reuse an existing client; if none is available, it will initialize a new one. [2]

So much for context. If all of this worked flawlessly, it would just be a bit of complex magic, but the devil is in the details.
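The lookup order described above can be paraphrased as a stdlib-only sketch; all names here are illustrative stand-ins, not distributed's actual functions:

```python
# Paraphrase of the deserialization-time owner lookup:
#   1. try a "current" client (explicitly activated, no global fallback),
#   2. otherwise reuse an existing client,
#   3. otherwise implicitly create a brand-new one.
_as_current = []     # stack populated by a `with client.as_current()` block
_existing = None     # a client that already lives on the worker
created = []         # records implicit creations, for illustration

def make_new_client():
    # The problematic path: a brand-new, never-awaited client.
    created.append("client")
    return "new-client"

def find_owner():
    if _as_current:                 # step 1: almost never set during
        return _as_current[-1]      #         deserialization
    if _existing is not None:       # step 2: reuse an existing client
        return _existing
    return make_new_client()        # step 3: implicit creation

print(find_owner())                 # new-client
_existing = "worker-client"
print(find_owner())                 # worker-client
```

Only the third branch is problematic; as noted above, forbidding just that branch would be enough to avoid the race.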
Specifically, the way the client is implicitly initialized can cause spurious `CancelledError`s; see `distributed/worker.py`, lines 2572 to 2583 (at 0161991).
The initialization here starts by inferring whether or not we're in an async call; see `distributed/utils.py`, lines 343 to 352 (at 0161991).
This `in_async_call` is misleading since it does not actually tell you whether we're awaiting the callable but rather whether we are in the main thread or not. Therefore, the client will always be initialized asynchronously (i.e. I believe this check is superfluous).

This Client is never awaited. It schedules a future on the event loop that is never awaited but will eventually connect the client to the scheduler. If the connection attempt fails, this results in a raised `CancelledError` as soon as the user attempts to fetch the future's results; see `distributed/client.py`, line 1186 (at 0161991).
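As an aside, a reliable way to ask "are we inside a running event loop?" (as opposed to "are we off the main thread?") is `asyncio.get_running_loop()`. This is a generic stdlib illustration, not distributed's code:

```python
import asyncio

def in_running_loop():
    # True only when called from code executing inside an event loop,
    # regardless of which thread we happen to be on.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True

print(in_running_loop())           # False: no loop is running here

async def main():
    print(in_running_loop())       # True: we are inside asyncio.run

asyncio.run(main())
```

A thread-identity check conflates "not the main thread" with "inside a coroutine", which is exactly the confusion described above.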
Another failure case is if the future that is being serialized is forgotten prematurely. Since the worker only tells the scheduler about its interest in these keys upon deserialization, there is a significant time period in which the initial client could have released the tasks. The simplest example is taking the original example and deleting the reference after submission.
I expected this to cause a `CancelledError` (and indeed I can see a `FutureState.cancel` call with a debugger), but instead I get an entirely unrelated exception from the array code: `numpy.AxisError: axis 0 is out of bounds for array of dimension 0`. My suspicion is that cancelling an internal future of a collection causes weird side effects.
[1] Futures that are not embedded in a collection are not serialized. Instead, their key is tracked as a dependency and the materialized value is passed to the user function.
[2] Note: the first try/except to get a `current(allow_global=False)` client is not very helpful, for a couple of reasons. Firstly, during deserialization it is almost impossible to actually have a current client running (as in `with client.as_current()`), i.e. this is almost guaranteed to fail. Secondly, the `get_client` that is called in the exception handler is allowed to access a global/default client, i.e. disabling this in the initial attempt is misleading.