Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved key-finding ability for Repository.initialize #103

Merged
merged 3 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions src/tufup/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,16 +260,28 @@ def roles(self):
return roles_map

@classmethod
def find_private_key(cls, key_name: str, key_dirs: List[Union[pathlib.Path, str]]):
private_key_path = None
def find_private_key(
cls, key_name: str, key_dirs: List[Union[pathlib.Path, str]]
) -> Optional[pathlib.Path]:
"""
recursively search key_dirs for a private key with specified key_name

returns path to first matching file (or None)
"""
private_key_filename = cls.filename_pattern.format(key_name=key_name)
for key_dir in key_dirs:
key_dir = pathlib.Path(key_dir) # ensure Path
for path in key_dir.iterdir():
if path.is_file() and path.name == private_key_filename:
private_key_path = path
break
return private_key_path
# base case
return path
elif path.is_dir():
# recursive case
private_key_path = cls.find_private_key(
key_name=key_name, key_dirs=[path]
)
if private_key_path:
return private_key_path


class Roles(Base):
Expand Down Expand Up @@ -614,7 +626,7 @@ def from_config(cls):
instance._load_keys_and_roles(create_keys=False)
return instance

def initialize(self):
def initialize(self, extra_key_dirs: Optional[List[pathlib.Path]] = None):
"""
Initialize (or update) the local repository.

Expand All @@ -627,6 +639,8 @@ def initialize(self):

Safe to call for existing keys and roles.
"""
extra_key_dirs = extra_key_dirs or []

# Ensure dirs exist
for path in [self.keys_dir, self.metadata_dir, self.targets_dir]:
path.mkdir(parents=True, exist_ok=True)
Expand All @@ -636,7 +650,7 @@ def initialize(self):

# Publish root metadata (save 1.root.json and copy to root.json)
if not self.roles.file_path('root').exists():
self.publish_changes(private_key_dirs=[self.keys_dir])
self.publish_changes(private_key_dirs=[self.keys_dir] + extra_key_dirs)

def refresh_expiration_date(self, role_name: str, days: Optional[int] = None):
if days is None:
Expand Down
38 changes: 34 additions & 4 deletions tests/test_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,21 +257,22 @@ def test_find_private_key(self):
# create dummy private key files in separate folders
key_names = [
('online', [Snapshot.type, Timestamp.type]),
('offline', [Root.type, Targets.type]),
('offline/subdir', [Root.type, Targets.type]), # subdir tests recursion
]
key_dirs = []
for dir_name, role_names in key_names:
dir_path = self.temp_dir_path / dir_name
dir_path.mkdir()
key_dirs.append(dir_path)
dir_path.mkdir(parents=True)
for role_name in role_names:
filename = Keys.filename_pattern.format(key_name=role_name)
(dir_path / filename).touch()
# test
key_dirs = list(self.temp_dir_path.iterdir()) # ['online', 'offline']
for role_name in TOP_LEVEL_ROLE_NAMES:
key_path = Keys.find_private_key(key_name=role_name, key_dirs=key_dirs)
self.assertTrue(key_path)
self.assertIn(role_name, str(key_path))
self.assertTrue(key_path.exists())
self.assertIsNone(Keys.find_private_key(key_name='missing', key_dirs=key_dirs))


class RolesTests(TempDirTestCase):
Expand Down Expand Up @@ -611,6 +612,35 @@ def test_initialize(self):
repo.roles.root.signed.expires.date(),
)

def test_initialize_extra_key_dirs(self):
# prepare
repo = Repository(
app_name='test',
keys_dir=self.temp_dir_path / 'keystore',
repo_dir=self.temp_dir_path / 'repo',
expiration_days=DUMMY_EXPIRATION_DAYS,
)
repo.initialize()
# move private keys to separate dir
private_key_dir = self.temp_dir_path / 'private_keys'
private_key_dir.mkdir()
for path in repo.keys_dir.iterdir():
if path.is_file() and not path.suffix:
path.rename(target=private_key_dir / path.name)
# remove metadata files
for path in repo.metadata_dir.iterdir():
if path.suffix == '.json':
path.unlink()
# reproduce issue #102
with self.assertRaises(Exception) as context:
repo.initialize()
self.assertIn('no private keys found', str(context.exception).lower())
# test fix
try:
repo.initialize(extra_key_dirs=[private_key_dir])
except Exception as e:
self.fail(msg=f'unexpected exception: {e}')

def test_refresh_expiration_date(self):
repo = Repository(
app_name='test',
Expand Down
Loading