From c4d4f23cced94fa56732d0ad24fcd66d43536d4e Mon Sep 17 00:00:00 2001 From: Raphael Hagen Date: Fri, 26 Jan 2024 10:37:43 -0700 Subject: [PATCH 1/6] updated consolidated_reference with path.fs.save_json --- pangeo_forge_recipes/writers.py | 6 ++++-- tests/test_writers.py | 3 +-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pangeo_forge_recipes/writers.py b/pangeo_forge_recipes/writers.py index 41c013bf..e252165b 100644 --- a/pangeo_forge_recipes/writers.py +++ b/pangeo_forge_recipes/writers.py @@ -84,8 +84,10 @@ def consolidate_metadata(store: MutableMapping) -> MutableMapping: if isinstance(store, zarr.storage.FSStore): path = store.path - zc = zarr.consolidate_metadata(path) - return zc + zarr.convenience.consolidate_metadata(path) + # How do we update a parquet reference file? + path.fs.save_json(ref_path) + return path def store_dataset_fragment( diff --git a/tests/test_writers.py b/tests/test_writers.py index 3955116f..97e5b8e0 100644 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -182,7 +182,6 @@ def test_reference_netcdf( netcdf_local_file_pattern_sequential, pipeline, tmp_target, - # why are we not using tmp_target? output_file_name, ): pattern = netcdf_local_file_pattern_sequential @@ -203,6 +202,6 @@ def test_reference_netcdf( ) full_path = os.path.join(tmp_target.root_path, store_name, output_file_name) - mapper = fsspec.get_mapper("reference://", fo=full_path) assert zarr.open_consolidated(mapper) + assert xr.open_zarr(mapper, consolidated=True) From 72c073c661dcd89f73d3a6b939376ce6a9e92fbd Mon Sep 17 00:00:00 2001 From: Raphael Hagen Date: Fri, 26 Jan 2024 11:24:59 -0700 Subject: [PATCH 2/6] moved fs.save_json into ref path of consolidate_metadata --- pangeo_forge_recipes/writers.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pangeo_forge_recipes/writers.py b/pangeo_forge_recipes/writers.py index e252165b..19431122 100644 --- a/pangeo_forge_recipes/writers.py +++ b/pangeo_forge_recipes/writers.py @@ -81,13 +81,14 @@ def consolidate_metadata(store: MutableMapping) -> MutableMapping: if isinstance(store, fsspec.FSMap) and isinstance(store.fs, ReferenceFileSystem): ref_path = store.fs.storage_args[0] path = fsspec.get_mapper("reference://", fo=ref_path) + path.fs.save_json(ref_path) if isinstance(store, zarr.storage.FSStore): path = store.path zarr.convenience.consolidate_metadata(path) # How do we update a parquet reference file? - path.fs.save_json(ref_path) - return path + zc = zarr.open_consolidated(path) + return zc def store_dataset_fragment( From bcbc52bebb0be2c39a8a787d296e3ed280e7d526 Mon Sep 17 00:00:00 2001 From: Raphael Hagen Date: Fri, 26 Jan 2024 13:15:18 -0700 Subject: [PATCH 3/6] consolidate json works, consolidate parquet fails --- pangeo_forge_recipes/writers.py | 29 +++++++++++++++-------------- tests/test_writers.py | 8 +++++++- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/pangeo_forge_recipes/writers.py b/pangeo_forge_recipes/writers.py index 19431122..f7c4bf82 100644 --- a/pangeo_forge_recipes/writers.py +++ b/pangeo_forge_recipes/writers.py @@ -79,15 +79,16 @@ def consolidate_metadata(store: MutableMapping) -> MutableMapping: import zarr if isinstance(store, fsspec.FSMap) and isinstance(store.fs, ReferenceFileSystem): - ref_path = store.fs.storage_args[0] - path = fsspec.get_mapper("reference://", fo=ref_path) - path.fs.save_json(ref_path) + path = store.fs.storage_args[0] + + zarr.convenience.consolidate_metadata(store) + if "json" in os.path.split(path)[-1]: + store.fs.save_json(path) + if isinstance(store, zarr.storage.FSStore): - path = store.path + zarr.convenience.consolidate_metadata(store) - zarr.convenience.consolidate_metadata(path) - # How do we update a parquet reference file? - zc = zarr.open_consolidated(path) + zc = zarr.open_consolidated(store) return zc @@ -135,13 +136,7 @@ def write_combined_reference( storage_options = full_target.fsspec_kwargs # type: ignore[union-attr] remote_protocol = full_target.get_fsspec_remote_protocol() # type: ignore[union-attr] - # If reference is a ReferenceFileSystem, write to json - if isinstance(reference, fsspec.FSMap) and isinstance(reference.fs, ReferenceFileSystem): - # context manager reuses dep injected auth credentials without passing storage options - with full_target.fs.open(outpath, "wb") as f: - f.write(ujson.dumps(reference.fs.references).encode()) - - elif file_ext == ".parquet": + if file_ext == ".parquet": # Creates empty parquet store to be written to if full_target.exists(output_file_name): full_target.rm(output_file_name, recursive=True) @@ -164,6 +159,12 @@ def write_combined_reference( # call to write reference to empty parquet store out.flush() + # If reference is a ReferenceFileSystem, write to json + elif isinstance(reference, fsspec.FSMap) and isinstance(reference.fs, ReferenceFileSystem): + # context manager reuses dep injected auth credentials without passing storage options + with full_target.fs.open(outpath, "wb") as f: + f.write(ujson.dumps(reference.fs.references).encode()) + else: raise NotImplementedError(f"{file_ext = } not supported.") return ReferenceFileSystem( diff --git a/tests/test_writers.py b/tests/test_writers.py index 97e5b8e0..b8f162bf 100644 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -202,6 +202,12 @@ def test_reference_netcdf( ) full_path = os.path.join(tmp_target.root_path, store_name, output_file_name) - mapper = fsspec.get_mapper("reference://", fo=full_path) + mapper = fsspec.get_mapper( + "reference://", + target_protocol=tmp_target.get_fsspec_remote_protocol(), + remote_protocol=tmp_target.get_fsspec_remote_protocol(), + fo=full_path, + ) + assert zarr.open_consolidated(mapper) assert xr.open_zarr(mapper, consolidated=True) From 86a6eb3e71fc78a3244ec2d68a93887aa6e09a7b Mon Sep 17 00:00:00 2001 From: Raphael Hagen Date: Thu, 1 Feb 2024 11:39:07 -0700 Subject: [PATCH 4/6] removed kerchunk option for consolidate_metadata --- pangeo_forge_recipes/transforms.py | 2 +- pangeo_forge_recipes/writers.py | 14 +++++++------- tests/test_writers.py | 10 ++++++---- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/pangeo_forge_recipes/transforms.py b/pangeo_forge_recipes/transforms.py index ef52f420..d62e26d5 100644 --- a/pangeo_forge_recipes/transforms.py +++ b/pangeo_forge_recipes/transforms.py @@ -402,7 +402,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: @dataclass class ConsolidateMetadata(beam.PTransform): - """Calls Zarr Python consolidate_metadata on an existing Zarr store or Kerchunk reference + """Calls Zarr Python consolidate_metadata on an existing Zarr store (https://zarr.readthedocs.io/en/stable/_modules/zarr/convenience.html#consolidate_metadata)""" def expand(self, pcoll: beam.PCollection) -> beam.PCollection: diff --git a/pangeo_forge_recipes/writers.py b/pangeo_forge_recipes/writers.py index f7c4bf82..f1f23071 100644 --- a/pangeo_forge_recipes/writers.py +++ b/pangeo_forge_recipes/writers.py @@ -68,9 +68,9 @@ def _is_first_in_merge_dim(index): def consolidate_metadata(store: MutableMapping) -> MutableMapping: - """Consolidate metadata for a Zarr store or Kerchunk reference + """Consolidate metadata for a Zarr store - :param store: Input Store for Zarr or Kerchunk reference + :param store: Input Store for Zarr :type store: MutableMapping :return: Output Store :rtype: MutableMapping @@ -79,11 +79,11 @@ def consolidate_metadata(store: MutableMapping) -> MutableMapping: import zarr if isinstance(store, fsspec.FSMap) and isinstance(store.fs, ReferenceFileSystem): - path = store.fs.storage_args[0] - - zarr.convenience.consolidate_metadata(store) - if "json" in os.path.split(path)[-1]: - store.fs.save_json(path) + raise ValueError( + "Creating consolidated metadata for Kerchunk references has" + " no preformance benefits: Issue: " + " https://github.com/pangeo-forge/pangeo-forge-recipes/issues/675" + ) if isinstance(store, zarr.storage.FSStore): zarr.convenience.consolidate_metadata(store) diff --git a/tests/test_writers.py b/tests/test_writers.py index b8f162bf..21b9c223 100644 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -173,9 +173,13 @@ def test_zarr_consolidate_metadata( ) | ConsolidateMetadata() ) - zc = zarr.storage.FSStore(os.path.join(tmp_target.root_path, "store")) + + path = os.path.join(tmp_target.root_path, "store") + zc = zarr.storage.FSStore(path) assert zc[".zmetadata"] is not None + assert xr.open_zarr(path, consolidated=True) + @pytest.mark.parametrize("output_file_name", ["reference.json", "reference.parquet"]) def test_reference_netcdf( @@ -198,7 +202,6 @@ def test_reference_netcdf( concat_dims=["time"], output_file_name=output_file_name, ) - | ConsolidateMetadata() ) full_path = os.path.join(tmp_target.root_path, store_name, output_file_name) @@ -209,5 +212,4 @@ def test_reference_netcdf( fo=full_path, ) - assert zarr.open_consolidated(mapper) - assert xr.open_zarr(mapper, consolidated=True) + assert xr.open_zarr(mapper, consolidated=False) From 5d96d58548d5baec35982dccfe4634d3ebfce5b8 Mon Sep 17 00:00:00 2001 From: Raphael Hagen Date: Thu, 1 Feb 2024 12:50:47 -0700 Subject: [PATCH 5/6] added kwargs to lazyreferencemapper to fix failing test --- pangeo_forge_recipes/transforms.py | 1 - pangeo_forge_recipes/writers.py | 7 ++++--- tests/test_end_to_end.py | 1 + 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pangeo_forge_recipes/transforms.py b/pangeo_forge_recipes/transforms.py index d0397a89..5fa8c40e 100644 --- a/pangeo_forge_recipes/transforms.py +++ b/pangeo_forge_recipes/transforms.py @@ -542,7 +542,6 @@ def expand(self, references: beam.PCollection) -> beam.PCollection[zarr.storage. # unpack fsspec options that will be used below for transforms without dep injection storage_options = self.target_root.fsspec_kwargs # type: ignore[union-attr] remote_protocol = self.target_root.get_fsspec_remote_protocol() # type: ignore[union-attr] - return ( references | CombineReferences( diff --git a/pangeo_forge_recipes/writers.py b/pangeo_forge_recipes/writers.py index f1f23071..0fa3c523 100644 --- a/pangeo_forge_recipes/writers.py +++ b/pangeo_forge_recipes/writers.py @@ -123,7 +123,7 @@ def write_combined_reference( full_target: FSSpecTarget, concat_dims: List[str], output_file_name: str, - refs_per_component: int = 1000, + refs_per_component: int = 10000, mzz_kwargs: Optional[Dict] = None, ) -> zarr.storage.FSStore: """Write a kerchunk combined references object to file.""" @@ -142,10 +142,11 @@ def write_combined_reference( full_target.rm(output_file_name, recursive=True) full_target.makedir(output_file_name) - out = LazyReferenceMapper.create(refs_per_component, outpath, full_target.fs) + out = LazyReferenceMapper.create( + root=outpath, fs=full_target.fs, record_size=refs_per_component + ) # Calls MultiZarrToZarr on a MultiZarrToZarr object and adds kwargs to write to parquet. - MultiZarrToZarr( [reference], concat_dims=concat_dims, diff --git a/tests/test_end_to_end.py b/tests/test_end_to_end.py index 009f4410..f746950a 100644 --- a/tests/test_end_to_end.py +++ b/tests/test_end_to_end.py @@ -106,6 +106,7 @@ def test_reference_netcdf( ) full_path = os.path.join(tmp_target.root_path, store_name, output_file_name) file_ext = os.path.splitext(output_file_name)[-1] + if file_ext == ".json": mapper = fsspec.get_mapper("reference://", fo=full_path) ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False}) From 4b7a407f6f03ede1a5b23cc045cb37934a510dc1 Mon Sep 17 00:00:00 2001 From: Raphael Hagen Date: Mon, 5 Feb 2024 13:54:56 -0700 Subject: [PATCH 6/6] removed consolidated metadata issue discussion from ValueError --- pangeo_forge_recipes/writers.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pangeo_forge_recipes/writers.py b/pangeo_forge_recipes/writers.py index 0fa3c523..69608a9b 100644 --- a/pangeo_forge_recipes/writers.py +++ b/pangeo_forge_recipes/writers.py @@ -80,9 +80,8 @@ def consolidate_metadata(store: MutableMapping) -> MutableMapping: if isinstance(store, fsspec.FSMap) and isinstance(store.fs, ReferenceFileSystem): raise ValueError( - "Creating consolidated metadata for Kerchunk references has" - " no preformance benefits: Issue: " - " https://github.com/pangeo-forge/pangeo-forge-recipes/issues/675" + """Creating consolidated metadata for Kerchunk references should not + yield a performance benefit so consolidating metadata is not supported.""" ) if isinstance(store, zarr.storage.FSStore):