Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test_wait_for_outgoing breaking on master CI #4417

Open
fjetter opened this issue Jan 11, 2021 · 0 comments
Open

test_wait_for_outgoing breaking on master CI #4417

fjetter opened this issue Jan 11, 2021 · 0 comments
Labels
flaky test Intermittent failures on CI.

Comments

@fjetter
Copy link
Member

fjetter commented Jan 11, 2021

https://travis-ci.org/github/dask/distributed/jobs/753832478

____________________________ test_wait_for_outgoing ____________________________
3406
3407    def test_func():
3408        result = None
3409        workers = []
3410        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
3411    
3412            async def coro():
3413                with dask.config.set(config):
3414                    s = False
3415                    for i in range(5):
3416                        try:
3417                            s, ws = await start_cluster(
3418                                nthreads,
3419                                scheduler,
3420                                loop,
3421                                security=security,
3422                                Worker=Worker,
3423                                scheduler_kwargs=scheduler_kwargs,
3424                                worker_kwargs=worker_kwargs,
3425                            )
3426                        except Exception as e:
3427                            logger.error(
3428                                "Failed to start gen_cluster, retrying",
3429                                exc_info=True,
3430                            )
3431                            await asyncio.sleep(1)
3432                        else:
3433                            workers[:] = ws
3434                            args = [s] + workers
3435                            break
3436                    if s is False:
3437                        raise Exception("Could not start cluster")
3438                    if client:
3439                        c = await Client(
3440                            s.address,
3441                            loop=loop,
3442                            security=security,
3443                            asynchronous=True,
3444                            **client_kwargs,
3445                        )
3446                        args = [c] + args
3447                    try:
3448                        future = func(*args)
3449                        if timeout:
3450                            future = asyncio.wait_for(future, timeout)
3451                        result = await future
3452                        if s.validate:
3453                            s.validate_state()
3454                    finally:
3455                        if client and c.status not in ("closing", "closed"):
3456                            await c._close(fast=s.status == Status.closed)
3457                        await end_cluster(s, workers)
3458                        await asyncio.wait_for(cleanup_global_workers(), 1)
3459    
3460                    try:
3461                        c = await default_client()
3462                    except ValueError:
3463                        pass
3464                    else:
3465                        await c._close(fast=True)
3466    
3467                    def get_unclosed():
3468                        return [c for c in Comm._instances if not c.closed()] + [
3469                            c
3470                            for c in _global_clients.values()
3471                            if c.status != "closed"
3472                        ]
3473    
3474                    try:
3475                        start = time()
3476                        while time() < start + 5:
3477                            gc.collect()
3478                            if not get_unclosed():
3479                                break
3480                            await asyncio.sleep(0.05)
3481                        else:
3482                            if allow_unclosed:
3483                                print(f"Unclosed Comms: {get_unclosed()}")
3484                            else:
3485                                raise RuntimeError("Unclosed Comms", get_unclosed())
3486                    finally:
3487                        Comm._instances.clear()
3488                        _global_clients.clear()
3489    
3490                    return result
3491    
3492            result = loop.run_sync(
3493>               coro, timeout=timeout * 2 if timeout else timeout
3494            )
3495
3496 distributed/utils_test.py:954: 
3497 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
3498 ../../../miniconda/envs/dask-distributed/lib/python3.7/site-packages/tornado/ioloop.py:530: in run_sync
3499    return future_cell[0].result()
3500 distributed/utils_test.py:912: in coro
3501    result = await future
3502 ../../../miniconda/envs/dask-distributed/lib/python3.7/asyncio/tasks.py:442: in wait_for
3503    return fut.result()
3504 distributed/tests/test_worker.py:1316: in test_wait_for_outgoing
3505    future = await c.scatter(x, workers=a.address)
3506 distributed/client.py:2077: in _scatter
3507    nthreads, data2, report=False, rpc=self.rpc
3508 distributed/utils_comm.py:149: in scatter_to_workers
3509    for address, v in d.items()
3510 distributed/utils.py:229: in All
3511    result = await tasks.next()
3512 distributed/core.py:852: in send_recv_from_rpc
3513    result = await send_recv(comm=comm, op=key, **kwargs)
3514 distributed/core.py:651: in send_recv
3515    raise exc.with_traceback(tb)
3516 distributed/core.py:422: in handle_comm
3517    msg = await comm.read()
3518 distributed/comm/tcp.py:216: in read
3519    allow_offload=self.allow_offload,
3520 distributed/comm/utils.py:78: in from_frames
3521    res = await offload(_from_frames)
3522 distributed/utils.py:1455: in offload
3523    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
3524 ../../../miniconda/envs/dask-distributed/lib/python3.7/asyncio/base_events.py:755: in run_in_executor
3525    executor.submit(func, *args), loop=self)
3526 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
3527
3528 >   raise RuntimeError('cannot schedule new futures after shutdown')
3529 E   RuntimeError: cannot schedule new futures after shutdown
3530
3531 ../../../miniconda/envs/dask-distributed/lib/python3.7/concurrent/futures/thread.py:163: RuntimeError
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
flaky test Intermittent failures on CI.
Projects
None yet
Development

No branches or pull requests

1 participant