from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Dict, Iterator, List, Optional

import memray
from memray._memray import compute_statistics
from memray._stats import Stats

from cubed.runtime.pipeline import visit_nodes
from cubed.runtime.types import Callback


class AllocationType(Enum):
    """Kind of allocator event recorded in a memray capture."""

    MALLOC = 1
    FREE = 2
    CALLOC = 3
    REALLOC = 4


@dataclass()
class Allocation:
    """A single large allocation (or its matching free) from a memray capture.

    Attributes
    ----------
    object_id : synthetic id ("object-NNN") used to pair a FREE with the
        allocation it releases
    allocation_type : which allocator event produced this record
    memory : size of the allocation in bytes
    address : memory address of the allocation; used to match FREE records
    call : "function;module;line" of the top stack frame, when available
    """

    object_id: str
    allocation_type: AllocationType
    memory: int
    address: Optional[int] = None
    call: Optional[str] = None

    def __repr__(self) -> str:
        return f"{self.object_id} {self.allocation_type.name} {self.memory or ''} {self.address or ''} {self.call or ''}"


# Maps memray's allocator kinds to our AllocationType for records that
# allocate memory. FREE is handled separately (it releases, not allocates).
_ALLOC_TYPE_BY_ALLOCATOR = {
    memray.AllocatorType.MALLOC: AllocationType.MALLOC,
    memray.AllocatorType.CALLOC: AllocationType.CALLOC,
    memray.AllocatorType.REALLOC: AllocationType.REALLOC,
}


class MemrayCallback(Callback):
    """Process Memray results for a computation, and print large MALLOC and FREE calls for each operation."""

    def __init__(self, mem_threshold: int = 50_000_000) -> None:
        # Allocations smaller than this many bytes are ignored.
        self.mem_threshold = mem_threshold
        # Fix: values are *lists* of Allocation per operation name. The
        # original annotated Dict[str, Allocation] and stored an
        # already-exhausted generator (see on_compute_end).
        self.allocations: Dict[str, List[Allocation]] = {}
        self.stats: Dict[str, Stats] = {}

    def on_compute_end(self, event):
        """Read each operation's memray capture file, print its large
        allocations and peak memory, and record them on this callback."""
        for name, _ in visit_nodes(event.dag):
            memray_result_file = f"history/{event.compute_id}/memray/{name}.bin"
            if not Path(memray_result_file).is_file():
                continue

            # Materialize the generator up front: the original iterated it in
            # the print loop below and then stored the exhausted generator
            # object in self.allocations, losing the data.
            allocations = list(
                get_allocations_over_threshold(memray_result_file, self.mem_threshold)
            )

            print(memray_result_file)
            for allocation in allocations:
                print(allocation)

            stats = compute_statistics(memray_result_file)
            print(f"Peak memory allocated: {stats.peak_memory_allocated}")

            print()

            self.allocations[name] = allocations
            self.stats[name] = stats


def get_allocations_over_threshold(
    result_file, mem_threshold
) -> Iterator[Allocation]:
    """Yield each allocation of at least ``mem_threshold`` bytes found in
    ``result_file``, together with the FREE record that later releases it
    (matched by address), in the order they occur.

    Raises
    ------
    ValueError
        If a large allocation uses an allocator other than
        MALLOC/CALLOC/REALLOC.
    """
    next_id = 0  # renamed from `id`, which shadowed the builtin
    address_to_allocation: Dict[int, Allocation] = {}
    with memray.FileReader(result_file) as reader:
        for record in reader.get_allocation_records():
            if record.size >= mem_threshold:
                func, mod, line = record.stack_trace()[0]
                allocation_type = _ALLOC_TYPE_BY_ALLOCATOR.get(record.allocator)
                if allocation_type is None:
                    raise ValueError(
                        f"Unsupported memray.AllocatorType {record.allocator}"
                    )
                allocation = Allocation(
                    f"object-{next_id:03}",
                    allocation_type,
                    record.size,
                    address=record.address,
                    call=f"{func};{mod};{line}",
                )
                next_id += 1
                address_to_allocation[record.address] = allocation
                yield allocation
            elif (
                record.allocator == memray.AllocatorType.FREE
                and record.address in address_to_allocation
            ):
                # A free of a previously-seen large allocation: report it with
                # the same object_id and size so the pair can be correlated.
                allocation = address_to_allocation.pop(record.address)
                yield Allocation(
                    allocation.object_id,
                    AllocationType.FREE,
                    allocation.memory,
                    address=record.address,
                )
create_executor from cubed.tests.utils import LITHOPS_LOCAL_CONFIG +pd.set_option("display.max_columns", None) + + ALLOWED_MEM = 2_000_000_000 EXECUTORS = {} @@ -107,7 +114,8 @@ def test_tril(tmp_path, spec, executor): @pytest.mark.slow -def test_add(tmp_path, spec, executor): +@pytest.mark.parametrize("optimize_graph", [False, True]) +def test_add(tmp_path, spec, executor, optimize_graph): a = cubed.random.random( (10000, 10000), chunks=(5000, 5000), spec=spec ) # 200MB chunks @@ -115,7 +123,7 @@ def test_add(tmp_path, spec, executor): (10000, 10000), chunks=(5000, 5000), spec=spec ) # 200MB chunks c = xp.add(a, b) - run_operation(tmp_path, executor, "add", c) + run_operation(tmp_path, executor, "add", c, optimize_graph=optimize_graph) @pytest.mark.slow @@ -237,6 +245,16 @@ def test_concat(tmp_path, spec, executor): run_operation(tmp_path, executor, "concat", c) +@pytest.mark.slow +def test_flip(tmp_path, spec, executor): + # Note 'a' has one fewer element in axis=0 to force chunking to cross array boundaries + a = cubed.random.random( + (9999, 10000), chunks=(5000, 5000), spec=spec + ) # 200MB chunks + b = xp.flip(a, axis=0) + run_operation(tmp_path, executor, "flip", b) + + @pytest.mark.slow def test_reshape(tmp_path, spec, executor): a = cubed.random.random( @@ -305,17 +323,27 @@ def test_sum_partial_reduce(tmp_path, spec, executor): # Internal functions -def run_operation(tmp_path, executor, name, result_array, *, optimize_function=None): - # result_array.visualize(f"cubed-{name}-unoptimized", optimize_graph=False) +def run_operation( + tmp_path, + executor, + name, + result_array, + *, + optimize_graph=True, + optimize_function=None, +): + # result_array.visualize(f"cubed-{name}-unoptimized", optimize_graph=False, show_hidden=True) # result_array.visualize(f"cubed-{name}", optimize_function=optimize_function) hist = HistoryCallback() mem_warn = MemoryWarningCallback() + memray = MemrayCallback() # use store=None to write to temporary zarr cubed.to_zarr( 
result_array, store=None, executor=executor, - callbacks=[hist, mem_warn], + callbacks=[hist, mem_warn, memray], + optimize_graph=optimize_graph, optimize_function=optimize_function, ) @@ -328,6 +356,13 @@ def run_operation(tmp_path, executor, name, result_array, *, optimize_function=N # check change in peak memory is no more than projected mem assert (df["peak_measured_mem_delta_mb_max"] <= df["projected_mem_mb"]).all() + # check memray peak memory allocated is no more than projected mem + for op_name, stats in memray.stats.items(): + assert ( + stats.peak_memory_allocated + <= df.query(f"name=='{op_name}'")["projected_mem_mb"].item() * 1_000_000 + ), f"projected mem exceeds memray's peak allocated for {op_name}" + # check projected_mem_utilization does not exceed 1 # except on processes executor that runs multiple tasks in a process if (