From 1e04d0f2b25233ac83e3b4f378c0e1df6398a3f3 Mon Sep 17 00:00:00 2001 From: dennisvang <29799340+dennisvang@users.noreply.github.com> Date: Wed, 4 Oct 2023 13:04:33 +0200 Subject: [PATCH 1/6] add link to python docs for cwd context manager --- tests/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/__init__.py b/tests/__init__.py index 81a140d..5fda146 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -22,6 +22,9 @@ def _create_cwd_change_generator(): TemporaryDirectory can also be used without context management, as it is cleaned up automatically. Nevertheless, it is neater to use an explicit context manager here. + + Also see example in docs: + https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager """ # based on the pytest cleandir trick original_cwd = os.getcwd() From 08ff73e904560b23a5076523a74ef1c751200a7f Mon Sep 17 00:00:00 2001 From: Dennis <29799340+dennisvang@users.noreply.github.com> Date: Thu, 12 Oct 2023 18:22:54 +0200 Subject: [PATCH 2/6] Update README.md link to Q&A in Discussions --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 7bb9588..564e748 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ The example repository shows how to integrate the `tufup` client into your app, ## Questions and Issues -If you have questions about `tufup`, or need help getting started, please post a question on [Stack Overflow][13], add a `tufup` tag, and we will help you there. +If you have questions about `tufup`, or need help getting started, please start a [new Q&A discussion][22], or post a question on [Stack Overflow][13]. If you encounter bugs or other problems that are likely to affect other users, please create a [new issue][14] here. @@ -207,3 +207,4 @@ A custom, platform *de*pendent, installation procedure can be specified via the [19]: https://peps.python.org/pep-0440/#public-version-identifiers [20]: https://packaging.pypa.io/en/stable/version.html#packaging.version.Version [21]: https://github.com/dennisvang/tufup/blob/master/src/tufup/client.py +[22]: https://github.com/dennisvang/tufup/discussions/new?category=q-a From 82429c8e26c72af666b39c615d64350eea59d029 Mon Sep 17 00:00:00 2001 From: Dennis <29799340+dennisvang@users.noreply.github.com> Date: Mon, 23 Oct 2023 10:20:10 +0200 Subject: [PATCH 3/6] explicitly warn against whitespace in app name and filename (#85) --- README.md | 4 +++- examples/client/example_app.py | 2 +- src/tufup/common.py | 10 ++++++++-- tests/test_common.py | 10 ++++++++++ 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 564e748..bdec256 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,9 @@ Archive filenames and patch filenames follow the pattern `-` -where `name` is a short string that may contain alphanumeric characters, underscores, and hyphens, `version` is a version string according to the [PEP440][6] specification, and `suffix` is either `'.tar.gz'` or `'.patch'`. +where `name` is a short string that may *only* contain *alphanumeric characters*, *underscores*, and *hyphens*, `version` is a version string according to the [PEP440][6] specification, and `suffix` is either `'.tar.gz'` or `'.patch'`. + +***BEWARE***: *whitespace* is NOT allowed in the filename. Patches are typically smaller than archives, so the tufup *client* will always attempt to update using one or more patches. However, if the total amount of patch data is greater than the desired full archive file, a full update will be performed. diff --git a/examples/client/example_app.py b/examples/client/example_app.py index 0f2ab99..210f0a8 100644 --- a/examples/client/example_app.py +++ b/examples/client/example_app.py @@ -13,7 +13,7 @@ # python -m http.server -d examples/repo/content # App info -APP_NAME = 'example_app' +APP_NAME = 'example_app' # BEWARE: app name cannot contain whitespace CURRENT_VERSION = '1.0' # For this example, all files are stored in the tufup/examples/client diff --git a/src/tufup/common.py b/src/tufup/common.py index aaddad8..99fbbba 100644 --- a/src/tufup/common.py +++ b/src/tufup/common.py @@ -26,8 +26,10 @@ def __init__( is_archive: Optional[bool] = True, ): """ - Initialize either with target_path, or with name, version, archive. + + BEWARE: whitespace is not allowed in the filename, + nor in the `name` or `version` arguments """ super().__init__() if target_path is None: @@ -36,6 +38,10 @@ def __init__( ) self.target_path_str = str(target_path) # keep the original for reference self.path = pathlib.Path(target_path) + if ' ' in self.filename: + logger.critical( + f'invalid filename "{self.filename}": whitespace not allowed' + ) def __str__(self): return str(self.target_path_str) @@ -85,7 +91,7 @@ def version(self) -> Optional[Version]: version = Version(match_dict.get('version', '')) except InvalidVersion: version = None - logger.debug(f'No valid version in filename: {self.filename}') + logger.critical(f'No valid version in filename: {self.filename}') return version @property diff --git a/tests/test_common.py b/tests/test_common.py index 12d0e42..86a743d 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -1,3 +1,4 @@ +import logging import pathlib import bsdiff4 @@ -8,6 +9,15 @@ class TestTargetMeta(TempDirTestCase): + def test_init_whitespace(self): + for kwargs in [ + dict(target_path='w h i t e s p a c e-1.2.3.tar.gz'), + dict(name='w h i t e s p a c e'), + ]: + with self.subTest(msg=kwargs): + with self.assertLogs(level=logging.CRITICAL): + TargetMeta(**kwargs) + def test_eq_ne(self): target_meta = TargetMeta() self.assertEqual(target_meta, TargetMeta()) From 7740fefb5fd725f97c8e5e3f96763b0e28e96fa8 Mon Sep 17 00:00:00 2001 From: Dennis <29799340+dennisvang@users.noreply.github.com> Date: Mon, 6 Nov 2023 16:16:21 +0100 Subject: [PATCH 4/6] Update link to actions docs in python-package.yml --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index bce5162..ef4af56 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -1,5 +1,5 @@ # This workflow will install Python dependencies, run tests and lint with a variety of Python versions -# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python name: Python package From b22a84ef19e1a770d8694dcb067021148ef7c377 Mon Sep 17 00:00:00 2001 From: Dennis <29799340+dennisvang@users.noreply.github.com> Date: Tue, 7 Nov 2023 13:15:13 +0100 Subject: [PATCH 5/6] use ruff formatter (#88) * add ruff dev-dependency and specify single quote formatting * format all files using ruff * update github workflow to use ruff and update actions from v3 to v4 * fix errors found by ruff check . --- .github/workflows/python-package.yml | 15 +-- docs/source/conf.py | 10 +- examples/repo/repo_workflow_example.py | 8 +- pyproject.toml | 3 + requirements-dev.txt | 1 + src/tufup/__init__.py | 1 - src/tufup/client.py | 78 ++++++------- src/tufup/common.py | 14 ++- src/tufup/repo/__init__.py | 152 +++++++++++-------------- src/tufup/repo/cli.py | 47 +++----- src/tufup/utils/__init__.py | 4 +- src/tufup/utils/platform_specific.py | 55 ++++----- tests/__init__.py | 1 + tests/test_client.py | 35 +++--- tests/test_repo.py | 97 +++++++--------- tests/test_repo_cli.py | 24 ++-- tests/test_utils.py | 26 ++--- tests/test_utils_platform_specific.py | 7 +- 18 files changed, 249 insertions(+), 329 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index ef4af56..a729e3e 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -20,22 +20,19 @@ jobs: runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v3 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install flake8 - python -m pip install -r requirements.txt - - name: Lint with flake8 + pip install -r requirements.txt + - name: Lint with ruff run: | - # stop the build if there are Python syntax errors or undefined names - flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics - # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide - flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + pip install ruff + ruff --output-format=github . - name: Test with unittest run: | python -m unittest diff --git a/docs/source/conf.py b/docs/source/conf.py index 2826092..7c49009 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -41,11 +41,7 @@ # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. -extensions = [ -"sphinx.ext.autodoc", -"sphinx.ext.todo", -"sphinx_sitemap" -] +extensions = ['sphinx.ext.autodoc', 'sphinx.ext.todo', 'sphinx_sitemap'] # Add any paths that contain templates here, relative to this directory. @@ -77,8 +73,8 @@ 'collapse_navigation': True, 'sticky_navigation': True, 'navigation_depth': 2, - 'includehidden': True, - 'titles_only': False + 'includehidden': True, + 'titles_only': False, } # config for autodoc. See https://www.sphinx-doc.org/en/master/usage/extensions/autodoc.html#confval-autodoc_default_options diff --git a/examples/repo/repo_workflow_example.py b/examples/repo/repo_workflow_example.py index 5aa22ef..61e5ed5 100644 --- a/examples/repo/repo_workflow_example.py +++ b/examples/repo/repo_workflow_example.py @@ -125,9 +125,7 @@ # Create archive and patch and register the new update (here we sign # everything at once, for convenience) repo.add_bundle(new_version=new_version, new_bundle_dir=dummy_bundle_dir) - repo.publish_changes( - private_key_dirs=[OFFLINE_DIR_1, OFFLINE_DIR_2, ONLINE_DIR] - ) + repo.publish_changes(private_key_dirs=[OFFLINE_DIR_1, OFFLINE_DIR_2, ONLINE_DIR]) # Time goes by ... @@ -153,6 +151,4 @@ new_public_key_path=new_public_key_path, new_private_key_encrypted=False, ) -repo.publish_changes( - private_key_dirs=[OFFLINE_DIR_1, OFFLINE_DIR_2, ONLINE_DIR] -) +repo.publish_changes(private_key_dirs=[OFFLINE_DIR_1, OFFLINE_DIR_2, ONLINE_DIR]) diff --git a/pyproject.toml b/pyproject.toml index e01c6be..7bdd7da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,3 +45,6 @@ version = {attr = "tufup.__version__"} # this is still in beta (as of setuptools v65.6.3) # https://setuptools.pypa.io/en/stable/userguide/pyproject_config.html#setuptools-specific-configuration where = ["src"] + +[tool.ruff.format] +quote-style = "single" diff --git a/requirements-dev.txt b/requirements-dev.txt index 332c4a8..89c3fbd 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,3 +1,4 @@ # install dev dependencies build +ruff twine diff --git a/src/tufup/__init__.py b/src/tufup/__init__.py index 8442f4b..42c1e76 100644 --- a/src/tufup/__init__.py +++ b/src/tufup/__init__.py @@ -2,7 +2,6 @@ import sys from tufup.repo import cli -from tufup.utils import input_bool # https://packaging.python.org/en/latest/guides/single-sourcing-package-version/ # https://semver.org/ diff --git a/src/tufup/client.py b/src/tufup/client.py index c08327a..5ccebd2 100644 --- a/src/tufup/client.py +++ b/src/tufup/client.py @@ -22,18 +22,18 @@ class Client(tuf.ngclient.Updater): def __init__( - self, - app_name: str, - app_install_dir: pathlib.Path, - current_version: str, - metadata_dir: pathlib.Path, - metadata_base_url: str, - target_dir: pathlib.Path, - target_base_url: str, - extract_dir: Optional[pathlib.Path] = None, - refresh_required: bool = False, - session_auth: Optional[Dict[str, Union[Tuple[str, str], AuthBase]]] = None, - **kwargs, + self, + app_name: str, + app_install_dir: pathlib.Path, + current_version: str, + metadata_dir: pathlib.Path, + metadata_base_url: str, + target_dir: pathlib.Path, + target_base_url: str, + extract_dir: Optional[pathlib.Path] = None, + refresh_required: bool = False, + session_auth: Optional[Dict[str, Union[Tuple[str, str], AuthBase]]] = None, + **kwargs, ): # tuf.ngclient.Updater.__init__ loads local root metadata automatically super().__init__( @@ -74,7 +74,9 @@ def trusted_target_metas(self) -> list: logger.warning('targets metadata not found') return _trusted_target_metas - def get_targetinfo(self, target_path: Union[str, TargetMeta]) -> Optional[tuf.ngclient.TargetFile]: + def get_targetinfo( + self, target_path: Union[str, TargetMeta] + ) -> Optional[tuf.ngclient.TargetFile]: """Extend Updater.get_targetinfo to handle TargetMeta input args.""" if isinstance(target_path, TargetMeta): target_path = target_path.target_path_str @@ -89,11 +91,11 @@ def updates_available(self): return len(self.new_targets) > 0 def download_and_apply_update( - self, - skip_confirmation: bool = False, - install: Optional[Callable] = None, - progress_hook: Optional[Callable] = None, - **kwargs, + self, + skip_confirmation: bool = False, + install: Optional[Callable] = None, + progress_hook: Optional[Callable] = None, + **kwargs, ): """ Download and apply available updates. @@ -126,14 +128,14 @@ def download_and_apply_update( if install is None: install = install_update if self.updates_available and self._download_updates( - progress_hook=progress_hook + progress_hook=progress_hook ): self._apply_updates( install=install, skip_confirmation=skip_confirmation, **kwargs ) def check_for_updates( - self, pre: Optional[str] = None, patch: bool = True + self, pre: Optional[str] = None, patch: bool = True ) -> Optional[TargetMeta]: """ Check if any updates are available, based on current app version. @@ -169,25 +171,24 @@ def check_for_updates( logger.debug(f'{len(all_new_targets)} new *targets* found') # determine latest archive, filtered by the specified pre-release level new_archives = dict( - item for item in all_new_targets.items() + item + for item in all_new_targets.items() if item[0].is_archive and (not item[0].version.pre or item[0].version.pre[0] in included[pre]) ) new_archive_meta = None if new_archives: logger.debug(f'{len(new_archives)} new *archives* found') - new_archive_meta, self.new_archive_info = sorted( - new_archives.items() - )[-1] + new_archive_meta, self.new_archive_info = sorted(new_archives.items())[-1] self.new_archive_local_path = pathlib.Path( self.target_dir, new_archive_meta.path.name ) # patches must include all pre-releases and final releases up to, # and including, the latest archive as determined above new_patches = dict( - item for item in all_new_targets.items() - if item[0].is_patch - and item[0].version <= new_archive_meta.version + item + for item in all_new_targets.items() + if item[0].is_patch and item[0].version <= new_archive_meta.version ) # determine size of patch update and archive update total_patch_size = sum( @@ -227,10 +228,10 @@ def _download_updates(self, progress_hook: Optional[Callable]) -> bool: return len(self.downloaded_target_files) == len(self.new_targets) def _apply_updates( - self, - install: Callable, - skip_confirmation: bool, - **kwargs, + self, + install: Callable, + skip_confirmation: bool, + **kwargs, ): """ kwargs are passed on to the 'install' callable @@ -284,8 +285,8 @@ def _apply_updates( class AuthRequestsFetcher(tuf.ngclient.RequestsFetcher): # RequestsFetcher is public as of python-tuf v2.1.0 (see python-tuf #2277) def __init__( - self, - session_auth: Optional[Dict[str, Union[Tuple[str, str], AuthBase]]] = None, + self, + session_auth: Optional[Dict[str, Union[Tuple[str, str], AuthBase]]] = None, ) -> None: """ This extends the default tuf RequestsFetcher, so we can specify @@ -326,15 +327,14 @@ def attach_progress_hook(self, hook: Callable, bytes_expected: int): The hook must accept two kwargs: bytes_downloaded and bytes_expected """ + def progress( - bytes_new: int, - _cache: List[int] = [], # noqa: mutable default intentional + bytes_new: int, + _cache: List[int] = [], # noqa: mutable default intentional ): # mutable default is used to keep track of downloaded chunk sizes _cache.append(bytes_new) - return hook( - bytes_downloaded=sum(_cache), bytes_expected=bytes_expected - ) + return hook(bytes_downloaded=sum(_cache), bytes_expected=bytes_expected) self._progress = progress @@ -351,7 +351,7 @@ def _get_session(self, url: str) -> requests.Session: session.auth = self.session_auth.get(key) return session - def _chunks(self, response: "requests.Response") -> Iterator[bytes]: + def _chunks(self, response: 'requests.Response') -> Iterator[bytes]: """Call progress hook for every chunk.""" for data in super()._chunks(response=response): self._progress(bytes_new=len(data)) diff --git a/src/tufup/common.py b/src/tufup/common.py index 99fbbba..1039626 100644 --- a/src/tufup/common.py +++ b/src/tufup/common.py @@ -19,11 +19,11 @@ class TargetMeta(object): ) def __init__( - self, - target_path: Union[None, str, pathlib.Path] = None, - name: Optional[str] = None, - version: Optional[str] = None, - is_archive: Optional[bool] = True, + self, + target_path: Union[None, str, pathlib.Path] = None, + name: Optional[str] = None, + version: Optional[str] = None, + is_archive: Optional[bool] = True, ): """ Initialize either with target_path, or with name, version, archive. @@ -131,7 +131,9 @@ def compose_filename(cls, name: str, version: str, is_archive: bool): class Patcher(object): @classmethod - def create_patch(cls, src_path: pathlib.Path, dst_path: pathlib.Path) -> pathlib.Path: + def create_patch( + cls, src_path: pathlib.Path, dst_path: pathlib.Path + ) -> pathlib.Path: """ Create a binary patch file based on source and destination files. diff --git a/src/tufup/repo/__init__.py b/src/tufup/repo/__init__.py index f004765..e8d0497 100644 --- a/src/tufup/repo/__init__.py +++ b/src/tufup/repo/__init__.py @@ -4,10 +4,11 @@ import json import logging import pathlib + try: # workaround for PyInstaller issue 6911 (setuptools issue 3089) import setuptools.config.expand # noqa -except AssertionError as e: +except AssertionError: pass # assuming we are on the client side... import shutil from typing import Any, Dict, Iterable, List, Optional, TypedDict, Union @@ -19,6 +20,7 @@ import_ed25519_publickey_from_file, import_ed25519_privatekey_from_file, ) + # SSlibKey see: https://github.com/secure-systems-lab/securesystemslib/pull/456 from securesystemslib.signer import SSlibKey, SSlibSigner from tuf.api.metadata import ( @@ -62,7 +64,7 @@ ] # copied from python-tuf basic_repo.py -SPEC_VERSION = ".".join(SPECIFICATION_VERSION) +SPEC_VERSION = '.'.join(SPECIFICATION_VERSION) # copied from python-tuf basic_repo.py @@ -72,11 +74,11 @@ def in_(days: float) -> datetime: def make_gztar_archive( - src_dir: Union[pathlib.Path, str], - dst_dir: Union[pathlib.Path, str], - app_name: str, - version: str, - **kwargs, # allowed kwargs are passed on to shutil.make_archive + src_dir: Union[pathlib.Path, str], + dst_dir: Union[pathlib.Path, str], + app_name: str, + version: str, + **kwargs, # allowed kwargs are passed on to shutil.make_archive ) -> Optional[TargetMeta]: # remove disallowed kwargs for key in ['base_name', 'root_dir', 'format']: @@ -148,11 +150,11 @@ class Keys(Base): filename_pattern = '{key_name}' def __init__( - self, - dir_path: Union[pathlib.Path, str, None] = None, - encrypted: Optional[List[str]] = None, - key_map: Optional[RolesDict] = None, - thresholds: Optional[RolesDict] = None, + self, + dir_path: Union[pathlib.Path, str, None] = None, + encrypted: Optional[List[str]] = None, + key_map: Optional[RolesDict] = None, + thresholds: Optional[RolesDict] = None, ): if dir_path is None: dir_path = pathlib.Path.cwd() / DEFAULT_KEYS_DIR_NAME @@ -185,9 +187,7 @@ def import_public_key(self, role_name: str, key_name: Optional[str] = None): key_name = role_name public_key_path = self.public_key_path(key_name=key_name) if public_key_path.exists(): - ssl_key = import_ed25519_publickey_from_file( - filepath=str(public_key_path) - ) + ssl_key = import_ed25519_publickey_from_file(filepath=str(public_key_path)) getattr(self, role_name).append(ssl_key) logger.debug(f'public key imported: {public_key_path}') else: @@ -210,11 +210,11 @@ def create(self): @staticmethod def create_key_pair( - private_key_path: pathlib.Path, encrypted: bool + private_key_path: pathlib.Path, encrypted: bool ) -> pathlib.Path: if encrypted: # encrypt private key - logger.debug(f'set encryption password for private key') + logger.debug('set encryption password for private key') generate_keypair = generate_and_write_ed25519_keypair_with_prompt else: # do not encrypt private key (for automated signing) @@ -223,7 +223,7 @@ def create_key_pair( proceed = True if public_key_path.exists(): logger.warning(f'Public key already exists: {public_key_path}') - proceed = input(f'Overwrite key pair? [n]/y') == 'y' + proceed = input('Overwrite key pair? [n]/y') == 'y' if proceed: file_path_str = generate_keypair(filepath=str(private_key_path)) logger.info(f'key-pair created: {file_path_str}, {public_key_path}') @@ -313,9 +313,7 @@ def _import_roles(self, role_names: Iterable[str]): # there should be only one for each role setattr(self, role_name, Metadata.from_file(str(role_paths[0]))) - def initialize( - self, keys: Keys, expiration_days: Optional[RolesDict] = None - ): + def initialize(self, keys: Keys, expiration_days: Optional[RolesDict] = None): if expiration_days is None: expiration_days = DEFAULT_EXPIRATION_DAYS.copy() # based on python-tuf basic_repo.py @@ -356,9 +354,9 @@ def initialize( ) def add_or_update_target( - self, - local_path: Union[pathlib.Path, str], - url_path_segments: Optional[List[str]] = None, + self, + local_path: Union[pathlib.Path, str], + url_path_segments: Optional[List[str]] = None, ): # based on python-tuf basic_repo.py local_path = pathlib.Path(local_path) @@ -384,9 +382,7 @@ def remove_target(self, local_path: Union[pathlib.Path, str]) -> bool: local_path.unlink() return removed - def add_public_key( - self, role_name: str, public_key_path: Union[pathlib.Path, str] - ): + def add_public_key(self, role_name: str, public_key_path: Union[pathlib.Path, str]): """Import a public key from file and add it to the specified role.""" # based on python-tuf basic_repo.py ssl_key = import_ed25519_publickey_from_file(filepath=str(public_key_path)) @@ -402,9 +398,7 @@ def set_expiration_date(self, role_name: str, days: int): if hasattr(role, 'signed'): role.signed.expires = in_(days) - def sign_role( - self, role_name: str, private_key_path: Union[pathlib.Path, str] - ): + def sign_role(self, role_name: str, private_key_path: Union[pathlib.Path, str]): """ Sign role using specified private key. @@ -444,7 +438,8 @@ def file_exists(self, role_name: str): ignoring any versions in the filename. """ return any( - path.exists() for path in self.dir_path.iterdir() + path.exists() + for path in self.dir_path.iterdir() if path.is_file() and role_name in path.name ) @@ -457,12 +452,8 @@ def persist_role(self, role_name: str): """ # based on python-tuf basic_repo.py (but without consistent snapshots) role = getattr(self, role_name) - file_path = self.file_path( - role_name=role_name, version=role.signed.version - ) - role.to_file( - filename=str(file_path), serializer=JSONSerializer(compact=False) - ) + file_path = self.file_path(role_name=role_name, version=role.signed.version) + role.to_file(filename=str(file_path), serializer=JSONSerializer(compact=False)) if role_name == Root.type: # Copy the latest root metadata to 'root.json' (without version), # to use as trusted root metadata for distribution with the @@ -489,9 +480,7 @@ def get_latest_archive(self) -> Optional[TargetMeta]: latest_archive = None # sort by version signed_targets = self.targets.signed.targets if self.targets else dict() - targets = sorted( - TargetMeta(key) for key in signed_targets.keys() - ) + targets = sorted(TargetMeta(key) for key in signed_targets.keys()) # extract only the archives archives = [target for target in targets if target.is_archive] if archives: @@ -501,18 +490,19 @@ def get_latest_archive(self) -> Optional[TargetMeta]: class Repository(object): """High-level tools for repository management.""" + config_filename = '.tufup-repo-config' def __init__( - self, - app_name: str, - app_version_attr: Optional[str] = None, - repo_dir: Union[pathlib.Path, str, None] = None, - keys_dir: Union[pathlib.Path, str, None] = None, - key_map: Optional[RolesDict] = None, - encrypted_keys: Optional[List[str]] = None, - expiration_days: Optional[RolesDict] = None, - thresholds: Optional[RolesDict] = None, + self, + app_name: str, + app_version_attr: Optional[str] = None, + repo_dir: Union[pathlib.Path, str, None] = None, + keys_dir: Union[pathlib.Path, str, None] = None, + key_map: Optional[RolesDict] = None, + encrypted_keys: Optional[List[str]] = None, + expiration_days: Optional[RolesDict] = None, + thresholds: Optional[RolesDict] = None, ): if repo_dir is None: repo_dir = pathlib.Path.cwd() / DEFAULT_REPO_DIR_NAME @@ -574,9 +564,7 @@ def save_config(self): # todo: write directories relative to config file dir? file_path = self.get_config_file_path() file_path.write_text( - data=json.dumps( - self.config_dict, default=str, sort_keys=True, indent=4 - ), + data=json.dumps(self.config_dict, default=str, sort_keys=True, indent=4), encoding='utf-8', ) @@ -630,10 +618,10 @@ def refresh_expiration_date(self, role_name: str, days: Optional[int] = None): self.roles.set_expiration_date(role_name=role_name, days=days) def replace_key( - self, - old_key_name: str, - new_public_key_path: Union[pathlib.Path, str], - new_private_key_encrypted: bool, + self, + old_key_name: str, + new_public_key_path: Union[pathlib.Path, str], + new_private_key_encrypted: bool, ): """ Replace an existing key by a new one, e.g. after a key compromise. @@ -643,9 +631,9 @@ def replace_key( self.revoked_key_names = [] # Load old public key from file to obtain its key_id public_key_path = self.keys.public_key_path(key_name=old_key_name) - old_key_id = import_ed25519_publickey_from_file( - filepath=str(public_key_path) - )['keyid'] + old_key_id = import_ed25519_publickey_from_file(filepath=str(public_key_path))[ + 'keyid' + ] # Get new key name from public key path new_public_key_path = pathlib.Path(new_public_key_path) # force path # A key may be used for multiple roles, so we check the key id for @@ -654,9 +642,7 @@ def replace_key( try: # remove old key_id from roles dict, if found, and remove key # from keys dict if it is no longer used by any roles - self.roles.root.signed.revoke_key( - role=role_name, keyid=old_key_id - ) + self.roles.root.signed.revoke_key(role=role_name, keyid=old_key_id) # move old key name from key_map to revoked_key_map self.key_map[role_name].remove(old_key_name) # to ensure continuity, changes to root must be signed both @@ -675,10 +661,10 @@ def replace_key( ) def add_key( - self, - role_name: str, - public_key_path: Union[pathlib.Path, str], - encrypted: bool, + self, + role_name: str, + public_key_path: Union[pathlib.Path, str], + encrypted: bool, ): """ Register a new public key for the specified role. @@ -687,9 +673,7 @@ def add_key( """ public_key_path = pathlib.Path(public_key_path) # add new key to metadata - self.roles.add_public_key( - role_name=role_name, public_key_path=public_key_path - ) + self.roles.add_public_key(role_name=role_name, public_key_path=public_key_path) # add new key to key map key_name = public_key_path.with_suffix('').name if key_name not in self.key_map[role_name]: @@ -700,10 +684,10 @@ def add_key( self.encrypted_keys.append(key_name) def add_bundle( - self, - new_bundle_dir: Union[pathlib.Path, str], - new_version: Optional[str] = None, - skip_patch: bool = False, + self, + new_bundle_dir: Union[pathlib.Path, str], + new_version: Optional[str] = None, + skip_patch: bool = False, ): """ Adds a new application bundle to the local repository. @@ -785,9 +769,7 @@ def publish_changes(self, private_key_dirs: List[Union[pathlib.Path, str]]): for role_name in ['root', 'targets', 'snapshot', 'timestamp']: role = getattr(self.roles, role_name) # filename without version is always the latest version - latest_file_path = self.roles.file_path( - role_name=role_name, version=None - ) + latest_file_path = self.roles.file_path(role_name=role_name, version=None) # if the file does not exist yet, the role is considered modified, # and we don't want to bump version and expiration date again modified = True @@ -822,9 +804,7 @@ def publish_changes(self, private_key_dirs: List[Union[pathlib.Path, str]]): # we'll force a re-sign cascade by bumping the targets # version. Note this may cause a double version bump for # targets, but that should not matter. - if self.roles.file_path( - role_name='targets', version=None - ).exists(): + if self.roles.file_path(role_name='targets', version=None).exists(): self.roles.targets.signed.version += 1 elif role_name == 'targets': dependent = self.roles.snapshot.signed.meta[FILENAME_TARGETS] @@ -838,9 +818,7 @@ def publish_changes(self, private_key_dirs: List[Union[pathlib.Path, str]]): # Check if signature count meets threshold threshold = self.roles.root.signed.roles[role_name].threshold if len(role.signatures) < threshold: - logger.info( - f'{role_name} threshold not met. Trying to sign...' - ) + logger.info(f'{role_name} threshold not met. Trying to sign...') signature_count = self.threshold_sign( role_name=role_name, private_key_dirs=private_key_dirs ) @@ -851,9 +829,9 @@ def publish_changes(self, private_key_dirs: List[Union[pathlib.Path, str]]): logger.info('Config file updated.') def threshold_sign( - self, - role_name: str, - private_key_dirs: List[Union[pathlib.Path, str]], + self, + role_name: str, + private_key_dirs: List[Union[pathlib.Path, str]], ) -> int: """ Sign the metadata file for a specific role, and save changes to disk. @@ -911,7 +889,5 @@ def _load_keys_and_roles(self, create_keys: bool = False): if self.roles is None: logger.info('Importing metadata...') self.roles = Roles(dir_path=self.metadata_dir) - self.roles.initialize( - keys=self.keys, expiration_days=self.expiration_days - ) + self.roles.initialize(keys=self.keys, expiration_days=self.expiration_days) logger.info('Metadata imported.') diff --git a/src/tufup/repo/cli.py b/src/tufup/repo/cli.py index 0b8ae90..347f08a 100644 --- a/src/tufup/repo/cli.py +++ b/src/tufup/repo/cli.py @@ -4,9 +4,7 @@ import packaging.version from tuf.api.metadata import TOP_LEVEL_ROLE_NAMES -from tufup.utils import ( - log_print, input_bool, input_numeric, input_text, input_list -) +from tufup.utils import log_print, input_bool, input_numeric, input_text, input_list from tufup.repo import Repository logger = logging.getLogger(__name__) @@ -43,9 +41,7 @@ def _get_repo(): try: return Repository.from_config() except TypeError: - _print_info( - 'Failed to load config. Did you initialize the repository?' - ) + _print_info('Failed to load config. Did you initialize the repository?') def _add_key_dirs_argument(parser: argparse.ArgumentParser): @@ -59,9 +55,7 @@ def get_parser() -> argparse.ArgumentParser: subparsers = parser.add_subparsers() # add debug option debug_parser = argparse.ArgumentParser(add_help=False) - debug_parser.add_argument( - '-d', '--debug', action='store_true', required=False - ) + debug_parser.add_argument('-d', '--debug', action='store_true', required=False) # init subparser_init = subparsers.add_parser('init', parents=[debug_parser]) subparser_init.set_defaults(func=_cmd_init) @@ -96,9 +90,7 @@ def get_parser() -> argparse.ArgumentParser: # keys subparser_keys = subparsers.add_parser('keys', parents=[debug_parser]) subparser_keys.set_defaults(func=_cmd_keys) - subparser_keys.add_argument( - 'new_key_name', help=HELP['keys_new_key_name'] - ) + subparser_keys.add_argument('new_key_name', help=HELP['keys_new_key_name']) subparser_keys.add_argument( '-c', '--create', action='store_true', help=HELP['keys_create'] ) @@ -106,17 +98,13 @@ def get_parser() -> argparse.ArgumentParser: '-e', '--encrypted', action='store_true', help=HELP['keys_encrypted'] ) # we use nested subparsers to deal with mutually dependent arguments - keys_subparsers = subparser_keys.add_subparsers( - help=HELP['keys_subcommands'] - ) + keys_subparsers = subparser_keys.add_subparsers(help=HELP['keys_subcommands']) subparser_keys_add = keys_subparsers.add_parser('add') subparser_keys_add.add_argument( 'role_name', choices=TOP_LEVEL_ROLE_NAMES, help=HELP['keys_role_name'] ) subparser_keys_replace = keys_subparsers.add_parser('replace') - subparser_keys_replace.add_argument( - 'old_key_name', help=HELP['keys_old_key_name'] - ) + subparser_keys_replace.add_argument('old_key_name', help=HELP['keys_old_key_name']) for sp in [subparser_keys_add, subparser_keys_replace]: _add_key_dirs_argument(parser=sp) # sign @@ -140,6 +128,7 @@ def get_parser() -> argparse.ArgumentParser: class _StoreVersionAction(argparse.Action): """Validates version string before storing.""" + def __call__(self, parser, namespace, values, option_string=None, **kwargs): # The first value should comply with PEP440 value = values[0] @@ -149,7 +138,7 @@ def __call__(self, parser, namespace, values, option_string=None, **kwargs): raise argparse.ArgumentError( self, f'Version string "{value}" is not PEP440 compliant.\n ' - f'See examples: https://www.python.org/dev/peps/pep-0440/\n' + f'See examples: https://www.python.org/dev/peps/pep-0440/\n', ) # Store the value, same as "store" action setattr(namespace, self.dest, values) @@ -188,8 +177,8 @@ def _get_config_from_user(**kwargs) -> dict: if key_name not in unique_key_names: unique_key_names.append(key_name) if input_bool( - prompt=f'Encrypt key "{key_name}"?', - default=key_name in encrypted_keys, + prompt=f'Encrypt key "{key_name}"?', + default=key_name in encrypted_keys, ): new_encrypted_keys.append(key_name) # expiration_days @@ -240,18 +229,14 @@ def _cmd_init(options: argparse.Namespace): def _cmd_keys(options: argparse.Namespace): logger.debug(f'command keys: {vars(options)}') repository = _get_repo() - public_key_path = repository.keys.public_key_path( - key_name=options.new_key_name - ) - private_key_path = repository.keys.private_key_path( - key_name=options.new_key_name - ) + public_key_path = repository.keys.public_key_path(key_name=options.new_key_name) + private_key_path = repository.keys.private_key_path(key_name=options.new_key_name) if options.create: _print_info(f'Creating key pair for {options.new_key_name}...') repository.keys.create_key_pair( private_key_path=private_key_path, encrypted=options.encrypted ) - _print_info(f'Key pair created.') + _print_info('Key pair created.') replace = hasattr(options, 'old_key_name') add = hasattr(options, 'role_name') if replace: @@ -304,9 +289,7 @@ def _cmd_sign(options: argparse.Namespace): days = int(options.expiration_days) # change expiration date in signed metadata _print_info(f'Setting expiration date {days} days from now...') - repository.refresh_expiration_date( - role_name=options.role_name, days=days - ) + repository.refresh_expiration_date(role_name=options.role_name, days=days) # also update version and expiration date for dependent roles, and sign # modified roles _print_info('Publishing changes...') @@ -318,4 +301,4 @@ def _cmd_sign(options: argparse.Namespace): role_name=options.role_name, private_key_dirs=options.key_dirs, ) - _print_info('Done.') \ No newline at end of file + _print_info('Done.') diff --git a/src/tufup/utils/__init__.py b/src/tufup/utils/__init__.py index 90d2404..b63ccd9 100644 --- a/src/tufup/utils/__init__.py +++ b/src/tufup/utils/__init__.py @@ -73,7 +73,7 @@ def input_bool(prompt: str, default: bool) -> bool: def input_list( - prompt: str, default: List[str], item_default: Optional[str] = None + prompt: str, default: List[str], item_default: Optional[str] = None ) -> List[str]: new_list = [] log_print(message=prompt, level=logging.DEBUG, logger=utils_logger) @@ -102,7 +102,7 @@ def input_numeric(prompt: str, default: int) -> int: def input_text( - prompt: str, default: Optional[str], optional: bool = False + prompt: str, default: Optional[str], optional: bool = False ) -> Optional[str]: answer = None prompt += f' (default: {default})' diff --git a/src/tufup/utils/platform_specific.py b/src/tufup/utils/platform_specific.py index 3185df7..0c9f537 100644 --- a/src/tufup/utils/platform_specific.py +++ b/src/tufup/utils/platform_specific.py @@ -18,11 +18,11 @@ def install_update( - src_dir: Union[pathlib.Path, str], - dst_dir: Union[pathlib.Path, str], - purge_dst_dir: bool = False, - exclude_from_purge: Optional[List[Union[pathlib.Path, str]]] = None, - **kwargs, + src_dir: Union[pathlib.Path, str], + dst_dir: Union[pathlib.Path, str], + purge_dst_dir: bool = False, + exclude_from_purge: Optional[List[Union[pathlib.Path, str]]] = None, + **kwargs, ): """ Installs update files using platform specific installation script. The @@ -105,6 +105,7 @@ def run_bat_as_admin(file_path: Union[pathlib.Path, str]): calling process exits. """ from ctypes import windll + # https://docs.microsoft.com/en-us/windows/win32/api/shellapi/nf-shellapi-shellexecutew result = windll.shell32.ShellExecuteW( None, # handle to parent window @@ -118,15 +119,15 @@ def run_bat_as_admin(file_path: Union[pathlib.Path, str]): def _install_update_win( - src_dir: Union[pathlib.Path, str], - dst_dir: Union[pathlib.Path, str], - purge_dst_dir: bool, - exclude_from_purge: List[Union[pathlib.Path, str]], - as_admin: bool = False, - batch_template: str = WIN_BATCH_TEMPLATE, - batch_template_extra_kwargs: Optional[dict] = None, - log_file_name: Optional[str] = None, - robocopy_options_override: Optional[List[str]] = None, + src_dir: Union[pathlib.Path, str], + dst_dir: Union[pathlib.Path, str], + purge_dst_dir: bool, + exclude_from_purge: List[Union[pathlib.Path, str]], + as_admin: bool = False, + batch_template: str = WIN_BATCH_TEMPLATE, + batch_template_extra_kwargs: Optional[dict] = None, + log_file_name: Optional[str] = None, + robocopy_options_override: Optional[List[str]] = None, ): """ Create a batch script that moves files from src to dst, then run the @@ -195,7 +196,7 @@ def _install_update_win( ) logger.debug(f'writing windows batch script:\n{script_content}') with NamedTemporaryFile( - mode='w', prefix=WIN_BATCH_PREFIX, suffix=WIN_BATCH_SUFFIX, delete=False + mode='w', prefix=WIN_BATCH_PREFIX, suffix=WIN_BATCH_SUFFIX, delete=False ) as temp_file: temp_file.write(script_content) logger.debug(f'temporary batch script created: {temp_file.name}') @@ -206,27 +207,29 @@ def _install_update_win( run_bat_as_admin(file_path=script_path) else: # we use Popen() instead of run(), because the latter blocks execution - subprocess.Popen( - [script_path], creationflags=subprocess.CREATE_NEW_CONSOLE - ) + subprocess.Popen([script_path], creationflags=subprocess.CREATE_NEW_CONSOLE) logger.debug('exiting') # exit current process sys.exit(0) def _install_update_mac( - src_dir: Union[pathlib.Path, str], - dst_dir: Union[pathlib.Path, str], - purge_dst_dir: bool, - exclude_from_purge: List[Union[pathlib.Path, str]], - **kwargs, + src_dir: Union[pathlib.Path, str], + dst_dir: Union[pathlib.Path, str], + purge_dst_dir: bool, + exclude_from_purge: List[Union[pathlib.Path, str]], + **kwargs, ): # todo: implement as_admin and debug kwargs for mac logger.debug(f'Kwargs not used: {kwargs}') if purge_dst_dir: - exclude_from_purge = [ # enforce path objects - pathlib.Path(item) for item in exclude_from_purge - ] if exclude_from_purge else [] + exclude_from_purge = ( + [ # enforce path objects + pathlib.Path(item) for item in exclude_from_purge + ] + if exclude_from_purge + else [] + ) logger.debug(f'Purging content of {dst_dir}') for path in pathlib.Path(dst_dir).iterdir(): if path not in exclude_from_purge: diff --git a/tests/__init__.py b/tests/__init__.py index 5fda146..84364e4 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -46,6 +46,7 @@ class TempDirTestCase(unittest.TestCase): The temporary directory becomes the current working directory (cwd), and it is accessible as a pathlib.Path, for convenience. """ + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.change_cwd = _create_cwd_change_generator() diff --git a/tests/test_client.py b/tests/test_client.py index 57c1782..482652c 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -49,11 +49,11 @@ def setUp(self) -> None: metadata_base_url='http://localhost:8000/metadata/', target_dir=self.target_dir, target_base_url='http://localhost:8000/targets/', - session_auth={'http://localhost:8000': ('username', 'password')} + session_auth={'http://localhost:8000': ('username', 'password')}, ) def mock_download_metadata( - self, rolename: str, length: int, version: Optional[int] = None + self, rolename: str, length: int, version: Optional[int] = None ) -> bytes: if rolename == 'root': # indicate current root is newest version @@ -122,7 +122,7 @@ def test_download_and_apply_update(self): mock_apply = Mock(return_value=True) mock_install = Mock() with patch.multiple( - Client, _download_updates=mock_download, _apply_updates=mock_apply + Client, _download_updates=mock_download, _apply_updates=mock_apply ): client = self.get_refreshed_client() client.new_targets = {'dummy': None} @@ -144,13 +144,12 @@ def test_check_for_updates(self): self.assertTrue(client.check_for_updates(pre=pre)) self.assertEqual(expected, len(client.new_targets)) if pre == 'a': - self.assertTrue(all( - item.is_archive for item in - client.new_targets.keys()) + self.assertTrue( + all(item.is_archive for item in client.new_targets.keys()) ) else: - self.assertTrue(all( - item.is_patch for item in client.new_targets.keys()) + self.assertTrue( + all(item.is_patch for item in client.new_targets.keys()) ) def test_check_for_updates_already_up_to_date(self): @@ -174,9 +173,9 @@ def test__download_updates(self): client.new_targets = {Mock(): Mock()} for cached_path, downloaded_path in [('cached', None), (None, 'downloaded')]: with patch.multiple( - client, - find_cached_target=Mock(return_value=cached_path), - download_target=Mock(return_value=downloaded_path), + client, + find_cached_target=Mock(return_value=cached_path), + download_target=Mock(return_value=downloaded_path), ): self.assertTrue(client._download_updates(progress_hook=None)) local_path = next(iter(client.downloaded_target_files.values())) @@ -191,12 +190,12 @@ def test__apply_updates(self): client.downloaded_target_files = { target_meta: TEST_REPO_DIR / 'targets' / str(target_meta) for target_meta in client.trusted_target_metas - if target_meta.is_patch - and str(target_meta.version) in ['2.0', '3.0rc0'] + if target_meta.is_patch and str(target_meta.version) in ['2.0', '3.0rc0'] } # specify new archive (normally done in _check_updates) archives = [ - tp for tp in client.trusted_target_metas + tp + for tp in client.trusted_target_metas if tp.is_archive and str(tp.version) == '3.0rc0' ] client.new_archive_info = client.get_targetinfo(archives[-1]) @@ -263,9 +262,7 @@ def test_attach_progress_hook(self): mock_hook = Mock() bytes_expected = 10 fetcher = AuthRequestsFetcher() - fetcher.attach_progress_hook( - hook=mock_hook, bytes_expected=bytes_expected - ) + fetcher.attach_progress_hook(hook=mock_hook, bytes_expected=bytes_expected) bytes_new = 1 bytes_downloaded = 0 while bytes_downloaded < bytes_expected: @@ -307,9 +304,7 @@ def mock_iter_content(*args): # test custom progress hook mock_hook = Mock() bytes_expected = chunk_size * chunk_count - fetcher.attach_progress_hook( - hook=mock_hook, bytes_expected=bytes_expected - ) + fetcher.attach_progress_hook(hook=mock_hook, bytes_expected=bytes_expected) for __ in fetcher._chunks(response=mock_response): pass self.assertEqual(chunk_count, mock_hook.call_count) diff --git a/tests/test_repo.py b/tests/test_repo.py index 25bbd8d..7ec36df 100644 --- a/tests/test_repo.py +++ b/tests/test_repo.py @@ -20,14 +20,21 @@ Snapshot, Timestamp, TargetFile, - TOP_LEVEL_ROLE_NAMES + TOP_LEVEL_ROLE_NAMES, ) from tests import TempDirTestCase, TEST_REPO_DIR from tufup.common import TargetMeta import tufup.repo # for patching from tufup.repo import ( - Base, in_, Keys, make_gztar_archive, Repository, Roles, SUFFIX_PUB, SUFFIX_PATCH + Base, + in_, + Keys, + make_gztar_archive, + Repository, + Roles, + SUFFIX_PUB, + SUFFIX_PATCH, ) @@ -38,7 +45,9 @@ 'scheme': 'ed25519', 'keyid': '22f7c6046e29cfb0205a1c07941a5a57da39a6859b844f8c347f622a57ff82c8', 'keyid_hash_algorithms': ['sha256', 'sha512'], - 'keyval': {'public': '93032b5804ba40a725145171193782bdfa30038584715546aea3228ea8018e46'}, + 'keyval': { + 'public': '93032b5804ba40a725145171193782bdfa30038584715546aea3228ea8018e46' + }, } DUMMY_ROOT = Root( version=1, @@ -46,8 +55,7 @@ expires=datetime.now() + timedelta(days=1), keys=dict(), roles={ - role_name: Role(keyids=[], threshold=1) - for role_name in TOP_LEVEL_ROLE_NAMES + role_name: Role(keyids=[], threshold=1) for role_name in TOP_LEVEL_ROLE_NAMES }, consistent_snapshot=False, ) @@ -146,11 +154,9 @@ def test_import_all_public_keys(self): keys = Keys(dir_path=self.temp_dir_path) # create some key files for role_name in TOP_LEVEL_ROLE_NAMES: - private_key_filename = Keys.filename_pattern.format( - key_name=role_name) + private_key_filename = Keys.filename_pattern.format(key_name=role_name) file_path = self.temp_dir_path / private_key_filename - generate_and_write_unencrypted_ed25519_keypair( - filepath=str(file_path)) + generate_and_write_unencrypted_ed25519_keypair(filepath=str(file_path)) # test for role_name in TOP_LEVEL_ROLE_NAMES: self.assertFalse(getattr(keys, role_name)) @@ -188,15 +194,13 @@ def test_create_with_key_map(self): # prepare keys = Keys(dir_path=self.temp_dir_path, key_map=DUMMY_KEY_MAP) expected_key_names = [ - key_name - for key_names in DUMMY_KEY_MAP.values() - for key_name in key_names + key_name for key_names in DUMMY_KEY_MAP.values() for key_name in key_names ] # test keys.create() # we should now have five key-pairs (one for each, but two for root) filenames = [item.name for item in keys.dir_path.iterdir()] - self.assertEqual(2*len(expected_key_names), len(filenames)) + self.assertEqual(2 * len(expected_key_names), len(filenames)) for key_name in expected_key_names: self.assertIn(key_name, filenames) self.assertIn(key_name + SUFFIX_PUB, filenames) @@ -215,14 +219,10 @@ def test_create_key_pair_do_not_overwrite(self): key_name = 'dummy' private_key_filename = Keys.filename_pattern.format(key_name=key_name) private_key_path = self.temp_dir_path / private_key_filename - generate_and_write_unencrypted_ed25519_keypair( - filepath=str(private_key_path) - ) + generate_and_write_unencrypted_ed25519_keypair(filepath=str(private_key_path)) original_private_key = private_key_path.read_bytes() with patch('builtins.input', Mock(return_value='n')): - Keys.create_key_pair( - private_key_path=private_key_path, encrypted=False - ) + Keys.create_key_pair(private_key_path=private_key_path, encrypted=False) self.assertEqual(original_private_key, private_key_path.read_bytes()) def test_public(self): @@ -260,7 +260,7 @@ def test_find_private_key(self): ('offline', [Root.type, Targets.type]), ] key_dirs = [] - for dir_name, role_names in key_names: + for dir_name, role_names in key_names: dir_path = self.temp_dir_path / dir_name dir_path.mkdir() key_dirs.append(dir_path) @@ -308,10 +308,7 @@ def test_initialize_empty(self): # test roles.initialize(keys=mock_keys) self.assertTrue( - all( - isinstance(getattr(roles, n), Metadata) - for n in TOP_LEVEL_ROLE_NAMES - ) + all(isinstance(getattr(roles, n), Metadata) for n in TOP_LEVEL_ROLE_NAMES) ) # files do not exist yet, because the roles still need to be populated self.assertFalse(any(roles.dir_path.iterdir())) @@ -343,7 +340,9 @@ def test_add_or_update_target(self): local_target_path.write_bytes(b'some bytes') # test for segments, expected_url_path in [ - (None, filename), ([], filename), (['a', 'b'], 'a/b/' + filename) + (None, filename), + ([], filename), + (['a', 'b'], 'a/b/' + filename), ]: roles.add_or_update_target( local_path=local_target_path, url_path_segments=segments @@ -415,9 +414,7 @@ def test_sign_role(self): password=password, filepath=str(private_key_path) ) # test - roles.sign_role( - role_name=role_name, private_key_path=private_key_path - ) + roles.sign_role(role_name=role_name, private_key_path=private_key_path) self.assertEqual(signature_count, len(roles.root.signatures)) def test_file_path(self): @@ -536,7 +533,9 @@ def test_from_config(self): mock_load_keys_and_roles = Mock() # test with patch.object( - Repository, '_load_keys_and_roles', mock_load_keys_and_roles, + Repository, + '_load_keys_and_roles', + mock_load_keys_and_roles, ): repo = Repository.from_config() self.assertEqual( @@ -557,9 +556,7 @@ def test_initialize(self): repo.initialize() self.assertTrue(any(repo.keys_dir.iterdir())) self.assertTrue(any(repo.metadata_dir.iterdir())) - self.assertTrue( - all(getattr(repo.roles, name) for name in TOP_LEVEL_ROLE_NAMES) - ) + self.assertTrue(all(getattr(repo.roles, name) for name in TOP_LEVEL_ROLE_NAMES)) self.assertEqual( date.today() + timedelta(days=DUMMY_EXPIRATION_DAYS['root']), repo.roles.root.signed.expires.date(), @@ -615,15 +612,11 @@ def test_replace_key(self): len(repo.roles.root.signed.roles[role_name].keyids), ) # old key removed? - self.assertNotIn( - old_key_id, repo.roles.root.signed.roles[role_name].keyids - ) + self.assertNotIn(old_key_id, repo.roles.root.signed.roles[role_name].keyids) self.assertNotIn(old_key_name, repo.key_map[role_name]) self.assertIn(old_key_name, repo.revoked_key_names) # new key added? - self.assertIn( - new_key_id, repo.roles.root.signed.roles[role_name].keyids - ) + self.assertIn(new_key_id, repo.roles.root.signed.roles[role_name].keyids) self.assertIn(new_key_name, repo.key_map[role_name]) self.assertIn(new_key_name, repo.encrypted_keys) # no duplicates in encrypted_keys? @@ -658,9 +651,7 @@ def test_add_key(self): ) # new key added? self.assertEqual(2, len(repo.roles.root.signed.roles[role_name].keyids)) - self.assertIn( - new_key_id, repo.roles.root.signed.roles[role_name].keyids - ) + self.assertIn(new_key_id, repo.roles.root.signed.roles[role_name].keyids) self.assertIn(new_key_name, repo.key_map[role_name]) self.assertIn(new_key_name, repo.encrypted_keys) @@ -695,9 +686,7 @@ def test_add_bundle_no_patch(self): repo.add_bundle(new_version='1.0', new_bundle_dir=bundle_dir) # test bundle_file.write_text('much has changed in version 2') - repo.add_bundle( - new_version='2.0', new_bundle_dir=bundle_dir, skip_patch=True - ) + repo.add_bundle(new_version='2.0', new_bundle_dir=bundle_dir, skip_patch=True) self.assertTrue((repo.metadata_dir / 'targets.json').exists()) target_keys = list(repo.roles.targets.signed.targets.keys()) self.assertEqual(2, len(target_keys)) @@ -770,9 +759,7 @@ def test_threshold_sign(self): for path in repo.keys_dir.iterdir(): path.unlink() with self.assertRaises(Exception): - repo.threshold_sign( - role_name=role_name, private_key_dirs=[repo.keys_dir] - ) + repo.threshold_sign(role_name=role_name, private_key_dirs=[repo.keys_dir]) def test__load_keys_and_roles(self): # prepare @@ -785,9 +772,7 @@ def test__load_keys_and_roles(self): # test with patch('builtins.input', Mock(return_value='y')): repo._load_keys_and_roles(create_keys=True) - self.assertTrue( - all(getattr(repo.roles, name) for name in TOP_LEVEL_ROLE_NAMES) - ) + self.assertTrue(all(getattr(repo.roles, name) for name in TOP_LEVEL_ROLE_NAMES)) self.assertTrue( all((keys_dir / name).exists() for name in TOP_LEVEL_ROLE_NAMES) ) @@ -825,9 +810,7 @@ def test_publish_changes_threshold(self): for role_name in ['root', 'targets', 'snapshot', 'timestamp']: with self.subTest(msg=role_name): role = getattr(repo.roles, role_name) - self.assertEqual( - repo.thresholds[role_name], len(role.signatures) - ) + self.assertEqual(repo.thresholds[role_name], len(role.signatures)) def test_publish_changes(self): days = 999 @@ -844,9 +827,7 @@ def test_publish_changes(self): # note that initialize() already calls publish_changes... repo.initialize() # todo: make test independent... # make a change to metadata (in memory) - repo.roles.set_expiration_date( - role_name=test_role_name, days=days - ) + repo.roles.set_expiration_date(role_name=test_role_name, days=days) # make a change to config config_change = 'dummy' repo.encrypted_keys.append(config_change) @@ -868,6 +849,4 @@ def test_publish_changes(self): ) # verify change in config config_from_disk = repo.load_config() - self.assertIn( - config_change, config_from_disk['encrypted_keys'] - ) + self.assertIn(config_change, config_from_disk['encrypted_keys']) diff --git a/tests/test_repo_cli.py b/tests/test_repo_cli.py index 8c16736..a2eab77 100644 --- a/tests/test_repo_cli.py +++ b/tests/test_repo_cli.py @@ -69,16 +69,14 @@ def test__cmd_init(self): with patch('tufup.repo.cli.Repository', self.mock_repo_class): with patch('tufup.repo.cli.input_bool', Mock(return_value=True)): with patch( - 'tufup.repo.cli._get_config_from_user', - self.mock_repo_class.load_config, + 'tufup.repo.cli._get_config_from_user', + self.mock_repo_class.load_config, ): tufup.repo.cli._cmd_init(options=argparse.Namespace()) self.mock_repo.initialize.assert_called() def test__cmd_keys_create(self): - options = argparse.Namespace( - new_key_name='test', encrypted=True, create=True - ) + options = argparse.Namespace(new_key_name='test', encrypted=True, create=True) with patch('tufup.repo.cli.Repository', self.mock_repo_class): tufup.repo.cli._cmd_keys(options=options) self.mock_repo.keys.create_key_pair.assert_called() @@ -125,11 +123,11 @@ def test__cmd_targets_add(self): with patch('tufup.repo.cli.Repository', self.mock_repo_class): tufup.repo.cli._cmd_targets(options=options) self.mock_repo.add_bundle.assert_called_with( - new_version=version, new_bundle_dir=bundle_dir, skip_patch=skip_patch, - ) - self.mock_repo.publish_changes.assert_called_with( - private_key_dirs=key_dirs + new_version=version, + new_bundle_dir=bundle_dir, + skip_patch=skip_patch, ) + self.mock_repo.publish_changes.assert_called_with(private_key_dirs=key_dirs) def test__cmd_targets_remove_latest(self): key_dirs = ['c:\\my_private_keys'] @@ -163,9 +161,7 @@ def test__cmd_sign_expired(self): self.mock_repo.refresh_expiration_date.assert_called_with( role_name=role_name, days=self.config['expiration_days'][role_name] ) - self.mock_repo.publish_changes.assert_called_with( - private_key_dirs=key_dirs - ) + self.mock_repo.publish_changes.assert_called_with(private_key_dirs=key_dirs) def test__get_config_from_user_no_kwargs(self): default = '' @@ -220,7 +216,5 @@ def test__get_config_from_user_with_kwargs(self): ) default = '' with patch('builtins.input', Mock(return_value=default)): - config_kwargs = tufup.repo.cli._get_config_from_user( - **original_kwargs - ) + config_kwargs = tufup.repo.cli._get_config_from_user(**original_kwargs) self.assertEqual(config_kwargs, original_kwargs) diff --git a/tests/test_utils.py b/tests/test_utils.py index 13b1f7c..64e73c0 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -20,9 +20,7 @@ def test_remove_path(self): self.assertEqual(1, len(list(subdir_path.iterdir()))) # test with self.subTest(msg=arg_type): - self.assertTrue( - tufup.utils.remove_path(path=arg_type(dir_path)) - ) + self.assertTrue(tufup.utils.remove_path(path=arg_type(dir_path))) self.assertFalse(dir_path.exists()) @@ -47,19 +45,19 @@ def test_input_list(self): text_inputs = iter(['', new_item]) # we use iterators to simulate sequences of user inputs with patch.object( - tufup.utils, 'input_bool', lambda *_, **__: next(bool_inputs) + tufup.utils, 'input_bool', lambda *_, **__: next(bool_inputs) ): with patch.object( - tufup.utils, - 'input_text', - lambda *_, **__: next(text_inputs) or item_default, + tufup.utils, + 'input_text', + lambda *_, **__: next(text_inputs) or item_default, ): expected = default + [item_default, new_item] self.assertEqual( expected, tufup.utils.input_list( prompt='', default=default, item_default=item_default - ) + ), ) def test_input_numeric(self): @@ -82,18 +80,12 @@ def test_input_text(self): user_inputs = iter(['', answer]) with patch('builtins.input', lambda *_: next(user_inputs, '')): # this should iterate until we get a non-empty answer - self.assertEqual( - answer, tufup.utils.input_text(prompt='', default='') - ) + self.assertEqual(answer, tufup.utils.input_text(prompt='', default='')) # iterator exhausted, so next user input is '' - self.assertEqual( - answer, tufup.utils.input_text(prompt='', default=answer) - ) + self.assertEqual(answer, tufup.utils.input_text(prompt='', default=answer)) def test_input_text_optional(self): with patch('builtins.input', Mock(return_value='')): self.assertIsNone( - tufup.utils.input_text( - prompt='', default=None, optional=True - ) + tufup.utils.input_text(prompt='', default=None, optional=True) ) diff --git a/tests/test_utils_platform_specific.py b/tests/test_utils_platform_specific.py index 108a2d4..1b70216 100644 --- a/tests/test_utils_platform_specific.py +++ b/tests/test_utils_platform_specific.py @@ -118,7 +118,8 @@ def test_install_update_no_purge(self): ) def test_install_update_purge(self): extra_kwargs_strings = [ - 'purge_dst_dir=True', f'exclude_from_purge=["{self.keep_file_str}"]' + 'purge_dst_dir=True', + f'exclude_from_purge=["{self.keep_file_str}"]', ] if ON_WINDOWS: extra_kwargs_strings.extend(['as_admin=False', 'log_file_name=None']) @@ -139,7 +140,9 @@ def test_install_update_purge(self): @unittest.skipIf(condition=not ON_WINDOWS, reason='robocopy is windows only') def test_install_update_robocopy_options_override(self): extra_kwargs_strings = [ - 'as_admin=False', 'log_file_name=None', 'robocopy_options_override=[]' + 'as_admin=False', + 'log_file_name=None', + 'robocopy_options_override=[]', ] # run the dummy app in a separate process self.run_dummy_app(extra_kwargs_strings=extra_kwargs_strings) From 0a944abe839730bb6bb8680345454567c3533379 Mon Sep 17 00:00:00 2001 From: dennisvang <29799340+dennisvang@users.noreply.github.com> Date: Tue, 7 Nov 2023 13:19:46 +0100 Subject: [PATCH 6/6] explicitly call ruff *check* in github workflow --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index a729e3e..6397304 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -32,7 +32,7 @@ jobs: - name: Lint with ruff run: | pip install ruff - ruff --output-format=github . + ruff check --output-format=github . - name: Test with unittest run: | python -m unittest