
Add TaskState to worker.py #4107

Merged: 91 commits, merged on Oct 15, 2020
Conversation

gforsyth
Contributor

This is some initial progress towards #4097. `dependents`, `dependencies`, and `dep_state` are still being tracked in a single dict on the Worker class, but those come next.

Still lots to do on this, so no need for anyone to review just yet.

gforsyth and others added 5 commits September 10, 2020 13:17
Starting with:
* `key`
* `duration`
* `priority`
* `state`
* `resource_restrictions`
* `exception`
* `traceback`

Going to get that working, then handle the rest --
`dep_state` seems tricky to get right
Signed-off-by: Gil Forsyth <gil@forsyth.dev>
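For context, a minimal sketch of a `TaskState` container with the attributes listed in the commit message above. This is illustrative only; the defaults and types are assumptions, not the PR's actual class.

```python
# Illustrative sketch only; attribute names come from the commit message
# above, while defaults and types are assumptions, not the PR's code.
class TaskState:
    def __init__(self, key, runspec=None):
        self.key = key
        self.runspec = runspec  # may be None if data is gathered from a peer
        self.duration = None
        self.priority = None
        self.state = "waiting"
        self.resource_restrictions = None
        self.exception = None
        self.traceback = None

    def __repr__(self):
        return f"<TaskState {self.key!r} {self.state}>"
```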
@mrocklin
Member

Hooray for PRs with incremental progress!

FWIW I really like the approach of starting with just the task-wise state. That seems sanity-preserving.

Working plan:
- All dependencies are also Tasks (they don't necessarily have a `runspec`
because they might be gathered from another worker)
- `dependents` and `dependencies` are sets of TaskState objects
Also treating `who_has` as a Task-object-level dict, where keys are the
dependency keys and values are lists of workers who have that data.

This is maybe... wrong?

Since each `who_has` entry would live on its dependency's own TaskState
object, it probably just needs to be a list of workers.
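A self-contained sketch of the structure that working plan describes. Illustrative only; the stand-in `TaskState` here is much smaller than the real one in the PR.

```python
# Illustrative only: a tiny stand-in TaskState so the example runs on
# its own. The real class in the PR has more attributes.
class TaskState:
    def __init__(self, key):
        self.key = key
        self.dependencies = set()  # TaskState objects this task needs
        self.dependents = set()    # TaskState objects that need this task
        self.who_has = set()       # workers holding this task's data

a = TaskState("a")
b = TaskState("b")

# b depends on a; both directions are sets of TaskState objects.
b.dependencies.add(a)
a.dependents.add(b)

# Since who_has lives on each task itself, no dict keyed by dependency
# key is needed; a plain collection of worker addresses suffices.
a.who_has.update({"tcp://worker-1:8786", "tcp://worker-2:8786"})
```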
@mrocklin
Member

Checking in here @gforsyth . How are things going? Have you run into tricky bits that I can help with?

@gforsyth
Contributor Author

> How are things going?

Definitely hit some tricky bits, but there's something I thought of that I want to try.
It's mostly working; there are just some bookkeeping issues around updating `data_needed`, `waiting_for_data`, etc. in a sane way.

I'll ping if the next few hours don't resolve things.

        self.active_threads,
        self.active_threads_lock,
        self.scheduler_delay,
    ),
)
# TODO: it seems insane that this needs to be here
# but it is occasionally cleared by something in executor_submit
self.executing.add(ts.key)
Member

Why does this need to be here? What happens if it is removed?

Grepping through the codebase I see three paths for this:

  1. The task has been released during execution. Maybe the user no longer needs the result.
  2. The task is long-running, such as one called with `worker_client` or `secede`.
  3. The same task is run twice (??)

In some of these situations I would expect the following `ts.state not in ("executing", "long-running")` check to clean things up.
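For readers following along, a hedged sketch of the kind of check being referenced. The function shape and the use of `worker.data` are assumptions based on the snippet above, not the PR's exact code.

```python
# Hypothetical sketch of the post-execution guard referenced above; the
# function shape and worker.data usage are assumptions, not PR code.
def finish_execution(worker, ts, result):
    if ts.state not in ("executing", "long-running"):
        # The task was released (or transitioned) while it ran, e.g.
        # because the user no longer needs the result; discard it.
        return
    ts.state = "memory"
    worker.data[ts.key] = result
```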

Contributor Author

Hmm, I had put this in because some tests in `test_failed_worker.py` were throwing assertion errors (without failing the test in question). I've just removed it and can no longer reproduce those errors, so it may have been due to a problem elsewhere that has since been fixed.

Member

@jrbourbeau left a comment

Apologies for the delayed review @gforsyth. Thanks for all your careful work here!

    pass
try:
    self.has_what[worker].remove(dep)
    ts.who_has.remove(worker)
Member

This can probably be moved outside the try/except block
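A sketch of the suggested restructuring, under the assumption that only the `has_what` lookup can raise `KeyError` here; the enclosing loop is also an assumption, not code from the PR.

```python
# Hypothetical restructuring of the snippet above, per the suggestion:
# keep only the call that can raise inside the try/except.
for worker in list(ts.who_has):  # copy so we can mutate while iterating
    try:
        self.has_what[worker].remove(dep)
    except KeyError:
        pass
    # Moved outside the try/except: this removal is safe to run
    # unconditionally, since worker is known to be in ts.who_has.
    ts.who_has.remove(worker)
```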

@@ -2253,70 +2241,24 @@ def release_key(self, key, cause=None, reason=None, report=True):
pdb.set_trace()
raise

def release_dep(self, dep, report=False):
Member

It looks like we've replaced `release_dep` calls with `release_keys`. What was the reasoning for this? It'll impact worker plugins.

Contributor Author

There's no longer a distinction between a dep and a Task, which is why I removed it. But if it's going to impact the plugins, I can restore it and have it wrap `release_keys` (or we can update the plugins?).
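If backwards compatibility were needed, a shim along these lines could work. This is hypothetical, and the `release_keys` signature shown here is an assumption.

```python
# Hypothetical compatibility shim, not code from the PR: keep
# release_dep as a thin wrapper so user plugins that call it still work.
def release_dep(self, dep, report=False):
    # Deps are now ordinary tasks, so delegate to the new method.
    # The release_keys signature here is assumed for illustration.
    self.release_keys([dep], report=report)
```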

Member

> It'll impact worker plugins

Do we know of any worker plugins that trigger on `release_dep`?

Also, if we're jumping from version 2 to 2020 in the next release, then we can also break some things. Breaking `WorkerPlugin.release_dep` sounds OK to me.

Member

> Do we know of any worker plugins that trigger on `release_dep`?

None come to mind, but there may be user-defined plugins that use `release_dep`.

> Breaking `WorkerPlugin.release_dep` sounds OK to me

Yeah, I think we're treating all the HLG changes plus the bump to CalVer as a major version change, so removing `release_dep` should be okay.

gforsyth and others added 6 commits October 9, 2020 10:12
And also restore said count to Worker `repr`
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
This is a redundant bit of state tracking and overcomplicates things.
Instead, similar to `Worker.tasks`, this maintains a count of the
executing tasks for reporting but relies on `TaskState` objects to keep
track of each task's state.

Note that the removal of the `finally:` block, specifically, is because
_if_ the key has already been released, then `self.executing_count` has
already been decremented, so that particular check doesn't need to run.
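A minimal sketch of the bookkeeping this commit message describes. Apart from `executing_count` and `TaskState`, the names here are illustrative assumptions, not the PR's code.

```python
# Illustrative sketch, not the PR's code: replace the old
# `self.executing` set with a simple counter for reporting, and let
# each TaskState remain the source of truth for task state.
class Worker:
    def __init__(self):
        self.executing_count = 0  # used for repr/diagnostics only

    def start_task(self, ts):
        ts.state = "executing"
        self.executing_count += 1

    def done_task(self, ts):
        # If the key was already released, executing_count was already
        # decremented there, so no `finally:` re-check is needed.
        if ts.state in ("executing", "long-running"):
            self.executing_count -= 1
            ts.state = "memory"
```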
Member

@jrbourbeau left a comment

Thanks for the updates @gforsyth! It looks like there are a few remaining `executing` -> `executing_count` updates, but otherwise this looks good to go.

@mrocklin
Member

The passing tests here are encouraging. I'm going to restart GitHub Actions CI to see if this is consistent.

Member

@jrbourbeau left a comment

Thanks @gforsyth! I'm +1 on merging the changes here. I'll leave this open for a bit to allow for last-minute feedback, but then I plan on merging.
