Skip to content

Commit

Permalink
add tests for gzip header and reproducibility
Browse files Browse the repository at this point in the history
work in progres...
  • Loading branch information
dennisvang committed Nov 21, 2023
1 parent 18820d2 commit dfa0b3d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 20 deletions.
18 changes: 10 additions & 8 deletions src/tufup/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging
import pathlib
import re
import shutil
from tempfile import TemporaryDirectory
from typing import Optional, Union

Expand Down Expand Up @@ -147,19 +146,22 @@ def gzip(
Supported kwargs, i.e. `compresslevel` and/or `mtime`, are passed on to
`gzip.compress()` [5].
Note that gzip includes filename and timestamp by default, which makes the
resulting file unreproducible. To fix this we need to do the equivalent of
`gzip --no-name` from GNU gzip [1]. Python's gzip package supports the
`mtime` argument to set the timestamp [2]. Also see SOURCE_DATE_EPOCH env
setting [3], [4] (not supported by Python's gzip, afaik). In addition,
we need to make sure the same algorithm is used, with the same compression
setting.
Note that gzip includes both *filename* and *timestamp* by default,
which makes the resulting files unreproducible. To fix this we need to do the
equivalent of `gzip --no-name` from GNU gzip [1]. Using `gzip.open()` or
using the `gzip.GzipFile` class will add the filename to the header, but this
can be prevented by using `gzip.compress()`, which also supports an `mtime`
argument to set the timestamp [2]. Also see SOURCE_DATE_EPOCH env setting [
3], [4] (not supported by Python's gzip, afaik). In addition, we need to make
sure the same algorithm is used, with the same compression setting. Also see
GZIP header definition in rfc1952 spec [6].
[1]: https://www.gnu.org/software/gzip/manual/gzip.html#Invoking-gzip
[2]: https://docs.python.org/3/library/gzip.html#examples-of-usage
[3]: https://reproducible-builds.org/docs/source-date-epoch/
[4]: https://www.gnu.org/software/gzip/manual/gzip.html#Environment
[5]: https://docs.python.org/3/library/gzip.html#gzip.compress
[6]: https://datatracker.ietf.org/doc/html/rfc1952#page-5
"""
if src_path.suffix == SUFFIX_GZIP:
gzip_function = gzip.decompress
Expand Down
66 changes: 54 additions & 12 deletions tests/test_common.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import gzip
import logging
import pathlib
import shutil
import struct
import tarfile
import time

import bsdiff4
from packaging.version import Version
Expand Down Expand Up @@ -141,19 +142,29 @@ def setUp(self) -> None:
self.file_paths = dict()
self.tar_paths = dict()
self.gz_paths = dict()
for key in ['old', 'new']:
# The gzip header contains an mtime field [1], and we need to make sure we
# can set this field properly. However, the resolution of os.stat.mtime
# depends on the operating system and file system, e.g. Windows/FAT32 has a 2
# sec resolution [2], so to check for inequality of the *default* mtime in
# the gzip header, we would need to force a delay on the order of seconds in
# our tests. To work around this, we override the mtime field for test files.
# [1]: https://datatracker.ietf.org/doc/html/rfc1952#page-5
# [2]: https://docs.python.org/3.12/library/os.html#os.stat_result
mtimes = dict(
old=time.time() - 100, # some arbitrary time in the past [seconds]
new=None, # i.e. just use the default mtime (current time)
)
for key, mtime in mtimes.items():
# create dummy file
file_path = self.temp_dir_path / key
file_path.write_text(key)
# create .tar archive from dummy file
tar_path = file_path.with_suffix('.tar')
with tarfile.open(tar_path, 'w') as tar:
tar.add(file_path)
# compress .tar file using gzip
# compress .tar file using gzip (without filename in header)
gz_path = tar_path.with_suffix('.tar.gz')
with tar_path.open(mode='rb') as tar_file:
with gzip.open(gz_path, mode='wb') as gz_file:
shutil.copyfileobj(tar_file, gz_file)
gz_path.write_bytes(gzip.compress(data=tar_path.read_bytes(), mtime=mtime))
# keep reference
self.file_paths[key] = file_path
self.tar_paths[key] = tar_path
Expand All @@ -164,21 +175,52 @@ def setUp(self) -> None:
dst_bytes=self.tar_paths['new'].read_bytes(),
)

def test_gzip_header(self):
# see gzip header definition in RFC 1952
# byte order: little endian
# https://datatracker.ietf.org/doc/html/rfc1952#page-4
gzip_header_bytes = 10 # "basic" header size
# make dummy data
expected_mtime = 123
gz_bytes = gzip.compress(data=b'dummy', mtime=expected_mtime)
# read basic header (variable names from RFC 1952)
(ID1, ID2, CM, FLG, MTIME, XFL, OS) = struct.unpack(
'<BBBBIBB', gz_bytes[:gzip_header_bytes]
)
# extract flags (variable names from RFC 1952)
(FTEXT, FHCRC, FEXTRA, FNAME, FCOMMENT) = (FLG & 1 << i for i in range(5))
# verify that we don't have a filename, and mtime matches expectation
self.assertEqual(expected_mtime, MTIME)
self.assertFalse(FNAME)

def test_gzip_compress_reproducibility(self):
# verify that different mtimes lead to differences in the gz file
not_repr_gz_path = Patcher.gzip(
src_path=self.tar_paths['old'],
dst_path=self.temp_dir_path / 'not-reproducible.tar.gz',
)
self.assertNotEqual(
self.gz_paths['old'].read_bytes(), not_repr_gz_path.read_bytes()
)
# verify that we can override the mtime to remove these differences
repr_gz_path = Patcher.gzip(
src_path=self.tar_paths['old'],
dst_path=self.temp_dir_path / 'reproducible.tar.gz',
mtime=0,
)
self.assertEqual(self.gz_paths['old'].read_bytes(), repr_gz_path.read_bytes())

def test_gzip_compress(self):
# prepare
src_path = self.tar_paths['old']
# test gzip compression
with self.assertLogs(level='DEBUG') as logs:
for dst_path in [None, self.temp_dir_path / 'compressed.tar.gz']:
with self.subTest(msg=dst_path):
gz_path = Patcher.gzip(
src_path=src_path, dst_path=dst_path,# mtime=src_path.stat().st_mtime
)
gz_path = Patcher.gzip(src_path=src_path, dst_path=dst_path)
self.assertTrue(gz_path.exists())
self.assertEqual(2, sum(1 for msg in logs.output if 'compress' in msg))
# test reproducibility
self.assertNotEqual(self.gz_paths['old'], gz_path) # it's not the same file
self.assertEqual(self.gz_paths['old'].read_bytes(), gz_path.read_bytes())
# note these are not byte-for-byte equal, because of the default mtime

def test_gzip_decompress(self):
src_path = self.gz_paths['old']
Expand Down

0 comments on commit dfa0b3d

Please sign in to comment.