Skip to content

Commit

Permalink
move purge manifest into separate PurgeManifest class, with proper tests
Browse files Browse the repository at this point in the history
work in progress...
  • Loading branch information
dennisvang committed Nov 9, 2023
1 parent 1358fd2 commit 070d8c7
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 53 deletions.
101 changes: 73 additions & 28 deletions src/tufup/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
logger = logging.getLogger(__name__)

# dir name must be unequivocally linked to tufup, as dir contents are removed
EXTRACT_DIR_PURGE_MANIFEST_NAME = 'purge.manifest'
EXTRACT_DIR_NAME = 'tufup_extract_dir'
DEFAULT_EXTRACT_DIR = pathlib.Path(tempfile.gettempdir()) / EXTRACT_DIR_NAME

Expand Down Expand Up @@ -286,37 +285,16 @@ def _extract_archive(self):
self.extract_dir = DEFAULT_EXTRACT_DIR
logger.debug(f'using default extract_dir: {self.extract_dir}')
self.extract_dir.mkdir(exist_ok=True)
# purge all files/folders present in the extract_dir, but only if they are
# listed in the previous archive's purge manifest (this should prevent
# accidental removal of files unrelated to tufup)
purge_manifest_path = self.extract_dir / EXTRACT_DIR_PURGE_MANIFEST_NAME
if self.purge_extract_dir:
if purge_manifest_path.exists():
purge_manifest = json.loads(purge_manifest_path.read_text())
for path_item in self.extract_dir.iterdir():
if path_item.name in purge_manifest:
remove_path(path=path_item)
else:
logger.warning(
f'{path_item} not in {EXTRACT_DIR_PURGE_MANIFEST_NAME}'
)
logger.info(f'extract_dir purged: {self.extract_dir}')
else:
logger.warning(
f'cannot purge: {EXTRACT_DIR_PURGE_MANIFEST_NAME} not found'
)
# extract the new archive into the "temporary" extract_dir
# purge extract_dir
purge_manifest = PurgeManifest(dir_to_purge=self.extract_dir)
purge_manifest.purge()
# extract the new archive into the extract_dir
shutil.unpack_archive(
filename=self.new_archive_local_path, extract_dir=self.extract_dir
)
logger.debug(f'files extracted to {self.extract_dir}')
# create/overwrite purge manifest in extract_dir
with tarfile.open(self.new_archive_local_path) as tar:
new_purge_manifest = [EXTRACT_DIR_PURGE_MANIFEST_NAME] + [
pathlib.Path(t.name).name for t in tar if t.name != '.'
]
purge_manifest_path.write_text(json.dumps(new_purge_manifest))
logger.debug(f'purge manifest created: {purge_manifest_path}')
# create new purge manifest in extract_dir
purge_manifest.create_from_archive(archive=self.new_archive_local_path)


class AuthRequestsFetcher(tuf.ngclient.RequestsFetcher):
Expand Down Expand Up @@ -393,3 +371,70 @@ def _chunks(self, response: 'requests.Response') -> Iterator[bytes]:
for data in super()._chunks(response=response):
self._progress(bytes_new=len(data))
yield data


class PurgeManifest(object):
_filename = 'purge.manifest'

def __init__(self, dir_to_purge: Union[pathlib.Path, str]):
"""
a purge manifest is used to remove files and subdirectories from the
specified directory (`dir_to_purge`) in a safe manner, i.e. minimizing the
risk of accidental removal of pre-existing files/dirs that are unrelated to
the present application
this is achieved by removing only those items listed in a purge.manifest file
in the specified directory
if there is no purge.manifest file, nothing is removed
the purge.manifest file should be created (or updated) whenever files are
added to the `dir_to_purge`
the purge manifest can be used to clear the extract_dir or the
app_install_dir, for example
"""
self.dir_to_purge = pathlib.Path(dir_to_purge)

@property
def file_path(self) -> pathlib.Path:
return self.dir_to_purge / self._filename

def read_from_file(self) -> Optional[List[str]]:
if not self.file_path.exists():
logger.warning(f'cannot read manifest: {self.file_path} not found')
return
return json.loads(self.file_path.read_text())

def write_to_file(self, manifest: List[str]):
self.file_path.write_text(json.dumps(manifest))
logger.debug(f'purge manifest created: {self.file_path}')

