Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConsolidateMetadata2 #678

Merged
3 changes: 1 addition & 2 deletions pangeo_forge_recipes/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:

@dataclass
class ConsolidateMetadata(beam.PTransform):
"""Calls Zarr Python consolidate_metadata on an existing Zarr store or Kerchunk reference
"""Calls Zarr Python consolidate_metadata on an existing Zarr store
(https://zarr.readthedocs.io/en/stable/_modules/zarr/convenience.html#consolidate_metadata)"""

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
Expand Down Expand Up @@ -542,7 +542,6 @@ def expand(self, references: beam.PCollection) -> beam.PCollection[zarr.storage.
# unpack fsspec options that will be used below for transforms without dep injection
storage_options = self.target_root.fsspec_kwargs # type: ignore[union-attr]
remote_protocol = self.target_root.get_fsspec_remote_protocol() # type: ignore[union-attr]

return (
references
| CombineReferences(
Expand Down
37 changes: 21 additions & 16 deletions pangeo_forge_recipes/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ def _is_first_in_merge_dim(index):


def consolidate_metadata(store: MutableMapping) -> MutableMapping:
"""Consolidate metadata for a Zarr store or Kerchunk reference
"""Consolidate metadata for a Zarr store

:param store: Input Store for Zarr or Kerchunk reference
:param store: Input Store for Zarr
:type store: MutableMapping
:return: Output Store
:rtype: MutableMapping
Expand All @@ -79,12 +79,16 @@ def consolidate_metadata(store: MutableMapping) -> MutableMapping:
import zarr

if isinstance(store, fsspec.FSMap) and isinstance(store.fs, ReferenceFileSystem):
ref_path = store.fs.storage_args[0]
path = fsspec.get_mapper("reference://", fo=ref_path)
raise ValueError(
"Creating consolidated metadata for Kerchunk references has"
" no performance benefits: Issue: "
" https://github.com/pangeo-forge/pangeo-forge-recipes/issues/675"
norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved
)

if isinstance(store, zarr.storage.FSStore):
path = store.path
zarr.convenience.consolidate_metadata(store)

zc = zarr.consolidate_metadata(path)
zc = zarr.open_consolidated(store)
return zc


Expand Down Expand Up @@ -119,7 +123,7 @@ def write_combined_reference(
full_target: FSSpecTarget,
concat_dims: List[str],
output_file_name: str,
refs_per_component: int = 1000,
refs_per_component: int = 10000,
mzz_kwargs: Optional[Dict] = None,
) -> zarr.storage.FSStore:
"""Write a kerchunk combined references object to file."""
Expand All @@ -132,22 +136,17 @@ def write_combined_reference(
storage_options = full_target.fsspec_kwargs # type: ignore[union-attr]
remote_protocol = full_target.get_fsspec_remote_protocol() # type: ignore[union-attr]

# If reference is a ReferenceFileSystem, write to json
if isinstance(reference, fsspec.FSMap) and isinstance(reference.fs, ReferenceFileSystem):
# context manager reuses dep injected auth credentials without passing storage options
with full_target.fs.open(outpath, "wb") as f:
f.write(ujson.dumps(reference.fs.references).encode())

elif file_ext == ".parquet":
if file_ext == ".parquet":
# Creates empty parquet store to be written to
if full_target.exists(output_file_name):
full_target.rm(output_file_name, recursive=True)
full_target.makedir(output_file_name)

out = LazyReferenceMapper.create(refs_per_component, outpath, full_target.fs)
out = LazyReferenceMapper.create(
root=outpath, fs=full_target.fs, record_size=refs_per_component
)

# Calls MultiZarrToZarr on a MultiZarrToZarr object and adds kwargs to write to parquet.

MultiZarrToZarr(
[reference],
concat_dims=concat_dims,
Expand All @@ -161,6 +160,12 @@ def write_combined_reference(
# call to write reference to empty parquet store
out.flush()

# If reference is a ReferenceFileSystem, write to json
elif isinstance(reference, fsspec.FSMap) and isinstance(reference.fs, ReferenceFileSystem):
# context manager reuses dep injected auth credentials without passing storage options
with full_target.fs.open(outpath, "wb") as f:
f.write(ujson.dumps(reference.fs.references).encode())

else:
raise NotImplementedError(f"{file_ext = } not supported.")
return ReferenceFileSystem(
Expand Down
1 change: 1 addition & 0 deletions tests/test_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def test_reference_netcdf(
)
full_path = os.path.join(tmp_target.root_path, store_name, output_file_name)
file_ext = os.path.splitext(output_file_name)[-1]

if file_ext == ".json":
mapper = fsspec.get_mapper("reference://", fo=full_path)
ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False})
Expand Down
17 changes: 12 additions & 5 deletions tests/test_writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,13 @@ def test_zarr_consolidate_metadata(
)
| ConsolidateMetadata()
)
zc = zarr.storage.FSStore(os.path.join(tmp_target.root_path, "store"))

path = os.path.join(tmp_target.root_path, "store")
zc = zarr.storage.FSStore(path)
assert zc[".zmetadata"] is not None

assert xr.open_zarr(path, consolidated=True)


def test_zarr_encoding(
netcdf_local_file_pattern,
Expand Down Expand Up @@ -207,7 +211,6 @@ def test_reference_netcdf(
netcdf_local_file_pattern_sequential,
pipeline,
tmp_target,
# why are we not using tmp_target?
output_file_name,
):
pattern = netcdf_local_file_pattern_sequential
Expand All @@ -224,10 +227,14 @@ def test_reference_netcdf(
concat_dims=["time"],
output_file_name=output_file_name,
)
| ConsolidateMetadata()
)

full_path = os.path.join(tmp_target.root_path, store_name, output_file_name)
mapper = fsspec.get_mapper(
"reference://",
target_protocol=tmp_target.get_fsspec_remote_protocol(),
remote_protocol=tmp_target.get_fsspec_remote_protocol(),
fo=full_path,
)

mapper = fsspec.get_mapper("reference://", fo=full_path)
assert zarr.open_consolidated(mapper)
assert xr.open_zarr(mapper, consolidated=False)
Loading