From 88c0dcf953c50d44bc4a1570d92a2720435a46d1 Mon Sep 17 00:00:00 2001 From: syelman Date: Wed, 26 Jul 2023 19:13:17 +0200 Subject: [PATCH 1/9] init --- anndata/_io/specs/methods.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/anndata/_io/specs/methods.py b/anndata/_io/specs/methods.py index 37c532288..c68904fff 100644 --- a/anndata/_io/specs/methods.py +++ b/anndata/_io/specs/methods.py @@ -44,6 +44,18 @@ H5File = h5py.File +#################### +# Dask utils # +#################### + +try: + from dask.utils import SerializableLock as Lock +except ImportError: + from threading import Lock + +# to fix https://github.com/dask/distributed/issues/780 +GLOBAL_LOCK = Lock() + #################### # Dispatch methods # #################### @@ -314,7 +326,7 @@ def write_basic_dask(f, k, elem, _writer, dataset_kwargs=MappingProxyType({})): import dask.array as da g = f.require_dataset(k, shape=elem.shape, dtype=elem.dtype, **dataset_kwargs) - da.store(elem, g) + da.store(elem, g, lock=GLOBAL_LOCK) @_REGISTRY.register_read(H5Array, IOSpec("array", "0.2.0")) From 11d679377bf185b58b9e482584b933abedb76a80 Mon Sep 17 00:00:00 2001 From: syelman Date: Fri, 28 Jul 2023 16:57:24 +0200 Subject: [PATCH 2/9] add tests --- anndata/tests/test_dask.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/anndata/tests/test_dask.py b/anndata/tests/test_dask.py index e2d820e08..f52df282e 100644 --- a/anndata/tests/test_dask.py +++ b/anndata/tests/test_dask.py @@ -11,6 +11,8 @@ gen_adata, assert_equal, ) +from anndata.experimental import write_elem, read_elem +from anndata.experimental.merge import as_group from anndata.compat import DaskArray pytest.importorskip("dask.array") @@ -94,6 +96,38 @@ def test_dask_write(adata, tmp_path, diskfmt): assert isinstance(orig.varm["a"], DaskArray) +def test_dask_distributed_write(adata, tmp_path, diskfmt): + import dask.array as da + import numpy as np + import dask.distributed as dd + + pth = tmp_path / f"test_write.{diskfmt}" + g = as_group(pth, mode="w") + + with dd.LocalCluster(n_workers=1, threads_per_worker=1, processes=False) as cluster: + with dd.Client(cluster): + M, N = adata.X.shape + adata.obsm["a"] = da.random.random((M, 10)) + adata.obsm["b"] = da.random.random((M, 10)) + adata.varm["a"] = da.random.random((N, 10)) + orig = adata + write_elem(g, "", orig) + curr = read_elem(g) + + with pytest.raises(Exception): + assert_equal(curr.obsm["a"], curr.obsm["b"]) + + assert_equal(curr.varm["a"], orig.varm["a"]) + assert_equal(curr.obsm["a"], orig.obsm["a"]) + + assert isinstance(curr.X, np.ndarray) + assert isinstance(curr.obsm["a"], np.ndarray) + assert isinstance(curr.varm["a"], np.ndarray) + assert isinstance(orig.X, DaskArray) + assert isinstance(orig.obsm["a"], DaskArray) + assert isinstance(orig.varm["a"], DaskArray) + + def test_dask_to_memory_check_array_types(adata, tmp_path, diskfmt): import dask.array as da import numpy as np From 9e9b5e870c95a86a910ea32af105edb8c8ed04d6 Mon Sep 17 00:00:00 2001 From: syelman Date: Tue, 22 Aug 2023 13:53:56 +0200 Subject: [PATCH 3/9] give error for h5py and distributed --- anndata/_io/specs/methods.py | 19 +++++++++++++++++-- anndata/tests/test_dask.py | 6 ++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/anndata/_io/specs/methods.py b/anndata/_io/specs/methods.py index 91dae4ce3..1218602e9 100644 --- a/anndata/_io/specs/methods.py +++ b/anndata/_io/specs/methods.py @@ -343,14 +343,29 @@ def write_basic(f, k, elem, _writer, dataset_kwargs=MappingProxyType({})): @_REGISTRY.register_write(ZarrGroup, DaskArray, IOSpec("array", "0.2.0")) -@_REGISTRY.register_write(H5Group, DaskArray, IOSpec("array", "0.2.0")) -def write_basic_dask(f, k, elem, _writer, dataset_kwargs=MappingProxyType({})): +def write_basic_dask_zarr(f, k, elem, _writer, dataset_kwargs=MappingProxyType({})): import dask.array as da g = f.require_dataset(k, shape=elem.shape, dtype=elem.dtype, **dataset_kwargs) da.store(elem, g, lock=GLOBAL_LOCK) +# Adding this seperately because h5py isn't serializable +# https://github.com/pydata/xarray/issues/4242 +@_REGISTRY.register_write(H5Group, DaskArray, IOSpec("array", "0.2.0")) +def write_basic_dask_h5(f, k, elem, _writer, dataset_kwargs=MappingProxyType({})): + import dask.array as da + import dask.config as dc + + if dc.get("scheduler", None) == "dask.distributed": + raise ValueError( + "Cannot write dask arrays to hdf5 when using distributed scheduler" + ) + + g = f.require_dataset(k, shape=elem.shape, dtype=elem.dtype, **dataset_kwargs) + da.store(elem, g) + + @_REGISTRY.register_read(H5Array, IOSpec("array", "0.2.0")) @_REGISTRY.register_read(ZarrArray, IOSpec("array", "0.2.0")) @_REGISTRY.register_read(ZarrArray, IOSpec("string-array", "0.2.0")) diff --git a/anndata/tests/test_dask.py b/anndata/tests/test_dask.py index f52df282e..7b60046ad 100644 --- a/anndata/tests/test_dask.py +++ b/anndata/tests/test_dask.py @@ -111,6 +111,12 @@ def test_dask_distributed_write(adata, tmp_path, diskfmt): adata.obsm["b"] = da.random.random((M, 10)) adata.varm["a"] = da.random.random((N, 10)) orig = adata + if diskfmt == "h5ad": + with pytest.raises( + ValueError, match="Cannot write dask arrays to hdf5" + ): + write_elem(g, "", orig) + return write_elem(g, "", orig) curr = read_elem(g) From 7a7ec6312626476469b3a5c3ce5262d22206904a Mon Sep 17 00:00:00 2001 From: syelman Date: Tue, 22 Aug 2023 14:10:20 +0200 Subject: [PATCH 4/9] add importorskip --- anndata/tests/test_dask.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/anndata/tests/test_dask.py b/anndata/tests/test_dask.py index 7b60046ad..a89cd2532 100644 --- a/anndata/tests/test_dask.py +++ b/anndata/tests/test_dask.py @@ -96,10 +96,12 @@ def test_dask_write(adata, tmp_path, diskfmt): assert isinstance(orig.varm["a"], DaskArray) +@pytest.mark.dependency(depends=[""]) def test_dask_distributed_write(adata, tmp_path, diskfmt): import dask.array as da import numpy as np - import dask.distributed as dd + + dd = pytest.importorskip("dask.distributed") pth = tmp_path / f"test_write.{diskfmt}" g = as_group(pth, mode="w") From 4400d6a0d8c3c04920e52f0b2c969a58adb82bd6 Mon Sep 17 00:00:00 2001 From: syelman Date: Tue, 22 Aug 2023 14:13:52 +0200 Subject: [PATCH 5/9] add dask distributed to tests --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index f8e97c1a0..af923dedd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -88,6 +88,7 @@ test = [ "boltons", "scanpy", "dask[array]", + "dask[distributed]", "awkward>=2.3", "pytest_memray", ] From 56c59ab5e2f43876b5955599f0c45d8bebf6f94d Mon Sep 17 00:00:00 2001 From: syelman Date: Tue, 22 Aug 2023 14:23:53 +0200 Subject: [PATCH 6/9] fix extra line --- anndata/tests/test_dask.py | 1 - 1 file changed, 1 deletion(-) diff --git a/anndata/tests/test_dask.py b/anndata/tests/test_dask.py index a89cd2532..e7e153095 100644 --- a/anndata/tests/test_dask.py +++ b/anndata/tests/test_dask.py @@ -96,7 +96,6 @@ def test_dask_write(adata, tmp_path, diskfmt): assert isinstance(orig.varm["a"], DaskArray) -@pytest.mark.dependency(depends=[""]) def test_dask_distributed_write(adata, tmp_path, diskfmt): import dask.array as da import numpy as np From cc8aa4e9c9664939c2b9619314da7c9c97a1fde3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Selman=20=C3=96zleyen?= <32667648+syelman@users.noreply.github.com> Date: Tue, 22 Aug 2023 14:26:22 +0200 Subject: [PATCH 7/9] Update pyproject.toml Co-authored-by: Philipp A. --- pyproject.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index af923dedd..cb66b0cd3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -87,8 +87,7 @@ test = [ "joblib", "boltons", "scanpy", - "dask[array]", - "dask[distributed]", + "dask[array,distributed]", "awkward>=2.3", "pytest_memray", ] From e4d13b3a4d9587b959d73041db5880152b8f4baa Mon Sep 17 00:00:00 2001 From: syelman Date: Tue, 22 Aug 2023 14:28:43 +0200 Subject: [PATCH 8/9] pytest mark need --- anndata/tests/test_dask.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/anndata/tests/test_dask.py b/anndata/tests/test_dask.py index e7e153095..e7c5bea87 100644 --- a/anndata/tests/test_dask.py +++ b/anndata/tests/test_dask.py @@ -96,12 +96,12 @@ def test_dask_write(adata, tmp_path, diskfmt): assert isinstance(orig.varm["a"], DaskArray) +@pytest.mark.needs("dask.distributed") def test_dask_distributed_write(adata, tmp_path, diskfmt): import dask.array as da + import dask.distributed as dd import numpy as np - dd = pytest.importorskip("dask.distributed") - pth = tmp_path / f"test_write.{diskfmt}" g = as_group(pth, mode="w") From 75dec29f0a531f52e2f31d51dcd3b684ab012d34 Mon Sep 17 00:00:00 2001 From: Philipp A Date: Fri, 25 Aug 2023 14:01:19 +0200 Subject: [PATCH 9/9] remove unneeded mark --- anndata/tests/test_dask.py | 1 - 1 file changed, 1 deletion(-) diff --git a/anndata/tests/test_dask.py b/anndata/tests/test_dask.py index e7c5bea87..cb745a8f5 100644 --- a/anndata/tests/test_dask.py +++ b/anndata/tests/test_dask.py @@ -96,7 +96,6 @@ def test_dask_write(adata, tmp_path, diskfmt): assert isinstance(orig.varm["a"], DaskArray) -@pytest.mark.needs("dask.distributed") def test_dask_distributed_write(adata, tmp_path, diskfmt): import dask.array as da import dask.distributed as dd