Skip to content

Commit

Permalink
checkpoint: added very basic pytests
Browse files Browse the repository at this point in the history
  • Loading branch information
hugsy committed Nov 4, 2024
1 parent a277661 commit 6d166f0
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 28 deletions.
57 changes: 31 additions & 26 deletions gef.py
Original file line number Diff line number Diff line change
Expand Up @@ -3530,33 +3530,31 @@ def is_target_remote_or_extended(conn: gdb.TargetConnection | None = None) -> bo
return is_target_remote(conn) or is_target_extended_remote(conn)


def is_running_under_qemu() -> bool:
def is_running_in_qemu() -> bool:
"See https://www.qemu.org/docs/master/system/gdb.html "
if not is_target_remote():
return False
response = gdb.execute("maintenance packet Qqemu.sstepbits", to_string=True, from_tty=False) or ""
return "ENABLE=" in response


def is_running_under_qemu_user() -> bool:
if not is_running_under_qemu():
def is_running_in_qemu_user() -> bool:
if not is_running_in_qemu():
return False
response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or "" # Use `qAttached`?
return "Text=" in response


def is_running_under_qemu_system() -> bool:
if not is_running_under_qemu():
def is_running_in_qemu_system() -> bool:
if not is_running_in_qemu():
return False
# Use "maintenance packet qqemu.PhyMemMode"?
response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or ""
return "received: \"\"" in response


def is_running_in_gdbserver() -> bool:
if is_running_under_qemu():
return False
return not is_running_under_qemu()
return not is_running_in_qemu()


def is_running_in_rr() -> bool:
Expand Down Expand Up @@ -7447,7 +7445,7 @@ def __init__(self) -> None:
def do_invoke(self, _: list[str], **kwargs: Any) -> None:
args : argparse.Namespace = kwargs["arguments"]

if is_running_under_qemu_system():
if is_running_in_qemu_system():
err("Unsupported")
return

Expand Down Expand Up @@ -11345,7 +11343,7 @@ def __repr__(self) -> str:
def auxiliary_vector(self) -> dict[str, int] | None:
if not is_alive():
return None
if is_running_under_qemu_system():
if is_running_in_qemu_system():
return None
if not self._auxiliary_vector:
auxiliary_vector = {}
Expand Down Expand Up @@ -11493,9 +11491,9 @@ def prompt_string(self) -> str:

@staticmethod
def init() -> "GefRemoteSessionManager.RemoteMode":
if is_running_under_qemu_system():
if is_running_in_qemu_system():
return GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM
if is_running_under_qemu_user():
if is_running_in_qemu_user():
return GefRemoteSessionManager.RemoteMode.QEMU_USER
if is_running_in_rr():
return GefRemoteSessionManager.RemoteMode.RR
Expand Down Expand Up @@ -11619,7 +11617,7 @@ def connect(self, pid: int) -> bool:

def setup(self) -> bool:
# setup remote adequately depending on remote or qemu mode
dbg(f"Setting up the {self._mode} session")
info(f"Setting up remote session as '{self._mode}'")
match self.mode:
case GefRemoteSessionManager.RemoteMode.QEMU_USER:
self.__setup_qemu_user()
Expand Down Expand Up @@ -11826,21 +11824,24 @@ def reset_caches(self) -> None:
return


def target_remote_hook():
# disable the context until the session has been fully established
gef.config["context.enable"] = False


def target_remote_posthook():
print(f"{is_target_remote()=}")
print(f"{is_target_remote_or_extended()=}")
print(f"{is_target_extended_remote()=}")
print(f"{is_running_under_qemu()=}")
print(f"{is_running_under_qemu_system()=}")
print(f"{is_running_under_qemu_user()=}")
print(f"{is_running_in_gdbserver()=}")
print(f"{is_running_in_rr()=}")
conn = gdb.selected_inferior().connection
if not isinstance(conn, gdb.RemoteTargetConnection):
raise TypeError("Expected type gdb.RemoteTargetConnection")
assert is_target_remote_or_extended(conn), "Target is not remote"
gef.session.remote = GefRemoteSessionManager(conn)

# re-enable context
gef.config["context.enable"] = True

# if here, no exception was thrown, print context
gdb.execute("context")


if __name__ == "__main__":
if sys.version_info[0] == 2:
Expand Down Expand Up @@ -11903,14 +11904,18 @@ def target_remote_posthook():

GefTmuxSetup()

# Initialize `target *remote` post hooks
# Initialize `target *remote` pre/post hooks
hook = """
define target hookpost-{0}
pi target_remote_posthook()
define target hook{1}-{0}
pi target_remote_{1}hook()
end
"""
gdb.execute(hook.format("remote"))
gdb.execute(hook.format("extended-remote"))
# pre-hooks
gdb.execute(hook.format("remote", ""))
gdb.execute(hook.format("extended-remote", ""))
# post-hooks
gdb.execute(hook.format("remote", "post"))
gdb.execute(hook.format("extended-remote", "post"))