def purge(self) -> None:
"""
remove all files/folders listed in the purge manifest from the specified
directory (`dir_to_purge`)
"""
manifest = self.read_from_file()
if manifest:
for path in self.dir_to_purge.iterdir():
relative_path = path.relative_to(self.dir_to_purge)
if str(relative_path) in manifest:
remove_path(path=path)
else:
logger.warning(f'{path} remains: not in {self._filename}')
logger.info(f'directory purged: {self.dir_to_purge}')

def create_from_archive(self, archive: Union[pathlib.Path, str]) -> None:
"""create/overwrite purge manifest in extract_dir"""
# note that `shutil.make_archive` puts everything in a dir called "." (for
# cwd) and prefixes items with "./" (if this ever poses a problem, we could
# switch to using `tarfile.TarFile.add()` with the `arcname` argument) see
# e.g https://stackoverflow.com/a/70081512
with tarfile.open(archive) as tar:
manifest = [self._filename] + [
tarinfo.name[2:] if tarinfo.name.startswith('./') else tarinfo.name
for tarinfo in tar
if tarinfo.name != '.'
]
self.write_to_file(manifest=manifest)
2 changes: 1 addition & 1 deletion src/tufup/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def make_gztar_archive(
if input(f'Found existing archive: {archive_path}.\nOverwrite? [n]/y') != 'y':
print('Using existing archive.')
return TargetMeta(archive_path)
# make archive
# make archive (see https://stackoverflow.com/a/70081512 about "." directory)
base_name = str(dst_dir / archive_filename.replace(SUFFIX_ARCHIVE, ''))
archive_path_str = shutil.make_archive(
base_name=base_name, # archive file path, no suffix
Expand Down
134 changes: 110 additions & 24 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import os
import pathlib
import shutil
import subprocess
import tarfile
from typing import Optional
import unittest
from unittest.mock import Mock, patch
Expand All @@ -12,8 +14,9 @@
from tuf.ngclient import TargetFile

from tests import TempDirTestCase, TEST_REPO_DIR
from tufup.client import AuthRequestsFetcher, Client, EXTRACT_DIR_PURGE_MANIFEST_NAME
from tufup.client import AuthRequestsFetcher, Client, PurgeManifest
from tufup.common import TargetMeta
from tufup.utils.platform_specific import ON_WINDOWS

ROOT_FILENAME = 'root.json'
TARGETS_FILENAME = 'targets.json'
Expand Down Expand Up @@ -54,7 +57,7 @@ def setUp(self) -> None:
)

def mock_download_metadata(
self, rolename: str, length: int, version: Optional[int] = None
self, rolename: str, length: int, version: Optional[int] = None
) -> bytes:
if rolename == 'root':
# indicate current root is newest version
Expand Down Expand Up @@ -142,7 +145,7 @@ def test_download_and_apply_update(self):
mock_apply = Mock(return_value=True)
mock_install = Mock()
with patch.multiple(
Client, _download_updates=mock_download, _apply_updates=mock_apply
Client, _download_updates=mock_download, _apply_updates=mock_apply
):
client = self.get_refreshed_client()
client.new_targets = {'dummy': None}
Expand Down Expand Up @@ -193,9 +196,9 @@ def test__download_updates(self):
client.new_targets = {Mock(): Mock()}
for cached_path, downloaded_path in [('cached', None), (None, 'downloaded')]:
with patch.multiple(
client,
find_cached_target=Mock(return_value=cached_path),
download_target=Mock(return_value=downloaded_path),
client,
find_cached_target=Mock(return_value=cached_path),
download_target=Mock(return_value=downloaded_path),
):
self.assertTrue(client._download_updates(progress_hook=None))
local_path = next(iter(client.downloaded_target_files.values()))
Expand Down Expand Up @@ -228,26 +231,12 @@ def test__extract_archive_with_purge(self):
src=TEST_REPO_DIR / 'targets' / client.new_archive_local_path.name,
dst=client.new_archive_local_path,
)
# test extract without purge manifest
# test extract
client._extract_archive()
self.assertTrue(any(client.extract_dir.iterdir()))
# purge manifest should now exist
purge_manifest_path = client.extract_dir / EXTRACT_DIR_PURGE_MANIFEST_NAME
purge_manifest_content = json.loads(purge_manifest_path.read_text())
print(purge_manifest_content)
self.assertEqual(
{'1.root.json', 'dummy.exe', EXTRACT_DIR_PURGE_MANIFEST_NAME},
set(purge_manifest_content),
)
# test extract with purge manifest
with self.assertLogs(level='DEBUG') as logs:
client._extract_archive()
for item in logs.output:
print(item)
self.assertEqual(
len(purge_manifest_content),
sum('removed file' in msg.lower() for msg in logs.output),
)
# a purge manifest file should now exist
purge_manifest = PurgeManifest(dir_to_purge=client.extract_dir)
self.assertTrue(purge_manifest.file_path.exists())


