Skip to content

Commit

Permalink
Add optional file-based listings caching
Browse files Browse the repository at this point in the history
  • Loading branch information
gutzbenj committed Oct 9, 2023
1 parent 7b2e9b2 commit 17e926e
Show file tree
Hide file tree
Showing 11 changed files with 206 additions and 28 deletions.
2 changes: 2 additions & 0 deletions ci/environment-py38.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ dependencies:
- nomkl
- jinja2
- tqdm
- diskcache
- platformdirs
- pip:
- hadoop-test-cluster
- smbprotocol
2 changes: 2 additions & 0 deletions ci/environment-win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ dependencies:
- nomkl
- s3fs
- tqdm
- diskcache
- platformdirs
8 changes: 6 additions & 2 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ Base Classes
fsspec.core.OpenFiles
fsspec.core.get_fs_token_paths
fsspec.core.url_to_fs
fsspec.dircache.DirCache
fsspec.dircache.MemDirCache
fsspec.dircache.FileDirCache
fsspec.FSMap
fsspec.generic.GenericFileSystem
fsspec.registry.register_implementation
Expand Down Expand Up @@ -82,7 +83,10 @@ Base Classes

.. autofunction:: fsspec.core.url_to_fs

.. autoclass:: fsspec.dircache.DirCache
.. autoclass:: fsspec.dircache.MemDirCache
:members: __init__

.. autoclass:: fsspec.dircache.FileDirCache
:members: __init__

.. autoclass:: fsspec.FSMap
Expand Down
7 changes: 7 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
=========

Dev
---------

Enhancements

- add filebased listing cache using diskcache

2023.9.2
--------

Expand Down
21 changes: 12 additions & 9 deletions docs/source/features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,18 @@ Listings Caching
----------------

For some implementations, getting file listings (i.e., ``ls`` and anything that
depends on it) is expensive. These implementations use dict-like instances of
:class:`fsspec.dircache.DirCache` to manage the listings.

The cache allows for time-based expiry of entries with the ``listings_expiry_time``
parameter, or LRU expiry with the ``max_paths`` parameter. These can be
set on any implementation instance that uses listings caching; or to skip the
caching altogether, use ``use_listings_cache=False``. That would be appropriate
when the target location is known to be volatile because it is being written
to from other sources.
depends on it) is expensive. These implementations use either dict-like instances of
:class:`fsspec.dircache.MemDirCache` or file-based caching with instances of
:class:`fsspec.dircache.FileDirCache` to manage the listings.

The type of cache that is used, can be controlled via the keyword ``listings_cache_type``
that has to be one of `memdircache` or `filedircache`. The cache allows for time-based expiry
of entries with the ``listings_expiry_time`` parameter, or LRU expiry with the ``max_paths``
parameter. These can be set on any implementation instance that uses listings caching; or to
skip the caching altogether, use ``use_listings_cache=False``. That would be appropriate
when the target location is known to be volatile because it is being written to from other
sources. If you want to use the file-based caching, you can also provide the argument
``listings_cache_location`` to determine where the cache file is stored.

When the ``fsspec`` instance writes to the backend, the method ``invalidate_cache``
is called, so that subsequent listing of the given paths will force a refresh. In
Expand Down
97 changes: 95 additions & 2 deletions fsspec/dircache.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import logging
import time
from collections.abc import MutableMapping
from functools import lru_cache
from pathlib import Path

logger = logging.getLogger("fsspec")

class DirCache(MutableMapping):