# restore saved breakpoints (if any)
bkp_fpath = pathlib.Path(gef.config["gef.autosave_breakpoints_file"]).expanduser().absolute()
Expand Down
87 changes: 87 additions & 0 deletions tests/api/gef_remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""
`target remote/extended-remote` test module.
"""


from tests.base import RemoteGefUnitTestGeneric
from tests.utils import debug_target, gdbserver_session, gdbserver_multi_session, get_random_port, qemuuser_session


class GefRemoteApi(RemoteGefUnitTestGeneric):

def setUp(self) -> None:
self._target = debug_target("default")
return super().setUp()

def test_gef_remote_test_gdbserver(self):
"""Test `gdbserver file`"""
_gdb = self._gdb
_root = self._conn.root
port = get_random_port()

with gdbserver_session(port=port):
assert not _root.eval("is_target_remote()")
assert not _root.eval("is_target_remote_or_extended()")
assert not _root.eval("is_running_in_gdbserver()")

_gdb.execute(f"target remote :{port}")

assert _root.eval("is_target_remote()")
assert _root.eval("is_target_remote_or_extended()")
assert _root.eval("is_running_in_gdbserver()")

assert not _root.eval("is_target_extended_remote()")
assert not _root.eval("is_running_under_qemu()")
assert not _root.eval("is_running_under_qemu_system()")
assert not _root.eval("is_running_under_qemu_user()")
assert not _root.eval("is_running_in_rr()")

def test_gef_remote_test_gdbserver_multi(self):
"""Test `gdbserver --multi file`"""
_gdb = self._gdb
_root = self._conn.root
port = get_random_port()

with gdbserver_multi_session(port=port):
assert not _root.eval("is_target_remote()")
assert not _root.eval("is_target_remote_or_extended()")
assert not _root.eval("is_running_in_gdbserver()")

_gdb.execute(f"target extended-remote :{port}")

assert _root.eval("is_target_remote()")
assert _root.eval("is_target_remote_or_extended()")
assert _root.eval("is_target_extended_remote()")
assert _root.eval("is_running_in_gdbserver()")

assert not _root.eval("is_running_under_qemu()")
assert not _root.eval("is_running_under_qemu_system()")
assert not _root.eval("is_running_under_qemu_user()")
assert not _root.eval("is_running_in_rr()")

def test_gef_remote_test_qemuuser(self):
"""Test `qemu-user -g`"""
_gdb = self._gdb
_root = self._conn.root
port = get_random_port()

with qemuuser_session(port=port):
assert not _root.eval("is_target_remote()")
assert not _root.eval("is_target_remote_or_extended()")
assert not _root.eval("is_running_in_gdbserver()")

_gdb.execute(f"target remote :{port}")

assert _root.eval("is_target_remote()")
assert _root.eval("is_target_remote_or_extended()")
assert _root.eval("is_running_under_qemu()")
assert _root.eval("is_running_under_qemu_user()")

assert not _root.eval("is_target_extended_remote()")
assert not _root.eval("is_running_under_qemu_system()")
assert not _root.eval("is_running_in_gdbserver()")
assert not _root.eval("is_running_in_rr()")

# TODO add tests for
# - [ ] qemu-system
# - [ ] rr
4 changes: 2 additions & 2 deletions tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import rpyc

from .utils import debug_target
from .utils import debug_target, get_random_port

COVERAGE_DIR = os.getenv("COVERAGE_DIR", "")
GEF_PATH = pathlib.Path(os.getenv("GEF_PATH", "gef.py")).absolute()
Expand Down Expand Up @@ -58,7 +58,7 @@ def __setup(self):
#
# Select a random tcp port for rpyc
#
self._port = random.randint(1025, 65535)
self._port = get_random_port()
self._commands = ""

if COVERAGE_DIR:
Expand Down
30 changes: 30 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import subprocess
import tempfile
import time
import random

from typing import Iterable, List, Optional, Union
from urllib.request import urlopen
Expand Down Expand Up @@ -112,6 +113,13 @@ def start_gdbserver(
logging.debug(f"Starting {cmd}")
return subprocess.Popen(cmd)

def start_gdbserver_multi(
host: str = GDBSERVER_DEFAULT_HOST,
port: int = GDBSERVER_DEFAULT_PORT,
) -> subprocess.Popen:
cmd = [GDBSERVER_BINARY, "--multi", f"{host}:{port}"]
logging.debug(f"Starting {cmd}")
return subprocess.Popen(cmd)

def stop_gdbserver(gdbserver: subprocess.Popen) -> None:
"""Stop the gdbserver and wait until it is terminated if it was
Expand All @@ -138,6 +146,17 @@ def gdbserver_session(
finally:
stop_gdbserver(sess)

@contextlib.contextmanager
def gdbserver_multi_session(
port: int = GDBSERVER_DEFAULT_PORT,
host: str = GDBSERVER_DEFAULT_HOST,
):
sess = start_gdbserver_multi(host, port)
try:
time.sleep(1) # forced delay to allow gdbserver to start listening
yield sess
finally:
stop_gdbserver(sess)

def start_qemuuser(
exe: Union[str, pathlib.Path] = debug_target("default"),
Expand Down Expand Up @@ -301,3 +320,14 @@ def p32(x: int) -> bytes:

def p64(x: int) -> bytes:
return struct.pack("<Q", x)


__available_ports = list()
def get_random_port() -> int:
global __available_ports
if len(__available_ports) < 2:
__available_ports = list( range(1024, 65535) )
idx = random.choice(range(len(__available_ports)))
port = __available_ports[idx]
__available_ports.pop(idx)
return port

0 comments on commit 6d166f0

Please sign in to comment.