diff --git a/gef.py b/gef.py index 3742cd7bf..e78c20343 100644 --- a/gef.py +++ b/gef.py @@ -70,7 +70,6 @@ import platform import re import shutil -import site import socket import string import struct @@ -275,6 +274,8 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: return wrapper +class ValidationError(Exception): pass + # # Helpers # @@ -4670,16 +4671,22 @@ def __contains__(self, name: str) -> bool: @deprecated(f"Use `self[setting_name] = value` instead") def add_setting(self, name: str, value: Tuple[Any, type, str], description: str = "") -> None: - return self.__setitem__(name, (value, type(value), description)) + return self.__setitem__(name, (value, description)) - def __setitem__(self, name: str, value: Union[Any, Tuple[Any, str]]) -> None: + def __setitem__(self, name: str, value: Union["GefSetting", Tuple[Any, str]]) -> None: # make sure settings are always associated to the root command (which derives from GenericCommand) if "GenericCommand" not in [x.__name__ for x in self.__class__.__bases__]: return key = self.__get_setting_name(name) if key in gef.config: + # If the setting already exists, update the entry setting = gef.config.raw_entry(key) setting.value = value + return + + # otherwise create it + if isinstance(value, GefSetting): + gef.config[key] = value else: if len(value) == 1: gef.config[key] = GefSetting(value[0]) @@ -9710,8 +9717,8 @@ def __init__(self) -> None: gef.config["gef.debug"] = GefSetting(False, bool, "Enable debug mode for gef") gef.config["gef.autosave_breakpoints_file"] = GefSetting("", str, "Automatically save and restore breakpoints") gef.config["gef.disable_target_remote_overwrite"] = GefSetting(False, bool, "Disable the overwrite of `target remote`") - plugins_dir = GefSetting("", str, "Autoload additional GEF commands from external directory", hooks={"on_write": [GefSetting.no_spaces,]}) - plugins_dir.add_hook("on_write", [lambda _: self.reload_extra_plugins(),]) + plugins_dir = GefSetting("", str, "Autoload additional GEF commands from external directory", hooks={"on_write": [GefSetting.no_spaces, ]}) + plugins_dir.add_hook("on_changed", [lambda _, new_val: GefSetting.must_exist(new_val), lambda _, new_val: self.load_extra_plugins(new_val), ]) gef.config["gef.extra_plugins_dir"] = plugins_dir gef.config["gef.disable_color"] = GefSetting(False, bool, "Disable all colors in GEF") gef.config["gef.tempdir"] = GefSetting(GEF_TEMP_DIR, pathlib.Path, "Directory to use for temporary/cache content", hooks={"on_write": [GefSetting.no_spaces, GefSetting.create_folder_tree]}) @@ -9753,11 +9760,13 @@ def setup(self) -> None: GefRunCommand() GefInstallExtraScriptCommand() - # restore the settings from config file if any + # At this point, commands (incl. extras) are loaded with default settings. + # Load custom settings from config file if any GefRestoreCommand() return - def load_extra_plugins(self) -> int: + def load_extra_plugins(self, extra_plugins_dir: Optional[pathlib.Path] = None) -> int: + """Load the plugins from the gef-extras setting. Returns the number of new plugins added.""" def load_plugin(fpath: pathlib.Path) -> bool: try: dbg(f"Loading '{fpath}'") @@ -9790,16 +9799,15 @@ def load_plugins_from_directory(plugin_directory: pathlib.Path): except gdb.error as e: err(f"failed: {e}") return nb_added - directory = gef.config["gef.extra_plugins_dir"] or "" + + directory = extra_plugins_dir or gef.config["gef.extra_plugins_dir"] if not directory: return 0 - return load_plugins_from_directory(pathlib.Path(directory).expanduser().absolute()) - - def reload_extra_plugins(self) -> int: - try: - return self.load_extra_plugins() - except: - return -1 + directory = pathlib.Path(directory).expanduser().absolute() + if not directory.exists(): + return 0 + dbg(f"Loading extra plugins from {directory=}") + return load_plugins_from_directory(directory) @property def loaded_command_names(self) -> Iterable[str]: @@ -9979,7 +9987,7 @@ def invoke(self, args: str, from_tty: bool) -> None: if not is_debug(): try: self.set_setting(argv) - except ValueError as e: + except (ValueError, KeyError) as e: err(str(e)) else: # Let exceptions (if any) propagate @@ -10013,23 +10021,23 @@ def print_settings(self) -> None: self.print_setting(x) return - def set_setting(self, argv: List[str]) -> None: + def set_setting(self, argv: List[str]) -> bool: global gef key, new_value = argv if "." not in key: err("Invalid command format") - return + return False loaded_commands = list( gef.gdb.commands.keys()) + ["gef"] plugin_name = key.split(".", 1)[0] if plugin_name not in loaded_commands: err(f"Unknown plugin '{plugin_name}'") - return + return False if key not in gef.config: - err(f"'{key}' is not a valid configuration setting") - return + dbg(f"'{key}' is not a valid configuration setting") + return False _type = gef.config.raw_entry(key).type @@ -10047,7 +10055,7 @@ def set_setting(self, argv: List[str]) -> None: gef.config[key] = _newval reset_all_caches() - return + return True def complete(self, text: str, word: str) -> List[str]: settings = sorted(gef.config) @@ -10115,7 +10123,7 @@ def __init__(self) -> None: def invoke(self, args: str, from_tty: bool) -> None: self.dont_repeat() - if GEF_RC.is_file(): + if GEF_RC.is_file(): quiet = (args.lower() == "quiet") self.reload(quiet) return @@ -10130,8 +10138,8 @@ def reload(self, quiet: bool): for key in cfg.options(section): try: GefAlias(key, cfg.get(section, key)) - except: - pass + except Exception as e: + dbg(f"GefAlias() raised exception {e}") continue # load the other options @@ -10170,8 +10178,8 @@ def invoke(self, args: Any, from_tty: bool) -> None: ok("No missing command") return - for cmd, reason in missing_commands.items(): - warn(f"Missing `{cmd}`, reason: {str(reason)}") + for cmd, exc in missing_commands.items(): + warn(f"Missing `{cmd}`: reason: {str(exc)})") return @@ -10928,7 +10936,7 @@ def __init__(self, value: Any, cls: Optional[type] = None, description: Optional self.description = description or "" self.hooks: Dict[str, List[Callable]] = collections.defaultdict(list) if not hooks: - hooks = {"on_read": [], "on_write": []} + hooks = {"on_read": [], "on_write": [], "on_changed": []} for access, funcs in hooks.items(): self.add_hook(access, funcs) @@ -10936,10 +10944,11 @@ def __init__(self, value: Any, cls: Optional[type] = None, description: Optional def __str__(self) -> str: return f"Setting(type={self.type.__name__}, value='{self.value}', desc='{self.description[:10]}...', " \ - f"read_hooks={len(self.hooks['on_read'])}, write_hooks={len(self.hooks['on_write'])})" + f"read_hooks={len(self.hooks['on_read'])}, write_hooks={len(self.hooks['on_write'])}, "\ + f"changed_hooks={len(self.hooks['on_changed'])})" def add_hook(self, access: str, funcs: List[Callable]): - if access not in ("on_read", "on_write"): + if access not in ("on_read", "on_write", "on_changed"): raise ValueError("invalid access type") for func in funcs: if not callable(func): @@ -10950,7 +10959,12 @@ def add_hook(self, access: str, funcs: List[Callable]): @staticmethod def no_spaces(value: pathlib.Path): if " " in str(value): - raise ValueError("setting cannot contain spaces") + raise ValidationError("setting cannot contain spaces") + + @staticmethod + def must_exist(value: pathlib.Path): + if not value or not pathlib.Path(value).expanduser().absolute().exists(): + raise ValidationError("specified path must exist") @staticmethod def create_folder_tree(value: pathlib.Path): @@ -10972,17 +10986,22 @@ def __setitem__(self, name: str, value: Any) -> None: if super().__contains__(name): # if so, update its value directly setting = super().__getitem__(name) - if not isinstance(setting, GefSetting): raise ValueError - setting.value = setting.type(value) - else: - # if not, `value` must be a GefSetting - if not isinstance(value, GefSetting): raise Exception("Invalid argument") - if not value.type: raise Exception("Invalid type") - if not value.description: raise Exception("Invalid description") - setting = value - value = setting.value - super().__setitem__(name, setting) + if not isinstance(setting, GefSetting): raise TypeError + new_value = setting.type(value) + dbg(f"in __invoke_changed_hooks(\"{name}\"), {setting.value=} -> {new_value=}, changing={bool(setting.value != new_value)}") + self.__invoke_changed_hooks(setting, new_value) + self.__invoke_write_hooks(setting, new_value) + setting.value = new_value + return + + # if not, assert `value` is a GefSetting, then insert it + if not isinstance(value, GefSetting): raise TypeError("Invalid argument") + if not value.type: raise TypeError("Invalid type") + if not value.description: raise AttributeError("Invalid description") + setting = value + value = setting.value self.__invoke_write_hooks(setting, value) + super().__setitem__(name, setting) return def __delitem__(self, name: str) -> None: @@ -10996,10 +11015,16 @@ def __invoke_read_hooks(self, setting: GefSetting) -> None: callback() return - def __invoke_write_hooks(self, setting: GefSetting, value: Any) -> None: + def __invoke_changed_hooks(self, setting: GefSetting, new_value: Any) -> None: + old_value = setting.value + if old_value == new_value: + return + for callback in setting.hooks["on_changed"]: + callback(old_value, new_value) + + def __invoke_write_hooks(self, setting: GefSetting, new_value: Any) -> None: for callback in setting.hooks["on_write"]: - callback(value) - return + callback(new_value) class GefSessionManager(GefManager): @@ -11445,8 +11470,6 @@ def find_libc_version() -> Tuple[int, int]: return 0, 0 - - class Gef: """The GEF root class, which serves as a entrypoint for all the debugging session attributes (architecture, memory, settings, etc.)."""