class MemDirCache(MutableMapping):
"""
Caching of directory listings, in a structure like::
Expand Down Expand Up @@ -93,6 +97,95 @@ def __iter__(self):

def __reduce__(self):
return (
DirCache,
MemDirCache,
(self.use_listings_cache, self.listings_expiry_time, self.max_paths),
)


class FileDirCache(MutableMapping):
def __init__(
self,
use_listings_cache=True,
listings_expiry_time=None,
listings_cache_location=None,
**kwargs,
):
"""
Parameters
----------
use_listings_cache: bool
If False, this cache never returns items, but always reports KeyError,
and setting items has no effect
listings_expiry_time: int or float (optional)
Time in seconds that a listing is considered valid. If None,
listings do not expire.
listings_cache_location: str (optional)
Directory path at which the listings cache file is stored. If None,
an autogenerated path at the user folder is created.
"""
import platformdirs
from diskcache import Cache

listings_expiry_time = listings_expiry_time and float(listings_expiry_time)

if listings_cache_location:
listings_cache_location = Path(listings_cache_location) / str(
listings_expiry_time
)
listings_cache_location.mkdir(exist_ok=True, parents=True)
else:
listings_cache_location = Path(
platformdirs.user_cache_dir(appname="fsspec_dircache")
) / str(listings_expiry_time)

try:
listings_cache_location.mkdir(exist_ok=True, parents=True)
except Exception:
logger.error(
f"folder for dircache could not be created at {listings_cache_location}"
)

self.cache_location = listings_cache_location

logger.info(f"Dircache located at {listings_cache_location}")

self._cache = Cache(directory=listings_cache_location)
self.use_listings_cache = use_listings_cache
self.listings_expiry_time = listings_expiry_time

def __getitem__(self, item):
"""Draw item as fileobject from cache, retry if timeout occurs"""
return self._cache.get(key=item, read=True, retry=True)

def clear(self):
self._cache.clear()

def __len__(self):
return len(list(self._cache.iterkeys()))

def __contains__(self, item):
value = self._cache.get(item, retry=True) # None, if expired
if value:
return True
return False

def __setitem__(self, key, value):
if not self.use_listings_cache:
return
self._cache.set(
key=key, value=value, expire=self.listings_expiry_time, retry=True
)

def __delitem__(self, key):
del self._cache[key]

def __iter__(self):
return (k for k in self._cache.iterkeys() if k in self)

def __reduce__(self):
return (
FileDirCache,
(self.use_listings_cache, self.listings_expiry_time, self.cache_location),
)
2 changes: 2 additions & 0 deletions fsspec/implementations/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ def __init__(
request_options.pop("listings_expiry_time", None)
request_options.pop("max_paths", None)
request_options.pop("skip_instance_cache", None)
request_options.pop("listings_cache_type", None)
request_options.pop("listings_cache_location", None)
self.kwargs = request_options

@property
Expand Down
79 changes: 67 additions & 12 deletions fsspec/implementations/tests/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,30 @@ def test_list_invalid_args(server):
h.glob(server + "/index/*")


def test_list_cache(server):
h = fsspec.filesystem("http", use_listings_cache=True)
@pytest.mark.parametrize("listings_cache_type", ["memdircache", "filedircache"])
def test_list_cache(server, listings_cache_type):
h = fsspec.filesystem(
"http", use_listings_cache=True, listings_cache_type=listings_cache_type
)

h.dircache.clear() # Needed for filedircache

out = h.glob(server + "/index/*")
assert out == [server + "/index/realfile"]

h.dircache.clear() # clean up


@pytest.mark.parametrize("listings_cache_type", ["memdircache", "filedircache"])
def test_list_cache_with_expiry_time_cached(server, listings_cache_type):
h = fsspec.filesystem(
"http",
use_listings_cache=True,
listings_expiry_time=30,
listings_cache_type=listings_cache_type,
)

def test_list_cache_with_expiry_time_cached(server):
h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=30)
h.dircache.clear() # Needed for filedircache

# First, the directory cache is not initialized.
assert not h.dircache
Expand All @@ -49,9 +65,19 @@ def test_list_cache_with_expiry_time_cached(server):
out = h.glob(server + "/index/*")
assert out == [server + "/index/realfile"]

h.dircache.clear() # clean up


@pytest.mark.parametrize("listings_cache_type", ["memdircache", "filedircache"])
def test_list_cache_with_expiry_time_purged(server, listings_cache_type):
h = fsspec.filesystem(
"http",
use_listings_cache=True,
listings_expiry_time=0.3,
listings_cache_type=listings_cache_type,
)

def test_list_cache_with_expiry_time_purged(server):
h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=0.3)
h.dircache.clear() # Needed for filedircache

# First, the directory cache is not initialized.
assert not h.dircache
Expand Down Expand Up @@ -80,9 +106,20 @@ def test_list_cache_with_expiry_time_purged(server):
cached_items = h.dircache.get(server + "/index/")
assert len(cached_items) == 1

h.dircache.clear() # clean up

def test_list_cache_reuse(server):
h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=5)

@pytest.mark.parametrize("listings_cache_type", ["memdircache", "filedircache"])
def test_list_cache_reuse(server, listings_cache_type):
h = fsspec.filesystem(
"http",
use_listings_cache=True,
listings_expiry_time=5,
listings_cache_type=listings_cache_type,
)

# Needed for filedircache
h.dircache.clear()

# First, the directory cache is not initialized.
assert not h.dircache
Expand All @@ -101,14 +138,26 @@ def test_list_cache_reuse(server):

# Verify that yet another new instance, with caching enabled,
# will see the same cache content again.
h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=5)
h = fsspec.filesystem(
"http",
use_listings_cache=True,
listings_expiry_time=5,
listings_cache_type=listings_cache_type,
)
assert len(h.dircache) == 1

# However, yet another instance with a different expiry time will also not have
# any valid cache content.
h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=666)
h = fsspec.filesystem(
"http",
use_listings_cache=True,
listings_expiry_time=666,
listings_cache_type=listings_cache_type,
)
assert len(h.dircache) == 0

h.dircache.clear() # clean up


def test_ls_raises_filenotfound(server):
h = fsspec.filesystem("http")
Expand All @@ -123,8 +172,14 @@ def test_list_cache_with_max_paths(server):
assert out == [server + "/index/realfile"]


def test_list_cache_with_skip_instance_cache(server):
h = fsspec.filesystem("http", use_listings_cache=True, skip_instance_cache=True)
@pytest.mark.parametrize("listings_cache_type", ["memdircache", "filedircache"])
def test_list_cache_with_skip_instance_cache(server, listings_cache_type):
h = fsspec.filesystem(
"http",
use_listings_cache=True,
skip_instance_cache=True,
listings_cache_type=listings_cache_type,
)
out = h.glob(server + "/index/*")
assert out == [server + "/index/realfile"]

Expand Down
15 changes: 12 additions & 3 deletions fsspec/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from .callbacks import _DEFAULT_CALLBACK
from .config import apply_config, conf
from .dircache import DirCache
from .dircache import FileDirCache, MemDirCache
from .transaction import Transaction
from .utils import (
_unstrip_protocol,
Expand Down Expand Up @@ -127,7 +127,7 @@ def __init__(self, *args, **storage_options):
Parameters
----------
use_listings_cache, listings_expiry_time, max_paths:
passed to ``DirCache``, if the implementation supports
passed to ``MemDirCache``, if the implementation supports
directory listing caching. Pass use_listings_cache=False
to disable such caching.
skip_instance_cache: bool
Expand All @@ -144,7 +144,16 @@ def __init__(self, *args, **storage_options):
self._intrans = False
self._transaction = None
self._invalidated_caches_in_transaction = []
self.dircache = DirCache(**storage_options)

listings_cache_type = storage_options.get("listings_cache_type", "memdircache")
if listings_cache_type not in ("memdircache", "filedircache"):
raise ValueError(
"'listings_cache_type' has to be one of ('memdircache', 'filedircache')"
)
if listings_cache_type == "memdircache":
self.dircache = MemDirCache(**storage_options)
else:
self.dircache = FileDirCache(**storage_options)

if storage_options.pop("add_docs", None):
warnings.warn("add_docs is no longer supported.", FutureWarning)
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"abfs": ["adlfs"],
"adl": ["adlfs"],
"dask": ["dask", "distributed"],
"dircache": ["diskcache", "platformdirs"],
"dropbox": ["dropboxdrivefs", "requests", "dropbox"],
"gcs": ["gcsfs"],
"git": ["pygit2"],
Expand Down
Empty file added tox.ini
Empty file.

0 comments on commit 17e926e

Please sign in to comment.