From c8e82a5594562890546aeef3adc0b8d2c91f4edc Mon Sep 17 00:00:00 2001 From: Dennis <29799340+dennisvang@users.noreply.github.com> Date: Wed, 7 Feb 2024 13:33:48 +0100 Subject: [PATCH] Improved key-finding ability for Repository.initialize (#103) * reproduce issue 102 and fix using extra_key_dirs * make Keys.find_private_key() recursive * use old-style typing for python 3.8 support --- src/tufup/repo/__init__.py | 28 +++++++++++++++++++++------- tests/test_repo.py | 38 ++++++++++++++++++++++++++++++++++---- 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/src/tufup/repo/__init__.py b/src/tufup/repo/__init__.py index 84f01ec..fc90eed 100644 --- a/src/tufup/repo/__init__.py +++ b/src/tufup/repo/__init__.py @@ -260,16 +260,28 @@ def roles(self): return roles_map @classmethod - def find_private_key(cls, key_name: str, key_dirs: List[Union[pathlib.Path, str]]): - private_key_path = None + def find_private_key( + cls, key_name: str, key_dirs: List[Union[pathlib.Path, str]] + ) -> Optional[pathlib.Path]: + """ + recursively search key_dirs for a private key with specified key_name + + returns path to first matching file (or None) + """ private_key_filename = cls.filename_pattern.format(key_name=key_name) for key_dir in key_dirs: key_dir = pathlib.Path(key_dir) # ensure Path for path in key_dir.iterdir(): if path.is_file() and path.name == private_key_filename: - private_key_path = path - break - return private_key_path + # base case + return path + elif path.is_dir(): + # recursive case + private_key_path = cls.find_private_key( + key_name=key_name, key_dirs=[path] + ) + if private_key_path: + return private_key_path class Roles(Base): @@ -614,7 +626,7 @@ def from_config(cls): instance._load_keys_and_roles(create_keys=False) return instance - def initialize(self): + def initialize(self, extra_key_dirs: Optional[List[pathlib.Path]] = None): """ Initialize (or update) the local repository. @@ -627,6 +639,8 @@ def initialize(self): Safe to call for existing keys and roles. """ + extra_key_dirs = extra_key_dirs or [] + # Ensure dirs exist for path in [self.keys_dir, self.metadata_dir, self.targets_dir]: path.mkdir(parents=True, exist_ok=True) @@ -636,7 +650,7 @@ def initialize(self): # Publish root metadata (save 1.root.json and copy to root.json) if not self.roles.file_path('root').exists(): - self.publish_changes(private_key_dirs=[self.keys_dir]) + self.publish_changes(private_key_dirs=[self.keys_dir] + extra_key_dirs) def refresh_expiration_date(self, role_name: str, days: Optional[int] = None): if days is None: diff --git a/tests/test_repo.py b/tests/test_repo.py index 5c43453..271dfde 100644 --- a/tests/test_repo.py +++ b/tests/test_repo.py @@ -257,21 +257,22 @@ def test_find_private_key(self): # create dummy private key files in separate folders key_names = [ ('online', [Snapshot.type, Timestamp.type]), - ('offline', [Root.type, Targets.type]), + ('offline/subdir', [Root.type, Targets.type]), # subdir tests recursion ] - key_dirs = [] for dir_name, role_names in key_names: dir_path = self.temp_dir_path / dir_name - dir_path.mkdir() - key_dirs.append(dir_path) + dir_path.mkdir(parents=True) for role_name in role_names: filename = Keys.filename_pattern.format(key_name=role_name) (dir_path / filename).touch() # test + key_dirs = list(self.temp_dir_path.iterdir()) # ['online', 'offline'] for role_name in TOP_LEVEL_ROLE_NAMES: key_path = Keys.find_private_key(key_name=role_name, key_dirs=key_dirs) + self.assertTrue(key_path) self.assertIn(role_name, str(key_path)) self.assertTrue(key_path.exists()) + self.assertIsNone(Keys.find_private_key(key_name='missing', key_dirs=key_dirs)) class RolesTests(TempDirTestCase): @@ -611,6 +612,35 @@ def test_initialize(self): repo.roles.root.signed.expires.date(), ) + def test_initialize_extra_key_dirs(self): + # prepare + repo = Repository( + app_name='test', + keys_dir=self.temp_dir_path / 'keystore', + repo_dir=self.temp_dir_path / 'repo', + expiration_days=DUMMY_EXPIRATION_DAYS, + ) + repo.initialize() + # move private keys to separate dir + private_key_dir = self.temp_dir_path / 'private_keys' + private_key_dir.mkdir() + for path in repo.keys_dir.iterdir(): + if path.is_file() and not path.suffix: + path.rename(target=private_key_dir / path.name) + # remove metadata files + for path in repo.metadata_dir.iterdir(): + if path.suffix == '.json': + path.unlink() + # reproduce issue #102 + with self.assertRaises(Exception) as context: + repo.initialize() + self.assertIn('no private keys found', str(context.exception).lower()) + # test fix + try: + repo.initialize(extra_key_dirs=[private_key_dir]) + except Exception as e: + self.fail(msg=f'unexpected exception: {e}') + def test_refresh_expiration_date(self): repo = Repository( app_name='test',