
When tasks are re-submitted with the same key, Scheduler.queued may return stale TaskState objects #7504

Open
fjetter opened this issue Jan 27, 2023 · 1 comment · May be fixed by #7528
fjetter commented Jan 27, 2023

In distributed/tests/test_client.py::test_threadsafe_get we saw an assertion error while scheduling queued tasks: a task had already been forgotten even though the scheduler was just trying to assign it.

https://github.com/dask/distributed/actions/runs/4026033623/jobs/6920058827

  File "D:\a\distributed\distributed\distributed\scheduler.py", line 4617, in stimulus_queue_slots_maybe_opened

    assert qts.state == "queued", qts.state

AssertionError: forgotten

cc @gjoseph92

@gjoseph92 gjoseph92 changed the title AssertionError in queuing scheduling logic When tasks are re-submitted with the same key, Scheduler.queued may return stale TaskState objects Jan 28, 2023
@gjoseph92 (Collaborator) commented:
TaskState objects are hashable. Their hash is currently their key. So if a key is forgotten, but its TaskState object still exists, when a task with the same key is submitted, the two TaskState objects will hash and compare as equal, even though they are logically different tasks.
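A minimal sketch of that pitfall, using a hypothetical stand-in class (the real TaskState lives in distributed/scheduler.py and is more involved):

```python
# Hypothetical stand-in for a TaskState that hashes and compares by key,
# illustrating why two logically different tasks can collide.
class FakeTaskState:
    def __init__(self, key, state="queued"):
        self.key = key
        self.state = state

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        return isinstance(other, FakeTaskState) and self.key == other.key


old = FakeTaskState("x", state="forgotten")  # forgotten, but still referenced
new = FakeTaskState("x")                     # re-submitted task, same key

# The two logically different tasks hash and compare as equal...
assert old == new and hash(old) == hash(new)
assert old is not new

# ...so membership tests cannot tell them apart:
assert old in {new}
```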

So I think this is what's happening:

  1. Task x is added to the queue.

  2. Task x is removed from the queue (not popped).

    Remember that internally, a HeapSet has both a set and a heap of weakrefs. HeapSet.remove removes the object from the set, but not from the heap. So HeapSet._data doesn't contain x, but HeapSet._heap still does.

  3. Task x is forgotten (but something still has a reference to it somewhere).

  4. Task x is re-submitted. A different TaskState object is created.

  5. The new x TaskState is added to the queue.

    x in HeapSet._data (the set) is False. So x is added to _data, and x is pushed onto _heap. The heap has an insertion-order tiebreaker, so the new x comes immediately after the old x in the heap.

  6. We peek from the front of the queue.

    The old x object eventually comes off the heap first. That old TaskState object is not in the _data set, but because the new x with the same key is in the set, x in self._data is True, and we return the old, stale x TaskState.

  7. This stale TaskState is in state forgotten, causing the assertion error.
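The steps above can be reproduced with a simplified sketch of a HeapSet. `MiniHeapSet` and `Task` here are hypothetical stand-ins, not the real distributed internals, but they capture the set-plus-heap-of-weakrefs structure and the lazy removal:

```python
import heapq
import weakref
from itertools import count


class MiniHeapSet:
    """Simplified sketch of HeapSet: a set plus a heap of weakrefs,
    with lazy removal (remove() leaves the entry on the heap)."""

    def __init__(self):
        self._data = set()
        self._heap = []       # entries: (priority, insertion_order, weakref)
        self._count = count()  # insertion-order tiebreaker

    def add(self, value, priority=0):
        self._data.add(value)
        heapq.heappush(self._heap, (priority, next(self._count), weakref.ref(value)))

    def remove(self, value):
        self._data.discard(value)  # heap entry is deliberately left behind

    def peek(self):
        while self._heap:
            _, _, ref = self._heap[0]
            value = ref()
            # BUG: a stale heap entry passes this check when a
            # different-but-equal object is in _data.
            if value is not None and value in self._data:
                return value
            heapq.heappop(self._heap)  # discard dead/removed entries
        raise KeyError("empty")


class Task:
    """Stand-in TaskState, hashed and compared by key."""

    def __init__(self, key, state="queued"):
        self.key = key
        self.state = state

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        return isinstance(other, Task) and self.key == other.key


q = MiniHeapSet()
old_x = Task("x")
q.add(old_x)                # 1. task x is queued
q.remove(old_x)             # 2. removed (not popped): still on the heap
old_x.state = "forgotten"   # 3. forgotten, but a reference keeps it alive

new_x = Task("x")
q.add(new_x)                # 4./5. re-submitted: new object, same key

stale = q.peek()            # 6. the old entry comes off the heap first
assert stale is old_x
assert stale.state == "forgotten"  # 7. the state the assertion trips over
```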

This explanation kinda fits with what the test actually does:

```python
def f(_):
    total = 0
    for _ in range(20):
        total += (x + random.randint(0, 20)).sum().compute()
        sleep(0.001)
    return total

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(30) as e:
    results = list(e.map(f, range(30)))
```

The test is guaranteed to submit the same graph at least a few times. And in the specific CI run you linked, a worker died partway through, which could have triggered the removal of a task from the middle of the queue.


The underlying issue is that TaskStates shouldn't be hashed or equal based on keys: #7510.
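One direction for such a fix (a sketch, not the actual change in #7510): simply not defining `__hash__`/`__eq__` makes Python fall back to object identity, so two TaskState objects for the same key no longer collide:

```python
# Hypothetical Task class relying on default identity-based hashing.
class Task:
    def __init__(self, key, state="queued"):
        self.key = key
        self.state = state


old = Task("x", state="forgotten")
new = Task("x")

assert old != new            # different objects, even with the same key
assert len({old, new}) == 2  # both can coexist in a set
assert old not in {new}      # a stale object no longer masquerades as the new one
```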

@gjoseph92 gjoseph92 linked a pull request Feb 9, 2023 that will close this issue