
[ERROR] asyncio.exceptions.CancelledError #1532

Closed
xmyhhh opened this issue Jun 27, 2022 · 8 comments · Fixed by #1540

Comments


xmyhhh commented Jun 27, 2022

The program throws an error at runtime. How can this error be solved?

[ERROR] [2022-06-27 14:13:51,391:asyncio.events] 
Traceback (most recent call last):
  File "/home/xumaoyuan/.virtualenvs/lib/python3.8/site-packages/distributed/utils.py", line 761, in wrapper
    return await func(*args, **kwargs)
  File "/home/xumaoyuan/.virtualenvs/lib/python3.8/site-packages/distributed/client.py", line 1400, in _handle_report
    await self._reconnect()
  File "/home/xumaoyuan/.virtualenvs/lib/python3.8/site-packages/distributed/utils.py", line 761, in wrapper
    return await func(*args, **kwargs)
  File "/home/xumaoyuan/.virtualenvs/lib/python3.8/site-packages/distributed/client.py", line 1211, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/home/xumaoyuan/.virtualenvs/lib/python3.8/site-packages/distributed/client.py", line 1241, in _ensure_connected
    comm = await connect(
  File "/home/xumaoyuan/.virtualenvs/lib/python3.8/site-packages/distributed/comm/core.py", line 313, in connect
    await asyncio.sleep(backoff)
  File "/usr/lib/python3.8/asyncio/tasks.py", line 659, in sleep
    return await future
asyncio.exceptions.CancelledError
eddiebergman self-assigned this Jun 27, 2022

eddiebergman commented Jun 27, 2022

Hi @xmyhhh,

It would be helpful if you could provide more context:

  • What system are you running on?
  • Do you have some sample code that reproduces this?
  • What version of autosklearn are you using? pip list | grep auto
  • Can you provide the contents of your virtual env? pip list

Best,
Eddie


eddiebergman commented Jun 27, 2022

Hi @xmyhhh,

I did some experimenting and the fix is pip install distributed==2022.04.

Not sure why the newest dask versions break things, but that's the fix I have for you right now.

eddiebergman commented Jun 27, 2022

Did some more thinking as to why this hadn't shown up as an issue before. I used this example as a test and I'm surprised it didn't trigger for any of the tests; it needs to be investigated a little further.

eddiebergman commented Jun 27, 2022

I'll hot patch this later today :)

eddiebergman unpinned this issue Jun 27, 2022

eddiebergman commented Jun 27, 2022

No longer going to hotpatch this. I thought this was a breaking issue in which autosklearn no longer functions, but instead it just seems to be an error with logging within dask while we close down the dask Client.

I'll keep a running log here as I debug it, as it's likely going to take a lot of trial and error to fix and I don't have much dedicated time for it.


eddiebergman commented Jun 28, 2022

First thing to note is that asyncio.exceptions.CancelledError is absolutely fine, and all of this comes from within Dask and not from us. We may be shutting down slightly incorrectly for our setup, which leads to this error leaking. However, the errors are not harmful and exist only in the logger. CancelledError is basically raised when something tries to get a result from an async task that was cancelled (I think), and that's alright while we're shutting down anyways.
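For anyone unfamiliar with the exception itself, here's a minimal standalone sketch (no dask involved, names are purely illustrative) of how a CancelledError surfaces when something awaits a task that was cancelled:

import asyncio

async def worker():
    # Sleep long enough that the task is still running when we cancel it
    await asyncio.sleep(60)

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the task start
    task.cancel()           # request cancellation
    try:
        await task          # awaiting the cancelled task raises CancelledError here
    except asyncio.CancelledError:
        print("caught CancelledError - harmless if we're shutting down anyway")

asyncio.run(main())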

Can confirm that it happens when we try to close() the dask client.

self._dask_client.shutdown()
self._dask_client.close()

  • shutdown() closes down the scheduler that the Client is attached to first, and this closes without issue.
  • close() will then close down the client, at which point it catches this error twice from the @log_errors decorator and logs it.

At first I thought it might be a reference issue where the scheduler gets collected too early since it has no named reference outside of the Client, so I moved the scheduler LocalCluster out of the Client into self._cluster, but the issue still remained.

self._dask_client = Client(
    LocalCluster(
        n_workers=self._n_jobs,
        processes=False,
        threads_per_worker=1,
        # We use the temporary directory to save the
        # dask workers, because deleting workers takes
        # more time than deleting backend directories.
        # This prevents an error saying that the worker
        # file was deleted, so the client could not close
        # the worker properly
        local_directory=tempfile.gettempdir(),
        # Memory is handled by the pynisher, not by the dask worker/nanny
        memory_limit=0,
    ),
    # Heartbeat every 10s
    heartbeat_interval=10000,
)

Might be worth closing down the scheduler using self._cluster.close() and then self._client.close() but I don't think this is the issue.


A quick check also confirmed there are no pending futures before shutdown() is called, so whatever was cancelled happened before we reach the dask client's close().
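A check of that kind could look roughly like this (sketch only, same context as the snippet above; I'm assuming the client exposes its tracked futures via a Client.futures mapping, which may differ between distributed versions):

# Sketch: peek at the futures the client is still tracking before tearing it down.
# Assumption: distributed.Client keeps them in a `futures` mapping.
pending = list(self._dask_client.futures)
print(f"futures still tracked before shutdown: {pending}")  # expected: []

self._dask_client.shutdown()
self._dask_client.close()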


Tried to naively just use contextlib.suppress(CancelledError) around client.close(), but unfortunately the error is just sent straight to the logger and not actually raised.
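i.e. something along these lines (fragment, same context as above; it doesn't help because the error is handed to the logger rather than raised):

from asyncio import CancelledError
from contextlib import suppress

# Doesn't help: the CancelledError never propagates out of close(),
# it is caught by dask's @log_errors decorator and sent to the logger instead.
with suppress(CancelledError):
    self._dask_client.close()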


There doesn't seem to be anything coming up on the dask issue tracker about this.


Next suggestions

Main thing would be to identify the futures which give the CancelledError. It would also be good to create a minimal example of the above creation of the dask client, submit a job or two, then do our shutdown process and see if the problem is minimally reproducible.

  1. Do we stop the client's current jobs anywhere else? These stopped jobs may need to also be removed from some internal queue of dask.
  2. Can we manually trigger these errors and collect these futures outside of close and just catch them so dask doesn't log it?
  3. Failing all that, we can just add something to our default logging.yaml that ignores them (see the sketch below)
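For option 3, rather than (or in addition to) editing logging.yaml, the same effect in plain Python would be a filter roughly like this (sketch; the logger name "asyncio.events" is taken from the traceback above and is an assumption about where these records land):

import asyncio
import logging

class IgnoreCancelledError(logging.Filter):
    """Drop log records whose attached exception is an asyncio.CancelledError."""

    def filter(self, record: logging.LogRecord) -> bool:
        if record.exc_info and record.exc_info[0] is not None:
            if issubclass(record.exc_info[0], asyncio.CancelledError):
                return False  # suppress this record
        return True

# The traceback above is emitted under the "asyncio.events" logger,
# so that's where the filter would be attached.
logging.getLogger("asyncio.events").addFilter(IgnoreCancelledError())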


xmyhhh commented Jun 28, 2022

I can't believe you replied with so much, thanks so much! I need a moment to go through your reply~


eddiebergman commented Jul 8, 2022

So I got back to this and created a minimal reproducible example that shows where the issue occurs and how to solve it. Essentially, client.shutdown() should be reserved for when the Client creates the cluster itself.

Essentially, when self.client goes out of scope, its __del__ method gets called, which causes the Client to call close() on itself. If its status is already "closed" then nothing happens.

In the broken example, we can see its status is "connecting" for some reason, and so it will try to close twice. In the fixed example, its status is "closed" and so it safely does nothing.

# Recreate broken
from dask.distributed import Client, LocalCluster

class A:
    def do(self):
        print("setup", flush=True)

        self.client = Client(LocalCluster(n_workers=2, processes=False))
        print(f"post creation | self.client.status = {self.client.status}", flush=True)

        self.client.shutdown()
        print(f"post shutdown | self.client.status = {self.client.status}", flush=True)

a = A()
a.do()

# setup
# post creation | self.client.status = running
# post shutdown | self.client.status = connecting
# `a` goes out of scope, self.client.__del__ is called, `Client` is "connecting", close again
# ...errors

# Fixed
from dask.distributed import Client, LocalCluster

class A:
    def do(self):
        print("setup", flush=True)

        self.cluster = LocalCluster(n_workers=2, processes=False)
        self.client = Client(self.cluster)
        print(f"post creation | self.client.status = {self.client.status}", flush=True)
        print(f"post creation | self.cluster.status = {self.cluster.status}", flush=True)

        self.client.close()
        self.cluster.close()
        print(f"post shutdown | self.client.status = {self.client.status}", flush=True)
        print(f"post shutdown | self.cluster.status = {self.cluster.status}", flush=True)

a = A()
a.do()

# setup
# post creation | self.client.status = running
# post creation | self.cluster.status = Status.running
# post shutdown | self.client.status = closed
# post shutdown | self.cluster.status = Status.closed
# `a` goes out of scope, self.client.__del__ is called, `Client` is "closed", do nothing

This means that since we create our own cluster, we are in charge of shutting it down, not the client. Or at least that's what I gather from this.
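Translated back to our client setup from the earlier comment, the teardown would then become something like this (sketch only, parameter values copied from above; not necessarily what the eventual patch will look like):

# Sketch, assuming the same imports as above (Client, LocalCluster, tempfile).
# Keep a named reference to the cluster we create ourselves ...
self._cluster = LocalCluster(
    n_workers=self._n_jobs,
    processes=False,
    threads_per_worker=1,
    local_directory=tempfile.gettempdir(),
    memory_limit=0,
)
self._dask_client = Client(self._cluster, heartbeat_interval=10000)

# ... and on teardown, close the client first and then the cluster,
# instead of calling self._dask_client.shutdown().
self._dask_client.close()
self._cluster.close()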
