Skip to content

Commit

Permalink
checkpoint: added new remote modes, gdbserver & gdbserver-multi work
Browse files Browse the repository at this point in the history
hugsy committed Nov 3, 2024
1 parent 9fb1d71 commit a277661
Showing 1 changed file with 122 additions and 72 deletions.
194 changes: 122 additions & 72 deletions gef.py
Original file line number Diff line number Diff line change
@@ -273,11 +273,13 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
return wrapper


class ValidationError(Exception): pass

#
# Helpers
#
class ValidationError(Exception): pass

class InitializationError(Exception): pass

class ObsoleteException(Exception): pass

@@ -347,10 +349,10 @@ def is_alive() -> bool:
return False


def calling_function() -> str | None:
def calling_function(frame: int = 3) -> str | None:
"""Return the name of the calling function"""
try:
stack_info = traceback.extract_stack()[-3]
stack_info = traceback.extract_stack()[-frame]
return stack_info.name
except Exception as e:
dbg(f"traceback failed with {str(e)}")
@@ -3512,30 +3514,57 @@ def get_os() -> str:
return gef.session.os


@lru_cache()
def is_qemu() -> bool:
if not is_remote_debug():
def is_target_remote(conn: gdb.TargetConnection | None = None) -> bool:
"Returns True for `extended-remote` only."
_conn = conn or gdb.selected_inferior().connection
return isinstance(_conn, gdb.RemoteTargetConnection) and _conn.type == "remote"


def is_target_extended_remote(conn: gdb.TargetConnection | None = None) -> bool:
"Returns True for `extended-remote` only."
_conn = conn or gdb.selected_inferior().connection
return isinstance(_conn, gdb.RemoteTargetConnection) and _conn.type == "extended-remote"


def is_target_remote_or_extended(conn: gdb.TargetConnection | None = None) -> bool:
return is_target_remote(conn) or is_target_extended_remote(conn)


def is_running_under_qemu() -> bool:
"See https://www.qemu.org/docs/master/system/gdb.html "
if not is_target_remote():
return False
response = gdb.execute("maintenance packet Qqemu.sstepbits", to_string=True, from_tty=False) or ""
return "ENABLE=" in response


@lru_cache()
def is_qemu_usermode() -> bool:
if not is_qemu():
def is_running_under_qemu_user() -> bool:
if not is_running_under_qemu():
return False
response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or ""
response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or "" # Use `qAttached`?
return "Text=" in response


@lru_cache()
def is_qemu_system() -> bool:
if not is_qemu():
def is_running_under_qemu_system() -> bool:
if not is_running_under_qemu():
return False
# Use "maintenance packet qqemu.PhyMemMode"?
response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or ""
return "received: \"\"" in response


def is_running_in_gdbserver() -> bool:
if is_running_under_qemu():
return False
return not is_running_under_qemu()


def is_running_in_rr() -> bool:
if not is_running_in_gdbserver():
return False
return os.environ.get("GDB_UNDER_RR", None) == "1"


def get_filepath() -> str | None:
"""Return the local absolute path of the file currently debugged."""
if gef.session.remote:
@@ -3960,6 +3989,7 @@ def is_in_x86_kernel(address: int) -> bool:
return (address >> memalign) == 0xF


@deprecated("Use `is_target_remote()`")
def is_remote_debug() -> bool:
""""Return True is the current debugging session is running through GDB remote session."""
return gef.session.remote_initializing or gef.session.remote is not None
@@ -6294,7 +6324,10 @@ def do_invoke(self, _: list[str], **kwargs: Any) -> None:
# calls `is_remote_debug` which checks if `remote_initializing` is True or `.remote` is None
# This prevents some spurious errors being thrown during startup
gef.session.remote_initializing = True
session = GefRemoteSessionManager(args.host, args.port, args.pid, qemu_binary)
# session = GefRemoteSessionManager(args.host, args.port, args.pid, qemu_binary)
conn = gdb.selected_inferior().connection
assert isinstance(conn, gdb.RemoteTargetConnection)
session = GefRemoteSessionManager(conn)

