diff --git a/docs/commands/heap.md b/docs/commands/heap.md index 34dd076bf..170a3b5d5 100644 --- a/docs/commands/heap.md +++ b/docs/commands/heap.md @@ -9,6 +9,28 @@ for `malloc` structure information). Syntax to the subcommands is straight forwa gef➤ heap ``` +## `main_arena` symbol ### + +If the linked glibc of the target program does not have debugging symbols it +might be tricky for GEF to find the address of the `main_arena` which is needed +for most of the `heap` subcommands. If you know the offset of this symbol from +the glibc base address you can use GEF's config to provide said value: + +``` +gef➤ gef config gef.main_arena_offset +``` + +If you do not know this offset and you want GEF to try and find it via bruteforce +when executing a `heap` command the next time, you can try this instead: + +``` +gef➤ gef config gef.bruteforce_main_arena True +``` + +Note that this might take a few seconds to complete. If GEF does find the symbol +you can then calculate the offset to the libc base address and save it in the config. + + ### `heap chunks` command ### Displays all the chunks from the `heap` section of the current arena. diff --git a/gef.py b/gef.py index e95a25633..a37930b08 100644 --- a/gef.py +++ b/gef.py @@ -1193,30 +1193,9 @@ def is_valid(self) -> bool: def size(self) -> int: return len(self.opcodes) - -@lru_cache() +@deprecated("Use GefHeapManager.find_main_arena_addr()") def search_for_main_arena() -> int: - """A helper function to find the libc `main_arena` address, either from symbol or from its offset - from `__malloc_hook`.""" - try: - addr = parse_address(f"&{LIBC_HEAP_MAIN_ARENA_DEFAULT_NAME}") - - except gdb.error: - malloc_hook_addr = parse_address("(void *)&__malloc_hook") - - struct_size = ctypes.sizeof(GlibcArena.malloc_state_t()) - - if is_x86(): - addr = align_address_to_size(malloc_hook_addr + gef.arch.ptrsize, 0x20) - elif is_arch(Elf.Abi.AARCH64): - addr = malloc_hook_addr - gef.arch.ptrsize*2 - struct_size - elif is_arch(Elf.Abi.ARM): - addr = malloc_hook_addr - gef.arch.ptrsize - struct_size - else: - raise OSError(f"Cannot find main_arena for {gef.arch.arch}") - - return addr - + return GefHeapManager.find_main_arena_addr() class GlibcHeapInfo: """Glibc heap_info struct""" @@ -1225,14 +1204,27 @@ class GlibcHeapInfo: def heap_info_t() -> Type[ctypes.Structure]: class heap_info_cls(ctypes.Structure): pass - pointer = ctypes.c_uint64 if gef and gef.arch.ptrsize == 8 else ctypes.c_uint32 - heap_info_cls._fields_ = [ + pointer = ctypes.c_uint64 if gef.arch.ptrsize == 8 else ctypes.c_uint32 + pad_size = -5 * gef.arch.ptrsize & (gef.heap.malloc_alignment - 1) + fields = [ ("ar_ptr", ctypes.POINTER(GlibcArena.malloc_state_t())), ("prev", ctypes.POINTER(heap_info_cls)), - ("size", pointer), - ("mprotect_size", pointer), - ("pad", pointer), + ("size", pointer) ] + if gef.libc.version >= (2, 5): + fields += [ + ("mprotect_size", pointer) + ] + pad_size = -6 * gef.arch.ptrsize & (gef.heap.malloc_alignment - 1) + if gef.libc.version >= (2, 34): + fields += [ + ("pagesize", pointer) + ] + pad_size = -3 * gef.arch.ptrsize & (gef.heap.malloc_alignment - 1) + fields += [ + ("pad", ctypes.c_uint8*pad_size) + ] + heap_info_cls._fields_ = fields return heap_info_cls def __init__(self, addr: Union[str, int]) -> None: @@ -1242,13 +1234,13 @@ def __init__(self, addr: Union[str, int]) -> None: def reset(self): self._sizeof = ctypes.sizeof(GlibcHeapInfo.heap_info_t()) - self._data = gef.memory.read(self.__address, ctypes.sizeof(GlibcArena.malloc_state_t())) - self.__heap_info = GlibcArena.malloc_state_t().from_buffer_copy(self._data) + self._data = gef.memory.read(self.__address, ctypes.sizeof(GlibcHeapInfo.heap_info_t())) + self._heap_info = GlibcHeapInfo.heap_info_t().from_buffer_copy(self._data) return def __getattr__(self, item: Any) -> Any: - if item in dir(self.__heap_info): - return getattr(self.__heap_info, item) + if item in dir(self._heap_info): + return ctypes.cast(getattr(self._heap_info, item), ctypes.c_void_p).value return getattr(self, item) def __abs__(self) -> int: @@ -1269,6 +1261,27 @@ def sizeof(self) -> int: def addr(self) -> int: return int(self) + @property + def heap_start(self) -> int: + # check special case: first heap of non-main-arena + if self.ar_ptr - self.address < 0x60: + # the first heap of a non-main-arena starts with a `heap_info` + # struct, which should fit easily into 0x60 bytes throughout + # all architectures and glibc versions. If this check succeeds + # then we are currently looking at such a "first heap" + arena = GlibcArena(f"*{self.ar_ptr:#x}") + heap_addr = arena.heap_addr() + if heap_addr: + return heap_addr + else: + err(f"Cannot find heap address for arena {self.ar_ptr:#x}") + return 0 + return self.address + self.sizeof + + @property + def heap_end(self) -> int: + return self.address + self.size + class GlibcArena: """Glibc arena class""" @@ -1319,8 +1332,8 @@ def __init__(self, addr: str) -> None: try: self.__address : int = parse_address(f"&{addr}") except gdb.error: - self.__address : int = search_for_main_arena() - # if `search_for_main_arena` throws `gdb.error` on symbol lookup: + self.__address : int = GefHeapManager.find_main_arena_addr() + # if `find_main_arena_addr` throws `gdb.error` on symbol lookup: # it means the session is not started, so just propagate the exception self.reset() return @@ -1451,7 +1464,7 @@ def get_heap_info_list(self) -> Optional[List[GlibcHeapInfo]]: return None heap_addr = self.get_heap_for_ptr(self.top) heap_infos = [GlibcHeapInfo(heap_addr)] - while heap_infos[-1].prev != 0: + while heap_infos[-1].prev is not None: prev = int(heap_infos[-1].prev) heap_info = GlibcHeapInfo(prev) heap_infos.append(heap_info) @@ -1468,6 +1481,20 @@ def get_heap_for_ptr(ptr: int) -> int: heap_max_size = 2 * default_mmap_threshold_max return ptr & ~(heap_max_size - 1) + @staticmethod + def verify(addr: int) -> bool: + """Verify that the address matches a possible valid GlibcArena""" + try: + test_arena = GlibcArena(f"*{addr:#x}") + cur_arena = GlibcArena(f"*{test_arena.next:#x}") + while cur_arena != test_arena: + if cur_arena == 0: + return False + cur_arena = GlibcArena(f"*{cur_arena.next:#x}") + except Exception as e: + return False + return True + class GlibcChunk: """Glibc chunk class. The default behavior (from_base=False) is to interpret the data starting at the memory @@ -1694,25 +1721,9 @@ class GlibcTcacheChunk(GlibcFastChunk): pass - -@lru_cache() +@deprecated("Use GefLibcManager.find_libc_version()") def get_libc_version() -> Tuple[int, ...]: - sections = gef.memory.maps - for section in sections: - match = re.search(r"libc6?[-_](\d+)\.(\d+)\.so", section.path) - if match: - return tuple(int(_) for _ in match.groups()) - if "libc" in section.path: - try: - with open(section.path, "rb") as f: - data = f.read() - except OSError: - continue - match = re.search(PATTERN_LIBC_VERSION, data) - if match: - return tuple(int(_) for _ in match.groups()) - return 0, 0 - + return GefLibcManager.find_libc_version() def titlify(text: str, color: Optional[str] = None, msg_color: Optional[str] = None) -> str: """Print a centered title.""" @@ -6055,7 +6066,7 @@ def do_invoke(self, _: List[str]) -> None: @register class GlibcHeapSetArenaCommand(GenericCommand): - """Display information on a heap chunk.""" + """Set the address of the main_arena or the currently selected arena.""" _cmdline_ = "heap set-arena" _syntax_ = f"{_cmdline_} [address|&symbol]" @@ -6066,28 +6077,33 @@ def __init__(self) -> None: return @only_if_gdb_running - def do_invoke(self, argv: List[str]) -> None: + @parse_arguments({"addr": ""}, {"--reset": True}) + def do_invoke(self, _: List[str], **kwargs: Any) -> None: global gef - if not argv: - ok(f"Current arena set to: '{gef.heap.selected_arena}'") + args: argparse.Namespace = kwargs["arguments"] + + if args.reset: + gef.heap.reset_caches() return - if is_hex(argv[0]): - new_arena_address = int(argv[0], 16) - else: - new_arena_symbol = safe_parse_and_eval(argv[0]) - if not new_arena_symbol: - err("Invalid symbol for arena") - return - new_arena_address = to_unsigned_long(new_arena_symbol) + if not args.addr: + ok(f"Current arena set to: '{gef.heap.selected_arena}'") + return - new_arena = GlibcArena( f"*{new_arena_address:#x}") - if new_arena not in gef.heap.arenas: - err("Invalid arena") + try: + new_arena_address = parse_address(args.addr) + except gdb.error: + err("Invalid symbol for arena") return - gef.heap.selected_arena = new_arena + new_arena = GlibcArena( f"*{new_arena_address:#x}") + if new_arena in gef.heap.arenas: + # if entered arena is in arena list then just select it + gef.heap.selected_arena = new_arena + else: + # otherwise set the main arena to the entered arena + gef.heap.main_arena = new_arena return @@ -6168,10 +6184,18 @@ def __init__(self) -> None: @only_if_gdb_running def do_invoke(self, _: List[str], **kwargs: Any) -> None: args = kwargs["arguments"] - for arena in gef.heap.arenas: - self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned) - if not args.all: - break + if args.all or not args.arena_address: + for arena in gef.heap.arenas: + self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned) + if not args.all: + return + try: + arena_addr = parse_address(args.arena_address) + arena = GlibcArena(f"*{arena_addr:#x}") + self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned) + except gdb.error: + err("Invalid arena") + return def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False) -> None: heap_addr = arena.heap_addr(allow_unaligned=allow_unaligned) @@ -6181,19 +6205,16 @@ def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_ if print_arena: gef_print(str(arena)) if arena.is_main_arena(): - self.dump_chunks_heap(heap_addr, arena, allow_unaligned=allow_unaligned) + heap_end = arena.top + GlibcChunk(arena.top, from_base=True).size + self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned) else: heap_info_structs = arena.get_heap_info_list() or [] - first_heap_info = heap_info_structs.pop(0) - heap_info_t_size = int(arena) - first_heap_info.addr - if self.dump_chunks_heap(heap_addr, arena, allow_unaligned=allow_unaligned): - for heap_info in heap_info_structs: - start = heap_info.addr + heap_info_t_size - if not self.dump_chunks_heap(start, arena, allow_unaligned=allow_unaligned): - break + for heap_info in heap_info_structs: + if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, allow_unaligned=allow_unaligned): + break return - def dump_chunks_heap(self, start: int, arena: GlibcArena, allow_unaligned: bool = False) -> bool: + def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False) -> bool: nb = self["peek_nb_byte"] chunk_iterator = GlibcChunk(start, from_base=True, allow_unaligned=allow_unaligned) for chunk in chunk_iterator: @@ -6202,7 +6223,7 @@ def dump_chunks_heap(self, start: int, arena: GlibcArena, allow_unaligned: bool f"{chunk!s} {LEFT_ARROW} {Color.greenify('top chunk')}") break - if chunk.base_address > arena.top: + if chunk.base_address > end: err("Corrupted heap, cannot continue.") return False @@ -6293,7 +6314,7 @@ def __init__(self) -> None: @only_if_gdb_running def do_invoke(self, argv: List[str]) -> None: # Determine if we are using libc with tcache built in (2.26+) - if get_libc_version() < (2, 26): + if gef.libc.version and gef.libc.version < (2, 26): info("No Tcache in this version of libc") return @@ -9367,6 +9388,8 @@ def __init__(self) -> None: gef.config["gef.tempdir"] = GefSetting(GEF_TEMP_DIR, str, "Directory to use for temporary/cache content") gef.config["gef.show_deprecation_warnings"] = GefSetting(True, bool, "Toggle the display of the `deprecated` warnings") gef.config["gef.buffer"] = GefSetting(True, bool, "Internally buffer command output until completion") + gef.config["gef.bruteforce_main_arena"] = GefSetting(False, bool, "Allow bruteforcing main_arena symbol if everything else fails") + gef.config["gef.main_arena_offset"] = GefSetting("", str, "Offset from libc base address to main_arena symbol (int or hex). Set to empty string to disable.") self.commands : Dict[str, GenericCommand] = collections.OrderedDict() self.functions : Dict[str, GenericFunction] = collections.OrderedDict() @@ -10315,7 +10338,7 @@ def reset_caches(self) -> None: def main_arena(self) -> Optional[GlibcArena]: if not self.__libc_main_arena: try: - __main_arena_addr = search_for_main_arena() + __main_arena_addr = GefHeapManager.find_main_arena_addr() self.__libc_main_arena = GlibcArena(f"*{__main_arena_addr:#x}") # the initialization of `main_arena` also defined `selected_arena`, so # by default, `main_arena` == `selected_arena` @@ -10325,11 +10348,90 @@ def main_arena(self) -> Optional[GlibcArena]: pass return self.__libc_main_arena + @main_arena.setter + def main_arena(self, value: GlibcArena) -> None: + self.__libc_main_arena = value + return + + @staticmethod + @lru_cache() + def find_main_arena_addr() -> int: + """A helper function to find the glibc `main_arena` address, either from + symbol, from its offset from `__malloc_hook` or by brute force.""" + # Before anything else, use libc offset from config if available + if gef.config["gef.main_arena_offset"]: + try: + libc_base = get_section_base_address("libc") + offset = parse_address(gef.config["gef.main_arena_offset"]) + if libc_base: + dbg(f"Using main_arena_offset={offset:#x} from config") + addr = libc_base + offset + + # Verify the found address before returning + if GlibcArena.verify(addr): + return addr + except gdb.error: + pass + + # First, try to find `main_arena` symbol directly + try: + return parse_address(f"&{LIBC_HEAP_MAIN_ARENA_DEFAULT_NAME}") + except gdb.error: + pass + + # Second, try to find it by offset from `__malloc_hook` + if gef.libc.version < (2, 34): + try: + malloc_hook_addr = parse_address("(void *)&__malloc_hook") + + struct_size = ctypes.sizeof(GlibcArena.malloc_state_t()) + + if is_x86(): + addr = align_address_to_size(malloc_hook_addr + gef.arch.ptrsize, 0x20) + elif is_arch(Elf.Abi.AARCH64): + addr = malloc_hook_addr - gef.arch.ptrsize*2 - struct_size + elif is_arch(Elf.Abi.ARM): + addr = malloc_hook_addr - gef.arch.ptrsize - struct_size + else: + addr = None + + # Verify the found address before returning + if addr and GlibcArena.verify(addr): + return addr + + except gdb.error: + pass + + # Last resort, try to find it via brute force if enabled in settings + if gef.config["gef.bruteforce_main_arena"]: + alignment = 0x8 + try: + dbg("Trying to bruteforce main_arena address") + # setup search_range for `main_arena` to `.data` of glibc + search_filter = lambda f: "libc" in f.filename and f.name == ".data" + dotdata = list(filter(search_filter, get_info_files()))[0] + search_range = range(dotdata.zone_start, dotdata.zone_end, alignment) + # find first possible candidate + for addr in search_range: + if GlibcArena.verify(addr): + dbg(f"Found candidate at {addr:#x}") + return addr + dbg("Bruteforce not successful") + except Exception: + pass + + # Nothing helped + err_msg = f"Cannot find main_arena for {gef.arch.arch}. You might want to set a manually found libc offset " + if not gef.config["gef.bruteforce_main_arena"]: + err_msg += "or allow bruteforcing " + err_msg += "through the GEF config." + raise OSError(err_msg) + @property def selected_arena(self) -> Optional[GlibcArena]: if not self.__libc_selected_arena: # `selected_arena` must default to `main_arena` - self.__libc_selected_arena = self.__libc_main_arena + self.__libc_selected_arena = self.main_arena return self.__libc_selected_arena @selected_arena.setter @@ -10371,7 +10473,7 @@ def min_chunk_size(self) -> int: @property def malloc_alignment(self) -> int: __default_malloc_alignment = 0x10 - if get_libc_version() > (2, 25) and is_x86_32(): + if gef.libc.version >= (2, 26) and is_x86_32(): # Special case introduced in Glibc 2.26: # https://elixir.bootlin.com/glibc/glibc-2.26/source/sysdeps/i386/malloc-alignment.h#L22 return __default_malloc_alignment @@ -10830,9 +10932,30 @@ def version(self) -> Optional[Tuple[int, int]]: if not is_alive(): return None if not self._version: - self._version = get_libc_version() + self._version = GefLibcManager.find_libc_version() return self._version + @staticmethod + @lru_cache() + def find_libc_version() -> Tuple[int, ...]: + sections = gef.memory.maps + for section in sections: + match = re.search(r"libc6?[-_](\d+)\.(\d+)\.so", section.path) + if match: + return tuple(int(_) for _ in match.groups()) + if "libc" in section.path: + try: + with open(section.path, "rb") as f: + data = f.read() + except OSError: + continue + match = re.search(PATTERN_LIBC_VERSION, data) + if match: + return tuple(int(_) for _ in match.groups()) + return 0, 0 + + + class Gef: """The GEF root class, which serves as a entrypoint for all the debugging session attributes (architecture, diff --git a/tests/commands/heap.py b/tests/commands/heap.py index 52458f0e6..b624a0eda 100644 --- a/tests/commands/heap.py +++ b/tests/commands/heap.py @@ -64,6 +64,14 @@ def test_cmd_heap_chunks(self): self.assertIn("Chunk(addr=", res) self.assertIn("top chunk", res) + cmd = "heap chunks" + target = _target("heap-non-main") + res = gdb_run_silent_cmd(cmd, target=target) + self.assertNoException(res) + self.assertIn("Chunk(addr=", res) + self.assertIn("top chunk", res) + chunks = [line for line in res.splitlines() if "Chunk(addr=" in line] + cmd = "python gdb.execute(f'heap chunks {int(list(gef.heap.arenas)[1]):#x}')" target = _target("heap-non-main") res = gdb_run_silent_cmd(cmd, target=target) @@ -71,6 +79,9 @@ def test_cmd_heap_chunks(self): self.assertNotIn("using '&main_arena' instead", res) self.assertIn("Chunk(addr=", res) self.assertIn("top chunk", res) + non_main_chunks = [line for line in res.splitlines() if "Chunk(addr=" in line] + # make sure that the chunks of each arena are distinct + self.assertNotEqual(chunks, non_main_chunks) def test_cmd_heap_chunks_mult_heaps(self): @@ -107,7 +118,7 @@ def test_cmd_heap_bins_large(self): def test_cmd_heap_bins_non_main(self): - cmd = "python gdb.execute(f'heap bins fast {gef.heap.main_arena.addr:#x}')" + cmd = "python gdb.execute(f'heap bins fast {gef.heap.main_arena.next:#x}')" before = ["set environment GLIBC_TUNABLES glibc.malloc.tcache_count=0"] target = _target("heap-non-main") res = gdb_run_silent_cmd(cmd, before=before, target=target)