Skip to content

Commit

Permalink
Limit the number of memory records reported
Browse files Browse the repository at this point in the history
For very big files, the generated reporters will hung when trying to
process all of the memory records produced. This happens quite a lot in
flamegraphs produced from very big files, where the browser cannot
display the ploy with millions of points.

To help here, add a new parameter to the FileReader class that limits
the number of memory records (and therefore temporal snapshots) stored
and reported. This should not affect most regular capture files but will
help with the very big ones.

Signed-off-by: Pablo Galindo <[email protected]>
Co-authored-by: Matt Wozniski <[email protected]>
Signed-off-by: Pablo Galindo <[email protected]>
  • Loading branch information
pablogsal and godlygeek committed Nov 9, 2023
1 parent 2defa44 commit 694bf84
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 3 deletions.
1 change: 1 addition & 0 deletions news/491.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Limit the number of memory records displayed in reporters by default. This will help displaying flamegraphs for long capture sessions.
6 changes: 5 additions & 1 deletion src/memray/_memray.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ class FileReader:
@property
def metadata(self) -> Metadata: ...
def __init__(
self, file_name: Union[str, Path], *, report_progress: bool = False
self,
file_name: Union[str, Path],
*,
report_progress: bool = False,
max_memory_records: int = 10000,
) -> None: ...
def get_allocation_records(self) -> Iterable[AllocationRecord]: ...
def get_temporal_allocation_records(
Expand Down
16 changes: 15 additions & 1 deletion src/memray/_memray.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ from _memray.source cimport SocketSource
from _memray.tracking_api cimport Tracker as NativeTracker
from _memray.tracking_api cimport install_trace_function
from cpython cimport PyErr_CheckSignals
from libc.math cimport ceil
from libc.stdint cimport uint64_t
from libcpp cimport bool
from libcpp.limits cimport numeric_limits
Expand Down Expand Up @@ -856,8 +857,9 @@ cdef class FileReader:
cdef HighWatermark _high_watermark
cdef object _header
cdef bool _report_progress
cdef size_t _memory_snapshot_stride

def __cinit__(self, object file_name, *, bool report_progress=False):
def __cinit__(self, object file_name, *, bool report_progress=False, int max_memory_records=10000):
try:
self._file = open(file_name)
except OSError as exc:
Expand All @@ -882,6 +884,9 @@ cdef class FileReader:
n_memory_snapshots_approx = 2048
if 0 < stats["start_time"] < stats["end_time"]:
n_memory_snapshots_approx = (stats["end_time"] - stats["start_time"]) / 10

if n_memory_snapshots_approx > max_memory_records:
n_memory_snapshots_approx = max_memory_records
self._memory_snapshots.reserve(n_memory_snapshots_approx)

cdef object total = stats['n_allocations'] or None
Expand All @@ -892,6 +897,7 @@ cdef class FileReader:
total=total,
report_progress=self._report_progress
)
self._memory_snapshot_stride = 0
cdef MemoryRecord memory_record
with progress_indicator:
while True:
Expand All @@ -918,6 +924,10 @@ cdef class FileReader:
self._memory_snapshots.push_back(reader.getLatestMemorySnapshot())
else:
break

if len(self._memory_snapshots) > max_memory_records:
self._memory_snapshot_stride = int(ceil(<double>len(self._memory_snapshots) / max_memory_records))
self._memory_snapshots = self._memory_snapshots[::self._memory_snapshot_stride]
self._high_watermark = finder.getHighWatermark()
stats["n_allocations"] = progress_indicator.num_processed

Expand Down Expand Up @@ -1101,6 +1111,7 @@ cdef class FileReader:

cdef AllocationLifetimeAggregator aggregator
cdef _Allocation allocation
cdef int memory_records_seen = 0

with progress_indicator:
while records_to_process > 0:
Expand All @@ -1114,6 +1125,9 @@ cdef class FileReader:
records_to_process -= 1
progress_indicator.update(1)
elif ret == RecordResult.RecordResultMemoryRecord:
memory_records_seen += 1
if self._memory_snapshot_stride and memory_records_seen % self._memory_snapshot_stride != 0:
continue
aggregator.captureSnapshot()
else:
assert ret != RecordResult.RecordResultMemorySnapshot
Expand Down
10 changes: 9 additions & 1 deletion src/memray/commands/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,13 @@ def write_report(
merge_threads: Optional[bool] = None,
inverted: Optional[bool] = None,
temporal: bool = False,
max_memory_records: Optional[int] = None,
) -> None:
try:
reader = FileReader(os.fspath(result_path), report_progress=True)
kwargs = {}
if max_memory_records is not None:
kwargs["max_memory_records"] = max_memory_records
reader = FileReader(os.fspath(result_path), report_progress=True, **kwargs)
merge_threads = True if merge_threads is None else merge_threads
inverted = False if inverted is None else inverted

Expand Down Expand Up @@ -271,6 +275,10 @@ def run(self, args: argparse.Namespace, parser: argparse.ArgumentParser) -> None

if hasattr(args, "inverted"):
kwargs["inverted"] = args.inverted

if hasattr(args, "max_memory_records"):
kwargs["max_memory_records"] = args.max_memory_records

self.write_report(
result_path,
output_file,
Expand Down
7 changes: 7 additions & 0 deletions src/memray/commands/flamegraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,10 @@ def prepare_parser(self, parser: argparse.ArgumentParser) -> None:
action="store_true",
default=False,
)

parser.add_argument(
"--max-memory-records",
help="Maximum number of memory records to display",
type=int,
default=None,
)
28 changes: 28 additions & 0 deletions tests/integration/test_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,34 @@ def test_memory_snapshots_tick_interval(self, tmp_path):
for prev, _next in zip(memory_snapshots, memory_snapshots[1:])
)

def test_memory_snapshots_limit_when_reading(self, tmp_path):
# GIVEN
allocator = MemoryAllocator()
output = tmp_path / "test.bin"

# WHEN
with Tracker(output):
for _ in range(2):
allocator.valloc(ALLOC_SIZE)
time.sleep(0.11)
allocator.free()

reader = FileReader(output)
memory_snapshots = list(reader.get_memory_snapshots())
temporal_records = list(reader.get_temporal_allocation_records())

assert memory_snapshots
n_snapshots = len(memory_snapshots)
n_temporal_records = len(temporal_records)

reader = FileReader(output, max_memory_records=n_snapshots // 2)
memory_snapshots = list(reader.get_memory_snapshots())
temporal_records = list(reader.get_temporal_allocation_records())

assert memory_snapshots
assert len(memory_snapshots) <= n_snapshots // 2 + 1
assert len(temporal_records) <= n_temporal_records // 2 + 1

def test_temporary_allocations_when_filling_vector_without_preallocating(
self, tmp_path
):
Expand Down

0 comments on commit 694bf84

Please sign in to comment.