
Race conditions in implicit creation of worker clients when serializing futures resulting in distributed.CancelledErrors #7498

Closed
fjetter opened this issue Jan 25, 2023 · 9 comments · Fixed by #8827



fjetter commented Jan 25, 2023

TLDR

Serializing futures as part of a dask collection can cause race conditions leading to spurious distributed.CancelledErrors or other entirely unrelated exceptions.

These can be particularly hard to debug since cause and effect are decoupled, i.e. task X can raise a CancelledError because task Y was lost, even though no relation between X and Y is apparent from the task graph.

I see a certain appeal to this feature, but I am wondering how widely it is used. I am tempted to remove it (at least temporarily) from the code base entirely.

This issue came up during a closer review of #6028. Besides the consistency issues this feature currently has, it also adds complexity to how we serialize graphs.

cc @jrbourbeau @mrocklin @rjzamora do any of you have more context about how commonly this feature is used and whether there are other cases I am missing? Feel free to skip the detailed explanation; I hope the example is sufficient to understand what I am talking about.

Example

(See test_client.py::test_serialize_collections for runnable code)

import dask.array as da

# `c` is an asynchronous distributed.Client, as in the referenced test
x = da.arange(10, chunks=(5,)).persist()

def f(x):
    assert isinstance(x, da.Array)
    return x.sum().compute()

future = c.submit(f, x)
result = await future
assert result == sum(range(10))

Alternative

This code can be replaced with the Client.{publish|get}_dataset functionality. This would require the user to take explicit ownership of the futures/dataset and the client and to manage their lifecycle, thereby avoiding all the problems introduced by the current magic.

dataset_name = "foo"
await c.publish_dataset(**{dataset_name: da.arange(10, chunks=(5,))})

def f():
    from distributed import worker_client
    with worker_client() as c:
        x = c.get_dataset(dataset_name)
        assert isinstance(x, da.Array)
        return x.sum().compute()

future = c.submit(f)
result = await future
assert result == sum(range(10))
await c.unpublish_dataset(dataset_name)

Detailed explanation

Dask allows the serialization of client.Future objects iff they are embedded in a collection [1]. An example of this (e.g. test_client.py::test_serialize_collections):

x = da.arange(10, chunks=(5,)).persist()

def f(x):
    assert isinstance(x, da.Array)
    return x.sum().compute()

future = c.submit(f, x)
result = await future
assert result == sum(range(10))

In this example, a collection is persisted, which materializes the array collection's task mapping as a mapping from key to Future.
If we then submit the collection as an argument to a task, the futures are serialized.

Since a future is owned by a client, we need to find a new owner for such a future upon deserialization. This is currently handled in __setstate__, see

def __setstate__(self, state):
    key, address = state
    try:
        c = Client.current(allow_global=False)
    except ValueError:
        c = get_client(address)
    self.__init__(key, c)
    c._send_to_scheduler(
        {
            "op": "update-graph",
            "tasks": {},
            "keys": [stringify(self.key)],
            "client": c.id,
        }
    )

This deserialization code first attempts to use a "current" client and falls back to worker.get_client, which tries to reuse an existing client but, if none is available, initializes a new one. [2]
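
For orientation, the fallback order works roughly like the sketch below. This is an illustrative paraphrase, not the verbatim implementation; resolve_client is a made-up name, and Worker._get_client is the private method whose body is quoted further down.

from distributed import Client
from distributed.worker import get_worker

def resolve_client(address=None):
    # Illustrative helper, not distributed's API: sketches the order in which
    # a client is resolved for a deserialized future.
    try:
        worker = get_worker()  # raises ValueError when not inside a worker
    except ValueError:
        worker = None
    if worker is not None:
        # Reuse the worker's client, creating it lazily if it does not exist
        # yet; this is where the implicit, never-awaited Client comes from.
        return worker._get_client()
    try:
        # Otherwise fall back to an already-registered default client.
        return Client.current()
    except ValueError:
        # Last resort: connect a brand-new client to the given address.
        return Client(address)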

So much for context. If all of this worked flawlessly it would just be a bit of complex magic, but the devil is in the details.

Specifically, the way the client is implicitly initialized can cause spurious CancelledErrors:

asynchronous = in_async_call(self.loop)
self._client = Client(
    self.scheduler,
    loop=self.loop,
    security=self.security,
    set_as_default=True,
    asynchronous=asynchronous,
    direct_to_workers=True,
    name="worker",
    timeout=timeout,
)
Worker._initialized_clients.add(self._client)

The initialization here starts by inferring whether or not we're in an async call,

def in_async_call(loop, default=False):
    """Whether this call is currently within an async call"""
    try:
        return loop.asyncio_loop is asyncio.get_running_loop()
    except RuntimeError:
        # No *running* loop in thread. If the event loop isn't running, it
        # _could_ be started later in this thread though. Return the default.
        if not loop.asyncio_loop.is_running():
            return default
        return False

This in_async_call is misleading since it does not actually tell you whether we are awaiting the callable but rather whether or not we are in the main thread. Therefore, the client will always be initialized asynchronously (i.e. I believe this check is superfluous).

This Client is never awaited. It schedules a future on the event loop that is never awaited but will eventually connect the client to the scheduler. If the connection attempt fails, this results in a CancelledError being raised as soon as the user attempts to fetch the future's result.

self._started = asyncio.ensure_future(self._start(**kwargs))

Another failure case is if the future being serialized is forgotten prematurely. Since the worker only tells the scheduler about its interest in these keys upon deserialization, there is a significant time window in which the initial client could have released the tasks. The simplest example is taking the original code and deleting the reference after submission:

    x = da.arange(10, chunks=(5,)).persist()

    def f(x):
        assert isinstance(x, da.Array)
        return x.sum().compute()

    future = c.submit(f, x)
    del x
    result = await future

I expected this to cause a CancelledError (and indeed I can see a FutureState.cancel call with a debugger), but instead I get an entirely unrelated exception from the array code: numpy.AxisError: axis 0 is out of bounds for array of dimension 0.
My suspicion is that cancelling an internal future of a collection causes weird side effects.

[1] Futures that are not embedded in a collection are not serialized. Instead, their key is tracked as a dependency and the materialized result is passed to the user function (see the short sketch after these footnotes).

[2] Note: the first try/except to get a current(allow_global=False) client is not very helpful for a couple of reasons. Firstly, during deserialization it is almost impossible to actually have a current client running (as in with client.as_current()), i.e. this is almost guaranteed to fail. Secondly, the get_client that is called as an exception handler is allowed to access a global/default client, i.e. disabling this in the initial attempt is misleading.
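
To make the contrast in [1] concrete, here is a minimal sketch of the bare-future case, assuming the same asynchronous client c as in the examples above:

# A Future passed directly to submit is not pickled; the scheduler treats it
# as a dependency and the task receives the materialized value instead.
fut = c.submit(lambda: 42)

def g(value):
    assert value == 42  # `value` is the result, not a Future
    return value + 1

result = await c.submit(g, fut)
assert result == 43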


fjetter commented Jan 25, 2023

I am a bit disappointed to see that publish_dataset also relies on the serializability of the futures. I expected something like __dask_postpersist__ to provide a static method to construct the object from a set of futures, but unfortunately that is not the case.

@rjzamora commented:

cc @madsbk (for viz)


fjetter commented Jan 25, 2023

FWIW I believe the original __dask_postpersist__ output as introduced in dask/dask#2748 was static, e.g.

    def __dask_postpersist__(self):
        return Array, (self.name, self.chunks, self.dtype)

which does not contain any references to the instance. This is basically a type and some literals, similar to what __reduce__ would return.

Nowadays, it looks like:

    def __dask_postpersist__(self):
        return self._rebuild, ()

    def _rebuild(self, dsk, *, rename=None):
        name = self._name
        if rename:
            name = rename.get(name, name)
        return Array(dsk, name, self.chunks, self.dtype, self._meta)

which directly references the instance via self._rebuild. I don't think that's truly necessary.
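
For illustration, a variant that avoids the bound method might look like the following sketch. _rebuild_static is a hypothetical staticmethod, not dask's actual API; it follows the __dask_postpersist__ convention of being called as rebuild(dsk, *extra_args, rename=...).

    def __dask_postpersist__(self):
        # no bound method: a plain callable plus literals
        return Array._rebuild_static, (self.name, self.chunks, self.dtype, self._meta)

    @staticmethod
    def _rebuild_static(dsk, name, chunks, dtype, meta, *, rename=None):
        # hypothetical helper for illustration only
        if rename:
            name = rename.get(name, name)
        return Array(dsk, name, chunks, dtype, meta)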


fjetter commented Jan 25, 2023

Looks like this self-reference was introduced in dask/dask#7142.


fjetter commented Jan 25, 2023

FWIW, to avoid the race above it is not necessary to forbid serialization entirely; we just have to disallow the implicit initialization of a new client.
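
For illustration only, a hypothetical variant of Future.__setstate__ that reuses an existing client but refuses to create a new one could look roughly like this (the lookup and the error message are made up, not a description of the actual change):

def __setstate__(self, state):
    key, address = state
    try:
        # Reuse a client that already exists in this process...
        c = Client.current()
    except ValueError:
        # ...but never start one implicitly (hypothetical behavior).
        raise RuntimeError(
            f"Cannot deserialize Future {key!r}: no existing Client is "
            "available and implicit client creation is disallowed."
        ) from None
    self.__init__(key, c)
    c._send_to_scheduler(
        {
            "op": "update-graph",
            "tasks": {},
            "keys": [stringify(self.key)],  # same helper as the original code
            "client": c.id,
        }
    )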


fjetter commented Jan 25, 2023

There is a similar pattern with scatter, e.g.

    # `df` is any pandas DataFrame; `dd` is dask.dataframe
    ddf = dd.from_pandas(df, npartitions=2).persist()
    future = await c.scatter(ddf)

I don't fully understand what this is intended to do. Why would we want to scatter the futures / a persisted collection?


fjetter commented Jan 25, 2023

Looks like this functionality was introduced in #1201. This was a major PR in the ancient past that changed how clients were initialized or reused. From the discussion there I gather that these tests (test_serialize_collections_of_futures_sync and test_serialize_collections_of_futures) were mostly introduced as sanity checks, not necessarily to support a particular user behavior.


fjetter commented Jan 25, 2023

I opened #7500, which would remove the possibility of passing collections of futures as arguments by removing the implicit client instantiation.

@jrbourbeau commented:

> do any of you have more context about how commonly this feature is used and whether there are other cases I am missing?

I'm not sure how widely this is used. I know xgboost.dask does some interesting things with persisted collections. It's probably worth running a toy xgboost.dask example against a PR that removes this functionality just to confirm it doesn't break things.
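
A toy check along those lines might look like the following sketch (the data, hyperparameters, and local Client are illustrative assumptions, not a specific reproducer):

import dask.array as da
from distributed import Client
from xgboost import dask as dxgb

client = Client()  # assumes a local or existing cluster is fine for the test

# Persisted collections are the case under discussion
X = da.random.random((1000, 10), chunks=(100, 10)).persist()
y = da.random.random(1000, chunks=100).persist()

dtrain = dxgb.DaskDMatrix(client, X, y)
output = dxgb.train(
    client,
    {"objective": "reg:squarederror", "tree_method": "hist"},
    dtrain,
    num_boost_round=10,
)
booster = output["booster"]  # train a few rounds and confirm nothing breaks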

@mrocklin are you aware of other use cases where persisted collections are used as inputs to a client.submit call?

It does look a bit off. We don't support dask-in-dask well today, so it seems strange that we are intentionally supporting it in this one case (where the collection has been persisted).
