
Consistent deadlock with shuffle="p2p" when merging dataframes with many partitions #6981

Closed
wence- opened this issue Aug 31, 2022 · 10 comments

@wence- (Contributor) commented Aug 31, 2022

What happened:

The code below (which needs typer in addition to the usual dask/distributed/pandas/numpy) pretty consistently hangs after a worker AssertionError when using the p2p shuffle option, provided I have both many workers and many partitions per worker. In particular, on a 40-physical-core Broadwell machine with plentiful (1 TB) RAM, the following execution nearly always crashes and then hangs:

$ python p2p-shuffle-hang.py --num-workers 40 --rows-per-worker 5_000_000 --partitions-per-worker 100 --shuffle-type p2p
...
2022-08-31 06:34:33,674 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-unpack-737f7325f3b9ae355d9fcea2be0ab659', 2068)
Function:  shuffle_unpack
args:      ('737f7325f3b9ae355d9fcea2be0ab659', 2068, None)
kwargs:    {}
Exception: 'AssertionError()'

2022-08-31 06:34:37,712 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-unpack-737f7325f3b9ae355d9fcea2be0ab659', 264)
Function:  shuffle_unpack
args:      ('737f7325f3b9ae355d9fcea2be0ab659', 264, None)
kwargs:    {}
Exception: 'AssertionError()'

At that point the dashboard shows that no tasks are processing (presumably because they are waiting for these now-failed tasks); a cluster dump is attached below.

On the same system I could also reproduce the problem with --num-workers 4 --partitions-per-worker 1000, though I was not able to on a different system (which has faster disk and RAM).

Minimal Complete Verifiable Example:

Reproducer
import math
from enum import Enum, IntEnum, auto
from itertools import repeat
from typing import cast

import typer
import numpy as np
import pandas as pd
from dask.base import tokenize
from dask.dataframe.core import DataFrame
from distributed import Client, LocalCluster
from distributed.client import _wait, ALL_COMPLETED

class Type(IntEnum):
    LEFT = auto()
    RIGHT = auto()


def make_chunk(chunk_index, size, npartition, typ, match_fraction):
    # LEFT chunks get a unique, contiguous key range; RIGHT chunks sample a
    # match_fraction of keys overlapping each LEFT chunk, padded with non-matching keys.
    if typ == Type.LEFT:
        start = size * chunk_index
        stop = start + size
        key = np.arange(start, stop, dtype=np.int64)
        value = np.random.randint(0, 2000, size=size, dtype=np.int64)
        return pd.DataFrame({"key": key, "value": value})
    elif typ == Type.RIGHT:
        sub_size = size // npartition
        to_use = max(math.ceil(sub_size * match_fraction), 1)
        arrays = []
        for i in range(npartition):
            start = size * i + (sub_size * chunk_index)
            end = start + sub_size
            arrays.append(
                np.random.permutation(np.arange(start, end, dtype=np.int64)[:to_use])
            )
        key_match = np.concatenate(arrays, axis=0)
        (got,) = key_match.shape
        missing = size - got
        start = size * npartition + size * chunk_index
        end = start + missing
        key_no_match = np.arange(start, end, dtype=np.int64)
        key = np.concatenate([key_match, key_no_match], axis=0)
        value = np.random.randint(0, 2000, size=size, dtype=np.int64)
        return pd.DataFrame({"key": key, "value": value})
    else:
        raise ValueError(f"Unknown dataframe type {typ}")


def make_ddf(chunk_size, npartition, match_fraction, typ):
    # Build a dask DataFrame directly from a low-level graph of make_chunk tasks.
    meta = pd.DataFrame(
        {"key": np.empty(0, dtype=np.int64), "value": np.empty(0, dtype=np.int64)}
    )
    divisions = list(repeat(None, npartition + 1))

    name = "generate-data-" + tokenize(chunk_size, npartition, match_fraction, typ)

    dsk = {
        (name, i): (make_chunk, i, chunk_size, npartition, typ, match_fraction)
        for i in range(npartition)
    }

    return DataFrame(dsk, name, meta, divisions)


class ShuffleType(str, Enum):
    P2P = "p2p"
    DEFAULT = "default"


def main(
    num_workers: int = typer.Option(
        1, help="Number of workers"
    ),
    rows_per_worker: int = typer.Option(
        5_000_000, help="Total dataframe rows per worker"
    ),
    partitions_per_worker: int = typer.Option(
        1, help="Number of partitions per worker"
    ),
    shuffle_type: ShuffleType = typer.Option(
        None, help="Dask shuffle implementation"
    )
):
    cluster = LocalCluster(n_workers=num_workers, threads_per_worker=1)
    client = Client(cluster, set_as_default=False)

    rows_per_chunk = rows_per_worker // partitions_per_worker
    npartition = partitions_per_worker * num_workers
    left = make_ddf(rows_per_chunk, npartition, 0.3, Type.LEFT)
    right = make_ddf(rows_per_chunk, npartition, 0.3, Type.RIGHT)
    left = cast(DataFrame, client.persist(left))
    right = cast(DataFrame, client.persist(right))
    _ = client.sync(_wait, left, timeout=None, return_when=ALL_COMPLETED)
    _ = client.sync(_wait, right, timeout=None, return_when=ALL_COMPLETED)

    # ShuffleType.DEFAULT maps to shuffle=None (dask picks its default); "p2p" is passed through.
    shuffle = {ShuffleType.DEFAULT: None}.get(shuffle_type, shuffle_type)
    merged = left.merge(right, on=["key"], how="inner", shuffle=shuffle)
    merged = client.persist(merged)
    _ = client.sync(_wait, merged, timeout=None, return_when=ALL_COMPLETED)
    del cluster
    client.close()
    client.shutdown()
    del client


if __name__ == "__main__":
    typer.run(main)  # main creates and tears down its own client
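
For comparison, the task-based shuffle can be selected from the same script (assuming typer maps the flag value onto the ShuffleType enum above):

$ python p2p-shuffle-hang.py --num-workers 40 --rows-per-worker 5_000_000 --partitions-per-worker 100 --shuffle-type default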

Environment:

  • Dask version: 2022.8.1+7.g19a51474c
  • Distributed version: 2022.8.1+29.ga5d68657
  • Python version: 3.9.13 | packaged by conda-forge | (main, May 27 2022, 16:56:21) [GCC 10.3.0]
  • Operating System: Ubuntu 18.04
  • Install method (conda, pip, source): conda (dask/label/dev channel)
Cluster Dump State:

cluster-dump.msgpack.gz
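
A dump like the one above can be captured with Client.dump_cluster_state; a minimal sketch, assuming a client connected to the hung cluster (the scheduler address below is a placeholder):

from distributed import Client

client = Client("tcp://127.0.0.1:8786")  # placeholder address of the hung cluster's scheduler
# Writes cluster-dump.msgpack.gz containing scheduler and worker state for offline inspection
client.dump_cluster_state("cluster-dump")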

@gjoseph92 (Collaborator)
@wence- do you have any idea what line number that assertion error is coming from?

@wence- (Contributor, Author) commented Sep 1, 2022

@wence- do you have any idea what line number that assertion error is coming from?

I don't :(; I will try to find out.

@wence- (Contributor, Author) commented Sep 2, 2022

It's this assertion:

assert not self.total_size

2022-09-02 06:41:47,467 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-unpack-737f7325f3b9ae355d9fcea2be0ab659', 40)
Function:  shuffle_unpack
args:      ('737f7325f3b9ae355d9fcea2be0ab659', 40, None)
kwargs:    {}
Exception: "AssertionError('Total size is 83544')"
Traceback:   File ".../distributed/shuffle/shuffle.py", line 48, in shuffle_unpack
    return get_ext().get_output_partition(id, output_partition)
  File ".../distributed/shuffle/shuffle_extension.py", line 323, in get_output_partition
    output = shuffle.get_output_partition(output_partition)
  File ".../distributed/shuffle/shuffle_extension.py", line 201, in get_output_partition
    sync(self.worker.loop, self.multi_file.flush)
  File ".../distributed/utils.py", line 405, in sync
    raise exc.with_traceback(tb)
  File ".../distributed/utils.py", line 378, in f
    result = yield future
  File ".../tornado/gen.py", line 762, in run
    value = future.result()
  File ".../distributed/shuffle/multi_file.py", line 259, in flush
    assert not self.total_size, f"Total size is {self.total_size}"
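
This is not the actual multi_file.py code, but here is a toy sketch of how an assertion like this can trip under concurrency: flush() snapshots the buffered shards and awaits the writes, and if another task appends to the buffer while flush() is suspended, total_size is non-zero again by the time the final check runs.

import asyncio

class ToyBuffer:
    # Toy stand-in for a shard buffer with a running size counter (illustrative only).
    def __init__(self):
        self.shards = []
        self.total_size = 0

    def put(self, shard):
        self.shards.append(shard)
        self.total_size += len(shard)

    async def flush(self):
        # Snapshot what is currently buffered and "write" it out.
        shards, self.shards = self.shards, []
        for shard in shards:
            await asyncio.sleep(0)       # pretend disk write; yields to other tasks
            self.total_size -= len(shard)
        # Any put() that ran while we were suspended has bumped total_size again,
        # so this check can fail even though each operation looks correct in isolation.
        assert not self.total_size, f"Total size is {self.total_size}"

async def main():
    buf = ToyBuffer()
    buf.put(b"x" * 10)

    async def late_writer():
        buf.put(b"y" * 5)                # lands while flush() is awaiting the "write"

    await asyncio.gather(buf.flush(), late_writer())

asyncio.run(main())  # raises AssertionError: Total size is 5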

@gjoseph92 (Collaborator)
Ah, thanks. Probably another concurrency bug, I'd guess. The p2p shuffle code hasn't been touched in a while, and likely won't be touched for a while, so I don't expect anyone will try to fix this. Ok if I close?

@wence- (Contributor, Author) commented Sep 5, 2022

Probably another concurrency bug, I'd guess.

Do you mean a concurrency bug in distributed, or in "external" libraries?

The p2p shuffle code hasn't been touched in a while, and likely won't be touched for a while, so I don't expect anyone will try to fix this. Ok if I close?

I suppose this is OK if the intention is to replace the p2p shuffle code with something else. Otherwise, if this is just "low priority, but we would in theory like this to work", I would be +epsilon on leaving it open (or I can schedule a reminder to check again in 3 months...).

@mrocklin (Member) commented Sep 6, 2022

This seems like a valid bug. I don't think that it makes sense to close the issue because one person or one team chooses not to work on it. Others besides Gabe and the group around him can still jump in.

@fjetter (Member) commented Oct 28, 2022

@wence- in #7195 I fixed a couple of deadlocks that are connected to swallowed exceptions. With that PR we should see the exceptions, if that's the problem.

@wence- (Contributor, Author) commented Oct 31, 2022

@wence- in #7195 I fixed a couple of deadlocks that are connected to swallowed exceptions. With that PR we should see the exceptions, if that's the problem.

Running on that branch I'm unable to reproduce the original error and (after a couple of repeats) have yet to see any hangs.

@mrocklin (Member) commented Oct 31, 2022 via email

@fjetter (Member) commented Nov 11, 2022

This should be closed after #7268. Please reopen if the issue is not resolved.

fjetter closed this as completed Nov 11, 2022