We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
https://travis-ci.org/github/dask/distributed/jobs/753832478
____________________________ test_wait_for_outgoing ____________________________ 3406 3407 def test_func(): 3408 result = None 3409 workers = [] 3410 with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop: 3411 3412 async def coro(): 3413 with dask.config.set(config): 3414 s = False 3415 for i in range(5): 3416 try: 3417 s, ws = await start_cluster( 3418 nthreads, 3419 scheduler, 3420 loop, 3421 security=security, 3422 Worker=Worker, 3423 scheduler_kwargs=scheduler_kwargs, 3424 worker_kwargs=worker_kwargs, 3425 ) 3426 except Exception as e: 3427 logger.error( 3428 "Failed to start gen_cluster, retrying", 3429 exc_info=True, 3430 ) 3431 await asyncio.sleep(1) 3432 else: 3433 workers[:] = ws 3434 args = [s] + workers 3435 break 3436 if s is False: 3437 raise Exception("Could not start cluster") 3438 if client: 3439 c = await Client( 3440 s.address, 3441 loop=loop, 3442 security=security, 3443 asynchronous=True, 3444 **client_kwargs, 3445 ) 3446 args = [c] + args 3447 try: 3448 future = func(*args) 3449 if timeout: 3450 future = asyncio.wait_for(future, timeout) 3451 result = await future 3452 if s.validate: 3453 s.validate_state() 3454 finally: 3455 if client and c.status not in ("closing", "closed"): 3456 await c._close(fast=s.status == Status.closed) 3457 await end_cluster(s, workers) 3458 await asyncio.wait_for(cleanup_global_workers(), 1) 3459 3460 try: 3461 c = await default_client() 3462 except ValueError: 3463 pass 3464 else: 3465 await c._close(fast=True) 3466 3467 def get_unclosed(): 3468 return [c for c in Comm._instances if not c.closed()] + [ 3469 c 3470 for c in _global_clients.values() 3471 if c.status != "closed" 3472 ] 3473 3474 try: 3475 start = time() 3476 while time() < start + 5: 3477 gc.collect() 3478 if not get_unclosed(): 3479 break 3480 await asyncio.sleep(0.05) 3481 else: 3482 if allow_unclosed: 3483 print(f"Unclosed Comms: {get_unclosed()}") 3484 else: 3485 raise RuntimeError("Unclosed Comms", get_unclosed()) 3486 finally: 3487 Comm._instances.clear() 3488 _global_clients.clear() 3489 3490 return result 3491 3492 result = loop.run_sync( 3493> coro, timeout=timeout * 2 if timeout else timeout 3494 ) 3495 3496distributed/utils_test.py:954: 3497_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 3498../../../miniconda/envs/dask-distributed/lib/python3.7/site-packages/tornado/ioloop.py:530: in run_sync 3499 return future_cell[0].result() 3500distributed/utils_test.py:912: in coro 3501 result = await future 3502../../../miniconda/envs/dask-distributed/lib/python3.7/asyncio/tasks.py:442: in wait_for 3503 return fut.result() 3504distributed/tests/test_worker.py:1316: in test_wait_for_outgoing 3505 future = await c.scatter(x, workers=a.address) 3506distributed/client.py:2077: in _scatter 3507 nthreads, data2, report=False, rpc=self.rpc 3508distributed/utils_comm.py:149: in scatter_to_workers 3509 for address, v in d.items() 3510distributed/utils.py:229: in All 3511 result = await tasks.next() 3512distributed/core.py:852: in send_recv_from_rpc 3513 result = await send_recv(comm=comm, op=key, **kwargs) 3514distributed/core.py:651: in send_recv 3515 raise exc.with_traceback(tb) 3516distributed/core.py:422: in handle_comm 3517 msg = await comm.read() 3518distributed/comm/tcp.py:216: in read 3519 allow_offload=self.allow_offload, 3520distributed/comm/utils.py:78: in from_frames 3521 res = await offload(_from_frames) 3522distributed/utils.py:1455: in offload 3523 return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs)) 3524../../../miniconda/envs/dask-distributed/lib/python3.7/asyncio/base_events.py:755: in run_in_executor 3525 executor.submit(func, *args), loop=self) 3526_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 3527 3528> raise RuntimeError('cannot schedule new futures after shutdown') 3529E RuntimeError: cannot schedule new futures after shutdown 3530 3531../../../miniconda/envs/dask-distributed/lib/python3.7/concurrent/futures/thread.py:163: RuntimeError
The text was updated successfully, but these errors were encountered:
No branches or pull requests
https://travis-ci.org/github/dask/distributed/jobs/753832478
The text was updated successfully, but these errors were encountered: