Race condition in scatter->dereference->scatter #8576
Comments
We've done some nontrivial refactoring in how we're hashing/tokenizing objects (dask/dask#10905). I see this came up in an OSS project. Is it possible for you to link to the code this is running? cc @crusaderky
Certainly. The problem is that the code is quite complex and the objects we use are also quite complex. Actually running this particular workflow currently requires a proprietary program. I was hoping to be able to create a smaller example, but wasn't able to. I could potentially do some debugging in dask/distributed myself if I knew where to start looking. Could it be that scattered objects get removed after they are deemed not needed any longer? The main github page: https://github.com/pharmpy/pharmpy
The lifetime of a scattered object is coupled to the `Future` returned by `scatter`: once all references to that future are dropped, the scheduler is free to forget the data.
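A minimal sketch of that coupling on an in-process cluster (scheduler internals are inspected here only for illustration):

```python
from time import sleep

import distributed

with distributed.Client(processes=False) as client:
    fut = client.scatter(123)
    assert client.cluster.scheduler.tasks[fut.key].state == "memory"  # data is held
    del fut                                # drop the only reference to the future
    while client.cluster.scheduler.tasks:  # the release reaches the scheduler asynchronously
        sleep(0.01)
    print("scheduler has forgotten the scattered data")
```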
Hi @rikardn,

What do "first" and "second" workflow mean? Are they two keys inside the same dsk that end up summarized and retrieved at once through the …?

In other words, could your algorithm be simplified down to this?

```python
import distributed

with distributed.Client(processes=False) as client:
    dsk = {
        "results": (sum, ["first", "second"]),
        "first": client.scatter(123),
        "second": client.scatter(123),  # produces a future with the same key as first
    }
    print(client.get(dsk, "results"))
```

or to this?

```python
with distributed.Client(processes=False) as client:
    dsk = {"results": client.scatter(123)}
    print("first", client.get(dsk, "results"))

with distributed.Client(processes=False) as client:
    dsk = {"results": client.scatter(123)}
    print("second", client.get(dsk, "results"))
```
The full workflow is something like:

```python
import distributed
from distributed import get_client, rejoin, secede

def func():
    obj = create_object()
    dsk1 = create_workflow1(client.scatter(obj))
    res1 = run_dynamic_workflow(dsk1)
    dsk2 = create_workflow2(client.scatter(obj), res1)  # Note: same object as before, new call to scatter
    res2 = run_dynamic_workflow2(dsk2)
    return res2

def run_dynamic_workflow(dsk):
    client = get_client()
    futures = client.get(dsk, "result", sync=False)
    secede()
    res = client.gather(futures)
    rejoin()
    return res

with distributed.Client(processes=False) as client:
    dsk = {
        "results": (func,),
    }
    print(client.get(dsk, "results"))
```
Thanks @rikardn, this helps a lot. An important nitpick though: did you accidentally omit

```python
def func():
    client = get_client()
```

? In other words, do …?

Second important nit: is res1 a computed result, as your pseudocode implies, or a Future to the output of the first workflow?
First: …
Second: …
Third: …
Fourth: …

An updated example for reference:

```python
from distributed import Client, LocalCluster, get_client, rejoin, secede

def func():
    obj = create_object()
    dsk1 = create_workflow1(client.scatter(obj))
    res1, new_obj = run_dynamic_workflow(dsk1)  # Note: new_obj and obj are the same object, i.e. they have the same hash
    dsk2 = create_workflow2(client.scatter(new_obj), client.scatter(res1))
    res2 = run_dynamic_workflow2(dsk2)
    return res2

def run_dynamic_workflow(dsk):
    client = get_client()
    futures = client.get(dsk, "result", sync=False)
    secede()
    res = client.gather(futures)
    rejoin()
    return res

with LocalCluster(processes=False) as cluster, Client(cluster) as client:
    dsk = {
        "results": (func,),
    }
    print(client.get(dsk, "results"))
```
I realize my first point doesn't make sense. It is actually the …
It isn't. What I meant was whether `res` is actually just the raw output of `client.get`, e.g. `distributed.Future` objects.
Reproduced. I'm willing to bet that your pseudocode is missing a detail: before you're calling …

```python
from time import sleep

import distributed

client = distributed.Client(processes=False)

while True:
    print(".", end="")
    x = client.scatter(123)
    assert client.cluster.scheduler.tasks[x.key].state == "memory"
    del x
    # while client.cluster.scheduler.tasks:
    #     sleep(0.01)
```

Output: a stream of dots until the check eventually fails. If I uncomment the sleep after the deletion, it goes on indefinitely.
@crusaderky Just wow! I am amazed that you could figure this one out from my messy information. Thanks! I can confirm that this issue is also in dask/distributed 2024.2.0, and the reason it was triggered for me starting with 2024.2.1 is that the hash function in dask was changed so that it now gives the same hash for objects that are the same (which is a good change). One potential workaround is to use `hash=False` when scattering, so that the two scatters get different keys.
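For reference, applied to the pseudocode from earlier in the thread (with `create_object`, `create_workflow*`, and `run_dynamic_workflow*` still being placeholders), that workaround would look something like:

```python
def func():
    client = get_client()
    obj = create_object()
    # hash=False makes scatter generate a fresh key on every call, so releasing
    # the first future can no longer race with a second scatter of the same object
    dsk1 = create_workflow1(client.scatter(obj, hash=False))
    res1, new_obj = run_dynamic_workflow(dsk1)
    dsk2 = create_workflow2(client.scatter(new_obj, hash=False), client.scatter(res1))
    res2 = run_dynamic_workflow2(dsk2)
    return res2
```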
I recently encountered this error as well; I was able to get it down to a somewhat minimal example.

```python
import gc

import numpy as np
import pytest
from distributed import Client, LocalCluster, Future

cluster = LocalCluster(
    n_workers=2, threads_per_worker=2, dashboard_address=None, processes=True
)

class DataHolder():
    def __init__(self, data, client, raise_error=False):
        self._data_future = client.scatter(data, broadcast=True)
        if raise_error:
            raise ValueError("A value Error")

    def do_remote_work(self):
        future = client.submit(lambda x: x**2, self._data_future)
        return future

data = np.arange(10)

with Client(cluster) as client:
    with pytest.raises(ValueError):
        DataHolder(data, client, raise_error=True)
    # gc.collect()  # Adding an explicit garbage collection here usually makes this work.
    holder = DataHolder(data, client)
    future_result = holder.do_remote_work()
    print(future_result.result())
```

As pointed out, it's very likely due to the garbage collection of the first object happening while the second object is being created, which deletes the key that the second item creates. This makes me think there is some odd reference counting happening somewhere, or that there should be an "atomic" action somewhere where there isn't one.
I also do not immediately see where the …
This is a little hidden, but it is happening while the …

This race condition is not related to reference counting but is caused by scatter and release using different comm channels, which creates an ordering problem. Fixing this is not straightforward (see #7480 and #8430 for a suggestion for a systematic fix). A mitigation (as already hinted at in #8576 (comment)) is to sleep briefly after an old scatter future is released. That's why the …
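A crude sketch of that mitigation (not a fix for the underlying ordering problem), using a plain local client and a dummy value:

```python
from time import sleep

import distributed

with distributed.Client(processes=False) as client:
    obj = 123                     # stand-in for the real object being scattered
    first = client.scatter(obj)   # first scatter
    # ... first workflow runs here ...
    del first                     # dereference the old future
    sleep(0.5)                    # give the release time to reach the scheduler
    second = client.scatter(obj)  # second scatter of the same object / same key
    print(second.key)
```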
Another mitigation that's already been mentioned is to ensure that the name of every scattered future is unique. Providing `hash=False` to `scatter` achieves this, since it generates a new key for every call.
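A quick illustration of the difference (a sketch on a local client):

```python
import distributed

with distributed.Client(processes=False) as client:
    obj = 123
    a = client.scatter(obj)              # key derived from the data's hash
    b = client.scatter(obj, hash=False)  # key contains a random component
    c = client.scatter(obj, hash=False)
    print(a.key)                         # the same for every scatter of equal data
    print(b.key, c.key)                  # distinct keys, so there is no key reuse to race on
```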
#### Summary

Updates for `dask>2024.2.1`

#### PR Checklist

* [ ] If this is a work in progress PR, set as a Draft PR
* [ ] Linted my code according to the [style guides](https://docs.simpeg.xyz/content/getting_started/contributing/code-style.html).
* [ ] Added [tests](https://docs.simpeg.xyz/content/getting_started/practices.html#testing) to verify changes to the code.
* [ ] Added necessary documentation to any new functions/classes following the expected [style](https://docs.simpeg.xyz/content/getting_started/practices.html#documentation).
* [ ] Marked as ready for review (if this was a draft PR), and converted to a Pull Request
* [ ] Tagged `@simpeg/simpeg-developers` when ready for review.

#### What does this implement/fix?

A few things:

1) dask recently updated how items are hashed to create a more deterministic hash. For objects, the hash will change depending on the state of the object. `Simulation` objects are mutable, so their hash would change over time for the same object. For the `DaskMetaSimulation`, we want a single simulation's hash (and also a single map's hash) to be constant in time, so this PR adds uuid properties to each of those base classes that are then registered to be used by dask to hash the objects.
2) This deterministic hashing led to a race condition in the error testing, when objects are scattered, dereferenced, then scattered again. The garbage collector would destroy the scattered objects dereferenced from the first call `after` the second scatter, thus creating a case where we are trying to access a cancelled Future.
3) This also makes the code a bit more rigorous about only using a single worker for each simulation operation when the simulations inadvertently live on multiple workers, by selecting the worker that has fewer simulations assigned to it.

#### Additional information

See dask/distributed#8576 for more information about the race condition.

This also implements one of the updates in #1444 regarding the testing environment script so that we are sure we are creating the correct environment.
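For context, registering a stable token for a mutable class, along the lines the PR describes, might look roughly like this (a sketch; `Simulation` and `_uuid` here are stand-ins, not the actual SimPEG code):

```python
import uuid

from dask.base import normalize_token, tokenize

class Simulation:
    """Stand-in for a mutable simulation object."""
    def __init__(self):
        self._uuid = uuid.uuid4()  # fixed at construction time

@normalize_token.register(Simulation)
def _(sim):
    # Tokenize on the uuid rather than on the mutable state, so the hash
    # stays constant for the lifetime of the instance.
    return sim._uuid.hex

sim = Simulation()
assert tokenize(sim) == tokenize(sim)  # stable token even as internal state changes
```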
Triage summary
If you scatter a key, dereference the returned future, and then scatter the same key again (with the same value), the release of the first future is likely to reach the scheduler after the transition to memory of the second one, and you'll end up holding a Future to a forgotten task. This in turn will cause your computation to fail.
Reproducer below: #8576 (comment)
Original post
I am using dask/distributed to run custom workflows. A workflow that used to work with version 2024.2.0 stopped working with 2024.2.1 (and still does not work with 2024.3.0). I have been unable to create a minimal reproducible example, but I have some information that could give clues to what the problem could be.

The setup is that a static workflow, run on a `LocalCluster` with threads, is calling two dynamic workflows (see the graphs below), and many of the objects are scattered. I think that the problem is that the scattered object with id `ModelEntry-d3c014c28f9af6108cba2a6c960688ce` is the same for both dynamic workflows. Distributed 2024.2.0 (or earlier) gives different ids for this object even though it is the same object. Also, if telling `scatter` to not use `hash`, the workflow will run, I guess because now the ids will be different. Not scattering the object will also work. Given the log messages (see below), it seems as if we are losing `ModelEntry-d3c014c28f9af6108cba2a6c960688ce` in the second dynamic workflow, and it seems to only happen when the scatter id is the same in the two workflows.

The stacktrace:

The log:

First dynamic workflow (works):

Failing dynamic workflow (see first function call for the suspect scattered object):