From d99979d00e379135eee10658b3cbd87ef4796ae7 Mon Sep 17 00:00:00 2001
From: Ian Thomas
Date: Fri, 15 Sep 2023 21:05:57 +0100
Subject: [PATCH] Save cache metadata in JSON format (#1353)

---
 fsspec/implementations/cache_metadata.py    | 54 +++++++++----
 fsspec/implementations/tests/test_cached.py | 85 +++++++++++++++++----
 2 files changed, 112 insertions(+), 27 deletions(-)

diff --git a/fsspec/implementations/cache_metadata.py b/fsspec/implementations/cache_metadata.py
index 9fbab3c44..16964c2a7 100644
--- a/fsspec/implementations/cache_metadata.py
+++ b/fsspec/implementations/cache_metadata.py
@@ -7,6 +7,12 @@
 
 from fsspec.utils import atomic_write
 
+try:
+    import ujson as json
+except ImportError:
+    if not TYPE_CHECKING:
+        import json
+
 if TYPE_CHECKING:
     from typing import Any, Dict, Iterator, Literal
 
@@ -23,7 +29,9 @@ class CacheMetadata:
     All reading and writing of cache metadata is performed by this class,
     accessing the cached files and blocks is not.
 
-    Metadata is stored in a single file per storage directory, pickled.
+    Metadata is stored in a single file per storage directory in JSON format.
+    For backward compatibility, metadata stored in the old pickle format is
+    also read, and is converted to JSON when next saved.
     """
 
     def __init__(self, storage: list[str]):
@@ -41,6 +49,28 @@ def __init__(self, storage: list[str]):
         self._storage = storage
         self.cached_files: list[Detail] = [{}]
 
+        # Private attribute to force saving of metadata in pickle format rather than
+        # JSON, so tests can confirm that both pickle and JSON formats are readable.
+        self._force_save_pickle = False
+
+    def _load(self, fn: str) -> Detail:
+        """Low-level function to load metadata from specific file"""
+        try:
+            with open(fn, "r") as f:
+                return json.load(f)
+        except ValueError:
+            with open(fn, "rb") as f:
+                return pickle.load(f)
+
+    def _save(self, metadata_to_save: Detail, fn: str) -> None:
+        """Low-level function to save metadata to specific file"""
+        if self._force_save_pickle:
+            with atomic_write(fn) as f:
+                pickle.dump(metadata_to_save, f)
+        else:
+            with atomic_write(fn, mode="w") as f:
+                json.dump(metadata_to_save, f)
+
     def _scan_locations(
         self, writable_only: bool = False
     ) -> Iterator[tuple[str, str, bool]]:
@@ -111,8 +141,7 @@ def clear_expired(self, expiry_time: int) -> tuple[list[str], bool]:
 
         if self.cached_files[-1]:
             cache_path = os.path.join(self._storage[-1], "cache")
-            with atomic_write(cache_path) as fc:
-                pickle.dump(self.cached_files[-1], fc)
+            self._save(self.cached_files[-1], cache_path)
 
         writable_cache_empty = not self.cached_files[-1]
         return expired_files, writable_cache_empty
@@ -122,13 +151,12 @@ def load(self) -> None:
         cached_files = []
         for fn, _, _ in self._scan_locations():
            if os.path.exists(fn):
-                with open(fn, "rb") as f:
-                    # TODO: consolidate blocks here
-                    loaded_cached_files = pickle.load(f)
-                    for c in loaded_cached_files.values():
-                        if isinstance(c["blocks"], list):
-                            c["blocks"] = set(c["blocks"])
-                    cached_files.append(loaded_cached_files)
+                # TODO: consolidate blocks here
+                loaded_cached_files = self._load(fn)
+                for c in loaded_cached_files.values():
+                    if isinstance(c["blocks"], list):
+                        c["blocks"] = set(c["blocks"])
+                cached_files.append(loaded_cached_files)
             else:
                 cached_files.append({})
         self.cached_files = cached_files or [{}]
@@ -170,8 +198,7 @@ def save(self) -> None:
                 continue
 
             if os.path.exists(fn):
-                with open(fn, "rb") as f:
-                    cached_files = pickle.load(f)
+                cached_files = self._load(fn)
                 for k, c in cached_files.items():
                     if k in cache:
                         if c["blocks"] is True or cache[k]["blocks"] is True:
@@ -197,8 +224,7 @@ def save(self) -> None:
             for c in cache.values():
                 if isinstance(c["blocks"], set):
                     c["blocks"] = list(c["blocks"])
-            with atomic_write(fn) as f:
-                pickle.dump(cache, f)
+            self._save(cache, fn)
         self.cached_files[-1] = cached_files
 
     def update_file(self, path: str, detail: Detail) -> None:
diff --git a/fsspec/implementations/tests/test_cached.py b/fsspec/implementations/tests/test_cached.py
index ee69045c3..d4e05bdcf 100644
--- a/fsspec/implementations/tests/test_cached.py
+++ b/fsspec/implementations/tests/test_cached.py
@@ -1,3 +1,4 @@
+import json
 import os
 import pickle
 import shutil
@@ -111,7 +112,8 @@ def test_mapper():
 @pytest.mark.parametrize(
     "cache_mapper", [BasenameCacheMapper(), BasenameCacheMapper(1), HashCacheMapper()]
 )
-def test_metadata(tmpdir, cache_mapper):
+@pytest.mark.parametrize("force_save_pickle", [True, False])
+def test_metadata(tmpdir, cache_mapper, force_save_pickle):
     source = os.path.join(tmpdir, "source")
     afile = os.path.join(source, "afile")
     os.mkdir(source)
@@ -123,6 +125,7 @@ def test_metadata(tmpdir, cache_mapper):
         cache_storage=os.path.join(tmpdir, "cache"),
         cache_mapper=cache_mapper,
     )
+    fs._metadata._force_save_pickle = force_save_pickle
     with fs.open(afile, "rb") as f:
         assert f.read(5) == b"test"
 
@@ -145,6 +148,42 @@ def test_metadata(tmpdir, cache_mapper):
     assert detail["fn"] == "source_@_afile"
 
 
+def test_metadata_replace_pickle_with_json(tmpdir):
+    # For backward compatibility, reading of old pickled metadata is still allowed.
+    # When the metadata is next saved, it is in JSON format.
+    source = os.path.join(tmpdir, "source")
+    afile = os.path.join(source, "afile")
+    os.mkdir(source)
+    open(afile, "w").write("test")
+
+    # Save metadata in pickle format, to simulate old metadata
+    fs = fsspec.filesystem(
+        "filecache",
+        target_protocol="file",
+        cache_storage=os.path.join(tmpdir, "cache"),
+    )
+    fs._metadata._force_save_pickle = True
+    with fs.open(afile, "rb") as f:
+        assert f.read(5) == b"test"
+
+    # Confirm metadata is in pickle format
+    cache_fn = os.path.join(fs.storage[-1], "cache")
+    with open(cache_fn, "rb") as f:
+        metadata = pickle.load(f)
+    assert list(metadata.keys()) == [make_path_posix(afile)]
+
+    # Force rewrite of metadata, now in JSON format
+    fs._metadata._force_save_pickle = False
+    fs.pop_from_cache(afile)
+    with fs.open(afile, "rb") as f:
+        assert f.read(5) == b"test"
+
+    # Confirm metadata is in JSON format
+    with open(cache_fn, "r") as f:
+        metadata = json.load(f)
+    assert list(metadata.keys()) == [make_path_posix(afile)]
+
+
 def test_constructor_kwargs(tmpdir):
     fs = fsspec.filesystem("filecache", target_protocol="file", same_names=True)
     assert isinstance(fs._mapper, BasenameCacheMapper)
@@ -174,7 +213,8 @@ def test_idempotent():
     assert fs3.storage == fs.storage
 
 
-def test_blockcache_workflow(ftp_writable, tmp_path):
+@pytest.mark.parametrize("force_save_pickle", [True, False])
+def test_blockcache_workflow(ftp_writable, tmp_path, force_save_pickle):
     host, port, user, pw = ftp_writable
     fs = FTPFileSystem(host, port, user, pw)
     with fs.open("/out", "wb") as f:
@@ -194,6 +234,7 @@ def test_blockcache_workflow(ftp_writable, tmp_path):
 
     # Open the blockcache and read a little bit of the data
     fs = fsspec.filesystem("blockcache", **fs_kwargs)
+    fs._metadata._force_save_pickle = force_save_pickle
     with fs.open("/out", "rb", block_size=5) as f:
         assert f.read(5) == b"test\n"
 
@@ -202,13 +243,18 @@ def test_blockcache_workflow(ftp_writable, tmp_path):
     del fs
 
     # Check that cache file only has the first two blocks
-    with open(tmp_path / "cache", "rb") as f:
"cache", "rb") as f: - cache = pickle.load(f) - assert "/out" in cache - assert cache["/out"]["blocks"] == [0, 1] + if force_save_pickle: + with open(tmp_path / "cache", "rb") as f: + cache = pickle.load(f) + else: + with open(tmp_path / "cache", "r") as f: + cache = json.load(f) + assert "/out" in cache + assert cache["/out"]["blocks"] == [0, 1] # Reopen the same cache and read some more... fs = fsspec.filesystem("blockcache", **fs_kwargs) + fs._metadata._force_save_pickle = force_save_pickle with fs.open("/out", block_size=5) as f: assert f.read(5) == b"test\n" f.seek(30) @@ -292,7 +338,8 @@ def test_clear(): assert len(os.listdir(cache1)) < 2 -def test_clear_expired(tmp_path): +@pytest.mark.parametrize("force_save_pickle", [True, False]) +def test_clear_expired(tmp_path, force_save_pickle): def __ager(cache_fn, fn, del_fn=False): """ Modify the cache file to virtually add time lag to selected files. @@ -310,15 +357,23 @@ def __ager(cache_fn, fn, del_fn=False): import time if os.path.exists(cache_fn): - with open(cache_fn, "rb") as f: - cached_files = pickle.load(f) - fn_posix = pathlib.Path(fn).as_posix() - cached_files[fn_posix]["time"] = cached_files[fn_posix]["time"] - 691200 + if force_save_pickle: + with open(cache_fn, "rb") as f: + cached_files = pickle.load(f) + else: + with open(cache_fn, "r") as f: + cached_files = json.load(f) + fn_posix = pathlib.Path(fn).as_posix() + cached_files[fn_posix]["time"] = cached_files[fn_posix]["time"] - 691200 assert os.access(cache_fn, os.W_OK), "Cache is not writable" if del_fn: del cached_files[fn_posix]["fn"] - with open(cache_fn, "wb") as f: - pickle.dump(cached_files, f) + if force_save_pickle: + with open(cache_fn, "wb") as f: + pickle.dump(cached_files, f) + else: + with open(cache_fn, "w") as f: + json.dump(cached_files, f) time.sleep(1) origin = tmp_path.joinpath("origin") @@ -350,6 +405,7 @@ def __ager(cache_fn, fn, del_fn=False): fs = fsspec.filesystem( "filecache", target_protocol="file", cache_storage=str(cache1), cache_check=1 ) + fs._metadata._force_save_pickle = force_save_pickle assert fs.cat(str(f1)) == data # populates "last" cache if file not found in first one @@ -359,6 +415,7 @@ def __ager(cache_fn, fn, del_fn=False): cache_storage=[str(cache1), str(cache2)], cache_check=1, ) + fs._metadata._force_save_pickle = force_save_pickle assert fs.cat(str(f2)) == data assert fs.cat(str(f3)) == data assert len(os.listdir(cache2)) == 3 @@ -390,6 +447,7 @@ def __ager(cache_fn, fn, del_fn=False): same_names=True, cache_check=1, ) + fs._metadata._force_save_pickle = force_save_pickle assert fs.cat(str(f4)) == data cache_fn = os.path.join(fs.storage[-1], "cache") @@ -406,6 +464,7 @@ def __ager(cache_fn, fn, del_fn=False): same_names=True, cache_check=1, ) + fs._metadata._force_save_pickle = force_save_pickle assert fs.cat(str(f1)) == data cache_fn = os.path.join(fs.storage[-1], "cache")