Client and worker_client refactor #1201
Force-pushed from 9cd47a0 to c482831.
Force-pushed from e9e1884 to 1b54a4a.
Rebased to more sensible commits. Going to merge this soon.
Force-pushed from 5676ae8 to 02d3657.
Can I nest future objects into, say, a dictionary? E.g., instead of
This brings up a good question of when we want to materialize futures to track dependencies and when we want to serialize them without tracking dependencies. In the case you describe above I would actually expect that to create a dependency on the nested future. In the case you brought up on Stack Overflow it's actually not clear to me what the best behavior is here: do we want to finalize the result and create a dependency, or move around the collection-holding-futures?
Force-pushed from 02d3657 to 5681aac.
Here is the behavior I get when trying your example:

```python
In [1]: from dask.distributed import Client

In [2]: client = Client()

In [3]: future = client.scatter(123)

In [4]: future2 = client.submit(lambda x: x, {'x': future})

In [5]: future2.result()
Out[5]: {'x': 123}
```
I guess I'm a little confused by what we mean when we say
because I get
What I want to do is send the handle to the ddf or future to the worker client and manipulate it there. Is this still not possible at the moment? I'd be happy with a way to wrap the ddf so it's like
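For context, the pattern being asked for looks roughly like this with the `get_client` API this PR introduces. This is a sketch only; the `process` function and its toy arithmetic are illustrative, not part of the discussion above:

```python
from distributed import Client, get_client

def process(x):
    # Runs inside a task on a worker: get_client() returns a client
    # connected to the same scheduler, so we can submit follow-up work.
    client = get_client()
    futures = client.map(lambda i: i + x, range(3))
    return client.gather(futures)

client = Client()
print(client.submit(process, 10).result())  # [10, 11, 12]
```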
Also, why does this fail?
I guess even more fundamentally, I thought I was replicating your test here, but this fails with the same
This almost works:
Sometimes it falls over with:
I know I'm doing some weird stuff, but it feels very close :)

EDIT: if I throw in strategically placed
Force-pushed from 5681aac to aea7ac7.
It looks like clients aren't being created on demand. I'll need to take a look. This probably won't happen any time today though. I appreciate the failing tests.
No problem, appreciate the work, it's very promising! Have a great pre-4th of July weekend.
@adamklein the recent commits may interest you.
Force-pushed from f9e7a6e to 2e977b5.
This now feels ready to me. @adamklein if you have a chance to try to break this, or @pitrou if you have a chance to review, I would appreciate it.
I'm considering where to put documentation for this. I may want to put this in the dask/dask repository.
Giving it a look this morning!
This is looking excellent. I'm running into timeout issues on my large task return values, but I think that's the problem I hacked around before and simply lost because of the
Do you have a sense for what is slowing down your scheduler? It seems odd to me to see 5s of delay, even under very heavy load (thousands of workers). You may be interested in this comment.
Documentation added in dask/dask#2501
```python
from distributed.recreate_exceptions import ReplayExceptionClient
ReplayExceptionClient(self)
```

```python
@property
def asynchronous(self):
```
I think either the name of this property and/or its semantics and/or its docstring should be changed, since all three seem to be saying different things :-)
I've expanded the docstring. Maybe this helps? Suggestions welcome.
```diff
-        if asynchronous:
+        self.status = 'connecting'
+
+        if self.asynchronous:
```
I must admit I fail to understand the intent of the `.asynchronous` property. If the user passed `asynchronous=True`, we may still start an event loop in a separate thread?
The problem arises because we reuse the same client in both contexts. Consider the following:
```python
def g():
    # I run in a worker thread
    client = get_client()
    future = client.submit(inc, 1)
    return future.result()

async def f():
    async with Client(asynchronous=True, processes=False) as c:
        await c.submit(g)
```
The client in both cases is the same. We tell the client originally that it is operating in asynchronous mode. However, when it finds itself in the `g` task it realizes that it definitely can't be in asynchronous mode, and so switches over within that context.

Ideally we would have a function that could tell us if we "were in the event loop", but this isn't doable. So instead we take a signal from the user (this is `asynchronous=True`) but disregard it when it's obviously false.
This approach would currently fail if we were to start a synchronous client and then try to use it asynchronously, but seems to work in all other settings.
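A minimal sketch of that decision, assuming a hypothetical `_loop_thread_id` attribute recorded when the event loop thread starts (this is not the actual implementation):

```python
import threading

class SketchClient:
    def __init__(self, asynchronous=False):
        self._asynchronous = asynchronous  # the user's hint
        self._loop_thread_id = None        # set when the event loop starts

    @property
    def asynchronous(self):
        # Honor the hint only where it could be true: code running on the
        # event loop's own thread must not block, so it is asynchronous;
        # code on any other thread (e.g. a worker task) can safely block.
        return (self._asynchronous
                and threading.get_ident() == self._loop_thread_id)
```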
> The client in both cases is the same.

Does it have to? :-) I don't think it would be terribly ruinous to restrict that asynchronous clients cannot be re-used in a synchronous context, and vice-versa.
There is some value to reducing the number of active clients. I agree though that, especially because of the failure of synchronous-intended clients to work in asynchronous environments, we do need to resolve this.
I think I'm still hoping for a way to help identify whether we should or should not act in an asynchronous manner that is separate from defining a client as asynchronous or not.
My inclination here is to merge things roughly as they are now. I think that the user-facing bits of this are decent. We have a known failure in one case but it's unlikely to be encountered by most users. This PR is large and I'd like to get it in soon-ish. I don't have strong confidence that I can find a good solution in the next day or two with other things going on.
One issue that arises is that when we deserialize futures or queues or variables we don't know if the client should be asynchronous or synchronous. The user doesn't have a chance to tell us in this case.
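A simplified sketch of the problem, using pickle-style hooks on a hypothetical class (the real `Future` carries more state):

```python
from distributed import get_client

class SketchFuture:
    def __getstate__(self):
        return self.key

    def __setstate__(self, key):
        self.key = key
        # Unpickling runs far from any user code, so the future must
        # locate an ambient client on its own; nobody gets the chance
        # to say whether that client should be synchronous or not.
        self.client = get_client()
```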
Force-pushed from ce1057d to 7414fb2.
@pitrou any response on the asynchronous comments? In general I agree that this isn't entirely clean. I do think it's significantly cleaner than before. I'm not sure yet how to improve on the situation.
@mrocklin I am actually seeing a major memory leak in the scheduler with my example in which I am beating up the scheduler. That seems to be what is leading things to fall over. I rolled back to 1.17.1 and it disappears. I'll try to bisect to figure out on which commit it happens. I don't think I'll be able to get to it today though; most likely 7/5.
I appreciate you digging into it. Let us know what you find.
I did a git bisect and 068c571 is the commit where my dask stress test starts to make the scheduler blow out memory (and stress the CPU to boot).
@adamklein can you say a bit more about what your stress test is doing? For example, it would be useful to know if it is scatter/gathering a large amount of data between worker-clients.
@adamklein if you can try this branch again with the recent commit I'd be obliged. No rush.
@mrocklin Unfortunately my example still seems to leak memory and run at high CPU with your latest commit. I'm seeing about a 10x drop in CPU and memory performance, so I can't do what I used to. I have tasks launching tasks and passing 100s of MB to GBs of data as task results between workers. This is certainly an example I'd like to show you offline when everything is in place.
@adamklein it would be interesting to see if the scheduler is reporting high communication volume. There is a plot at
PS: I am using
PPS: I just confirmed that if I put
Force-pushed from 5d68103 to 321b019.
These weren't heavily used, and where they were used they are now generally replaceable with Queues, which are cleaner.
This caused unpleasant warnings in tests otherwise.
These separate and optionally replace `worker_client`. This also required additional logic for handling default clients and workers, and for serialization of futures, queues, and variables. The new functionality and tests also triggered changes to how the scheduler tracks long-running tasks.
Force-pushed from 321b019 to fe2adcb.
I have rebased commits to be somewhat cleaner. Merging once tests pass.
It looks like
@adamklein thanks for tracking that down.
This is a major rewrite of how we handle tasks that create clients for complex dynamic workloads.
Previously we would start up a new client every time someone used `worker_client`.
We change this in two large ways: we reuse the same client across threads, and we split `worker_client` into `get_client` and `secede`.

Reusing the same client and requesting the same client from many threads required us to be a bit clever with locks, which is probably error-prone. This also exposed some thread-unsafety issues in our ThreadPoolExecutor fork.
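A sketch of the two new functions working together; the recursive Fibonacci body is illustrative, not from this PR:

```python
from distributed import Client, get_client, secede

def fib(n):
    if n < 2:
        return n
    client = get_client()           # reuse the worker's client
    a = client.submit(fib, n - 1)   # tasks launching tasks
    b = client.submit(fib, n - 2)
    secede()                        # leave the thread pool while blocked,
                                    # freeing the slot for other tasks
    return a.result() + b.result()

client = Client()
print(client.submit(fib, 10).result())  # 55
```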
This also removes channels. It was a pain to maintain the same behavior when all of the worker_clients were in fact the same client.
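For reference, a sketch of the Queue pattern that stands in for channels, assuming distributed's `Queue` API (the queue name is illustrative):

```python
from distributed import Client, Queue, get_client

client = Client()
q = Queue('results')  # a named queue, coordinated through the scheduler

def producer():
    # The same name from inside a task resolves to the same shared queue.
    q2 = Queue('results', client=get_client())
    for i in range(3):
        q2.put(i)

client.submit(producer).result()
print([q.get() for _ in range(3)])  # [0, 1, 2]
```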