Skip to content

Commit

Permalink
remove set_expires, and, instead, set expiration date in sign_role
Browse files Browse the repository at this point in the history
  • Loading branch information
dennisvang committed Apr 14, 2022
1 parent 08a8847 commit d26b333
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 58 deletions.
28 changes: 18 additions & 10 deletions examples/repo/repo_workflow_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import secrets # from python 3.9+ we can use random.randbytes

from notsotuf.common import Patcher, TargetPath
from notsotuf.repo import Keys, Roles, ROOT, TARGETS, _in
from notsotuf.repo import Keys, Roles, in_

"""
Expand Down Expand Up @@ -34,23 +34,19 @@
TARGETS_DIR = CONTENT_DIR / 'targets'

# Create key pairs for the top level tuf roles
keys = Keys(dir_path=KEYS_DIR, encrypted=[ROOT, TARGETS])
keys = Keys(dir_path=KEYS_DIR, encrypted=['root', 'targets'])
if keys.root is None:
# create key pair files and save to disk
keys.create()

# Initialize top level tuf roles
roles = Roles(dir_path=META_DIR)
if roles.root is None:
# Specify custom expiration dates (optional)
expires = dict(
root=_in(365), targets=_in(7), snapshot=_in(7), timestamp=_in(1)
)
# initialize metadata
roles.initialize(keys=keys, expires=expires)
roles.initialize(keys=keys)
# save root metadata file
print('signing initial root metadata')
roles.publish_root(keys_dirs=[KEYS_DIR])
roles.publish_root(keys_dirs=[KEYS_DIR], expires=in_(365))

# Create dummy initial target file (normally using e.g. PyInstaller and gzip)
TARGETS_DIR.mkdir(exist_ok=True)
Expand All @@ -70,8 +66,10 @@
# Register the initial target file
roles.add_or_update_target(local_path=initial_archive_path)
print('signing initial targets metadata')
roles.publish_targets(keys_dirs=[KEYS_DIR])
expires = dict(targets=in_(7), snapshot=in_(7), timestamp=in_(1))
roles.publish_targets(keys_dirs=[KEYS_DIR], expires=expires)

# register additional target files (as updates become available over time)
for version, modified_content in [
('2.0', dummy_archive_content + b'2'),
('3.0rc0', dummy_archive_content + b'3rc'),
Expand All @@ -95,4 +93,14 @@
roles.add_or_update_target(local_path=new_archive_path)
roles.add_or_update_target(local_path=new_patch_path)
print(f'signing updated metadata for version {version}')
roles.publish_targets(keys_dirs=[KEYS_DIR])
roles.publish_targets(keys_dirs=[KEYS_DIR], expires=expires)

# Time goes by
...

# Re-sign roles, before they expire
roles.sign_role(
role_name='timestamp',
expires=in_(1),
private_key_path=KEYS_DIR / 'timestamp',
)
76 changes: 52 additions & 24 deletions notsotuf/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@
"""

__all__ = ['Keys', 'Roles', '_in', 'TOP_LEVEL_ROLE_NAMES', 'SUFFIX_PUB', 'SUFFIX_JSON']
__all__ = [
'Keys', 'Roles', 'in_', 'TOP_LEVEL_ROLE_NAMES', 'SUFFIX_PUB', 'SUFFIX_JSON'
]

# copied from python-tuf basic_repo.py
SPEC_VERSION = ".".join(SPECIFICATION_VERSION)


# copied from python-tuf basic_repo.py
def _in(days: float) -> datetime:
"""Adds 'days' to now and returns datetime object w/o microseconds."""
def in_(days: float) -> datetime:
"""Returns a timestamp for the specified number of days from now."""
return datetime.utcnow().replace(microsecond=0) + timedelta(days=days)


Expand Down Expand Up @@ -191,22 +193,24 @@ def _import_roles(self, role_names: Iterable[str]):
if path.is_file() and path.stem in role_names:
setattr(self, path.stem, Metadata.from_file(str(path)))

