Skip to content

Commit

Permalink
Improved key-finding ability for Repository.initialize (#103)
Browse files Browse the repository at this point in the history
* reproduce issue 102 and fix using extra_key_dirs

* make Keys.find_private_key() recursive

* use old-style typing for python 3.8 support
  • Loading branch information
dennisvang authored Feb 7, 2024
1 parent 7733ea1 commit c8e82a5
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 11 deletions.
28 changes: 21 additions & 7 deletions src/tufup/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,16 +260,28 @@ def roles(self):
return roles_map

@classmethod
def find_private_key(cls, key_name: str, key_dirs: List[Union[pathlib.Path, str]]):
private_key_path = None
def find_private_key(
cls, key_name: str, key_dirs: List[Union[pathlib.Path, str]]
) -> Optional[pathlib.Path]:
"""
recursively search key_dirs for a private key with specified key_name
returns path to first matching file (or None)
"""
private_key_filename = cls.filename_pattern.format(key_name=key_name)
for key_dir in key_dirs:
key_dir = pathlib.Path(key_dir) # ensure Path
for path in key_dir.iterdir():
if path.is_file() and path.name == private_key_filename:
private_key_path = path
break
return private_key_path
# base case
return path
elif path.is_dir():
# recursive case
private_key_path = cls.find_private_key(
key_name=key_name, key_dirs=[path]
)
if private_key_path:
return private_key_path


class Roles(Base):
Expand Down Expand Up @@ -614,7 +626,7 @@ def from_config(cls):
instance._load_keys_and_roles(create_keys=False)
return instance

def initialize(self):
def initialize(self, extra_key_dirs: Optional[List[pathlib.Path]] = None):
"""
Initialize (or update) the local repository.
Expand All @@ -627,6 +639,8 @@ def initialize(self):
Safe to call for existing keys and roles.
"""
extra_key_dirs = extra_key_dirs or []

# Ensure dirs exist
for path in [self.keys_dir, self.metadata_dir, self.targets_dir]:
path.mkdir(parents=True, exist_ok=True)
Expand All @@ -636,7 +650,7 @@ def initialize(self):

# Publish root metadata (save 1.root.json and copy to root.json)
if not self.roles.file_path('root').exists():
self.publish_changes(private_key_dirs=[self.keys_dir])
self.publish_changes(private_key_dirs=[self.keys_dir] + extra_key_dirs)

def refresh_expiration_date(self, role_name: str, days: Optional[int] = None):
if days is None:
Expand Down
38 changes: 34 additions & 4 deletions tests/test_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,21 +257,22 @@ def test_find_private_key(self):
# create dummy private key files in separate folders
key_names = [
('online', [Snapshot.type, Timestamp.type]),
('offline', [Root.type, Targets.type]),
('offline/subdir', [Root.type, Targets.type]), # subdir tests recursion
]
key_dirs = []
for dir_name, role_names in key_names:
dir_path = self.temp_dir_path / dir_name
dir_path.mkdir()
key_dirs.append(dir_path)
dir_path.mkdir(parents=True)
for role_name in role_names:
filename = Keys.filename_pattern.format(key_name=role_name)
(dir_path / filename).touch()
# test
key_dirs = list(self.temp_dir_path.iterdir()) # ['online', 'offline']
for role_name in TOP_LEVEL_ROLE_NAMES:
key_path = Keys.find_private_key(key_name=role_name, key_dirs=key_dirs)
self.assertTrue(key_path)
self.assertIn(role_name, str(key_path))
self.assertTrue(key_path.exists())
self.assertIsNone(Keys.find_private_key(key_name='missing', key_dirs=key_dirs))


class RolesTests(TempDirTestCase):
Expand Down Expand Up @@ -611,6 +612,35 @@ def test_initialize(self):
repo.roles.root.signed.expires.date(),
)

def test_initialize_extra_key_dirs(self):
# prepare
repo = Repository(
app_name='test',
keys_dir=self.temp_dir_path / 'keystore',
repo_dir=self.temp_dir_path / 'repo',
expiration_days=DUMMY_EXPIRATION_DAYS,
)
repo.initialize()
# move private keys to separate dir
private_key_dir = self.temp_dir_path / 'private_keys'
private_key_dir.mkdir()
for path in repo.keys_dir.iterdir():
if path.is_file() and not path.suffix:
path.rename(target=private_key_dir / path.name)
# remove metadata files
for path in repo.metadata_dir.iterdir():
if path.suffix == '.json':
path.unlink()
# reproduce issue #102
with self.assertRaises(Exception) as context:
repo.initialize()
self.assertIn('no private keys found', str(context.exception).lower())
# test fix
try:
repo.initialize(extra_key_dirs=[private_key_dir])
except Exception as e:
self.fail(msg=f'unexpected exception: {e}')

def test_refresh_expiration_date(self):
repo = Repository(
app_name='test',
Expand Down

0 comments on commit c8e82a5

Please sign in to comment.