class AuthRequestsFetcherTests(unittest.TestCase):
Expand Down Expand Up @@ -344,3 +333,100 @@ def mock_iter_content(*args):
for __ in fetcher._chunks(response=mock_response):
pass
self.assertEqual(chunk_count, mock_hook.call_count)


class PurgeManifestTests(TempDirTestCase):
def test_init(self):
self.assertTrue(PurgeManifest(dir_to_purge='some/path'))

def test_file_path_property(self):
self.assertTrue(PurgeManifest(dir_to_purge='some/path').file_path)

def test_read_from_file(self):
purge_manifest = PurgeManifest(dir_to_purge=self.temp_dir_path)
self.assertIsNone(purge_manifest.read_from_file())
# write dummy manifest file
purge_manifest.file_path.write_text('[]')
# test
self.assertEqual([], purge_manifest.read_from_file())

def test_write_to_file(self):
purge_manifest = PurgeManifest(dir_to_purge=self.temp_dir_path)
manifest = ['some.file']
purge_manifest.write_to_file(manifest=manifest)
self.assertEqual(manifest, json.loads(purge_manifest.file_path.read_text()))

def test_purge_no_manifest_file(self):
# prepare
dir_to_purge = self.temp_dir_path
dummy_file_path = dir_to_purge.joinpath('dummy.file')
dummy_file_path.touch()
purge_manifest = PurgeManifest(dir_to_purge=dir_to_purge)
# test
purge_manifest.purge()
self.assertTrue(dummy_file_path.exists())

def test_purge(self):
# create dummy files
dir_to_purge = self.temp_dir_path
subdir = dir_to_purge / 'subdir'
subdir.mkdir()
items_to_purge = [dir_to_purge / 'some.dummy', subdir / 'other.dummy', subdir]
items_to_keep = [dir_to_purge / 'file.to.keep']
for item in items_to_purge + items_to_keep:
if not item.exists():
item.touch()
# write manifest file manually (when writing the manifest from a .tar.gz
# archive, each dir and each item in that dir is listed, recursively,
# so we also need to include both subdir and the items inside subdir here)
purge_manifest = PurgeManifest(dir_to_purge=dir_to_purge)
manifest = [
str(item.relative_to(dir_to_purge))
for item in items_to_purge + [purge_manifest.file_path]
]
purge_manifest.file_path.write_text(json.dumps(manifest))
# test
purge_manifest.purge()
for path in items_to_purge:
self.assertFalse(path.exists())
for path in items_to_keep:
self.assertTrue(path.exists())

def test_create_from_archive(self):
# create dummy files, including some hidden
dir_to_purge = self.temp_dir_path
hidden_subdir = dir_to_purge / '.hidden'
hidden_subdir.mkdir()
if ON_WINDOWS:
# https://learn.microsoft.com/en-us/windows-server/administration/windows-commands/attrib
subprocess.run(['attrib', '+h', str(hidden_subdir)], check=True)
dummy_file_in_hidden_subdir = hidden_subdir / 'other.dummy'
dummy_file_in_hidden_subdir.touch()
dummy_file = dir_to_purge / 'some.dummy'
dummy_file.touch()
# create dummy archive
archive_path = self.temp_dir_path / 'dummy_archive.tar.gz'
# todo: should create tar using shutil.make_archive, like in make_gztar_archive, to test handling of "." and "./"
with tarfile.open(archive_path, mode='w:gz') as tar:
for path in [dummy_file, hidden_subdir]:
tar.add(
name=path, arcname=path.relative_to(dir_to_purge), recursive=True
)
# test
purge_manifest = PurgeManifest(dir_to_purge=dir_to_purge)
purge_manifest.create_from_archive(archive=archive_path)
self.assertTrue(purge_manifest.file_path.exists())
# load manifest data manually (normally we would use read_from_file)
manifest = json.loads(purge_manifest.file_path.read_text())
self.assertEqual(
{
str(item.relative_to(dir_to_purge))
for item in [
purge_manifest.file_path,
hidden_subdir,
dummy_file,
dummy_file_in_hidden_subdir,
]
},
set(manifest),
)

0 comments on commit 070d8c7

Please sign in to comment.