def initialize(self, keys: Keys, expires: Optional[Dict[str, datetime]] = None):
expires = expires or dict()
def initialize(self, keys: Keys):
# based on python-tuf basic_repo.py
common_kwargs = dict(version=1, spec_version=SPEC_VERSION)
# role-specific kwargs
initial_data = {
Root: dict(expires=_in(365), keys=keys.public(), roles=keys.roles(), consistent_snapshot=False),
Targets: dict(expires=_in(7), targets=dict()),
Snapshot: dict(expires=_in(7), meta={FILENAME_TARGETS: MetaFile(version=1)}),
Timestamp: dict(expires=_in(1), snapshot_meta=MetaFile(version=1)),
Root: dict(
expires=in_(0),
keys=keys.public(),
roles=keys.roles(),
consistent_snapshot=False,
),
Targets: dict(expires=in_(0), targets=dict()),
Snapshot: dict(
expires=in_(0), meta={FILENAME_TARGETS: MetaFile(version=1)}
),
Timestamp: dict(expires=in_(0), snapshot_meta=MetaFile(version=1)),
}
for role_class, role_kwargs in initial_data.items():
# update expiration date if provided
expiration_date = expires.get(role_class.type)
if expiration_date:
role_kwargs['expires'] = expiration_date
# intialize role
setattr(
self,
Expand Down Expand Up @@ -239,20 +243,22 @@ def add_public_key(
"""Import a public key from file and add it to the specified role."""
# based on python-tuf basic_repo.py
ssl_key = import_ed25519_publickey_from_file(filepath=str(public_key_path))
self.root.signed.add_key(role=role_name, key=Key.from_securesystemslib_key(ssl_key))
self.root.signed.add_key(
role=role_name, key=Key.from_securesystemslib_key(ssl_key)
)

def set_signature_threshold(self, role_name: str, threshold: int):
self.root.signed.roles[role_name].threshold = threshold

def set_expires(self, role_name: str, expires: datetime):
getattr(self, role_name).signed.expires = expires # noqa

def sign_role(
self,
role_name: str,
private_key_path: Union[pathlib.Path, str],
expires: datetime,
encrypted: bool = False,
):
# set new expiration date
getattr(self, role_name).signed.expires = expires
# based on python-tuf basic_repo.py
ssl_key = import_ed25519_privatekey_from_file(
filepath=str(private_key_path), prompt=encrypted
Expand All @@ -263,17 +269,27 @@ def sign_role(
def persist_role(self, role_name: str):
# based on python-tuf basic_repo.py (but without consistent snapshots)
role = getattr(self, role_name)
file_path = self.dir_path / self.filename_pattern.format(role_name=role.signed.type)
file_path = self.dir_path / self.filename_pattern.format(
role_name=role.signed.type
)
role.to_file(filename=str(file_path), serializer=JSONSerializer(compact=False))

def publish_root(self, keys_dirs: List[Union[pathlib.Path, str]]):
def publish_root(
self, keys_dirs: List[Union[pathlib.Path, str]], expires: datetime
):
"""Call this whenever root has been modified (should be rare)."""
# root content has changed, so increment version
self.root.signed.version += 1
# sign and save
self._publish_metadata(role_names=[ROOT], keys_dirs=keys_dirs)
self._publish_metadata(
role_names=[Root.type], keys_dirs=keys_dirs, expires=dict(root=expires)
)

def publish_targets(self, keys_dirs: List[Union[pathlib.Path, str]]):
def publish_targets(
self,
keys_dirs: List[Union[pathlib.Path, str]],
expires: Dict[str, datetime],
):
"""Call this whenever new targets have been added."""
# targets content has changed, so increment version
self.targets.signed.version += 1
Expand All @@ -284,15 +300,27 @@ def publish_targets(self, keys_dirs: List[Union[pathlib.Path, str]]):
self.timestamp.signed.snapshot_meta.version = self.snapshot.signed.version
self.timestamp.signed.version += 1
# sign and save
self._publish_metadata(role_names=[TARGETS, SNAPSHOT, TIMESTAMP], keys_dirs=keys_dirs)
self._publish_metadata(
role_names=[Targets.type, Snapshot.type, Timestamp.type],
keys_dirs=keys_dirs,
expires=expires,
)

def _publish_metadata(self, role_names: List[str], keys_dirs: List[Union[pathlib.Path, str]]):
def _publish_metadata(
self,
role_names: List[str],
keys_dirs: List[Union[pathlib.Path, str]],
expires: Dict[str, datetime],
):
# sign the metadata files and save to disk
for role_name in role_names:
private_key_path = Keys.find_private(role_name=role_name, key_dirs=keys_dirs)
private_key_path = Keys.find_private(
role_name=role_name, key_dirs=keys_dirs
)
self.sign_role(
role_name=role_name,
private_key_path=private_key_path,
expires=expires[role_name],
encrypted=role_name in self.encrypted,
)
self.persist_role(role_name=role_name)
62 changes: 38 additions & 24 deletions tests/test_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@
from unittest.mock import patch, Mock

from securesystemslib.interface import generate_and_write_unencrypted_ed25519_keypair
from tuf.api.metadata import Metadata, Role, Root, TargetFile, TOP_LEVEL_ROLE_NAMES
from tuf.api.metadata import (
Metadata,
Role,
Root,
Targets,
Snapshot,
Timestamp,
TargetFile,
TOP_LEVEL_ROLE_NAMES
)

import notsotuf.repo # for patching
from notsotuf.repo import (
Base, Keys, Roles, _in, SUFFIX_PUB, ROOT, TARGETS, SNAPSHOT, TIMESTAMP
)
from notsotuf.repo import Base, Keys, Roles, in_, SUFFIX_PUB
from tests import TempDirTestCase


Expand All @@ -22,15 +29,20 @@
'keyid_hash_algorithms': ['sha256', 'sha512'],
'keyval': {'public': '93032b5804ba40a725145171193782bdfa30038584715546aea3228ea8018e46'},
}

DUMMY_ROOT = Root(
version=1,
spec_version='1.0',
expires=datetime.now() + timedelta(days=1),
keys=dict(),
roles={role_name: Role(keyids=[], threshold=1) for role_name in TOP_LEVEL_ROLE_NAMES},
roles={
role_name: Role(keyids=[], threshold=1)
for role_name in TOP_LEVEL_ROLE_NAMES
},
consistent_snapshot=False,
)
DUMMY_EXPIRES = dict(
root=in_(0), targets=in_(0), snapshot=in_(0), timestamp=in_(0)
)


class CommonTests(TempDirTestCase):
Expand Down Expand Up @@ -97,7 +109,10 @@ def test_roles(self):

def test_find_private(self):
# create dummy private key files in separate folders
key_names = [('online', [SNAPSHOT, TIMESTAMP]), ('offline', [ROOT, TARGETS])]
key_names = [
('online', [Snapshot.type, Timestamp.type]),
('offline', [Root.type, Targets.type]),
]
key_dirs = []
for dir_name, role_names in key_names:
dir_path = self.temp_dir_path / dir_name
Expand Down Expand Up @@ -136,7 +151,7 @@ def test_initialize(self):
mock_keys.roles = Mock(return_value={n: None for n in TOP_LEVEL_ROLE_NAMES})
roles = Roles(dir_path=self.temp_dir_path)
# test
roles.initialize(keys=mock_keys, expires=dict(root=_in(1)))
roles.initialize(keys=mock_keys)
self.assertTrue(
all(isinstance(getattr(roles, n), Metadata) for n in TOP_LEVEL_ROLE_NAMES)
)
Expand Down Expand Up @@ -187,26 +202,21 @@ def test_set_signature_threshold(self):
roles.set_signature_threshold(role_name=role_name, threshold=threshold)
self.assertEqual(threshold, roles.root.signed.roles[role_name].threshold)

def test_set_expires(self):
# prepare
role_name = 'targets'
expires = datetime.now() + timedelta(days=1)
roles = Roles(dir_path=self.temp_dir_path)
setattr(roles, role_name, Mock(signed=Mock(expires=None)))
# test
roles.set_expires(role_name=role_name, expires=expires)
self.assertEqual(expires, getattr(roles, role_name).signed.expires)

def test_sign_role(self):
# prepare
role_name = 'root'
private_key_path = self.temp_dir_path / 'root_key'
generate_and_write_unencrypted_ed25519_keypair(filepath=str(private_key_path))
generate_and_write_unencrypted_ed25519_keypair(
filepath=str(private_key_path)
)
roles = Roles(dir_path=self.temp_dir_path)
roles.root = Metadata(signed=DUMMY_ROOT, signatures=dict())
# test
roles.sign_role(
role_name=role_name, private_key_path=private_key_path, encrypted=False
role_name=role_name,
private_key_path=private_key_path,
expires=in_(0),
encrypted=False,
)
self.assertTrue(roles.root.signatures)

Expand All @@ -225,7 +235,7 @@ def test_publish_root(self):
roles.root = Mock(signed=Mock(version=1))
roles.encrypted = []
# test
roles.publish_root(keys_dirs=[])
roles.publish_root(keys_dirs=[], expires=in_(0))
self.assertEqual(2, roles.root.signed.version)
self.assertTrue(Roles._publish_metadata.called) # noqa

Expand All @@ -242,8 +252,10 @@ def test_publish_targets(self):
)
roles.encrypted = []
# test
roles.publish_targets(keys_dirs=[])
role_names = [TARGETS, SNAPSHOT, TIMESTAMP]
expires = DUMMY_EXPIRES.copy()
expires.pop('root') # no need to sign root
roles.publish_targets(keys_dirs=[], expires=expires)
role_names = [Targets.type, Snapshot.type, Timestamp.type]
self.assertTrue(
all(getattr(roles, n).signed.version == 2 for n in role_names)
)
Expand All @@ -256,6 +268,8 @@ def test__publish_metadata(self):
roles.encrypted = []
# test
role_names = TOP_LEVEL_ROLE_NAMES
roles._publish_metadata(role_names=role_names, keys_dirs=[])
roles._publish_metadata(
role_names=role_names, keys_dirs=[], expires=DUMMY_EXPIRES
)
self.assertTrue(Roles.sign_role.called) # noqa
self.assertTrue(Roles.persist_role.called) # noqa

0 comments on commit d26b333

Please sign in to comment.