
Add example to create a virtual dataset using lithops #203

Merged

Conversation

@thodson-usgs (Contributor) commented Jul 29, 2024

At the suggestion of @TomNicholas, I created a simple example using lithops (and serverless compute) to create a virtual dataset from a list of netcdf files hosted on s3.
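
For orientation, the core pattern in the example looks roughly like the sketch below. This is not the exact code in the PR: the bucket listing, dimension name, and combine arguments are illustrative assumptions.

```python
import lithops
import xarray as xr
from virtualizarr import open_virtual_dataset

# Hypothetical list of NetCDF files on s3; the real example builds
# this list by listing a public bucket.
uris = [f"s3://my-bucket/data_{i:03d}.nc" for i in range(100)]

def map_references(uri):
    # Runs on each serverless worker: inspect one file's metadata and
    # byte ranges, returning a small in-memory virtual dataset.
    return open_virtual_dataset(uri, indexes={})

def reduce_references(results):
    # Runs once the maps finish: concatenate the per-file virtual
    # datasets along the record dimension ("time" is an assumption).
    return xr.combine_nested(
        results, concat_dim="time", coords="minimal", compat="override"
    )

fexec = lithops.FunctionExecutor()
fexec.map_reduce(map_references, uris, reduce_references, spawn_reducer=100)
ds = fexec.get_result()

# Persist the combined references as a kerchunk sidecar file.
ds.virtualize.to_kerchunk("combined.json", format="json")
```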

This PR depends on the fix provided in #206 (now merged), which resolved the issue described below.

The workflow was broken in the latest version of VirtualiZarr: the example runs fine on 5d08519, but on 179bb2a the workflow will run, then complain about ValueError: Could not convert object to NumPy datetime when I open the dataset using xarray:

Traceback (most recent call last):
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/zarr/meta.py", line 127, in decode_array_metadata
    fill_value = cls.decode_fill_value(meta["fill_value"], dtype, object_codec)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/zarr/meta.py", line 260, in decode_fill_value
    return np.array(v, dtype=dtype)[()]
           ^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Could not convert object to NumPy datetime

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/backends/api.py", line 571, in open_dataset
    backend_ds = backend.open_dataset(
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/kerchunk/xarray_backend.py", line 12, in open_dataset
    ref_ds = open_reference_dataset(
             ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/kerchunk/xarray_backend.py", line 46, in open_reference_dataset
    return xr.open_dataset(m, engine="zarr", consolidated=False, **open_dataset_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/backends/api.py", line 571, in open_dataset
    backend_ds = backend.open_dataset(
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/backends/zarr.py", line 1182, in open_dataset
    ds = store_entrypoint.open_dataset(
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/backends/store.py", line 43, in open_dataset
    vars, attrs = filename_or_obj.load()
                  ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/backends/common.py", line 221, in load
    (_decode_variable_name(k), v) for k, v in self.get_variables().items()
                                              ^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/backends/zarr.py", line 563, in get_variables
    return FrozenDict(
           ^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/core/utils.py", line 443, in FrozenDict
    return Frozen(dict(*args, **kwargs))
                  ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/backends/zarr.py", line 563, in <genexpr>
    return FrozenDict(
                     ^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/zarr/hierarchy.py", line 691, in _array_iter
    yield _key if keys_only else (_key, self[key])
                                        ~~~~^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/zarr/hierarchy.py", line 467, in __getitem__
    return Array(
           ^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/zarr/core.py", line 170, in __init__
    self._load_metadata()
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/zarr/core.py", line 193, in _load_metadata
    self._load_metadata_nosync()
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/zarr/core.py", line 207, in _load_metadata_nosync
    meta = self._store._metadata_class.decode_array_metadata(meta_bytes)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/zarr/meta.py", line 141, in decode_array_metadata
    raise MetadataError("error decoding metadata") from e
zarr.errors.MetadataError: error decoding metadata

@TomNicholas (Member) commented Jul 30, 2024

This is awesome @thodson-usgs ! Thanks for trying this out!

I'm marking this WIP because the workflow is broken in the latest version of VirtualiZarr.

I'll use #201 to track whatever regression has occurred there, so we can talk about the cool serverless map_reduce you've done here! (which is relevant to #123)

My conception of what lithops.map_reduce is actually doing is quite fuzzy. Is it saving out any intermediate results to a storage layer? Is it doing multiple rounds of reduction (i.e. a tree-reduce)? Do you foresee any scaling issues with this approach?

@thodson-usgs (Contributor Author)

Hmm. I pulled the latest version of VirtualiZarr in my testing environment, but I neglected to rebuild the runtime image, so I'll double-check that.

My conception of what lithops.map_reduce is actually doing is quite fuzzy. Is it saving out any intermediate results to a storage layer? Is it doing multiple rounds of reduction (i.e. a tree-reduce)? Do you foresee any scaling issues with this approach?

Right, I set up s3 storage for cubed, but I think this workflow is entirely in memory. So we'll invariably hit scaling issues unless we avoid the reduce by writing to disk during the map operation. Nevertheless, I'm really excited by how easy this was to set up, and I hope others will help improve upon it.

@thodson-usgs (Contributor Author) commented Jul 30, 2024

Hmm. I pulled the latest version of VirtualiZarr in my testing environment, but I neglected to rebuild the runtime image, so I'll double-check that.

No, I rebuilt the runtime image using the latest VirtualiZarr commit, and the error persisted, so I believe it's real.

@TomNicholas (Member)

Right, I set up s3 storage for cubed

You're not using cubed at all here; that's for the actual rechunking.

I think this workflow is entirely in memory.

Lithops does have the ability to persist things - did you set up the storage layer for that?

So we'll invariably hit scaling issues unless we avoid the reduce by writing to disk during the map operation. Nevertheless, I'm really excited by how easy this was to set up, and I hope others will help improve upon it.

My plan for scaling this to arbitrary size is actually to use cubed for the virtualizarr array reduction too - see #123 (comment). I expect this to be pretty complicated to achieve though - I'm not even sure if it's possible yet.

@TomNicholas (Member)

No, I rebuilt the runtime image using the latest VirtualiZarr commit, and the error persisted, so I believe it's real.

Presumably this error can be reproduced without lithops involved at all?

@thodson-usgs (Contributor Author) commented Jul 30, 2024

Presumably this error can be reproduced without lithops involved at all?

Good point. I'll try that next, though something is fishy; otherwise, how did this work with previous versions?

My plan for scaling this to arbitrary size is actually to use cubed for the virtualizarr array reduction too - see #123 (comment). I expect this to be pretty complicated to achieve though - I'm not even sure if it's possible yet.

Ah, okay. So maybe my hope of simply creating a skeleton zarr, then writing the meta-chunks during the map with to_zarr(region=), is half-baked. (The current workflow writes to json, but I'll test cloud-optimized formats once this is running.)

@TomNicholas (Member)

I'll try that next, though something is fishy; otherwise, how did this work with previous versions?

My current guess is that we simply introduced some accidental regression in virtualizarr recently. The way to find it is to (1) reproduce the error without all the lithops stuff (opening and concatenating 2 files should be enough), then (2) use git bisect to find the offending commit.
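
For reference, step (1) might look like the following sketch, assuming two local NetCDF files with a datetime coordinate (file names and the dimension name are placeholders, not the PR's code):

```python
import xarray as xr
from virtualizarr import open_virtual_dataset

# Open two files as virtual datasets, no lithops involved.
vds1 = open_virtual_dataset("file1.nc", indexes={})
vds2 = open_virtual_dataset("file2.nc", indexes={})

# Concatenate and write kerchunk references, as the example does.
combined = xr.concat([vds1, vds2], dim="time", coords="minimal", compat="override")
combined.virtualize.to_kerchunk("combined.json", format="json")

# If the regression is in VirtualiZarr, this open should raise the
# same MetadataError shown in the traceback above.
ds = xr.open_dataset("combined.json", engine="kerchunk")
```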

Ah, okay. So maybe my hope of simply creating a skeleton zarr, then writing the meta-chunks during the map with to_zarr(region=), is half-baked. (The current workflow writes to json, but I'll test cloud-optimized formats once this is running.)

If you write a manifest to zarr right now (i.e. a "virtual zarr store") you have no way of opening or loading the data via xarray/zarr, because no zarr reader understands what a manifest.json is yet.

My model of what we're trying to do is:

  • We need to get to one in-memory virtual xr.Dataset, on one worker, containing references to all the netcdf files. Once we have that we can save it to kerchunk json / kerchunk parquet / zarr manifests / whatever and we would be done (for a large number of references we should currently save to kerchunk parquet format).
  • We know that that virtual dataset will fit on one worker, because we estimate the memory requirements in Performance roadmap #104 (comment).
  • Problem is that inspecting the netCDF files to generate chunk references is slow, and we want to parallelize that across many workers (i.e. call open_virtual_dataset('some_file.nc') on each worker).
  • This step is embarrassingly parallel and hence can be done nicely with serverless functions, i.e. lithops map.
  • Issue is that once we open them we have to combine them in-memory via concatenation. If we used dask we could communicate them all to one worker, but with serverless the only way to communicate is to write to persistent storage then read from that storage.
  • I believe lithops map_reduce uses a storage layer to hold intermediate results (see https://lithops-cloud.github.io/docs/source/design.html#computation-flow). So it does a map, then a reduce, each of which must read from and write to that storage layer.
  • Using that might be enough for a 1D reduce job, especially if there aren't too many files. But to do it in N dimensions at large scale we really want an N-dimensional tree-reduce using serverless functions.
  • That's what cubed implements, for the case of N-D arrays. Then the question is how to get cubed to understand what to do with a ManifestArray instead of a np.ndarray? (A rough 1-D sketch of the round-based reduction idea follows below.)
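
To make the last two bullets concrete, here is a rough 1-D illustration of a round-based tree-reduce built from repeated lithops map calls. This is not the PR's code or the cubed approach: the pairing scheme, dimension name, and file list are assumptions, and it relies on lithops unpacking tuple iterdata into positional arguments.

```python
import lithops
import xarray as xr
from virtualizarr import open_virtual_dataset

def open_one(uri):
    return open_virtual_dataset(uri, indexes={})

def concat_pair(first, second=None):
    # Combine a pair (or a leftover single) of virtual datasets.
    vdss = [first] if second is None else [first, second]
    return xr.concat(vdss, dim="time", coords="minimal", compat="override")

uris = [f"s3://my-bucket/file_{i}.nc" for i in range(8)]  # placeholder list

fexec = lithops.FunctionExecutor()

# Round 0: embarrassingly parallel open (plain lithops map).
futures = fexec.map(open_one, uris)
datasets = fexec.get_result(futures)

# Later rounds: pair up and concatenate until one dataset remains.
# Each round is another serverless map, so intermediates travel through
# lithops' storage layer instead of accumulating on a single worker.
while len(datasets) > 1:
    pairs = [tuple(datasets[i:i + 2]) for i in range(0, len(datasets), 2)]
    futures = fexec.map(concat_pair, pairs)
    datasets = fexec.get_result(futures)

vds = datasets[0]
```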

@thodson-usgs force-pushed the lithops-to-kerchunk-example branch from 064380f to 8e301ef on July 31, 2024
@thodson-usgs changed the title from "WIP: Add example to create a virtual dataset using lithops" to "Add example to create a virtual dataset using lithops" on Jul 31, 2024
@thodson-usgs (Contributor Author)

Updated with the changes made in #206. Also fine to close this example if cubed can do this better.

@TomNicholas (Member) left a comment

Thanks @thodson-usgs !

Also fine to close this example if cubed can do this better.

I think doing this with Cubed will be quite involved. If this works effectively for you, then that's already awesome!

examples/virtualizarr-with-lithops/README.md (three resolved review threads)
@TomNicholas merged commit 53a609f into zarr-developers:main on Sep 5, 2024
8 checks passed
@TomNicholas (Member)

Thank you @thodson-usgs !!

print(f"{len(file_pattern)} file paths were retrieved.")


def map_references(fil):
Collaborator commented:

Is there a reason not to use file instead of fil?

@thodson-usgs (Contributor Author)

I must've copied that directly from an example. I'll need to check whether this follows some convention or is just a typo.

@douglatornell (Contributor)

Just a guess... file was a reserved word in Python2. If the example code was old enough, or written by someone with habits formed in the Python2 days (I'm one of those folks 😁 ), that might explain things.

@thodson-usgs (Contributor Author)

Just a guess... file was a reserved word in Python2. If the example code was old enough, or written by someone with habits formed in the Python2 days (I'm one of those folks 😁 ), that might explain things.

Ah, @douglatornell! I thought that was the case, but then I didn't see file among the reserved words. Thanks for clarifying.

@douglatornell (Contributor)

Yeah @thodson-usgs, I did a bit of a double take when I looked at the reserved words list and realized that file isn't reserved anymore in Python3.

@abarciauskas-bgse (Collaborator)

@thodson-usgs excited to see the lithops integration!

@TomNicholas mentioned this pull request on Dec 16, 2024
Labels: performance, references generation, usage example

4 participants