Skip to content

Commit

Permalink
Add option to resolve type when dumping heap summary. (#1030)
Browse files Browse the repository at this point in the history
This change adds `--resolve` option to resolve the type and categorize
the chunks, using vtable in the chunk.

Since multiple types of objects could share the same size, also it is
impossible for anyone to remember the size of all the objects in the
system, categorize and output the chunks by object type will become very
handy when debugging issues like memory leaks.
  • Loading branch information
r12f authored Dec 22, 2023
1 parent 663d4a2 commit e629f02
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 18 deletions.
11 changes: 11 additions & 0 deletions docs/commands/heap.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ gef➤ heap chunks --summary

![heap-chunks-summary](https://i.imgur.com/3HTgtwX.png)

Sometimes, multiple types of objects could have the same size, hence it might not be enough only
knowing the chunk size when debugging issues like memory leaks. GEF supports using the vtable to
determine the type of the object stored in the chunk. To enable this feature, use `--resolve` along
with the `--summary` flag.

```text
gef➤ heap chunks --summary --resolve
```

![heap-chunks-summary-resolve](https://i.imgur.com/2Mm0JF6.png)

Heap chunk command also supports filtering chunks by their size. To do so, simply provide the
`--min-size` or `--max-size` argument:

Expand Down
48 changes: 30 additions & 18 deletions gef.py
Original file line number Diff line number Diff line change
Expand Up @@ -1710,6 +1710,15 @@ def psprint(self) -> str:
msg.append(f"\n\n{self._str_pointers()}")
return "\n".join(msg) + "\n"

def resolve_type(self) -> str:
ptr_data = gef.memory.read_integer(self.data_address)
if ptr_data != 0:
sym = gdb_get_location_from_symbol(ptr_data)
if sym is not None and "vtable for" in sym[0]:
return sym[0].replace("vtable for ", "")

return ""


class GlibcFastChunk(GlibcChunk):

Expand Down Expand Up @@ -1999,7 +2008,6 @@ def gdb_lookup_symbol(sym: str) -> Optional[Tuple[Optional[str], Optional[Tuple[
except gdb.error:
return None


@lru_cache(maxsize=512)
def gdb_get_location_from_symbol(address: int) -> Optional[Tuple[str, int]]:
"""Retrieve the location of the `address` argument from the symbol table.
Expand Down Expand Up @@ -6302,7 +6310,8 @@ def do_invoke(self, _: List[str], **kwargs: Any) -> None:


class GlibcHeapChunkSummary:
def __init__(self):
def __init__(self, desc = ""):
self.desc = desc
self.count = 0
self.total_bytes = 0

Expand All @@ -6312,7 +6321,8 @@ def process_chunk(self, chunk: GlibcChunk) -> None:


class GlibcHeapArenaSummary:
def __init__(self) -> None:
def __init__(self, resolve_type = False) -> None:
self.resolve_symbol = resolve_type
self.size_distribution = {}
self.flag_distribution = {
"PREV_INUSE": GlibcHeapChunkSummary(),
Expand All @@ -6321,10 +6331,12 @@ def __init__(self) -> None:
}

def process_chunk(self, chunk: GlibcChunk) -> None:
per_size_summary = self.size_distribution.get(chunk.size, None)
chunk_type = "" if not self.resolve_symbol else chunk.resolve_type()

per_size_summary = self.size_distribution.get((chunk.size, chunk_type), None)
if per_size_summary is None:
per_size_summary = GlibcHeapChunkSummary()
self.size_distribution[chunk.size] = per_size_summary
per_size_summary = GlibcHeapChunkSummary(desc=chunk_type)
self.size_distribution[(chunk.size, chunk_type)] = per_size_summary
per_size_summary.process_chunk(chunk)

if chunk.has_p_bit():
Expand All @@ -6336,9 +6348,9 @@ def process_chunk(self, chunk: GlibcChunk) -> None:

def print(self) -> None:
gef_print("== Chunk distribution by size ==")
gef_print("{:<10s}\t{:<10s}\t{:s}".format("ChunkBytes", "Count", "TotalBytes"))
for chunk_size, chunk_summary in sorted(self.size_distribution.items(), key=lambda x: x[1].total_bytes, reverse=True):
gef_print("{:<10d}\t{:<10d}\t{:<d}".format(chunk_size, chunk_summary.count, chunk_summary.total_bytes))
gef_print("{:<10s}\t{:<10s}\t{:15s}\t{:s}".format("ChunkBytes", "Count", "TotalBytes", "Description"))
for chunk_info, chunk_summary in sorted(self.size_distribution.items(), key=lambda x: x[1].total_bytes, reverse=True):
gef_print("{:<10d}\t{:<10d}\t{:<15d}\t{:s}".format(chunk_info[0], chunk_summary.count, chunk_summary.total_bytes, chunk_summary.desc))

gef_print("\n== Chunk distribution by flag ==")
gef_print("{:<15s}\t{:<10s}\t{:s}".format("Flag", "TotalCount", "TotalBytes"))
Expand All @@ -6352,7 +6364,7 @@ class GlibcHeapChunksCommand(GenericCommand):
the base address of a different arena can be passed"""

_cmdline_ = "heap chunks"
_syntax_ = f"{_cmdline_} [-h] [--all] [--allow-unaligned] [--summary] [--min-size MIN_SIZE] [--max-size MAX_SIZE] [arena_address]"
_syntax_ = f"{_cmdline_} [-h] [--all] [--allow-unaligned] [--summary] [--min-size MIN_SIZE] [--max-size MAX_SIZE] [--resolve] [arena_address]"
_example_ = (f"\n{_cmdline_}"
f"\n{_cmdline_} 0x555555775000")

Expand All @@ -6361,24 +6373,24 @@ def __init__(self) -> None:
self["peek_nb_byte"] = (16, "Hexdump N first byte(s) inside the chunk data (0 to disable)")
return

@parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True, "--min-size": 0, "--max-size": 0, ("--summary", "-s"): True})
@parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True, "--min-size": 0, "--max-size": 0, ("--summary", "-s"): True, "--resolve": True})
@only_if_gdb_running
def do_invoke(self, _: List[str], **kwargs: Any) -> None:
args = kwargs["arguments"]
if args.all or not args.arena_address:
for arena in gef.heap.arenas:
self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary)
self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary, resolve_type=args.resolve)
if not args.all:
return
try:
arena_addr = parse_address(args.arena_address)
arena = GlibcArena(f"*{arena_addr:#x}")
self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary)
self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary, resolve_type=args.resolve)
except gdb.error:
err("Invalid arena")
return

