Merge branch 'master' into add-implementations
martindurant authored Nov 14, 2024
2 parents ccb149b + 4cb98ab commit 72f677d
Showing 21 changed files with 551 additions and 87 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/main.yaml
@@ -13,7 +13,12 @@ jobs:
    strategy:
      fail-fast: false
      matrix:
        PY: ["3.8", "3.9", "3.10", "3.11", "3.12"]
        PY:
          - "3.9"
          - "3.10"
          - "3.11"
          - "3.12"
          - "3.13"

    env:
      CIRUN: true
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -5,7 +5,7 @@ exclude: >
repos:

  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    rev: v5.0.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
@@ -14,7 +14,7 @@ repos:
      - id: check-yaml
  - repo: https://github.com/astral-sh/ruff-pre-commit
    # Ruff version.
    rev: v0.4.4
    rev: v0.6.9
    hooks:
      # Run the linter.
      - id: ruff
2 changes: 2 additions & 0 deletions docs/source/api.rst
@@ -240,6 +240,7 @@ documentation carefully before using any particular package.
- `s3fs`_ for Amazon S3 and other compatible stores, with protocol "s3://"
- `sshfs`_ for access to SSH servers, with protocol "ssh://" or "sftp://"
- `swiftspec`_ for OpenStack SWIFT, with protocol "swift://"
- `tosfs`_ for ByteDance Volcano Engine Tinder Object Storage (TOS)
- `wandbfs`_ to access Wandb run data (experimental)
- `wandbfsspec`_ to access Weights & Biases (experimental)
- `webdav4`_ for WebDAV, with protocol "webdav://" or "dav://"
@@ -272,6 +273,7 @@ documentation carefully before using any particular package.
.. _s3fs: https://s3fs.readthedocs.io/en/latest/
.. _sshfs: https://github.com/fsspec/sshfs
.. _swiftspec: https://github.com/fsspec/swiftspec
.. _tosfs: https://tosfs.readthedocs.io/en/latest/
.. _wandbfs: https://github.com/jkulhanek/wandbfs
.. _wandbfsspec: https://github.com/alvarobartt/wandbfsspec
.. _webdav4: https://github.com/skshetry/webdav4
34 changes: 34 additions & 0 deletions docs/source/async.rst
@@ -152,3 +152,37 @@ available as the attribute ``.loop``.

<script data-goatcounter="https://fsspec.goatcounter.com/count"
async src="//gc.zgo.at/count.js"></script>

AsyncFileSystemWrapper
----------------------

The `AsyncFileSystemWrapper` class is an experimental feature that allows you to convert
a synchronous filesystem into an asynchronous one. This is useful for quickly integrating
synchronous filesystems into workflows that may expect `AsyncFileSystem` instances.

Basic Usage
~~~~~~~~~~~

To use `AsyncFileSystemWrapper`, wrap any synchronous filesystem to work in an asynchronous context.
In this example, the synchronous `LocalFileSystem` is wrapped, creating an `AsyncFileSystem` instance
backed by the normal, synchronous methods of `LocalFileSystem`:

.. code-block:: python

    import asyncio
    import fsspec
    from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

    async def async_copy_file():
        sync_fs = fsspec.filesystem('file')  # the default, synchronous local filesystem
        async_fs = AsyncFileSystemWrapper(sync_fs)
        return await async_fs._copy('/source/file.txt', '/destination/file.txt')

    asyncio.run(async_copy_file())

Limitations
~~~~~~~~~~~

This wrapper is experimental. Users should not expect it to make anything faster:
each wrapped call still runs the synchronous implementation, just moved to a
worker thread. It is primarily provided to allow usage of synchronous filesystems
with interfaces that expect `AsyncFileSystem` instances.
21 changes: 21 additions & 0 deletions docs/source/changelog.rst
@@ -1,6 +1,27 @@
Changelog
=========

2024.10.0
---------

Fixes

- Performance of memoryFS rm (#1725)
- Performance of git FS info (#1712)
- Avoid git hex for newer pygit (#1703)
- Tests fix for zip (#1700, #1691)
- Missing open_async for dirFS (#1698)
- Handle pathlib in zip (#1689)
- Skip tests needing kerchunk if not installed (#1689)
- Allow repeated kwargs in unchain (#1673)

Other

- Code style (#1704, #1706)
- Allow pyarrow in referenceFS parquet (#1692)
- Don't hardcode test port for parallel runs (#1690)


2024.9.0
--------

46 changes: 33 additions & 13 deletions fsspec/caching.py
@@ -7,6 +7,8 @@
import os
import threading
import warnings
from itertools import groupby
from operator import itemgetter
from concurrent.futures import Future, ThreadPoolExecutor
from typing import (
    TYPE_CHECKING,
@@ -161,21 +163,39 @@ def _fetch(self, start: int | None, end: int | None) -> bytes:
            return b""
        start_block = start // self.blocksize
        end_block = end // self.blocksize
        need = [i for i in range(start_block, end_block + 1) if i not in self.blocks]
        hits = [i for i in range(start_block, end_block + 1) if i in self.blocks]
        self.miss_count += len(need)
        self.hit_count += len(hits)
        while need:
            # TODO: not a for loop so we can consolidate blocks later to
            # make fewer fetch calls; this could be parallel
            i = need.pop(0)

            sstart = i * self.blocksize
            send = min(sstart + self.blocksize, self.size)
        block_range = range(start_block, end_block + 1)
        # Determine which blocks need to be fetched. This sequence is sorted by construction.
        need = (i for i in block_range if i not in self.blocks)
        # Count the blocks that are already cached
        self.hit_count += sum(1 for i in block_range if i in self.blocks)

        # Consolidate needed blocks.
        # Algorithm adapted from the Python 2.x itertools documentation: group an
        # enumerated sequence of needed block numbers by the difference between an
        # ascending counter (provided by enumerate) and each block number. That
        # difference is constant across a run of consecutive block numbers and
        # changes whenever the sequence skips a value, so each group is a run of
        # consecutive blocks that can be fetched together.
        for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
            # Extract the block numbers from the enumerated sequence
            _blocks = tuple(map(itemgetter(1), _blocks))
            # Compute the start of the first block
            sstart = _blocks[0] * self.blocksize
            # Compute the end of the last block; the last block may not be full size
            send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size)

            # Fetch the byte range (possibly several consecutive blocks)
            self.total_requested_bytes += send - sstart
            logger.debug(f"MMap get block #{i} ({sstart}-{send})")
            logger.debug(
                f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send})"
            )
            self.cache[sstart:send] = self.fetcher(sstart, send)
            self.blocks.add(i)

            # Record the newly cached blocks and update the miss statistics
            self.blocks.update(_blocks)
            self.miss_count += len(_blocks)

        return self.cache[start:end]

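The consolidation loop above is easiest to see on concrete numbers. Below is a
minimal standalone sketch (hypothetical block numbers and block size, not part
of this commit) showing how the enumerate/groupby key collapses runs of
consecutive block numbers into single fetch ranges:

    from itertools import groupby
    from operator import itemgetter

    BLOCKSIZE = 4096  # hypothetical block size
    need = [2, 3, 4, 7, 8]  # hypothetical uncached block numbers, sorted

    ranges = []
    # counter - block_number is constant within a run of consecutive blocks,
    # so each group below corresponds to one contiguous byte range
    for _, run in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
        run = tuple(map(itemgetter(1), run))
        ranges.append((run[0] * BLOCKSIZE, run[-1] * BLOCKSIZE + BLOCKSIZE))

    print(ranges)  # [(8192, 20480), (28672, 36864)]: two fetches instead of five
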
29 changes: 18 additions & 11 deletions fsspec/core.py
@@ -329,12 +329,19 @@ def open_files(


def _un_chain(path, kwargs):
    x = re.compile(".*[^a-z]+.*")  # test for non protocol-like single word
    bits = (
        [p if "://" in p or x.match(p) else p + "://" for p in path.split("::")]
        if "::" in path
        else [path]
    )
    # Avoid a circular import
    from fsspec.implementations.cached import CachingFileSystem

    if "::" in path:
        x = re.compile(".*[^a-z]+.*")  # test for non protocol-like single word
        bits = []
        for p in path.split("::"):
            if "://" in p or x.match(p):
                bits.append(p)
            else:
                bits.append(p + "://")
    else:
        bits = [path]
    # [[url, protocol, kwargs], ...]
    out = []
    previous_bit = None
@@ -346,12 +353,12 @@ def _un_chain(path, kwargs):
        kws = kwargs.pop(protocol, {})
        if bit is bits[0]:
            kws.update(kwargs)
        kw = dict(**extra_kwargs, **kws)
        kw = dict(
            **{k: v for k, v in extra_kwargs.items() if k not in kws or v != kws[k]},
            **kws,
        )
        bit = cls._strip_protocol(bit)
        if (
            protocol in {"blockcache", "filecache", "simplecache"}
            and "target_protocol" not in kw
        ):
        if "target_protocol" not in kw and issubclass(cls, CachingFileSystem):
            bit = previous_bit
        out.append((bit, protocol, kw))
        previous_bit = bit
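For context, _un_chain is the helper that decomposes fsspec's chained URLs into
(path, protocol, kwargs) triples. A minimal sketch of the user-facing chaining
it parses (the archive path and member name are illustrative only):

    import fsspec

    # Read a member of a local zip archive through a read cache.
    # "simplecache" supplies no target of its own, so _un_chain points it
    # at the previous link in the chain (the zip layer).
    with fsspec.open(
        "simplecache::zip://inner/data.csv::file:///tmp/archive.zip", "rb"
    ) as f:
        print(f.read(100))
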
96 changes: 96 additions & 0 deletions fsspec/implementations/asyn_wrapper.py
@@ -0,0 +1,96 @@
import asyncio
import inspect
import functools

from fsspec.asyn import AsyncFileSystem


def async_wrapper(func, obj=None):
    """
    Wraps a synchronous function to make it awaitable.

    Parameters
    ----------
    func : callable
        The synchronous function to wrap.
    obj : object, optional
        The instance to bind the function to, if applicable. Not currently
        used: bound methods already carry their instance.

    Returns
    -------
    coroutine
        An awaitable version of the function.
    """

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Run the synchronous function in a worker thread so the event
        # loop is not blocked
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


class AsyncFileSystemWrapper(AsyncFileSystem):
    """
    A wrapper class to convert a synchronous filesystem into an asynchronous one.

    This class takes an existing synchronous filesystem implementation and wraps all
    its methods to provide an asynchronous interface.

    Parameters
    ----------
    sync_fs : AbstractFileSystem
        The synchronous filesystem instance to wrap.
    """

    def __init__(self, sync_fs, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.asynchronous = True
        self.fs = sync_fs
        self._wrap_all_sync_methods()

    @property
    def fsid(self):
        return f"async_{self.fs.fsid}"

    def _wrap_all_sync_methods(self):
        """
        Wrap all synchronous methods of the underlying filesystem with asynchronous versions.
        """
        for method_name in dir(self.fs):
            # Skip private and dunder names
            if method_name.startswith("_"):
                continue

            # Skip properties; only plain callables are wrapped
            attr = inspect.getattr_static(self.fs, method_name)
            if isinstance(attr, property):
                continue

            method = getattr(self.fs, method_name)
            if callable(method) and not asyncio.iscoroutinefunction(method):
                # Install the async version under the underscore-prefixed
                # name used by AsyncFileSystem (e.g. ls -> _ls)
                async_method = async_wrapper(method, obj=self)
                setattr(self, f"_{method_name}", async_method)

    @classmethod
    def wrap_class(cls, sync_fs_class):
        """
        Create a new class that can be used to instantiate an AsyncFileSystemWrapper
        with lazy instantiation of the underlying synchronous filesystem.

        Parameters
        ----------
        sync_fs_class : type
            The class of the synchronous filesystem to wrap.

        Returns
        -------
        type
            A new class that wraps the provided synchronous filesystem class.
        """

        class GeneratedAsyncFileSystemWrapper(cls):
            def __init__(self, *args, **kwargs):
                sync_fs = sync_fs_class(*args, **kwargs)
                super().__init__(sync_fs)

        GeneratedAsyncFileSystemWrapper.__name__ = f"Async{sync_fs_class.__name__}Wrapper"
        return GeneratedAsyncFileSystemWrapper
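
A usage sketch for wrap_class, assuming fsspec's built-in LocalFileSystem as the
wrapped class (the listed directory is illustrative):

    import asyncio

    from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
    from fsspec.implementations.local import LocalFileSystem

    # Build a wrapper class that instantiates LocalFileSystem lazily
    AsyncLocalFileSystem = AsyncFileSystemWrapper.wrap_class(LocalFileSystem)

    async def main():
        fs = AsyncLocalFileSystem()  # constructs the sync filesystem internally
        # Wrapped methods are installed under underscore-prefixed names,
        # following AsyncFileSystem conventions (ls -> _ls)
        print(await fs._ls("/tmp"))

    asyncio.run(main())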
66 changes: 27 additions & 39 deletions fsspec/implementations/git.py
@@ -55,6 +55,8 @@ def _path_to_object(self, path, ref):
        tree = comm.tree
        for part in parts:
            if part and isinstance(tree, pygit2.Tree):
                if part not in tree:
                    raise FileNotFoundError(path)
                tree = tree[part]
        return tree

@@ -69,46 +71,32 @@ def _get_kwargs_from_urls(path):
out["ref"], path = path.split("@", 1)
return out

    @staticmethod
    def _object_to_info(obj, path=None):
        # obj.name and obj.filemode are None for the root tree!
        is_dir = isinstance(obj, pygit2.Tree)
        return {
            "type": "directory" if is_dir else "file",
            "name": (
                "/".join([path, obj.name or ""]).lstrip("/") if path else obj.name
            ),
            "hex": str(obj.id),
            "mode": "100644" if obj.filemode is None else f"{obj.filemode:o}",
            "size": 0 if is_dir else obj.size,
        }

    def ls(self, path, detail=True, ref=None, **kwargs):
        path = self._strip_protocol(path)
        tree = self._path_to_object(path, ref)
        if isinstance(tree, pygit2.Tree):
            out = []
            for obj in tree:
                if isinstance(obj, pygit2.Tree):
                    out.append(
                        {
                            "type": "directory",
                            "name": "/".join([path, obj.name]).lstrip("/"),
                            "hex": str(obj.id),
                            "mode": f"{obj.filemode:o}",
                            "size": 0,
                        }
                    )
                else:
                    out.append(
                        {
                            "type": "file",
                            "name": "/".join([path, obj.name]).lstrip("/"),
                            "hex": str(obj.id),
                            "mode": f"{obj.filemode:o}",
                            "size": obj.size,
                        }
                    )
        else:
            obj = tree
            out = [
                {
                    "type": "file",
                    "name": obj.name,
                    "hex": str(obj.id),
                    "mode": f"{obj.filemode:o}",
                    "size": obj.size,
                }
            ]
        if detail:
            return out
        return [o["name"] for o in out]
        tree = self._path_to_object(self._strip_protocol(path), ref)
        return [
            GitFileSystem._object_to_info(obj, path)
            if detail
            else GitFileSystem._object_to_info(obj, path)["name"]
            for obj in (tree if isinstance(tree, pygit2.Tree) else [tree])
        ]

    def info(self, path, ref=None, **kwargs):
        tree = self._path_to_object(self._strip_protocol(path), ref)
        return GitFileSystem._object_to_info(tree, path)

    def ukey(self, path, ref=None):
        return self.info(path, ref=ref)["hex"]
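A brief sketch of the refactored ls/info pair (the repository path and file name
are illustrative; requires pygit2):

    import fsspec

    # A read-only filesystem view onto a local git repository
    fs = fsspec.filesystem("git", path="/path/to/repo")

    # info() and ls() now share _object_to_info, so entries carry the same
    # keys either way: type, name, hex, mode and size
    print(fs.info("README.md"))
    print(fs.ls("", detail=False))  # names at the repository root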
