Skip to content

Commit

Permalink
shuffle instead of set_index
Browse files (browse the repository at this point in the history)
  • Loading branch information
gjoseph92 committed May 19, 2021
1 parent 230e681 commit f25819f
Showing 1 changed file with 7 additions and 11 deletions.
18 changes: 7 additions & 11 deletions dask_profiling_coiled/run_profile.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -12,8 +12,8 @@
from scheduler_profilers import pyspy_on_scheduler


def print_sizeof_serialized_graph(x) -> float:
start = total_start = time.perf_counter()
def print_sizeof_serialized_graph(x) -> None:
start = time.perf_counter()
dsk = dask.base.collections_to_dsk([x], optimize_graph=True)
optimize_time = time.perf_counter() - start

Expand All @@ -36,7 +36,6 @@ def print_sizeof_serialized_graph(x) -> float:
f"* {dask.utils.format_bytes(pickled)} pickled - {pickle_time:.1}s\n"
f"Optimize: {optimize_time:.1}s, pack: {pack_time:.1}s"
)
return time.perf_counter() - total_start


def main():
Expand All @@ -50,18 +49,15 @@ def main():
distributed.wait(df)
print("DataFrame persisted")

start = time.perf_counter()
reindexed = df.set_index("id", compute=False)
print(f"Reindexed generated in {time.perf_counter() - start:.1f} sec")
shuffled = df.shuffle("id", shuffle="tasks")

extra_time = print_sizeof_serialized_graph(reindexed)
print_sizeof_serialized_graph(shuffled)

df2 = reindexed.persist()
start = time.perf_counter()
df2 = shuffled.persist()
distributed.wait(df2)
elapsed = time.perf_counter() - start
print(
f"{elapsed:.1f} sec total, {elapsed - extra_time:.1f} sec without diagnostics"
)
print(f"{elapsed:.1f} sec")


if __name__ == "__main__":
Expand Down

0 comments on commit f25819f

Please sign in to comment.