Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add count option in heap chunks command to limit the number of chunks to process / output. #1029

Merged
merged 5 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/commands/heap.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ gef➤ heap chunks --min-size 16 --max-size 32

The range is inclusive, so the above command will display all chunks with size >=16 and <=32.

If heap chunks command still gives too many chunks, we can use `--count` argument to limit the number
of the chunks in the output:

```text
gef➤ heap chunks --count 1
```

![heap-chunks-size-filter](https://i.imgur.com/EinuDAt.png)

### `heap chunk` command

This command gives visual information of a Glibc malloc-ed chunked. Simply provide the address to
Expand Down
53 changes: 34 additions & 19 deletions gef.py
Original file line number Diff line number Diff line change
Expand Up @@ -6357,14 +6357,23 @@ def print(self) -> None:
for chunk_flag, chunk_summary in self.flag_distribution.items():
gef_print("{:<15s}\t{:<10d}\t{:<d}".format(chunk_flag, chunk_summary.count, chunk_summary.total_bytes))

class GlibcHeapWalkContext:
Grazfather marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, print_arena: bool = False, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, count: int = -1, resolve_type: bool = False, summary: bool = False) -> None:
self.print_arena = print_arena
self.allow_unaligned = allow_unaligned
self.min_size = min_size
self.max_size = max_size
self.remaining_chunk_count = count
self.summary = summary
self.resolve_type = resolve_type

@register
class GlibcHeapChunksCommand(GenericCommand):
"""Display all heap chunks for the current arena. As an optional argument
the base address of a different arena can be passed"""

_cmdline_ = "heap chunks"
_syntax_ = f"{_cmdline_} [-h] [--all] [--allow-unaligned] [--summary] [--min-size MIN_SIZE] [--max-size MAX_SIZE] [--resolve] [arena_address]"
_syntax_ = f"{_cmdline_} [-h] [--all] [--allow-unaligned] [--summary] [--min-size MIN_SIZE] [--max-size MAX_SIZE] [--count COUNT] [--resolve] [arena_address]"
_example_ = (f"\n{_cmdline_}"
f"\n{_cmdline_} 0x555555775000")

Expand All @@ -6373,49 +6382,50 @@ def __init__(self) -> None:
self["peek_nb_byte"] = (16, "Hexdump N first byte(s) inside the chunk data (0 to disable)")
return

@parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True, "--min-size": 0, "--max-size": 0, ("--summary", "-s"): True, "--resolve": True})
@parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True, "--min-size": 0, "--max-size": 0, ("--count", "-n"): -1, ("--summary", "-s"): True, "--resolve": True})
@only_if_gdb_running
def do_invoke(self, _: List[str], **kwargs: Any) -> None:
args = kwargs["arguments"]
ctx = GlibcHeapWalkContext(print_arena=args.all, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, count=args.count, resolve_type=args.resolve, summary=args.summary)
if args.all or not args.arena_address:
for arena in gef.heap.arenas:
self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary, resolve_type=args.resolve)
self.dump_chunks_arena(arena, ctx)
if not args.all:
return
try:
arena_addr = parse_address(args.arena_address)
arena = GlibcArena(f"*{arena_addr:#x}")
self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary, resolve_type=args.resolve)
self.dump_chunks_arena(arena, ctx)
except gdb.error:
err("Invalid arena")
return

def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False, resolve_type: bool = False) -> None:
heap_addr = arena.heap_addr(allow_unaligned=allow_unaligned)
def dump_chunks_arena(self, arena: GlibcArena, ctx: GlibcHeapWalkContext) -> None:
heap_addr = arena.heap_addr(allow_unaligned=ctx.allow_unaligned)
if heap_addr is None:
err("Could not find heap for arena")
return
if print_arena:
if ctx.print_arena:
gef_print(str(arena))
if arena.is_main_arena():
heap_end = arena.top + GlibcChunk(arena.top, from_base=True).size
self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary, resolve_type=resolve_type)
self.dump_chunks_heap(heap_addr, heap_end, arena, ctx)
else:
heap_info_structs = arena.get_heap_info_list() or []
for heap_info in heap_info_structs:
if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary, resolve_type=resolve_type):
if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, ctx):
break
return

