diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 7e087f9975..e5b74d5b49 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -23,7 +23,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, windows-latest, macos-latest] - python-version: ["3.8", "3.9"] + python-version: ["3.8", "3.9", "3.10"] # Cherry-pick test modules to split the overall runtime roughly in half partition: [ci1, not ci1] include: @@ -65,12 +65,6 @@ jobs: shell: bash -l {0} run: conda config --show - - name: Install stacktrace - shell: bash -l {0} - # stacktrace for Python 3.8 has not been released at the moment of writing - if: ${{ matrix.os == 'ubuntu-latest' && matrix.python-version < '3.8' }} - run: mamba install -c conda-forge -c defaults -c numba libunwind stacktrace - - name: Hack around https://github.com/ipython/ipython/issues/12197 # This upstream issue causes an interpreter crash when running # distributed/protocol/tests/test_serialize.py::test_profile_nested_sizeof diff --git a/continuous_integration/environment-3.10.yaml b/continuous_integration/environment-3.10.yaml new file mode 100644 index 0000000000..0ee8292226 --- /dev/null +++ b/continuous_integration/environment-3.10.yaml @@ -0,0 +1,46 @@ +name: dask-distributed +channels: + - conda-forge + - defaults +dependencies: + - python=3.10 + - packaging + - pip + - asyncssh + - bokeh + - click + - cloudpickle + - coverage<6.3 # https://github.com/nedbat/coveragepy/issues/1310 + - dask # overridden by git tip below + - filesystem-spec # overridden by git tip below + - h5py + - ipykernel + - ipywidgets + - jinja2 + - jupyter_client + - msgpack-python + - netcdf4 + - paramiko + - pre-commit + - prometheus_client + - psutil + - pytest + - pytest-cov + - pytest-faulthandler + - pytest-repeat + - pytest-rerunfailures + - pytest-timeout + - requests + - s3fs # overridden by git tip below + - scikit-learn + - scipy + - sortedcollections + - tblib + - toolz + - tornado=6 + - zict # overridden by git tip below + - zstandard + - pip: + - git+https://github.com/dask/dask + - git+https://github.com/dask/zict + - pytest-asyncio<0.14.0 # `pytest-asyncio<0.14.0` isn't available on conda-forge for Python 3.10 diff --git a/distributed/node.py b/distributed/node.py index 6db2c7711e..6fedd1b8ac 100644 --- a/distributed/node.py +++ b/distributed/node.py @@ -131,12 +131,9 @@ def start_http_server( import ssl ssl_options = ssl.create_default_context( - cafile=tls_ca_file, purpose=ssl.Purpose.SERVER_AUTH + cafile=tls_ca_file, purpose=ssl.Purpose.CLIENT_AUTH ) ssl_options.load_cert_chain(tls_cert, keyfile=tls_key) - # We don't care about auth here, just encryption - ssl_options.check_hostname = False - ssl_options.verify_mode = ssl.CERT_NONE self.http_server = HTTPServer(self.http_application, ssl_options=ssl_options) diff --git a/distributed/profile.py b/distributed/profile.py index bb832735e8..22a2fc80cf 100644 --- a/distributed/profile.py +++ b/distributed/profile.py @@ -27,6 +27,7 @@ from __future__ import annotations import bisect +import dis import linecache import sys import threading @@ -59,21 +60,41 @@ def identifier(frame): ) +# work around some frames lacking an f_lineo eg: https://bugs.python.org/issue47085 +def _f_lineno(frame): + f_lineno = frame.f_lineno + if f_lineno is not None: + return f_lineno + + f_lasti = frame.f_lasti + code = frame.f_code + prev_line = code.co_firstlineno + + for start, next_line in dis.findlinestarts(code): + if f_lasti < start: + return prev_line + prev_line = next_line + + return prev_line + + def repr_frame(frame): """Render a frame as a line for inclusion into a text traceback""" co = frame.f_code - text = f' File "{co.co_filename}", line {frame.f_lineno}, in {co.co_name}' - line = linecache.getline(co.co_filename, frame.f_lineno, frame.f_globals).lstrip() + f_lineno = _f_lineno(frame) + text = f' File "{co.co_filename}", line {f_lineno}, in {co.co_name}' + line = linecache.getline(co.co_filename, f_lineno, frame.f_globals).lstrip() return text + "\n\t" + line def info_frame(frame): co = frame.f_code - line = linecache.getline(co.co_filename, frame.f_lineno, frame.f_globals).lstrip() + f_lineno = _f_lineno(frame) + line = linecache.getline(co.co_filename, f_lineno, frame.f_globals).lstrip() return { "filename": co.co_filename, "name": co.co_name, - "line_number": frame.f_lineno, + "line_number": f_lineno, "line": line, } diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 7479b8cff4..84c5882bde 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -6463,6 +6463,10 @@ async def f(stacklevel, mode=None): assert "cdn.bokeh.org" in data +@pytest.mark.skipif( + sys.version_info >= (3, 10), + reason="On Py3.10+ semaphore._loop is not bound until .acquire() blocks", +) @gen_cluster(nthreads=[]) async def test_client_gather_semaphore_loop(s): async with Client(s.address, asynchronous=True) as c: @@ -6473,9 +6477,16 @@ async def test_client_gather_semaphore_loop(s): async def test_as_completed_condition_loop(c, s, a, b): seq = c.map(inc, range(5)) ac = as_completed(seq) + # consume the ac so that the ac.condition is bound to the loop on py3.10+ + async for _ in ac: + pass assert ac.condition._loop == c.loop.asyncio_loop +@pytest.mark.skipif( + sys.version_info >= (3, 10), + reason="On Py3.10+ semaphore._loop is not bound until .acquire() blocks", +) def test_client_connectionpool_semaphore_loop(s, a, b): with Client(s["address"]) as c: assert c.rpc.semaphore._loop is c.loop.asyncio_loop diff --git a/distributed/tests/test_profile.py b/distributed/tests/test_profile.py index 7d044945d7..75eb704c99 100644 --- a/distributed/tests/test_profile.py +++ b/distributed/tests/test_profile.py @@ -1,5 +1,9 @@ +from __future__ import annotations + +import dataclasses import sys import threading +from collections.abc import Iterator, Sequence from time import sleep import pytest @@ -11,6 +15,7 @@ call_stack, create, identifier, + info_frame, ll_get_stack, llprocess, merge, @@ -200,3 +205,102 @@ def stop(): while threading.active_count() > start_threads: assert time() < start + 2 sleep(0.01) + + +@dataclasses.dataclass(frozen=True) +class FakeCode: + co_filename: str + co_name: str + co_firstlineno: int + co_lnotab: bytes + co_lines_seq: Sequence[tuple[int, int, int | None]] + co_code: bytes + + def co_lines(self) -> Iterator[tuple[int, int, int | None]]: + yield from self.co_lines_seq + + +FAKE_CODE = FakeCode( + co_filename="", + co_name="example", + co_firstlineno=1, + # https://github.com/python/cpython/blob/b68431fadb3150134ac6ccbf501cdfeaf4c75678/Objects/lnotab_notes.txt#L84 + # generated from: + # def example(): + # for i in range(1): + # if i >= 0: + # pass + # example.__code__.co_lnotab + co_lnotab=b"\x00\x01\x0c\x01\x08\x01\x04\xfe", + # generated with list(example.__code__.co_lines()) + co_lines_seq=[ + (0, 12, 2), + (12, 20, 3), + (20, 22, 4), + (22, 24, None), + (24, 28, 2), + ], + # used in dis.findlinestarts as bytecode_len = len(code.co_code) + # https://github.com/python/cpython/blob/6f345d363308e3e6ecf0ad518ea0fcc30afde2a8/Lib/dis.py#L457 + co_code=bytes(28), +) + + +@dataclasses.dataclass(frozen=True) +class FakeFrame: + f_lasti: int + f_code: FakeCode + f_lineno: int | None = None + f_back: FakeFrame | None = None + f_globals: dict[str, object] = dataclasses.field(default_factory=dict) + + +@pytest.mark.parametrize( + "f_lasti,f_lineno", + [ + (-1, 1), + (0, 2), + (1, 2), + (11, 2), + (12, 3), + (21, 4), + (22, 4), + (23, 4), + (24, 2), + (25, 2), + (26, 2), + (27, 2), + (100, 2), + ], +) +def test_info_frame_f_lineno(f_lasti: int, f_lineno: int) -> None: + assert info_frame(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == { + "filename": "", + "name": "example", + "line_number": f_lineno, + "line": "", + } + + +@pytest.mark.parametrize( + "f_lasti,f_lineno", + [ + (-1, 1), + (0, 2), + (1, 2), + (11, 2), + (12, 3), + (21, 4), + (22, 4), + (23, 4), + (24, 2), + (25, 2), + (26, 2), + (27, 2), + (100, 2), + ], +) +def test_call_stack_f_lineno(f_lasti: int, f_lineno: int) -> None: + assert call_stack(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == [ + f' File "", line {f_lineno}, in example\n\t' + ] diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 3e558a0b55..a8e2652d12 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -739,13 +739,16 @@ def cluster( except KeyError: rpc_kwargs = {} - with rpc(saddr, **rpc_kwargs) as s: - while True: - nthreads = loop.run_sync(s.ncores) - if len(nthreads) == nworkers: - break - if time() - start > 5: - raise Exception("Timeout on cluster creation") + async def wait_for_workers(): + async with rpc(saddr, **rpc_kwargs) as s: + while True: + nthreads = await s.ncores() + if len(nthreads) == nworkers: + break + if time() - start > 5: + raise Exception("Timeout on cluster creation") + + loop.run_sync(wait_for_workers) # avoid sending processes down to function yield {"address": saddr}, [ diff --git a/setup.py b/setup.py index 0f57c52579..1661783f36 100755 --- a/setup.py +++ b/setup.py @@ -98,8 +98,11 @@ "License :: OSI Approved :: BSD License", "Operating System :: OS Independent", "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", "Topic :: Scientific/Engineering", "Topic :: System :: Distributed Computing", ],