Worker has no exceptions attribute anymore #5225

Closed
steffen-schroeder-by opened this issue Aug 18, 2021 · 4 comments · Fixed by #5226

steffen-schroeder-by commented Aug 18, 2021

What happened:
We have a worker plugin which forwards exceptions to Sentry. This broke silently (exceptions raised in the worker appear to be swallowed) because the worker does not have an exceptions attribute anymore.

What you expected to happen:
The exceptions attribute should be present, as it was in past revisions and as shown in the example: https://github.com/dask/distributed/blob/main/distributed/diagnostics/plugin.py#L114-L127

Minimal Complete Verifiable Example:
Since assertion failures appear to be swallowed, the control flow is shown via print statements:

from typing import Optional, Any

from distributed import WorkerPlugin, Worker, Client


class MissingExceptionsPlugin(WorkerPlugin):
    name = "missing_exceptions"

    def __init__(self) -> None:
        self.worker: Optional[Worker] = None

    def setup(self, worker: Worker) -> None:
        self.worker = worker

    def transition(self, key: str, start: str, finish: str, **kwargs: Any) -> None:
        if finish == "error":
            try:
                assert self.worker is not None
                print(f"Fine: {self.worker.exceptions[key].data}")
            except BaseException as b:
                print(f"Boom: {b}")


def raise_exception():
    raise ValueError("foo")


def test_foo():
    client = Client()
    client.register_worker_plugin(MissingExceptionsPlugin())
    result = client.submit(raise_exception)
    result.result()

Output with dask/distributed 2.30.0 - the test correctly prints Fine: foo

============================================================================================================= test session starts ==============================================================================================================
platform linux -- Python 3.8.10, pytest-5.4.3, py-1.10.0, pluggy-0.13.1
benchmark: 3.2.3 (defaults: timer=time.perf_counter disable_gc=False min_rounds=5 min_time=0.000005 max_time=1.0 calibration_precision=10 warmup=False warmup_iterations=100000)
rootdir: /Users/foo, inifile: pytest.ini
plugins: timeout-1.3.3, cov-2.11.1, requests-mock-1.8.0, mock-3.5.1, hypothesis-5.23.7, dependency-0.5.1, vcr-1.0.2, benchmark-3.2.3, asyncio-0.12.0
collected 1 item

test_distrubuted_plugin.py distributed.worker - WARNING -  Compute Failed
Function:  raise_exception
args:      ()
kwargs:    {}
Exception: ValueError('foo')

Fine: foo
F

=================================================================================================================== FAILURES ===================================================================================================================
___________________________________________________________________________________________________________________ test_foo ___________________________________________________________________________________________________________________
test_distrubuted_plugin.py:33: in test_foo
    result.result()
/home/docker/venv3.8/lib/python3.8/site-packages/distributed/client.py:225: in result
    raise exc.with_traceback(tb)
test_distrubuted_plugin.py:26: in raise_exception
    raise ValueError("foo")
E   ValueError: foo
=========================================================================================================== short test summary info ============================================================================================================
FAILED test_distrubuted_plugin.py::test_foo - ValueError: foo
============================================================================================================== 1 failed in 1.91s ===============================================================================================================

Output with dask/distributed 2021.7.0 prints Boom: 'Worker' object has no attribute 'exceptions' because the worker has no exceptions attribute anymore (ignore the noise at the end)

=================================================== test session starts ===================================================
platform linux -- Python 3.8.10, pytest-5.4.3, py-1.10.0, pluggy-0.13.1
benchmark: 3.2.3 (defaults: timer=time.perf_counter disable_gc=False min_rounds=5 min_time=0.000005 max_time=1.0 calibration_precision=10 warmup=False warmup_iterations=100000)
rootdir: /Users/foo, inifile: pytest.ini
plugins: cov-2.11.1, requests-mock-1.8.0, mock-3.5.1, hypothesis-5.23.7, dependency-0.5.1, vcr-1.0.2, benchmark-3.2.3, asyncio-0.12.0
collected 1 item

test_distrubuted_plugin.py distributed.worker - WARNING - Compute Failed
Function:  raise_exception
args:      ()
kwargs:    {}
Exception: ValueError('foo')

Boom: 'Worker' object has no attribute 'exceptions'
F

======================================================== FAILURES =========================================================
________________________________________________________ test_foo _________________________________________________________
test_distrubuted_plugin.py:33: in test_foo
    result.result()
/home/docker/venv3.8/lib/python3.8/site-packages/distributed/client.py:226: in result
    raise exc.with_traceback(tb)
test_distrubuted_plugin.py:26: in raise_exception
    raise ValueError("foo")
E   ValueError: foo
================================================= short test summary info =================================================
FAILED test_distrubuted_plugin.py::test_foo - ValueError: foo
==================================================== 1 failed in 2.06s ====================================================
distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "/home/docker/venv3.8/lib/python3.8/site-packages/distributed/core.py", line 502, in handle_comm
    result = await result
  File "/home/docker/venv3.8/lib/python3.8/site-packages/distributed/scheduler.py", line 5054, in add_client
    self.remove_client(client=client)
  File "/home/docker/venv3.8/lib/python3.8/site-packages/distributed/scheduler.py", line 5081, in remove_client
    self.client_releases_keys(
  File "/home/docker/venv3.8/lib/python3.8/site-packages/distributed/scheduler.py", line 4821, in client_releases_keys
    self.transitions(recommendations)
  File "/home/docker/venv3.8/lib/python3.8/site-packages/distributed/scheduler.py", line 6753, in transitions
    self.send_all(client_msgs, worker_msgs)
  File "/home/docker/venv3.8/lib/python3.8/site-packages/distributed/scheduler.py", line 5335, in send_all
    w = stream_comms[worker]
KeyError: 'tcp://127.0.0.1:33979'
tornado.application - ERROR - Exception in callback functools.partial(<function TCPServer._handle_connection.<locals>.<lambda> at 0x7f93401e4940>, <Task finished name='Task-39' coro=<BaseTCPListener._handle_stream() done, defined at /home/docker/venv3.8/lib/python3.8/site-packages/distributed/comm/tcp.py:473> exception=KeyError('tcp://127.0.0.1:33979')>)
Traceback (most recent call last):
  File "/home/docker/venv3.8/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/home/docker/venv3.8/lib/python3.8/site-packages/tornado/tcpserver.py", line 331, in <lambda>
    gen.convert_yielded(future), lambda f: f.result()
  File "/home/docker/venv3.8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 490, in _handle_stream
    await self.comm_handler(comm)
  File "/home/docker/venv3.8/lib/python3.8/site-packages/distributed/core.py", line 502, in handle_comm
    result = await result
  File "/home/docker/venv3.8/lib/python3.8/site-packages/distributed/scheduler.py", line 5054, in add_client
    self.remove_client(client=client)
  File "/home/docker/venv3.8/lib/python3.8/site-packages/distributed/scheduler.py", line 5081, in remove_client
    self.client_releases_keys(
  File "/home/docker/venv3.8/lib/python3.8/site-packages/distributed/scheduler.py", line 4821, in client_releases_keys
    self.transitions(recommendations)
  File "/home/docker/venv3.8/lib/python3.8/site-packages/distributed/scheduler.py", line 6753, in transitions
    self.send_all(client_msgs, worker_msgs)
  File "/home/docker/venv3.8/lib/python3.8/site-packages/distributed/scheduler.py", line 5335, in send_all
    w = stream_comms[worker]
KeyError: 'tcp://127.0.0.1:33979'

Anything else we need to know?:
If you are using pytest to run the example, run it with the -s option to see the prints on stdout.

Environment:

  • Dask version: 2021.7.0
  • Python version: 3.8.10
  • Operating System: Debian 9.13 (stretch)
  • Install method (conda, pip, source): pip

steffen-schroeder-by (Author)

The exception in question can be found in self.worker.tasks[key].exception, so at least the docstring in https://github.com/dask/distributed/blob/main/distributed/diagnostics/plugin.py#L114-L127 might need to be changed.

jacobtomlinson (Member) commented Aug 18, 2021

It looks like this was removed nearly a year ago in #4107. Perhaps @gforsyth can comment on the reasoning and on whether there is an alternative?

fjetter (Member) commented Aug 18, 2021

Indeed, the alternative is now self.worker.tasks[key].exception. A lot of task-specific worker state has been moved to a distributed.worker.TaskState object, which has exception and traceback attributes. Both are also available as plain, pre-formatted text via the exception_text / traceback_text attributes (useful in case the exception is shipped to a place where you do not want to deserialize anything; see also #5126 for some context).
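
To illustrate, the plugin from the example above could be adapted along these lines (a minimal sketch based on the attributes named here; the plugin and class names are made up):

from typing import Any

from distributed import Worker, WorkerPlugin


class TaskStateExceptionsPlugin(WorkerPlugin):
    name = "taskstate_exceptions"  # hypothetical name, for illustration

    def setup(self, worker: Worker) -> None:
        self.worker = worker

    def transition(self, key: str, start: str, finish: str, **kwargs: Any) -> None:
        if finish == "error":
            ts = self.worker.tasks[key]  # distributed.worker.TaskState
            # exception_text / traceback_text are plain strings, so nothing
            # needs to be deserialized before forwarding
            print(f"Fine: {ts.exception_text}")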

I'll update the referenced doc strings accordingly


What might interest you as well: we're currently investigating capturing all server-side exceptions and logging them as events, e.g. to be forwarded to a client.

See #5184 for a discussion about the exception capturing and #5217 for the event subscription on the client side.

With this you might no longer need to scrape exceptions in worker plugins but could handle them on the scheduler or client instead. Nothing wrong with the worker plugin approach in general, though.
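
As a rough sketch of what the client-side variant could look like once #5217 lands (the subscribe_topic API, the topic name, and the payload shape here are assumptions based on that discussion):

from distributed import Client


def handle_error_event(event):
    # the payload shape depends on what the workers log via log_event;
    # printed verbatim here to avoid assuming a structure
    print(f"received error event: {event}")


client = Client()
# "task-errors" is a hypothetical topic name, for illustration only
client.subscribe_topic("task-errors", handle_error_event)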

fjetter (Member) commented Aug 18, 2021

For reference, the TaskState attributes are documented here:

class TaskState:
    """Holds volatile state relating to an individual Dask task

    * **dependencies**: ``set(TaskState instances)``
        The data needed by this key to run
    * **dependents**: ``set(TaskState instances)``
        The keys that use this dependency.
    * **duration**: ``float``
        Expected duration of the task
    * **priority**: ``tuple``
        The priority of this task, given by the scheduler. Determines run order.
    * **state**: ``str``
        The current state of the task. One of ["waiting", "ready", "executing",
        "fetch", "memory", "flight", "long-running", "rescheduled", "error"]
    * **who_has**: ``set(worker)``
        Workers that we believe have this data
    * **coming_from**: ``str``
        The worker that current task data is coming from if task is in flight
    * **waiting_for_data**: ``set(keys of dependencies)``
        A dynamic version of dependencies. All dependencies that we still don't
        have for a particular key.
    * **resource_restrictions**: ``{str: number}``
        Abstract resources required to run a task
    * **exception**: ``str``
        The exception caused by running a task if it erred
    * **traceback**: ``str``
        The traceback caused by running a task if it erred
    * **type**: ``type``
        The type of a particular piece of data
    * **suspicious_count**: ``int``
        The number of times a dependency has not been where we expected it
    * **startstops**: ``[{startstop}]``
        Log of transfer, load, and compute times for a task
    * **start_time**: ``float``
        Time at which task begins running
    * **stop_time**: ``float``
        Time at which task finishes running
    * **metadata**: ``dict``
        Metadata related to task. Stored metadata should be msgpack
        serializable (e.g. int, string, list, dict).
    * **nbytes**: ``int``
        The size of a particular piece of data
    * **annotations**: ``dict``
        Task annotations
    """
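
For instance, a transition hook could combine several of these documented attributes (again only a sketch; the plugin name and output format are arbitrary):

from typing import Any

from distributed import Worker, WorkerPlugin


class TaskTimingPlugin(WorkerPlugin):
    name = "task_timing"  # hypothetical name, for illustration

    def setup(self, worker: Worker) -> None:
        self.worker = worker

    def transition(self, key: str, start: str, finish: str, **kwargs: Any) -> None:
        ts = self.worker.tasks[key]
        if finish == "memory" and ts.start_time is not None:
            # start_time / stop_time may be unset for data that arrived
            # via transfer instead of being computed on this worker
            print(f"{key} ran in {ts.stop_time - ts.start_time:.3f}s ({ts.nbytes} bytes)")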

I would consider the documented attributes mostly stable. Peeking over them, I don't expect any to change (neither type nor semantics), with two exceptions triggered by #5046: suspicious_count might be removed (TBD), and the set of possible task states might change as well over there. If in doubt, don't hesitate to open a ticket. There is a continuous, ongoing effort to improve our communication in terms of what is considered public and what isn't, but this is hard considering the vast API surface. Feedback is always appreciated in that regard, though.
