Skip to content

Commit

Permalink
Memray callback (#561)
Browse files Browse the repository at this point in the history
* Add memray callback to print large MALLOC and FREE calls for each operation

* Use memray callback in test_mem_utilization

* Add test for flip in test_mem_utilization

* Skip test_mem_utilization if memray isn't installed

* Fix mypy
  • Loading branch information
tomwhite authored Aug 31, 2024
1 parent 680b3c1 commit d2ba5e1
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/slow-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
- name: Install
run: |
python -m pip install -e .[test]
python -m pip install -e .[test] memray
- name: Run tests
run: |
Expand Down
100 changes: 100 additions & 0 deletions cubed/diagnostics/memray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional

import memray
from memray._memray import compute_statistics
from memray._stats import Stats

from cubed.runtime.pipeline import visit_nodes
from cubed.runtime.types import Callback


class AllocationType(Enum):
    """Kind of memory event reported for an operation.

    The first three are allocation records; FREE marks the release of a
    previously-reported allocation.
    """

    MALLOC = 1
    FREE = 2
    CALLOC = 3
    REALLOC = 4


@dataclass()
class Allocation:
    """A single large memory event (allocation or matching free).

    The same ``object_id`` is shared between an allocation and the FREE
    record that releases it, so the pair can be correlated in the output.
    """

    object_id: str  # sequential id, e.g. "object-003"
    allocation_type: AllocationType
    memory: int  # size in bytes
    address: Optional[int] = None  # memory address, when known
    call: Optional[str] = None  # "function;module;line" of the allocating frame

    def __repr__(self) -> str:
        # Falsy fields (0 / None) render as empty strings, matching the
        # compact single-line report format.
        fields = (
            self.object_id,
            self.allocation_type.name,
            str(self.memory or ""),
            str(self.address or ""),
            str(self.call or ""),
        )
        return " ".join(fields)


class MemrayCallback(Callback):
    """Process Memray results for a computation, and print large MALLOC and FREE calls for each operation."""

    def __init__(self, mem_threshold=50_000_000) -> None:
        # Minimum allocation size in bytes to report (default 50 MB).
        self.mem_threshold = mem_threshold
        # Large allocation records per operation, keyed by operation name.
        self.allocations: Dict[str, List[Allocation]] = {}
        # Memray statistics per operation, keyed by operation name.
        self.stats: Dict[str, Stats] = {}

    def on_compute_end(self, event):
        """Read each operation's memray capture file (if present), print its
        large allocations and peak memory, and record both on this callback.
        """
        for name, _ in visit_nodes(event.dag):
            memray_result_file = f"history/{event.compute_id}/memray/{name}.bin"
            if not Path(memray_result_file).is_file():
                continue

            # Materialize the generator: a bare generator would be exhausted
            # by the print loop below, leaving an empty iterator stored in
            # self.allocations[name].
            allocations = list(
                get_allocations_over_threshold(memray_result_file, self.mem_threshold)
            )

            print(memray_result_file)
            for allocation in allocations:
                print(allocation)

            stats = compute_statistics(memray_result_file)
            print(f"Peak memory allocated: {stats.peak_memory_allocated}")

            print()

            self.allocations[name] = allocations
            self.stats[name] = stats


def get_allocations_over_threshold(result_file, mem_threshold):
    """Yield large allocations (and their frees) from a memray capture file.

    Yields an :class:`Allocation` for every MALLOC/CALLOC/REALLOC record of at
    least ``mem_threshold`` bytes, and a FREE :class:`Allocation` (sharing the
    same ``object_id`` and original size) when the same address is later
    released.

    Raises ``ValueError`` for a large allocation made by an allocator other
    than MALLOC/CALLOC/REALLOC.
    """
    # Allocator kinds we track, mapped to our own AllocationType.
    allocator_types = {
        memray.AllocatorType.MALLOC: AllocationType.MALLOC,
        memray.AllocatorType.CALLOC: AllocationType.CALLOC,
        memray.AllocatorType.REALLOC: AllocationType.REALLOC,
    }
    object_count = 0  # sequential counter for object ids (avoid shadowing builtin `id`)
    address_to_allocation = {}  # live large allocations keyed by address
    with memray.FileReader(result_file) as reader:
        for record in reader.get_allocation_records():
            if record.size >= mem_threshold:
                func, mod, line = record.stack_trace()[0]
                allocation_type = allocator_types.get(record.allocator)
                if allocation_type is None:
                    raise ValueError(
                        f"Unsupported memray.AllocatorType {record.allocator}"
                    )
                allocation = Allocation(
                    f"object-{object_count:03}",
                    allocation_type,
                    record.size,
                    address=record.address,
                    call=f"{func};{mod};{line}",
                )
                object_count += 1
                address_to_allocation[record.address] = allocation
                yield allocation
            elif (
                record.allocator == memray.AllocatorType.FREE
                and record.address in address_to_allocation
            ):
                # Report the free of a previously-seen large allocation,
                # reusing its object_id and original size.
                allocation = address_to_allocation.pop(record.address)
                yield Allocation(
                    allocation.object_id,
                    AllocationType.FREE,
                    allocation.memory,
                    address=record.address,
                )
45 changes: 40 additions & 5 deletions cubed/tests/test_mem_utilization.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
import sys
from functools import partial, reduce

import pandas as pd
import pytest

pytest.importorskip("memray")

import cubed
import cubed.array_api as xp
import cubed.random
Expand All @@ -14,9 +17,13 @@
from cubed.core.optimization import multiple_inputs_optimize_dag
from cubed.diagnostics.history import HistoryCallback
from cubed.diagnostics.mem_warn import MemoryWarningCallback
from cubed.diagnostics.memray import MemrayCallback
from cubed.runtime.create import create_executor
from cubed.tests.utils import LITHOPS_LOCAL_CONFIG

pd.set_option("display.max_columns", None)


ALLOWED_MEM = 2_000_000_000

EXECUTORS = {}
Expand Down Expand Up @@ -107,15 +114,16 @@ def test_tril(tmp_path, spec, executor):


@pytest.mark.slow
def test_add(tmp_path, spec, executor):
@pytest.mark.parametrize("optimize_graph", [False, True])
def test_add(tmp_path, spec, executor, optimize_graph):
a = cubed.random.random(
(10000, 10000), chunks=(5000, 5000), spec=spec
) # 200MB chunks
b = cubed.random.random(
(10000, 10000), chunks=(5000, 5000), spec=spec
) # 200MB chunks
c = xp.add(a, b)
run_operation(tmp_path, executor, "add", c)
run_operation(tmp_path, executor, "add", c, optimize_graph=optimize_graph)


@pytest.mark.slow
Expand Down Expand Up @@ -237,6 +245,16 @@ def test_concat(tmp_path, spec, executor):
run_operation(tmp_path, executor, "concat", c)


@pytest.mark.slow
def test_flip(tmp_path, spec, executor):
    # 'a' has one fewer element along axis=0 (9999 rather than 10000) so that
    # flipping forces chunking to cross array boundaries
    a = cubed.random.random(
        (9999, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    flipped = xp.flip(a, axis=0)
    run_operation(tmp_path, executor, "flip", flipped)


@pytest.mark.slow
def test_reshape(tmp_path, spec, executor):
a = cubed.random.random(
Expand Down Expand Up @@ -305,17 +323,27 @@ def test_sum_partial_reduce(tmp_path, spec, executor):
# Internal functions


def run_operation(tmp_path, executor, name, result_array, *, optimize_function=None):
# result_array.visualize(f"cubed-{name}-unoptimized", optimize_graph=False)
def run_operation(
tmp_path,
executor,
name,
result_array,
*,
optimize_graph=True,
optimize_function=None,
):
# result_array.visualize(f"cubed-{name}-unoptimized", optimize_graph=False, show_hidden=True)
# result_array.visualize(f"cubed-{name}", optimize_function=optimize_function)
hist = HistoryCallback()
mem_warn = MemoryWarningCallback()
memray = MemrayCallback()
# use store=None to write to temporary zarr
cubed.to_zarr(
result_array,
store=None,
executor=executor,
callbacks=[hist, mem_warn],
callbacks=[hist, mem_warn, memray],
optimize_graph=optimize_graph,
optimize_function=optimize_function,
)

Expand All @@ -328,6 +356,13 @@ def run_operation(tmp_path, executor, name, result_array, *, optimize_function=N
# check change in peak memory is no more than projected mem
assert (df["peak_measured_mem_delta_mb_max"] <= df["projected_mem_mb"]).all()

# check memray peak memory allocated is no more than projected mem
for op_name, stats in memray.stats.items():
assert (
stats.peak_memory_allocated
<= df.query(f"name=='{op_name}'")["projected_mem_mb"].item() * 1_000_000
), f"projected mem exceeds memray's peak allocated for {op_name}"

# check projected_mem_utilization does not exceed 1
# except on processes executor that runs multiple tasks in a process
if (
Expand Down

0 comments on commit d2ba5e1

Please sign in to comment.