From d5d544db2fdc01fb38bd5fd4c07fd73afaebfd75 Mon Sep 17 00:00:00 2001 From: hugsy Date: Sat, 2 Nov 2024 13:28:54 -0700 Subject: [PATCH] `Optional` is optional --- gef.py | 319 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 154 insertions(+), 165 deletions(-) diff --git a/gef.py b/gef.py index 854076a15..af1151296 100644 --- a/gef.py +++ b/gef.py @@ -82,16 +82,15 @@ from functools import lru_cache from io import StringIO, TextIOWrapper from types import ModuleType -from typing import (Any, ByteString, Callable, Generator, Iterable, - Iterator, NoReturn, Optional, Sequence, Type, TypeVar, - cast) +from typing import (Any, ByteString, Callable, Generator, Iterable, Iterator, + NoReturn, Sequence, Type, TypeVar, cast) from urllib.request import urlopen GEF_DEFAULT_BRANCH = "main" GEF_EXTRAS_DEFAULT_BRANCH = "main" -def http_get(url: str) -> Optional[bytes]: +def http_get(url: str) -> bytes | None: """Basic HTTP wrapper for GET request. Return the body of the page if HTTP code is OK, otherwise return None.""" try: @@ -284,49 +283,49 @@ class ObsoleteException(Exception): pass class AlreadyRegisteredException(Exception): pass -def p8(x: int, s: bool = False, e: Optional["Endianness"] = None) -> bytes: +def p8(x: int, s: bool = False, e: "Endianness | None" = None) -> bytes: """Pack one byte respecting the current architecture endianness.""" endian = e or gef.arch.endianness return struct.pack(f"{endian}B", x) if not s else struct.pack(f"{endian:s}b", x) -def p16(x: int, s: bool = False, e: Optional["Endianness"] = None) -> bytes: +def p16(x: int, s: bool = False, e: "Endianness | None" = None) -> bytes: """Pack one word respecting the current architecture endianness.""" endian = e or gef.arch.endianness return struct.pack(f"{endian}H", x) if not s else struct.pack(f"{endian:s}h", x) -def p32(x: int, s: bool = False, e: Optional["Endianness"] = None) -> bytes: +def p32(x: int, s: bool = False, e: "Endianness | None" = None) -> bytes: """Pack one dword respecting the current architecture endianness.""" endian = e or gef.arch.endianness return struct.pack(f"{endian}I", x) if not s else struct.pack(f"{endian:s}i", x) -def p64(x: int, s: bool = False, e: Optional["Endianness"] = None) -> bytes: +def p64(x: int, s: bool = False, e: "Endianness | None" = None) -> bytes: """Pack one qword respecting the current architecture endianness.""" endian = e or gef.arch.endianness return struct.pack(f"{endian}Q", x) if not s else struct.pack(f"{endian:s}q", x) -def u8(x: bytes, s: bool = False, e: Optional["Endianness"] = None) -> int: +def u8(x: bytes, s: bool = False, e: "Endianness | None" = None) -> int: """Unpack one byte respecting the current architecture endianness.""" endian = e or gef.arch.endianness return struct.unpack(f"{endian}B", x)[0] if not s else struct.unpack(f"{endian:s}b", x)[0] -def u16(x: bytes, s: bool = False, e: Optional["Endianness"] = None) -> int: +def u16(x: bytes, s: bool = False, e: "Endianness | None" = None) -> int: """Unpack one word respecting the current architecture endianness.""" endian = e or gef.arch.endianness return struct.unpack(f"{endian}H", x)[0] if not s else struct.unpack(f"{endian:s}h", x)[0] -def u32(x: bytes, s: bool = False, e: Optional["Endianness"] = None) -> int: +def u32(x: bytes, s: bool = False, e: "Endianness | None" = None) -> int: """Unpack one dword respecting the current architecture endianness.""" endian = e or gef.arch.endianness return struct.unpack(f"{endian}I", x)[0] if not s else struct.unpack(f"{endian:s}i", x)[0] -def u64(x: bytes, s: bool = False, e: Optional["Endianness"] = None) -> int: +def u64(x: bytes, s: bool = False, e: "Endianness | None" = None) -> int: """Unpack one qword respecting the current architecture endianness.""" endian = e or gef.arch.endianness return struct.unpack(f"{endian}Q", x)[0] if not s else struct.unpack(f"{endian:s}q", x)[0] @@ -348,7 +347,7 @@ def is_alive() -> bool: return False -def calling_function() -> Optional[str]: +def calling_function() -> str | None: """Return the name of the calling function""" try: stack_info = traceback.extract_stack()[-3] @@ -452,7 +451,7 @@ def parse_arguments(required_arguments: dict[str | tuple[str, str], Any], def int_wrapper(x: str) -> int: return int(x, 0) - def decorator(f: Callable) -> Optional[Callable]: + def decorator(f: Callable) -> Callable | None: def wrapper(*args: Any, **kwargs: Any) -> Callable: parser = argparse.ArgumentParser(prog=args[0]._cmdline_, add_help=True) for argname in required_arguments: @@ -606,7 +605,7 @@ def is_in_stack_segment(self) -> bool: def is_in_heap_segment(self) -> bool: return hasattr(self.section, "path") and "[heap]" == self.section.path - def dereference(self) -> Optional[int]: + def dereference(self) -> int | None: addr = align_address(int(self.value)) derefed = dereference(addr) return None if derefed is None else int(derefed) @@ -693,7 +692,7 @@ def size(self) -> int: raise AttributeError return self.page_end - self.page_start - def _search_for_realpath_without_versions(self, path: pathlib.Path) -> Optional[str]: + def _search_for_realpath_without_versions(self, path: pathlib.Path) -> str | None: """Given a path, search for a file that exists without numeric suffixes.""" # Match the path string against a regex that will remove a suffix @@ -708,7 +707,7 @@ def _search_for_realpath_without_versions(self, path: pathlib.Path) -> Optional[ candidate = re.match(r"^(.*)\.(\d*)$", candidate) return None - def _search_for_realpath(self) -> Optional[str]: + def _search_for_realpath(self) -> str | None: """This function is a workaround for gdb bug #23764 path might be wrong for remote sessions, so try a simple search for files @@ -1199,7 +1198,7 @@ def _missing_(cls, _:int): sh_entsize: int name: str - def __init__(self, elf: Optional[Elf], off: int) -> None: + def __init__(self, elf: Elf | None, off: int) -> None: if elf is None: return elf.seek(off) @@ -1516,7 +1515,7 @@ def system_mem(self) -> int: def max_system_mem(self) -> int: return self.__arena.max_system_mem - def fastbin(self, i: int) -> Optional["GlibcFastChunk"]: + def fastbin(self, i: int) -> "GlibcFastChunk | None": """Return head chunk in fastbinsY[i].""" addr = int(self.fastbinsY[i]) if addr == 0: @@ -1537,7 +1536,7 @@ def bin_at(self, i) -> int: def is_main_arena(self) -> bool: return gef.heap.main_arena is not None and int(self) == int(gef.heap.main_arena) - def heap_addr(self, allow_unaligned: bool = False) -> Optional[int]: + def heap_addr(self, allow_unaligned: bool = False) -> int | None: if self.is_main_arena(): heap_section = gef.heap.base_address if not heap_section: @@ -1548,7 +1547,7 @@ def heap_addr(self, allow_unaligned: bool = False) -> Optional[int]: return _addr return gef.heap.malloc_align_address(_addr) - def get_heap_info_list(self) -> Optional[list[GlibcHeapInfo]]: + def get_heap_info_list(self) -> list[GlibcHeapInfo] | None: if self.is_main_arena(): return None heap_addr = self.get_heap_for_ptr(self.top) @@ -1827,7 +1826,7 @@ class GlibcTcacheChunk(GlibcFastChunk): def get_libc_version() -> tuple[int, ...]: return GefLibcManager.find_libc_version() -def titlify(text: str, color: Optional[str] = None, msg_color: Optional[str] = None) -> str: +def titlify(text: str, color: str | None = None, msg_color: str | None = None) -> str: """Print a centered title.""" _, cols = get_terminal_size() nb = (cols - len(text) - 2) // 2 @@ -2088,7 +2087,7 @@ def gef_makedirs(path: str, mode: int = 0o755) -> pathlib.Path: @lru_cache() -def gdb_lookup_symbol(sym: str) -> Optional[tuple[gdb.Symtab_and_line, ...]]: +def gdb_lookup_symbol(sym: str) -> tuple[gdb.Symtab_and_line, ...] | None: """Fetch the proper symbol or None if not defined.""" try: res = gdb.decode_line(sym)[1] # pylint: disable=E1136 @@ -2097,7 +2096,7 @@ def gdb_lookup_symbol(sym: str) -> Optional[tuple[gdb.Symtab_and_line, ...]]: return None @lru_cache(maxsize=512) -def gdb_get_location_from_symbol(address: int) -> Optional[tuple[str, int]]: +def gdb_get_location_from_symbol(address: int) -> tuple[str, int] | None: """Retrieve the location of the `address` argument from the symbol table. Return a tuple with the name and offset if found, None otherwise.""" # this is horrible, ugly hack and shitty perf... @@ -2148,7 +2147,7 @@ def gdb_disassemble(start_pc: int, **kwargs: int) -> Generator[Instruction, None yield Instruction(address, location, mnemo, operands, opcodes) -def gdb_get_nth_previous_instruction_address(addr: int, n: int) -> Optional[int]: +def gdb_get_nth_previous_instruction_address(addr: int, n: int) -> int | None: """Return the address (Integer) of the `n`-th instruction before `addr`.""" # fixed-length ABI if gef.arch.instruction_length: @@ -2288,7 +2287,7 @@ def get_arch() -> str: @deprecated("Use `gef.binary.entry_point` instead") -def get_entry_point() -> Optional[int]: +def get_entry_point() -> int | None: """Return the binary entry point.""" return gef.binary.entry_point if gef.binary else None @@ -2316,13 +2315,13 @@ def flags_to_human(reg_value: int, value_table: dict[int, str]) -> str: @lru_cache() -def get_section_base_address(name: str) -> Optional[int]: +def get_section_base_address(name: str) -> int | None: section = process_lookup_path(name) return section.page_start if section else None @lru_cache() -def get_zone_base_address(name: str) -> Optional[int]: +def get_zone_base_address(name: str) -> int | None: zone = file_lookup_name_path(name, get_filepath()) return zone.zone_start if zone else None @@ -2337,7 +2336,7 @@ def register_architecture(cls: Type["Architecture"]) -> Type["Architecture"]: class ArchitectureBase: """Class decorator for declaring an architecture to GEF.""" - aliases: tuple[str | Elf.Abi, ...] + aliases: tuple[str | Elf.Abi, ...] = () def __init_subclass__(cls: Type["ArchitectureBase"], **kwargs): global __registered_architectures__ @@ -2357,21 +2356,21 @@ class Architecture(ArchitectureBase): # Mandatory defined attributes by inheriting classes arch: str mode: str - all_registers: tuple[()] | tuple[str, ...] + all_registers: tuple[str, ...] nop_insn: bytes return_register: str - flag_register: Optional[str] - instruction_length: Optional[int] + flag_register: str | None + instruction_length: int | None flags_table: dict[int, str] - syscall_register: Optional[str] - syscall_instructions: tuple[()] | tuple[str, ...] - function_parameters: tuple[()] | tuple[str, ...] + syscall_register: str | None + syscall_instructions: tuple[str, ...] + function_parameters: tuple[str, ...] # Optionally defined attributes - _ptrsize: Optional[int] = None - _endianness: Optional[Endianness] = None + _ptrsize: int | None = None + _endianness: Endianness | None = None special_registers: tuple[()] | tuple[str, ...] = () - maps: Optional[GefMemoryMapProvider] = None + maps: GefMemoryMapProvider | None = None def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) @@ -2388,13 +2387,13 @@ def __repr__(self) -> str: return self.__str__() @staticmethod - def supports_gdb_arch(gdb_arch: str) -> Optional[bool]: + def supports_gdb_arch(gdb_arch: str) -> bool | None: """If implemented by a child `Architecture`, this function dictates if the current class supports the loaded ELF file (which can be accessed via `gef.binary`). This callback function will override any assumption made by GEF to determine the architecture.""" return None - def flag_register_to_human(self, val: Optional[int] = None) -> str: + def flag_register_to_human(self, val: int | None = None) -> str: raise NotImplementedError def is_call(self, insn: Instruction) -> bool: @@ -2409,7 +2408,7 @@ def is_conditional_branch(self, insn: Instruction) -> bool: def is_branch_taken(self, insn: Instruction) -> tuple[bool, str]: raise NotImplementedError - def get_ra(self, insn: Instruction, frame: "gdb.Frame") -> Optional[int]: + def get_ra(self, insn: Instruction, frame: "gdb.Frame") -> int | None: raise NotImplementedError def canary_address(self) -> int: @@ -2485,7 +2484,7 @@ def endianness(self) -> Endianness: raise OSError(f"No valid endianess found in '{output}'") return self._endianness - def get_ith_parameter(self, i: int, in_func: bool = True) -> tuple[str, Optional[int]]: + def get_ith_parameter(self, i: int, in_func: bool = True) -> tuple[str, int | None]: """Retrieves the correct parameter used for the current function call.""" reg = self.function_parameters[i] val = self.register(reg) @@ -2616,7 +2615,7 @@ def long_to_twos_complement(v: int) -> int: return taken, reason - def get_ra(self, insn: Instruction, frame: "gdb.Frame") -> Optional[int]: + def get_ra(self, insn: Instruction, frame: "gdb.Frame") -> int | None: ra = None if self.is_ret(insn): ra = gef.arch.register("$ra") @@ -2626,7 +2625,7 @@ def get_ra(self, insn: Instruction, frame: "gdb.Frame") -> Optional[int]: ra = to_unsigned_long(older.pc()) return ra - def flag_register_to_human(self, val: Optional[int] = None) -> str: + def flag_register_to_human(self, val: int | None = None) -> str: # RISC-V has no flags registers, return an empty string to # preserve the Architecture API return "" @@ -2660,7 +2659,7 @@ def is_thumb(self) -> bool: return is_alive() and (self.cpsr & (1 << 5) == 1) @property - def pc(self) -> Optional[int]: + def pc(self) -> int | None: pc = gef.arch.register("$pc") if self.is_thumb(): pc += 1 @@ -2677,7 +2676,7 @@ def mode(self) -> str: return "THUMB" if self.is_thumb() else "ARM" @property - def instruction_length(self) -> Optional[int]: + def instruction_length(self) -> int | None: # Thumb instructions have variable-length (2 or 4-byte) return None if self.is_thumb() else 4 @@ -2702,7 +2701,7 @@ def is_ret(self, insn: Instruction) -> bool: return insn.operands[0] == "pc" return False - def flag_register_to_human(self, val: Optional[int] = None) -> str: + def flag_register_to_human(self, val: int | None = None) -> str: # https://www.botskool.com/user-pages/tutorials/electronics/arm-7-tutorial-part-1 if val is None: reg = self.flag_register @@ -2746,7 +2745,7 @@ def is_branch_taken(self, insn: Instruction) -> tuple[bool, str]: elif mnemo.endswith("cc"): taken, reason = not val&(1< Optional[int]: + def get_ra(self, insn: Instruction, frame: "gdb.Frame") -> int | None: if not self.is_ret(insn): older = frame.older() if not older: @@ -2822,7 +2821,7 @@ def is_call(self, insn: Instruction) -> bool: call_mnemos = {"bl", "blr"} return mnemo in call_mnemos - def flag_register_to_human(self, val: Optional[int] = None) -> str: + def flag_register_to_human(self, val: int | None = None) -> str: # https://events.linuxfoundation.org/sites/events/files/slides/KoreaLinuxForum-2014.pdf reg = self.flag_register if not val: @@ -2942,7 +2941,7 @@ class X86(Architecture): _ptrsize = 4 _endianness = Endianness.LITTLE_ENDIAN - def flag_register_to_human(self, val: Optional[int] = None) -> str: + def flag_register_to_human(self, val: int | None = None) -> str: reg = self.flag_register if val is None: val = gef.arch.register(reg) @@ -3011,7 +3010,7 @@ def is_branch_taken(self, insn: Instruction) -> tuple[bool, str]: taken, reason = not val&(1< Optional[int]: + def get_ra(self, insn: Instruction, frame: "gdb.Frame") -> int | None: ra = None if self.is_ret(insn): ra = dereference(gef.arch.sp) @@ -3039,7 +3038,7 @@ def mprotect_asm(cls, addr: int, size: int, perm: Permission) -> str: ] return "; ".join(insns) - def get_ith_parameter(self, i: int, in_func: bool = True) -> tuple[str, Optional[int]]: + def get_ith_parameter(self, i: int, in_func: bool = True) -> tuple[str, int | None]: if in_func: i += 1 # Account for RA being at the top of the stack sp = gef.arch.sp @@ -3128,7 +3127,7 @@ class PowerPC(Architecture): _ptrsize = 4 - def flag_register_to_human(self, val: Optional[int] = None) -> str: + def flag_register_to_human(self, val: int | None = None) -> str: # https://www.cebix.net/downloads/bebox/pem32b.pdf (% 2.1.3) if val is None: reg = self.flag_register @@ -3159,7 +3158,7 @@ def is_branch_taken(self, insn: Instruction) -> tuple[bool, str]: elif mnemo == "bgt": taken, reason = bool(val&(1< Optional[int]: + def get_ra(self, insn: Instruction, frame: "gdb.Frame") -> int | None: ra = None if self.is_ret(insn): ra = gef.arch.register("$lr") @@ -3232,7 +3231,7 @@ class SPARC(Architecture): syscall_register = "%g1" syscall_instructions = ("t 0x10",) - def flag_register_to_human(self, val: Optional[int] = None) -> str: + def flag_register_to_human(self, val: int | None = None) -> str: # https://www.gaisler.com/doc/sparcv8.pdf reg = self.flag_register if val is None: @@ -3278,7 +3277,7 @@ def is_branch_taken(self, insn: Instruction) -> tuple[bool, str]: elif mnemo == "bcc": taken, reason = val&(1< Optional[int]: + def get_ra(self, insn: Instruction, frame: "gdb.Frame") -> int | None: ra = None if self.is_ret(insn): ra = gef.arch.register("$o7") @@ -3376,7 +3375,7 @@ class MIPS(Architecture): syscall_register = "$v0" syscall_instructions = ("syscall",) - def flag_register_to_human(self, val: Optional[int] = None) -> str: + def flag_register_to_human(self, val: int | None = None) -> str: return Color.colorify("No flag register", "yellow underline") def is_call(self, insn: Instruction) -> bool: @@ -3412,7 +3411,7 @@ def is_branch_taken(self, insn: Instruction) -> tuple[bool, str]: taken, reason = gef.arch.register(ops[0]) <= 0, f"{ops[0]} <= 0" return taken, reason - def get_ra(self, insn: Instruction, frame: "gdb.Frame") -> Optional[int]: + def get_ra(self, insn: Instruction, frame: "gdb.Frame") -> int | None: ra = None if self.is_ret(insn): ra = gef.arch.register("$ra") @@ -3446,7 +3445,7 @@ class MIPS64(MIPS): _ptrsize = 8 @staticmethod - def supports_gdb_arch(gdb_arch: str) -> Optional[bool]: + def supports_gdb_arch(gdb_arch: str) -> bool | None: if not gef.binary or not isinstance(gef.binary, Elf): return False return gdb_arch.startswith("mips") and gef.binary.e_class == Elf.Class.ELF_64_BITS @@ -3501,7 +3500,7 @@ def to_unsigned_long(v: gdb.Value) -> int: return int(v.cast(gdb.Value(mask).type)) & mask -def get_path_from_info_proc() -> Optional[str]: +def get_path_from_info_proc() -> str | None: for x in (gdb.execute("info proc", to_string=True) or "").splitlines(): if x.startswith("exe = "): return x.split(" = ")[1].replace("'", "") @@ -3537,7 +3536,7 @@ def is_qemu_system() -> bool: return "received: \"\"" in response -def get_filepath() -> Optional[str]: +def get_filepath() -> str | None: """Return the local absolute path of the file currently debugged.""" if gef.session.remote: return str(gef.session.remote.lfile.absolute()) @@ -3581,7 +3580,7 @@ def get_info_files() -> list[Zone]: return infos -def process_lookup_address(address: int) -> Optional[Section]: +def process_lookup_address(address: int) -> Section | None: """Look up for an address in memory. Return an Address object if found, None otherwise.""" if not is_alive(): @@ -3600,7 +3599,7 @@ def process_lookup_address(address: int) -> Optional[Section]: @lru_cache() -def process_lookup_path(name: str, perm: Permission = Permission.ALL) -> Optional[Section]: +def process_lookup_path(name: str, perm: Permission = Permission.ALL) -> Section | None: """Look up for a path in the process memory mapping. Return a Section object if found, None otherwise.""" if not is_alive(): @@ -3632,7 +3631,7 @@ def process_lookup_path(name: str, perm: Permission = Permission.ALL) -> Optiona @lru_cache() -def file_lookup_name_path(name: str, path: str) -> Optional[Zone]: +def file_lookup_name_path(name: str, path: str) -> Zone | None: """Look up a file by name and path. Return a Zone object if found, None otherwise.""" for xfile in get_info_files(): @@ -3642,7 +3641,7 @@ def file_lookup_name_path(name: str, path: str) -> Optional[Zone]: @lru_cache() -def file_lookup_address(address: int) -> Optional[Zone]: +def file_lookup_address(address: int) -> Zone | None: """Look up for a file by its address. Return a Zone object if found, None otherwise.""" for info in get_info_files(): @@ -3688,7 +3687,7 @@ def hook_stop_handler(_: "gdb.StopEvent") -> None: return -def new_objfile_handler(evt: Optional["gdb.NewObjFileEvent"]) -> None: +def new_objfile_handler(evt: "gdb.NewObjFileEvent | None") -> None: """GDB event handler for new object file cases.""" reset_all_caches() progspace = gdb.current_progspace() @@ -3828,7 +3827,7 @@ def is_arch(arch: Elf.Abi) -> bool: return arch in gef.arch.aliases -def reset_architecture(arch: Optional[str] = None) -> None: +def reset_architecture(arch: str | None = None) -> None: """Sets the current architecture. If an architecture is explicitly specified by parameter, try to use that one. If this fails, an `OSError` exception will occur. @@ -3869,7 +3868,7 @@ def reset_architecture(arch: Optional[str] = None) -> None: @lru_cache() -def cached_lookup_type(_type: str) -> Optional[gdb.Type]: +def cached_lookup_type(_type: str) -> gdb.Type | None: try: return gdb.lookup_type(_type).strip_typedefs() except RuntimeError: @@ -3993,7 +3992,7 @@ def generate_cyclic_pattern(length: int, cycle: int = 4) -> bytearray: return bytearray(itertools.islice(de_bruijn(charset, cycle), length)) -def safe_parse_and_eval(value: str) -> Optional["gdb.Value"]: +def safe_parse_and_eval(value: str) -> "gdb.Value | None": """GEF wrapper for gdb.parse_and_eval(): this function returns None instead of raising gdb.error if the eval failed.""" try: @@ -4004,7 +4003,7 @@ def safe_parse_and_eval(value: str) -> Optional["gdb.Value"]: @lru_cache() -def dereference(addr: int) -> Optional["gdb.Value"]: +def dereference(addr: int) -> "gdb.Value | None": """GEF wrapper for gdb dereference function.""" try: ulong_t = cached_lookup_type(use_stdtype()) or \ @@ -4086,7 +4085,7 @@ def gef_getpagesize() -> int: @deprecated("Use `gef.session.canary`") -def gef_read_canary() -> Optional[tuple[int, int]]: +def gef_read_canary() -> tuple[int, int] | None: return gef.session.canary @@ -4102,12 +4101,12 @@ def get_filename() -> str: @deprecated("Use `gef.heap.main_arena`") -def get_glibc_arena() -> Optional[GlibcArena]: +def get_glibc_arena() -> GlibcArena | None: return gef.heap.main_arena @deprecated("Use `gef.arch.register(regname)`") -def get_register(regname) -> Optional[int]: +def get_register(regname) -> int | None: return gef.arch.register(regname) @@ -4117,7 +4116,7 @@ def get_process_maps() -> list[Section]: @deprecated("Use `reset_architecture`") -def set_arch(arch: Optional[str] = None, _: Optional[str] = None) -> None: +def set_arch(arch: str | None = None, _: str | None = None) -> None: return reset_architecture(arch) # @@ -4282,7 +4281,7 @@ def stop(self) -> bool: class StubBreakpoint(gdb.Breakpoint): """Create a breakpoint to permanently disable a call (fork/alarm/signal/etc.).""" - def __init__(self, func: str, retval: Optional[int]) -> None: + def __init__(self, func: str, retval: int | None) -> None: super().__init__(func, gdb.BP_BREAKPOINT, internal=False) self.func = func self.retval = retval @@ -4612,7 +4611,7 @@ def __init__(self, loc: str) -> None: # Context Panes # -def register_external_context_pane(pane_name: str, display_pane_function: Callable[[], None], pane_title_function: Callable[[], Optional[str]], condition : Optional[Callable[[], bool]] = None) -> None: +def register_external_context_pane(pane_name: str, display_pane_function: Callable[[], None], pane_title_function: Callable[[], str | None], condition : Callable[[], bool] | None = None) -> None: """ Registering function for new GEF Context View. pane_name: a string that has no spaces (used in settings) @@ -4633,7 +4632,7 @@ def pane_title(): gef.gdb.add_context_pane(pane_name, display_pane_function, pane_title_function, condition) return -def register_external_context_layout_mapping(current_pane_name: str, display_pane_function: Callable[[], None], pane_title_function: Callable[[], Optional[str]], condition : Optional[Callable[[], bool]] = None) -> None: +def register_external_context_layout_mapping(current_pane_name: str, display_pane_function: Callable[[], None], pane_title_function: Callable[[], str | None], condition : Callable[[], bool] | None = None) -> None: gef.gdb.add_context_layout_mapping(current_pane_name, display_pane_function, pane_title_function, condition) return @@ -4778,7 +4777,7 @@ def __contains__(self, name: str) -> bool: def add_setting(self, name: str, value: tuple[Any, type, str], description: str = "") -> None: return self.__setitem__(name, (value, description)) - def __setitem__(self, name: str, value: "GefSetting" | tuple[Any, str]) -> None: + def __setitem__(self, name: str, value: "GefSetting | tuple[Any, str]") -> None: # make sure settings are always associated to the root command (which derives from GenericCommand) if "GenericCommand" not in [x.__name__ for x in self.__class__.__bases__]: return @@ -5713,7 +5712,7 @@ def structures(self) -> Generator[tuple["ExternalStructureManager.Module", "Exte return @lru_cache() - def find(self, structure_name: str) -> Optional[tuple["ExternalStructureManager.Module", "ExternalStructureManager.Structure"]]: + def find(self, structure_name: str) -> tuple["ExternalStructureManager.Module", "ExternalStructureManager.Structure"] | None: """Return the module and structure for the given structure name; `None` if the structure name was not found.""" for module in self.modules.values(): if structure_name in module: @@ -6937,7 +6936,7 @@ def check_thread_ids(self, tids: list[int]) -> list[int]: existing_tids = set(t.num for t in gdb.selected_inferior().threads()) return list(set(tids) & existing_tids) - def tcachebin(self, tcache_base: int, i: int) -> tuple[Optional[GlibcTcacheChunk], int]: + def tcachebin(self, tcache_base: int, i: int) -> tuple[GlibcTcacheChunk | None, int]: """Return the head chunk in tcache[i] and the number of chunks in the bin.""" if i >= self.TCACHE_MAX_BINS: err("Incorrect index value, index value must be between 0 and " @@ -7594,7 +7593,7 @@ class ContextCommand(GenericCommand): _syntax_ = f"{_cmdline_} [legend|regs|stack|code|args|memory|source|trace|threads|extra]" _aliases_ = ["ctx",] - old_registers: dict[str, Optional[int]] = {} + old_registers: dict[str, int | None] = {} def __init__(self) -> None: super().__init__() @@ -7622,7 +7621,7 @@ def __init__(self) -> None: self["libc_args"] = (False, "[DEPRECATED - Unused] Show libc function call args description") self["libc_args_path"] = ("", "[DEPRECATED - Unused] Path to libc function call args json files, provided via gef-extras") - self.layout_mapping: dict[str, tuple[Callable, Optional[Callable], Optional[Callable]]] = { + self.layout_mapping: dict[str, tuple[Callable, Callable | None, Callable | None]] = { "legend": (self.show_legend, None, None), "regs": (self.context_regs, None, None), "stack": (self.context_stack, None, None), @@ -7705,7 +7704,7 @@ def do_invoke(self, argv: list[str]) -> None: disable_redirect_output() return - def context_title(self, m: Optional[str]) -> None: + def context_title(self, m: str | None) -> None: # allow for not displaying a title line if m is None: return @@ -7959,7 +7958,7 @@ def print_arguments_from_symbol(self, function_name: str, symbol: "gdb.Symbol") def print_guessed_arguments(self, function_name: str) -> None: """When no symbol, read the current basic block and look for "interesting" instructions.""" - def __get_current_block_start_address() -> Optional[int]: + def __get_current_block_start_address() -> int | None: pc = gef.arch.pc try: block = gdb.block_for_pc(pc) @@ -8411,7 +8410,7 @@ class HexdumpCommand(GenericCommand): def __init__(self) -> None: super().__init__(complete=gdb.COMPLETE_LOCATION, prefix=True) self["always_show_ascii"] = (False, "If true, hexdump will always display the ASCII dump") - self.format: Optional[str] = None + self.format: str | None = None self.__last_target = "$sp" return @@ -8550,7 +8549,7 @@ class PatchCommand(GenericCommand): def __init__(self) -> None: super().__init__(prefix=True, complete=gdb.COMPLETE_LOCATION) - self.format: Optional[str] = None + self.format: str | None = None return @only_if_gdb_running @@ -9945,8 +9944,8 @@ def __init__(self) -> None: gef.config["gef.propagate_debug_exception"] = GefSetting(False, bool, "If true, when debug mode is enabled, Python exceptions will be propagated all the way.") gef.config["gef.extra_python_package_paths"] = GefSetting("", str, "Semi-colon separate list of extra paths to include for python packages") - self.commands : dict[str, GenericCommand] = collections.OrderedDict() - self.functions : dict[str, GenericFunction] = collections.OrderedDict() + self.commands : dict[str, GenericCommand] = {} + self.functions : dict[str, GenericFunction] = {} self.missing: dict[str, Exception] = {} return @@ -9981,7 +9980,7 @@ def setup(self) -> None: GefRestoreCommand() return - def load_extra_plugins(self, extra_plugins_dir: Optional[pathlib.Path] = None) -> int: + def load_extra_plugins(self, extra_plugins_dir: pathlib.Path | None = None) -> int: """Load the plugins from the gef-extras setting. Returns the number of new plugins added.""" def load_plugin(fpath: pathlib.Path) -> bool: try: @@ -10035,7 +10034,7 @@ def invoke(self, args: Any, from_tty: bool) -> None: gdb.execute("gef help") return - def add_context_layout_mapping(self, current_pane_name: str, display_pane_function: Callable, pane_title_function: Callable, condition: Optional[Callable]) -> None: + def add_context_layout_mapping(self, current_pane_name: str, display_pane_function: Callable, pane_title_function: Callable, condition: Callable | None) -> None: """Add a new context layout mapping.""" context = self.commands["context"] assert isinstance(context, ContextCommand) @@ -10043,7 +10042,7 @@ def add_context_layout_mapping(self, current_pane_name: str, display_pane_functi # overload the printing of pane title context.layout_mapping[current_pane_name] = (display_pane_function, pane_title_function, condition) - def add_context_pane(self, pane_name: str, display_pane_function: Callable, pane_title_function: Callable, condition: Optional[Callable]) -> None: + def add_context_pane(self, pane_name: str, display_pane_function: Callable, pane_title_function: Callable, condition: Callable | None) -> None: """Add a new context pane to ContextCommand.""" context = self.commands["context"] assert isinstance(context, ContextCommand) @@ -10489,7 +10488,7 @@ def invoke(self, args: Any, from_tty: bool) -> None: gdb.execute(f"{self.command} {args}", from_tty=from_tty) return - def lookup_command(self, cmd: str) -> Optional[tuple[str, GenericCommand]]: + def lookup_command(self, cmd: str) -> tuple[str, GenericCommand] | None: global gef for _name, _instance in gef.gdb.commands.items(): if cmd == _name: @@ -10740,10 +10739,10 @@ def __init__(self) -> None: def reset_caches(self) -> None: super().reset_caches() - self.__maps: Optional[list[Section]] = None + self.__maps: list[Section] | None = None return - def write(self, address: int, buffer: ByteString, length: Optional[int] = None) -> None: + def write(self, address: int, buffer: ByteString, length: int | None = None) -> None: """Write `buffer` at address `address`.""" length = length or len(buffer) gdb.selected_inferior().write_memory(address, buffer, length) @@ -10762,7 +10761,7 @@ def read_integer(self, addr: int) -> int: def read_cstring(self, address: int, max_length: int = GEF_MAX_STRING_LENGTH, - encoding: Optional[str] = None) -> str: + encoding: str | None = None) -> str: """Return a C-string read from memory.""" encoding = encoding or "unicode-escape" length = min(address | (DEFAULT_PAGE_SIZE-1), max_length+1) @@ -10803,7 +10802,7 @@ def read_cstring(self, return f"{ustr[:max_length]}[...]" return ustr - def read_ascii_string(self, address: int) -> Optional[str]: + def read_ascii_string(self, address: int) -> str | None: """Read an ASCII string from memory""" cstr = self.read_cstring(address) if isinstance(cstr, str) and cstr and all(x in string.printable for x in cstr): @@ -10819,7 +10818,7 @@ def maps(self) -> list[Section]: self.__maps = maps return self.__maps - def __parse_maps(self) -> Optional[list[Section]]: + def __parse_maps(self) -> list[Section] | None: """Return the mapped memory sections. If the current arch has its maps method defined, then defer to that to generated maps, otherwise, try to figure it out from procfs, then info sections, then monitor info @@ -11002,13 +11001,13 @@ def __init__(self) -> None: return def reset_caches(self) -> None: - self.__libc_main_arena: Optional[GlibcArena] = None - self.__libc_selected_arena: Optional[GlibcArena] = None + self.__libc_main_arena: GlibcArena | None = None + self.__libc_selected_arena: GlibcArena | None = None self.__heap_base = None return @property - def main_arena(self) -> Optional[GlibcArena]: + def main_arena(self) -> GlibcArena | None: if not self.__libc_main_arena: try: __main_arena_addr = GefHeapManager.find_main_arena_addr() @@ -11104,7 +11103,7 @@ def search_filter(zone: Zone) -> bool: raise OSError(err_msg) @property - def selected_arena(self) -> Optional[GlibcArena]: + def selected_arena(self) -> GlibcArena | None: if not self.__libc_selected_arena: # `selected_arena` must default to `main_arena` self.__libc_selected_arena = self.main_arena @@ -11122,7 +11121,7 @@ def arenas(self) -> list | Iterator[GlibcArena]: return iter(self.main_arena) @property - def base_address(self) -> Optional[int]: + def base_address(self) -> int | None: if not self.__heap_base: base = 0 try: @@ -11175,7 +11174,7 @@ def ceil(n: float) -> int: class GefSetting: """Basic class for storing gef settings as objects""" - def __init__(self, value: Any, cls: Optional[type] = None, description: Optional[str] = None, hooks: Optional[dict[str, list[Callable]]] = None) -> None: + def __init__(self, value: Any, cls: type | None = None, description: str | None = None, hooks: dict[str, list[Callable]] | None = None) -> None: self.value = value self.type = cls or type(value) self.description = description or "" @@ -11276,7 +11275,7 @@ class GefSessionManager(GefManager): """Class managing the runtime properties of GEF. """ def __init__(self) -> None: self.reset_caches() - self.remote: Optional["GefRemoteSessionManager"] = None + self.remote: "GefRemoteSessionManager | None" = None self.remote_initializing: bool = False self.qemu_mode: bool = False self.convenience_vars_index: int = 0 @@ -11299,8 +11298,8 @@ def reset_caches(self) -> None: self._os = None self._pid = None self._file = None - self._maps: Optional[pathlib.Path] = None - self._root: Optional[pathlib.Path] = None + self._maps: pathlib.Path | None = None + self._root: pathlib.Path | None = None return def __str__(self) -> str: @@ -11311,7 +11310,7 @@ def __repr__(self) -> str: return str(self) @property - def auxiliary_vector(self) -> Optional[dict[str, int]]: + def auxiliary_vector(self) -> dict[str, int] | None: if not is_alive(): return None if is_qemu_system(): @@ -11351,7 +11350,7 @@ def pid(self) -> int: return self._pid @property - def file(self) -> Optional[pathlib.Path]: + def file(self) -> pathlib.Path | None: """Return a Path object of the target process.""" if self.remote is not None: return self.remote.file @@ -11363,7 +11362,7 @@ def file(self) -> Optional[pathlib.Path]: return self._file @property - def cwd(self) -> Optional[pathlib.Path]: + def cwd(self) -> pathlib.Path | None: if self.remote is not None: return self.remote.root return self.file.parent if self.file else None @@ -11378,7 +11377,7 @@ def pagesize(self) -> int: return self._pagesize @property - def canary(self) -> Optional[tuple[int, int]]: + def canary(self) -> tuple[int, int] | None: """Return a tuple of the canary address and value, read from the canonical location if supported by the architecture. Otherwise, read from the auxiliary vector.""" @@ -11392,7 +11391,7 @@ def canary(self) -> Optional[tuple[int, int]]: return canary, canary_location @property - def original_canary(self) -> Optional[tuple[int, int]]: + def original_canary(self) -> tuple[int, int] | None: """Return a tuple of the initial canary address and value, read from the auxiliary vector.""" auxval = self.auxiliary_vector @@ -11404,7 +11403,7 @@ def original_canary(self) -> Optional[tuple[int, int]]: return canary, canary_location @property - def maps(self) -> Optional[pathlib.Path]: + def maps(self) -> pathlib.Path | None: """Returns the Path to the procfs entry for the memory mapping.""" if not is_alive(): return None @@ -11416,7 +11415,7 @@ def maps(self) -> Optional[pathlib.Path]: return self._maps @property - def root(self) -> Optional[pathlib.Path]: + def root(self) -> pathlib.Path | None: """Returns the path to the process's root directory.""" if not is_alive(): return None @@ -11450,7 +11449,7 @@ def prompt_string(self) -> str: return Color.boldify("(remote) ") raise AttributeError("Unknown value") - def __init__(self, host: str, port: int, pid: int =-1, qemu: Optional[pathlib.Path] = None) -> None: + def __init__(self, host: str, port: int, pid: int =-1, qemu: pathlib.Path | None = None) -> None: super().__init__() self.__host = host self.__port = port @@ -11519,7 +11518,7 @@ def maps(self) -> pathlib.Path: def mode(self) -> RemoteMode: return self._mode - def sync(self, src: str, dst: Optional[str] = None) -> bool: + def sync(self, src: str, dst: str | None = None) -> bool: """Copy the `src` into the temporary chroot. If `dst` is provided, that path will be used instead of `src`.""" if not dst: @@ -11657,9 +11656,9 @@ def remote_objfile_event_handler(self, evt: "gdb.NewObjFileEvent") -> None: class GefUiManager(GefManager): """Class managing UI settings.""" def __init__(self) -> None: - self.redirect_fd : Optional[TextIOWrapper] = None + self.redirect_fd : TextIOWrapper | None = None self.context_hidden = False - self.stream_buffer : Optional[StringIO] = None + self.stream_buffer : StringIO | None = None self.highlight_table: dict[str, str] = {} self.watches: dict[int, tuple[int, str]] = {} self.context_messages: list[tuple[str, str]] = [] @@ -11672,16 +11671,16 @@ class GefLibcManager(GefManager): PATTERN_LIBC_VERSION_FILENAME = re.compile(r"libc6?[-_](\d+)\.(\d+)\.so") def __init__(self) -> None: - self._version : Optional[tuple[int, int]] = None - self._patch: Optional[int] = None - self._release: Optional[str] = None + self._version : tuple[int, int] | None = None + self._patch: int | None = None + self._release: str | None = None return def __str__(self) -> str: return f"Libc(version='{self.version}')" @property - def version(self) -> Optional[tuple[int, int]]: + def version(self) -> tuple[int, int] | None: if not is_alive(): return None @@ -11723,7 +11722,7 @@ def find_libc_version() -> tuple[int, int]: class Gef: """The GEF root class, which serves as a entrypoint for all the debugging session attributes (architecture, memory, settings, etc.).""" - binary: Optional[FileFormat] + binary: FileFormat | None arch: Architecture config : GefSettingsManager ui: GefUiManager @@ -11734,7 +11733,7 @@ class Gef: gdb: GefCommand def __init__(self) -> None: - self.binary: Optional[FileFormat] = None + self.binary: FileFormat | None = None self.arch: Architecture = GenericArchitecture() # see PR #516, will be reset by `new_objfile_handler` self.arch_reason: str = "This is the default architecture" self.config = GefSettingsManager() @@ -11769,32 +11768,14 @@ def reset_caches(self) -> None: def target_remote_posthook(): - conn = gdb.selected_inferior().connection - # if here, the connection should be established - if not isinstance(conn, gdb.RemoteTargetConnection): - raise EnvironmentError("Failed to create a remote session") - - # if gef.session.remote_initializing: - # dbg(f"remote session already initializing: {gef.session}") - # return - - print(f"{conn.description=}") - print(f"{conn.details=}") - print(f"{conn.type=}") - - assert conn.details - host, port = conn.details.split(":") - pid = ( - gdb.selected_inferior().pid - if False - else gdb.selected_thread().ptid[1] - ) - print(host, port, pid) - gef.session.remote = GefRemoteSessionManager(host, int(port), pid) - gef.session.remote._mode = GefRemoteSessionManager.RemoteMode.GDBSERVER + if gef.session.remote_initializing: + return + gef.session.remote = GefRemoteSessionManager("", 0) if not gef.session.remote.setup(): - raise EnvironmentError(f"Failed to create a proper environment for {gef.session.remote}") + raise EnvironmentError( + f"Failed to create a proper environment for {gef.session.remote}" + ) if __name__ == "__main__": if sys.version_info[0] == 2: @@ -11839,14 +11820,6 @@ def target_remote_posthook(): # reload settings gdb.execute("gef restore") - # extend `sys.path` for venv compat - if gef.config["gef.extra_python_package_paths"]: - for path in map(pathlib.Path, gef.config["gef.extra_python_package_paths"].split(";")): - if not path.exists(): - warn(f"Skipping invalid directory {path}") - continue - sys.path.extend(str(path.absolute())) - # setup gdb prompt gdb.prompt_hook = __gef_prompt__ @@ -11865,16 +11838,32 @@ def target_remote_posthook(): GefTmuxSetup() - # Register a post-hook for `target remote` that initialize the remote session - hook = """ - define target hookpost-{} - pi target_remote_posthook() - context - pi res = calling_function(); if res != "connect": err("Failed to initialize remote session: " + str(res)) - end - """ - gdb.execute(hook.format("remote")) - gdb.execute(hook.format("extended-remote")) + if GDB_VERSION > (9, 0): + disable_tr_overwrite_setting = "gef.disable_target_remote_overwrite" + + if not gef.config[disable_tr_overwrite_setting]: + warnmsg = ("Using `target remote` with GEF should work in most cases, " + "but use `gef-remote` if you can. You can disable the " + "overwrite of the `target remote` command by toggling " + f"`{disable_tr_overwrite_setting}` in the config.") + hook = f""" + define target hookpost-{{}} + pi target_remote_posthook() + context + pi if calling_function() != "connect": warn("{warnmsg}") + end + """ + + # Register a post-hook for `target remote` that initialize the remote session + gdb.execute(hook.format("remote")) + gdb.execute(hook.format("extended-remote")) + else: + errmsg = ("Using `target remote` does not work, use `gef-remote` " + f"instead. You can toggle `{disable_tr_overwrite_setting}` " + "if this is not desired.") + hook = f"""pi if calling_function() != "connect": err("{errmsg}")""" + gdb.execute(f"define target hook-remote\n{hook}\nend") + gdb.execute(f"define target hook-extended-remote\n{hook}\nend") # restore saved breakpoints (if any) bkp_fpath = pathlib.Path(gef.config["gef.autosave_breakpoints_file"]).expanduser().absolute()