From 2751eb4700f09700abebb668299260aef1b3b3d2 Mon Sep 17 00:00:00 2001 From: Ivona Stojanovic Date: Fri, 8 Sep 2023 18:55:25 +0100 Subject: [PATCH 1/5] Added --aggregate option to attach Added --aggregate option which allows user to request aggregated mode for in-memory aggregation. Signed-off-by: Ivona Stojanovic --- src/memray/commands/attach.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/memray/commands/attach.py b/src/memray/commands/attach.py index 529dd5fd50..830b401fac 100644 --- a/src/memray/commands/attach.py +++ b/src/memray/commands/attach.py @@ -14,6 +14,7 @@ import threading import memray +from memray import FileFormat from memray._errors import MemrayCommandError from .live import LiveCommand @@ -241,6 +242,13 @@ def prepare_parser(self, parser: argparse.ArgumentParser) -> None: default=False, ) + parser.add_argument( + "--aggregate", + help="Write aggregated stats to the output file instead of all allocations", + action="store_true", + default=False, + ) + parser.add_argument( "--native", help="Track native (C/C++) stack frames as well", @@ -332,11 +340,19 @@ def run(self, args: argparse.Namespace, parser: argparse.ArgumentParser) -> None live_port = _get_free_port() destination = memray.SocketDestination(server_port=live_port) + if args.aggregate and not hasattr(args, "output"): + parser.error("Can't use aggregated mode without an output file.") + + file_format = ( + f"file_format={FileFormat.AGGREGATED_ALLOCATIONS}" if args.aggregate else "" + ) + tracker_call = ( f"memray.Tracker(destination=memray.{destination!r}," f" native_traces={args.native}," f" follow_fork={args.follow_fork}," - f" trace_python_allocators={args.trace_python_allocators})" + f" trace_python_allocators={args.trace_python_allocators}," + f"{file_format})" ) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) From 01f4121e9ac279f421a62a59bd36260161d55331 Mon Sep 17 00:00:00 2001 From: Ivona Stojanovic Date: Tue, 12 Sep 2023 10:54:16 +0100 Subject: [PATCH 2/5] fixup! Added --aggregate option to attach Signed-off-by: Ivona Stojanovic --- src/memray/commands/attach.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/memray/commands/attach.py b/src/memray/commands/attach.py index 830b401fac..927fe338ef 100644 --- a/src/memray/commands/attach.py +++ b/src/memray/commands/attach.py @@ -14,7 +14,6 @@ import threading import memray -from memray import FileFormat from memray._errors import MemrayCommandError from .live import LiveCommand @@ -340,11 +339,13 @@ def run(self, args: argparse.Namespace, parser: argparse.ArgumentParser) -> None live_port = _get_free_port() destination = memray.SocketDestination(server_port=live_port) - if args.aggregate and not hasattr(args, "output"): + if args.aggregate and not args.output: parser.error("Can't use aggregated mode without an output file.") file_format = ( - f"file_format={FileFormat.AGGREGATED_ALLOCATIONS}" if args.aggregate else "" + "file_format=memray.FileFormat.AGGREGATED_ALLOCATIONS" + if args.aggregate + else "" ) tracker_call = ( From 3c5852e603532825fa5fdeff2e1353adf067b580 Mon Sep 17 00:00:00 2001 From: Ivona Stojanovic Date: Tue, 12 Sep 2023 10:58:06 +0100 Subject: [PATCH 3/5] Tests: Added --aggregate option to attach Added a unit and an integration test for the --aggregate option. The unit test tests the case when --aggregate is given but --output is not. The --aggregate option is tested in the integration test. Signed-off-by: Ivona Stojanovic --- tests/integration/test_attach.py | 123 ++++++++++++++++++++++++------- tests/unit/test_attach.py | 21 ++++++ 2 files changed, 118 insertions(+), 26 deletions(-) create mode 100644 tests/unit/test_attach.py diff --git a/tests/integration/test_attach.py b/tests/integration/test_attach.py index 520d06570c..8fc1c78932 100644 --- a/tests/integration/test_attach.py +++ b/tests/integration/test_attach.py @@ -34,7 +34,7 @@ def bar(): def baz(): allocator = MemoryAllocator() - allocator.valloc(1024) + allocator.valloc(50 * 1024 * 1024) allocator.free() @@ -52,13 +52,43 @@ def baz(): """ -@pytest.mark.parametrize("method", ["lldb", "gdb"]) -def test_basic_attach(tmp_path, method): - if not debugger_available(method): - pytest.skip(f"a supported {method} debugger isn't installed") +def compare_allocations(allocations1, allocations2): + assert len(allocations1) == len(allocations2) + for i in range(0, len(allocations1)): + assert allocations1[i].allocator == allocations2[i].allocator + assert allocations1[i].n_allocations == allocations2[i].n_allocations + assert allocations1[i].size == allocations2[i].size + assert allocations1[i].stack_id == allocations2[i].stack_id + assert allocations1[i].tid == allocations2[i].tid + assert allocations1[i].native_stack_id == allocations2[i].native_stack_id + assert ( + allocations1[i].native_segment_generation + == allocations2[i].native_segment_generation + ) + assert allocations1[i].thread_name == allocations2[i].thread_name + + +def generate_command(method, output, aggregate): + cmd = [ + sys.executable, + "-m", + "memray", + "attach", + "--verbose", + "--force", + "--method", + method, + "-o", + str(output), + ] - # GIVEN - output = tmp_path / "test.bin" + if aggregate: + cmd.append("--aggregate") + + return cmd + + +def run_process(cmd): tracked_process = subprocess.Popen( [sys.executable, "-uc", PROGRAM], stdin=subprocess.PIPE, @@ -71,22 +101,12 @@ def test_basic_attach(tmp_path, method): assert tracked_process.stdout is not None assert tracked_process.stdout.readline() == "ready\n" - attach_cmd = [ - sys.executable, - "-m", - "memray", - "attach", - "--verbose", - "--method", - method, - "-o", - str(output), - str(tracked_process.pid), - ] + + cmd.append(str(tracked_process.pid)) # WHEN try: - subprocess.check_output(attach_cmd, stderr=subprocess.STDOUT, text=True) + subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True) except subprocess.CalledProcessError as exc: if "Couldn't write extended state status" in exc.output: # https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=898048 @@ -103,14 +123,65 @@ def test_basic_attach(tmp_path, method): assert "" == tracked_process.stdout.read() assert tracked_process.returncode == 0 - reader = FileReader(output) - records = list(reader.get_allocation_records()) - vallocs = [ + +def get_functions(allocations): + (valloc,) = allocations + return [f[0] for f in valloc.stack_trace()] + + +def get_relevant_allocations(records): + return [ record for record in filter_relevant_allocations(records) if record.allocator == AllocatorType.VALLOC ] - (valloc,) = vallocs - functions = [f[0] for f in valloc.stack_trace()] - assert functions == ["valloc", "baz", "bar", "foo", ""] + +@pytest.mark.parametrize("method", ["lldb", "gdb"]) +@pytest.mark.parametrize("aggregate", [True, False]) +def test_basic_attach(tmp_path, method, aggregate): + if not debugger_available(method): + pytest.skip(f"a supported {method} debugger isn't installed") + + # GIVEN + output = tmp_path / "test.bin" + + attach_cmd = generate_command(method, output, aggregate) + + run_process(attach_cmd) + + reader = FileReader(output) + + # WHEN + try: + hwa_allocation_records = list(reader.get_high_watermark_allocation_records()) + assert hwa_allocation_records is not None + allocation_records = list(reader.get_allocation_records()) + except NotImplementedError as exc: + if aggregate: + assert ( + "Can't get all allocations from a pre-aggregated capture file." + in str(exc) + ) + + hwa_relevant_allocations_records = get_relevant_allocations(hwa_allocation_records) + relevant_allocations_records = ( + get_relevant_allocations(allocation_records) if not aggregate else [] + ) + + if not aggregate: + assert get_functions(hwa_relevant_allocations_records) == get_functions( + relevant_allocations_records + ) + else: + output_no_aggregate = tmp_path / "test.bin" + attach_cmd = generate_command(method, output_no_aggregate, False) + run_process(attach_cmd) + + reader = FileReader(output_no_aggregate) + allocation_records = list(reader.get_high_watermark_allocation_records()) + relevant_allocations_records = get_relevant_allocations(allocation_records) + + compare_allocations( + relevant_allocations_records, hwa_relevant_allocations_records + ) diff --git a/tests/unit/test_attach.py b/tests/unit/test_attach.py new file mode 100644 index 0000000000..7ca44e4426 --- /dev/null +++ b/tests/unit/test_attach.py @@ -0,0 +1,21 @@ +from unittest.mock import patch + +import pytest + +from memray.commands import main + + +@patch("memray.commands.attach.debugger_available") +class TestAttachSubCommand: + def test_memray_attach_aggregated_without_output_file( + self, is_debugger_available_mock, capsys + ): + # GIVEN + is_debugger_available_mock.return_value = True + + # WHEN + with pytest.raises(SystemExit): + main(["attach", "--aggregate", "1234"]) + + captured = capsys.readouterr() + assert "Can't use aggregated mode without an output file." in captured.err From 5a2cdef1a077195619504cf6942ae5d8958602a9 Mon Sep 17 00:00:00 2001 From: Matt Wozniski Date: Tue, 12 Sep 2023 13:14:42 -0400 Subject: [PATCH 4/5] fixup! Tests: Added --aggregate option to attach --- tests/integration/test_attach.py | 85 +++++++++++--------------------- 1 file changed, 30 insertions(+), 55 deletions(-) diff --git a/tests/integration/test_attach.py b/tests/integration/test_attach.py index 8fc1c78932..f321087a53 100644 --- a/tests/integration/test_attach.py +++ b/tests/integration/test_attach.py @@ -52,22 +52,6 @@ def baz(): """ -def compare_allocations(allocations1, allocations2): - assert len(allocations1) == len(allocations2) - for i in range(0, len(allocations1)): - assert allocations1[i].allocator == allocations2[i].allocator - assert allocations1[i].n_allocations == allocations2[i].n_allocations - assert allocations1[i].size == allocations2[i].size - assert allocations1[i].stack_id == allocations2[i].stack_id - assert allocations1[i].tid == allocations2[i].tid - assert allocations1[i].native_stack_id == allocations2[i].native_stack_id - assert ( - allocations1[i].native_segment_generation - == allocations2[i].native_segment_generation - ) - assert allocations1[i].thread_name == allocations2[i].thread_name - - def generate_command(method, output, aggregate): cmd = [ sys.executable, @@ -124,12 +108,11 @@ def run_process(cmd): assert tracked_process.returncode == 0 -def get_functions(allocations): - (valloc,) = allocations - return [f[0] for f in valloc.stack_trace()] +def get_call_stack(allocation): + return [f[0] for f in allocation.stack_trace()] -def get_relevant_allocations(records): +def get_relevant_vallocs(records): return [ record for record in filter_relevant_allocations(records) @@ -138,50 +121,42 @@ def get_relevant_allocations(records): @pytest.mark.parametrize("method", ["lldb", "gdb"]) -@pytest.mark.parametrize("aggregate", [True, False]) -def test_basic_attach(tmp_path, method, aggregate): +def test_basic_attach(tmp_path, method): if not debugger_available(method): pytest.skip(f"a supported {method} debugger isn't installed") # GIVEN output = tmp_path / "test.bin" + attach_cmd = generate_command(method, output, aggregate=False) - attach_cmd = generate_command(method, output, aggregate) - + # WHEN run_process(attach_cmd) + # THEN reader = FileReader(output) + (valloc,) = get_relevant_vallocs(reader.get_allocation_records()) + assert get_call_stack(valloc) == ["valloc", "baz", "bar", "foo", ""] + + +@pytest.mark.parametrize("method", ["lldb", "gdb"]) +def test_aggregated_attach(tmp_path, method): + if not debugger_available(method): + pytest.skip(f"a supported {method} debugger isn't installed") + + # GIVEN + output = tmp_path / "test.bin" + attach_cmd = generate_command(method, output, aggregate=True) # WHEN - try: - hwa_allocation_records = list(reader.get_high_watermark_allocation_records()) - assert hwa_allocation_records is not None - allocation_records = list(reader.get_allocation_records()) - except NotImplementedError as exc: - if aggregate: - assert ( - "Can't get all allocations from a pre-aggregated capture file." - in str(exc) - ) - - hwa_relevant_allocations_records = get_relevant_allocations(hwa_allocation_records) - relevant_allocations_records = ( - get_relevant_allocations(allocation_records) if not aggregate else [] - ) + run_process(attach_cmd) - if not aggregate: - assert get_functions(hwa_relevant_allocations_records) == get_functions( - relevant_allocations_records - ) - else: - output_no_aggregate = tmp_path / "test.bin" - attach_cmd = generate_command(method, output_no_aggregate, False) - run_process(attach_cmd) - - reader = FileReader(output_no_aggregate) - allocation_records = list(reader.get_high_watermark_allocation_records()) - relevant_allocations_records = get_relevant_allocations(allocation_records) - - compare_allocations( - relevant_allocations_records, hwa_relevant_allocations_records - ) + # THEN + reader = FileReader(output) + with pytest.raises( + NotImplementedError, + match="Can't get all allocations from a pre-aggregated capture file.", + ): + list(reader.get_allocation_records()) + + (valloc,) = get_relevant_vallocs(reader.get_high_watermark_allocation_records()) + assert get_call_stack(valloc) == ["valloc", "baz", "bar", "foo", ""] From d47422db3cf616757f3f13808b5ca3e0c3695c89 Mon Sep 17 00:00:00 2001 From: Matt Wozniski Date: Tue, 12 Sep 2023 18:55:59 -0400 Subject: [PATCH 5/5] Move the `memray attach` callback into a module Previously we were maintaining this as a string literal inside the client-side `attach` code, but it's growing more complex, and we're reaching the point where it would be advantageous to have it in a normal module that linters and type checkers can analyze. Signed-off-by: Matt Wozniski --- src/memray/_attach_callback.py | 34 +++++++++++++++++++++ src/memray/commands/attach.py | 56 ++++++++++------------------------ 2 files changed, 50 insertions(+), 40 deletions(-) create mode 100644 src/memray/_attach_callback.py diff --git a/src/memray/_attach_callback.py b/src/memray/_attach_callback.py new file mode 100644 index 0000000000..14eb6bc8be --- /dev/null +++ b/src/memray/_attach_callback.py @@ -0,0 +1,34 @@ +import atexit + +import memray + + +def deactivate_last_tracker() -> None: + global _last_tracker + tracker = _last_tracker + + if tracker: + _last_tracker = None + tracker.__exit__(None, None, None) + + +def activate_tracking(**tracker_kwargs) -> None: + global _last_tracker + deactivate_last_tracker() + + tracker = memray.Tracker(**tracker_kwargs) + try: + tracker.__enter__() + _last_tracker = tracker + finally: + # Prevent any exception from keeping the tracker alive. + # This way resources are cleaned up ASAP. + del tracker + + +def callback(**tracker_kwargs) -> None: + activate_tracking(**tracker_kwargs) + + +_last_tracker = None +atexit.register(deactivate_last_tracker) diff --git a/src/memray/commands/attach.py b/src/memray/commands/attach.py index 927fe338ef..a5b9219558 100644 --- a/src/memray/commands/attach.py +++ b/src/memray/commands/attach.py @@ -11,6 +11,7 @@ import socket import subprocess import sys +import textwrap import threading import memray @@ -23,38 +24,7 @@ LLDB_SCRIPT = pathlib.Path(__file__).parent / "_attach.lldb" RTLD_DEFAULT = memray._memray.RTLD_DEFAULT RTLD_NOW = memray._memray.RTLD_NOW -PAYLOAD = """ -import atexit - -import memray - - -def deactivate_last_tracker(): - tracker = getattr(memray, "_last_tracker", None) - if not tracker: - return - - memray._last_tracker = None - tracker.__exit__(None, None, None) - - -if not hasattr(memray, "_last_tracker"): - # This only needs to be registered the first time we attach. - atexit.register(deactivate_last_tracker) - -deactivate_last_tracker() - -tracker = {tracker_call} -try: - tracker.__enter__() -except: - # Prevent the exception from keeping the tracker alive. - # This way resources are cleaned up ASAP. - del tracker - raise - -memray._last_tracker = tracker -""" +PAYLOAD = "import memray._attach_callback; memray._attach_callback.callback({args})" def inject(debugger: str, pid: int, port: int, verbose: bool) -> str | None: @@ -345,15 +315,21 @@ def run(self, args: argparse.Namespace, parser: argparse.ArgumentParser) -> None file_format = ( "file_format=memray.FileFormat.AGGREGATED_ALLOCATIONS" if args.aggregate - else "" + else "file_format=memray.FileFormat.ALL_ALLOCATIONS" ) - tracker_call = ( - f"memray.Tracker(destination=memray.{destination!r}," - f" native_traces={args.native}," - f" follow_fork={args.follow_fork}," - f" trace_python_allocators={args.trace_python_allocators}," - f"{file_format})" + PAYLOAD = textwrap.dedent( + f""" + import memray._attach_callback + + memray._attach_callback.callback( + destination=memray.{destination!r}, + native_traces={args.native}, + follow_fork={args.follow_fork}, + trace_python_allocators={args.trace_python_allocators}, + {file_format}, + ) + """ ) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -368,7 +344,7 @@ def run(self, args: argparse.Namespace, parser: argparse.ArgumentParser) -> None client = server.accept()[0] - client.sendall(PAYLOAD.format(tracker_call=tracker_call).encode("utf-8")) + client.sendall(PAYLOAD.encode("utf-8")) client.shutdown(socket.SHUT_WR) if not live_port: