diff --git a/.gitignore b/.gitignore index 954607a..7620630 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,6 @@ __pycache__ *egg* __pycache__ *.dSYM -docs/_static +docs .mypy_cache/ -venv +venv \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 95d4450..1028df4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,29 +1,34 @@ +os: + - linux +cache: + pip: true language: python -sudo: required # for gdb installation -python: - - "2.7" - - "3.4" - - "3.5" - - "3.6" - - "3.7-dev" - - "3.8-dev" - - "pypy" - +sudo: required matrix: - allow_failures: - - python: '3.7-dev' - - python: '3.8-dev' + include: + - python: '3.5' + env: NOXSESSION="tests-3.5" + - python: '3.6' + env: NOXSESSION="tests-3.6" + - python: '3.7' + env: NOXSESSION="tests-3.7" + dist: xenial + - python: '3.8-dev' + env: NOXSESSION="tests-3.8" + dist: xenial -before_install: - - sudo apt-get install gdb + - python: '3.7' + env: NOXSESSION="lint" + dist: xenial + - python: '3.7' + env: NOXSESSION="docs" + dist: xenial install: - - 'pip install -e .[dev]' + - if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then sudo apt-get install gdb; fi + - if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then brew update; fi + - if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then brew install homebrew/dupes/gdb; fi + - pip install nox script: - - make test - -cache: - directories: - # avoid using PyPI's bandwidth when pip packages are already cached - - $HOME/.cache/pip + - nox --non-interactive --session "$NOXSESSION" diff --git a/CHANGELOG.md b/CHANGELOG.md index 46266b0..0e3ed24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # pygdbmi release history +## dev +* Drop support for 2.7, 3.4 +* Add support for 3.7, 3.8 +* Do not log in StringStream (#36) + ## 0.9.0.0 * Stop buffering output * Use logger in GdbController; modify `verbose` arguments. diff --git a/MANIFEST.in b/MANIFEST.in index f14ea3a..82f3bc5 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,9 @@ -include -*.md -LICENSE +include *.md +include LICENSE + +exclude example.py +exclude .flake8 +exclude noxfile.py + +prune tests +prune docs \ No newline at end of file diff --git a/README.md b/README.md index 2396de4..b5c451e 100644 --- a/README.md +++ b/README.md @@ -11,10 +11,6 @@ pygdbmi - Get Structured Output from GDB's Machine Interface - -Code style: black -

- **Documentation** https://cs01.github.io/pygdbmi **Source Code** https://github.com/cs01/pygdbmi @@ -159,11 +155,18 @@ The `type` is defined based on gdb's various [mi output record types](<(https:// Documentation fixes, bug fixes, performance improvements, and functional improvements are welcome. You may want to create an issue before beginning work to make sure I am interested in merging it to the master branch. -To develop, set up a new virtual environment, then clone this repo and run `pip install -e .[dev]`. +pygdbmi uses [nox](https://github.com/theacodes/nox) for automation. -Confirm unit tests are working with `make test`, then begin development. +See available tasks with +``` +nox -l +``` -Update unit tests as necessary at `pygdbmi/tests/test_app.py`. +Run tests and lint with +``` +nox -s tests +nox -s lint +``` ## Projects Using pygdbmi diff --git a/docs/StringStream.html b/docs/StringStream.html deleted file mode 100644 index d4ede5a..0000000 --- a/docs/StringStream.html +++ /dev/null @@ -1,394 +0,0 @@ - - - - - - -pygdbmi.StringStream API documentation - - - - - - - - - -
-
-
-

pygdbmi.StringStream module

-
-
-
-Source code -
import logging
-from pygdbmi.printcolor import fmt_cyan
-
-
-class StringStream:
-    """A simple class to hold text so that when passed
-    between functions, the object is passed by reference
-    and memory does not need to be repeatedly allocated for the string.
-
-    This class was written here to avoid adding a dependency
-    to the project.
-    """
-
-    def __init__(self, raw_text, debug=False):
-        self.raw_text = raw_text
-        self.index = 0
-        self.len = len(raw_text)
-
-        if debug:
-            level = logging.DEBUG
-        else:
-            level = logging.ERROR
-        logging.basicConfig(format="%(funcName)20s %(message)s", level=level)
-
-    def read(self, count):
-        """Read count characters starting at self.index,
-        and return those characters as a string
-        """
-        new_index = self.index + count
-        if new_index > self.len:
-            buf = self.raw_text[self.index :]  # return to the end, don't fail
-        else:
-            buf = self.raw_text[self.index : new_index]
-        self.index = new_index
-
-        return buf
-
-    def seek(self, offset):
-        """Advance the index of this StringStream by offset characters"""
-        self.index = self.index + offset
-
-    def advance_past_chars(self, chars):
-        """Advance the index past specific chars
-        Args chars (list): list of characters to advance past
-
-        Return substring that was advanced past
-        """
-        start_index = self.index
-        while True:
-            current_char = self.raw_text[self.index]
-            self.index += 1
-            if current_char in chars:
-                break
-
-            elif self.index == self.len:
-                break
-
-        return self.raw_text[start_index : self.index - 1]
-
-    def advance_past_string_with_gdb_escapes(self, chars_to_remove_gdb_escape=None):
-        """characters that gdb escapes that should not be
-        escaped by this parser
-        """
-
-        if chars_to_remove_gdb_escape is None:
-            chars_to_remove_gdb_escape = ['"']
-
-        buf = ""
-        while True:
-            c = self.raw_text[self.index]
-            self.index += 1
-            logging.debug("%s", fmt_cyan(c))
-
-            if c == "\\":
-                # We are on a backslash and there is another character after the backslash
-                # to parse. Handle this case specially since gdb escaped it for us
-
-                # Get the next char that is being escaped
-                c2 = self.raw_text[self.index]
-                self.index += 1
-                # only store the escaped character in the buffer; don't store the backslash
-                # (don't leave it escaped)
-                buf += c2
-
-            elif c == '"':
-                # Quote is closed. Exit (and don't include the end quote).
-                break
-
-            else:
-                # capture this character, and keep capturing
-                buf += c
-        return buf
-
-
-
-
-
-
-
-
-
-

Classes

-
-
-class StringStream -
-
-

A simple class to hold text so that when passed -between functions, the object is passed by reference -and memory does not need to be repeatedly allocated for the string.

-

This class was written here to avoid adding a dependency -to the project.

-
-Source code -
class StringStream:
-    """A simple class to hold text so that when passed
-    between functions, the object is passed by reference
-    and memory does not need to be repeatedly allocated for the string.
-
-    This class was written here to avoid adding a dependency
-    to the project.
-    """
-
-    def __init__(self, raw_text, debug=False):
-        self.raw_text = raw_text
-        self.index = 0
-        self.len = len(raw_text)
-
-        if debug:
-            level = logging.DEBUG
-        else:
-            level = logging.ERROR
-        logging.basicConfig(format="%(funcName)20s %(message)s", level=level)
-
-    def read(self, count):
-        """Read count characters starting at self.index,
-        and return those characters as a string
-        """
-        new_index = self.index + count
-        if new_index > self.len:
-            buf = self.raw_text[self.index :]  # return to the end, don't fail
-        else:
-            buf = self.raw_text[self.index : new_index]
-        self.index = new_index
-
-        return buf
-
-    def seek(self, offset):
-        """Advance the index of this StringStream by offset characters"""
-        self.index = self.index + offset
-
-    def advance_past_chars(self, chars):
-        """Advance the index past specific chars
-        Args chars (list): list of characters to advance past
-
-        Return substring that was advanced past
-        """
-        start_index = self.index
-        while True:
-            current_char = self.raw_text[self.index]
-            self.index += 1
-            if current_char in chars:
-                break
-
-            elif self.index == self.len:
-                break
-
-        return self.raw_text[start_index : self.index - 1]
-
-    def advance_past_string_with_gdb_escapes(self, chars_to_remove_gdb_escape=None):
-        """characters that gdb escapes that should not be
-        escaped by this parser
-        """
-
-        if chars_to_remove_gdb_escape is None:
-            chars_to_remove_gdb_escape = ['"']
-
-        buf = ""
-        while True:
-            c = self.raw_text[self.index]
-            self.index += 1
-            logging.debug("%s", fmt_cyan(c))
-
-            if c == "\\":
-                # We are on a backslash and there is another character after the backslash
-                # to parse. Handle this case specially since gdb escaped it for us
-
-                # Get the next char that is being escaped
-                c2 = self.raw_text[self.index]
-                self.index += 1
-                # only store the escaped character in the buffer; don't store the backslash
-                # (don't leave it escaped)
-                buf += c2
-
-            elif c == '"':
-                # Quote is closed. Exit (and don't include the end quote).
-                break
-
-            else:
-                # capture this character, and keep capturing
-                buf += c
-        return buf
-
-

Methods

-
-
-def __init__(self, raw_text, debug=False) -
-
-

Initialize self. -See help(type(self)) for accurate signature.

-
-Source code -
def __init__(self, raw_text, debug=False):
-    self.raw_text = raw_text
-    self.index = 0
-    self.len = len(raw_text)
-
-    if debug:
-        level = logging.DEBUG
-    else:
-        level = logging.ERROR
-    logging.basicConfig(format="%(funcName)20s %(message)s", level=level)
-
-
-
-def advance_past_chars(self, chars) -
-
-

Advance the index past specific chars -Args chars (list): list of characters to advance past

-

Return substring that was advanced past

-
-Source code -
def advance_past_chars(self, chars):
-    """Advance the index past specific chars
-    Args chars (list): list of characters to advance past
-
-    Return substring that was advanced past
-    """
-    start_index = self.index
-    while True:
-        current_char = self.raw_text[self.index]
-        self.index += 1
-        if current_char in chars:
-            break
-
-        elif self.index == self.len:
-            break
-
-    return self.raw_text[start_index : self.index - 1]
-
-
-
-def advance_past_string_with_gdb_escapes(self, chars_to_remove_gdb_escape=None) -
-
-

characters that gdb escapes that should not be -escaped by this parser

-
-Source code -
def advance_past_string_with_gdb_escapes(self, chars_to_remove_gdb_escape=None):
-    """characters that gdb escapes that should not be
-    escaped by this parser
-    """
-
-    if chars_to_remove_gdb_escape is None:
-        chars_to_remove_gdb_escape = ['"']
-
-    buf = ""
-    while True:
-        c = self.raw_text[self.index]
-        self.index += 1
-        logging.debug("%s", fmt_cyan(c))
-
-        if c == "\\":
-            # We are on a backslash and there is another character after the backslash
-            # to parse. Handle this case specially since gdb escaped it for us
-
-            # Get the next char that is being escaped
-            c2 = self.raw_text[self.index]
-            self.index += 1
-            # only store the escaped character in the buffer; don't store the backslash
-            # (don't leave it escaped)
-            buf += c2
-
-        elif c == '"':
-            # Quote is closed. Exit (and don't include the end quote).
-            break
-
-        else:
-            # capture this character, and keep capturing
-            buf += c
-    return buf
-
-
-
-def read(self, count) -
-
-

Read count characters starting at self.index, -and return those characters as a string

-
-Source code -
def read(self, count):
-    """Read count characters starting at self.index,
-    and return those characters as a string
-    """
-    new_index = self.index + count
-    if new_index > self.len:
-        buf = self.raw_text[self.index :]  # return to the end, don't fail
-    else:
-        buf = self.raw_text[self.index : new_index]
-    self.index = new_index
-
-    return buf
-
-
-
-def seek(self, offset) -
-
-

Advance the index of this StringStream by offset characters

-
-Source code -
def seek(self, offset):
-    """Advance the index of this StringStream by offset characters"""
-    self.index = self.index + offset
-
-
-
-
-
-
-
- -
- - - - - \ No newline at end of file diff --git a/docs/gdbcontroller.html b/docs/gdbcontroller.html deleted file mode 100644 index 6746256..0000000 --- a/docs/gdbcontroller.html +++ /dev/null @@ -1,1382 +0,0 @@ - - - - - - -pygdbmi.gdbcontroller API documentation - - - - - - - - - -
-
-
-

Module pygdbmi.gdbcontroller

-
-
-

GdbController class to programatically run gdb and get structured output

-
-Source code -
"""GdbController class to programatically run gdb and get structured output"""
-
-from distutils.spawn import find_executable
-import logging
-import os
-from pprint import pformat
-from pygdbmi import gdbmiparser
-import signal
-import select
-import subprocess
-import sys
-import time
-
-try:  # py3
-    from shlex import quote
-except ImportError:  # py2
-    from pipes import quote
-
-PYTHON3 = sys.version_info.major == 3
-DEFAULT_GDB_TIMEOUT_SEC = 1
-DEFAULT_TIME_TO_CHECK_FOR_ADDITIONAL_OUTPUT_SEC = 0.2
-USING_WINDOWS = os.name == "nt"
-if USING_WINDOWS:
-    import msvcrt
-    from ctypes import windll, byref, wintypes, WinError, POINTER
-    from ctypes.wintypes import HANDLE, DWORD, BOOL
-else:
-    import fcntl
-
-SIGNAL_NAME_TO_NUM = {}
-for n in dir(signal):
-    if n.startswith("SIG") and "_" not in n:
-        SIGNAL_NAME_TO_NUM[n.upper()] = getattr(signal, n)
-
-unicode = str if PYTHON3 else unicode  # noqa: F821
-
-
-class NoGdbProcessError(ValueError):
-    """Raise when trying to interact with gdb subprocess, but it does not exist.
-    It may have been killed and removed, or failed to initialize for some reason."""
-
-    pass
-
-
-class GdbTimeoutError(ValueError):
-    """Raised when no response is recieved from gdb after the timeout has been triggered"""
-
-    pass
-
-
-class GdbController:
-    """
-    Run gdb as a subprocess. Send commands and receive structured output.
-    Create new object, along with a gdb subprocess
-
-    Args:
-        gdb_path (str): Command to run in shell to spawn new gdb subprocess
-        gdb_args (list): Arguments to pass to shell when spawning new gdb subprocess
-        time_to_check_for_additional_output_sec (float): When parsing responses, wait this amout of time before exiting (exits before timeout is reached to save time). If <= 0, full timeout time is used.
-        rr (bool): Use the `rr replay` command instead of `gdb`. See rr-project.org for more info.
-        verbose (bool): Print verbose output if True
-    Returns:
-        New GdbController object
-    """
-
-    def __init__(
-        self,
-        gdb_path="gdb",
-        gdb_args=None,
-        time_to_check_for_additional_output_sec=DEFAULT_TIME_TO_CHECK_FOR_ADDITIONAL_OUTPUT_SEC,
-        rr=False,
-        verbose=False,
-    ):
-        if gdb_args is None:
-            default_gdb_args = ["--nx", "--quiet", "--interpreter=mi2"]
-            gdb_args = default_gdb_args
-
-        self.verbose = verbose
-        self.abs_gdb_path = None  # abs path to gdb executable
-        self.cmd = []  # the shell command to run gdb
-        self.time_to_check_for_additional_output_sec = (
-            time_to_check_for_additional_output_sec
-        )
-        self.gdb_process = None
-        self._allow_overwrite_timeout_times = (
-            self.time_to_check_for_additional_output_sec > 0
-        )
-
-        if rr:
-            self.cmd = ["rr", "replay"] + gdb_args
-
-        else:
-            if not gdb_path:
-                raise ValueError("a valid path to gdb must be specified")
-
-            else:
-                abs_gdb_path = find_executable(gdb_path)
-                if abs_gdb_path is None:
-                    raise ValueError(
-                        'gdb executable could not be resolved from "%s"' % gdb_path
-                    )
-
-                else:
-                    self.abs_gdb_path = abs_gdb_path
-            self.cmd = [self.abs_gdb_path] + gdb_args
-
-        self._attach_logger(verbose)
-        self.spawn_new_gdb_subprocess()
-
-    def _attach_logger(self, verbose):
-        handler = logging.StreamHandler()
-        handler.setFormatter(logging.Formatter("%(message)s"))
-        unique_number = time.time()
-        self.logger = logging.getLogger(__name__ + "." + str(unique_number))
-        self.logger.propagate = False
-        if verbose:
-            level = logging.DEBUG
-        else:
-            level = logging.ERROR
-        self.logger.setLevel(level)
-        self.logger.addHandler(handler)
-
-    def get_subprocess_cmd(self):
-        """Returns the shell-escaped string used to invoke the gdb subprocess.
-        This is a string that can be executed directly in a shell.
-        """
-        return " ".join(quote(c) for c in self.cmd)
-
-    def spawn_new_gdb_subprocess(self):
-        """Spawn a new gdb subprocess with the arguments supplied to the object
-        during initialization. If gdb subprocess already exists, terminate it before
-        spanwing a new one.
-        Return int: gdb process id
-        """
-        if self.gdb_process:
-            self.logger.debug(
-                "Killing current gdb subprocess (pid %d)" % self.gdb_process.pid
-            )
-            self.exit()
-
-        self.logger.debug('Launching gdb: "%s"' % " ".join(self.cmd))
-
-        # Use pipes to the standard streams
-        self.gdb_process = subprocess.Popen(
-            self.cmd,
-            shell=False,
-            stdout=subprocess.PIPE,
-            stdin=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            bufsize=0,
-        )
-
-        _make_non_blocking(self.gdb_process.stdout)
-        _make_non_blocking(self.gdb_process.stderr)
-
-        # save file numbers for use later
-        self.stdout_fileno = self.gdb_process.stdout.fileno()
-        self.stderr_fileno = self.gdb_process.stderr.fileno()
-        self.stdin_fileno = self.gdb_process.stdin.fileno()
-
-        self.read_list = [self.stdout_fileno, self.stderr_fileno]
-        self.write_list = [self.stdin_fileno]
-
-        # string buffers for unifinished gdb output
-        self._incomplete_output = {"stdout": None, "stderr": None}
-        return self.gdb_process.pid
-
-    def verify_valid_gdb_subprocess(self):
-        """Verify there is a process object, and that it is still running.
-        Raise NoGdbProcessError if either of the above are not true."""
-        if not self.gdb_process:
-            raise NoGdbProcessError("gdb process is not attached")
-
-        elif self.gdb_process.poll() is not None:
-            raise NoGdbProcessError(
-                "gdb process has already finished with return code: %s"
-                % str(self.gdb_process.poll())
-            )
-
-    def write(
-        self,
-        mi_cmd_to_write,
-        timeout_sec=DEFAULT_GDB_TIMEOUT_SEC,
-        raise_error_on_timeout=True,
-        read_response=True,
-    ):
-        """Write to gdb process. Block while parsing responses from gdb for a maximum of timeout_sec.
-
-        Args:
-            mi_cmd_to_write (str or list): String to write to gdb. If list, it is joined by newlines.
-            timeout_sec (float): Maximum number of seconds to wait for response before exiting. Must be >= 0.
-            raise_error_on_timeout (bool): If read_response is True, raise error if no response is received
-            read_response (bool): Block and read response. If there is a separate thread running,
-            this can be false, and the reading thread read the output.
-        Returns:
-            List of parsed gdb responses if read_response is True, otherwise []
-        Raises:
-            NoGdbProcessError if there is no gdb subprocess running
-            TypeError if mi_cmd_to_write is not valid
-        """
-        self.verify_valid_gdb_subprocess()
-        if timeout_sec < 0:
-            self.logger.warning("timeout_sec was negative, replacing with 0")
-            timeout_sec = 0
-
-        # Ensure proper type of the mi command
-        if type(mi_cmd_to_write) in [str, unicode]:
-            pass
-        elif type(mi_cmd_to_write) == list:
-            mi_cmd_to_write = "\n".join(mi_cmd_to_write)
-        else:
-            raise TypeError(
-                "The gdb mi command must a be str or list. Got "
-                + str(type(mi_cmd_to_write))
-            )
-
-        self.logger.debug("writing: %s", mi_cmd_to_write)
-
-        if not mi_cmd_to_write.endswith("\n"):
-            mi_cmd_to_write_nl = mi_cmd_to_write + "\n"
-        else:
-            mi_cmd_to_write_nl = mi_cmd_to_write
-
-        if USING_WINDOWS:
-            # select not implemented in windows for pipes
-            # assume it's always ready
-            outputready = [self.stdin_fileno]
-        else:
-            _, outputready, _ = select.select([], self.write_list, [], timeout_sec)
-        for fileno in outputready:
-            if fileno == self.stdin_fileno:
-                # ready to write
-                self.gdb_process.stdin.write(mi_cmd_to_write_nl.encode())
-                # don't forget to flush for Python3, otherwise gdb won't realize there is data
-                # to evaluate, and we won't get a response
-                self.gdb_process.stdin.flush()
-            else:
-                self.logger.error("got unexpected fileno %d" % fileno)
-
-        if read_response is True:
-            return self.get_gdb_response(
-                timeout_sec=timeout_sec, raise_error_on_timeout=raise_error_on_timeout
-            )
-
-        else:
-            return []
-
-    def get_gdb_response(
-        self, timeout_sec=DEFAULT_GDB_TIMEOUT_SEC, raise_error_on_timeout=True
-    ):
-        """Get response from GDB, and block while doing so. If GDB does not have any response ready to be read
-        by timeout_sec, an exception is raised.
-
-        Args:
-            timeout_sec (float): Maximum time to wait for reponse. Must be >= 0. Will return after
-            raise_error_on_timeout (bool): Whether an exception should be raised if no response was found
-            after timeout_sec
-
-        Returns:
-            List of parsed GDB responses, returned from gdbmiparser.parse_response, with the
-            additional key 'stream' which is either 'stdout' or 'stderr'
-
-        Raises:
-            GdbTimeoutError if response is not received within timeout_sec
-            ValueError if select returned unexpected file number
-            NoGdbProcessError if there is no gdb subprocess running
-        """
-
-        self.verify_valid_gdb_subprocess()
-        if timeout_sec < 0:
-            self.logger.warning("timeout_sec was negative, replacing with 0")
-            timeout_sec = 0
-
-        if USING_WINDOWS:
-            retval = self._get_responses_windows(timeout_sec)
-        else:
-            retval = self._get_responses_unix(timeout_sec)
-
-        if not retval and raise_error_on_timeout:
-            raise GdbTimeoutError(
-                "Did not get response from gdb after %s seconds" % timeout_sec
-            )
-
-        else:
-            return retval
-
-    def _get_responses_windows(self, timeout_sec):
-        """Get responses on windows. Assume no support for select and use a while loop."""
-        timeout_time_sec = time.time() + timeout_sec
-        responses = []
-        while True:
-            try:
-                self.gdb_process.stdout.flush()
-                if PYTHON3:
-                    raw_output = self.gdb_process.stdout.readline().replace(
-                        b"\r", b"\n"
-                    )
-                else:
-                    raw_output = self.gdb_process.stdout.read().replace(b"\r", b"\n")
-                responses += self._get_responses_list(raw_output, "stdout")
-            except IOError:
-                pass
-
-            try:
-                self.gdb_process.stderr.flush()
-                if PYTHON3:
-                    raw_output = self.gdb_process.stderr.readline().replace(
-                        b"\r", b"\n"
-                    )
-                else:
-                    raw_output = self.gdb_process.stderr.read().replace(b"\r", b"\n")
-                responses += self._get_responses_list(raw_output, "stderr")
-            except IOError:
-                pass
-
-            if time.time() > timeout_time_sec:
-                break
-
-        return responses
-
-    def _get_responses_unix(self, timeout_sec):
-        """Get responses on unix-like system. Use select to wait for output."""
-        timeout_time_sec = time.time() + timeout_sec
-        responses = []
-        while True:
-            select_timeout = timeout_time_sec - time.time()
-            # I prefer to not pass a negative value to select
-            if select_timeout <= 0:
-                select_timeout = 0
-            events, _, _ = select.select(self.read_list, [], [], select_timeout)
-            responses_list = None  # to avoid infinite loop if using Python 2
-            try:
-                for fileno in events:
-                    # new data is ready to read
-                    if fileno == self.stdout_fileno:
-                        self.gdb_process.stdout.flush()
-                        raw_output = self.gdb_process.stdout.read()
-                        stream = "stdout"
-
-                    elif fileno == self.stderr_fileno:
-                        self.gdb_process.stderr.flush()
-                        raw_output = self.gdb_process.stderr.read()
-                        stream = "stderr"
-
-                    else:
-                        raise ValueError(
-                            "Developer error. Got unexpected file number %d" % fileno
-                        )
-
-                    responses_list = self._get_responses_list(raw_output, stream)
-                    responses += responses_list
-
-            except IOError:  # only occurs in python 2.7
-                pass
-
-            if timeout_sec == 0:  # just exit immediately
-                break
-
-            elif responses_list and self._allow_overwrite_timeout_times:
-                # update timeout time to potentially be closer to now to avoid lengthy wait times when nothing is being output by gdb
-                timeout_time_sec = min(
-                    time.time() + self.time_to_check_for_additional_output_sec,
-                    timeout_time_sec,
-                )
-
-            elif time.time() > timeout_time_sec:
-                break
-
-        return responses
-
-    def _get_responses_list(self, raw_output, stream):
-        """Get parsed response list from string output
-        Args:
-            raw_output (unicode): gdb output to parse
-            stream (str): either stdout or stderr
-        """
-        responses = []
-
-        raw_output, self._incomplete_output[stream] = _buffer_incomplete_responses(
-            raw_output, self._incomplete_output.get(stream)
-        )
-
-        if not raw_output:
-            return responses
-
-        response_list = list(
-            filter(lambda x: x, raw_output.decode(errors="replace").split("\n"))
-        )  # remove blank lines
-
-        # parse each response from gdb into a dict, and store in a list
-        for response in response_list:
-            if gdbmiparser.response_is_finished(response):
-                pass
-            else:
-                parsed_response = gdbmiparser.parse_response(response)
-                parsed_response["stream"] = stream
-
-                self.logger.debug("%s", pformat(parsed_response))
-
-                responses.append(parsed_response)
-
-        return responses
-
-    def send_signal_to_gdb(self, signal_input):
-        """Send signal name (case insensitive) or number to gdb subprocess
-        gdbmi.send_signal_to_gdb(2)  # valid
-        gdbmi.send_signal_to_gdb('sigint')  # also valid
-        gdbmi.send_signal_to_gdb('SIGINT')  # also valid
-
-        raises ValueError if signal_input is invalie
-        raises NoGdbProcessError if there is no gdb process to send a signal to
-        """
-        try:
-            signal = int(signal_input)
-        except Exception:
-            signal = SIGNAL_NAME_TO_NUM.get(signal_input.upper())
-
-        if not signal:
-            raise ValueError(
-                'Could not find signal corresponding to "%s"' % str(signal)
-            )
-
-        if self.gdb_process:
-            os.kill(self.gdb_process.pid, signal)
-        else:
-            raise NoGdbProcessError(
-                "Cannot send signal to gdb process because no process exists."
-            )
-
-    def interrupt_gdb(self):
-        """Send SIGINT (interrupt signal) to the gdb subprocess"""
-        self.send_signal_to_gdb("SIGINT")
-
-    def exit(self):
-        """Terminate gdb process
-        Returns: None"""
-        if self.gdb_process:
-            self.gdb_process.terminate()
-            self.gdb_process.communicate()
-        self.gdb_process = None
-        return None
-
-
-def _buffer_incomplete_responses(raw_output, buf):
-    """It is possible for some of gdb's output to be read before it completely finished its response.
-    In that case, a partial mi response was read, which cannot be parsed into structured data.
-    We want to ALWAYS parse complete mi records. To do this, we store a buffer of gdb's
-    output if the output did not end in a newline.
-
-    Args:
-        raw_output: Contents of the gdb mi output
-        buf (str): Buffered gdb response from the past. This is incomplete and needs to be prepended to
-        gdb's next output.
-
-    Returns:
-        (raw_output, buf)
-    """
-
-    if raw_output:
-        if buf:
-            # concatenate buffer and new output
-            raw_output = b"".join([buf, raw_output])
-            buf = None
-
-        if b"\n" not in raw_output:
-            # newline was not found, so assume output is incomplete and store in buffer
-            buf = raw_output
-            raw_output = None
-
-        elif not raw_output.endswith(b"\n"):
-            # raw output doesn't end in a newline, so store everything after the last newline (if anything)
-            # in the buffer, and parse everything before it
-            remainder_offset = raw_output.rindex(b"\n") + 1
-            buf = raw_output[remainder_offset:]
-            raw_output = raw_output[:remainder_offset]
-
-    return (raw_output, buf)
-
-
-def _make_non_blocking(file_obj):
-    """make file object non-blocking
-    Windows doesn't have the fcntl module, but someone on
-    stack overflow supplied this code as an answer, and it works
-    http://stackoverflow.com/a/34504971/2893090"""
-
-    if USING_WINDOWS:
-        LPDWORD = POINTER(DWORD)
-        PIPE_NOWAIT = wintypes.DWORD(0x00000001)
-
-        SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
-        SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
-        SetNamedPipeHandleState.restype = BOOL
-
-        h = msvcrt.get_osfhandle(file_obj.fileno())
-
-        res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
-        if res == 0:
-            raise ValueError(WinError())
-
-    else:
-        # Set the file status flag (F_SETFL) on the pipes to be non-blocking
-        # so we can attempt to read from a pipe with no new data without locking
-        # the program up
-        fcntl.fcntl(file_obj, fcntl.F_SETFL, os.O_NONBLOCK)
-
-
-
-
-
-
-
-
-
-

Classes

-
-
-class GdbController -(gdb_path='gdb', gdb_args=None, time_to_check_for_additional_output_sec=0.2, rr=False, verbose=False) -
-
-

Run gdb as a subprocess. Send commands and receive structured output. -Create new object, along with a gdb subprocess

-

Args

-
-
gdb_path : str
-
Command to run in shell to spawn new gdb subprocess
-
gdb_args : list
-
Arguments to pass to shell when spawning new gdb subprocess
-
time_to_check_for_additional_output_sec : float
-
When parsing responses, wait this amout of time before exiting (exits before timeout is reached to save time). If <= 0, full timeout time is used.
-
rr : bool
-
Use the rr replay command instead of gdb. See rr-project.org for more info.
-
verbose : bool
-
Print verbose output if True
-
-

Returns

-
-
New GdbController object
-
 
-
-
-Source code -
class GdbController:
-    """
-    Run gdb as a subprocess. Send commands and receive structured output.
-    Create new object, along with a gdb subprocess
-
-    Args:
-        gdb_path (str): Command to run in shell to spawn new gdb subprocess
-        gdb_args (list): Arguments to pass to shell when spawning new gdb subprocess
-        time_to_check_for_additional_output_sec (float): When parsing responses, wait this amout of time before exiting (exits before timeout is reached to save time). If <= 0, full timeout time is used.
-        rr (bool): Use the `rr replay` command instead of `gdb`. See rr-project.org for more info.
-        verbose (bool): Print verbose output if True
-    Returns:
-        New GdbController object
-    """
-
-    def __init__(
-        self,
-        gdb_path="gdb",
-        gdb_args=None,
-        time_to_check_for_additional_output_sec=DEFAULT_TIME_TO_CHECK_FOR_ADDITIONAL_OUTPUT_SEC,
-        rr=False,
-        verbose=False,
-    ):
-        if gdb_args is None:
-            default_gdb_args = ["--nx", "--quiet", "--interpreter=mi2"]
-            gdb_args = default_gdb_args
-
-        self.verbose = verbose
-        self.abs_gdb_path = None  # abs path to gdb executable
-        self.cmd = []  # the shell command to run gdb
-        self.time_to_check_for_additional_output_sec = (
-            time_to_check_for_additional_output_sec
-        )
-        self.gdb_process = None
-        self._allow_overwrite_timeout_times = (
-            self.time_to_check_for_additional_output_sec > 0
-        )
-
-        if rr:
-            self.cmd = ["rr", "replay"] + gdb_args
-
-        else:
-            if not gdb_path:
-                raise ValueError("a valid path to gdb must be specified")
-
-            else:
-                abs_gdb_path = find_executable(gdb_path)
-                if abs_gdb_path is None:
-                    raise ValueError(
-                        'gdb executable could not be resolved from "%s"' % gdb_path
-                    )
-
-                else:
-                    self.abs_gdb_path = abs_gdb_path
-            self.cmd = [self.abs_gdb_path] + gdb_args
-
-        self._attach_logger(verbose)
-        self.spawn_new_gdb_subprocess()
-
-    def _attach_logger(self, verbose):
-        handler = logging.StreamHandler()
-        handler.setFormatter(logging.Formatter("%(message)s"))
-        unique_number = time.time()
-        self.logger = logging.getLogger(__name__ + "." + str(unique_number))
-        self.logger.propagate = False
-        if verbose:
-            level = logging.DEBUG
-        else:
-            level = logging.ERROR
-        self.logger.setLevel(level)
-        self.logger.addHandler(handler)
-
-    def get_subprocess_cmd(self):
-        """Returns the shell-escaped string used to invoke the gdb subprocess.
-        This is a string that can be executed directly in a shell.
-        """
-        return " ".join(quote(c) for c in self.cmd)
-
-    def spawn_new_gdb_subprocess(self):
-        """Spawn a new gdb subprocess with the arguments supplied to the object
-        during initialization. If gdb subprocess already exists, terminate it before
-        spanwing a new one.
-        Return int: gdb process id
-        """
-        if self.gdb_process:
-            self.logger.debug(
-                "Killing current gdb subprocess (pid %d)" % self.gdb_process.pid
-            )
-            self.exit()
-
-        self.logger.debug('Launching gdb: "%s"' % " ".join(self.cmd))
-
-        # Use pipes to the standard streams
-        self.gdb_process = subprocess.Popen(
-            self.cmd,
-            shell=False,
-            stdout=subprocess.PIPE,
-            stdin=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            bufsize=0,
-        )
-
-        _make_non_blocking(self.gdb_process.stdout)
-        _make_non_blocking(self.gdb_process.stderr)
-
-        # save file numbers for use later
-        self.stdout_fileno = self.gdb_process.stdout.fileno()
-        self.stderr_fileno = self.gdb_process.stderr.fileno()
-        self.stdin_fileno = self.gdb_process.stdin.fileno()
-
-        self.read_list = [self.stdout_fileno, self.stderr_fileno]
-        self.write_list = [self.stdin_fileno]
-
-        # string buffers for unifinished gdb output
-        self._incomplete_output = {"stdout": None, "stderr": None}
-        return self.gdb_process.pid
-
-    def verify_valid_gdb_subprocess(self):
-        """Verify there is a process object, and that it is still running.
-        Raise NoGdbProcessError if either of the above are not true."""
-        if not self.gdb_process:
-            raise NoGdbProcessError("gdb process is not attached")
-
-        elif self.gdb_process.poll() is not None:
-            raise NoGdbProcessError(
-                "gdb process has already finished with return code: %s"
-                % str(self.gdb_process.poll())
-            )
-
-    def write(
-        self,
-        mi_cmd_to_write,
-        timeout_sec=DEFAULT_GDB_TIMEOUT_SEC,
-        raise_error_on_timeout=True,
-        read_response=True,
-    ):
-        """Write to gdb process. Block while parsing responses from gdb for a maximum of timeout_sec.
-
-        Args:
-            mi_cmd_to_write (str or list): String to write to gdb. If list, it is joined by newlines.
-            timeout_sec (float): Maximum number of seconds to wait for response before exiting. Must be >= 0.
-            raise_error_on_timeout (bool): If read_response is True, raise error if no response is received
-            read_response (bool): Block and read response. If there is a separate thread running,
-            this can be false, and the reading thread read the output.
-        Returns:
-            List of parsed gdb responses if read_response is True, otherwise []
-        Raises:
-            NoGdbProcessError if there is no gdb subprocess running
-            TypeError if mi_cmd_to_write is not valid
-        """
-        self.verify_valid_gdb_subprocess()
-        if timeout_sec < 0:
-            self.logger.warning("timeout_sec was negative, replacing with 0")
-            timeout_sec = 0
-
-        # Ensure proper type of the mi command
-        if type(mi_cmd_to_write) in [str, unicode]:
-            pass
-        elif type(mi_cmd_to_write) == list:
-            mi_cmd_to_write = "\n".join(mi_cmd_to_write)
-        else:
-            raise TypeError(
-                "The gdb mi command must a be str or list. Got "
-                + str(type(mi_cmd_to_write))
-            )
-
-        self.logger.debug("writing: %s", mi_cmd_to_write)
-
-        if not mi_cmd_to_write.endswith("\n"):
-            mi_cmd_to_write_nl = mi_cmd_to_write + "\n"
-        else:
-            mi_cmd_to_write_nl = mi_cmd_to_write
-
-        if USING_WINDOWS:
-            # select not implemented in windows for pipes
-            # assume it's always ready
-            outputready = [self.stdin_fileno]
-        else:
-            _, outputready, _ = select.select([], self.write_list, [], timeout_sec)
-        for fileno in outputready:
-            if fileno == self.stdin_fileno:
-                # ready to write
-                self.gdb_process.stdin.write(mi_cmd_to_write_nl.encode())
-                # don't forget to flush for Python3, otherwise gdb won't realize there is data
-                # to evaluate, and we won't get a response
-                self.gdb_process.stdin.flush()
-            else:
-                self.logger.error("got unexpected fileno %d" % fileno)
-
-        if read_response is True:
-            return self.get_gdb_response(
-                timeout_sec=timeout_sec, raise_error_on_timeout=raise_error_on_timeout
-            )
-
-        else:
-            return []
-
-    def get_gdb_response(
-        self, timeout_sec=DEFAULT_GDB_TIMEOUT_SEC, raise_error_on_timeout=True
-    ):
-        """Get response from GDB, and block while doing so. If GDB does not have any response ready to be read
-        by timeout_sec, an exception is raised.
-
-        Args:
-            timeout_sec (float): Maximum time to wait for reponse. Must be >= 0. Will return after
-            raise_error_on_timeout (bool): Whether an exception should be raised if no response was found
-            after timeout_sec
-
-        Returns:
-            List of parsed GDB responses, returned from gdbmiparser.parse_response, with the
-            additional key 'stream' which is either 'stdout' or 'stderr'
-
-        Raises:
-            GdbTimeoutError if response is not received within timeout_sec
-            ValueError if select returned unexpected file number
-            NoGdbProcessError if there is no gdb subprocess running
-        """
-
-        self.verify_valid_gdb_subprocess()
-        if timeout_sec < 0:
-            self.logger.warning("timeout_sec was negative, replacing with 0")
-            timeout_sec = 0
-
-        if USING_WINDOWS:
-            retval = self._get_responses_windows(timeout_sec)
-        else:
-            retval = self._get_responses_unix(timeout_sec)
-
-        if not retval and raise_error_on_timeout:
-            raise GdbTimeoutError(
-                "Did not get response from gdb after %s seconds" % timeout_sec
-            )
-
-        else:
-            return retval
-
-    def _get_responses_windows(self, timeout_sec):
-        """Get responses on windows. Assume no support for select and use a while loop."""
-        timeout_time_sec = time.time() + timeout_sec
-        responses = []
-        while True:
-            try:
-                self.gdb_process.stdout.flush()
-                if PYTHON3:
-                    raw_output = self.gdb_process.stdout.readline().replace(
-                        b"\r", b"\n"
-                    )
-                else:
-                    raw_output = self.gdb_process.stdout.read().replace(b"\r", b"\n")
-                responses += self._get_responses_list(raw_output, "stdout")
-            except IOError:
-                pass
-
-            try:
-                self.gdb_process.stderr.flush()
-                if PYTHON3:
-                    raw_output = self.gdb_process.stderr.readline().replace(
-                        b"\r", b"\n"
-                    )
-                else:
-                    raw_output = self.gdb_process.stderr.read().replace(b"\r", b"\n")
-                responses += self._get_responses_list(raw_output, "stderr")
-            except IOError:
-                pass
-
-            if time.time() > timeout_time_sec:
-                break
-
-        return responses
-
-    def _get_responses_unix(self, timeout_sec):
-        """Get responses on unix-like system. Use select to wait for output."""
-        timeout_time_sec = time.time() + timeout_sec
-        responses = []
-        while True:
-            select_timeout = timeout_time_sec - time.time()
-            # I prefer to not pass a negative value to select
-            if select_timeout <= 0:
-                select_timeout = 0
-            events, _, _ = select.select(self.read_list, [], [], select_timeout)
-            responses_list = None  # to avoid infinite loop if using Python 2
-            try:
-                for fileno in events:
-                    # new data is ready to read
-                    if fileno == self.stdout_fileno:
-                        self.gdb_process.stdout.flush()
-                        raw_output = self.gdb_process.stdout.read()
-                        stream = "stdout"
-
-                    elif fileno == self.stderr_fileno:
-                        self.gdb_process.stderr.flush()
-                        raw_output = self.gdb_process.stderr.read()
-                        stream = "stderr"
-
-                    else:
-                        raise ValueError(
-                            "Developer error. Got unexpected file number %d" % fileno
-                        )
-
-                    responses_list = self._get_responses_list(raw_output, stream)
-                    responses += responses_list
-
-            except IOError:  # only occurs in python 2.7
-                pass
-
-            if timeout_sec == 0:  # just exit immediately
-                break
-
-            elif responses_list and self._allow_overwrite_timeout_times:
-                # update timeout time to potentially be closer to now to avoid lengthy wait times when nothing is being output by gdb
-                timeout_time_sec = min(
-                    time.time() + self.time_to_check_for_additional_output_sec,
-                    timeout_time_sec,
-                )
-
-            elif time.time() > timeout_time_sec:
-                break
-
-        return responses
-
-    def _get_responses_list(self, raw_output, stream):
-        """Get parsed response list from string output
-        Args:
-            raw_output (unicode): gdb output to parse
-            stream (str): either stdout or stderr
-        """
-        responses = []
-
-        raw_output, self._incomplete_output[stream] = _buffer_incomplete_responses(
-            raw_output, self._incomplete_output.get(stream)
-        )
-
-        if not raw_output:
-            return responses
-
-        response_list = list(
-            filter(lambda x: x, raw_output.decode(errors="replace").split("\n"))
-        )  # remove blank lines
-
-        # parse each response from gdb into a dict, and store in a list
-        for response in response_list:
-            if gdbmiparser.response_is_finished(response):
-                pass
-            else:
-                parsed_response = gdbmiparser.parse_response(response)
-                parsed_response["stream"] = stream
-
-                self.logger.debug("%s", pformat(parsed_response))
-
-                responses.append(parsed_response)
-
-        return responses
-
-    def send_signal_to_gdb(self, signal_input):
-        """Send signal name (case insensitive) or number to gdb subprocess
-        gdbmi.send_signal_to_gdb(2)  # valid
-        gdbmi.send_signal_to_gdb('sigint')  # also valid
-        gdbmi.send_signal_to_gdb('SIGINT')  # also valid
-
-        raises ValueError if signal_input is invalie
-        raises NoGdbProcessError if there is no gdb process to send a signal to
-        """
-        try:
-            signal = int(signal_input)
-        except Exception:
-            signal = SIGNAL_NAME_TO_NUM.get(signal_input.upper())
-
-        if not signal:
-            raise ValueError(
-                'Could not find signal corresponding to "%s"' % str(signal)
-            )
-
-        if self.gdb_process:
-            os.kill(self.gdb_process.pid, signal)
-        else:
-            raise NoGdbProcessError(
-                "Cannot send signal to gdb process because no process exists."
-            )
-
-    def interrupt_gdb(self):
-        """Send SIGINT (interrupt signal) to the gdb subprocess"""
-        self.send_signal_to_gdb("SIGINT")
-
-    def exit(self):
-        """Terminate gdb process
-        Returns: None"""
-        if self.gdb_process:
-            self.gdb_process.terminate()
-            self.gdb_process.communicate()
-        self.gdb_process = None
-        return None
-
-

Methods

-
-
-def exit(self) -
-
-

Terminate gdb process -Returns: None

-
-Source code -
def exit(self):
-    """Terminate gdb process
-    Returns: None"""
-    if self.gdb_process:
-        self.gdb_process.terminate()
-        self.gdb_process.communicate()
-    self.gdb_process = None
-    return None
-
-
-
-def get_gdb_response(self, timeout_sec=1, raise_error_on_timeout=True) -
-
-

Get response from GDB, and block while doing so. If GDB does not have any response ready to be read -by timeout_sec, an exception is raised.

-

Args

-
-
timeout_sec : float
-
Maximum time to wait for reponse. Must be >= 0. Will return after
-
raise_error_on_timeout : bool
-
Whether an exception should be raised if no response was found
-
-

after timeout_sec

-

Returns

-
-
List of parsed GDB responses, returned from gdbmiparser.parse_response, with the
-
 
-
additional key 'stream' which is either 'stdout' or 'stderr'
-
 
-
-

Raises

-
-
GdbTimeoutError if response is not received within timeout_sec
-
 
-
ValueError if select returned unexpected file number
-
 
-
NoGdbProcessError if there is no gdb subprocess running
-
 
-
-
-Source code -
def get_gdb_response(
-    self, timeout_sec=DEFAULT_GDB_TIMEOUT_SEC, raise_error_on_timeout=True
-):
-    """Get response from GDB, and block while doing so. If GDB does not have any response ready to be read
-    by timeout_sec, an exception is raised.
-
-    Args:
-        timeout_sec (float): Maximum time to wait for reponse. Must be >= 0. Will return after
-        raise_error_on_timeout (bool): Whether an exception should be raised if no response was found
-        after timeout_sec
-
-    Returns:
-        List of parsed GDB responses, returned from gdbmiparser.parse_response, with the
-        additional key 'stream' which is either 'stdout' or 'stderr'
-
-    Raises:
-        GdbTimeoutError if response is not received within timeout_sec
-        ValueError if select returned unexpected file number
-        NoGdbProcessError if there is no gdb subprocess running
-    """
-
-    self.verify_valid_gdb_subprocess()
-    if timeout_sec < 0:
-        self.logger.warning("timeout_sec was negative, replacing with 0")
-        timeout_sec = 0
-
-    if USING_WINDOWS:
-        retval = self._get_responses_windows(timeout_sec)
-    else:
-        retval = self._get_responses_unix(timeout_sec)
-
-    if not retval and raise_error_on_timeout:
-        raise GdbTimeoutError(
-            "Did not get response from gdb after %s seconds" % timeout_sec
-        )
-
-    else:
-        return retval
-
-
-
-def get_subprocess_cmd(self) -
-
-

Returns the shell-escaped string used to invoke the gdb subprocess. -This is a string that can be executed directly in a shell.

-
-Source code -
def get_subprocess_cmd(self):
-    """Returns the shell-escaped string used to invoke the gdb subprocess.
-    This is a string that can be executed directly in a shell.
-    """
-    return " ".join(quote(c) for c in self.cmd)
-
-
-
-def interrupt_gdb(self) -
-
-

Send SIGINT (interrupt signal) to the gdb subprocess

-
-Source code -
def interrupt_gdb(self):
-    """Send SIGINT (interrupt signal) to the gdb subprocess"""
-    self.send_signal_to_gdb("SIGINT")
-
-
-
-def send_signal_to_gdb(self, signal_input) -
-
-

Send signal name (case insensitive) or number to gdb subprocess -gdbmi.send_signal_to_gdb(2) -# valid -gdbmi.send_signal_to_gdb('sigint') -# also valid -gdbmi.send_signal_to_gdb('SIGINT') -# also valid

-

raises ValueError if signal_input is invalie -raises NoGdbProcessError if there is no gdb process to send a signal to

-
-Source code -
def send_signal_to_gdb(self, signal_input):
-    """Send signal name (case insensitive) or number to gdb subprocess
-    gdbmi.send_signal_to_gdb(2)  # valid
-    gdbmi.send_signal_to_gdb('sigint')  # also valid
-    gdbmi.send_signal_to_gdb('SIGINT')  # also valid
-
-    raises ValueError if signal_input is invalie
-    raises NoGdbProcessError if there is no gdb process to send a signal to
-    """
-    try:
-        signal = int(signal_input)
-    except Exception:
-        signal = SIGNAL_NAME_TO_NUM.get(signal_input.upper())
-
-    if not signal:
-        raise ValueError(
-            'Could not find signal corresponding to "%s"' % str(signal)
-        )
-
-    if self.gdb_process:
-        os.kill(self.gdb_process.pid, signal)
-    else:
-        raise NoGdbProcessError(
-            "Cannot send signal to gdb process because no process exists."
-        )
-
-
-
-def spawn_new_gdb_subprocess(self) -
-
-

Spawn a new gdb subprocess with the arguments supplied to the object -during initialization. If gdb subprocess already exists, terminate it before -spanwing a new one. -Return int: gdb process id

-
-Source code -
def spawn_new_gdb_subprocess(self):
-    """Spawn a new gdb subprocess with the arguments supplied to the object
-    during initialization. If gdb subprocess already exists, terminate it before
-    spanwing a new one.
-    Return int: gdb process id
-    """
-    if self.gdb_process:
-        self.logger.debug(
-            "Killing current gdb subprocess (pid %d)" % self.gdb_process.pid
-        )
-        self.exit()
-
-    self.logger.debug('Launching gdb: "%s"' % " ".join(self.cmd))
-
-    # Use pipes to the standard streams
-    self.gdb_process = subprocess.Popen(
-        self.cmd,
-        shell=False,
-        stdout=subprocess.PIPE,
-        stdin=subprocess.PIPE,
-        stderr=subprocess.PIPE,
-        bufsize=0,
-    )
-
-    _make_non_blocking(self.gdb_process.stdout)
-    _make_non_blocking(self.gdb_process.stderr)
-
-    # save file numbers for use later
-    self.stdout_fileno = self.gdb_process.stdout.fileno()
-    self.stderr_fileno = self.gdb_process.stderr.fileno()
-    self.stdin_fileno = self.gdb_process.stdin.fileno()
-
-    self.read_list = [self.stdout_fileno, self.stderr_fileno]
-    self.write_list = [self.stdin_fileno]
-
-    # string buffers for unifinished gdb output
-    self._incomplete_output = {"stdout": None, "stderr": None}
-    return self.gdb_process.pid
-
-
-
-def verify_valid_gdb_subprocess(self) -
-
-

Verify there is a process object, and that it is still running. -Raise NoGdbProcessError if either of the above are not true.

-
-Source code -
def verify_valid_gdb_subprocess(self):
-    """Verify there is a process object, and that it is still running.
-    Raise NoGdbProcessError if either of the above are not true."""
-    if not self.gdb_process:
-        raise NoGdbProcessError("gdb process is not attached")
-
-    elif self.gdb_process.poll() is not None:
-        raise NoGdbProcessError(
-            "gdb process has already finished with return code: %s"
-            % str(self.gdb_process.poll())
-        )
-
-
-
-def write(self, mi_cmd_to_write, timeout_sec=1, raise_error_on_timeout=True, read_response=True) -
-
-

Write to gdb process. Block while parsing responses from gdb for a maximum of timeout_sec.

-

Args

-
-
mi_cmd_to_write : str or list
-
String to write to gdb. If list, it is joined by newlines.
-
timeout_sec : float
-
Maximum number of seconds to wait for response before exiting. Must be >= 0.
-
raise_error_on_timeout : bool
-
If read_response is True, raise error if no response is received
-
read_response : bool
-
Block and read response. If there is a separate thread running,
-
-

this can be false, and the reading thread read the output.

-

Returns

-
-
List of parsed gdb responses if read_response is True, otherwise []
-
 
-
-

Raises

-
-
NoGdbProcessError if there is no gdb subprocess running
-
 
-
TypeError if mi_cmd_to_write is not valid
-
 
-
-
-Source code -
def write(
-    self,
-    mi_cmd_to_write,
-    timeout_sec=DEFAULT_GDB_TIMEOUT_SEC,
-    raise_error_on_timeout=True,
-    read_response=True,
-):
-    """Write to gdb process. Block while parsing responses from gdb for a maximum of timeout_sec.
-
-    Args:
-        mi_cmd_to_write (str or list): String to write to gdb. If list, it is joined by newlines.
-        timeout_sec (float): Maximum number of seconds to wait for response before exiting. Must be >= 0.
-        raise_error_on_timeout (bool): If read_response is True, raise error if no response is received
-        read_response (bool): Block and read response. If there is a separate thread running,
-        this can be false, and the reading thread read the output.
-    Returns:
-        List of parsed gdb responses if read_response is True, otherwise []
-    Raises:
-        NoGdbProcessError if there is no gdb subprocess running
-        TypeError if mi_cmd_to_write is not valid
-    """
-    self.verify_valid_gdb_subprocess()
-    if timeout_sec < 0:
-        self.logger.warning("timeout_sec was negative, replacing with 0")
-        timeout_sec = 0
-
-    # Ensure proper type of the mi command
-    if type(mi_cmd_to_write) in [str, unicode]:
-        pass
-    elif type(mi_cmd_to_write) == list:
-        mi_cmd_to_write = "\n".join(mi_cmd_to_write)
-    else:
-        raise TypeError(
-            "The gdb mi command must a be str or list. Got "
-            + str(type(mi_cmd_to_write))
-        )
-
-    self.logger.debug("writing: %s", mi_cmd_to_write)
-
-    if not mi_cmd_to_write.endswith("\n"):
-        mi_cmd_to_write_nl = mi_cmd_to_write + "\n"
-    else:
-        mi_cmd_to_write_nl = mi_cmd_to_write
-
-    if USING_WINDOWS:
-        # select not implemented in windows for pipes
-        # assume it's always ready
-        outputready = [self.stdin_fileno]
-    else:
-        _, outputready, _ = select.select([], self.write_list, [], timeout_sec)
-    for fileno in outputready:
-        if fileno == self.stdin_fileno:
-            # ready to write
-            self.gdb_process.stdin.write(mi_cmd_to_write_nl.encode())
-            # don't forget to flush for Python3, otherwise gdb won't realize there is data
-            # to evaluate, and we won't get a response
-            self.gdb_process.stdin.flush()
-        else:
-            self.logger.error("got unexpected fileno %d" % fileno)
-
-    if read_response is True:
-        return self.get_gdb_response(
-            timeout_sec=timeout_sec, raise_error_on_timeout=raise_error_on_timeout
-        )
-
-    else:
-        return []
-
-
-
-
-
-class GdbTimeoutError -(*args, **kwargs) -
-
-

Raised when no response is recieved from gdb after the timeout has been triggered

-
-Source code -
class GdbTimeoutError(ValueError):
-    """Raised when no response is recieved from gdb after the timeout has been triggered"""
-
-    pass
-
-

Ancestors

-
    -
  • builtins.ValueError
  • -
  • builtins.Exception
  • -
  • builtins.BaseException
  • -
-
-
-class NoGdbProcessError -(*args, **kwargs) -
-
-

Raise when trying to interact with gdb subprocess, but it does not exist. -It may have been killed and removed, or failed to initialize for some reason.

-
-Source code -
class NoGdbProcessError(ValueError):
-    """Raise when trying to interact with gdb subprocess, but it does not exist.
-    It may have been killed and removed, or failed to initialize for some reason."""
-
-    pass
-
-

Ancestors

-
    -
  • builtins.ValueError
  • -
  • builtins.Exception
  • -
  • builtins.BaseException
  • -
-
-
-
-
- -
- - - - - \ No newline at end of file diff --git a/docs/gdbmiparser.html b/docs/gdbmiparser.html deleted file mode 100644 index fa7492c..0000000 --- a/docs/gdbmiparser.html +++ /dev/null @@ -1,570 +0,0 @@ - - - - - - -pygdbmi.gdbmiparser API documentation - - - - - - - - - -
-
-
-

Module pygdbmi.gdbmiparser

-
-
-

Python parser for gdb's machine interface interpreter.

-

Parses string output from gdb with the "–interpreter=mi2" flag into -structured objects.

-

See more at https://sourceware.org/gdb/onlinedocs/gdb/GDB_002fMI.html#GDB_002fMI

-
-Source code -
"""
-Python parser for gdb's machine interface interpreter.
-
-Parses string output from gdb with the "--interpreter=mi2" flag into
-structured objects.
-
-See more at https://sourceware.org/gdb/onlinedocs/gdb/GDB_002fMI.html#GDB_002fMI
-
-"""
-
-import logging
-from pygdbmi.printcolor import fmt_green
-from pygdbmi.StringStream import StringStream
-from pprint import pprint
-import re
-
-_DEBUG = False
-logger = logging.getLogger(__name__)
-
-
-def _setup_logger(logger, debug):
-    logger.propagate = False
-
-    handler = logging.StreamHandler()
-    handler.setFormatter(
-        logging.Formatter("[%(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s")
-    )
-    if debug:
-        level = logging.DEBUG
-    else:
-        level = logging.ERROR
-
-    logger.setLevel(level)
-    logger.addHandler(handler)
-
-
-_setup_logger(logger, _DEBUG)
-
-
-def parse_response(gdb_mi_text):
-    """Parse gdb mi text and turn it into a dictionary.
-
-    See https://sourceware.org/gdb/onlinedocs/gdb/GDB_002fMI-Stream-Records.html#GDB_002fMI-Stream-Records
-    for details on types of gdb mi output.
-
-    Args:
-        gdb_mi_text (str): String output from gdb
-
-    Returns:
-        dict with the following keys:
-        type (either 'notify', 'result', 'console', 'log', 'target', 'done'),
-        message (str or None),
-        payload (str, list, dict, or None)
-    """
-    stream = StringStream(gdb_mi_text, debug=_DEBUG)
-
-    if _GDB_MI_NOTIFY_RE.match(gdb_mi_text):
-        token, message, payload = _get_notify_msg_and_payload(gdb_mi_text, stream)
-        return {
-            "type": "notify",
-            "message": message,
-            "payload": payload,
-            "token": token,
-        }
-
-    elif _GDB_MI_RESULT_RE.match(gdb_mi_text):
-        token, message, payload = _get_result_msg_and_payload(gdb_mi_text, stream)
-        return {
-            "type": "result",
-            "message": message,
-            "payload": payload,
-            "token": token,
-        }
-
-    elif _GDB_MI_CONSOLE_RE.match(gdb_mi_text):
-        return {
-            "type": "console",
-            "message": None,
-            "payload": _GDB_MI_CONSOLE_RE.match(gdb_mi_text).groups()[0],
-        }
-
-    elif _GDB_MI_LOG_RE.match(gdb_mi_text):
-        return {
-            "type": "log",
-            "message": None,
-            "payload": _GDB_MI_LOG_RE.match(gdb_mi_text).groups()[0],
-        }
-
-    elif _GDB_MI_TARGET_OUTPUT_RE.match(gdb_mi_text):
-        return {
-            "type": "target",
-            "message": None,
-            "payload": _GDB_MI_TARGET_OUTPUT_RE.match(gdb_mi_text).groups()[0],
-        }
-
-    elif response_is_finished(gdb_mi_text):
-        return {"type": "done", "message": None, "payload": None}
-
-    else:
-        # This was not gdb mi output, so it must have just been printed by
-        # the inferior program that's being debugged
-        return {"type": "output", "message": None, "payload": gdb_mi_text}
-
-
-def response_is_finished(gdb_mi_text):
-    """Return true if the gdb mi response is ending
-    Returns: True if gdb response is finished"""
-    if _GDB_MI_RESPONSE_FINISHED_RE.match(gdb_mi_text):
-        return True
-
-    else:
-        return False
-
-
-def assert_match(actual_char_or_str, expected_char_or_str):
-    """If values don't match, print them and raise a ValueError, otherwise,
-    continue
-    Raises: ValueError if arguments do not match"""
-    if expected_char_or_str != actual_char_or_str:
-        print("Expected")
-        pprint(expected_char_or_str)
-        print("")
-        print("Got")
-        pprint(actual_char_or_str)
-        raise ValueError()
-
-
-# ========================================================================
-# All functions and variables below are used internally to parse mi output
-# ========================================================================
-
-
-# GDB machine interface output patterns to match
-# https://sourceware.org/gdb/onlinedocs/gdb/GDB_002fMI-Stream-Records.html#GDB_002fMI-Stream-Records
-
-# https://sourceware.org/gdb/onlinedocs/gdb/GDB_002fMI-Result-Records.html#GDB_002fMI-Result-Records
-# In addition to a number of out-of-band notifications,
-# the response to a gdb/mi command includes one of the following result indications:
-# done, running, connected, error, exit
-_GDB_MI_RESULT_RE = re.compile(r"^(\d*)\^(\S+?)(,(.*))?$")
-
-# https://sourceware.org/gdb/onlinedocs/gdb/GDB_002fMI-Async-Records.html#GDB_002fMI-Async-Records
-# Async records are used to notify the gdb/mi client of additional
-# changes that have occurred. Those changes can either be a consequence
-# of gdb/mi commands (e.g., a breakpoint modified) or a result of target activity
-# (e.g., target stopped).
-_GDB_MI_NOTIFY_RE = re.compile(r"^(\d*)[*=](\S+?),(.*)$")
-
-# https://sourceware.org/gdb/onlinedocs/gdb/GDB_002fMI-Stream-Records.html#GDB_002fMI-Stream-Records
-# "~" string-output
-# The console output stream contains text that should be displayed
-# in the CLI console window. It contains the textual responses to CLI commands.
-_GDB_MI_CONSOLE_RE = re.compile(r'~"(.*)"', re.DOTALL)
-
-# https://sourceware.org/gdb/onlinedocs/gdb/GDB_002fMI-Stream-Records.html#GDB_002fMI-Stream-Records
-# "&" string-output
-# The log stream contains debugging messages being produced by gdb's internals.
-_GDB_MI_LOG_RE = re.compile(r'&"(.*)"', re.DOTALL)
-
-# https://sourceware.org/gdb/onlinedocs/gdb/GDB_002fMI-Stream-Records.html#GDB_002fMI-Stream-Records
-# "@" string-output
-# The target output stream contains any textual output from the
-# running target. This is only present when GDB's event loop is truly asynchronous,
-# which is currently only the case for remote targets.
-_GDB_MI_TARGET_OUTPUT_RE = re.compile(r'@"(.*)"', re.DOTALL)
-
-# Response finished
-_GDB_MI_RESPONSE_FINISHED_RE = re.compile(r"^\(gdb\)\s*$")
-
-_WHITESPACE = [" ", "\t", "\r", "\n"]
-
-_GDB_MI_CHAR_DICT_START = "{"
-_GDB_MI_CHAR_ARRAY_START = "["
-_GDB_MI_CHAR_STRING_START = '"'
-_GDB_MI_VALUE_START_CHARS = [
-    _GDB_MI_CHAR_DICT_START,
-    _GDB_MI_CHAR_ARRAY_START,
-    _GDB_MI_CHAR_STRING_START,
-]
-
-
-def _get_notify_msg_and_payload(result, stream):
-    """Get notify message and payload dict"""
-    token = stream.advance_past_chars(["=", "*"])
-    token = int(token) if token != "" else None
-    logger.debug("%s", fmt_green("parsing message"))
-    message = stream.advance_past_chars([","])
-
-    logger.debug("parsed message")
-    logger.debug("%s", fmt_green(message))
-
-    payload = _parse_dict(stream)
-    return token, message.strip(), payload
-
-
-def _get_result_msg_and_payload(result, stream):
-    """Get result message and payload dict"""
-
-    groups = _GDB_MI_RESULT_RE.match(result).groups()
-    token = int(groups[0]) if groups[0] != "" else None
-    message = groups[1]
-
-    if groups[2] is None:
-        payload = None
-    else:
-        stream.advance_past_chars([","])
-        payload = _parse_dict(stream)
-
-    return token, message, payload
-
-
-def _parse_dict(stream):
-    """Parse dictionary, with optional starting character '{'
-    return (tuple):
-        Number of characters parsed from to_parse
-        Parsed dictionary
-    """
-    obj = {}
-
-    logger.debug("%s", fmt_green("parsing dict"))
-
-    while True:
-        c = stream.read(1)
-        if c in _WHITESPACE:
-            pass
-        elif c in ["{", ","]:
-            pass
-        elif c in ["}", ""]:
-            # end of object, exit loop
-            break
-
-        else:
-            stream.seek(-1)
-            key, val = _parse_key_val(stream)
-            if key in obj:
-                # This is a gdb bug. We should never get repeated keys in a dict!
-                # See https://sourceware.org/bugzilla/show_bug.cgi?id=22217
-                # and https://github.com/cs01/pygdbmi/issues/19
-                # Example:
-                #   thread-ids={thread-id="1",thread-id="2"}
-                # Results in:
-                #   thread-ids: {{'thread-id': ['1', '2']}}
-                # Rather than the lossy
-                #   thread-ids: {'thread-id': 2}  # '1' got overwritten!
-                if isinstance(obj[key], list):
-                    obj[key].append(val)
-                else:
-                    obj[key] = [obj[key], val]
-            else:
-                obj[key] = val
-
-            look_ahead_for_garbage = True
-            c = stream.read(1)
-            while look_ahead_for_garbage:
-                if c in ["}", ",", ""]:
-                    look_ahead_for_garbage = False
-                else:
-                    # got some garbage text, skip it. for example:
-                    # name="gdb"gargage  # skip over 'garbage'
-                    # name="gdb"\n  # skip over '\n'
-                    logger.debug("skipping unexpected charcter: " + c)
-                    c = stream.read(1)
-            stream.seek(-1)
-
-    logger.debug("parsed dict")
-    logger.debug("%s", fmt_green(obj))
-    return obj
-
-
-def _parse_key_val(stream):
-    """Parse key, value combination
-    return (tuple):
-        Parsed key (string)
-        Parsed value (either a string, array, or dict)
-    """
-
-    logger.debug("parsing key/val")
-    key = _parse_key(stream)
-    val = _parse_val(stream)
-
-    logger.debug("parsed key/val")
-    logger.debug("%s", fmt_green(key))
-    logger.debug("%s", fmt_green(val))
-
-    return key, val
-
-
-def _parse_key(stream):
-    """Parse key, value combination
-    returns :
-        Parsed key (string)
-    """
-    logger.debug("parsing key")
-
-    key = stream.advance_past_chars(["="])
-
-    logger.debug("parsed key:")
-    logger.debug("%s", fmt_green(key))
-    return key
-
-
-def _parse_val(stream):
-    """Parse value from string
-    returns:
-        Parsed value (either a string, array, or dict)
-    """
-
-    logger.debug("parsing value")
-
-    while True:
-        c = stream.read(1)
-
-        if c == "{":
-            # Start object
-            val = _parse_dict(stream)
-            break
-
-        elif c == "[":
-            # Start of an array
-            val = _parse_array(stream)
-            break
-
-        elif c == '"':
-            # Start of a string
-            val = stream.advance_past_string_with_gdb_escapes()
-            break
-
-        elif _DEBUG:
-            raise ValueError("unexpected character: %s" % c)
-
-        else:
-            print(
-                'pygdbmi warning: encountered unexpected character: "%s". Continuing.'
-                % c
-            )
-            val = ""  # this will be overwritten if there are more characters to be read
-
-    logger.debug("parsed value:")
-    logger.debug("%s", fmt_green(val))
-
-    return val
-
-
-def _parse_array(stream):
-    """Parse an array, stream should be passed the initial [
-    returns:
-        Parsed array
-    """
-
-    logger.debug("parsing array")
-    arr = []
-    while True:
-        c = stream.read(1)
-
-        if c in _GDB_MI_VALUE_START_CHARS:
-            stream.seek(-1)
-            val = _parse_val(stream)
-            arr.append(val)
-        elif c in _WHITESPACE:
-            pass
-        elif c == ",":
-            pass
-        elif c == "]":
-            # Stop when this array has finished. Note
-            # that elements of this array can be also be arrays.
-            break
-
-    logger.debug("parsed array:")
-    logger.debug("%s", fmt_green(arr))
-    return arr
-
-
-
-
-
-
-
-

Functions

-
-
-def assert_match(actual_char_or_str, expected_char_or_str) -
-
-

If values don't match, print them and raise a ValueError, otherwise, -continue -Raises: ValueError if arguments do not match

-
-Source code -
def assert_match(actual_char_or_str, expected_char_or_str):
-    """If values don't match, print them and raise a ValueError, otherwise,
-    continue
-    Raises: ValueError if arguments do not match"""
-    if expected_char_or_str != actual_char_or_str:
-        print("Expected")
-        pprint(expected_char_or_str)
-        print("")
-        print("Got")
-        pprint(actual_char_or_str)
-        raise ValueError()
-
-
-
-def parse_response(gdb_mi_text) -
-
-

Parse gdb mi text and turn it into a dictionary.

-

See https://sourceware.org/gdb/onlinedocs/gdb/GDB_002fMI-Stream-Records.html#GDB_002fMI-Stream-Records -for details on types of gdb mi output.

-

Args

-
-
gdb_mi_text : str
-
String output from gdb
-
-

Returns

-
-
dict with the following keys:
-
 
-
type (either 'notify', 'result', 'console', 'log', 'target', 'done'),
-
 
-
message (str or None),
-
 
-
payload (str, list, dict, or None)
-
 
-
-
-Source code -
def parse_response(gdb_mi_text):
-    """Parse gdb mi text and turn it into a dictionary.
-
-    See https://sourceware.org/gdb/onlinedocs/gdb/GDB_002fMI-Stream-Records.html#GDB_002fMI-Stream-Records
-    for details on types of gdb mi output.
-
-    Args:
-        gdb_mi_text (str): String output from gdb
-
-    Returns:
-        dict with the following keys:
-        type (either 'notify', 'result', 'console', 'log', 'target', 'done'),
-        message (str or None),
-        payload (str, list, dict, or None)
-    """
-    stream = StringStream(gdb_mi_text, debug=_DEBUG)
-
-    if _GDB_MI_NOTIFY_RE.match(gdb_mi_text):
-        token, message, payload = _get_notify_msg_and_payload(gdb_mi_text, stream)
-        return {
-            "type": "notify",
-            "message": message,
-            "payload": payload,
-            "token": token,
-        }
-
-    elif _GDB_MI_RESULT_RE.match(gdb_mi_text):
-        token, message, payload = _get_result_msg_and_payload(gdb_mi_text, stream)
-        return {
-            "type": "result",
-            "message": message,
-            "payload": payload,
-            "token": token,
-        }
-
-    elif _GDB_MI_CONSOLE_RE.match(gdb_mi_text):
-        return {
-            "type": "console",
-            "message": None,
-            "payload": _GDB_MI_CONSOLE_RE.match(gdb_mi_text).groups()[0],
-        }
-
-    elif _GDB_MI_LOG_RE.match(gdb_mi_text):
-        return {
-            "type": "log",
-            "message": None,
-            "payload": _GDB_MI_LOG_RE.match(gdb_mi_text).groups()[0],
-        }
-
-    elif _GDB_MI_TARGET_OUTPUT_RE.match(gdb_mi_text):
-        return {
-            "type": "target",
-            "message": None,
-            "payload": _GDB_MI_TARGET_OUTPUT_RE.match(gdb_mi_text).groups()[0],
-        }
-
-    elif response_is_finished(gdb_mi_text):
-        return {"type": "done", "message": None, "payload": None}
-
-    else:
-        # This was not gdb mi output, so it must have just been printed by
-        # the inferior program that's being debugged
-        return {"type": "output", "message": None, "payload": gdb_mi_text}
-
-
-
-def response_is_finished(gdb_mi_text) -
-
-

Return true if the gdb mi response is ending -Returns: True if gdb response is finished

-
-Source code -
def response_is_finished(gdb_mi_text):
-    """Return true if the gdb mi response is ending
-    Returns: True if gdb response is finished"""
-    if _GDB_MI_RESPONSE_FINISHED_RE.match(gdb_mi_text):
-        return True
-
-    else:
-        return False
-
-
-
-
-
-
-
- -
- - - - - \ No newline at end of file diff --git a/docs/index.html b/docs/index.html deleted file mode 100644 index 5708572..0000000 --- a/docs/index.html +++ /dev/null @@ -1,234 +0,0 @@ - - - - - - -pygdbmi API documentation - - - - - - - - - -
-
-
-

Module pygdbmi

-
-
-

-pygdbmi - Get Structured Output from GDB's Machine Interface -

-

- - - - - - - -Code style: black -

-

Documentation https://cs01.github.io/pygdbmi

-

Source Code https://github.com/cs01/pygdbmi

-
-

Python (py) gdb machine interface (mi)

-
-

GDB/MI is a line based machine oriented text interface to GDB and is activated by specifying using the –interpreter command line option (see Mode Options). It is specifically intended to support the development of systems which use the debugger as just one small component of a larger system.

-
-

What's in the box?

-
    -
  1. A function to parse gdb machine interface string output and return structured data types (Python dicts) that are JSON serializable. Useful for writing the backend to a gdb frontend. For example, gdbgui uses pygdbmi on the backend.
  2. -
  3. A Python class to control and interact with gdb as a subprocess
  4. -
-

To get machine interface output from gdb, run gdb with the --interpreter=mi2 flag like so:

-
gdb --interpreter=mi2
-
-

Installation

-
pip install pygdbmi
-
-

Compatibility

-

Operating Systems

-

Cross platform support for Linux, macOS and Windows

-
    -
  • Linux/Unix
  • -
-

Ubuntu 14.04 and 16.04 have been tested to work. Other versions likely work as well.

-
    -
  • macOS
  • -
-

Note: the error please check gdb is codesigned - see taskgated(8) can be fixed by codesigning gdb with these instructions. If the error is not fixed, please create an issue in github.

-
    -
  • Windows
  • -
-

Windows 10 has been tested to work with MinGW and cygwin.

-

gdb versions

-
    -
  • gdb 7.6+ has been tested. Older versions may work as well.
  • -
-

Examples

-

gdb mi defines a syntax for its output that is suitable for machine readability and scripting: example output:

-
-> -break-insert main
-<- ^done,bkpt={number="1",type="breakpoint",disp="keep",
-enabled="y",addr="0x08048564",func="main",file="myprog.c",
-fullname="/home/myprog.c",line="68",thread-groups=["i1"],
-times="0"}
-<- (gdb)
-
-

Use pygdbmi.gdbmiparser.parse_response to turn that string output into a JSON serializable dictionary

-
from pygdbmi import gdbmiparser
-from pprint import pprint
-response = gdbmiparser.parse_response('^done,bkpt={number="1",type="breakpoint",disp="keep", enabled="y",addr="0x08048564",func="main",file="myprog.c",fullname="/home/myprog.c",line="68",thread-groups=["i1"],times="0"')
-pprint(response)
-> {'message': 'done',
-'payload': {'bkpt': {'addr': '0x08048564',
-                      'disp': 'keep',
-                      'enabled': 'y',
-                      'file': 'myprog.c',
-                      'fullname': '/home/myprog.c',
-                      'func': 'main',
-                      'line': '68',
-                      'number': '1',
-                      'thread-groups': ['i1'],
-                      'times': '0',
-                      'type': 'breakpoint'}},
- 'type': 'result'}
-
-

Programmatic Control Over gdb

-

But how do you get the gdb output into Python in the first place? If you want, pygdbmi also has a class to control gdb as subprocess. You can write commands, and get structured output back:

-
from pygdbmi.gdbcontroller import GdbController
-from pprint import pprint
-
-# Start gdb process
-gdbmi = GdbController()
-print(gdbmi.get_subprocess_cmd())  # print actual command run as subprocess
-
-# Load binary a.out and get structured response
-response = gdbmi.write('-file-exec-file a.out')
-pprint(response)
-[{'message': u'thread-group-added',
-  'payload': {u'id': u'i1'},
-  'type': 'notify'},
- {'message': u'done', 'payload': None, 'type': 'result'}]
-
-

Now do whatever you want with gdb. All gdb commands, as well as gdb machine interface commands are acceptable. gdb mi commands give better structured output that is machine readable, rather than gdb console output. mi commands begin with a -.

-
response = gdbmi.write('-break-insert main')  # machine interface (MI) commands start with a '-'
-response = gdbmi.write('break main')  # normal gdb commands work too, but the return value is slightly different
-response = gdbmi.write('-exec-run')
-response = gdbmi.write('run')
-response = gdbmi.write('-exec-next', timeout_sec=0.1)  # the wait time can be modified from the default of 1 second
-response = gdbmi.write('next')
-response = gdbmi.write('next', raise_error_on_timeout=False)
-response = gdbmi.write('next', raise_error_on_timeout=True, timeout_sec=0.01)
-response = gdbmi.write('-exec-continue')
-response = gdbmi.send_signal_to_gdb('SIGKILL')  # name of signal is okay
-response = gdbmi.send_signal_to_gdb(2)  # value of signal is okay too
-response = gdbmi.interrupt_gdb()  # sends SIGINT to gdb
-response = gdbmi.write('continue')
-response = gdbmi.exit()
-
-

Parsed Output Format

-

Each parsed gdb response consists of a list of dictionaries. Each dictionary has keys message, payload, token, and type.

-
    -
  • message contains a textual message from gdb, which is not always present. When missing, this is None.
  • -
  • payload contains the content of gdb's output, which can contain any of the following: dictionary, list, string. This too is not always present, and can be None depending on the response.
  • -
  • token If an input command was prefixed with a (optional) token then the corresponding output for that command will also be prefixed by that same token. This field is only present for pygdbmi output types nofity and result. When missing, this is None.
  • -
-

The type is defined based on gdb's various mi output record types, and can be

-
    -
  • result - the result of a gdb command, such as done, running, error, etc.
  • -
  • notify - additional async changes that have occurred, such as breakpoint modified
  • -
  • console - textual responses to cli commands
  • -
  • log - debugging messages from gdb's internals
  • -
  • output - output from target
  • -
  • target - output from remote target
  • -
  • done - when gdb has finished its output
  • -
-

Contributing

-

Documentation fixes, bug fixes, performance improvements, and functional improvements are welcome. You may want to create an issue before beginning work to make sure I am interested in merging it to the master branch.

-

To develop, set up a new virtual environment, then clone this repo and run pip install -e .[dev].

-

Confirm unit tests are working with make test, then begin development.

-

Update unit tests as necessary at pygdbmi/tests/test_app.py.

-

Projects Using pygdbmi

-
    -
  • gdbgui implements a browser-based frontend to gdb, using pygdbmi on the backend
  • -
  • PINCE is a gdb frontend that aims to provide a reverse engineering tool and a reusable library focused on games. It uses pygdbmi to parse gdb/mi based output for some functions
  • -
  • avatar² is an orchestration framework for reversing and analysing firmware of embedded devices. It utilizes pygdbmi for internal communication to different analysis targets.
  • -
  • Know of another project? Create a PR and add it here.
  • -
-

Authors

-

pygdbmi was written by Chad Smith with contributions from the community for which the author is very grateful. Thanks especially to @mariusmue, @bobthekingofegypt, @mouuff, and @felipesere.

-
-Source code -
"""
-.. include:: ../README.md
-"""
-__title__ = "pygdbmi"
-__version__ = "0.9.0.2"
-__author__ = "Chad Smith"
-__copyright__ = "Copyright Chad Smith"
-__pdoc__ = {"StringStream": False, "printcolor": False}
-
-
-
-

Sub-modules

-
-
pygdbmi.gdbcontroller
-
-

GdbController class to programatically run gdb and get structured output

-
-
pygdbmi.gdbmiparser
-
-

Python parser for gdb's machine interface interpreter …

-
-
-
-
-
-
-
-
-
-
- -
- - - - - \ No newline at end of file diff --git a/docs/printcolor.html b/docs/printcolor.html deleted file mode 100644 index a1c92b1..0000000 --- a/docs/printcolor.html +++ /dev/null @@ -1,176 +0,0 @@ - - - - - - -pygdbmi.printcolor API documentation - - - - - - - - - -
-
-
-

pygdbmi.printcolor module

-
-
-
-Source code -
import os
-
-USING_WINDOWS = os.name == "nt"
-
-
-def print_red(x):
-    if USING_WINDOWS:
-        print(x)
-    else:
-        print("\033[91m {}\033[00m".format(x))
-
-
-def print_green(x):
-    if USING_WINDOWS:
-        print(x)
-    else:
-        print("\033[92m {}\033[00m".format(x))
-
-
-def print_cyan(x):
-    if USING_WINDOWS:
-        print(x)
-    else:
-        print("\033[96m {}\033[00m".format(x))
-
-
-def fmt_green(x):
-    if USING_WINDOWS:
-        return x
-    else:
-        return "\033[92m {}\033[00m".format(x)
-
-
-def fmt_cyan(x):
-    if USING_WINDOWS:
-        return x
-    else:
-        return "\033[96m {}\033[00m".format(x)
-
-
-
-
-
-
-
-

Functions

-
-
-def fmt_cyan(x) -
-
-
-
-Source code -
def fmt_cyan(x):
-    if USING_WINDOWS:
-        return x
-    else:
-        return "\033[96m {}\033[00m".format(x)
-
-
-
-def fmt_green(x) -
-
-
-
-Source code -
def fmt_green(x):
-    if USING_WINDOWS:
-        return x
-    else:
-        return "\033[92m {}\033[00m".format(x)
-
-
-
-def print_cyan(x) -
-
-
-
-Source code -
def print_cyan(x):
-    if USING_WINDOWS:
-        print(x)
-    else:
-        print("\033[96m {}\033[00m".format(x))
-
-
-
-def print_green(x) -
-
-
-
-Source code -
def print_green(x):
-    if USING_WINDOWS:
-        print(x)
-    else:
-        print("\033[92m {}\033[00m".format(x))
-
-
-
-def print_red(x) -
-
-
-
-Source code -
def print_red(x):
-    if USING_WINDOWS:
-        print(x)
-    else:
-        print("\033[91m {}\033[00m".format(x))
-
-
-
-
-
-
-
- -
- - - - - \ No newline at end of file diff --git a/makefile b/makefile deleted file mode 100644 index 2c12e99..0000000 --- a/makefile +++ /dev/null @@ -1,28 +0,0 @@ -# run pip install -r dev_requirements.txt before running make test -.PHONY: test upload clean docs - -test: - python -m tests - -clean: - rm -rf dist build *.egg-info - find . -name '*.pyc' -exec rm -f {} + - find . -name '*.pyo' -exec rm -f {} + - find . -name '*~' -exec rm -f {} + - -build: clean - python -m pip install --upgrade --quiet setuptools wheel twine - python setup.py --quiet sdist bdist_wheel - twine check dist/* - -publish: build - twine upload dist/* - -testpublish: test clean - python setup.py sdist bdist_wheel --universal - twine upload dist/* -r pypitest - -docs: - pdoc --html --force --output-dir docs pygdbmi - mv docs/pygdbmi/* docs - rmdir docs/pygdbmi diff --git a/noxfile.py b/noxfile.py new file mode 100644 index 0000000..017460a --- /dev/null +++ b/noxfile.py @@ -0,0 +1,57 @@ +import nox # type: ignore +from pathlib import Path +import shutil + +nox.options.sessions = ["tests", "lint", "docs"] + + +@nox.session(python=["3.5", "3.6", "3.7", "3.8"]) +def tests(session): + session.install(".") + session.run("python", "-m", "unittest", "discover") + + +@nox.session(python="3.7") +def lint(session): + session.install(*["black", "flake8", "mypy", "check-manifest"]) + files = ["pygdbmi", "tests"] + [str(p) for p in Path(".").glob("*.py")] + session.run("black", "--check", *files) + session.run("flake8", *files) + session.run("mypy", *files) # + session.run("check-manifest") + session.run("python", "setup.py", "check", "--metadata", "--strict") + + +@nox.session(python="3.7") +def docs(session): + session.install(".", "pdoc3") + session.run( + "pdoc", "--html", "--force", "--output-dir", "/tmp/pygdbmi_docs", "pygdbmi" + ) + shutil.rmtree("docs", ignore_errors=True) + shutil.move("/tmp/pygdbmi_docs/pygdbmi", "docs") + + +@nox.session(python="3.7") +def publish_docs(session): + session.run("git", "checkout", "gh-pages", external=True) + session.run("git", "rebase", "master", external=True) + docs(session) + session.run("git", "add", "docs", external=True) + session.run("git", "commit", "-m", "updating docs", external=True) + session.run("git", "push", "origin", "gh-pages", external=True) + + +@nox.session(python="3.7") +def build(session): + session.install("setuptools", "wheel", "twine") + shutil.rmtree("dist", ignore_errors=True) + session.run("python", "setup.py", "--quiet", "sdist", "bdist_wheel") + session.run("twine", "check", "dist/*") + + +@nox.session(python="3.7") +def publish(session): + build(session) + print("REMINDER: Has the changelog been updated?") + session.run("python", "-m", "twine", "upload", "dist/*") diff --git a/pygdbmi/StringStream.py b/pygdbmi/StringStream.py index e84356e..a6d89ed 100644 --- a/pygdbmi/StringStream.py +++ b/pygdbmi/StringStream.py @@ -1,7 +1,3 @@ -import logging -from pygdbmi.printcolor import fmt_cyan - - class StringStream: """A simple class to hold text so that when passed between functions, the object is passed by reference @@ -16,12 +12,6 @@ def __init__(self, raw_text, debug=False): self.index = 0 self.len = len(raw_text) - if debug: - level = logging.DEBUG - else: - level = logging.ERROR - logging.basicConfig(format="%(funcName)20s %(message)s", level=level) - def read(self, count): """Read count characters starting at self.index, and return those characters as a string @@ -69,7 +59,6 @@ def advance_past_string_with_gdb_escapes(self, chars_to_remove_gdb_escape=None): while True: c = self.raw_text[self.index] self.index += 1 - logging.debug("%s", fmt_cyan(c)) if c == "\\": # We are on a backslash and there is another character after the backslash diff --git a/pygdbmi/gdbcontroller.py b/pygdbmi/gdbcontroller.py index 00c1e1d..eb48519 100644 --- a/pygdbmi/gdbcontroller.py +++ b/pygdbmi/gdbcontroller.py @@ -1,15 +1,17 @@ """GdbController class to programatically run gdb and get structured output""" -from distutils.spawn import find_executable import logging import os -from pprint import pformat -from pygdbmi import gdbmiparser -import signal import select +import signal import subprocess import sys import time +from distutils.spawn import find_executable +from pprint import pformat +from typing import Union, List, Optional + +from pygdbmi import gdbmiparser try: # py3 from shlex import quote @@ -22,7 +24,7 @@ USING_WINDOWS = os.name == "nt" if USING_WINDOWS: import msvcrt - from ctypes import windll, byref, wintypes, WinError, POINTER + from ctypes import windll, byref, wintypes, WinError, POINTER # type: ignore from ctypes.wintypes import HANDLE, DWORD, BOOL else: import fcntl @@ -32,8 +34,6 @@ if n.startswith("SIG") and "_" not in n: SIGNAL_NAME_TO_NUM[n.upper()] = getattr(signal, n) -unicode = str if PYTHON3 else unicode # noqa: F821 - class NoGdbProcessError(ValueError): """Raise when trying to interact with gdb subprocess, but it does not exist. @@ -65,11 +65,11 @@ class GdbController: def __init__( self, - gdb_path="gdb", - gdb_args=None, + gdb_path: str = "gdb", + gdb_args: Optional[List] = None, time_to_check_for_additional_output_sec=DEFAULT_TIME_TO_CHECK_FOR_ADDITIONAL_OUTPUT_SEC, - rr=False, - verbose=False, + rr: bool = False, + verbose: bool = False, ): if gdb_args is None: default_gdb_args = ["--nx", "--quiet", "--interpreter=mi2"] @@ -77,7 +77,7 @@ def __init__( self.verbose = verbose self.abs_gdb_path = None # abs path to gdb executable - self.cmd = [] # the shell command to run gdb + self.cmd = [] # type: List[str] self.time_to_check_for_additional_output_sec = ( time_to_check_for_additional_output_sec ) @@ -107,7 +107,7 @@ def __init__( self._attach_logger(verbose) self.spawn_new_gdb_subprocess() - def _attach_logger(self, verbose): + def _attach_logger(self, verbose: bool): handler = logging.StreamHandler() handler.setFormatter(logging.Formatter("%(message)s")) unique_number = time.time() @@ -179,10 +179,10 @@ def verify_valid_gdb_subprocess(self): def write( self, - mi_cmd_to_write, + mi_cmd_to_write: Union[str, List[str]], timeout_sec=DEFAULT_GDB_TIMEOUT_SEC, - raise_error_on_timeout=True, - read_response=True, + raise_error_on_timeout: bool = True, + read_response: bool = True, ): """Write to gdb process. Block while parsing responses from gdb for a maximum of timeout_sec. @@ -204,10 +204,10 @@ def write( timeout_sec = 0 # Ensure proper type of the mi command - if type(mi_cmd_to_write) in [str, unicode]: - pass - elif type(mi_cmd_to_write) == list: - mi_cmd_to_write = "\n".join(mi_cmd_to_write) + if isinstance(mi_cmd_to_write, str): + mi_cmd_to_write_str = mi_cmd_to_write + elif isinstance(mi_cmd_to_write, list): + mi_cmd_to_write_str = "\n".join(mi_cmd_to_write) else: raise TypeError( "The gdb mi command must a be str or list. Got " @@ -216,10 +216,10 @@ def write( self.logger.debug("writing: %s", mi_cmd_to_write) - if not mi_cmd_to_write.endswith("\n"): - mi_cmd_to_write_nl = mi_cmd_to_write + "\n" + if not mi_cmd_to_write_str.endswith("\n"): + mi_cmd_to_write_nl = mi_cmd_to_write_str + "\n" else: - mi_cmd_to_write_nl = mi_cmd_to_write + mi_cmd_to_write_nl = mi_cmd_to_write_str if USING_WINDOWS: # select not implemented in windows for pipes @@ -230,10 +230,12 @@ def write( for fileno in outputready: if fileno == self.stdin_fileno: # ready to write - self.gdb_process.stdin.write(mi_cmd_to_write_nl.encode()) + self.gdb_process.stdin.write( # type: ignore + mi_cmd_to_write_nl.encode() + ) # don't forget to flush for Python3, otherwise gdb won't realize there is data # to evaluate, and we won't get a response - self.gdb_process.stdin.flush() + self.gdb_process.stdin.flush() # type: ignore else: self.logger.error("got unexpected fileno %d" % fileno) @@ -246,15 +248,14 @@ def write( return [] def get_gdb_response( - self, timeout_sec=DEFAULT_GDB_TIMEOUT_SEC, raise_error_on_timeout=True + self, timeout_sec: float = DEFAULT_GDB_TIMEOUT_SEC, raise_error_on_timeout=True ): """Get response from GDB, and block while doing so. If GDB does not have any response ready to be read by timeout_sec, an exception is raised. Args: timeout_sec (float): Maximum time to wait for reponse. Must be >= 0. Will return after - raise_error_on_timeout (bool): Whether an exception should be raised if no response was found - after timeout_sec + raise_error_on_timeout (bool): Whether an exception should be raised if no response was found after timeout_sec Returns: List of parsed GDB responses, returned from gdbmiparser.parse_response, with the diff --git a/pygdbmi/gdbmiparser.py b/pygdbmi/gdbmiparser.py index 544b390..f82bb15 100755 --- a/pygdbmi/gdbmiparser.py +++ b/pygdbmi/gdbmiparser.py @@ -9,10 +9,11 @@ """ import logging +import re +from pprint import pprint + from pygdbmi.printcolor import fmt_green from pygdbmi.StringStream import StringStream -from pprint import pprint -import re _DEBUG = False logger = logging.getLogger(__name__) diff --git a/setup.py b/setup.py index 784b4e2..be7fe7d 100644 --- a/setup.py +++ b/setup.py @@ -1,23 +1,23 @@ #!/usr/bin/env python -# -*- coding: utf-8 -*- - -# Note: To use the 'upload' functionality of this file, you must: -# pip install twine import io import os import re -from setuptools import find_packages, setup from codecs import open +from setuptools import find_packages, setup # type: ignore + EXCLUDE_FROM_PACKAGES = ["tests"] CURDIR = os.path.abspath(os.path.dirname(__file__)) README = io.open("README.md", "r", encoding="utf-8").read() with open("pygdbmi/__init__.py", "r") as fd: - version = re.search( + matches = re.search( r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', fd.read(), re.MULTILINE - ).group(1) + ) + version = "0.0.0.0" + if matches: + version = matches.group(1) setup( @@ -35,28 +35,17 @@ keywords=["gdb", "python", "machine-interface", "parse", "frontend"], scripts=[], entry_points={}, - extras_require={ - "dev": [ - 'black;python_version>="3.6"', - 'pdoc;python_version>="3.6"', - "flake8==3.5.0", - "collective.checkdocs==0.2", - 'pdoc3;python_version>="3.6"', - ] - }, zip_safe=False, classifiers=[ "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", "Programming Language :: Python", - "Programming Language :: Python :: 2", - "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.3", - "Programming Language :: Python :: 3.4", "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", "Programming Language :: Python :: Implementation :: PyPy", ], ) diff --git a/tests/static_tests.py b/tests/static_tests.py index 694303f..2bcd365 100644 --- a/tests/static_tests.py +++ b/tests/static_tests.py @@ -1,8 +1,9 @@ import subprocess import sys + def run(cmd): - print("Running %r" % ' '.join(cmd)) + print("Running %r" % " ".join(cmd)) subprocess.check_call(cmd) diff --git a/tests/test_pygdbmi.py b/tests/test_pygdbmi.py index 54d5fe4..3535b2d 100755 --- a/tests/test_pygdbmi.py +++ b/tests/test_pygdbmi.py @@ -195,6 +195,7 @@ def test_controller(self): responses = gdbmi.write(["-exec-run", "-exec-continue"], timeout_sec=3) found_match = False + print(responses) for r in responses: if ( r.get("payload", "")