From eb420076fa23c1051df0cbde0b2b91f6935f8a99 Mon Sep 17 00:00:00 2001
From: Benjamin Gutzmann
Date: Fri, 10 May 2024 18:13:38 +0200
Subject: [PATCH] Add optional file-based listings caching

---
 docs/source/api.rst                       |  12 +-
 docs/source/changelog.rst                 |   8 ++
 docs/source/features.rst                  |  29 ++--
 fsspec/asyn.py                            |  12 +-
 fsspec/dircache.py                        | 160 +++++++++++++++++++---
 fsspec/implementations/http.py            |  47 +++++--
 fsspec/implementations/libarchive.py      |   2 +-
 fsspec/implementations/tests/test_http.py | 133 +++++++++++++-----
 fsspec/implementations/zip.py             |   2 +-
 fsspec/spec.py                            |  28 ++--
 fsspec/tests/test_spec.py                 |   2 +-
 fsspec/tests/test_utils.py                |  48 +++++++
 fsspec/utils.py                           |  51 ++++++-
 pyproject.toml                            |   5 +
 14 files changed, 446 insertions(+), 93 deletions(-)

diff --git a/docs/source/api.rst b/docs/source/api.rst
index cb14fe7e1..c01b7802f 100644
--- a/docs/source/api.rst
+++ b/docs/source/api.rst
@@ -47,7 +47,9 @@ Base Classes
     fsspec.core.OpenFiles
     fsspec.core.get_fs_token_paths
     fsspec.core.url_to_fs
-    fsspec.dircache.DirCache
+    fsspec.dircache.DisabledListingsCache
+    fsspec.dircache.MemoryListingsCache
+    fsspec.dircache.FileListingsCache
     fsspec.FSMap
     fsspec.generic.GenericFileSystem
     fsspec.registry.register_implementation
@@ -82,7 +84,13 @@ Base Classes

 .. autofunction:: fsspec.core.url_to_fs

-.. autoclass:: fsspec.dircache.DirCache
+.. autoclass:: fsspec.dircache.DisabledListingsCache
+   :members: __init__
+
+.. autoclass:: fsspec.dircache.MemoryListingsCache
+   :members: __init__
+
+.. autoclass:: fsspec.dircache.FileListingsCache
    :members: __init__

 .. autoclass:: fsspec.FSMap
diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst
index f5f30fdd9..71ff51efc 100644
--- a/docs/source/changelog.rst
+++ b/docs/source/changelog.rst
@@ -1,6 +1,14 @@
 Changelog
 =========

+Dev
+--------
+
+Enhancements
+
+- add optional file-based listings caching using ``diskcache`` (#895).
+  Warning: use the new ``listings_cache_options`` argument instead of
+  ``use_listings_cache``, ``listings_expiry_time`` and ``max_paths``.
+
 2024.3.1
 --------

diff --git a/docs/source/features.rst b/docs/source/features.rst
index 907084e0d..1d9ac5e0a 100644
--- a/docs/source/features.rst
+++ b/docs/source/features.rst
@@ -181,15 +181,26 @@ Listings Caching
 ----------------

 For some implementations, getting file listings (i.e., ``ls`` and anything that
-depends on it) is expensive. These implementations use dict-like instances of
-:class:`fsspec.dircache.DirCache` to manage the listings.
-
-The cache allows for time-based expiry of entries with the ``listings_expiry_time``
-parameter, or LRU expiry with the ``max_paths`` parameter. These can be
-set on any implementation instance that uses listings caching; or to skip the
-caching altogether, use ``use_listings_cache=False``. That would be appropriate
-when the target location is known to be volatile because it is being written
-to from other sources.
+depends on it) is expensive. These implementations may use either dict-like instances of
+:class:`fsspec.dircache.MemoryListingsCache` or file-based caching with instances of
+:class:`fsspec.dircache.FileListingsCache` to manage the listings.
+
+The listings cache is controlled via the keyword ``listings_cache_options``, a dictionary.
+The type of cache is selected with the key ``cache_type`` (``disabled``, ``memory`` or ``file``),
+and time-based expiry of entries is set with the key ``expiry_time``. If the target location is
+known to be volatile, e.g. because it is being written to from other sources, we recommend
+disabling the listings cache.
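+
+For example, to enable in-memory listings caching with a one-hour expiry
+(the values shown are illustrative):
+
+.. code-block:: python
+
+    import fsspec
+
+    fs = fsspec.filesystem(
+        "http",
+        listings_cache_options={"cache_type": "memory", "expiry_time": 3600},
+    )
+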
+If you want to use file-based caching, you can also provide the argument
+``directory`` to control where the cache is stored on disk.
+
+An example ``listings_cache_options`` dictionary:
+
+.. code-block:: json
+
+    {
+        "cache_type": "file",
+        "expiry_time": 3600,
+        "directory": "/tmp/cache"
+    }

 When the ``fsspec`` instance writes to the backend, the method ``invalidate_cache``
 is called, so that subsequent listing of the given paths will force a refresh. In
diff --git a/fsspec/asyn.py b/fsspec/asyn.py
index a040efc4b..caf7b0897 100644
--- a/fsspec/asyn.py
+++ b/fsspec/asyn.py
@@ -312,7 +312,15 @@ class AsyncFileSystem(AbstractFileSystem):
     mirror_sync_methods = True
     disable_throttling = False

-    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
+    def __init__(
+        self,
+        *args,
+        asynchronous=False,
+        loop=None,
+        batch_size=None,
+        listings_cache_options=None,
+        **kwargs,
+    ):
         self.asynchronous = asynchronous
         self._pid = os.getpid()
         if not asynchronous:
@@ -320,7 +328,7 @@ def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwar
         else:
             self._loop = None
         self.batch_size = batch_size
-        super().__init__(*args, **kwargs)
+        super().__init__(listings_cache_options, *args, **kwargs)

     @property
     def loop(self):
diff --git a/fsspec/dircache.py b/fsspec/dircache.py
index eca19566b..3300bcc07 100644
--- a/fsspec/dircache.py
+++ b/fsspec/dircache.py
@@ -1,9 +1,44 @@
+import logging
 import time
 from collections.abc import MutableMapping
+from enum import Enum
 from functools import lru_cache
+from pathlib import Path
+from typing import Optional, Union

+logger = logging.getLogger(__name__)

-class DirCache(MutableMapping):
+
+class DisabledListingsCache(MutableMapping):
+    """A no-op listings cache: lookups always miss and stores are discarded."""
+
+    def __init__(self, *args, **kwargs):
+        pass
+
+    def __getitem__(self, item):
+        raise KeyError
+
+    def __setitem__(self, key, value):
+        pass
+
+    def __delitem__(self, key):
+        pass
+
+    def __iter__(self):
+        return iter(())
+
+    def __len__(self):
+        return 0
+
+    def clear(self):
+        pass
+
+    def __contains__(self, item):
+        return False
+
+    def __reduce__(self):
+        return (DisabledListingsCache, ())
+
+
+class MemoryListingsCache(MutableMapping):
     """
     Caching of directory listings, in a structure like::

@@ -26,19 +61,14 @@ class DirCache(MutableMapping):

     def __init__(
         self,
-        use_listings_cache=True,
-        listings_expiry_time=None,
+        expiry_time=None,
         max_paths=None,
-        **kwargs,
     ):
         """
         Parameters
         ----------
-        use_listings_cache: bool
-            If False, this cache never returns items, but always reports KeyError,
-            and setting items has no effect
-        listings_expiry_time: int or float (optional)
+        expiry_time: int or float (optional)
             Time in seconds that a listing is considered valid. If None,
             listings do not expire.
        max_paths: int (optional)
@@ -49,15 +79,14 @@ def __init__(
         self._times = {}
         if max_paths:
             self._q = lru_cache(max_paths + 1)(lambda key: self._cache.pop(key, None))
-        self.use_listings_cache = use_listings_cache
-        self.listings_expiry_time = listings_expiry_time
-        self.max_paths = max_paths
+        self._expiry_time = expiry_time
+        self._max_paths = max_paths

     def __getitem__(self, item):
-        if self.listings_expiry_time is not None:
-            if self._times.get(item, 0) - time.time() < -self.listings_expiry_time:
+        if self._expiry_time is not None:
+            if self._times.get(item, 0) - time.time() < -self._expiry_time:
                 del self._cache[item]
-        if self.max_paths:
+        if self._max_paths:
             self._q(item)
         return self._cache[item]  # maybe raises KeyError
@@ -75,12 +104,10 @@ def __contains__(self, item):
         return False

     def __setitem__(self, key, value):
-        if not self.use_listings_cache:
-            return
-        if self.max_paths:
+        if self._max_paths:
             self._q(key)
         self._cache[key] = value
-        if self.listings_expiry_time is not None:
+        if self._expiry_time is not None:
             self._times[key] = time.time()

     def __delitem__(self, key):
@@ -93,6 +120,99 @@ def __iter__(self):

     def __reduce__(self):
         return (
-            DirCache,
-            (self.use_listings_cache, self.listings_expiry_time, self.max_paths),
+            MemoryListingsCache,
+            (self._expiry_time, self._max_paths),
+        )
+
+
+class FileListingsCache(MutableMapping):
+    """File-based caching of directory listings, backed by ``diskcache``."""
+
+    def __init__(
+        self,
+        expiry_time: Optional[int],
+        directory: Optional[Path],
+    ):
+        """
+        Parameters
+        ----------
+        expiry_time: int or float (optional)
+            Time in seconds that a listing is considered valid. If None,
+            listings do not expire.
+        directory: str (optional)
+            Directory in which the listings cache file is stored. If None,
+            an auto-generated path inside the user's cache folder is used.
+        """
+        try:
+            import platformdirs
+            from diskcache import Cache
+        except ImportError as e:
+            raise ImportError(
+                "The optional dependencies ``platformdirs`` and ``diskcache`` are required for the file-based listings cache."
+            ) from e
+
+        if not directory:
+            directory = platformdirs.user_cache_dir(appname="fsspec")
+        directory = Path(directory) / "dircache" / str(expiry_time)
+
+        try:
+            directory.mkdir(exist_ok=True, parents=True)
+        except OSError as e:
+            logger.error(f"Directory for dircache could not be created at {directory}.")
+            raise e
+        else:
+            logger.info(f"Dircache located at {directory}.")
+
+        self._expiry_time = expiry_time
+        self._directory = directory
+        self._cache = Cache(directory=str(directory))
+
+    def __getitem__(self, item):
+        """Fetch item from the cache as a file object, retrying if a timeout occurs."""
+        return self._cache.get(key=item, read=True, retry=True)
+
+    def clear(self):
+        self._cache.clear()
+
+    def __len__(self):
+        return len(list(self._cache.iterkeys()))
+
+    def __contains__(self, item):
+        value = self._cache.get(item, retry=True)  # None, if expired
+        return value is not None
+
+    def __setitem__(self, key, value):
+        self._cache.set(key=key, value=value, expire=self._expiry_time, retry=True)
+
+    def __delitem__(self, key):
+        del self._cache[key]
+
+    def __iter__(self):
+        return (k for k in self._cache.iterkeys() if k in self)
+
+    def __reduce__(self):
+        return (
+            FileListingsCache,
+            (self._expiry_time, self._directory),
         )
+
+
+class CacheType(Enum):
+    DISABLED = DisabledListingsCache
+    MEMORY = MemoryListingsCache
+    FILE = FileListingsCache
+
+
+def create_listings_cache(
+    cache_type: CacheType,
+    expiry_time: Optional[int],
+    **kwargs,
+) -> Union[DisabledListingsCache, MemoryListingsCache, FileListingsCache]:
+    cache_map = {
+        CacheType.DISABLED: DisabledListingsCache,
+        CacheType.MEMORY: MemoryListingsCache,
+        CacheType.FILE: FileListingsCache,
+    }
+    return cache_map[cache_type](expiry_time, **kwargs)
diff --git a/fsspec/implementations/http.py b/fsspec/implementations/http.py
index 4580764ce..1d9961020 100644
--- a/fsspec/implementations/http.py
+++ b/fsspec/implementations/http.py
@@ -3,7 +3,6 @@
 import logging
 import re
 import weakref
-from copy import copy
 from urllib.parse import urlparse

 import aiohttp
@@ -58,6 +57,7 @@ def __init__(
         client_kwargs=None,
         get_client=get_client,
         encoded=False,
+        listings_cache_options=None,
         **storage_options,
     ):
         """
@@ -83,11 +83,39 @@ def __init__(
             A callable which takes keyword arguments and constructs
             an aiohttp.ClientSession. It's state will be managed by
             the HTTPFileSystem class.
+        listings_cache_options: dict
+            Options for the listings cache (see ``AbstractFileSystem``).
         storage_options: key-value
             Any other parameters passed on to requests
         cache_type, cache_options: defaults used in open
         """
-        super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
+        # TODO: remove in future release
+        # Strip deprecated caching-related parameters from `storage_options`
+        # before propagating them as `request_options` through `self.kwargs`.
+        old_listings_cache_kwargs = {
+            "use_listings_cache",
+            "listings_expiry_time",
+            "max_paths",
+            "skip_instance_cache",
+        }
+        # only warn about the deprecated kwargs that were actually passed
+        old_listings_cache_kwargs = old_listings_cache_kwargs.intersection(
+            storage_options
+        )
+        if old_listings_cache_kwargs:
+            logger.warning(
+                f"The following parameters are no longer supported and will be ignored: {old_listings_cache_kwargs}. "
+                f"Use the new `listings_cache_options` argument instead."
+            )
+            for key in old_listings_cache_kwargs:
+                del storage_options[key]
+        super().__init__(
+            self,
+            asynchronous=asynchronous,
+            loop=loop,
+            listings_cache_options=listings_cache_options,
+            **storage_options,
+        )
         self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
         self.simple_links = simple_links
         self.same_schema = same_scheme
@@ -96,19 +124,10 @@ def __init__(
         self.client_kwargs = client_kwargs or {}
         self.get_client = get_client
         self.encoded = encoded
-        self.kwargs = storage_options
-        self._session = None
-
-        # Clean caching-related parameters from `storage_options`
-        # before propagating them as `request_options` through `self.kwargs`.
         # TODO: Maybe rename `self.kwargs` to `self.request_options` to make
         # it clearer.
-        request_options = copy(storage_options)
-        self.use_listings_cache = request_options.pop("use_listings_cache", False)
-        request_options.pop("listings_expiry_time", None)
-        request_options.pop("max_paths", None)
-        request_options.pop("skip_instance_cache", None)
-        self.kwargs = request_options
+        self.kwargs = storage_options
+        self._session = None

     @property
     def fsid(self):
@@ -201,7 +220,7 @@ async def _ls_real(self, url, detail=True, **kwargs):
         return sorted(out)

     async def _ls(self, url, detail=True, **kwargs):
-        if self.use_listings_cache and url in self.dircache:
+        if url in self.dircache:
             out = self.dircache[url]
         else:
             out = await self._ls_real(url, detail=detail, **kwargs)
diff --git a/fsspec/implementations/libarchive.py b/fsspec/implementations/libarchive.py
index eb6f14535..da3b3361d 100644
--- a/fsspec/implementations/libarchive.py
+++ b/fsspec/implementations/libarchive.py
@@ -115,7 +115,7 @@ def __init__(
         Kwargs passed when instantiating the target FS, if ``fo`` is
         a string.
         """
-        super().__init__(self, **kwargs)
+        # `False` disables the listings cache for this archive filesystem
+        super().__init__(False, self, **kwargs)
         if mode != "r":
             raise ValueError("Only read from archive files accepted")
         if isinstance(fo, str):
diff --git a/fsspec/implementations/tests/test_http.py b/fsspec/implementations/tests/test_http.py
index fdae51ff5..6d3d62056 100644
--- a/fsspec/implementations/tests/test_http.py
+++ b/fsspec/implementations/tests/test_http.py
@@ -10,6 +10,7 @@

 import fsspec.asyn
 import fsspec.utils
+from fsspec.dircache import CacheType, FileListingsCache, MemoryListingsCache
 from fsspec.implementations.http import HTTPStreamFile
 from fsspec.tests.conftest import data, reset_files, server, win  # noqa: F401

@@ -27,87 +28,157 @@ def test_list_invalid_args(server):


 def test_list_cache(server):
-    h = fsspec.filesystem("http", use_listings_cache=True)
+    h = fsspec.filesystem("http", listings_cache_options=True)
+    assert h.listings_cache_options == {
+        "cache_type": CacheType.MEMORY,
+        "expiry_time": None,
+    }
+    assert isinstance(h.dircache, MemoryListingsCache)
     out = h.glob(server + "/index/*")
     assert out == [server + "/index/realfile"]


-def test_list_cache_with_expiry_time_cached(server):
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=30)
+@pytest.mark.parametrize("expiry_time", [None, 10])
+def test_list_cache_memory(server, expiry_time):
+    h = fsspec.filesystem(
+        "http",
+        listings_cache_options={"cache_type": "memory", "expiry_time": expiry_time},
+    )
+    assert h.listings_cache_options == {
+        "cache_type": CacheType.MEMORY,
+        "expiry_time": expiry_time,
+    }
+    assert isinstance(h.dircache, MemoryListingsCache)
+    start = time.time()
+    out = h.glob(server + "/index/*")
+    normal_duration = time.time() - start
+    assert out == [server + "/index/realfile"]
+    # Verify cache content.
+    assert len(h.dircache) == 1
+    start = time.time()
+    out = h.glob(server + "/index/*")
+    cached_duration = time.time() - start
+    assert out == [server + "/index/realfile"]
+    assert normal_duration / cached_duration > 1.5

-    # First, the directory cache is not initialized.
-    assert not h.dircache
-    # By querying the filesystem with "use_listings_cache=True",
-    # the cache will automatically get populated.
+@pytest.mark.parametrize("expiry_time", [None, 10])
+def test_list_cache_file(server, tmp_path, expiry_time):
+    h = fsspec.filesystem(
+        "http",
+        listings_cache_options={
+            "cache_type": "file",
+            "expiry_time": expiry_time,
+            "directory": tmp_path,
+        },
+    )
+    assert h.listings_cache_options == {
+        "cache_type": CacheType.FILE,
+        "expiry_time": expiry_time,
+        "directory": tmp_path,
+    }
+    assert isinstance(h.dircache, FileListingsCache)
+    h.dircache.clear()  # the file-based cache persists across instances
+    start = time.time()
     out = h.glob(server + "/index/*")
+    normal_duration = time.time() - start
     assert out == [server + "/index/realfile"]
-    # Verify cache content.
     assert len(h.dircache) == 1
-
+    start = time.time()
     out = h.glob(server + "/index/*")
+    cached_duration = time.time() - start
     assert out == [server + "/index/realfile"]
+    assert normal_duration / cached_duration > 1.5
+    h.dircache.clear()  # clean up

-def test_list_cache_with_expiry_time_purged(server):
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=0.3)
-
+@pytest.mark.parametrize("listings_cache_type", ["memory", "file"])
+def test_list_cache_with_expiry_time_purged(server, listings_cache_type):
+    h = fsspec.filesystem(
+        "http",
+        listings_cache_options={
+            "cache_type": listings_cache_type,
+            "expiry_time": 3,
+        },
+    )
+    expected_listings_cache_options = {
+        "cache_type": (
+            CacheType.MEMORY if listings_cache_type == "memory" else CacheType.FILE
+        ),
+        "expiry_time": 3,
+    }
+    if listings_cache_type == "file":
+        expected_listings_cache_options["directory"] = None
+    assert h.listings_cache_options == expected_listings_cache_options
+    h.dircache.clear()  # the file-based cache persists across instances
     # First, the directory cache is not initialized.
     assert not h.dircache
-    # By querying the filesystem with "use_listings_cache=True",
-    # the cache will automatically get populated.
+    # Querying the filesystem automatically populates the cache.
     out = h.glob(server + "/index/*")
     assert out == [server + "/index/realfile"]
     assert len(h.dircache) == 1
-    # Verify cache content.
     assert server + "/index/" in h.dircache
     assert len(h.dircache.get(server + "/index/")) == 1
-    # Wait beyond the TTL / cache expiry time.
-    time.sleep(0.31)
-
+    # Wait beyond the TTL / cache expiry time.
+    time.sleep(4)
     # Verify that the cache item should have been purged.
     cached_items = h.dircache.get(server + "/index/")
     assert cached_items is None
-    # Verify that after clearing the item from the cache,
-    # it can get populated again.
+    # Verify that after expiry the item can get populated again.
    out = h.glob(server + "/index/*")
     assert out == [server + "/index/realfile"]
     cached_items = h.dircache.get(server + "/index/")
     assert len(cached_items) == 1
+    h.dircache.clear()  # clean up


-def test_list_cache_reuse(server):
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=5)
-
+@pytest.mark.parametrize("listings_cache_type", ["memory", "file"])
+def test_list_cache_reuse(server, listings_cache_type):
+    h = fsspec.filesystem(
+        "http",
+        listings_cache_options={"cache_type": listings_cache_type, "expiry_time": 5},
+    )
+    expected_listings_cache_options = {
+        "cache_type": (
+            CacheType.MEMORY if listings_cache_type == "memory" else CacheType.FILE
+        ),
+        "expiry_time": 5,
+    }
+    if listings_cache_type == "file":
+        expected_listings_cache_options["directory"] = None
+    assert h.listings_cache_options == expected_listings_cache_options
+    # The file-based cache persists across instances, so clear it first.
+    h.dircache.clear()
     # First, the directory cache is not initialized.
     assert not h.dircache
-    # By querying the filesystem with "use_listings_cache=True",
-    # the cache will automatically get populated.
+    # Querying the filesystem automatically populates the cache.
     out = h.glob(server + "/index/*")
     assert out == [server + "/index/realfile"]
-
     # Verify cache content.
     assert len(h.dircache) == 1
-
     # Verify another instance without caching enabled does not have cache content.
-    h = fsspec.filesystem("http", use_listings_cache=False)
+    h = fsspec.filesystem("http", listings_cache_options=False)
     assert not h.dircache
-
     # Verify that yet another new instance, with caching enabled,
     # will see the same cache content again.
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=5)
+    h = fsspec.filesystem(
+        "http",
+        listings_cache_options={"cache_type": listings_cache_type, "expiry_time": 5},
+    )
     assert len(h.dircache) == 1
-
     # However, yet another instance with a different expiry time will also not have
     # any valid cache content.
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=666)
+    h = fsspec.filesystem(
+        "http",
+        listings_cache_options={"cache_type": listings_cache_type, "expiry_time": 666},
+    )
     assert len(h.dircache) == 0
+    h.dircache.clear()  # clean up


 def test_ls_raises_filenotfound(server):
@@ -123,12 +194,6 @@ def test_list_cache_with_max_paths(server):
     assert out == [server + "/index/realfile"]


-def test_list_cache_with_skip_instance_cache(server):
-    h = fsspec.filesystem("http", use_listings_cache=True, skip_instance_cache=True)
-    out = h.glob(server + "/index/*")
-    assert out == [server + "/index/realfile"]
-
-
 def test_glob_return_subfolders(server):
     h = fsspec.filesystem("http")
     out = h.glob(server + "/simple/*")
diff --git a/fsspec/implementations/zip.py b/fsspec/implementations/zip.py
index 9d9c046bf..97f90eca6 100644
--- a/fsspec/implementations/zip.py
+++ b/fsspec/implementations/zip.py
@@ -44,7 +44,7 @@ def __init__(
         compression, allowZip64, compresslevel: passed to ZipFile
         Only relevant when creating a ZIP
         """
-        super().__init__(self, **kwargs)
+        # `False` disables the listings cache for this archive filesystem
+        super().__init__(False, self, **kwargs)
         if mode not in set("rwa"):
             raise ValueError(f"mode '{mode}' no understood")
         self.mode = mode
diff --git a/fsspec/spec.py b/fsspec/spec.py
index bcc01f514..e6176599b 100644
--- a/fsspec/spec.py
+++ b/fsspec/spec.py
@@ -9,16 +9,17 @@
 from errno import ESPIPE
 from glob import has_magic
 from hashlib import sha256
-from typing import ClassVar
+from typing import ClassVar, Optional, Union

 from .callbacks import DEFAULT_CALLBACK
 from .config import apply_config, conf
-from .dircache import DirCache
+from .dircache import create_listings_cache
 from .transaction import Transaction
 from .utils import (
     _unstrip_protocol,
     glob_translate,
     isfilelike,
+    normalize_listings_cache_options,
     other_paths,
     read_block,
     stringify_path,
@@ -115,7 +116,12 @@ class AbstractFileSystem(metaclass=_Cached):
     #: Extra *class attributes* that should be considered when hashing.
     _extra_tokenize_attributes = ()

-    def __init__(self, *args, **storage_options):
+    def __init__(
+        self,
+        listings_cache_options: Optional[Union[bool, dict]] = None,
+        *args,
+        **storage_options,
+    ):
         """Create and configure file-system instance

         Instances may be cachable, so if similar enough arguments are seen
@@ -128,10 +134,11 @@ def __init__(self, *args, **storage_options):

         Parameters
         ----------
-        use_listings_cache, listings_expiry_time, max_paths:
-            passed to ``DirCache``, if the implementation supports
-            directory listing caching. Pass use_listings_cache=False
-            to disable such caching.
+        listings_cache_options: bool or dict
+            If True, a default ``MemoryListingsCache`` is created. If a dict,
+            the key ``cache_type`` ("memory" or "file") selects the cache
+            implementation; ``expiry_time`` and any remaining keys are passed
+            on to ``MemoryListingsCache`` or ``FileListingsCache``.
skip_instance_cache: bool If this is a cachable implementation, pass True here to force creating a new instance even if a matching instance exists, and prevent @@ -146,7 +153,11 @@ def __init__(self, *args, **storage_options): self._intrans = False self._transaction = None self._invalidated_caches_in_transaction = [] - self.dircache = DirCache(**storage_options) + listings_cache_options = normalize_listings_cache_options( + listings_cache_options + ) + self.listings_cache_options = listings_cache_options + self.dircache = create_listings_cache(**listings_cache_options) if storage_options.pop("add_docs", None): warnings.warn("add_docs is no longer supported.", FutureWarning) @@ -358,6 +369,7 @@ def _ls_from_cache(self, path): but contains nothing), None if not in cache. """ parent = self._parent(path) + try: return self.dircache[path.rstrip("/")] except KeyError: diff --git a/fsspec/tests/test_spec.py b/fsspec/tests/test_spec.py index de23d783d..b267e0444 100644 --- a/fsspec/tests/test_spec.py +++ b/fsspec/tests/test_spec.py @@ -859,7 +859,7 @@ def test_json(): def test_ls_from_cache(): - fs = DummyTestFS() + fs = DummyTestFS(listings_cache_options=True) uncached_results = fs.ls("top_level/second_level/", refresh=True) assert fs.ls("top_level/second_level/", refresh=False) == uncached_results diff --git a/fsspec/tests/test_utils.py b/fsspec/tests/test_utils.py index 10fa89a2d..b581647dc 100644 --- a/fsspec/tests/test_utils.py +++ b/fsspec/tests/test_utils.py @@ -6,6 +6,7 @@ import pytest import fsspec.utils +from fsspec.dircache import CacheType from fsspec.utils import ( can_be_local, common_prefix, @@ -13,6 +14,7 @@ infer_storage_options, merge_offset_ranges, mirror_from, + normalize_listings_cache_options, other_paths, read_block, seek_delimiter, @@ -476,3 +478,49 @@ def test_stringify_path(path, expected): path = fsspec.utils.stringify_path(path) assert path == expected + + +@pytest.mark.parametrize("listings_cache_options", [None, False, {}]) +def test_normalize_listings_cache_options_disable(listings_cache_options): + assert normalize_listings_cache_options(listings_cache_options) == { + "cache_type": CacheType.DISABLED, + "expiry_time": None, + } + + +def test_normalize_listings_cache_options_enable(): + assert normalize_listings_cache_options(True) == { + "cache_type": CacheType.MEMORY, + "expiry_time": None, + } + + +def test_normalize_listings_cache_options_with_cache_type(): + assert normalize_listings_cache_options({"cache_type": CacheType.FILE}) == { + "cache_type": CacheType.FILE, + "directory": None, + "expiry_time": None, + } + + +def test_normalize_listings_cache_options_with_expiry_time(): + assert normalize_listings_cache_options({"expiry_time": 10}) == { + "cache_type": CacheType.MEMORY, + "expiry_time": 10, + } + + +def test_normalize_listings_cache_options_file_cache_with_directory(): + assert normalize_listings_cache_options( + {"cache_type": CacheType.FILE, "directory": "foo"} + ) == {"cache_type": CacheType.FILE, "directory": "foo", "expiry_time": None} + + +def test_normalize_listings_cache_options_invalid_cache_type(): + with pytest.raises(ValueError): + normalize_listings_cache_options({"cache_type": "invalid"}) + + +def test_normalize_listings_cache_options_invalid_expiry_time(): + with pytest.raises(ValueError): + normalize_listings_cache_options({"expiry_time": -1}) diff --git a/fsspec/utils.py b/fsspec/utils.py index 6e130aa75..cf76ceebe 100644 --- a/fsspec/utils.py +++ b/fsspec/utils.py @@ -18,17 +18,20 @@ Callable, Iterable, Iterator, + Optional, Sequence, 
    TypeVar,
+    Union,
 )
 from urllib.parse import urlsplit

+from fsspec.dircache import CacheType
+
 if TYPE_CHECKING:
     from typing_extensions import TypeGuard

     from fsspec.spec import AbstractFileSystem

-
 DEFAULT_BLOCK_SIZE = 5 * 2**20

 T = TypeVar("T")
@@ -738,3 +741,49 @@ def glob_translate(pat):
             results.append(any_sep)
     res = "".join(results)
     return rf"(?s:{res})\Z"
+
+
+def normalize_listings_cache_options(
+    listings_cache_options: Optional[Union[bool, dict]],
+) -> dict:
+    """Normalize listings cache options.
+
+    Cases:
+    - listings_cache_options is falsy (None, False or {}): return disabled
+      cache options (cache_type=CacheType.DISABLED, expiry_time=None)
+    - listings_cache_options is True: return default cache options
+      (cache_type=CacheType.MEMORY, expiry_time=None)
+    - listings_cache_options is a dict: return normalized cache options
+    """
+    default_listings_cache_options = {
+        "cache_type": CacheType.MEMORY,
+        "expiry_time": None,
+    }
+    if not listings_cache_options:
+        return {"cache_type": CacheType.DISABLED, "expiry_time": None}
+    elif listings_cache_options is True:
+        return default_listings_cache_options
+    else:
+        normalized_listings_cache_options = default_listings_cache_options.copy()
+        normalized_listings_cache_options.update(listings_cache_options)
+        # validate the individual options, then rebuild the dict
+        cache_type = normalized_listings_cache_options["cache_type"]
+        expiry_time = normalized_listings_cache_options["expiry_time"]
+        directory = normalized_listings_cache_options.get("directory")
+        try:
+            cache_type = CacheType(cache_type)
+        except ValueError:
+            try:
+                cache_type = CacheType[cache_type.upper()]
+            except KeyError as e:
+                raise ValueError(
+                    f"Cache type must be one of {', '.join(ct.name.lower() for ct in CacheType)}"
+                ) from e
+        if expiry_time:
+            if expiry_time < 0:
+                raise ValueError("Expiry time must be positive")
+            expiry_time = int(expiry_time)
+        normalized_listings_cache_options = {
+            "cache_type": cache_type,
+            "expiry_time": expiry_time,
+        }
+        if cache_type == CacheType.FILE:
+            normalized_listings_cache_options["directory"] = directory
+        return normalized_listings_cache_options
diff --git a/pyproject.toml b/pyproject.toml
index caee973d2..6d3e819d6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -29,12 +29,14 @@ adl = ["adlfs"]
 arrow = ["pyarrow >= 1"]
 dask = ["dask", "distributed"]
 dev = ["ruff", "pre-commit"]
+dircache = ["diskcache", "platformdirs"]
 dropbox = ["dropbox", "dropboxdrivefs", "requests"]
 entrypoints = []
 full = [
     'adlfs',
     'aiohttp !=4.0.0a0, !=4.0.0a1',
     'dask',
+    'diskcache',
     'distributed',
     'dropbox',
     'dropboxdrivefs',
@@ -44,6 +46,7 @@ full = [
     'ocifs',
     'panel',
     'paramiko',
+    'platformdirs',
     'pyarrow >= 1',
     'pygit2',
     'requests',
@@ -84,6 +87,7 @@ test_full = [
     'aiohttp!=4.0.0a0, !=4.0.0a1',
     'cloudpickle',
     'dask',
+    'diskcache',
     'distributed',
     'dropbox',
     'dropboxdrivefs',
@@ -99,6 +103,7 @@ test_full = [
     'pandas',
     'panel',
     'paramiko',
+    'platformdirs',
     'pyarrow',
     'pyarrow >= 1',
     'pyftpdlib',