From e629f02fa36747d42763ce8651b8e80f4f34f148 Mon Sep 17 00:00:00 2001 From: Riff Date: Fri, 22 Dec 2023 07:17:10 -0800 Subject: [PATCH] Add option to resolve type when dumping heap summary. (#1030) This change adds a `--resolve` option to resolve the type and categorize the chunks, using the vtable in the chunk. Since multiple types of objects could share the same size, and it is impossible for anyone to remember the size of all the objects in the system, categorizing and outputting the chunks by object type becomes very handy when debugging issues like memory leaks. --- docs/commands/heap.md | 11 ++++++++++ gef.py | 48 ++++++++++++++++++++++++++---------------- tests/commands/heap.py | 8 +++++++ 3 files changed, 49 insertions(+), 18 deletions(-) diff --git a/docs/commands/heap.md b/docs/commands/heap.md index a9c164dc3..7fa12c513 100644 --- a/docs/commands/heap.md +++ b/docs/commands/heap.md @@ -78,6 +78,17 @@ gef➤ heap chunks --summary ![heap-chunks-summary](https://i.imgur.com/3HTgtwX.png) +Sometimes, multiple types of objects could have the same size, so knowing only the chunk size might +not be enough when debugging issues like memory leaks. GEF supports using the vtable to +determine the type of the object stored in the chunk. To enable this feature, use `--resolve` along +with the `--summary` flag. + +```text +gef➤ heap chunks --summary --resolve +``` + +![heap-chunks-summary-resolve](https://i.imgur.com/2Mm0JF6.png) + Heap chunk command also supports filtering chunks by their size. 
To do so, simply provide the `--min-size` or `--max-size` argument: diff --git a/gef.py b/gef.py index 218dcc266..2e7a785fd 100644 --- a/gef.py +++ b/gef.py @@ -1710,6 +1710,15 @@ def psprint(self) -> str: msg.append(f"\n\n{self._str_pointers()}") return "\n".join(msg) + "\n" + def resolve_type(self) -> str: + ptr_data = gef.memory.read_integer(self.data_address) + if ptr_data != 0: + sym = gdb_get_location_from_symbol(ptr_data) + if sym is not None and "vtable for" in sym[0]: + return sym[0].replace("vtable for ", "") + + return "" + class GlibcFastChunk(GlibcChunk): @@ -1999,7 +2008,6 @@ def gdb_lookup_symbol(sym: str) -> Optional[Tuple[Optional[str], Optional[Tuple[ except gdb.error: return None - @lru_cache(maxsize=512) def gdb_get_location_from_symbol(address: int) -> Optional[Tuple[str, int]]: """Retrieve the location of the `address` argument from the symbol table. @@ -6302,7 +6310,8 @@ def do_invoke(self, _: List[str], **kwargs: Any) -> None: class GlibcHeapChunkSummary: - def __init__(self): + def __init__(self, desc = ""): + self.desc = desc self.count = 0 self.total_bytes = 0 @@ -6312,7 +6321,8 @@ def process_chunk(self, chunk: GlibcChunk) -> None: class GlibcHeapArenaSummary: - def __init__(self) -> None: + def __init__(self, resolve_type = False) -> None: + self.resolve_symbol = resolve_type self.size_distribution = {} self.flag_distribution = { "PREV_INUSE": GlibcHeapChunkSummary(), @@ -6321,10 +6331,12 @@ def __init__(self) -> None: } def process_chunk(self, chunk: GlibcChunk) -> None: - per_size_summary = self.size_distribution.get(chunk.size, None) + chunk_type = "" if not self.resolve_symbol else chunk.resolve_type() + + per_size_summary = self.size_distribution.get((chunk.size, chunk_type), None) if per_size_summary is None: - per_size_summary = GlibcHeapChunkSummary() - self.size_distribution[chunk.size] = per_size_summary + per_size_summary = GlibcHeapChunkSummary(desc=chunk_type) + self.size_distribution[(chunk.size, chunk_type)] = 
per_size_summary per_size_summary.process_chunk(chunk) if chunk.has_p_bit(): @@ -6336,9 +6348,9 @@ def process_chunk(self, chunk: GlibcChunk) -> None: def print(self) -> None: gef_print("== Chunk distribution by size ==") - gef_print("{:<10s}\t{:<10s}\t{:s}".format("ChunkBytes", "Count", "TotalBytes")) - for chunk_size, chunk_summary in sorted(self.size_distribution.items(), key=lambda x: x[1].total_bytes, reverse=True): - gef_print("{:<10d}\t{:<10d}\t{: None: self["peek_nb_byte"] = (16, "Hexdump N first byte(s) inside the chunk data (0 to disable)") return - @parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True, "--min-size": 0, "--max-size": 0, ("--summary", "-s"): True}) + @parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True, "--min-size": 0, "--max-size": 0, ("--summary", "-s"): True, "--resolve": True}) @only_if_gdb_running def do_invoke(self, _: List[str], **kwargs: Any) -> None: args = kwargs["arguments"] if args.all or not args.arena_address: for arena in gef.heap.arenas: - self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary) + self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary, resolve_type=args.resolve) if not args.all: return try: arena_addr = parse_address(args.arena_address) arena = GlibcArena(f"*{arena_addr:#x}") - self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary) + self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary, resolve_type=args.resolve) except gdb.error: err("Invalid arena") return - def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False, 
min_size: int = 0, max_size: int = 0, summary: bool = False) -> None: + def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False, resolve_type: bool = False) -> None: heap_addr = arena.heap_addr(allow_unaligned=allow_unaligned) if heap_addr is None: err("Could not find heap for arena") @@ -6387,18 +6399,18 @@ def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_ gef_print(str(arena)) if arena.is_main_arena(): heap_end = arena.top + GlibcChunk(arena.top, from_base=True).size - self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary) + self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary, resolve_type=resolve_type) else: heap_info_structs = arena.get_heap_info_list() or [] for heap_info in heap_info_structs: - if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary): + if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary, resolve_type=resolve_type): break return - def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False) -> bool: + def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False, resolve_type: bool = False) -> bool: nb = self["peek_nb_byte"] chunk_iterator = GlibcChunk(start, from_base=True, allow_unaligned=allow_unaligned) - heap_summary = GlibcHeapArenaSummary() + heap_summary = GlibcHeapArenaSummary(resolve_type=resolve_type) for chunk in 
chunk_iterator: heap_corrupted = chunk.base_address > end should_process = self.should_process_chunk(chunk, min_size, max_size) diff --git a/tests/commands/heap.py b/tests/commands/heap.py index c13261e57..fc3453bc7 100644 --- a/tests/commands/heap.py +++ b/tests/commands/heap.py @@ -103,6 +103,14 @@ def test_cmd_heap_chunks_summary(self): self.assertIn("== Chunk distribution by size", res) self.assertIn("== Chunk distribution by flag", res) + def test_cmd_heap_chunks_summary_with_type_resolved(self): + cmd = "heap chunks --summary --resolve" + target = _target("class") + res = gdb_run_silent_cmd(cmd, target=target, before=["b B::Run()"]) + self.assertNoException(res) + self.assertIn("== Chunk distribution by size", res) + self.assertIn("B", res) + def test_cmd_heap_chunks_min_size_filter(self): cmd = "heap chunks --min-size 16" target = _target("heap")