From 5b17f55da9914840ffe379b7c2ffc13e340fe03f Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 17 Jun 2021 11:55:18 -0600 Subject: [PATCH 01/24] Test workers without deps are considered --- distributed/tests/test_scheduler.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 3c61360184..e3cb6e00bb 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -126,6 +126,24 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c): assert x.key in a.data or x.key in b.data +@gen_cluster( + client=True, + nthreads=[("127.0.0.1", 1)] * 3, + config={"distributed.scheduler.work-stealing": False}, +) +async def test_decide_worker_select_candidate_holding_no_deps(client, s, a, b, c): + root = await client.scatter(1) + assert sum(root.key in worker.data for worker in [a, b, c]) == 1 + + start = time() + tasks = client.map(sleep, [root] * 6, pure=False) + await wait(tasks) + elapsed = time() - start + + assert elapsed <= 4 + assert all(root.key in worker.data for worker in [a, b, c]) + + @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) async def test_move_data_over_break_restrictions(client, s, a, b, c): [x] = await client.scatter([1], workers=b.address) From 4a81ca80b5d75107bb4ae8754cabcd6ad50cabcf Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 17 Jun 2021 11:56:41 -0600 Subject: [PATCH 02/24] Consider random subset of workers in decide_worker --- distributed/scheduler.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9ebc33d3b4..14be701fc6 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7476,16 +7476,28 @@ def decide_worker( of bytes sent between workers. This is determined by calling the *objective* function. """ + N_RANDOM_WORKERS: Py_ssize_t = 20 + ws: WorkerState = None wws: WorkerState dts: TaskState deps: set = ts._dependencies + random_workers_set: set = ( + valid_workers if valid_workers is not None else all_workers + ) candidates: set assert all([dts._who_has for dts in deps]) if ts._actor: candidates = set(all_workers) else: - candidates = {wws for dts in deps for wws in dts._who_has} + if len(random_workers_set) <= N_RANDOM_WORKERS: + candidates = random_workers_set + else: + candidates = {wws for dts in deps for wws in dts._who_has} + # TODO the ordering of this set is likely not actually random. + # Ideally it would be ordered by occupancy (but that's more work than we want to do). + # Can we at least ensure idle workers come first? And that it's reasonably random after that? 
+ candidates.update(itertools.islice(random_workers_set, N_RANDOM_WORKERS)) if valid_workers is None: if not candidates: candidates = set(all_workers) From c57fd725992d3c6bff2d8a39414f0f5baa809e3b Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 17 Jun 2021 12:20:58 -0600 Subject: [PATCH 03/24] no-sleep test --- distributed/tests/test_scheduler.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index e3cb6e00bb..9196465066 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -135,13 +135,11 @@ async def test_decide_worker_select_candidate_holding_no_deps(client, s, a, b, c root = await client.scatter(1) assert sum(root.key in worker.data for worker in [a, b, c]) == 1 - start = time() - tasks = client.map(sleep, [root] * 6, pure=False) + tasks = client.map(inc, [root] * 6, pure=False) await wait(tasks) - elapsed = time() - start - assert elapsed <= 4 assert all(root.key in worker.data for worker in [a, b, c]) + assert len(a.data) == len(b.data) == len(c.data) == 3 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) From d810d2d1e3cdbb90c567f643f999ae8209d8026e Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 17 Jun 2021 12:33:35 -0600 Subject: [PATCH 04/24] Comment fastpath. Maybe this is still unnecessary? --- distributed/scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 14be701fc6..112fa581e0 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7491,6 +7491,7 @@ def decide_worker( candidates = set(all_workers) else: if len(random_workers_set) <= N_RANDOM_WORKERS: + # Fastpath: every worker would end up in `candidates`, so no need to build the set from `who_has`. candidates = random_workers_set else: candidates = {wws for dts in deps for wws in dts._who_has} From 768d66023596666053950639782e79f726e5fe77 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 17 Jun 2021 16:11:47 -0600 Subject: [PATCH 05/24] Pick from idle workers first --- distributed/scheduler.py | 39 +++++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 112fa581e0..4fd1346aa3 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2325,6 +2325,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ws = decide_worker( ts, self._workers_dv.values(), + self._idle_dv.values(), valid_workers, partial(self.worker_objective, ts), ) @@ -7459,14 +7460,19 @@ def _reevaluate_occupancy_worker(state: SchedulerState, ws: WorkerState): @cfunc @exceptval(check=False) def decide_worker( - ts: TaskState, all_workers, valid_workers: set, objective + ts: TaskState, + all_workers: sortedcontainers.SortedValuesView, + idle_workers: sortedcontainers.SortedValuesView, + valid_workers: set, + objective, ) -> WorkerState: """ Decide which worker should take task *ts*. - We choose the worker that has the data on which *ts* depends. + We consider all workers which hold dependencies of *ts*, + plus a sample of 20 random workers (with preference for idle ones). - If several workers have dependencies then we choose the less-busy worker. + From those, we choose the worker where the *objective* function is minimized. Optionally provide *valid_workers* of where jobs are allowed to occur (if all workers are allowed to take the task, pass None instead). 
@@ -7476,29 +7482,30 @@ def decide_worker( of bytes sent between workers. This is determined by calling the *objective* function. """ + # TODO should it be a bounded fraction of `len(all_workers)`? N_RANDOM_WORKERS: Py_ssize_t = 20 ws: WorkerState = None wws: WorkerState dts: TaskState deps: set = ts._dependencies candidates: set assert all([dts._who_has for dts in deps]) if ts._actor: candidates = set(all_workers) else: if len(random_workers_set) <= N_RANDOM_WORKERS: # Fastpath: every worker would end up in `candidates`, so no need to build the set from `who_has`. candidates = random_workers_set else: candidates = {wws for dts in deps for wws in dts._who_has} # TODO the ordering of this set is likely not actually random. # Ideally it would be ordered by occupancy (but that's more work than we want to do). # Can we at least ensure idle workers come first? And that it's reasonably random after that? - candidates.update(itertools.islice(random_workers_set, N_RANDOM_WORKERS)) + candidates = {wws for dts in deps for wws in dts._who_has} + # Add some random workers into `candidates`, starting with idle ones + # TODO shuffle to prevent hotspots? + candidates.update(idle_workers[:N_RANDOM_WORKERS]) + if len(idle_workers) < N_RANDOM_WORKERS: + sample_from = ( + list(valid_workers) if valid_workers is not None else all_workers + ) + candidates.update( + random.sample(sample_from, min(N_RANDOM_WORKERS, len(sample_from))) + # ^ NOTE: `min` because `random.sample` errors if `len(sample) < k` + ) if valid_workers is None: if not candidates: candidates = set(all_workers) @@ -7508,7 +7515,7 @@ def decide_worker( candidates = valid_workers if not candidates: if ts._loose_restrictions: - ws = decide_worker(ts, all_workers, None, objective) + ws = decide_worker(ts, all_workers, idle_workers, None, objective) return ws ncandidates: Py_ssize_t = len(candidates) From 346ab1752a70f6857e35eb30491dd8f14ff4c1e4 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 18 Jun 2021 11:31:52 -0600 Subject: [PATCH 06/24] Update `many_independent_leaves` test --- distributed/tests/test_scheduler.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 9196465066..132778ef9f 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -100,14 +100,16 @@ async def test_recompute_released_results(c, s, a, b): assert result == 1 -@gen_cluster(client=True) +@gen_cluster(client=True, config={"distributed.scheduler.bandwidth": "1mb"}) async def test_decide_worker_with_many_independent_leaves(c, s, a, b): + # Make data large to penalize scheduling dependent tasks on other workers + ballast = b"\0" * int(s.bandwidth) xs = await asyncio.gather( - c.scatter(list(range(0, 100, 2)), workers=a.address), - c.scatter(list(range(1, 100, 2)), workers=b.address), + c.scatter([bytes(i) + ballast for i in range(0, 100, 2)], workers=a.address), + c.scatter([bytes(i) + ballast for i in range(1, 100, 2)], workers=b.address), ) xs = list(concat(zip(*xs))) - ys = [delayed(inc)(x) for x in xs] + ys = [delayed(lambda s: s[0])(x) for x in xs] y2s = c.persist(ys) await wait(y2s) From 420c99e3bbb9777dc592bff377e3b58e9cbd57ce Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 18 Jun 2021 12:06:01 -0600 Subject: [PATCH 07/24] Uppercase Mb Co-authored-by: Matthew Rocklin ---
distributed/tests/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 132778ef9f..a1af5c3c00 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -100,7 +100,7 @@ async def test_recompute_released_results(c, s, a, b): assert result == 1 -@gen_cluster(client=True, config={"distributed.scheduler.bandwidth": "1mb"}) +@gen_cluster(client=True, config={"distributed.scheduler.bandwidth": "1Mb"}) async def test_decide_worker_with_many_independent_leaves(c, s, a, b): # Make data large to penalize scheduling dependent tasks on other workers ballast = b"\0" * int(s.bandwidth) From 0a004b26a03a9e40608b3cf39319e00e2bf56394 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 18 Jun 2021 12:15:09 -0600 Subject: [PATCH 08/24] move N_RANDOM_WORKERS within conditional --- distributed/scheduler.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 4fd1346aa3..195d2c5f18 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7482,9 +7482,6 @@ def decide_worker( of bytes sent between workers. This is determined by calling the *objective* function. """ - # TODO should it be a bounded fraction of `len(all_workers)`? - N_RANDOM_WORKERS: Py_ssize_t = 20 - ws: WorkerState = None wws: WorkerState dts: TaskState @@ -7497,6 +7494,7 @@ def decide_worker( candidates = {wws for dts in deps for wws in dts._who_has} # Add some random workers into `candidates`, starting with idle ones # TODO shuffle to prevent hotspots? N_RANDOM_WORKERS: Py_ssize_t = 20 candidates.update(idle_workers[:N_RANDOM_WORKERS]) if len(idle_workers) < N_RANDOM_WORKERS: sample_from = ( list(valid_workers) if valid_workers is not None else all_workers ) candidates.update( random.sample(sample_from, min(N_RANDOM_WORKERS, len(sample_from))) # ^ NOTE: `min` because `random.sample` errors if `len(sample) < k` ) From b050d14d465e9ffaeb75258ac582f94ee92e9402 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 18 Jun 2021 12:35:55 -0600 Subject: [PATCH 09/24] Pass in sortedcontainers values, not pydict values --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 195d2c5f18..72c24bc7dd 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2324,8 +2324,8 @@ def decide_worker(self, ts: TaskState) -> WorkerState: if ts._dependencies or valid_workers is not None: ws = decide_worker( ts, - self._workers_dv.values(), - self._idle_dv.values(), + self._workers.values(), + self._idle.values(), valid_workers, partial(self.worker_objective, ts), ) From 9e99b7fea633123c3c64543bf8c56397b35e3b49 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 18 Jun 2021 13:06:23 -0600 Subject: [PATCH 10/24] Use sleep test again --- distributed/tests/test_scheduler.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index a1af5c3c00..b6fbfbb1cb 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -134,13 +134,19 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c): config={"distributed.scheduler.work-stealing": False}, ) async def test_decide_worker_select_candidate_holding_no_deps(client, s, a, b, c): + await client.submit(slowinc, 10, delay=0.1) # learn that slowinc is slow root = await client.scatter(1) assert sum(root.key in worker.data for worker in [a, b, c]) == 1 + start = time() tasks = client.map(slowinc, [root] *
6, delay=0.1, pure=False) await wait(tasks) + elapsed = time() - start - assert all(root.key in worker.data for worker in [a, b, c]) + assert elapsed <= 4 + assert all(root.key in worker.data for worker in [a, b, c]), [ + list(worker.data.keys()) for worker in [a, b, c] + ] assert len(a.data) == len(b.data) == len(c.data) == 3 From f6acdc4d7983ed5b7a1201ddf444baecf2c37ce6 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 18 Jun 2021 13:12:46 -0600 Subject: [PATCH 11/24] Simpler logic --- distributed/scheduler.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 72c24bc7dd..5de6eefa69 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7470,7 +7470,7 @@ def decide_worker( Decide which worker should take task *ts*. We consider all workers which hold dependencies of *ts*, - plus a sample of 20 random workers (with preference for idle ones). + plus a sample of up to 20 random workers (with preference for idle ones). From those, we choose the worker where the *objective* function is minimized. @@ -7491,19 +7491,16 @@ def decide_worker( if ts._actor: candidates = set(all_workers) else: + # Select all workers holding deps of this task candidates = {wws for dts in deps for wws in dts._who_has} - # Add some random workers to into `candidates`, starting with idle ones - # TODO shuffle to prevent hotspots? - N_RANDOM_WORKERS: Py_ssize_t = 20 - candidates.update(idle_workers[:N_RANDOM_WORKERS]) - if len(idle_workers) < N_RANDOM_WORKERS: - sample_from = ( - list(valid_workers) if valid_workers is not None else all_workers - ) - candidates.update( - random.sample(sample_from, min(N_RANDOM_WORKERS, len(sample_from))) - # ^ NOTE: `min` because `random.sample` errors if `len(sample) < k` - ) + # Add up to 10 random workers into `candidates`, preferring idle ones. + sample_from = ( + list(valid_workers) + if valid_workers is not None + else idle_workers or all_workers + ) + candidates.update(random.sample(sample_from, min(10, len(sample_from)))) + # ^ NOTE: `min` because `random.sample` errors if `len(sample) < k` if valid_workers is None: if not candidates: candidates = set(all_workers) From 524da734a1ae5446ab40b4205dc10b921f777000 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 18 Jun 2021 13:22:32 -0600 Subject: [PATCH 12/24] 20 -> 10 Co-authored-by: Matthew Rocklin --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5de6eefa69..6f07f0bc83 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7470,7 +7470,7 @@ def decide_worker( Decide which worker should take task *ts*. We consider all workers which hold dependencies of *ts*, - plus a sample of up to 20 random workers (with preference for idle ones). + plus a sample of up to 10 random workers (with preference for idle ones). From those, we choose the worker where the *objective* function is minimized. 
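
At this point in the series (PATCHES 11 and 12), candidate selection amounts to: every worker holding a dependency of the task, plus up to 10 randomly sampled workers, idle ones preferred. A minimal standalone sketch of that step — plain strings stand in for `WorkerState` objects, and `pick_candidates`/`dep_holders` are illustrative names, not scheduler API:

import random

def pick_candidates(dep_holders, idle_workers, all_workers, valid_workers=None):
    # Workers already holding a dependency of the task are always candidates.
    candidates = set(dep_holders)
    # Plus up to 10 randomly sampled workers, preferring idle ones.
    sample_from = (
        list(valid_workers)
        if valid_workers is not None
        else idle_workers or all_workers
    )
    # `min` keeps random.sample from raising ValueError when the pool
    # holds fewer than 10 workers; sampling is without replacement.
    candidates.update(random.sample(sample_from, min(10, len(sample_from))))
    return candidates

workers = ["w%d" % i for i in range(100)]
print(pick_candidates({"w0", "w1"}, workers[:5], workers))
# -> the two dependency holders plus the five idle workers (the set deduplicates overlap)
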
From a5d37ae516fa309a17da39fcf4ee9795361e81fa Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 18 Jun 2021 14:45:08 -0600 Subject: [PATCH 13/24] Over-optimized --- distributed/scheduler.py | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 6f07f0bc83..b8f4519333 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -18,7 +18,7 @@ from datetime import timedelta from functools import partial from numbers import Number -from typing import Optional +from typing import Optional, Union import psutil import sortedcontainers @@ -2324,8 +2324,8 @@ def decide_worker(self, ts: TaskState) -> WorkerState: if ts._dependencies or valid_workers is not None: ws = decide_worker( ts, - self._workers.values(), - self._idle.values(), + self._workers_dv, + self._idle_dv, valid_workers, partial(self.worker_objective, ts), ) @@ -7461,8 +7461,8 @@ def _reevaluate_occupancy_worker(state: SchedulerState, ws: WorkerState): @exceptval(check=False) def decide_worker( ts: TaskState, - all_workers: sortedcontainers.SortedValuesView, - idle_workers: sortedcontainers.SortedValuesView, + all_workers: dict, + idle_workers: dict, valid_workers: set, objective, ) -> WorkerState: @@ -7489,21 +7489,20 @@ def decide_worker( candidates: set assert all([dts._who_has for dts in deps]) if ts._actor: - candidates = set(all_workers) + candidates = set(all_workers.values()) else: # Select all workers holding deps of this task candidates = {wws for dts in deps for wws in dts._who_has} - # Add up to 10 random workers into `candidates`, preferring idle ones. - sample_from = ( - list(valid_workers) - if valid_workers is not None - else idle_workers or all_workers - ) - candidates.update(random.sample(sample_from, min(10, len(sample_from)))) - # ^ NOTE: `min` because `random.sample` errors if `len(sample) < k` + + worker_pool = valid_workers if valid_workers is not None else all_workers + if len(candidates) < len(worker_pool): + # Add up to 10 random workers into `candidates`, preferring idle ones. + candidates.update( + random_choices_dict_set(idle_workers or worker_pool, k=10) + ) if valid_workers is None: if not candidates: - candidates = set(all_workers) + candidates = set(all_workers.values()) else: candidates &= valid_workers if not candidates: @@ -7524,6 +7523,17 @@ def decide_worker( return ws +def random_choices_dict_set(population: Union[set, dict], k: int) -> Iterator: + "Randomly choose *k* items with replacement from the set or values of the dict *population*" + if k < len(population): + yield from population if isinstance(population, set) else population.values() + elif isinstance(population, set): + yield from random.choices(list(population), k=k) + else: + for key in random.choices(list(population), k=k): + yield population[key] + + def validate_task_state(ts: TaskState): """ Validate the given TaskState. From 5842ca8dad2de131a681c086e01bdbdff966ac4d Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 18 Jun 2021 14:45:18 -0600 Subject: [PATCH 14/24] Revert "Over-optimized" This reverts commit 761546db775b772a6dbbfc39218cc9e871fc3c98. 
--- distributed/scheduler.py | 40 +++++++++++++++------------------------- 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b8f4519333..6f07f0bc83 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -18,7 +18,7 @@ from datetime import timedelta from functools import partial from numbers import Number -from typing import Optional, Union +from typing import Optional import psutil import sortedcontainers @@ -2324,8 +2324,8 @@ def decide_worker(self, ts: TaskState) -> WorkerState: if ts._dependencies or valid_workers is not None: ws = decide_worker( ts, - self._workers_dv, - self._idle_dv, + self._workers.values(), + self._idle.values(), valid_workers, partial(self.worker_objective, ts), ) @@ -7461,8 +7461,8 @@ def _reevaluate_occupancy_worker(state: SchedulerState, ws: WorkerState): @exceptval(check=False) def decide_worker( ts: TaskState, - all_workers: dict, - idle_workers: dict, + all_workers: sortedcontainers.SortedValuesView, + idle_workers: sortedcontainers.SortedValuesView, valid_workers: set, objective, ) -> WorkerState: @@ -7489,20 +7489,21 @@ def decide_worker( candidates: set assert all([dts._who_has for dts in deps]) if ts._actor: - candidates = set(all_workers.values()) + candidates = set(all_workers) else: # Select all workers holding deps of this task candidates = {wws for dts in deps for wws in dts._who_has} - - worker_pool = valid_workers if valid_workers is not None else all_workers - if len(candidates) < len(worker_pool): - # Add up to 10 random workers into `candidates`, preferring idle ones. - candidates.update( - random_choices_dict_set(idle_workers or worker_pool, k=10) - ) + # Add up to 10 random workers into `candidates`, preferring idle ones. + sample_from = ( + list(valid_workers) + if valid_workers is not None + else idle_workers or all_workers + ) + candidates.update(random.sample(sample_from, min(10, len(sample_from)))) + # ^ NOTE: `min` because `random.sample` errors if `len(sample) < k` if valid_workers is None: if not candidates: - candidates = set(all_workers.values()) + candidates = set(all_workers) else: candidates &= valid_workers if not candidates: @@ -7523,17 +7524,6 @@ def decide_worker( return ws -def random_choices_dict_set(population: Union[set, dict], k: int) -> Iterator: - "Randomly choose *k* items with replacement from the set or values of the dict *population*" - if k < len(population): - yield from population if isinstance(population, set) else population.values() - elif isinstance(population, set): - yield from random.choices(list(population), k=k) - else: - for key in random.choices(list(population), k=k): - yield population[key] - - def validate_task_state(ts: TaskState): """ Validate the given TaskState. From a1592459984f3379582748b338765f9969b394e2 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 18 Jun 2021 15:30:32 -0600 Subject: [PATCH 15/24] `random_choices_iter`. over-optimized for now. 
--- distributed/scheduler.py | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 6f07f0bc83..5e2a97db6d 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -18,7 +18,7 @@ from datetime import timedelta from functools import partial from numbers import Number -from typing import Optional +from typing import Optional, ValuesView import psutil import sortedcontainers @@ -2324,8 +2324,8 @@ def decide_worker(self, ts: TaskState) -> WorkerState: if ts._dependencies or valid_workers is not None: ws = decide_worker( ts, - self._workers.values(), - self._idle.values(), + self._workers_dv.values(), + self._idle_dv.values(), valid_workers, partial(self.worker_objective, ts), ) @@ -7461,8 +7461,8 @@ def _reevaluate_occupancy_worker(state: SchedulerState, ws: WorkerState): @exceptval(check=False) def decide_worker( ts: TaskState, - all_workers: sortedcontainers.SortedValuesView, - idle_workers: sortedcontainers.SortedValuesView, + all_workers: ValuesView, + idle_workers: ValuesView, valid_workers: set, objective, ) -> WorkerState: @@ -7482,6 +7482,8 @@ def decide_worker( of bytes sent between workers. This is determined by calling the *objective* function. """ + # NOTE: `all_workers` and `idle_workers` must be plain `dict_values` objects, + # not a `SortedValuesView`, which is much slower to iterate over. ws: WorkerState = None wws: WorkerState dts: TaskState @@ -7494,13 +7496,14 @@ def decide_worker( # Select all workers holding deps of this task candidates = {wws for dts in deps for wws in dts._who_has} # Add up to 10 random workers into `candidates`, preferring idle ones. - sample_from = ( - list(valid_workers) - if valid_workers is not None - else idle_workers or all_workers - ) - candidates.update(random.sample(sample_from, min(10, len(sample_from)))) - # ^ NOTE: `min` because `random.sample` errors if `len(sample) < k` + worker_pool = valid_workers if valid_workers is not None else all_workers + if len(candidates) < len(worker_pool): + sample_from = idle_workers or worker_pool + candidates.update( + random_choices_iter(sample_from, 10) + if len(sample_from) > 10 + else sample_from + ) if valid_workers is None: if not candidates: candidates = set(all_workers) @@ -7524,6 +7527,11 @@ def decide_worker( return ws +def random_choices_iter(xs: Iterable, k: int) -> Iterable: + "Randomly choose between 0 and *k* items from *xs*" + return itertools.islice(itertools.takewhile(lambda _: random.random() < 0.5, xs), k) + + def validate_task_state(ts: TaskState): """ Validate the given TaskState. 
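
Both sampling helpers tried in PATCHES 13 and 15 trade uniformity for speed, which is presumably why neither survives: `random_choices_dict_set` has its length check inverted (it yields the entire population precisely when k < len(population), i.e. exactly when a subsample was wanted), and `random_choices_iter` stops at the first failed coin flip, so it yields a geometrically distributed prefix of `xs` rather than a uniform sample. A standalone check of the latter makes the bias visible — the helper is copied from the patch above; the measurement loop around it is illustrative:

import itertools
import random
from collections import Counter

def random_choices_iter(xs, k):
    # Take elements while a fair coin keeps succeeding, capped at k:
    # fast, but a geometrically distributed prefix, not a uniform sample.
    return itertools.islice(
        itertools.takewhile(lambda _: random.random() < 0.5, xs), k
    )

counts = Counter()
for _ in range(100_000):
    counts.update(random_choices_iter(range(10), 10))
print([counts[i] for i in range(10)])
# ~[50000, 25000, 12500, ...]: P(element i is yielded) = 0.5 ** (i + 1)

Element 0 is yielded in about half the calls and element 9 almost never, and on average only about one element is yielded per call — so most calls add little randomness to `candidates`.
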
From bb991d1befdcb6b7b30ad984e08a23e3478c7cd2 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 18 Jun 2021 15:44:10 -0600 Subject: [PATCH 16/24] use `random.choices` tradeoff between speed and randomness --- distributed/scheduler.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5e2a97db6d..708f916190 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7500,9 +7500,7 @@ def decide_worker( if len(candidates) < len(worker_pool): sample_from = idle_workers or worker_pool candidates.update( - random_choices_iter(sample_from, 10) - if len(sample_from) > 10 - else sample_from + random.choices(list(sample_from), k=min(10, len(sample_from))) ) if valid_workers is None: if not candidates: @@ -7527,11 +7525,6 @@ def decide_worker( return ws -def random_choices_iter(xs: Iterable, k: int) -> Iterable: - "Randomly choose between 0 and *k* items from *xs*" - return itertools.islice(itertools.takewhile(lambda _: random.random() < 0.5, xs), k) - - def validate_task_state(ts: TaskState): """ Validate the given TaskState. From 58b4bf830566c3e84e50b88ac390095c9158c5b9 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 18 Jun 2021 21:30:45 -0600 Subject: [PATCH 17/24] REBASEME Actor: don't hold key references on workers --- distributed/actor.py | 8 ++--- distributed/tests/test_actor.py | 57 ++++++++++++++++++++++++++++++++- distributed/worker.py | 2 +- 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/distributed/actor.py b/distributed/actor.py index 77b2cda67d..231cc8b3a2 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -3,11 +3,11 @@ import threading from queue import Queue -from .client import Future, default_client +from .client import Future from .protocol import to_serialize from .utils import iscoroutinefunction, sync, thread_state from .utils_comm import WrappedKey -from .worker import get_worker +from .worker import get_client, get_worker class Actor(WrappedKey): @@ -63,8 +63,8 @@ def __init__(self, cls, address, key, worker=None): except ValueError: self._worker = None try: - self._client = default_client() - self._future = Future(key) + self._client = get_client() + self._future = Future(key, inform=self._worker is None) except ValueError: self._client = None diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 851ee7e8b2..87c126bf2f 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -515,7 +515,7 @@ def check(dask_worker): start = time() while any(client.run(check).values()): sleep(0.01) - assert time() < start + 30 + assert time() < start + 10 @gen_cluster( @@ -566,6 +566,61 @@ async def wait(self): await c.gather(futures) +@gen_cluster(client=True, client_kwargs=dict(set_as_default=False)) +# ^ NOTE: without `set_as_default=False`, `get_client()` within worker would return +# the same client instance the test is using (because it's all one process). +# Even with this, both workers will share the same client instance. 
+async def test_worker_actor_handle_is_weakref(c, s, a, b): + counter = c.submit(Counter, actor=True, workers=[a.address]) + + await c.submit(lambda _: None, counter, workers=[b.address]) + + del counter + + start = time() + while a.actors or b.data: + await asyncio.sleep(0.1) + assert time() < start + 10 + + +def test_worker_actor_handle_is_weakref_sync(client): + workers = list(client.run(lambda: None)) + counter = client.submit(Counter, actor=True, workers=[workers[0]]) + + client.submit(lambda _: None, counter, workers=[workers[1]]).result() + + del counter + + def check(dask_worker): + return len(dask_worker.data) + len(dask_worker.actors) + + start = time() + while any(client.run(check).values()): + sleep(0.01) + assert time() < start + 10 + + +def test_worker_actor_handle_is_weakref_from_compute_sync(client): + workers = list(client.run(lambda: None)) + + with dask.annotate(workers=workers[0]): + counter = dask.delayed(Counter)() + with dask.annotate(workers=workers[1]): + intermediate = dask.delayed(lambda c: None)(counter) + with dask.annotate(workers=workers[0]): + final = dask.delayed(lambda x, c: x)(intermediate, counter) + + final.compute(actors=counter, optimize_graph=False) + + def worker_tasks_running(dask_worker): + return len(dask_worker.data) + len(dask_worker.actors) + + start = time() + while any(client.run(worker_tasks_running).values()): + sleep(0.01) + assert time() < start + 10 + + def test_one_thread_deadlock(): with cluster(nworkers=2) as (cl, w): client = Client(cl["address"]) diff --git a/distributed/worker.py b/distributed/worker.py index bd1bf6f0d7..44e05f0502 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1417,7 +1417,7 @@ async def get_data( if k in self.actors: from .actor import Actor - data[k] = Actor(type(self.actors[k]), self.address, k) + data[k] = Actor(type(self.actors[k]), self.address, k, worker=self) msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}} nbytes = {k: self.tasks[k].nbytes for k in data if k in self.tasks} From 13975cbb47abdf777b23df4825a705105645e88d Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 21 Jun 2021 13:24:30 -0600 Subject: [PATCH 18/24] Remove flaky data-length check --- distributed/tests/test_scheduler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index b6fbfbb1cb..f8924c8a93 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -147,7 +147,6 @@ async def test_decide_worker_select_candidate_holding_no_deps(client, s, a, b, c assert all(root.key in worker.data for worker in [a, b, c]), [ list(worker.data.keys()) for worker in [a, b, c] ] - assert len(a.data) == len(b.data) == len(c.data) == 3 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) From fcb165e5d194b630ddc33ee301cc696235b6d9e7 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 21 Jun 2021 15:27:33 -0600 Subject: [PATCH 19/24] No randomness if < 10 workers to choose from Fixes flaky `test_balance_with_restrictions` --- distributed/scheduler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 708f916190..3fd7326df1 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7501,6 +7501,8 @@ def decide_worker( sample_from = idle_workers or worker_pool candidates.update( random.choices(list(sample_from), k=min(10, len(sample_from))) + if len(sample_from) > 10 + else sample_from ) if valid_workers is None: if not 
candidates: From cd382c636d28074667dad0665c040d117a55533a Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 21 Jun 2021 15:54:40 -0600 Subject: [PATCH 20/24] Ensure `decide_worker` args are plain dict_values @jakirkham any better ideas here? --- distributed/scheduler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3fd7326df1..1ab5ba5d1b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2324,8 +2324,10 @@ def decide_worker(self, ts: TaskState) -> WorkerState: if ts._dependencies or valid_workers is not None: ws = decide_worker( ts, - self._workers_dv.values(), - self._idle_dv.values(), + self._workers_dv.values() if compiled else dict.values(self._workers), + self._idle_dv.values() if compiled else dict.values(self._idle), # ^ NOTE: For performance, these must be actual `dict_values`, not `SortedDictValues`. # In Cython, `_workers_dv` is a plain dict, but in plain Python, it's still a `SortedDict`. valid_workers, partial(self.worker_objective, ts), ) From cc57a8be180b6bbb6d4c61ef481f8297d5f12f09 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 11:09:32 -0600 Subject: [PATCH 21/24] 1 worker for `test_statistical_profiling` I'm a little concerned, though, about why the tasks never ran on worker `a` only on Windows. --- distributed/tests/test_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index ba4de5199d..71fdf3cb7a 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1070,8 +1070,8 @@ async def test_scheduler_delay(c, s, a, b): @pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS) -@gen_cluster(client=True) -async def test_statistical_profiling(c, s, a, b): +@gen_cluster(client=True, ncores=[("127.0.0.1", 1)]) +async def test_statistical_profiling(c, s, a): futures = c.map(slowinc, range(10), delay=0.1) await wait(futures) From 13911bc16f0163c975300dbecdca189488a6619f Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 12:46:16 -0600 Subject: [PATCH 22/24] no conditional on compiled --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 1ab5ba5d1b..5d10e55a01 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2324,8 +2324,8 @@ def decide_worker(self, ts: TaskState) -> WorkerState: if ts._dependencies or valid_workers is not None: ws = decide_worker( ts, - self._workers_dv.values() if compiled else dict.values(self._workers), - self._idle_dv.values() if compiled else dict.values(self._idle), + dict.values(self._workers_dv), + dict.values(self._idle_dv), # ^ NOTE: For performance, these must be actual `dict_values`, not `SortedDictValues`. # In Cython, `_workers_dv` is a plain dict, but in plain Python, it's still a `SortedDict`.
valid_workers, From f2445fe63f2d2e52c9fcc21e7acb687ede74afee Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 15:09:52 -0600 Subject: [PATCH 23/24] rerun tests From 5794540748018dad063be0ea8610b4605193f1f8 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 20 Jul 2021 13:21:49 -0800 Subject: [PATCH 24/24] fix errant actor test --- distributed/tests/test_actor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 2c1a77065e..d529edcc98 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -513,7 +513,7 @@ def check(dask_worker): start = time() while any(client.run(check).values()): sleep(0.01) - assert time() < start + 10 + assert time() < start + 30 @gen_cluster(
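
Taken together, PATCHES 16 and 19 leave the sampling step in roughly the following shape — a sketch, with `add_random_candidates` and its argument names invented for illustration. `random.choices` samples with replacement, which is cheaper than `random.sample` and harmless here because duplicates collapse in the `candidates` set; and with 10 or fewer workers to draw from, the whole pool is added with no randomness at all (the change that fixed the flaky `test_balance_with_restrictions`):

import random

def add_random_candidates(candidates, idle_workers, worker_pool):
    # Skip sampling entirely when every worker is already a candidate.
    if len(candidates) < len(worker_pool):
        sample_from = idle_workers or worker_pool
        candidates.update(
            # With replacement (cheaper than random.sample); duplicates
            # collapse in the set. No randomness for 10 workers or fewer.
            random.choices(list(sample_from), k=min(10, len(sample_from)))
            if len(sample_from) > 10
            else sample_from
        )

candidates = {"w0"}
add_random_candidates(candidates, [], ["w%d" % i for i in range(50)])
print(len(candidates))  # at most 11: "w0" plus up to 10 sampled workers
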