dbg(f"[remote] initializing remote session with {session.target} under {session.root}")
if not session.connect(args.pid) or not session.setup():
@@ -7414,7 +7447,7 @@ def __init__(self) -> None:
def do_invoke(self, _: list[str], **kwargs: Any) -> None:
args : argparse.Namespace = kwargs["arguments"]

if is_qemu_system():
if is_running_under_qemu_system():
err("Unsupported")
return

@@ -11312,7 +11345,7 @@ def __repr__(self) -> str:
def auxiliary_vector(self) -> dict[str, int] | None:
if not is_alive():
return None
if is_qemu_system():
if is_running_under_qemu_system():
return None
if not self._auxiliary_vector:
auxiliary_vector = {}
@@ -11431,6 +11464,9 @@ class RemoteMode(enum.IntEnum):
GDBSERVER = 0
QEMU = 1
RR = 2
GDBSERVER_MULTI = 3
QEMU_USER = 4
QEMU_SYSTEM = 5

def __str__(self):
return self.name
@@ -11440,30 +11476,48 @@ def __repr__(self):

def prompt_string(self) -> str:
match self:
case GefRemoteSessionManager.RemoteMode.QEMU:
case (
GefRemoteSessionManager.RemoteMode.QEMU |
GefRemoteSessionManager.RemoteMode.QEMU_USER |
GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM
):
return Color.boldify("(qemu) ")
case GefRemoteSessionManager.RemoteMode.RR:
return Color.boldify("(rr) ")
case GefRemoteSessionManager.RemoteMode.GDBSERVER:
case (
GefRemoteSessionManager.RemoteMode.GDBSERVER |
GefRemoteSessionManager.RemoteMode.GDBSERVER_MULTI
):
return Color.boldify("(remote) ")
raise AttributeError("Unknown value")

def __init__(self, host: str, port: int, pid: int =-1, qemu: pathlib.Path | None = None) -> None:
@staticmethod
def init() -> "GefRemoteSessionManager.RemoteMode":
if is_running_under_qemu_system():
return GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM
if is_running_under_qemu_user():
return GefRemoteSessionManager.RemoteMode.QEMU_USER
if is_running_in_rr():
return GefRemoteSessionManager.RemoteMode.RR
if is_running_in_gdbserver():
if is_target_extended_remote():
return GefRemoteSessionManager.RemoteMode.GDBSERVER_MULTI
return GefRemoteSessionManager.RemoteMode.GDBSERVER
raise AttributeError

def __init__(self, conn: gdb.RemoteTargetConnection) -> None:
super().__init__()
self.__host = host
self.__port = port
assert is_target_remote_or_extended()
remote_host = conn.details
assert remote_host
host, port = remote_host.split(":", 1)
self.__host = host or "localhost"
self.__port = int(port)
self.__local_root_fd = tempfile.TemporaryDirectory()
self.__local_root_path = pathlib.Path(self.__local_root_fd.name)
self.__qemu = qemu
if pid > 0:
self._pid = pid
self._mode = GefRemoteSessionManager.RemoteMode.init()

if self.__qemu is not None:
self._mode = GefRemoteSessionManager.RemoteMode.QEMU
elif os.environ.get("GDB_UNDER_RR", None) == "1":
self._mode = GefRemoteSessionManager.RemoteMode.RR
else:
self._mode = GefRemoteSessionManager.RemoteMode.GDBSERVER
self.setup()

def close(self) -> None:
self.__local_root_fd.cleanup()
@@ -11475,7 +11529,11 @@ def close(self) -> None:
raise

def __str__(self) -> str:
return f"RemoteSession(target='{self.target}', local='{self.root}', pid={self.pid}, mode={self.mode})"
msg = f"RemoteSession(target='{self.target}', local='{self.root}', mode={self.mode}"
if self.mode == GefRemoteSessionManager.RemoteMode.GDBSERVER:
msg += f", pid={self.pid}"
msg += ")"
return msg

def __repr__(self) -> str:
return str(self)
@@ -11561,16 +11619,18 @@ def connect(self, pid: int) -> bool:

