Update dask_write.py to the new API (#527)
* Update dask_write.py to the new API

* Update icechunk-python/examples/dask_write.py

Co-authored-by: Deepak Cherian <[email protected]>

* Update icechunk-python/examples/dask_write.py

Co-authored-by: Deepak Cherian <[email protected]>

---------

Co-authored-by: Deepak Cherian <[email protected]>
paraseba and dcherian authored Dec 31, 2024
1 parent 84a8920 commit 1316e91
Showing 1 changed file with 49 additions and 48 deletions.
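
For orientation, here is a minimal sketch of the session-based workflow this commit moves the example onto, pieced together from the calls that appear in the diff below. The bucket, prefix, and array shape are placeholder values, and the `create_array` call assumes the zarr v3 group API; treat this as an illustration, not the example script itself.

```python
import icechunk
import zarr

# Placeholder S3 location; the example script derives these from its --url argument.
storage = icechunk.s3_storage(bucket="some-bucket", prefix="some-prefix", region="us-east-1")

# Lower the inline-chunk threshold so the example writes real chunk objects.
config = icechunk.RepositoryConfig.default()
config.inline_chunk_threshold_bytes = 1

repo = icechunk.Repository.create(storage=storage, config=config)

# All reads and writes now go through a Session; zarr talks to session.store.
session = repo.writable_session("main")
group = zarr.group(store=session.store, overwrite=True)
group.create_array(
    "array", shape=(100, 100, 10), chunks=(10, 10, 1), dtype="f8", fill_value=float("nan")
)

snapshot = session.commit("array created")
print(f"Array initialized, snapshot {snapshot}")
```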
97 changes: 49 additions & 48 deletions icechunk-python/examples/dask_write.py
@@ -50,9 +50,9 @@
 class Task:
     """A task distributed to Dask workers"""

-    store: (
-        icechunk.IcechunkStore
-    )  # The worker will use this Icechunk store to read/write to the dataset
+    session: (
+        icechunk.Session
+    )  # The worker will use this Icechunk session to read/write to the dataset
     time: (
         int  # The position in the coordinate dimension where the read/write should happen
     )
@@ -65,28 +65,29 @@ def generate_task_array(task: Task, shape: tuple[int, ...]) -> np.typing.ArrayLike:
     return np.random.rand(*shape)


-def execute_write_task(task: Task) -> bytes:
+def execute_write_task(task: Task) -> icechunk.Session:
     """Execute task as a write task.
     This will read the time coordinate from `task` and write a "pancake" in that position,
     using random data. Random data is generated using the task seed.
-    Returns the Icechunk store after the write is done.
+    Returns the Icechunk session after the write is done.
-    As you can see Icechunk stores can be passed to remote workers, and returned from them.
-    The reason to return the store is that we'll need all the remote stores, when they are
+    As you can see Icechunk sessions can be passed to remote workers, and returned from them.
+    The reason to return the session is that we'll need all the remote sessions, when they are
     done, to be able to do a single, global commit to Icechunk.
     """

-    store = task.store
+    session = task.session
+    store = session.store

     group = zarr.group(store=store, overwrite=False)
     array = cast(zarr.Array, group["array"])
     dprint(f"Writing at t={task.time}")
     data = generate_task_array(task, array.shape[0:2])
     array[:, :, task.time] = data
     dprint(f"Writing at t={task.time} done")
-    return store.change_set_bytes()
+    return session


 def execute_read_task(task: Task) -> None:
@@ -95,10 +96,11 @@ def execute_read_task(task: Task) -> None:
     This will read the time coordinate from `task` and read a "pancake" in that position.
     Then it will assert the data is valid by re-generating the random data from the passed seed.
-    As you can see Icechunk stores can be passed to remote workers.
+    As you can see Icechunk sessions can be passed to remote workers.
     """

-    store = task.store
+    session = task.session
+    store = session.store
     group = zarr.group(store=store, overwrite=False)
     array = cast(zarr.Array, group["array"])

@@ -109,37 +111,42 @@


 def storage_config(args: argparse.Namespace) -> dict[str, Any]:
-    """Return the Icechunk store S3 configuration map"""
+    """Return the Icechunk S3 configuration map"""
     bucket = args.url.netloc
     prefix = args.url.path[1:]
     return {
         "bucket": bucket,
         "prefix": prefix,
         "region": "us-east-1",
     }


-def store_config(args: argparse.Namespace) -> dict[str, Any]:
-    """Return the Icechunk store configuration.
+def repository_config(args: argparse.Namespace) -> icechunk.RepositoryConfig:
+    """Return the Icechunk repo configuration.
     We lower the default to make sure we write chunks and not inline them.
     """
-    return {"inline_chunk_threshold_bytes": 1}
+    config = icechunk.RepositoryConfig.default()
+    config.inline_chunk_threshold_bytes = 1
+    return config


 def create(args: argparse.Namespace) -> None:
     """Execute the create subcommand.
-    Creates an Icechunk store, a root group and an array named "array"
+    Creates an Icechunk repo, a root group and an array named "array"
     with the shape passed as arguments.
     Commits the Icechunk repository when done.
     """
-    store = icechunk.IcechunkStore.open_or_create(
-        storage=icechunk.StorageConfig.s3_from_env(**storage_config(args)),
-        mode="w",
-        config=icechunk.StoreConfig(**store_config(args)),
+    repo = icechunk.Repository.create(
+        storage=icechunk.s3_storage(**storage_config(args)),
+        config=repository_config(args),
     )

+    session = repo.writable_session("main")
+    store = session.store
+
     group = zarr.group(store=store, overwrite=True)
     shape = (
         args.x_chunks * args.chunk_x_size,
@@ -155,8 +162,8 @@ def create(args: argparse.Namespace) -> None:
         dtype="f8",
         fill_value=float("nan"),
     )
-    _first_snapshot = store.commit("array created")
-    print("Array initialized")
+    first_snapshot = session.commit("array created")
+    print(f"Array initialized, snapshot {first_snapshot}")


 def update(args: argparse.Namespace) -> None:
@@ -167,26 +174,21 @@ def update(args: argparse.Namespace) -> None:
     can still use a lower level API to do the writes:
     * We split the work into small `Task`s, one 'pancake' per task, at a given t coordinate.
     * We use Dask's `map` to ship the `Task` to a worker
-    * The `Task` includes a copy of the Icechunk Store, so workers can do the writes
-    * When workers are done, they send their store back
-    * When all workers are done (Dask's `gather`), we take all Stores and do a distributed commit in Icechunk
+    * The `Task` includes a copy of the Icechunk Session, so workers can do the writes
+    * When workers are done, they send their Session back
+    * When all workers are done (Dask's `gather`), we take all Sessions and do a distributed commit in Icechunk
     """
-    storage_conf = storage_config(args)
-    store_conf = store_config(args)

-    store = icechunk.IcechunkStore.open_or_create(
-        storage=icechunk.StorageConfig.s3_from_env(**storage_conf),
-        mode="r+",
-        config=icechunk.StoreConfig(**store_conf),
+    repo = icechunk.Repository.open(
+        storage=icechunk.s3_storage(**storage_config(args)),
+        config=repository_config(args),
     )

-    group = zarr.group(store=store, overwrite=False)
-    array = cast(zarr.Array, group["array"])
-    print(f"Found an array with shape: {array.shape}")
+    session = repo.writable_session("main")

     tasks = [
         Task(
-            store=store,
+            session=session,
             time=time,
             seed=time,
         )
@@ -196,15 +198,15 @@ def update(args: argparse.Namespace) -> None:
     client = Client(n_workers=args.workers, threads_per_worker=1)

     map_result = client.map(execute_write_task, tasks)
-    worker_changes = client.gather(map_result)
+    worker_sessions = client.gather(map_result)

     print("Starting distributed commit")
-    # we can use the current store as the commit coordinator, because it doesn't have any pending changes,
+    # we can use the current session as the commit coordinator, because it doesn't have any pending changes,
     # all changes come from the tasks, Icechunk doesn't care about where the changes come from, the only
     # important thing is to not count changes twice
-    for changes in worker_changes:
-        store.merge(changes)
-    commit_res = store.commit("distributed commit")
+    for worker_session in worker_sessions:
+        session.merge(worker_session)
+    commit_res = session.commit("distributed commit")
     assert commit_res
     print("Distributed commit done")

@@ -219,22 +221,21 @@ def verify(args: argparse.Namespace) -> None:
     * We use Dask's `map` to ship the `Task` to a worker
     * The `Task` includes a copy of the Icechunk Store, so workers can do the Icechunk reads
     """
-    storage_conf = storage_config(args)
-    store_conf = store_config(args)
-
-    store = icechunk.IcechunkStore.open_or_create(
-        storage=icechunk.StorageConfig.s3_from_env(**storage_conf),
-        mode="r",
-        config=icechunk.StoreConfig(**store_conf),
+    repo = icechunk.Repository.open(
+        storage=icechunk.s3_storage(**storage_config(args)),
+        config=repository_config(args),
     )

+    session = repo.writable_session("main")
+    store = session.store
+
     group = zarr.group(store=store, overwrite=False)
     array = cast(zarr.Array, group["array"])
     print(f"Found an array with shape: {array.shape}")

     tasks = [
         Task(
-            store=store,
+            session=session,
             time=time,
             seed=time,
         )
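
The core of the change is the commit protocol in `update()`: every worker writes through its own copy of the session and sends that session back, and the driver merges them and commits once. Condensed into a standalone sketch below; a running Dask cluster, an existing "array" in the repository, and the `write_slice` helper, placeholder bucket/prefix, and slice count are illustrative assumptions, and the payload is a constant rather than the example's seeded random data.

```python
from dask.distributed import Client

import icechunk
import zarr


def write_slice(session: icechunk.Session, t: int) -> icechunk.Session:
    # Each worker writes one "pancake" through its own pickled copy of the session...
    group = zarr.group(store=session.store, overwrite=False)
    group["array"][:, :, t] = float(t)
    # ...and returns the session so its changeset can be merged by the driver.
    return session


if __name__ == "__main__":
    repo = icechunk.Repository.open(
        storage=icechunk.s3_storage(bucket="some-bucket", prefix="some-prefix", region="us-east-1"),
    )
    coordinator = repo.writable_session("main")

    client = Client(n_workers=4, threads_per_worker=1)
    n_time = 10  # placeholder number of time slices
    futures = client.map(write_slice, [coordinator] * n_time, range(n_time))
    worker_sessions = client.gather(futures)

    # The coordinator session has no changes of its own; it only collects the
    # workers' changesets and performs one global commit.
    for worker_session in worker_sessions:
        coordinator.merge(worker_session)
    print(coordinator.commit("distributed commit"))
```

As the in-file comment notes, Icechunk does not care where the changes come from, only that none are counted twice, which is why the coordinator session itself writes nothing before the merge.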
