Skip to content

Commit

Permalink
Merge branch 'master' into UP
Browse files Browse the repository at this point in the history
  • Loading branch information
martindurant authored Nov 15, 2024
2 parents 92288fd + b9daa8b commit b6b7f1e
Show file tree
Hide file tree
Showing 27 changed files with 610 additions and 110 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ jobs:
strategy:
fail-fast: false
matrix:
PY: ["3.8", "3.9", "3.10", "3.11", "3.12"]
PY:
- "3.9"
- "3.10"
- "3.11"
- "3.12"
- "3.13"

env:
CIRUN: true
Expand Down
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ exclude: >
repos:

- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
rev: v5.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
Expand All @@ -14,7 +14,7 @@ repos:
- id: check-yaml
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.4.4
rev: v0.6.9
hooks:
# Run the linter.
- id: ruff
Expand Down
56 changes: 44 additions & 12 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -209,41 +209,73 @@ Built-in Implementations
Other Known Implementations
---------------------------

- `abfs`_ for Azure Blob service
- `adl`_ for Azure DataLake storage

Note that most of these projects are hosted outside of the `fsspec` organisation. Please read their
documentation carefully before using any particular package.

- `abfs`_ for Azure Blob service, with protocol "abfs://"
- `adl`_ for Azure DataLake storage, with protocol "adl://"
- `alluxiofs`_ to access fsspec implemented filesystem with Alluxio distributed cache
- `boxfs`_ for access to Box file storage
- `dropbox`_ for access to dropbox shares
- `boxfs`_ for access to Box file storage, with protocol "box://"
- `csvbase`_ for access to csvbase.com hosted CSV files, with protocol "csvbase://"
- `dropbox`_ for access to dropbox shares, with protocol "dropbox://"
- `dvc`_ to access DVC/Git repository as a filesystem
- `gcsfs`_ for Google Cloud Storage
- `fsspec-encrypted`_ for transparent encryption on top of other fsspec filesystems.
- `gcsfs`_ for Google Cloud Storage, with protocol "gcs://"
- `gdrive`_ to access Google Drive and shares (experimental)
- `git`_ to access Git repositories
- `huggingface_hub`_ to access the Hugging Face Hub filesystem, with protocol "hf://"
- `lakefs`_ for lakeFS data lakes
- `ocifs`_ for access to Oracle Cloud Object Storage
- `hdfs-native`_ to access Hadoop filesystem, with protocol "hdfs://"
- `httpfs-sync`_ to access HTTP(s) files in a synchronous manner to offer an alternative to the aiohttp-based implementation.
- `ipfsspec`_ for the InterPlanetary File System (IPFS), with protocol "ipfs://"
- `irods`_ for access to iRODS servers, with protocol "irods://"
- `lakefs`_ for lakeFS data lakes, with protocol "lakefs://"
- `morefs`_ for `OverlayFileSystem`, `DictFileSystem`, and others
- `ocifs`_ for access to Oracle Cloud Object Storage, with protocol "oci://"
- `ocilake`_ for OCI Data Lake storage
- `ossfs`_ for Alibaba Cloud (Aliyun) Object Storage System (OSS)
- `p9fs`_ for 9P (Plan 9 Filesystem Protocol) servers
- `s3fs`_ for Amazon S3 and other compatible stores
- `PyAthena`_ for S3 access to Amazon Athena, with protocol "s3://" or "s3a://"
- `PyDrive2`_ for Google Drive access
- `s3fs`_ for Amazon S3 and other compatible stores, with protocol "s3://"
- `sshfs`_ for access to SSH servers, with protocol "ssh://" or "sftp://"
- `swiftspec`_ for OpenStack SWIFT, with protocol "swift://"
- `tosfs`_ for ByteDance volcano engine Tinder Object Storage (TOS)
- `wandbfs`_ to access Wandb run data (experimental)
- `webdav4`_ for WebDAV
- `wandbfsspec`_ to access Weights & Biases (experimental)
- `webdav4`_ for WebDAV, with protocol "webdav://" or "dav://"
- `xrootd`_ for xrootd, with protocol "root://"