def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False) -> None:
def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False, resolve_type: bool = False) -> None:
heap_addr = arena.heap_addr(allow_unaligned=allow_unaligned)
if heap_addr is None:
err("Could not find heap for arena")
Expand All @@ -6387,18 +6399,18 @@ def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_
gef_print(str(arena))
if arena.is_main_arena():
heap_end = arena.top + GlibcChunk(arena.top, from_base=True).size
self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary)
self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary, resolve_type=resolve_type)
else:
heap_info_structs = arena.get_heap_info_list() or []
for heap_info in heap_info_structs:
if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary):
if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary, resolve_type=resolve_type):
break
return

def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False) -> bool:
def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False, resolve_type: bool = False) -> bool:
nb = self["peek_nb_byte"]
chunk_iterator = GlibcChunk(start, from_base=True, allow_unaligned=allow_unaligned)
heap_summary = GlibcHeapArenaSummary()
heap_summary = GlibcHeapArenaSummary(resolve_type=resolve_type)
for chunk in chunk_iterator:
heap_corrupted = chunk.base_address > end
should_process = self.should_process_chunk(chunk, min_size, max_size)
Expand Down
8 changes: 8 additions & 0 deletions tests/commands/heap.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ def test_cmd_heap_chunks_summary(self):
self.assertIn("== Chunk distribution by size", res)
self.assertIn("== Chunk distribution by flag", res)

def test_cmd_heap_chunks_summary_with_type_resolved(self):
cmd = "heap chunks --summary --resolve"
target = _target("class")
res = gdb_run_silent_cmd(cmd, target=target, before=["b B<TraitA, TraitB>::Run()"])
self.assertNoException(res)
self.assertIn("== Chunk distribution by size", res)
self.assertIn("B<TraitA, TraitB>", res)

def test_cmd_heap_chunks_min_size_filter(self):
cmd = "heap chunks --min-size 16"
target = _target("heap")
Expand Down

0 comments on commit e629f02

Please sign in to comment.