def setup(self) -> bool:
# setup remote adequately depending on remote or qemu mode
dbg(f"Setting up the {self._mode} session")
match self.mode:
case GefRemoteSessionManager.RemoteMode.QEMU:
dbg(f"Setting up as qemu session, target={self.__qemu}")
self.__setup_qemu()
case GefRemoteSessionManager.RemoteMode.QEMU_USER:
self.__setup_qemu_user()
case GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM:
raise Exception("TODO")
case GefRemoteSessionManager.RemoteMode.RR:
dbg("Setting up as rr session")
self.__setup_rr()
case GefRemoteSessionManager.RemoteMode.GDBSERVER:
dbg("Setting up as remote session")
self.__setup_remote()
case GefRemoteSessionManager.RemoteMode.GDBSERVER_MULTI:
pass
case _:
raise ValueError

@@ -11580,13 +11640,13 @@ def setup(self) -> bool:
reset_architecture()
return True

def __setup_qemu(self) -> bool:
def __setup_qemu_user(self) -> bool:
# setup emulated file in the chroot
assert self.__qemu
target = self.root / str(self.__qemu.parent).lstrip("/")
__qemu_target = pathlib.Path("foo") # TODO
target = self.root / str(__qemu_target.parent).lstrip("/")
target.mkdir(parents=True, exist_ok=False)
shutil.copy2(self.__qemu, target)
self._file = self.__qemu
shutil.copy2(__qemu_target, target)
self._file = __qemu_target
assert self.lfile.exists()

# create a procfs
@@ -11767,12 +11827,20 @@ def reset_caches(self) -> None:


def target_remote_posthook():
if gef.session.remote_initializing:
return
print(f"{is_target_remote()=}")
print(f"{is_target_remote_or_extended()=}")
print(f"{is_target_extended_remote()=}")
print(f"{is_running_under_qemu()=}")
print(f"{is_running_under_qemu_system()=}")
print(f"{is_running_under_qemu_user()=}")
print(f"{is_running_in_gdbserver()=}")
print(f"{is_running_in_rr()=}")
conn = gdb.selected_inferior().connection
if not isinstance(conn, gdb.RemoteTargetConnection):
raise TypeError("Expected type gdb.RemoteTargetConnection")
assert is_target_remote_or_extended(conn), "Target is not remote"
gef.session.remote = GefRemoteSessionManager(conn)

gef.session.remote = GefRemoteSessionManager("", 0)
if not gef.session.remote.setup():
raise EnvironmentError(f"Failed to create a proper environment for {gef.session.remote}")

if __name__ == "__main__":
if sys.version_info[0] == 2:
@@ -11835,32 +11903,14 @@ def target_remote_posthook():

GefTmuxSetup()

if GDB_VERSION > (9, 0):
disable_tr_overwrite_setting = "gef.disable_target_remote_overwrite"

if not gef.config[disable_tr_overwrite_setting]:
warnmsg = ("Using `target remote` with GEF should work in most cases, "
"but use `gef-remote` if you can. You can disable the "
"overwrite of the `target remote` command by toggling "
f"`{disable_tr_overwrite_setting}` in the config.")
hook = f"""
define target hookpost-{{}}
pi target_remote_posthook()
context
pi if calling_function() != "connect": warn("{warnmsg}")
end
"""

# Register a post-hook for `target remote` that initialize the remote session
gdb.execute(hook.format("remote"))
gdb.execute(hook.format("extended-remote"))
else:
errmsg = ("Using `target remote` does not work, use `gef-remote` "
f"instead. You can toggle `{disable_tr_overwrite_setting}` "
"if this is not desired.")
hook = f"""pi if calling_function() != "connect": err("{errmsg}")"""
gdb.execute(f"define target hook-remote\n{hook}\nend")
gdb.execute(f"define target hook-extended-remote\n{hook}\nend")
# Initialize `target *remote` post hooks
hook = """
define target hookpost-{0}
pi target_remote_posthook()
end
"""
gdb.execute(hook.format("remote"))
gdb.execute(hook.format("extended-remote"))

# restore saved breakpoints (if any)
bkp_fpath = pathlib.Path(gef.config["gef.autosave_breakpoints_file"]).expanduser().absolute()

0 comments on commit a277661

Please sign in to comment.