From 1017e120ec6a8e2d366a0118b466fe636ab6cc63 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 7 Nov 2023 20:49:06 -0500 Subject: [PATCH] Add "data" filesystem --- docs/source/api.rst | 4 ++ fsspec/core.py | 2 + fsspec/implementations/data.py | 38 +++++++++++++++++++ fsspec/implementations/reference.py | 4 +- .../tests/.benchmarks/test_data.py | 9 +++++ fsspec/registry.py | 1 + 6 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 fsspec/implementations/data.py create mode 100644 fsspec/implementations/tests/.benchmarks/test_data.py diff --git a/docs/source/api.rst b/docs/source/api.rst index 6b0135532..2343bd76e 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -113,6 +113,7 @@ Built-in Implementations fsspec.implementations.cached.SimpleCacheFileSystem fsspec.implementations.cached.WholeFileCacheFileSystem fsspec.implementations.dask.DaskWorkerFileSystem + fsspec.implementations.data.DataFileSystem fsspec.implementations.dbfs.DatabricksFileSystem fsspec.implementations.dirfs.DirFileSystem fsspec.implementations.ftp.FTPFileSystem @@ -149,6 +150,9 @@ Built-in Implementations .. autoclass:: fsspec.implementations.dask.DaskWorkerFileSystem :members: __init__ +.. autoclass:: fsspec.implementations.data.DataFileSystem + :members: __init__ + .. autoclass:: fsspec.implementations.dbfs.DatabricksFileSystem :members: __init__ diff --git a/fsspec/core.py b/fsspec/core.py index dd5e9c4f0..c5bce58ab 100644 --- a/fsspec/core.py +++ b/fsspec/core.py @@ -514,6 +514,8 @@ def split_protocol(urlpath): if len(protocol) > 1: # excludes Windows paths return protocol, path + if ":" in urlpath and urlpath.find(":") > 1: + return urlpath.split(":", 1) return None, urlpath diff --git a/fsspec/implementations/data.py b/fsspec/implementations/data.py new file mode 100644 index 000000000..0a29883a7 --- /dev/null +++ b/fsspec/implementations/data.py @@ -0,0 +1,38 @@ +import base64 +import io +from urllib.parse import unquote + +from fsspec import AbstractFileSystem + + +class DataFileSystem(AbstractFileSystem): + """A handy decoder for data-URLs + + Example + ------- + >>> with fsspec.open("data:,Hello%2C%20World%21") as f: + ... print(f.read()) + b"Hello, World!" + + """ + + protocol = "data" + + def cat_file(self, path, start=None, end=None, **kwargs): + pref, data = path.split(",", 1) + if pref.endswith("base64"): + return base64.b64decode(data)[start:end] + return unquote(data).encode()[start:end] + + def _open( + self, + path, + mode="rb", + block_size=None, + autocommit=True, + cache_options=None, + **kwargs, + ): + if "r" not in mode: + raise ValueError("Read only filesystem") + return io.BytesIO(self.cat_file(path)) diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py index 25ec3febd..674654d97 100644 --- a/fsspec/implementations/reference.py +++ b/fsspec/implementations/reference.py @@ -152,6 +152,7 @@ def open_refs(field, record): @staticmethod def create(record_size, root, fs, **kwargs): met = {"metadata": {}, "record_size": record_size} + fs.makedirs(root, exist_ok=True) fs.pipe("/".join([root, ".zmetadata"]), json.dumps(met).encode()) return LazyReferenceMapper(root, fs, **kwargs) @@ -292,7 +293,7 @@ def _get_chunk_sizes(self, field): def _generate_record(self, field, record): """The references for a given parquet file of a given field""" refs = self.open_refs(field, record) - it = iter(zip(refs.values())) + it = iter(zip(*refs.values())) if len(refs) == 3: # All urls return (list(t) for t in it) @@ -650,6 +651,7 @@ def __init__( self.fss[protocol] = fs if remote_protocol is None: # get single protocol from references + # TODO: warning here, since this can be very expensive? for ref in self.references.values(): if callable(ref): ref = ref() diff --git a/fsspec/implementations/tests/.benchmarks/test_data.py b/fsspec/implementations/tests/.benchmarks/test_data.py new file mode 100644 index 000000000..1536304c3 --- /dev/null +++ b/fsspec/implementations/tests/.benchmarks/test_data.py @@ -0,0 +1,9 @@ +import fsspec + + +def test_1(): + with fsspec.open("data:text/plain;base64,SGVsbG8sIFdvcmxkIQ==") as f: + assert f.read() == b"Hello, World!" + + with fsspec.open("data:,Hello%2C%20World%21") as f: + assert f.read() == b"Hello, World!" diff --git a/fsspec/registry.py b/fsspec/registry.py index d1614f130..e20f693df 100644 --- a/fsspec/registry.py +++ b/fsspec/registry.py @@ -60,6 +60,7 @@ def register_implementation(name, cls, clobber=False, errtxt=None): # protocols mapped to the class which implements them. This dict can # updated with register_implementation known_implementations = { + "data": {"class": "fsspec.implementations.data.DataFileSystem"}, "file": {"class": "fsspec.implementations.local.LocalFileSystem"}, "local": {"class": "fsspec.implementations.local.LocalFileSystem"}, "memory": {"class": "fsspec.implementations.memory.MemoryFileSystem"},