Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python 3.10 #5952

Merged
merged 14 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python-version: ["3.8", "3.9"]
python-version: ["3.8", "3.9", "3.10"]
# Cherry-pick test modules to split the overall runtime roughly in half
partition: [ci1, not ci1]
include:
Expand Down Expand Up @@ -65,12 +65,6 @@ jobs:
shell: bash -l {0}
run: conda config --show

- name: Install stacktrace
shell: bash -l {0}
# stacktrace for Python 3.8 has not been released at the moment of writing
if: ${{ matrix.os == 'ubuntu-latest' && matrix.python-version < '3.8' }}
run: mamba install -c conda-forge -c defaults -c numba libunwind stacktrace
graingert marked this conversation as resolved.
Show resolved Hide resolved

- name: Hack around https://github.com/ipython/ipython/issues/12197
# This upstream issue causes an interpreter crash when running
# distributed/protocol/tests/test_serialize.py::test_profile_nested_sizeof
Expand Down
46 changes: 46 additions & 0 deletions continuous_integration/environment-3.10.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: dask-distributed
channels:
- conda-forge
- defaults
graingert marked this conversation as resolved.
Show resolved Hide resolved
dependencies:
- python=3.10
- packaging
- pip
- asyncssh
- bokeh
- click
- cloudpickle
- coverage<6.3 # https://github.com/nedbat/coveragepy/issues/1310
- dask # overridden by git tip below
- filesystem-spec # overridden by git tip below
- h5py
- ipykernel
- ipywidgets
- jinja2
- jupyter_client
- msgpack-python
- netcdf4
- paramiko
- pre-commit
- prometheus_client
- psutil
- pytest
- pytest-cov
- pytest-faulthandler
- pytest-repeat
- pytest-rerunfailures
- pytest-timeout
- requests
- s3fs # overridden by git tip below
- scikit-learn
- scipy
- sortedcollections
- tblib
- toolz
- tornado=6
- zict # overridden by git tip below
- zstandard
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/dask/zict
- pytest-asyncio<0.14.0 # `pytest-asyncio<0.14.0` isn't available on conda-forge for Python 3.10
5 changes: 1 addition & 4 deletions distributed/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,9 @@ def start_http_server(
import ssl

ssl_options = ssl.create_default_context(
cafile=tls_ca_file, purpose=ssl.Purpose.SERVER_AUTH
cafile=tls_ca_file, purpose=ssl.Purpose.CLIENT_AUTH
graingert marked this conversation as resolved.
Show resolved Hide resolved
)
ssl_options.load_cert_chain(tls_cert, keyfile=tls_key)
# We don't care about auth here, just encryption
ssl_options.check_hostname = False
ssl_options.verify_mode = ssl.CERT_NONE

self.http_server = HTTPServer(self.http_application, ssl_options=ssl_options)

Expand Down
29 changes: 25 additions & 4 deletions distributed/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from __future__ import annotations

import bisect
import dis
import linecache
import sys
import threading
Expand Down Expand Up @@ -59,21 +60,41 @@ def identifier(frame):
)


# work around some frames lacking an f_lineo eg: https://bugs.python.org/issue47085
def _f_lineno(frame):
f_lineno = frame.f_lineno
if f_lineno is not None:
return f_lineno

f_lasti = frame.f_lasti
code = frame.f_code
prev_line = code.co_firstlineno

for start, next_line in dis.findlinestarts(code):
if f_lasti < start:
return prev_line
prev_line = next_line

return prev_line


def repr_frame(frame):
"""Render a frame as a line for inclusion into a text traceback"""
co = frame.f_code
text = f' File "{co.co_filename}", line {frame.f_lineno}, in {co.co_name}'
line = linecache.getline(co.co_filename, frame.f_lineno, frame.f_globals).lstrip()
f_lineno = _f_lineno(frame)
text = f' File "{co.co_filename}", line {f_lineno}, in {co.co_name}'
line = linecache.getline(co.co_filename, f_lineno, frame.f_globals).lstrip()
return text + "\n\t" + line


def info_frame(frame):
co = frame.f_code
line = linecache.getline(co.co_filename, frame.f_lineno, frame.f_globals).lstrip()
f_lineno = _f_lineno(frame)
graingert marked this conversation as resolved.
Show resolved Hide resolved
line = linecache.getline(co.co_filename, f_lineno, frame.f_globals).lstrip()
return {
"filename": co.co_filename,
"name": co.co_name,
"line_number": frame.f_lineno,
"line_number": f_lineno,
"line": line,
}

Expand Down
11 changes: 11 additions & 0 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6462,6 +6462,10 @@ async def f(stacklevel, mode=None):
assert "cdn.bokeh.org" in data


