This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →

Support to use zarr.sync.ProcessSynchronizer(path) with S3 as path #1224

Closed
vietnguyengit opened this issue Oct 27, 2022 · 8 comments

Comments

@vietnguyengit

Hi everyone,

I've been digging around to see if there's an existing way to use zarr.sync.ProcessSynchronizer(path) with an S3 location as the path, but no luck.

My scenario: I have a Lambda function that listens to S3 events and writes NetCDF files to a Zarr store (on S3). Each Lambda invocation processes one NetCDF file.

As Lambda is a distributed system, 10 newly uploaded files will trigger 10 different processes that try to write to the Zarr store at pretty much the same time, and I'm seeing data corruption.

Using zarr.sync.ProcessSynchronizer() in xarray.Dataset.to_zarr(synchronizer=...) with a DirectoryStore seems to solve this write consistency issue.

But keeping the Zarr store on S3 is important to us, and a cloud-optimised format like Zarr should be able to fully support S3. So I wonder whether this is a bug, a missing feature, or something I just don't know about yet.

Please advise.

Thanks everyone.

@rabernat
Contributor

Hi @vietnguyengit and welcome! As far as I know, there is no way to provide the sort of synchronization you're looking for using existing tools. The fact is that S3 is an "eventually consistent" store, meaning that an out-of-band mechanism is required to manage synchronization and locking.

Question: are the writes to overlapping chunks? Or can you guarantee that each write from Lambda will not overlap with other writes? If you can guarantee that, you should be able to avoid the need for synchronization completely.
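One way to check the overlap question is to map each write region onto the chunk grid and compare the sets of chunks touched. The following is a minimal sketch, not part of Zarr's API: `touched_chunks`, `writes_conflict`, and the example chunk shape are hypothetical names and values chosen for illustration, and the check only covers chunk data, not metadata updates.

```python
from itertools import product


def touched_chunks(region, chunk_shape):
    """Return the set of chunk grid indices that a write region touches.

    `region` is a tuple of (start, stop) pairs, one per dimension, with
    `stop` exclusive; `chunk_shape` is the chunk size along each dimension.
    """
    per_dim = []
    for (start, stop), size in zip(region, chunk_shape):
        if stop <= start:
            return set()  # an empty region touches no chunks
        per_dim.append(range(start // size, (stop - 1) // size + 1))
    return set(product(*per_dim))


def writes_conflict(region_a, region_b, chunk_shape):
    """True if the two write regions share at least one chunk."""
    shared = touched_chunks(region_a, chunk_shape) & touched_chunks(region_b, chunk_shape)
    return bool(shared)


# Assumed example layout: a 2-D array stored in chunks of 10 x 100 elements.
chunks = (10, 100)
write_a = ((0, 10), (0, 100))    # covers exactly chunk (0, 0)
write_b = ((10, 20), (0, 100))   # covers exactly chunk (1, 0)
write_c = ((5, 15), (0, 100))    # straddles chunks (0, 0) and (1, 0)
```

Here `write_a` and `write_b` are aligned to chunk boundaries and touch different chunks, while `write_c` shares a chunk with both, so it would need some form of coordination.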

If not, something like zarr-developers/zarr-specs#154 might be the solution. This is an area we are working on actively at the moment.

@tasansal
Contributor

tasansal commented Nov 17, 2022

We have a similar situation too when inferring ML models on volumetric data stored as Zarr.
We have overlaps. The workarounds are very hacky :)

Is there a way to NFS mount a small persistent volume to the lambdas to handle the "path" of the lock?

Edit: it looks like AWS EFS volumes may do the trick.

@rabernat
Contributor

Using a POSIX layer on top of S3 is certainly one way to approach this. But I feel that a more cloud-native solution is needed. The Apache Iceberg approach is, to me, very appealing (that's what zarr-developers/zarr-specs#154 is about). The TileDB approach is also very elegant and worth exploring.

Question for everyone here: if you have multiple processes writing to the same region of an array at approximately the same time, how would you decide which write to prefer? Is it sufficient to just prefer the most recent write based on a timestamp?
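To make the timestamp option concrete, here is a minimal sketch of a "last write wins" policy. It is an illustration only, not something Zarr provides: `PendingWrite`, `last_write_wins`, and the chunk key strings are hypothetical, and the sketch assumes writer-supplied timestamps are comparable, which clock skew between machines can undermine.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PendingWrite:
    chunk_key: str     # hypothetical chunk identifier, e.g. "0.1"
    timestamp: float   # writer-supplied time, seconds since the epoch
    payload: bytes     # encoded chunk contents


def last_write_wins(writes):
    """Keep, for each chunk key, the write with the largest timestamp.

    On an exact timestamp tie, the write seen first is kept.
    """
    winners = {}
    for write in writes:
        current = winners.get(write.chunk_key)
        if current is None or write.timestamp > current.timestamp:
            winners[write.chunk_key] = write
    return winners


pending = [
    PendingWrite("0.0", 100.0, b"old"),
    PendingWrite("0.1", 101.0, b"only"),
    PendingWrite("0.0", 105.0, b"new"),
]
resolved = last_write_wins(pending)
```

In this example the two writes to chunk "0.0" are resolved in favour of the one stamped 105.0, and chunk "0.1" keeps its single write.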

@tasansal
Contributor

tasansal commented Nov 17, 2022

@rabernat agreed, a cloud-native solution would be nice. Any fast cloud native key-value store would work to store the locks. We average the overlaps in our case (they're probabilities), so we don't have to choose one. However, I would think, the most recent should go in because the chunk could have zeros if it wasn't previously written.
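The comment does not say how the averaging is implemented, so the following is only one possible sketch: keep a running sum and a count per cell, then divide at the end. The function names, the 1-D layout, and the fill value are assumptions for illustration. Accumulating is itself a read-modify-write, so concurrent writers would still need a lock or separate per-writer buffers.

```python
def accumulate(total, count, start, values):
    """Add one window of predictions into running sum/count buffers (1-D)."""
    for offset, value in enumerate(values):
        total[start + offset] += value
        count[start + offset] += 1


def finalize(total, count, fill=0.0):
    """Average cells written at least once; use `fill` for untouched cells."""
    return [t / c if c else fill for t, c in zip(total, count)]


n = 6
total, count = [0.0] * n, [0] * n
accumulate(total, count, 0, [0.2, 0.4, 0.6, 0.8])   # window covering cells 0..3
accumulate(total, count, 2, [1.0, 0.0, 0.5, 0.5])   # window covering cells 2..5
averaged = finalize(total, count)
```

Cells 2 and 3 are covered by both windows and end up with the mean of the two values; the other cells keep the single value written to them.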

I am aware of TileDB but haven't looked at their concurrent write approach, do you have any resources for it?

Edit: I think I found it: https://tiledb-inc-tiledb.readthedocs-hosted.com/en/1.6.3/tutorials/concurrency-consistency.html

@alexgleith

Even if it's not writing to the same region, there's a race condition, or congestion, or whatever you call it over the indexes, so writes will fail.

This is my naive example to do a massively parallel write of NetCDFs into a Zarr: https://github.com/aodn/aodn-public-notebooks/blob/main/zarr_creation/sst_zarr_creation_threaded.py

But there's a whole bunch of errors that happen, and it's a mess! So some form of locking, even just for the metadata, is required, I think.

@vietnguyengit
Author

Hi @rabernat, thanks for your comments. It was tricky to guarantee that the Lambda invocations won't overlap. I'll have a look at your suggestions. Much appreciated!

> Question for everyone here: if you have multiple processes writing to the same region of an array at approximately the same time, how would you decide which write to prefer? Is it sufficient to just prefer the most recent write based on a timestamp?

I'm not sure if I'm answering your question correctly, but I tried using the timestamp, sometimes together with SQS FIFO queues.

@tasansal I have tried EFS, but so far without much success. Here is the architecture; I use Prefect to observe the flow runs.

[image: architecture diagram]

Let me know if you have different results.

> The workarounds are very hacky :)

What are your hacky workarounds at a high level, please?

And agreed with you all, a cloud-native approach would be ideal.

@tasansal
Contributor

tasansal commented Nov 18, 2022

> @tasansal I have tried EFS, but so far without much success. Here is the architecture; I use Prefect to observe the flow runs.

@vietnguyengit That's a bummer, what were the issues with EFS? Too slow for locks?

> > The workarounds are very hacky :)
>
> What are your hacky workarounds at a high level, please?

Just to clarify, we operate in a very different environment. We don't use stateless compute like Lambda. Instead we typically have a Dask cluster. For our case, the following were some options:

  • Having an NFS-mounted volume just for locks (or a PVC if on K8s) and using Zarr's ProcessSynchronizer
  • Writing a wrapper around a key-value store (etcd, Redis, etc.) to hold the locks
  • Using Dask's locks on a Dask cluster.

We haven't tried any of these yet; they're just potential hacky workarounds. I was literally discussing this with the team today.
IMHO none of them are good ideas, and they may not be applicable if your workflow can't be Dask-ified, but they may work.
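As a rough illustration of the key-value option in the list above, a per-chunk lock can be built on an atomic "set if absent" with a time-to-live, similar in spirit to Redis `SET key value NX PX`. This is a sketch under stated assumptions: `InMemoryKV` is an in-process stand-in rather than a real Redis or etcd client, and `ChunkLock`, its method names, and its defaults are hypothetical and not part of Zarr.

```python
import threading
import time
import uuid


class InMemoryKV:
    """In-process stand-in for a key-value service (NOT a real Redis/etcd client)."""

    def __init__(self):
        self._data = {}                  # key -> (value, expiry time)
        self._guard = threading.Lock()   # makes each operation atomic

    def set_if_absent(self, key, value, ttl):
        """Store `value` unless a non-expired entry exists. Returns True on success."""
        now = time.monotonic()
        with self._guard:
            entry = self._data.get(key)
            if entry is not None and entry[1] > now:
                return False
            self._data[key] = (value, now + ttl)
            return True

    def delete_if_equal(self, key, value):
        """Delete `key` only if it still holds `value`. Returns True if deleted."""
        with self._guard:
            entry = self._data.get(key)
            if entry is not None and entry[0] == value:
                del self._data[key]
                return True
            return False


class ChunkLock:
    """Hypothetical lock for one chunk key, held via a unique token."""

    def __init__(self, kv, chunk_key, ttl=30.0):
        self.kv = kv
        self.key = f"lock:{chunk_key}"
        self.ttl = ttl                    # lock expires if the holder dies
        self.token = uuid.uuid4().hex     # identifies this particular holder

    def acquire(self, timeout=5.0, poll=0.05):
        """Poll until the lock is taken or `timeout` seconds have passed."""
        deadline = time.monotonic() + timeout
        while True:
            if self.kv.set_if_absent(self.key, self.token, self.ttl):
                return True
            if time.monotonic() >= deadline:
                return False
            time.sleep(poll)

    def release(self):
        """Release only if this holder still owns the lock."""
        return self.kv.delete_if_equal(self.key, self.token)


kv = InMemoryKV()
first = ChunkLock(kv, "0.0")
second = ChunkLock(kv, "0.0")
```

The token check on release stops one writer from deleting a lock that has since expired and been taken by another writer. The TTL is a trade-off: too short and a slow writer loses the lock mid-write, too long and a crashed writer blocks others until it expires.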

I think Dask is the way to go TBH, if you can move your workflow to it, because the Dask chunks can naturally align with the Zarr chunks and Dask locks work across the cluster. The scheduler handles all the locking, and it's pretty fast.

Actually I think @rabernat's pangeo-forge-recipes uses something similar. Over there you can run a Prefect DAG on a Dask cluster and merge a bunch of NetCDFs into a large Zarr. There are some locking mechanisms in place per variable iirc, I looked at it a while back. I may be remembering some things wrong. He can comment more on that :)

@vietnguyengit
Author

vietnguyengit commented Nov 18, 2022

Thanks @tasansal, I see. We do have workflows, built with Dask and orchestrated with Prefect, that process massive Zarr stores of data aggregated from hundreds of thousands of NetCDF files. That works fine.

For example:

The experiments with Lambda were specifically to handle the case where some of the files already ingested into that "Big" Zarr store are revised (e.g. the data provider recalibrated their calculations) and we want the relevant regions of the Zarr store updated to reflect the new data.

Anyhow, we concluded that Lambda was not fit for purpose due to consistency issues.

The ability to take locks when multiple processes write files into the S3 Zarr store would help us decide on an "event-driven" architecture for when we receive "revised NetCDF files" from the data providers.

For now, scheduled flows bring fewer problems to deal with for our cases.

@zarr-developers zarr-developers locked and limited conversation to collaborators Dec 2, 2022
@joshmoore joshmoore converted this issue into discussion #1280 Dec 2, 2022
