Skip to content

Commit

Permalink
Merge pull request #203 from earth-mover/seba/empty-manifest
Browse files Browse the repository at this point in the history
Don't write empty manifests
  • Loading branch information
paraseba authored Oct 13, 2024
2 parents 651a697 + 98fdc94 commit 95bc718
Show file tree
Hide file tree
Showing 37 changed files with 148 additions and 40 deletions.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"8AEWDWJRTMECASF516SG"}
{"snapshot":"JSQ148MYX6VKHPP4D0WG"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"3KP6E7F3C2PE2HNGCNM0"}
{"snapshot":"MTH5CQPGNWZ516P7QVK0"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"C1ZKMGE3ESPJ24YKN9MG"}
{"snapshot":"3EMAFJFYV394722VTAPG"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"H01K0XJPGVW4HFX470AG"}
{"snapshot":"J0N9DYWKA5ECGPP056NG"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"AG1HZQ5SWS8DM8DNC670"}
{"snapshot":"ATFBNT6AY8J7ERFY8SD0"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"VNKSCC59M58V0MSJ01RG"}
{"snapshot":"ZW2AXQ9ZDPRS9V5331PG"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"8AEWDWJRTMECASF516SG"}
{"snapshot":"JSQ148MYX6VKHPP4D0WG"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"AG1HZQ5SWS8DM8DNC670"}
{"snapshot":"ATFBNT6AY8J7ERFY8SD0"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"VNKSCC59M58V0MSJ01RG"}
{"snapshot":"ZW2AXQ9ZDPRS9V5331PG"}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
14 changes: 9 additions & 5 deletions icechunk/src/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ impl ChangeSet {

pub fn new_nodes_iterator<'a>(
&'a self,
manifest_id: &'a ManifestId,
manifest_id: Option<&'a ManifestId>,
) -> impl Iterator<Item = NodeSnapshot> + 'a {
self.new_nodes().filter_map(move |path| {
if self.is_deleted(path) {
Expand All @@ -330,10 +330,14 @@ impl ChangeSet {
match node.node_data {
NodeData::Group => Some(node),
NodeData::Array(meta, _no_manifests_yet) => {
let new_manifests = vec![ManifestRef {
object_id: manifest_id.clone(),
extents: ManifestExtents(vec![]),
}];
let new_manifests = manifest_id
.map(|mid| {
vec![ManifestRef {
object_id: mid.clone(),
extents: ManifestExtents(vec![]),
}]
})
.unwrap_or_default();
Some(NodeSnapshot {
node_data: NodeData::Array(meta, new_manifests),
..node
Expand Down
144 changes: 118 additions & 26 deletions icechunk/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,13 +638,8 @@ impl Repository {
pub async fn list_nodes(
&self,
) -> RepositoryResult<impl Iterator<Item = NodeSnapshot> + '_> {
updated_nodes(
self.storage.as_ref(),
&self.change_set,
&self.snapshot_id,
&ObjectId::FAKE,
)
.await
updated_nodes(self.storage.as_ref(), &self.change_set, &self.snapshot_id, None)
.await
}

pub async fn all_chunks(
Expand Down Expand Up @@ -842,18 +837,16 @@ async fn updated_existing_nodes<'a>(
storage: &(dyn Storage + Send + Sync),
change_set: &'a ChangeSet,
parent_id: &SnapshotId,
manifest_id: &'a ManifestId,
manifest_id: Option<&'a ManifestId>,
) -> RepositoryResult<impl Iterator<Item = NodeSnapshot> + 'a> {
// TODO: solve this duplication, there is always the possibility of this being the first
// version
let manifest_refs = manifest_id.map(|mid| {
vec![ManifestRef { object_id: mid.clone(), extents: ManifestExtents(vec![]) }]
});
let updated_nodes =
storage.fetch_snapshot(parent_id).await?.iter_arc().filter_map(move |node| {
let new_manifests = if node.node_type() == NodeType::Array {
//FIXME: it could be none for empty arrays
Some(vec![ManifestRef {
object_id: manifest_id.clone(),
extents: ManifestExtents(vec![]),
}])
manifest_refs.clone()
} else {
None
};
Expand All @@ -867,7 +860,7 @@ async fn updated_nodes<'a>(
storage: &(dyn Storage + Send + Sync),
change_set: &'a ChangeSet,
parent_id: &SnapshotId,
manifest_id: &'a ManifestId,
manifest_id: Option<&'a ManifestId>,
) -> RepositoryResult<impl Iterator<Item = NodeSnapshot> + 'a> {
Ok(updated_existing_nodes(storage, change_set, parent_id, manifest_id)
.await?
Expand Down Expand Up @@ -961,20 +954,30 @@ async fn distributed_flush<I: IntoIterator<Item = ChangeSet>>(
.map_ok(|(_path, chunk_info)| chunk_info);

let new_manifest = Arc::new(Manifest::from_stream(chunks).await?);
let new_manifest_id = ObjectId::random();
storage.write_manifests(new_manifest_id.clone(), Arc::clone(&new_manifest)).await?;
let new_manifest_id = if new_manifest.len() > 0 {
let id = ObjectId::random();
storage.write_manifests(id.clone(), Arc::clone(&new_manifest)).await?;
Some(id)
} else {
None
};

let all_nodes =
updated_nodes(storage, &change_set, parent_id, &new_manifest_id).await?;
updated_nodes(storage, &change_set, parent_id, new_manifest_id.as_ref()).await?;

let old_snapshot = storage.fetch_snapshot(parent_id).await?;
let mut new_snapshot = Snapshot::from_iter(
old_snapshot.as_ref(),
Some(properties),
vec![ManifestFileInfo {
id: new_manifest_id.clone(),
format_version: new_manifest.icechunk_manifest_format_version,
}],
new_manifest_id
.as_ref()
.map(|mid| {
vec![ManifestFileInfo {
id: mid.clone(),
format_version: new_manifest.icechunk_manifest_format_version,
}]
})
.unwrap_or_default(),
vec![],
all_nodes,
);
Expand Down Expand Up @@ -1721,6 +1724,9 @@ mod tests {
atts == UserAttributesSnapshot::Inline(UserAttributes::try_new(br#"{"foo":42}"#).unwrap())
));

// since we wrote every asset and we are using a caching storage, we should never need to fetch them
assert!(logging.fetch_operations().is_empty());

//test the previous version is still alive
let ds = Repository::update(Arc::clone(&storage), previous_snapshot_id).build();
assert_eq!(
Expand All @@ -1732,9 +1738,6 @@ mod tests {
Some(ChunkPayload::Inline("new chunk".into()))
);

// since we write every asset and we are using a caching storage, we should never need to fetch them
assert!(logging.fetch_operations().is_empty());

Ok(())
}

Expand Down Expand Up @@ -1817,9 +1820,29 @@ mod tests {

#[tokio::test]
async fn test_manifests_shrink() -> Result<(), Box<dyn Error>> {
let storage: Arc<dyn Storage + Send + Sync> =
let in_mem_storage =
Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into())));
let storage: Arc<dyn Storage + Send + Sync> = in_mem_storage.clone();
let mut ds = Repository::init(Arc::clone(&storage), false).await?.build();

// there should be no manifests yet
assert!(!in_mem_storage
.all_keys()
.await?
.iter()
.any(|key| key.contains("manifest")));

// initialization creates one snapshot
assert_eq!(
1,
in_mem_storage
.all_keys()
.await?
.iter()
.filter(|key| key.contains("snapshot"))
.count(),
);

ds.add_group(Path::root()).await?;
let zarr_meta = ZarrArrayMetadata {
shape: vec![5, 5],
Expand All @@ -1841,6 +1864,30 @@ mod tests {
ds.add_array(a1path.clone(), zarr_meta.clone()).await?;
ds.add_array(a2path.clone(), zarr_meta.clone()).await?;

let _ = ds.commit("main", "first commit", None).await?;

// there should be no manifests yet because we didn't add any chunks
assert_eq!(
0,
in_mem_storage
.all_keys()
.await?
.iter()
.filter(|key| key.contains("manifest"))
.count(),
);
// there should be two snapshots, one for the initialization commit and one for the real
// commit
assert_eq!(
2,
in_mem_storage
.all_keys()
.await?
.iter()
.filter(|key| key.contains("snapshot"))
.count(),
);

// add 3 chunks
ds.set_chunk_ref(
a1path.clone(),
Expand All @@ -1863,6 +1910,17 @@ mod tests {

ds.commit("main", "commit", None).await?;

// there should be one manifest now
assert_eq!(
1,
in_mem_storage
.all_keys()
.await?
.iter()
.filter(|key| key.contains("manifest"))
.count()
);

let manifest_id = match ds.get_array(&a1path).await?.node_data {
NodeData::Array(_, manifests) => {
manifests.first().as_ref().unwrap().object_id.clone()
Expand All @@ -1874,6 +1932,18 @@ mod tests {

ds.delete_array(a2path).await?;
ds.commit("main", "array2 deleted", None).await?;

// there should be two manifests
assert_eq!(
2,
in_mem_storage
.all_keys()
.await?
.iter()
.filter(|key| key.contains("manifest"))
.count()
);

let manifest_id = match ds.get_array(&a1path).await?.node_data {
NodeData::Array(_, manifests) => {
manifests.first().as_ref().unwrap().object_id.clone()
Expand All @@ -1888,6 +1958,28 @@ mod tests {
// delete a chunk
ds.set_chunk_ref(a1path.clone(), ChunkIndices(vec![0, 0]), None).await?;
ds.commit("main", "chunk deleted", None).await?;

// there should be three manifests
assert_eq!(
3,
in_mem_storage
.all_keys()
.await?
.iter()
.filter(|key| key.contains("manifest"))
.count()
);
// there should be five snapshots
assert_eq!(
5,
in_mem_storage
.all_keys()
.await?
.iter()
.filter(|key| key.contains("snapshot"))
.count(),
);

let manifest_id = match ds.get_array(&a1path).await?.node_data {
NodeData::Array(_, manifests) => {
manifests.first().as_ref().unwrap().object_id.clone()
Expand Down
12 changes: 12 additions & 0 deletions icechunk/src/storage/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,18 @@ impl ObjectStorage {
})
}

/// Return all keys in the store
///
/// Intended for testing and debugging purposes only.
pub async fn all_keys(&self) -> StorageResult<Vec<String>> {
Ok(self
.store
.list(None)
.map_ok(|obj| obj.location.to_string())
.try_collect()
.await?)
}

fn get_path<const SIZE: usize, T: FileTypeTag>(
&self,
file_prefix: &str,
Expand Down

0 comments on commit 95bc718

Please sign in to comment.