.. _abfs: https://github.com/dask/adlfs
.. _adl: https://github.com/dask/adlfs
.. _alluxiofs: https://github.com/fsspec/alluxiofs
.. _boxfs: https://github.com/IBM/boxfs
.. _dropbox: https://github.com/MarineChap/intake_dropbox
.. _csvbase: https://github.com/calpaterson/csvbase-client
.. _dropbox: https://github.com/fsspec/dropboxdrivefs
.. _dvc: https://github.com/iterative/dvc
.. _fsspec-encrypted: https://github.com/thevgergroup/fsspec-encrypted
.. _gcsfs: https://gcsfs.readthedocs.io/en/latest/
.. _gdrive: https://github.com/fsspec/gdrivefs
.. _git: https://github.com/iterative/scmrepo
.. _hdfs-native: https://github.com/Kimahriman/hdfs-native/blob/master/python/hdfs_native/fsspec.py
.. _httpfs-sync: https://github.com/moradology/httpfs-sync
.. _huggingface_hub: https://huggingface.co/docs/huggingface_hub/main/en/guides/hf_file_system
.. _lakefs: https://github.com/appliedAI-Initiative/lakefs-spec
.. _ocifs: https://pypi.org/project/ocifs
.. _ipfsspec: https://github.com/fsspec/ipfsspec
.. _irods: https://github.com/xwcl/irods_fsspec
.. _lakefs: https://github.com/aai-institute/lakefs-spec
.. _morefs: https://github.com/iterative/morefs
.. _ocifs: https://ocifs.readthedocs.io/en/latest/
.. _ocilake: https://github.com/oracle/ocifs
.. _ossfs: https://github.com/fsspec/ossfs
.. _p9fs: https://github.com/pbchekin/p9fs-py
.. _PyAthena: https://github.com/laughingman7743/PyAthena
.. _PyDrive2: https://github.com/iterative/PyDrive2
.. _s3fs: https://s3fs.readthedocs.io/en/latest/
.. _sshfs: https://github.com/fsspec/sshfs
.. _swiftspec: https://github.com/fsspec/swiftspec
.. _tosfs: https://tosfs.readthedocs.io/en/latest/
.. _wandbfs: https://github.com/jkulhanek/wandbfs
.. _wandbfsspec: https://github.com/alvarobartt/wandbfsspec
.. _webdav4: https://github.com/skshetry/webdav4
.. _xrootd: https://github.com/CoffeaTeam/fsspec-xrootd

Expand Down
34 changes: 34 additions & 0 deletions docs/source/async.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,37 @@ available as the attribute ``.loop``.

<script data-goatcounter="https://fsspec.goatcounter.com/count"
async src="//gc.zgo.at/count.js"></script>

AsyncFileSystemWrapper
----------------------

The `AsyncFileSystemWrapper` class is an experimental feature that allows you to convert
a synchronous filesystem into an asynchronous one. This is useful for quickly integrating
synchronous filesystems into workflows that may expect `AsyncFileSystem` instances.

Basic Usage
~~~~~~~~~~~

To use `AsyncFileSystemWrapper`, wrap any synchronous filesystem to work in an asynchronous context.
In this example, the synchronous `LocalFileSystem` is wrapped, creating an `AsyncFileSystem` instance
backed by the normal, synchronous methods of `LocalFileSystem`:

.. code-block:: python
import asyncio
import fsspec
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
async def async_copy_file():
sync_fs = fsspec.filesystem('file') # by-default synchronous, local filesystem
async_fs = AsyncFileSystemWrapper(sync_fs)
return await async_fs._copy('/source/file.txt', '/destination/file.txt')
asyncio.run(async_copy_file())
Limitations
-----------

This is experimental. Users should not expect this wrapper to magically make things faster.
It is primarily provided to allow usage of synchronous filesystems with interfaces that expect
`AsyncFileSystem` instances.
21 changes: 21 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
Changelog
=========

2024.10.0
---------

Fixes