@pytest.mark.skipif(
sys.version_info >= (3, 10),
reason="On Py3.10+ semaphore._loop is not bound until .acquire() blocks",
)
@gen_cluster(nthreads=[])
async def test_client_gather_semaphore_loop(s):
async with Client(s.address, asynchronous=True) as c:
Expand All @@ -6472,9 +6476,16 @@ async def test_client_gather_semaphore_loop(s):
async def test_as_completed_condition_loop(c, s, a, b):
seq = c.map(inc, range(5))
ac = as_completed(seq)
# consume the ac so that the ac.condition is bound to the loop on py3.10+
async for _ in ac:
graingert marked this conversation as resolved.
Show resolved Hide resolved
pass
graingert marked this conversation as resolved.
Show resolved Hide resolved
assert ac.condition._loop == c.loop.asyncio_loop


@pytest.mark.skipif(
sys.version_info >= (3, 10),
reason="On Py3.10+ semaphore._loop is not bound until .acquire() blocks",
)
def test_client_connectionpool_semaphore_loop(s, a, b):
with Client(s["address"]) as c:
assert c.rpc.semaphore._loop is c.loop.asyncio_loop
Expand Down
104 changes: 104 additions & 0 deletions distributed/tests/test_profile.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from __future__ import annotations

import dataclasses
import sys
import threading
from collections.abc import Iterator, Sequence
from time import sleep

import pytest
Expand All @@ -11,6 +15,7 @@
call_stack,
create,
identifier,
info_frame,
ll_get_stack,
llprocess,
merge,
Expand Down Expand Up @@ -200,3 +205,102 @@ def stop():
while threading.active_count() > start_threads:
assert time() < start + 2
sleep(0.01)


@dataclasses.dataclass(frozen=True)
class FakeCode:
co_filename: str
co_name: str
co_firstlineno: int
co_lnotab: bytes
co_lines_seq: Sequence[tuple[int, int, int | None]]
co_code: bytes

def co_lines(self) -> Iterator[tuple[int, int, int | None]]:
yield from self.co_lines_seq


FAKE_CODE = FakeCode(
co_filename="<stdin>",
co_name="example",
co_firstlineno=1,
# https://github.com/python/cpython/blob/b68431fadb3150134ac6ccbf501cdfeaf4c75678/Objects/lnotab_notes.txt#L84
# generated from:
# def example():
# for i in range(1):
# if i >= 0:
# pass
# example.__code__.co_lnotab
co_lnotab=b"\x00\x01\x0c\x01\x08\x01\x04\xfe",
# generated with list(example.__code__.co_lines())
co_lines_seq=[
(0, 12, 2),
(12, 20, 3),
(20, 22, 4),
(22, 24, None),
(24, 28, 2),
],
# used in dis.findlinestarts as bytecode_len = len(code.co_code)
# https://github.com/python/cpython/blob/6f345d363308e3e6ecf0ad518ea0fcc30afde2a8/Lib/dis.py#L457
co_code=bytes(28),
)


@dataclasses.dataclass(frozen=True)
class FakeFrame:
f_lasti: int
f_code: FakeCode
f_lineno: int | None = None
f_back: FakeFrame | None = None
f_globals: dict[str, object] = dataclasses.field(default_factory=dict)


@pytest.mark.parametrize(
"f_lasti,f_lineno",
[
(-1, 1),
(0, 2),
(1, 2),
(11, 2),
(12, 3),
(21, 4),
(22, 4),
(23, 4),
(24, 2),
(25, 2),
(26, 2),
(27, 2),
(100, 2),
],
)
def test_info_frame_f_lineno(f_lasti: int, f_lineno: int) -> None:
assert info_frame(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == {
"filename": "<stdin>",
"name": "example",
"line_number": f_lineno,
"line": "",
}


@pytest.mark.parametrize(
"f_lasti,f_lineno",
[
(-1, 1),
(0, 2),
(1, 2),
(11, 2),
(12, 3),
(21, 4),
(22, 4),
(23, 4),
(24, 2),
(25, 2),
(26, 2),
(27, 2),
(100, 2),
],
)
def test_call_stack_f_lineno(f_lasti: int, f_lineno: int) -> None:
assert call_stack(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == [
f' File "<stdin>", line {f_lineno}, in example\n\t'
]
graingert marked this conversation as resolved.
Show resolved Hide resolved
17 changes: 10 additions & 7 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,13 +739,16 @@ def cluster(
except KeyError:
rpc_kwargs = {}

with rpc(saddr, **rpc_kwargs) as s:
while True:
nthreads = loop.run_sync(s.ncores)
if len(nthreads) == nworkers:
break
if time() - start > 5:
raise Exception("Timeout on cluster creation")
async def wait_for_workers():
async with rpc(saddr, **rpc_kwargs) as s:
while True:
nthreads = await s.ncores()
if len(nthreads) == nworkers:
break
if time() - start > 5:
raise Exception("Timeout on cluster creation")

loop.run_sync(wait_for_workers)
graingert marked this conversation as resolved.
Show resolved Hide resolved

# avoid sending processes down to function
yield {"address": saddr}, [
Expand Down
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,11 @@
"License :: OSI Approved :: BSD License",
"Operating System :: OS Independent",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Topic :: Scientific/Engineering",
"Topic :: System :: Distributed Computing",
],
Expand Down