Save cache metadata in JSON format #1353

Merged 5 commits, Sep 15, 2023

Changes from 4 commits
54 changes: 40 additions & 14 deletions fsspec/implementations/cache_metadata.py
@@ -7,6 +7,12 @@

from fsspec.utils import atomic_write

try:
import ujson as json
except ImportError:
if not TYPE_CHECKING:
import json

if TYPE_CHECKING:
from typing import Any, Dict, Iterator, Literal

@@ -23,7 +29,9 @@ class CacheMetadata:
All reading and writing of cache metadata is performed by this class,
accessing the cached files and blocks is not.

Metadata is stored in a single file per storage directory, pickled.
Metadata is stored in a single file per storage directory in JSON format.
For backward compatibility, also reads metadata stored in pickle format
which is converted to JSON when next saved.
"""

def __init__(self, storage: list[str]):
@@ -41,6 +49,28 @@ def __init__(self, storage: list[str]):
self._storage = storage
self.cached_files: list[Detail] = [{}]

# Private attribute to force saving of metadata in pickle format rather than
# JSON for use in tests to confirm can read both pickle and JSON formats.
self._force_save_pickle = False

def _load(self, fn: str) -> Detail:
"""Low-level function to load metadata from specific file"""
try:
with open(fn, "r") as f:
return json.load(f)
except Exception:
Review comment (Contributor):
Exception should be way more specific.

Reply (Member):
IOError or JSON decode error. The latter will depend on the JSON library used; I don't think they nicely call it the same thing, but probably both subclass ValueError.

Reply (Collaborator, author):
The Exception was purposely generic. If anything fails with the JSON load, it tries the pickle load. A corner-case failure in the first will be repeated in the second.

It is not as simple as IOError or JSON decode error. It is easy to obtain a UnicodeDecodeError, for example, and I expect there are others that I haven't explicitly tested for.

But I don't have a strong opinion either way. I have changed it to ValueError as suggested.

with open(fn, "rb") as f:
return pickle.load(f)

def _save(self, metadata_to_save: Detail, fn: str) -> None:
"""Low-level function to save metadata to specific file"""
if self._force_save_pickle:
with atomic_write(fn) as f:
pickle.dump(metadata_to_save, f)
else:
with atomic_write(fn, mode="w") as f:
json.dump(metadata_to_save, f)

def _scan_locations(
self, writable_only: bool = False
) -> Iterator[tuple[str, str, bool]]:
@@ -111,8 +141,7 @@ def clear_expired(self, expiry_time: int) -> tuple[list[str], bool]:

if self.cached_files[-1]:
cache_path = os.path.join(self._storage[-1], "cache")
with atomic_write(cache_path) as fc:
pickle.dump(self.cached_files[-1], fc)
self._save(self.cached_files[-1], cache_path)

writable_cache_empty = not self.cached_files[-1]
return expired_files, writable_cache_empty
@@ -122,13 +151,12 @@ def load(self) -> None:
cached_files = []
for fn, _, _ in self._scan_locations():
if os.path.exists(fn):
with open(fn, "rb") as f:
# TODO: consolidate blocks here
loaded_cached_files = pickle.load(f)
for c in loaded_cached_files.values():
if isinstance(c["blocks"], list):
c["blocks"] = set(c["blocks"])
cached_files.append(loaded_cached_files)
# TODO: consolidate blocks here
loaded_cached_files = self._load(fn)
for c in loaded_cached_files.values():
if isinstance(c["blocks"], list):
c["blocks"] = set(c["blocks"])
cached_files.append(loaded_cached_files)
else:
cached_files.append({})
self.cached_files = cached_files or [{}]
@@ -170,8 +198,7 @@ def save(self) -> None:
continue

if os.path.exists(fn):
with open(fn, "rb") as f:
cached_files = pickle.load(f)
cached_files = self._load(fn)
for k, c in cached_files.items():
if k in cache:
if c["blocks"] is True or cache[k]["blocks"] is True:
@@ -197,8 +224,7 @@ def save(self) -> None:
for c in cache.values():
if isinstance(c["blocks"], set):
c["blocks"] = list(c["blocks"])
with atomic_write(fn) as f:
pickle.dump(cache, f)
self._save(cache, fn)
self.cached_files[-1] = cached_files

def update_file(self, path: str, detail: Detail) -> None:
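As an aside on the review discussion above: both json.JSONDecodeError and UnicodeDecodeError subclass ValueError (and ujson's decode error is a ValueError as well), which is why catching ValueError is enough to trigger the pickle fallback. The snippet below is a simplified, standalone sketch of that load pattern, not the fsspec implementation itself; load_metadata and fn are illustrative names.

import json
import pickle


def load_metadata(fn: str) -> dict:
    """Read cache metadata, preferring JSON but accepting legacy pickle files."""
    try:
        # New-style metadata is plain JSON text.
        with open(fn, "r") as f:
            return json.load(f)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both subclass ValueError,
        # so a legacy pickled file lands here and is read with pickle instead.
        with open(fn, "rb") as f:
            return pickle.load(f)
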
85 changes: 72 additions & 13 deletions fsspec/implementations/tests/test_cached.py
@@ -1,3 +1,4 @@
import json
import os
import pickle
import shutil
@@ -111,7 +112,8 @@ def test_mapper():
@pytest.mark.parametrize(
"cache_mapper", [BasenameCacheMapper(), BasenameCacheMapper(1), HashCacheMapper()]
)
def test_metadata(tmpdir, cache_mapper):
@pytest.mark.parametrize("force_save_pickle", [True, False])
def test_metadata(tmpdir, cache_mapper, force_save_pickle):
source = os.path.join(tmpdir, "source")
afile = os.path.join(source, "afile")
os.mkdir(source)
@@ -123,6 +125,7 @@ def test_metadata(tmpdir, cache_mapper):
cache_storage=os.path.join(tmpdir, "cache"),
cache_mapper=cache_mapper,
)
fs._metadata._force_save_pickle = force_save_pickle

with fs.open(afile, "rb") as f:
assert f.read(5) == b"test"
@@ -145,6 +148,42 @@ def test_metadata(tmpdir, cache_mapper):
assert detail["fn"] == "source_@_afile"


def test_metadata_replace_pickle_with_json(tmpdir):
# For backward compatibility will allow reading of old pickled metadata.
# When the metadata is next saved, it is in json format.
source = os.path.join(tmpdir, "source")
afile = os.path.join(source, "afile")
os.mkdir(source)
open(afile, "w").write("test")

# Save metadata in pickle format, to simulate old metadata
fs = fsspec.filesystem(
"filecache",
target_protocol="file",
cache_storage=os.path.join(tmpdir, "cache"),
)
fs._metadata._force_save_pickle = True
with fs.open(afile, "rb") as f:
assert f.read(5) == b"test"

# Confirm metadata is in pickle format
cache_fn = os.path.join(fs.storage[-1], "cache")
with open(cache_fn, "rb") as f:
metadata = pickle.load(f)
assert list(metadata.keys()) == [make_path_posix(afile)]

# Force rewrite of metadata, now in json format
fs._metadata._force_save_pickle = False
fs.pop_from_cache(afile)
with fs.open(afile, "rb") as f:
assert f.read(5) == b"test"

# Confirm metadata is in json format
with open(cache_fn, "r") as f:
metadata = json.load(f)
assert list(metadata.keys()) == [make_path_posix(afile)]


def test_constructor_kwargs(tmpdir):
fs = fsspec.filesystem("filecache", target_protocol="file", same_names=True)
assert isinstance(fs._mapper, BasenameCacheMapper)
@@ -174,7 +213,8 @@ def test_idempotent():
assert fs3.storage == fs.storage


def test_blockcache_workflow(ftp_writable, tmp_path):
@pytest.mark.parametrize("force_save_pickle", [True, False])
def test_blockcache_workflow(ftp_writable, tmp_path, force_save_pickle):
host, port, user, pw = ftp_writable
fs = FTPFileSystem(host, port, user, pw)
with fs.open("/out", "wb") as f:
@@ -194,6 +234,7 @@ def test_blockcache_workflow(ftp_writable, tmp_path):

# Open the blockcache and read a little bit of the data
fs = fsspec.filesystem("blockcache", **fs_kwargs)
fs._metadata._force_save_pickle = force_save_pickle
with fs.open("/out", "rb", block_size=5) as f:
assert f.read(5) == b"test\n"

@@ -202,13 +243,18 @@ def test_blockcache_workflow(ftp_writable, tmp_path):
del fs

# Check that cache file only has the first two blocks
with open(tmp_path / "cache", "rb") as f:
cache = pickle.load(f)
assert "/out" in cache
assert cache["/out"]["blocks"] == [0, 1]
if force_save_pickle:
with open(tmp_path / "cache", "rb") as f:
cache = pickle.load(f)
else:
with open(tmp_path / "cache", "r") as f:
cache = json.load(f)
assert "/out" in cache
assert cache["/out"]["blocks"] == [0, 1]

# Reopen the same cache and read some more...
fs = fsspec.filesystem("blockcache", **fs_kwargs)
fs._metadata._force_save_pickle = force_save_pickle
with fs.open("/out", block_size=5) as f:
assert f.read(5) == b"test\n"
f.seek(30)
@@ -292,7 +338,8 @@ def test_clear():
assert len(os.listdir(cache1)) < 2


def test_clear_expired(tmp_path):
@pytest.mark.parametrize("force_save_pickle", [True, False])
def test_clear_expired(tmp_path, force_save_pickle):
def __ager(cache_fn, fn, del_fn=False):
"""
Modify the cache file to virtually add time lag to selected files.
@@ -310,15 +357,23 @@ def __ager(cache_fn, fn, del_fn=False):
import time

if os.path.exists(cache_fn):
with open(cache_fn, "rb") as f:
cached_files = pickle.load(f)
fn_posix = pathlib.Path(fn).as_posix()
cached_files[fn_posix]["time"] = cached_files[fn_posix]["time"] - 691200
if force_save_pickle:
with open(cache_fn, "rb") as f:
cached_files = pickle.load(f)
else:
with open(cache_fn, "r") as f:
cached_files = json.load(f)
fn_posix = pathlib.Path(fn).as_posix()
cached_files[fn_posix]["time"] = cached_files[fn_posix]["time"] - 691200
assert os.access(cache_fn, os.W_OK), "Cache is not writable"
if del_fn:
del cached_files[fn_posix]["fn"]
with open(cache_fn, "wb") as f:
pickle.dump(cached_files, f)
if force_save_pickle:
with open(cache_fn, "wb") as f:
pickle.dump(cached_files, f)
else:
with open(cache_fn, "w") as f:
json.dump(cached_files, f)
time.sleep(1)

origin = tmp_path.joinpath("origin")
@@ -350,6 +405,7 @@ def __ager(cache_fn, fn, del_fn=False):
fs = fsspec.filesystem(
"filecache", target_protocol="file", cache_storage=str(cache1), cache_check=1
)
fs._metadata._force_save_pickle = force_save_pickle
assert fs.cat(str(f1)) == data

# populates "last" cache if file not found in first one
@@ -359,6 +415,7 @@ def __ager(cache_fn, fn, del_fn=False):
cache_storage=[str(cache1), str(cache2)],
cache_check=1,
)
fs._metadata._force_save_pickle = force_save_pickle
assert fs.cat(str(f2)) == data
assert fs.cat(str(f3)) == data
assert len(os.listdir(cache2)) == 3
@@ -390,6 +447,7 @@ def __ager(cache_fn, fn, del_fn=False):
same_names=True,
cache_check=1,
)
fs._metadata._force_save_pickle = force_save_pickle
assert fs.cat(str(f4)) == data

cache_fn = os.path.join(fs.storage[-1], "cache")
@@ -406,6 +464,7 @@ def __ager(cache_fn, fn, del_fn=False):
same_names=True,
cache_check=1,
)
fs._metadata._force_save_pickle = force_save_pickle
assert fs.cat(str(f1)) == data

cache_fn = os.path.join(fs.storage[-1], "cache")
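One detail worth calling out from the load/save changes above: JSON has no set type, so the "blocks" entry is stored as a list on disk and converted back to a set in memory (unless it is True, meaning the whole file is cached). The round-trip sketch below illustrates that idea with made-up metadata values; it is not code from the PR.

import json

# Hypothetical metadata entry: in memory, "blocks" is a set of cached block
# numbers, or True once the whole file has been downloaded.
detail = {"fn": "source_@_afile", "time": 1694700000.0, "blocks": {0, 1}}

# json.dump cannot encode sets, so convert to a list before saving.
to_save = dict(detail)
if isinstance(to_save["blocks"], set):
    to_save["blocks"] = list(to_save["blocks"])
text = json.dumps(to_save)

# On load, turn the list back into a set so block-membership checks stay cheap.
loaded = json.loads(text)
if isinstance(loaded["blocks"], list):
    loaded["blocks"] = set(loaded["blocks"])
assert loaded["blocks"] == {0, 1}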