- Performance of memoryFS rm (#1725)
- Performance of git FS info (#1712)
- Avoid git hex for newer pygit (#1703)
- tests fix for zip (#1700, 1691)
- missing open_async for dirFS (#1698)
- handle pathlib in zip (#1689)
- skip tests needing kerchunk if not installed (#1689)
- allow repeated kwargs in unchain (#1673)

Other

- Code style (#1704, 1706)
- allow pyarrow in referenceFS parquet (#1692)
- don't hardcode test port for parallel runs (#1690)


2024.9.0
--------

Expand Down
46 changes: 33 additions & 13 deletions fsspec/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import os
import threading
import warnings
from itertools import groupby
from operator import itemgetter
from concurrent.futures import Future, ThreadPoolExecutor
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -156,21 +158,39 @@ def _fetch(self, start: int | None, end: int | None) -> bytes:
return b""
start_block = start // self.blocksize
end_block = end // self.blocksize
need = [i for i in range(start_block, end_block + 1) if i not in self.blocks]
hits = [i for i in range(start_block, end_block + 1) if i in self.blocks]
self.miss_count += len(need)
self.hit_count += len(hits)
while need:
# TODO: not a for loop so we can consolidate blocks later to
# make fewer fetch calls; this could be parallel
i = need.pop(0)

sstart = i * self.blocksize
send = min(sstart + self.blocksize, self.size)
block_range = range(start_block, end_block + 1)
# Determine which blocks need to be fetched. This sequence is sorted by construction.
need = (i for i in block_range if i not in self.blocks)
# Count the number of blocks already cached
self.hit_count += sum(1 for i in block_range if i in self.blocks)

# Consolidate needed blocks.
# Algorithm adapted from Python 2.x itertools documentation.
# We are grouping an enumerated sequence of blocks. By comparing when the difference
# between an ascending range (provided by enumerate) and the needed block numbers
# we can detect when the block number skips values. The key computes this difference.
# Whenever the difference changes, we know that we have previously cached block(s),
# and a new group is started. In other words, this algorithm neatly groups
# runs of consecutive block numbers so they can be fetched together.
for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
# Extract the blocks from the enumerated sequence
_blocks = tuple(map(itemgetter(1), _blocks))
# Compute start of first block
sstart = _blocks[0] * self.blocksize
# Compute the end of the last block. Last block may not be full size.
send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size)

# Fetch bytes (could be multiple consecutive blocks)
self.total_requested_bytes += send - sstart
logger.debug(f"MMap get block #{i} ({sstart}-{send})")
logger.debug(
f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send})"
)
self.cache[sstart:send] = self.fetcher(sstart, send)
self.blocks.add(i)

# Update set of cached blocks
self.blocks.update(_blocks)
# Update cache statistics with number of blocks we had to cache
self.miss_count += len(_blocks)

return self.cache[start:end]

Expand Down
33 changes: 19 additions & 14 deletions fsspec/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,19 @@ def open_files(


def _un_chain(path, kwargs):
x = re.compile(".*[^a-z]+.*") # test for non protocol-like single word
bits = (
[p if "://" in p or x.match(p) else p + "://" for p in path.split("::")]
if "::" in path
else [path]
)
# Avoid a circular import
from fsspec.implementations.cached import CachingFileSystem

if "::" in path:
x = re.compile(".*[^a-z]+.*") # test for non protocol-like single word
bits = []
for p in path.split("::"):
if "://" in p or x.match(p):
bits.append(p)
else:
bits.append(p + "://")
else:
bits = [path]
# [[url, protocol, kwargs], ...]
out = []
previous_bit = None
Expand All @@ -346,12 +353,12 @@ def _un_chain(path, kwargs):
kws = kwargs.pop(protocol, {})
if bit is bits[0]:
kws.update(kwargs)
kw = dict(**extra_kwargs, **kws)
kw = dict(
**{k: v for k, v in extra_kwargs.items() if k not in kws or v != kws[k]},
**kws,
)
bit = cls._strip_protocol(bit)
if (
protocol in {"blockcache", "filecache", "simplecache"}
and "target_protocol" not in kw
):
if "target_protocol" not in kw and issubclass(cls, CachingFileSystem):
bit = previous_bit
out.append((bit, protocol, kw))
previous_bit = bit
Expand Down Expand Up @@ -673,9 +680,7 @@ def get_fs_token_paths(
elif not isinstance(paths, list):
paths = list(paths)
else:
if "w" in mode and expand:
paths = _expand_paths(paths, name_function, num)
elif "x" in mode and expand:
if ("w" in mode or "x" in mode) and expand:
paths = _expand_paths(paths, name_function, num)
elif "*" in paths:
paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
Expand Down
96 changes: 96 additions & 0 deletions fsspec/implementations/asyn_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import asyncio
import inspect
import functools
from fsspec.asyn import AsyncFileSystem


def async_wrapper(func, obj=None):
"""
Wraps a synchronous function to make it awaitable.
Parameters
----------
func : callable
The synchronous function to wrap.
obj : object, optional
The instance to bind the function to, if applicable.
Returns
-------
coroutine
An awaitable version of the function.
"""

@functools.wraps(func)
async def wrapper(*args, **kwargs):
return await asyncio.to_thread(func, *args, **kwargs)

return wrapper


class AsyncFileSystemWrapper(AsyncFileSystem):
"""
A wrapper class to convert a synchronous filesystem into an asynchronous one.
This class takes an existing synchronous filesystem implementation and wraps all
its methods to provide an asynchronous interface.
Parameters
----------
sync_fs : AbstractFileSystem
The synchronous filesystem instance to wrap.
"""

def __init__(self, sync_fs, *args, **kwargs):
super().__init__(*args, **kwargs)
self.asynchronous = True
self.fs = sync_fs
self._wrap_all_sync_methods()

@property
def fsid(self):
return f"async_{self.fs.fsid}"

def _wrap_all_sync_methods(self):
"""
Wrap all synchronous methods of the underlying filesystem with asynchronous versions.
"""
for method_name in dir(self.fs):
if method_name.startswith("_"):
continue

attr = inspect.getattr_static(self.fs, method_name)
if isinstance(attr, property):
continue

method = getattr(self.fs, method_name)
if callable(method) and not asyncio.iscoroutinefunction(method):
async_method = async_wrapper(method, obj=self)
setattr(self, f"_{method_name}", async_method)

@classmethod
def wrap_class(cls, sync_fs_class):
"""
Create a new class that can be used to instantiate an AsyncFileSystemWrapper
with lazy instantiation of the underlying synchronous filesystem.
Parameters
----------
sync_fs_class : type
The class of the synchronous filesystem to wrap.
Returns
-------
type
A new class that wraps the provided synchronous filesystem class.
"""

class GeneratedAsyncFileSystemWrapper(cls):
def __init__(self, *args, **kwargs):
sync_fs = sync_fs_class(*args, **kwargs)
super().__init__(sync_fs)

GeneratedAsyncFileSystemWrapper.__name__ = (
f"Async{sync_fs_class.__name__}Wrapper"
)
return GeneratedAsyncFileSystemWrapper
Loading

0 comments on commit b6b7f1e

Please sign in to comment.