
Separation of concerns between Shuffle and WorkerShuffle #7195

Closed
wants to merge 13 commits

Conversation

@fjetter (Member) commented on Oct 26, 2022

Builds on #7186

This PR refactors the Worker extension and the Shuffle class to have stricter separation of concerns.

Specifically,

Shuffle is responsible for all splitting, sending, flushing, receiving, etc., and is the sole owner of the associated resources (e.g. comms, buffers, threads, background tasks). It is entirely asynchronous and not intended to be interacted with directly. It is essentially agnostic of what a worker even is.

The extension, on the other hand, is the interface between the worker and the shuffle instance. It routes RPC calls to the shuffle and exposes synchronous methods to be called from the worker thread. It is also responsible for communicating with the scheduler. (A sketch of this layering follows below.)
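To make the intended split concrete, here is a minimal sketch of the layering. All names and signatures below are illustrative stand-ins, not the actual distributed API: the shuffle side is purely asynchronous and owns its state, while the extension bridges calls from the synchronous worker thread onto the event loop.

```python
import asyncio
from concurrent.futures import Future


class ShuffleSketch:
    """Async side: owns buffers/comms and never touches the worker directly."""

    def __init__(self) -> None:
        self.buffer: list[bytes] = []

    async def add_partition(self, data: bytes) -> None:
        # In the real class this would split the partition and send shards.
        self.buffer.append(data)


class WorkerExtensionSketch:
    """Sync/async bridge: routes worker-thread calls onto the event loop."""

    def __init__(self, loop: asyncio.AbstractEventLoop, shuffle: ShuffleSketch) -> None:
        self.loop = loop
        self.shuffle = shuffle

    def add_partition(self, data: bytes) -> None:
        # Called from the worker thread; blocks until the async side is done.
        fut: Future = asyncio.run_coroutine_threadsafe(
            self.shuffle.add_partition(data), self.loop
        )
        fut.result()
```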


Additional changes include some fixes around concurrency. In particular, this should close #6277 (see test_error_offload and test_slow_offload in test_shuffle.py).
The gist is that the shuffle/extension did not wait for the offloaded threads to complete when flushing, i.e. it did not wait for in-flight receives to finish before a flush. A sketch of that failure mode follows.
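A toy reproduction of this failure mode (purely illustrative, not the actual Shuffle code): if receives are offloaded to background tasks, flush() has to wait for those tasks, otherwise it can report completion while data is still in flight and the late shards are dropped.

```python
import asyncio


class ReceiveBufferSketch:
    """Toy buffer: flush() must not run ahead of in-flight receives."""

    def __init__(self) -> None:
        self.shards: list[bytes] = []
        self._receive_tasks: set[asyncio.Task] = set()

    def receive(self, payload: bytes) -> None:
        # Offload the (potentially slow) processing and keep a handle to it.
        task = asyncio.get_running_loop().create_task(self._process(payload))
        self._receive_tasks.add(task)
        task.add_done_callback(self._receive_tasks.discard)

    async def _process(self, payload: bytes) -> None:
        await asyncio.sleep(0)  # stand-in for offloaded deserialization / disk I/O
        self.shards.append(payload)

    async def flush(self) -> list[bytes]:
        # The fix: wait for every in-flight receive before declaring the data
        # complete. Without this, a slow receive is silently lost.
        if self._receive_tasks:
            await asyncio.gather(*self._receive_tasks)
        return self.shards
```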

@fjetter (Member, Author) commented on Oct 26, 2022

Unfortunately, I haven't fixed test_bad_disk yet (#7185 (comment))

github-actions bot (Contributor) commented on Oct 26, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0    15 suites ±0    6h 45m 18s ⏱️ +38m 49s
3 196 tests +39    3 110 ✔️ +44    83 💤 ±0    3 ❌ −5
23 633 runs +277    22 723 ✔️ +290    901 💤 +2    9 ❌ −15

For more details on these failures, see this check.

Results for commit fd15b6f. ± Comparison against base commit 5dccad4.

♻️ This comment has been updated with latest results.

Comment on lines +500 to +534
```python
class ShuffleTestPool:
    def __init__(self, *args, **kwargs):
        self.shuffles = {}
        super().__init__(*args, **kwargs)

    def __call__(self, addr: str, *args: Any, **kwargs: Any) -> PooledRPCShuffle:
        return PooledRPCShuffle(self.shuffles[addr])

    async def fake_broadcast(self, msg):
        op = msg.pop("op").replace("shuffle_", "")
        out = {}
        for addr, s in self.shuffles.items():
            out[addr] = await getattr(s, op)()
        return out

    def new_shuffle(
        self, name, worker_for_mapping, schema, directory, loop, Shuffle=Shuffle
    ):
        s = Shuffle(
            column="_partition",
            worker_for=worker_for_mapping,
            # FIXME: Is output_workers redundant with worker_for?
            output_workers=set(worker_for_mapping.values()),
            schema=schema,
            directory=directory / name,
            id=ShuffleId(name),
            local_address=name,
            nthreads=2,
            rpc=self,
            loop=loop,
            broadcast=self.fake_broadcast,
        )
        self.shuffles[name] = s
        return s
```
fjetter (Member, Author):

This might be the most interesting part of this refactoring. I separated Shuffle and ShuffleWorkerExtension sufficiently to allow us to write tests without a scheduler or workers, which gives much more granular control over concurrency issues.

In particular, the offload tests below produce race conditions that would drop data.
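For illustration, a hypothetical test skeleton built on the pool above. Only new_shuffle and fake_broadcast come from the snippet; the "shuffle_flush" op (and therefore a Shuffle.flush method), as well as tmp_path/loop as stand-ins for the test's directory and event loop, are assumptions.

```python
# Hypothetical usage inside an async test (op/method names are assumptions):
pool = ShuffleTestPool()
shuffle_a = pool.new_shuffle("A", worker_for_mapping, schema, tmp_path, loop)
shuffle_b = pool.new_shuffle("B", worker_for_mapping, schema, tmp_path, loop)

# Each instance reaches its peers through the pool's PooledRPCShuffle stubs
# (rpc=self above), so races between sending, receiving and flushing can be
# provoked deterministically, without a scheduler or real workers.
out = await pool.fake_broadcast({"op": "shuffle_flush"})  # calls .flush() on both
```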

@fjetter changed the title from "Improve abstraction shuffle" to "Separation of concerns between Shuffle and WorkerShuffle" on Oct 26, 2022
Comment on lines +616 to +633
```python
dfs = []
rows_per_df = 10
n_input_partitions = 2
npartitions = 2
for ix in range(n_input_partitions):
    df = pd.DataFrame({"x": range(rows_per_df * ix, rows_per_df * (ix + 1))})
    df["_partition"] = df.x % npartitions
    dfs.append(df)

workers = ["A", "B"]

worker_for_mapping = {}
partitions_for_worker = defaultdict(list)

for part in range(npartitions):
    worker_for_mapping[part] = w = get_worker_for(part, workers, npartitions)
    partitions_for_worker[w].append(part)
schema = pa.Schema.from_pandas(dfs[0])
```
fjetter (Member, Author):

This is all boilerplate that I hope we can get rid of eventually; right now it's not a priority.

Comment on lines +92 to +93
```python
    rpc: Callable[[str], PooledRPCCall],
    broadcast: Callable,
```
fjetter (Member, Author):

I guess this is the most controversial part. It is very useful, though.
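In other words, Shuffle never reaches for a global Worker or Scheduler; whoever constructs it injects the communication callables. A minimal, self-contained sketch of why this makes the class easy to test (the class and method names here are invented for illustration, not part of distributed):

```python
import asyncio
from typing import Any, Callable


class ShuffleLike:
    """Toy class that only knows about the injected callables."""

    def __init__(self, rpc: Callable[[str], Any], broadcast: Callable[..., Any]) -> None:
        self.rpc = rpc
        self.broadcast = broadcast

    async def send(self, address: str, shard: bytes) -> None:
        # In production `rpc` would return a pooled RPC handle to a worker;
        # in tests it can return any object with the same method.
        await self.rpc(address).shuffle_receive(data=shard)


class FakePeer:
    def __init__(self) -> None:
        self.received: list[bytes] = []

    async def shuffle_receive(self, data: bytes) -> None:
        self.received.append(data)


async def main() -> None:
    peer = FakePeer()
    s = ShuffleLike(rpc=lambda addr: peer, broadcast=lambda msg: None)
    await s.send("worker-1", b"shard")
    assert peer.received == [b"shard"]


asyncio.run(main())
```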

@fjetter marked this pull request as ready for review on October 28, 2022, 14:59
@fjetter (Member, Author) commented on Oct 28, 2022

I fixed a couple of deadlocks in multicomm and multifile in the case of exceptions. I think I have already taken this PR too far; I will branch off any follow-up work.

This closes #7208
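The deadlock pattern this kind of fix targets can be sketched as follows (a simplified stand-in, not the actual MultiComm/MultiFile code): if the background writer task dies with an exception, put() and flush() must re-raise it instead of blocking on a queue that will never be drained again.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class WriteBufferSketch:
    """Toy write buffer: writer failures must surface to callers."""

    def __init__(self, write: Callable[[bytes], Awaitable[None]]) -> None:
        self._write = write
        self._queue: asyncio.Queue = asyncio.Queue()
        self.exception: Optional[BaseException] = None
        self._task = asyncio.get_running_loop().create_task(self._worker())

    async def _worker(self) -> None:
        while True:
            item = await self._queue.get()
            if item is None:  # sentinel enqueued by flush()
                return
            try:
                await self._write(item)
            except BaseException as exc:
                # Remember the error and stop; put()/flush() re-raise it so
                # callers don't wait forever on an abandoned queue.
                self.exception = exc
                return

    async def put(self, item: bytes) -> None:
        if self.exception is not None:
            raise self.exception
        await self._queue.put(item)

    async def flush(self) -> None:
        await self._queue.put(None)
        await self._task
        if self.exception is not None:
            raise self.exception
```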

@fjetter (Member, Author) commented on Nov 10, 2022

Closed by #7268.

@fjetter closed this on Nov 10, 2022
Development

Successfully merging this pull request may close these issues.

P2P shuffle returns incorrect results if errors occur receiving data (also asyncio tasks are leaked)