def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False, resolve_type: bool = False) -> bool:
def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, ctx: GlibcHeapWalkContext) -> bool:
nb = self["peek_nb_byte"]
chunk_iterator = GlibcChunk(start, from_base=True, allow_unaligned=allow_unaligned)
heap_summary = GlibcHeapArenaSummary(resolve_type=resolve_type)
chunk_iterator = GlibcChunk(start, from_base=True, allow_unaligned=ctx.allow_unaligned)
heap_summary = GlibcHeapArenaSummary(resolve_type=ctx.resolve_type)
for chunk in chunk_iterator:
heap_corrupted = chunk.base_address > end
should_process = self.should_process_chunk(chunk, min_size, max_size)
should_process = self.should_process_chunk(chunk, ctx)

if not summary and chunk.base_address == arena.top:
if not ctx.summary and chunk.base_address == arena.top:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value of None would cause this to throw an Exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the code to fix this problem.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK looks good, but technically you're passing a mutable object (the context) as the default value, which is bound to the function object. If it changes (and you take care not to) then subsequent calls would have this modified default. https://stackoverflow.com/questions/1132941/least-astonishment-and-the-mutable-default-argument

Since you don't modify it, maybe it's fine, but we run the risk of someone modifying it in the future. Probably OK though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, it will never be null. The default value is added there just to follow existing code. We can also remove the default value, but I will have to move the parameter before the ones with default value. not sure if it is ok.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i removed the default value and lifted the ctx parameter to the front.

if should_process:
gef_print(
f"{chunk!s} {LEFT_ARROW} {Color.greenify('top chunk')}")
Expand All @@ -6428,24 +6438,29 @@ def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unalig
if not should_process:
continue

if summary:
if ctx.remaining_chunk_count == 0:
break

if ctx.summary:
heap_summary.process_chunk(chunk)
else:
line = str(chunk)
if nb:
line += f"\n [{hexdump(gef.memory.read(chunk.data_address, nb), nb, base=chunk.data_address)}]"
gef_print(line)

if summary:
ctx.remaining_chunk_count -= 1

if ctx.summary:
heap_summary.print()

return True

def should_process_chunk(self, chunk: GlibcChunk, min_size: int, max_size: int) -> bool:
if chunk.size < min_size:
def should_process_chunk(self, chunk: GlibcChunk, ctx: GlibcHeapWalkContext) -> bool:
if chunk.size < ctx.min_size:
return False

if 0 < max_size < chunk.size:
if 0 < ctx.max_size < chunk.size:
return False

return True
Expand Down
15 changes: 15 additions & 0 deletions tests/commands/heap.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,21 @@ def test_cmd_heap_chunks_max_size_filter(self):
self.assertNoException(res)
self.assertNotIn("Chunk(addr=", res)

def test_cmd_heap_chunks_with_count(self):
cmd = "heap chunks --count 1"
target = debug_target("heap")
self.assertFailIfInactiveSession(gdb_run_cmd(cmd, target=target))
res = gdb_run_silent_cmd(cmd, target=target)
self.assertNoException(res)
self.assertIn("Chunk(addr=", res)

cmd = "heap chunks --count 0"
target = debug_target("heap")
self.assertFailIfInactiveSession(gdb_run_cmd(cmd, target=target))
res = gdb_run_silent_cmd(cmd, target=target)
self.assertNoException(res)
self.assertNotIn("Chunk(addr=", res)

def test_cmd_heap_bins_fast(self):
cmd = "heap bins fast"
before = ["set environment GLIBC_TUNABLES glibc.malloc.tcache_count=0"]
Expand Down
Loading