From 03e89cc1713d178bec349704e99b5c075de66d74 Mon Sep 17 00:00:00 2001 From: Ben Mares Date: Tue, 15 Oct 2024 15:38:04 +0100 Subject: [PATCH 01/16] Fix performance issues with MemoryFileSystem.rm (#1725) Closes #1724. --- fsspec/implementations/memory.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/fsspec/implementations/memory.py b/fsspec/implementations/memory.py index 93860af6a..3b6f0d204 100644 --- a/fsspec/implementations/memory.py +++ b/fsspec/implementations/memory.py @@ -248,6 +248,10 @@ def created(self, path): except KeyError as e: raise FileNotFoundError(path) from e + def isfile(self, path): + path = self._strip_protocol(path) + return path in self.store + def rm(self, path, recursive=False, maxdepth=None): if isinstance(path, str): path = self._strip_protocol(path) @@ -255,14 +259,14 @@ def rm(self, path, recursive=False, maxdepth=None): path = [self._strip_protocol(p) for p in path] paths = self.expand_path(path, recursive=recursive, maxdepth=maxdepth) for p in reversed(paths): + if self.isfile(p): + self.rm_file(p) # If the expanded path doesn't exist, it is only because the expanded # path was a directory that does not exist in self.pseudo_dirs. This # is possible if you directly create files without making the # directories first. - if not self.exists(p): + elif not self.exists(p): continue - if self.isfile(p): - self.rm_file(p) else: self.rmdir(p) From 952cd981b9b8370a4bfa231a7c5c528184e92fcf Mon Sep 17 00:00:00 2001 From: Maximilian Knespel Date: Tue, 15 Oct 2024 17:47:55 +0200 Subject: [PATCH 02/16] Speed up git backend (#1712) --- fsspec/implementations/git.py | 66 ++++++++++-------------- fsspec/implementations/tests/test_git.py | 33 ++++++++++++ fsspec/spec.py | 2 +- 3 files changed, 61 insertions(+), 40 deletions(-) diff --git a/fsspec/implementations/git.py b/fsspec/implementations/git.py index fde671b8c..7b9d3539a 100644 --- a/fsspec/implementations/git.py +++ b/fsspec/implementations/git.py @@ -55,6 +55,8 @@ def _path_to_object(self, path, ref): tree = comm.tree for part in parts: if part and isinstance(tree, pygit2.Tree): + if part not in tree: + raise FileNotFoundError(path) tree = tree[part] return tree @@ -69,46 +71,32 @@ def _get_kwargs_from_urls(path): out["ref"], path = path.split("@", 1) return out + @staticmethod + def _object_to_info(obj, path=None): + # obj.name and obj.filemode are None for the root tree! 
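+        # (hence the fallbacks below: "" for the name and "100644" for
+        # the mode, keeping the dict shape identical for every entry)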
+ is_dir = isinstance(obj, pygit2.Tree) + return { + "type": "directory" if is_dir else "file", + "name": ( + "/".join([path, obj.name or ""]).lstrip("/") if path else obj.name + ), + "hex": str(obj.id), + "mode": "100644" if obj.filemode is None else f"{obj.filemode:o}", + "size": 0 if is_dir else obj.size, + } + def ls(self, path, detail=True, ref=None, **kwargs): - path = self._strip_protocol(path) - tree = self._path_to_object(path, ref) - if isinstance(tree, pygit2.Tree): - out = [] - for obj in tree: - if isinstance(obj, pygit2.Tree): - out.append( - { - "type": "directory", - "name": "/".join([path, obj.name]).lstrip("/"), - "hex": str(obj.id), - "mode": f"{obj.filemode:o}", - "size": 0, - } - ) - else: - out.append( - { - "type": "file", - "name": "/".join([path, obj.name]).lstrip("/"), - "hex": str(obj.id), - "mode": f"{obj.filemode:o}", - "size": obj.size, - } - ) - else: - obj = tree - out = [ - { - "type": "file", - "name": obj.name, - "hex": str(obj.id), - "mode": f"{obj.filemode:o}", - "size": obj.size, - } - ] - if detail: - return out - return [o["name"] for o in out] + tree = self._path_to_object(self._strip_protocol(path), ref) + return [ + GitFileSystem._object_to_info(obj, path) + if detail + else GitFileSystem._object_to_info(obj, path)["name"] + for obj in (tree if isinstance(tree, pygit2.Tree) else [tree]) + ] + + def info(self, path, ref=None, **kwargs): + tree = self._path_to_object(self._strip_protocol(path), ref) + return GitFileSystem._object_to_info(tree, path) def ukey(self, path, ref=None): return self.info(path, ref=ref)["hex"] diff --git a/fsspec/implementations/tests/test_git.py b/fsspec/implementations/tests/test_git.py index ffa7b47d9..2aeb544a1 100644 --- a/fsspec/implementations/tests/test_git.py +++ b/fsspec/implementations/tests/test_git.py @@ -61,6 +61,39 @@ def test_refs(repo): assert f.read() == b"data3" +def _check_FileNotFoundError(f, *args, **kwargs): + with pytest.raises(FileNotFoundError): + f(*args, **kwargs) + + +def test_file_existence_checks(repo): + d, sha = repo + + fs, _ = fsspec.url_to_fs(f"git://{d}:abranch@") + + assert fs.lexists("inner") + assert fs.exists("inner") + assert fs.isdir("inner") + assert fs.info("inner") + assert fs.ls("inner") + + assert fs.lexists("inner/file1") + assert fs.exists("inner/file1") + assert fs.info("inner/file1") + assert fs.ls("inner/file1") + + assert not fs.lexists("non-existing-file") + assert not fs.exists("non-existing-file") + + assert not fs.isfile("non-existing-file") + assert not fs.isdir("non-existing-file") + + _check_FileNotFoundError(fs.info, "non-existing-file") + _check_FileNotFoundError(fs.size, "non-existing-file") + _check_FileNotFoundError(fs.ls, "non-existing-file") + _check_FileNotFoundError(fs.open, "non-existing-file") + + def test_url(repo): d, sha = repo fs, _, paths = fsspec.core.get_fs_token_paths(f"git://file1::file://{d}") diff --git a/fsspec/spec.py b/fsspec/spec.py index e67d280c1..8284366ba 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -648,7 +648,7 @@ def info(self, path, **kwargs): Returns a single dictionary, with exactly the same information as ``ls`` would with ``detail=True``. - The default implementation should calls ls and could be overridden by a + The default implementation calls ls and could be overridden by a shortcut. kwargs are passed on to ```ls()``. 
Some file systems might not be able to measure the file's size, in From 9d56f9232190ebac46ef42d2b9a096c59449bd40 Mon Sep 17 00:00:00 2001 From: Mikhail Karasikov Date: Wed, 16 Oct 2024 22:43:08 +0200 Subject: [PATCH 03/16] allow repeated extra arguments (#1673) Co-authored-by: Martin Durant --- fsspec/core.py | 5 ++++- fsspec/tests/test_core.py | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/fsspec/core.py b/fsspec/core.py index e79f2a894..1954667fe 100644 --- a/fsspec/core.py +++ b/fsspec/core.py @@ -346,7 +346,10 @@ def _un_chain(path, kwargs): kws = kwargs.pop(protocol, {}) if bit is bits[0]: kws.update(kwargs) - kw = dict(**extra_kwargs, **kws) + kw = dict( + **{k: v for k, v in extra_kwargs.items() if k not in kws or v != kws[k]}, + **kws, + ) bit = cls._strip_protocol(bit) if ( protocol in {"blockcache", "filecache", "simplecache"} diff --git a/fsspec/tests/test_core.py b/fsspec/tests/test_core.py index 53592aff9..1cdeec90c 100644 --- a/fsspec/tests/test_core.py +++ b/fsspec/tests/test_core.py @@ -465,3 +465,21 @@ def test_chained_url(ftp_writable): def test_automkdir_local(): fs, _ = fsspec.core.url_to_fs("file://", auto_mkdir=True) assert fs.auto_mkdir is True + + +def test_repeated_argument(): + pytest.importorskip("adlfs") + from fsspec.core import url_to_fs + + fs, url = url_to_fs( + "az://DIR@ACCOUNT.blob.core.windows.net/DATA", + anon=False, + account_name="ACCOUNT", + ) + assert fs.storage_options == {"account_name": "ACCOUNT", "anon": False} + with pytest.raises(TypeError): + url_to_fs( + "az://DIR@ACCOUNT.blob.core.windows.net/DATA", + anon=False, + account_name="OTHER", + ) From 1fe5695802679e8ce1ae242a44d4846195425509 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Sun, 20 Oct 2024 21:19:01 -0400 Subject: [PATCH 04/16] changelog (#1729) --- docs/source/changelog.rst | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index b19ee9e3b..fea257b44 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -1,6 +1,27 @@ Changelog ========= +2024.10.0 +--------- + +Fixes + +- Performance of memoryFS rm (#1725) +- Performance of git FS info (#1712) +- Avoid git hex for newer pygit (#1703) +- tests fix for zip (#1700, 1691) +- missing open_async for dirFS (#1698) +- handle pathlib in zip (#1689) +- skip tests needing kerchunk if not installed (#1689) +- allow repeated kwargs in unchain (#1673) + +Other + +- Code style (#1704, 1706) +- allow pyarrow in referenceFS parquet (#1692) +- don't hardcode test port for parallel runs (#1690) + + 2024.9.0 -------- From 97a2168fd0a11e11ea43b64aa24b01ce9666ff9c Mon Sep 17 00:00:00 2001 From: vinoyang Date: Mon, 21 Oct 2024 21:48:24 +0800 Subject: [PATCH 05/16] Add tosfs to other known implementations list (#1728) --- docs/source/api.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/source/api.rst b/docs/source/api.rst index 9fdcd1f17..94c4cdd5b 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -224,6 +224,7 @@ Other Known Implementations - `ossfs`_ for Alibaba Cloud (Aliyun) Object Storage System (OSS) - `p9fs`_ for 9P (Plan 9 Filesystem Protocol) servers - `s3fs`_ for Amazon S3 and other compatible stores +- `tosfs`_ for ByteDance volcano engine Tinder Object Storage (TOS) - `wandbfs`_ to access Wandb run data (experimental) - `webdav4`_ for WebDAV - `xrootd`_ for xrootd, with protocol "root://" @@ -243,6 +244,7 @@ Other Known Implementations .. _ossfs: https://github.com/fsspec/ossfs .. 
_p9fs: https://github.com/pbchekin/p9fs-py .. _s3fs: https://s3fs.readthedocs.io/en/latest/ +.. _tosfs: https://tosfs.readthedocs.io/en/latest/ .. _wandbfs: https://github.com/jkulhanek/wandbfs .. _webdav4: https://github.com/skshetry/webdav4 .. _xrootd: https://github.com/CoffeaTeam/fsspec-xrootd From d93f71d09cf3bd92dc59fcc19f011fd0a445075e Mon Sep 17 00:00:00 2001 From: Ryan Grout Date: Wed, 23 Oct 2024 08:14:50 -0600 Subject: [PATCH 06/16] Fixes for _un_chain (#1736) --------- Co-authored-by: Martin Durant --- fsspec/core.py | 24 ++++++++++++--------- fsspec/implementations/tests/test_cached.py | 24 +++++++++++++-------- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/fsspec/core.py b/fsspec/core.py index 1954667fe..79580bee9 100644 --- a/fsspec/core.py +++ b/fsspec/core.py @@ -329,12 +329,19 @@ def open_files( def _un_chain(path, kwargs): - x = re.compile(".*[^a-z]+.*") # test for non protocol-like single word - bits = ( - [p if "://" in p or x.match(p) else p + "://" for p in path.split("::")] - if "::" in path - else [path] - ) + # Avoid a circular import + from fsspec.implementations.cached import CachingFileSystem + + if "::" in path: + x = re.compile(".*[^a-z]+.*") # test for non protocol-like single word + bits = [] + for p in path.split("::"): + if "://" in p or x.match(p): + bits.append(p) + else: + bits.append(p + "://") + else: + bits = [path] # [[url, protocol, kwargs], ...] out = [] previous_bit = None @@ -351,10 +358,7 @@ def _un_chain(path, kwargs): **kws, ) bit = cls._strip_protocol(bit) - if ( - protocol in {"blockcache", "filecache", "simplecache"} - and "target_protocol" not in kw - ): + if "target_protocol" not in kw and issubclass(cls, CachingFileSystem): bit = previous_bit out.append((bit, protocol, kw)) previous_bit = bit diff --git a/fsspec/implementations/tests/test_cached.py b/fsspec/implementations/tests/test_cached.py index 67edbc8df..c9222d5b5 100644 --- a/fsspec/implementations/tests/test_cached.py +++ b/fsspec/implementations/tests/test_cached.py @@ -267,7 +267,7 @@ def test_blockcache_workflow(ftp_writable, tmp_path, force_save_pickle): assert f.read(5) == b"test\n" -@pytest.mark.parametrize("impl", ["filecache", "blockcache"]) +@pytest.mark.parametrize("impl", ["filecache", "blockcache", "cached"]) def test_workflow(ftp_writable, impl): host, port, user, pw = ftp_writable fs = FTPFileSystem(host, port, user, pw) @@ -295,7 +295,7 @@ def test_workflow(ftp_writable, impl): ) # new value, because we overwrote the cached location -@pytest.mark.parametrize("impl", ["simplecache", "blockcache"]) +@pytest.mark.parametrize("impl", ["simplecache", "blockcache", "cached"]) def test_glob(ftp_writable, impl): host, port, user, pw = ftp_writable fs = FTPFileSystem(host, port, user, pw) @@ -622,7 +622,7 @@ def open_raise(*_, **__): assert "Cache save failed due to interpreter shutdown" in caplog.text -@pytest.mark.parametrize("impl", ["filecache", "simplecache", "blockcache"]) +@pytest.mark.parametrize("impl", ["filecache", "simplecache", "blockcache", "cached"]) def test_local_filecache_creates_dir_if_needed(impl): import tempfile @@ -875,7 +875,7 @@ def test_filecache_with_checks(): assert fs.cat(f1) == data * 2 # changed, since origin changed -@pytest.mark.parametrize("impl", ["filecache", "simplecache", "blockcache"]) +@pytest.mark.parametrize("impl", ["filecache", "simplecache", "blockcache", "cached"]) @pytest.mark.parametrize("fs", ["local", "multi"], indirect=["fs"]) def test_filecache_takes_fs_instance(impl, fs): origin = tempfile.mkdtemp() @@ 
-889,7 +889,7 @@ def test_filecache_takes_fs_instance(impl, fs): assert fs2.cat(f1) == data -@pytest.mark.parametrize("impl", ["filecache", "simplecache", "blockcache"]) +@pytest.mark.parametrize("impl", ["filecache", "simplecache", "blockcache", "cached"]) @pytest.mark.parametrize("fs", ["local", "multi"], indirect=["fs"]) def test_filecache_serialization(impl, fs): fs1 = fsspec.filesystem(impl, fs=fs) @@ -1031,7 +1031,9 @@ def test_multi_cache(protocol): assert f.read() == b"hello" -@pytest.mark.parametrize("protocol", ["simplecache", "filecache", "blockcache"]) +@pytest.mark.parametrize( + "protocol", ["simplecache", "filecache", "blockcache", "cached"] +) def test_multi_cat(protocol, ftp_writable): host, port, user, pw = ftp_writable fs = FTPFileSystem(host, port, user, pw) @@ -1064,7 +1066,9 @@ def test_multi_cache_chain(protocol): assert files[0].read() == b"hello" -@pytest.mark.parametrize("protocol", ["blockcache", "simplecache", "filecache"]) +@pytest.mark.parametrize( + "protocol", ["blockcache", "cached", "simplecache", "filecache"] +) def test_strip(protocol): fs = fsspec.filesystem(protocol, target_protocol="memory") url1 = "memory://afile" @@ -1235,9 +1239,11 @@ def test_cache_dir_auto_deleted(temp_cache, tmpdir): assert local.exists(cache_dir) -@pytest.mark.parametrize("protocol", ["filecache", "blockcache", "simplecache"]) +@pytest.mark.parametrize( + "protocol", ["filecache", "blockcache", "cached", "simplecache"] +) def test_cache_size(tmpdir, protocol): - if win and protocol == "blockcache": + if win and protocol in {"blockcache", "cached"}: pytest.skip("Windows file locking affects blockcache size tests") source = os.path.join(tmpdir, "source") From 4517882f67d635d50b54cd53fd04ee3a37b6943c Mon Sep 17 00:00:00 2001 From: Dimitri Papadopoulos Orfanos <3234522+DimitriPapadopoulos@users.noreply.github.com> Date: Wed, 23 Oct 2024 16:48:37 +0200 Subject: [PATCH 07/16] Update linters in pre-commit (#1714) Many new ruff versions have been released. --- .pre-commit-config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 48e3c950d..f1f11e947 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -5,7 +5,7 @@ exclude: > repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.5.0 + rev: v5.0.0 hooks: - id: trailing-whitespace - id: end-of-file-fixer @@ -14,7 +14,7 @@ repos: - id: check-yaml - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: v0.4.4 + rev: v0.6.9 hooks: # Run the linter. 
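      # after bumping revs, run "pre-commit run --all-files" to re-lint the repo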
- id: ruff From 199ee82486990a295f744820d4dd2a7180e1c7a6 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 23 Oct 2024 14:50:54 -0400 Subject: [PATCH 08/16] Parquet refs: nan or missing references (#1738) --- fsspec/implementations/reference.py | 12 ++++-- .../implementations/tests/test_reference.py | 41 +++++++++++++++++++ fsspec/mapping.py | 2 +- 3 files changed, 51 insertions(+), 4 deletions(-) diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py index d5d3f2968..3d22ddc40 100644 --- a/fsspec/implementations/reference.py +++ b/fsspec/implementations/reference.py @@ -41,7 +41,7 @@ def _first(d): def _prot_in_references(path, references): ref = references.get(path) - if isinstance(ref, (list, tuple)): + if isinstance(ref, (list, tuple)) and isinstance(ref[0], str): return split_protocol(ref[0])[0] if ref[0] else ref[0] @@ -173,8 +173,11 @@ def open_refs(field, record): """cached parquet file loader""" path = self.url.format(field=field, record=record) data = io.BytesIO(self.fs.cat_file(path)) - df = self.pd.read_parquet(data, engine=self.engine) - refs = {c: df[c].to_numpy() for c in df.columns} + try: + df = self.pd.read_parquet(data, engine=self.engine) + refs = {c: df[c].to_numpy() for c in df.columns} + except IOError: + refs = None return refs self.open_refs = open_refs @@ -871,6 +874,9 @@ def cat(self, path, recursive=False, on_error="raise", **kwargs): # found and on_error is "raise" try: u, s, e = self._cat_common(p) + if not isinstance(u, (bytes, str)): + # nan/None from parquet + continue except FileNotFoundError as err: if on_error == "raise": raise diff --git a/fsspec/implementations/tests/test_reference.py b/fsspec/implementations/tests/test_reference.py index d82dc1771..fefdd6024 100644 --- a/fsspec/implementations/tests/test_reference.py +++ b/fsspec/implementations/tests/test_reference.py @@ -792,3 +792,44 @@ def test_deep_parq(m, engine): "instant/one/.zarray", "instant/one/0", ] + + +def test_parquet_no_data(m): + zarr = pytest.importorskip("zarr") + lz = fsspec.implementations.reference.LazyReferenceMapper.create( + "memory://out.parq", fs=m + ) + + g = zarr.open_group(lz, mode="w") + arr = g.create_dataset( + name="one", + dtype="int32", + shape=(10,), + chunks=(5,), + compression=None, + fill_value=1, + ) + lz.flush() + + assert (arr[:] == 1).all() + + +def test_parquet_no_references(m): + zarr = pytest.importorskip("zarr") + lz = fsspec.implementations.reference.LazyReferenceMapper.create( + "memory://out.parq", fs=m + ) + + g = zarr.open_group(lz, mode="w") + arr = g.create_dataset( + name="one", + dtype="int32", + shape=(), + chunks=(), + compression=None, + fill_value=1, + ) + lz.flush() + arr[...] 
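+    # no chunk reference was ever written, so reads of this scalar
+    # resolve to the zarr fill_value (1 here)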
+ + assert arr[...].tolist() == 1 # scalar, equal to fill value diff --git a/fsspec/mapping.py b/fsspec/mapping.py index 8fb9b9efb..752eef352 100644 --- a/fsspec/mapping.py +++ b/fsspec/mapping.py @@ -112,7 +112,7 @@ def getitems(self, keys, on_error="raise"): for k, v in out.items() } return { - key: out[k2] + key: out[k2] if on_error == "raise" else out.get(k2, KeyError(k2)) for key, k2 in zip(keys, keys2) if on_error == "return" or not isinstance(out[k2], BaseException) } From 001be4a1245f1639489c355432ce73310bd50e1f Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 23 Oct 2024 18:04:53 -0400 Subject: [PATCH 09/16] Fix broken async reference file system `_cat_file` method (#1734) Co-authored-by: Martin Durant --- fsspec/implementations/reference.py | 4 ++- .../implementations/tests/test_reference.py | 29 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py index 3d22ddc40..79bff141a 100644 --- a/fsspec/implementations/reference.py +++ b/fsspec/implementations/reference.py @@ -806,7 +806,9 @@ async def _cat_file(self, path, start=None, end=None, **kwargs): return part_or_url[start:end] protocol, _ = split_protocol(part_or_url) try: - await self.fss[protocol]._cat_file(part_or_url, start=start, end=end) + return await self.fss[protocol]._cat_file( + part_or_url, start=start0, end=end0 + ) except Exception as e: raise ReferenceNotReachable(path, part_or_url) from e diff --git a/fsspec/implementations/tests/test_reference.py b/fsspec/implementations/tests/test_reference.py index fefdd6024..0063d11b0 100644 --- a/fsspec/implementations/tests/test_reference.py +++ b/fsspec/implementations/tests/test_reference.py @@ -513,6 +513,35 @@ def test_cat_file_ranges(m): assert fs.cat_file("d", 1, -3) == other[4:10][1:-3] +@pytest.mark.asyncio +async def test_async_cat_file_ranges(): + fsspec.get_filesystem_class("http").clear_instance_cache() + fss = fsspec.filesystem("https", asynchronous=True) + session = await fss.set_session() + + fs = fsspec.filesystem( + "reference", + fo={ + "version": 1, + "refs": { + "reference_time/0": [ + "https://noaa-nwm-retro-v2-0-pds.s3.amazonaws.com/full_physics/2017/201704010000.CHRTOUT_DOMAIN1.comp", + 39783, + 12, + ], + }, + }, + fs={"https": fss}, + remote_protocol="https", + asynchronous=True, + ) + + assert ( + await fs._cat_file("reference_time/0") == b"x^K0\xa9d\x04\x00\x03\x13\x01\x0f" + ) + await session.close() + + @pytest.mark.parametrize( "fo", [ From 7de985ac64f7d19e1162bc41efd82366efb4dd29 Mon Sep 17 00:00:00 2001 From: Dimitri Papadopoulos Orfanos <3234522+DimitriPapadopoulos@users.noreply.github.com> Date: Fri, 25 Oct 2024 13:51:24 +0200 Subject: [PATCH 10/16] Fix typo (#1705) --- fsspec/spec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fsspec/spec.py b/fsspec/spec.py index 8284366ba..6ce338b32 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -408,7 +408,7 @@ def walk(self, path, maxdepth=None, topdown=True, on_error="omit", **kwargs): topdown: bool (True) Whether to walk the directory tree from the top downwards or from the bottom upwards. 
- on_error: "omit", "raise", a collable + on_error: "omit", "raise", a callable if omit (default), path with exception will simply be empty; If raise, an underlying exception will be raised; if callable, it will be called with a single OSError instance as argument From cbd73b0e78bf7f8db228f4d6b00bbe241e024dca Mon Sep 17 00:00:00 2001 From: vinoyang Date: Mon, 28 Oct 2024 21:08:38 +0800 Subject: [PATCH 11/16] Register tosfs to registry (#1739) --- fsspec/registry.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fsspec/registry.py b/fsspec/registry.py index c261b9b08..02094ea0a 100644 --- a/fsspec/registry.py +++ b/fsspec/registry.py @@ -202,6 +202,10 @@ def register_implementation(name, cls, clobber=False, errtxt=None): "err": 'SFTPFileSystem requires "paramiko" to be installed', }, "tar": {"class": "fsspec.implementations.tar.TarFileSystem"}, + "tosfs": { + "class": "tosfs.TosFileSystem", + "err": "Install tosfs to access ByteDance volcano engine Tinder Object Storage", + }, "wandb": {"class": "wandbfs.WandbFS", "err": "Install wandbfs to access wandb"}, "webdav": { "class": "webdav4.fsspec.WebdavFileSystem", From 61b66b6767ccd796e920722a0e8bbc270392b94a Mon Sep 17 00:00:00 2001 From: Ryan Grout Date: Mon, 28 Oct 2024 07:25:57 -0600 Subject: [PATCH 12/16] Consolidate block fetch requests. (#1733) --------- Co-authored-by: Martin Durant --- fsspec/caching.py | 46 +++++++++++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/fsspec/caching.py b/fsspec/caching.py index a3f7a1c9f..bc74ad241 100644 --- a/fsspec/caching.py +++ b/fsspec/caching.py @@ -7,6 +7,8 @@ import os import threading import warnings +from itertools import groupby +from operator import itemgetter from concurrent.futures import Future, ThreadPoolExecutor from typing import ( TYPE_CHECKING, @@ -161,21 +163,39 @@ def _fetch(self, start: int | None, end: int | None) -> bytes: return b"" start_block = start // self.blocksize end_block = end // self.blocksize - need = [i for i in range(start_block, end_block + 1) if i not in self.blocks] - hits = [i for i in range(start_block, end_block + 1) if i in self.blocks] - self.miss_count += len(need) - self.hit_count += len(hits) - while need: - # TODO: not a for loop so we can consolidate blocks later to - # make fewer fetch calls; this could be parallel - i = need.pop(0) - - sstart = i * self.blocksize - send = min(sstart + self.blocksize, self.size) + block_range = range(start_block, end_block + 1) + # Determine which blocks need to be fetched. This sequence is sorted by construction. + need = (i for i in block_range if i not in self.blocks) + # Count the number of blocks already cached + self.hit_count += sum(1 for i in block_range if i in self.blocks) + + # Consolidate needed blocks. + # Algorithm adapted from Python 2.x itertools documentation. + # We are grouping an enumerated sequence of blocks. By comparing when the difference + # between an ascending range (provided by enumerate) and the needed block numbers + # we can detect when the block number skips values. The key computes this difference. + # Whenever the difference changes, we know that we have previously cached block(s), + # and a new group is started. In other words, this algorithm neatly groups + # runs of consecutive block numbers so they can be fetched together. 
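+        # Example: need = (2, 3, 7, 8, 9) enumerates to (0, 2), (1, 3),
+        # (2, 7), (3, 8), (4, 9); the keys are -2, -2, -5, -5, -5, so
+        # groupby yields the runs (2, 3) and (7, 8, 9): two fetches
+        # instead of five.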
+ for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]): + # Extract the blocks from the enumerated sequence + _blocks = tuple(map(itemgetter(1), _blocks)) + # Compute start of first block + sstart = _blocks[0] * self.blocksize + # Compute the end of the last block. Last block may not be full size. + send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size) + + # Fetch bytes (could be multiple consecutive blocks) self.total_requested_bytes += send - sstart - logger.debug(f"MMap get block #{i} ({sstart}-{send})") + logger.debug( + f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send})" + ) self.cache[sstart:send] = self.fetcher(sstart, send) - self.blocks.add(i) + + # Update set of cached blocks + self.blocks.update(_blocks) + # Update cache statistics with number of blocks we had to cache + self.miss_count += len(_blocks) return self.cache[start:end] From bb84b0c0b0a97c88cae24cb016483ead25262a77 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 30 Oct 2024 09:40:46 -0400 Subject: [PATCH 13/16] Provide default _fetch_range implementation for File (#1732) Since this was not marked as NotImplemented --- fsspec/spec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fsspec/spec.py b/fsspec/spec.py index 6ce338b32..9659f2e98 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -1915,7 +1915,7 @@ def _initiate_upload(self): def _fetch_range(self, start, end): """Get the specified set of bytes from remote""" - raise NotImplementedError + return self.fs.cat_file(self.path, start=start, end=end) def read(self, length=-1): """ From 60eab20c8fbe4af2a048fd834e38a38ca29e6c1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= <16805946+edgarrmondragon@users.noreply.github.com> Date: Wed, 30 Oct 2024 08:53:04 -0600 Subject: [PATCH 14/16] Test with Python 3.13 (#1710) --- .github/workflows/main.yaml | 8 +++++++- pyproject.toml | 1 + 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 9797e6a9c..653eb8084 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -13,7 +13,13 @@ jobs: strategy: fail-fast: false matrix: - PY: ["3.8", "3.9", "3.10", "3.11", "3.12"] + PY: + - "3.8" + - "3.9" + - "3.10" + - "3.11" + - "3.12" + - "3.13" env: CIRUN: true diff --git a/pyproject.toml b/pyproject.toml index 48368711f..d9cb2d688 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ classifiers = [ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", ] [project.optional-dependencies] From 9a161714f0bbfe44ee769f259420f2f7db975471 Mon Sep 17 00:00:00 2001 From: Tilman Hoffbauer Date: Thu, 31 Oct 2024 14:15:04 +0100 Subject: [PATCH 15/16] Fix race condition in local ls() (#1744) Fix #1742 --- fsspec/implementations/local.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/fsspec/implementations/local.py b/fsspec/implementations/local.py index f032d8aeb..4c588232f 100644 --- a/fsspec/implementations/local.py +++ b/fsspec/implementations/local.py @@ -60,7 +60,12 @@ def ls(self, path, detail=False, **kwargs): info = self.info(path) if info["type"] == "directory": with os.scandir(path) as it: - infos = [self.info(f) for f in it] + infos = [] + for f in it: + try: + infos.append(self.info(f)) + except FileNotFoundError: + pass else: infos = [info] From 4cb98abccaefd2e53ea92daceb260443183b2bb4 Mon 
Sep 17 00:00:00 2001 From: Nathan Zimmerman Date: Tue, 12 Nov 2024 15:01:20 -0600 Subject: [PATCH 16/16] Add async wrapper for sync FS (#1745) Co-authored-by: Martin Durant --- .github/workflows/main.yaml | 1 - docs/source/async.rst | 34 +++++ fsspec/implementations/asyn_wrapper.py | 96 ++++++++++++ .../tests/test_asyn_wrapper.py | 142 ++++++++++++++++++ 4 files changed, 272 insertions(+), 1 deletion(-) create mode 100644 fsspec/implementations/asyn_wrapper.py create mode 100644 fsspec/implementations/tests/test_asyn_wrapper.py diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 653eb8084..f9e5ed98c 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -14,7 +14,6 @@ jobs: fail-fast: false matrix: PY: - - "3.8" - "3.9" - "3.10" - "3.11" diff --git a/docs/source/async.rst b/docs/source/async.rst index 58334b333..dc44df381 100644 --- a/docs/source/async.rst +++ b/docs/source/async.rst @@ -152,3 +152,37 @@ available as the attribute ``.loop``. + +AsyncFileSystemWrapper +---------------------- + +The `AsyncFileSystemWrapper` class is an experimental feature that allows you to convert +a synchronous filesystem into an asynchronous one. This is useful for quickly integrating +synchronous filesystems into workflows that may expect `AsyncFileSystem` instances. + +Basic Usage +~~~~~~~~~~~ + +To use `AsyncFileSystemWrapper`, wrap any synchronous filesystem to work in an asynchronous context. +In this example, the synchronous `LocalFileSystem` is wrapped, creating an `AsyncFileSystem` instance +backed by the normal, synchronous methods of `LocalFileSystem`: + +.. code-block:: python + + import asyncio + import fsspec + from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + + async def async_copy_file(): + sync_fs = fsspec.filesystem('file') # by-default synchronous, local filesystem + async_fs = AsyncFileSystemWrapper(sync_fs) + return await async_fs._copy('/source/file.txt', '/destination/file.txt') + + asyncio.run(async_copy_file()) + +Limitations +----------- + +This is experimental. Users should not expect this wrapper to magically make things faster. +It is primarily provided to allow usage of synchronous filesystems with interfaces that expect +`AsyncFileSystem` instances. diff --git a/fsspec/implementations/asyn_wrapper.py b/fsspec/implementations/asyn_wrapper.py new file mode 100644 index 000000000..9ba7811ce --- /dev/null +++ b/fsspec/implementations/asyn_wrapper.py @@ -0,0 +1,96 @@ +import asyncio +import inspect +import functools +from fsspec.asyn import AsyncFileSystem + + +def async_wrapper(func, obj=None): + """ + Wraps a synchronous function to make it awaitable. + + Parameters + ---------- + func : callable + The synchronous function to wrap. + obj : object, optional + The instance to bind the function to, if applicable. + + Returns + ------- + coroutine + An awaitable version of the function. + """ + + @functools.wraps(func) + async def wrapper(*args, **kwargs): + return await asyncio.to_thread(func, *args, **kwargs) + + return wrapper + + +class AsyncFileSystemWrapper(AsyncFileSystem): + """ + A wrapper class to convert a synchronous filesystem into an asynchronous one. + + This class takes an existing synchronous filesystem implementation and wraps all + its methods to provide an asynchronous interface. + + Parameters + ---------- + sync_fs : AbstractFileSystem + The synchronous filesystem instance to wrap. 
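+
+    Notes
+    -----
+    Wrapped methods are attached with a leading underscore (``_cat``,
+    ``_ls``, ...), following the ``AsyncFileSystem`` convention; the
+    plain sync names (``cat``, ``ls``, ...) remain usable as well.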
+ """ + + def __init__(self, sync_fs, *args, **kwargs): + super().__init__(*args, **kwargs) + self.asynchronous = True + self.fs = sync_fs + self._wrap_all_sync_methods() + + @property + def fsid(self): + return f"async_{self.fs.fsid}" + + def _wrap_all_sync_methods(self): + """ + Wrap all synchronous methods of the underlying filesystem with asynchronous versions. + """ + for method_name in dir(self.fs): + if method_name.startswith("_"): + continue + + attr = inspect.getattr_static(self.fs, method_name) + if isinstance(attr, property): + continue + + method = getattr(self.fs, method_name) + if callable(method) and not asyncio.iscoroutinefunction(method): + async_method = async_wrapper(method, obj=self) + setattr(self, f"_{method_name}", async_method) + + @classmethod + def wrap_class(cls, sync_fs_class): + """ + Create a new class that can be used to instantiate an AsyncFileSystemWrapper + with lazy instantiation of the underlying synchronous filesystem. + + Parameters + ---------- + sync_fs_class : type + The class of the synchronous filesystem to wrap. + + Returns + ------- + type + A new class that wraps the provided synchronous filesystem class. + """ + + class GeneratedAsyncFileSystemWrapper(cls): + def __init__(self, *args, **kwargs): + sync_fs = sync_fs_class(*args, **kwargs) + super().__init__(sync_fs) + + GeneratedAsyncFileSystemWrapper.__name__ = ( + f"Async{sync_fs_class.__name__}Wrapper" + ) + return GeneratedAsyncFileSystemWrapper diff --git a/fsspec/implementations/tests/test_asyn_wrapper.py b/fsspec/implementations/tests/test_asyn_wrapper.py new file mode 100644 index 000000000..d29202003 --- /dev/null +++ b/fsspec/implementations/tests/test_asyn_wrapper.py @@ -0,0 +1,142 @@ +import asyncio +import pytest +import os + +import fsspec +from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper +from fsspec.implementations.local import LocalFileSystem +from .test_local import csv_files, filetexts + + +def test_is_async(): + fs = fsspec.filesystem("file") + async_fs = AsyncFileSystemWrapper(fs) + assert async_fs.async_impl + + +def test_class_wrapper(): + fs_cls = LocalFileSystem + async_fs_cls = AsyncFileSystemWrapper.wrap_class(fs_cls) + assert async_fs_cls.__name__ == "AsyncLocalFileSystemWrapper" + async_fs = async_fs_cls() + assert async_fs.async_impl + + +@pytest.mark.asyncio +async def test_cats(): + with filetexts(csv_files, mode="b"): + fs = fsspec.filesystem("file") + async_fs = AsyncFileSystemWrapper(fs) + + result = await async_fs._cat(".test.fakedata.1.csv") + assert result == b"a,b\n1,2\n" + + out = set( + ( + await async_fs._cat([".test.fakedata.1.csv", ".test.fakedata.2.csv"]) + ).values() + ) + assert out == {b"a,b\n1,2\n", b"a,b\n3,4\n"} + + result = await async_fs._cat(".test.fakedata.1.csv", None, None) + assert result == b"a,b\n1,2\n" + + result = await async_fs._cat(".test.fakedata.1.csv", start=1, end=6) + assert result == b"a,b\n1,2\n"[1:6] + + result = await async_fs._cat(".test.fakedata.1.csv", start=-1) + assert result == b"a,b\n1,2\n"[-1:] + + result = await async_fs._cat(".test.fakedata.1.csv", start=1, end=-2) + assert result == b"a,b\n1,2\n"[1:-2] + + # test synchronous API is available as expected + result = async_fs.cat(".test.fakedata.1.csv", start=1, end=-2) + assert result == b"a,b\n1,2\n"[1:-2] + + out = set( + ( + await async_fs._cat( + [".test.fakedata.1.csv", ".test.fakedata.2.csv"], start=1, end=-1 + ) + ).values() + ) + assert out == {b"a,b\n1,2\n"[1:-1], b"a,b\n3,4\n"[1:-1]} + + +@pytest.mark.asyncio +async def 
test_basic_crud_operations(): + with filetexts(csv_files, mode="b"): + fs = fsspec.filesystem("file") + async_fs = AsyncFileSystemWrapper(fs) + + await async_fs._touch(".test.fakedata.3.csv") + assert await async_fs._exists(".test.fakedata.3.csv") + + data = await async_fs._cat(".test.fakedata.1.csv") + assert data == b"a,b\n1,2\n" + + await async_fs._pipe(".test.fakedata.1.csv", b"a,b\n5,6\n") + data = await async_fs._cat(".test.fakedata.1.csv") + assert data == b"a,b\n5,6\n" + + await async_fs._rm(".test.fakedata.1.csv") + assert not await async_fs._exists(".test.fakedata.1.csv") + + +@pytest.mark.asyncio +async def test_error_handling(): + fs = fsspec.filesystem("file") + async_fs = AsyncFileSystemWrapper(fs) + + with pytest.raises(FileNotFoundError): + await async_fs._cat(".test.non_existent.csv") + + with pytest.raises(FileNotFoundError): + await async_fs._rm(".test.non_existent.csv") + + +@pytest.mark.asyncio +async def test_concurrent_operations(): + with filetexts(csv_files, mode="b"): + fs = fsspec.filesystem("file") + async_fs = AsyncFileSystemWrapper(fs) + + async def read_file(file_path): + return await async_fs._cat(file_path) + + results = await asyncio.gather( + read_file(".test.fakedata.1.csv"), + read_file(".test.fakedata.2.csv"), + read_file(".test.fakedata.1.csv"), + ) + + assert results == [b"a,b\n1,2\n", b"a,b\n3,4\n", b"a,b\n1,2\n"] + + +@pytest.mark.asyncio +async def test_directory_operations(): + with filetexts(csv_files, mode="b"): + fs = fsspec.filesystem("file") + async_fs = AsyncFileSystemWrapper(fs) + + await async_fs._makedirs("new_directory") + assert await async_fs._isdir("new_directory") + + files = await async_fs._ls(".") + filenames = [os.path.basename(file) for file in files] + + assert ".test.fakedata.1.csv" in filenames + assert ".test.fakedata.2.csv" in filenames + assert "new_directory" in filenames + + +@pytest.mark.asyncio +async def test_batch_operations(): + with filetexts(csv_files, mode="b"): + fs = fsspec.filesystem("file") + async_fs = AsyncFileSystemWrapper(fs) + + await async_fs._rm([".test.fakedata.1.csv", ".test.fakedata.2.csv"]) + assert not await async_fs._exists(".test.fakedata.1.csv") + assert not await async_fs._exists(".test.fakedata.2.csv")
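
A minimal usage sketch of the lazy ``wrap_class`` path exercised by the tests
above; the temporary path and the ``auto_mkdir`` kwarg are illustrative
choices, not part of the patch::

    import asyncio

    from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
    from fsspec.implementations.local import LocalFileSystem

    # wrap_class defers construction of the synchronous filesystem until
    # the wrapper itself is instantiated
    AsyncLocalFileSystem = AsyncFileSystemWrapper.wrap_class(LocalFileSystem)

    async def main():
        fs = AsyncLocalFileSystem(auto_mkdir=True)  # kwargs reach LocalFileSystem
        await fs._pipe("/tmp/fsspec-demo.txt", b"hello")
        assert await fs._cat("/tmp/fsspec-demo.txt") == b"hello"

    asyncio.run(main())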