
Client and worker_client refactor #1201

Merged (5 commits) · Jul 5, 2017
Conversation

@mrocklin commented Jun 23, 2017

This is a major rewrite of how we handle tasks that create clients for complex dynamic workloads.

Previously we would start up a new client every time someone used worker_client

def func():
    with worker_client() as c:
        ...

We change this in two large ways:

  1. In the API, we split this into two functions, get_client and secede (see the sketch below).
  2. In the implementation, we keep the same client around forever.
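
A minimal sketch of the new pattern (get_client and secede are the names this PR introduces; inc and func are illustrative):

from dask.distributed import get_client, secede

def inc(x):
    return x + 1

def func(n):
    client = get_client()    # reuse the worker's long-lived client
    futures = client.map(inc, range(n))
    secede()                 # step out of the worker's thread pool while we block
    return client.gather(futures)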

Reusing the same client, and requesting it from many threads, required us to be a bit clever with locks, which is probably error prone. This also exposed some thread-unsafety issues in our ThreadPoolExecutor fork.

This also removes channels. It was a pain to maintain the same behavior when all of the worker_clients were in fact the same client.
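
For what replaces channels, the commit notes later in this thread point to Queues. A hedged sketch, assuming the distributed.Queue API of this era:

from dask.distributed import Client, Queue

client = Client(processes=False)
q = Queue('results')    # a named queue, visible to every client on the cluster

q.put(123)              # values (and futures) can be passed through it
assert q.get() == 123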

@mrocklin changed the title from "Merge WorkerClient into Client" to "Client and worker_client refactor" on Jun 23, 2017
@mrocklin force-pushed the client-multi-thread branch 4 times, most recently from 9cd47a0 to c482831 on June 29, 2017
@mrocklin

Rebased to more sensible commits. Going to merge this soon.

@adamklein commented Jun 30, 2017

Can I nest future objects into, say, a dictionary? E.g., instead of client.submit(f, x) where x is a future, can I do client.submit(f, {'x': x})? I tried this approach and got:

Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/home/aklein/src/distributed/distributed/client.py", line 282, in __del__
    self.release()
  File "/home/aklein/src/distributed/distributed/client.py", line 267, in release
    with self.client._lock:
AttributeError: 'Future' object has no attribute 'client'

@mrocklin

This brings up a good question: when do we want to materialize futures to track dependencies, and when do we want to serialize them without tracking dependencies?

In the case you describe above I would actually expect that to create a dependency on x, not serialize it.

In the case you brought up on Stack Overflow it's actually not clear to me what the best behavior is: do we want to finalize the result and create a dependency, or move around the collection holding the futures?
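
To make the two options concrete, a small illustrative sketch (not asserting what the current behavior is):

from dask.distributed import Client

client = Client(processes=False)
x = client.scatter(123)

# Passing the future directly: the scheduler records a dependency
# and the task receives the concrete value 123.
f1 = client.submit(lambda v: v, x)

# Passing the future inside a collection: the open question is whether
# to materialize it (a dependency on x) or to ship the dict holding the
# raw future for the task to resolve itself.
f2 = client.submit(lambda d: d, {'x': x})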

@mrocklin

Here is the behavior I get when trying your example:

In [1]: from dask.distributed import Client

In [2]: client = Client()
 
In [3]: future = client.scatter(123)

In [4]: future2 = client.submit(lambda x: x, {'x': future})

In [5]: future2.result()
Out[5]: {'x': 123}

@adamklein

I guess I'm a little confused by what we mean when we say we support serialization of dask collections or futures. For instance, this doesn't work for a dask dataframe, but it does for a concrete value:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

def task1():
    return pd.DataFrame(np.random.rand(10, 10))

def task2(df):
    return df

def testme():
    with Client() as client:
        ddf = dd.from_delayed([client.submit(task1)])
        ddf = client.persist(ddf)
        fut = client.scatter(ddf)
        # fut = client.scatter(123)  # this concrete value works
        fut2 = client.submit(task2, fut)
        print(client.compute(fut2, sync=True))

because I get

ValueError: No global client found

What I want to do is send the handle to the ddf or future to the worker client and manipulate it there. Is this still not possible at the moment? I'd be happy with a way to wrap ddf so it's like client.submit(task2, serialize(ddf)) - do I have access to such a magical serialize function?

@adamklein

Also, why does this fail?

# same imports as above
def task1():
    return pd.DataFrame(np.random.rand(10, 10))

def task2(df):
    return df

def testme():
    with Client() as client:
        ddf = dd.from_delayed([client.submit(task1)])
        fut2 = client.submit(task2, ddf)
        print(client.compute(fut2, sync=True))

@adamklein

I guess even more fundamentally, I thought I was replicating your test here, but this fails with the same ValueError: No global client found:

from dask.distributed import Client
import dask.array as da

with Client() as client:
    x = da.arange(10, chunks=(5,)).persist()

    def f(x):
        assert isinstance(x, da.Array)
        return x.sum().compute()

    future = client.submit(f, x)
    result = client.compute(future, sync=True)
    print(result)

@adamklein commented Jun 30, 2017

This almost works:

# client 1:
ddf = dd.from_delayed(dfs)
ddf = client.persist(ddf)

# hold a reference to the future so it isn't ejected from memory
from distributed.protocol.pickle import dumps
storage[ddf_name] = (ddf, dumps(ddf))
....

# worker client:
from distributed.protocol.pickle import loads
ddf = loads(ddf_string)
....

Sometimes it falls over with:

Traceback (most recent call last):
  ....
  File "/home/aklein/src/distributed/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
AttributeError: Can't get attribute 'apply_and_enforce' on <module 'dask.dataframe.core' from '/home/aklein/src/dask/dask/dataframe/core.py'>

I know I'm doing some weird stuff, but it feels very close :)

EDIT: if I throw in strategically placed time.sleep(0.2) calls, I can get it pretty stable. Ha. This is so hacky.

@mrocklin

It looks like clients aren't being created on demand. I'll need to take a look. This probably won't happen any time today though. I appreciate the failing tests.

@adamklein

No problem, I appreciate the work, it's very promising! Have a great pre-4th-of-July weekend.

@mrocklin commented Jul 1, 2017

@adamklein the recent commits may interest you

@mrocklin commented Jul 3, 2017

This now feels ready to me. @adamklein, if you have a chance to try to break this, or @pitrou, if you have a chance to review, I would appreciate it.

@mrocklin commented Jul 3, 2017

I'm considering where to put documentation for this. I may want to put this in the dask/dask repository.

@adamklein

Giving it a look this morning!

@adamklein commented Jul 3, 2017

This is looking excellent. I'm running into timeout issues on my large task return values, but I think that's the problem I hacked around before and simply lost because of the worker_client refactor; I just have to throw those timeout hacks back in. I think it's a result of my scheduler being overworked and unable to communicate in a timely fashion: running locally I see it taking a lot of CPU and memory. Here is the stack trace just for information purposes, but again, I think it has nothing to do with your changes (or, e.g., the changes regarding de/serialization on a separate thread).

Traceback (most recent call last):                                                                                                                                  
  File "/lib/python3.5/site-packages/distributed-1.17.1+36.g007cbf73-py3.5.egg/distributed/comm/core.py", line 181, in connect         
	quiet_exceptions=EnvironmentError)                                                                                                                               
  File "/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run                                                                
	value = future.result()                                                                                                                                          
  File "/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result                                                                                        
	raise_exc_info(self._exc_info)                                                                                                                                                                          
  File "<string>", line 4, in raise_exc_info                                                                                                                                                                         
tornado.gen.TimeoutError: Timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):                                                                                                                                                                                                                                                                                           
	result = f.result()                                                                                                                                            
  File "lib/python3.5/site-packages/distributed-1.17.1+36.g007cbf73-py3.5.egg/distributed/client.py", line 152, in result             
	self._result, raiseit=False, callback_timeout=timeout)                                                                                                         
  File "lib/python3.5/site-packages/distributed-1.17.1+36.g007cbf73-py3.5.egg/distributed/utils.py", line 234, in sync               
	six.reraise(*error[0])                                                                                                                                           
  File "lib/python3.5/site-packages/six.py", line 686, in reraise                                                                     
	raise value                                                                                                                                                    
  File "lib/python3.5/site-packages/distributed-1.17.1+36.g007cbf73-py3.5.egg/distributed/utils.py", line 223, in f                 
	result[0] = yield make_coro()                                                                                                                                   
  File "lib/python3.5/site-packages/tornado/gen.py", line 1055, in run                                                               
	value = future.result()                                                                                                                                         
  File "lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result                                                      
	raise_exc_info(self._exc_info)                                                                                                                                  
  File "<string>", line 4, in raise_exc_info                                                                                                                         
  File "lib/python3.5/site-packages/tornado/gen.py", line 1063, in run                                                               
	yielded = self.gen.throw(*exc_info)                                                                                                                             
  File "lib/python3.5/site-packages/distributed-1.17.1+36.g007cbf73-py3.5.egg/distributed/client.py", line 177, in _result           
	result = yield self.client._gather([self])                                                                                                                       
  File "lib/python3.5/site-packages/tornado/gen.py", line 1055, in run                                                               
	value = future.result()                                                                                                                                       
  File "lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result                                                      
	raise_exc_info(self._exc_info)                                                                                                                                  
  File "<string>", line 4, in raise_exc_info                                                                                                                         
  File "lib/python3.5/site-packages/tornado/gen.py", line 1063, in run                                                                
	yielded = self.gen.throw(*exc_info)                                                                                                                              
  File "lib/python3.5/site-packages/distributed-1.17.1+36.g007cbf73-py3.5.egg/distributed/client.py", line 1245, in _gather           
	response = yield self.scheduler.gather(keys=keys)                                                                                                                                                 
  File "lib/python3.5/site-packages/tornado/gen.py", line 1055, in run                                                                                                       
	value = future.result()                                                                                                                                                                                          
  File "lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result                                                     
	raise_exc_info(self._exc_info)                                                                                                                                   
  File "<string>", line 4, in raise_exc_info                                                                                                                         
  File "lib/python3.5/site-packages/tornado/gen.py", line 1063, in run                                                                
	yielded = self.gen.throw(*exc_info)                                                                                                                              
  File "lib/python3.5/site-packages/distributed-1.17.1+36.g007cbf73-py3.5.egg/distributed/core.py", line 423, in send_recv_from_rpc    
	comm = yield self.live_comm()                                                                                                                                     
  File "lib/python3.5/site-packages/tornado/gen.py", line 1055, in run                                                                
	value = future.result()                                                                                                                                           
  File "lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result                                                        
	raise_exc_info(self._exc_info)                                                                                                                                   
  File "<string>", line 4, in raise_exc_info                                                                                                                         
  File "lib/python3.5/site-packages/tornado/gen.py", line 1063, in run                                                                
	yielded = self.gen.throw(*exc_info)                                                                                                                              
  File "lib/python3.5/site-packages/distributed-1.17.1+36.g007cbf73-py3.5.egg/distributed/core.py", line 399, in live_comm            
	connection_args=self.connection_args)                                                                                                                           
  File "lib/python3.5/site-packages/tornado/gen.py", line 1055, in run                                                               
	value = future.result()                                                                                                                                          
  File "lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result                                                      
	raise_exc_info(self._exc_info)                                                                                                                                  
  File "<string>", line 4, in raise_exc_info                                                                                                                         
  File "lib/python3.5/site-packages/tornado/gen.py", line 1063, in run                                                                
	yielded = self.gen.throw(*exc_info)                                                                                                                            
  File "lib/python3.5/site-packages/distributed-1.17.1+36.g007cbf73-py3.5.egg/distributed/comm/core.py", line 190, in connect         
	_raise(error)                                                                                                                                                   
  File "lib/python3.5/site-packages/distributed-1.17.1+36.g007cbf73-py3.5.egg/distributed/comm/core.py", line 173, in _raise         
	raise IOError(msg)                                                                                                                                              
OSError: Timed out trying to connect to 'tcp://...:8786' after 5 s: connect() didn't finish in time                                    

@mrocklin commented Jul 3, 2017

Do you have a sense for what is slowing down your scheduler? It seems odd to me to see 5s of delay, even under very heavy load (thousands of workers).

You may be interested in this comment.

@mrocklin commented Jul 3, 2017

Documentation added in dask/dask#2501

Review thread on the new asynchronous property:

    from distributed.recreate_exceptions import ReplayExceptionClient
    ReplayExceptionClient(self)

    @property
    def asynchronous(self):

@pitrou

I think either the name of this property and/or its semantics and/or its docstring should be changed, since all three seem to be saying different things :-)

@mrocklin

I've expanded the docstring. Maybe this helps? Suggestions welcome.

Review thread on:

    if asynchronous:
        self.status = 'connecting'

    if self.asynchronous:

@pitrou

I must admit I fail to understand the intent of the .asynchronous property. If the user passed asynchronous=True, we may still start an event loop in a separate thread?

@mrocklin

The problem arises because we reuse the same client in both contexts. Consider the following:

from dask.distributed import Client, get_client

def inc(x):
    return x + 1

def g():
    # I run in a worker thread
    client = get_client()
    future = client.submit(inc, 1)
    return future.result()

async def f():
    async with Client(asynchronous=True, processes=False) as c:
        await c.submit(g)

The client in both cases is the same. We originally tell the client that it is operating in asynchronous mode. However, when it finds itself inside the g task it realizes that it definitely can't be in asynchronous mode, and so it switches over within that context.

Ideally we would have a function that could tell us if we "were in the event loop", but this isn't doable. So instead we take a signal from the user (that is, asynchronous=True) but disregard it when it's obviously false.

This approach would currently fail if we were to start a synchronous client and then try to use it asynchronously, but it seems to work in all other settings.
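
Not the actual implementation, but a minimal sketch of the heuristic described above (names are illustrative): honor the user's asynchronous=True only when the caller is actually on the event loop's thread.

import threading

class ClientSketch:
    def __init__(self, asynchronous=False):
        self._asynchronous = asynchronous  # the user's stated intent
        self._loop_thread_id = None        # recorded when the IOLoop thread starts

    @property
    def asynchronous(self):
        # Disregard the user's signal when it is obviously false: a call
        # made from a worker thread, rather than the event loop's thread,
        # cannot be awaiting us, so treat it as synchronous in that context.
        return (self._asynchronous
                and threading.get_ident() == self._loop_thread_id)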

@pitrou

> The client in both cases is the same.

Does it have to? :-) I don't think it would be terribly ruinous to restrict things so that asynchronous clients cannot be reused in a synchronous context, and vice versa.

@mrocklin

There is some value to reducing the number of active clients. I agree though that, especially given the failure of synchronous-intended clients to work in asynchronous environments, we do need to resolve this.

I think I'm still hoping for a way to help identify whether we should or should not act in an asynchronous manner that is separate from defining a client as asynchronous or not.

My inclination here is to merge things roughly as they are now. I think that the user-facing bits of this are decent. We have a known failure in one case but it's unlikely to be encountered by most users. This PR is large and I'd like to get it in soon-ish. I don't have strong confidence that I can find a good solution in the next day or two with other things going on.

@mrocklin

One issue that arises is that when we deserialize futures or queues or variables we don't know if the client should be asynchronous or synchronous. The user doesn't have a chance to tell us in this case.

@mrocklin commented Jul 3, 2017

@pitrou any response on the asynchronous comments? In general I agree that this isn't entirely clean. I do think it's significantly cleaner than before. I'm not sure yet how to improve on the situation.

@mrocklin mentioned this pull request on Jul 3, 2017
@adamklein

@mrocklin I am actually seeing a major memory leak in the scheduler on my example in which I am beating up the scheduler. That seems to be what is causing things to fall over. I rolled back to 1.17.1 and it disappears. I'll try to bisect to figure out which commit it happens on. I don't think I'll be able to get to it today, though - most likely 7/5.

@mrocklin commented Jul 3, 2017

I appreciate you digging into it. Let us know what you find.

@adamklein commented Jul 4, 2017

I did a git bisect and 068c571 is the commit where my dask stress test starts to make the scheduler blow out memory (and stress the CPU to boot).

@mrocklin commented Jul 4, 2017

@adamklein can you say a bit more about what your stress test is doing? For example, it would be useful to know if it is scatter/gathering a large amount of data between worker-clients.

@mrocklin commented Jul 4, 2017 via email

@adamklein

@mrocklin Unfortunately my example still seems to leak memory and run at high CPU with your latest commit. I'm seeing about a 10x drop in performance, in both CPU and memory usage, so that I can't do what I used to. I have tasks launching tasks and passing 100s of MBs to GBs as task results between workers. This is certainly an example I'd like to show you offline when everything is in place.

@mrocklin commented Jul 5, 2017

@adamklein it would be interesting to see if the scheduler is reporting high communication volume. There is a plot at http://scheduler-address:8787/system that might be of interest.

@adamklein

@mrocklin The answer is yes, there is high communication volume. Here are my snapshots.

Off current dask master:

[screenshot: system bandwidth plot]

Off the latest commit of #1201:

[screenshot: system bandwidth plot]

@adamklein

PS: I am using on_completed, which I see calls into self.client._gather([self]) around line 177 of client.py; this has the default parameter direct=False, and probably local_worker=None as well. I assume we want either direct=True or local_worker=(something). Granted, this is just from reading code; I haven't done any tracing, so I could be totally off base here.

@adamklein

PPS: I just confirmed that if I put direct=True into self.client._gather([self], direct=True) at the above-mentioned line in client.py, the problem disappears. I don't know if that has any negative consequences, but it reverts the behavior to what happens on the master branch in terms of memory and bandwidth utilization.
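
For context, a hedged sketch of what that flag changes, assuming the public gather API also accepts direct (the workload here is illustrative): direct=True makes the client pull results straight from the workers that hold them instead of routing the bytes through the scheduler.

from dask.distributed import Client

client = Client()
futures = client.map(lambda x: x ** 2, range(8))

# Default path: result bytes may be routed through the scheduler,
# which can overload it when results are large.
results = client.gather(futures)

# Direct path: the client connects to the workers and pulls the
# data itself, sparing the scheduler the bandwidth.
results = client.gather(futures, direct=True)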

Commit messages from the rebased branch:

- These [channels] weren't heavily used, and where they were used they are now generally replaceable with Queues, which are cleaner.
- This caused unpleasant warnings in tests otherwise.
- These [get_client and secede] separate and optionally replace worker_client. Additionally, this required extra logic for handling default clients and workers, and serialization of futures, queues, and variables. This created functionality and tests that also triggered changes to how the scheduler tracks long-running tasks.
@mrocklin commented Jul 5, 2017

I have rebased the commits to be somewhat cleaner. Merging once tests pass.

@mrocklin commented Jul 5, 2017

> PPS: I just confirmed that if I put direct=True into self.client._gather([self], direct=True) at the above-mentioned line in client.py, the problem disappears. I don't know if that has any negative consequences, but it reverts the behavior to what happens on the master branch in terms of memory and bandwidth utilization.

It looks like Future.result would suffer from this as well. I'll try to resolve it in another PR after this gets merged.

@mrocklin commented Jul 5, 2017

@adamklein thanks for tracking that down.
