Skip to content

Commit

Permalink
add DoubleEndedIter impl for ForestIter
Browse files Browse the repository at this point in the history
  • Loading branch information
wngr committed Mar 16, 2021
1 parent 455185c commit 5f7ce1e
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 82 deletions.
210 changes: 141 additions & 69 deletions banyan/src/forest/read.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::{BranchCache, Config, CryptoConfig, FilteredChunk, Forest, TreeTypes};
use crate::{
index::{
deserialize_compressed, zip_with_offset, Branch, BranchIndex, CompactSeq, Index, IndexRef,
Leaf, LeafIndex, NodeInfo,
deserialize_compressed, Branch, BranchIndex, CompactSeq, Index, IndexRef, Leaf, LeafIndex,
NodeInfo,
},
query::Query,
store::ReadOnlyStore,
Expand All @@ -25,6 +25,140 @@ pub struct ForestIter<T: TreeTypes, V, R, Q: Query<T>, F> {
pub pos_stack: SmallVec<[(usize, SmallVec<[bool; 64]>); 32]>,
}

/// Back-to-front traversal for `ForestIter`.
///
/// The iterator keeps two stacks: `index_stack` holds the nodes on the path from the root to
/// the node currently being visited, and `pos_stack` holds, per node, the number of children
/// still to be visited (`usize::MAX` marks a node that has not been visited yet) together with
/// the per-child query matches. `offset` is maintained as the start offset of the most
/// recently emitted range, i.e. the end offset of whatever is visited next.
impl<T: TreeTypes, V, R, Q, E, F> DoubleEndedIterator for ForestIter<T, V, R, Q, F>
where
    T: TreeTypes + 'static,
    V: DagCbor + Clone + Send + Sync + Debug + 'static,
    R: ReadOnlyStore<T::Link> + Clone + Send + Sync + 'static,
    Q: Query<T> + Clone + Send + 'static,
    E: Send + 'static,
    F: Fn(IndexRef<T>) -> E + Send + Sync + 'static,
{
    /// Yields the next chunk counted from the back, or `None` once the node stack is empty.
    ///
    /// Every visited leaf, every non-matching child of a branch, and every node that is
    /// neither a loaded branch nor a loaded leaf produces exactly one `FilteredChunk`;
    /// the latter two are placeholders with empty `data`. Errors from loading a node or
    /// selecting its elements are passed through as `Some(Err(_))`.
    fn next_back(&mut self) -> Option<Self::Item> {
        let res: FilteredChunk<T, V, E> = 'outer: loop {
            let head = match self.index_stack.last() {
                Some(i) => i,
                // Nothing to do ..
                _ => return None,
            };

            let (pos, matching) = self.pos_stack.last_mut().expect("not empty");

            // Branch is exhausted: Ascend.
            if *pos == 0 {
                // Ascend to parent's node
                self.index_stack.pop().expect("not empty");
                self.pos_stack.pop();

                // Step the parent's cursor back by one child, if there is a parent left
                // to continue with.
                if let Some(parent) = self.pos_stack.last_mut() {
                    parent.0 -= 1;
                }
                continue;
            }

            match self.forest.load_node(head) {
                Ok(NodeInfo::Branch(index, branch)) => {
                    // we hit this branch node for the first time. Apply the
                    // query on its children and store it
                    if *pos == usize::MAX {
                        // The root is entered with its start offset; move to its end offset,
                        // since children are visited last to first.
                        if self.index_stack.len() == 1 {
                            self.offset += index.count;
                        }
                        let mut q_matching = smallvec![true; index.summaries.len()];
                        self.query
                            .intersecting(self.offset - index.count, index, &mut q_matching);
                        debug_assert_eq!(branch.children.len(), q_matching.len());
                        *matching = q_matching;
                        *pos = branch.children.len();
                    }

                    if matching[*pos - 1] {
                        // Descend into next child
                        self.index_stack
                            // TODO: clone :-( ?
                            .push(Arc::new(branch.children[*pos - 1].clone()));
                        let new_vec: SmallVec<[_; 64]> = smallvec![matching[*pos - 1]];
                        self.pos_stack.push((usize::MAX, new_vec));
                        continue 'outer;
                    } else {
                        // The query rules this child out: skip it, but still report its range.
                        let child = &branch.children[*pos - 1];

                        self.offset -= child.count();
                        let placeholder: FilteredChunk<T, V, E> = FilteredChunk {
                            range: self.offset..self.offset + child.count(),
                            data: Vec::new(),
                            extra: (self.mk_extra)(child.as_index_ref()),
                        };

                        *pos -= 1;
                        break placeholder;
                    }
                }

                Ok(NodeInfo::Leaf(index, leaf)) => {
                    // A leaf that is itself the root is entered with its start offset, so
                    // there is nothing to subtract (and subtracting could underflow). Any
                    // other leaf is entered with its end offset.
                    let is_root = self.index_stack.len() == 1;
                    let chunk = {
                        if !is_root {
                            self.offset -= index.keys.count();
                        }
                        let mut matching: SmallVec<[_; 32]> = smallvec![true; index.keys.len()];
                        self.query.containing(self.offset, index, &mut matching);
                        let keys = index.select_keys(&matching);
                        let elems: Vec<V> = match leaf.as_ref().select(&matching) {
                            Ok(i) => i,
                            Err(e) => return Some(Err(e)),
                        };
                        let mut pairs = keys
                            .zip(elems)
                            .map(|((o, k), v)| (o + self.offset, k, v))
                            .collect::<Vec<_>>();
                        // Elements within a chunk are handed out back to front as well.
                        pairs.reverse();

                        FilteredChunk {
                            range: self.offset..self.offset + index.keys.count(),
                            data: pairs,
                            extra: (self.mk_extra)(IndexRef::Leaf(index)),
                        }
                    };

                    // Ascend to parent's node. A root leaf has no parent, in which case both
                    // stacks are empty afterwards and the next call returns `None`.
                    self.index_stack.pop().expect("not empty");
                    self.pos_stack.pop();
                    if let Some(parent) = self.pos_stack.last_mut() {
                        parent.0 -= 1;
                    }

                    break chunk;
                }

                // even for purged leafs and branches or ignored chunks,
                // produce a placeholder.
                //
                // the caller can find out if we skipped purged parts of the
                // tree by using an appropriate mk_extra fn, or check
                // `data.len()`.
                Ok(_) => {
                    // Same offset convention as for leaves: only a non-root node is entered
                    // with its end offset.
                    let is_root = self.index_stack.len() == 1;

                    // Ascend to parent's node, if there is one.
                    let index = self.index_stack.pop().expect("not empty");
                    self.pos_stack.pop();
                    if let Some(parent) = self.pos_stack.last_mut() {
                        parent.0 -= 1;
                    }
                    if !is_root {
                        self.offset -= index.count();
                    }

                    let placeholder: FilteredChunk<T, V, E> = FilteredChunk {
                        range: self.offset..self.offset + index.count(),
                        data: Vec::new(),
                        extra: (self.mk_extra)(index.as_index_ref()),
                    };
                    break placeholder;
                }
                Err(e) => return Some(Err(e)),
            };
        };
        Some(Ok(res))
    }
}

impl<T: TreeTypes, V, R, Q, E, F> Iterator for ForestIter<T, V, R, Q, F>
where
T: TreeTypes + 'static,
Expand Down Expand Up @@ -399,73 +533,11 @@ where
index: Arc<Index<T>>,
mk_extra: &'static F,
) -> BoxStream<'static, Result<FilteredChunk<T, V, E>>> {
let this = self.clone();
let s =
async move {
Ok(match this.load_node(&index)? {
NodeInfo::Leaf(index, node) => {
// todo: don't get the node here, since we might not need it
let mut matching = vec![true; index.keys.len()];
query.containing(offset, index, &mut matching);
let keys = index.select_keys(&matching);
let elems: Vec<V> = node.as_ref().select(&matching)?;
let mut pairs = keys
.zip(elems)
.map(|((o, k), v)| (o + offset, k, v))
.collect::<Vec<_>>();
pairs.reverse();
let chunk = FilteredChunk {
range: offset..offset + index.keys.count(),
data: pairs,
extra: mk_extra(IndexRef::Leaf(index)),
};
stream::once(future::ok(chunk)).left_stream().left_stream()
}
NodeInfo::Branch(index, node) => {
// todo: don't get the node here, since we might not need it
let mut matching = vec![true; index.summaries.len()];
query.intersecting(offset, index, &mut matching);
let offsets = zip_with_offset(node.children.to_vec(), offset);
let children: Vec<_> = matching.into_iter().zip(offsets).collect();
let iter = children.into_iter().rev().map(
move |(is_matching, (child, offset))| {
if is_matching {
this.clone()
.stream_filtered_chunked_reverse0(
offset,
query.clone(),
Arc::new(child),
mk_extra,
)
.right_stream()
} else {
let placeholder = FilteredChunk {
range: offset..offset + child.count(),
data: Vec::new(),
extra: mk_extra(child.as_index_ref()),
};
stream::once(future::ok(placeholder)).left_stream()
}
},
);
stream::iter(iter).flatten().right_stream().left_stream()
}
NodeInfo::PurgedBranch(_) | NodeInfo::PurgedLeaf(_) => {
// even for purged leafs and branches, produce a placeholder.
//
// the caller can find out if we skipped purged parts of the tree by
// using an appropriate mk_extra fn.
let placeholder = FilteredChunk {
range: offset..offset + index.count(),
data: Vec::new(),
extra: mk_extra(index.as_index_ref()),
};
stream::once(future::ok(placeholder)).right_stream()
}
})
}
.try_flatten_stream();
Box::pin(s)
let iter = self.traverse_rev0(offset, query, index, mk_extra);
stream::unfold(iter, |mut iter| async move {
iter.next_back().map(|res| (res, iter))
})
.boxed()
}

pub(crate) fn dump0(&self, index: &Index<T>, prefix: &str) -> Result<()> {
Expand Down
12 changes: 0 additions & 12 deletions banyan/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,18 +402,6 @@ pub(crate) fn deserialize_compressed<T: TreeTypes>(
seq.items::<Index<T>>()
}

/// Pairs every index with the offset at which it starts.
///
/// The first index starts at `offset`; each following one starts where the previous one
/// ended, i.e. at the previous start plus the previous index's `count()`.
pub(crate) fn zip_with_offset<'a, I: IntoIterator<Item = Index<T>> + 'a, T: TreeTypes + 'a>(
    value: I,
    offset: u64,
) -> impl Iterator<Item = (Index<T>, u64)> + 'a {
    // Running start offset of the next index to be yielded.
    let mut next_start = offset;
    value.into_iter().map(move |item| {
        let start = next_start;
        next_start += item.count();
        (item, start)
    })
}

/// Utility method to zip a number of indices with an offset that is increased by each index value
pub(crate) fn zip_with_offset_ref<
'a,
Expand Down
42 changes: 41 additions & 1 deletion banyan/src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::index::*;
use crate::{
forest::{FilteredChunk, Forest, ForestIter, Transaction, TreeTypes},
store::BlockWriter,
util::BoxedIter,
util::{BoxedDoubleEndedIter, BoxedIter},
};
use crate::{query::Query, store::ReadOnlyStore, util::IterExt};
use anyhow::Result;
Expand Down Expand Up @@ -111,6 +111,22 @@ impl<
_ => anyhow::bail!("Empty tree"),
}
}
/// Creates a double-ended iterator over the chunks of `tree`, filtered by `query`.
///
/// Delegates to `traverse_rev0` with a clone of the tree's root. `offset`, `query` and
/// `mk_extra` are forwarded unchanged.
///
/// # Errors
///
/// Fails with "Empty tree" when `tree` has no root.
pub fn traverse_rev<
    Q: Query<T> + Clone + Send + 'static,
    E: Send + 'static,
    F: Fn(IndexRef<T>) -> E + Send + Sync + 'static,
>(
    &self,
    offset: u64,
    query: Q,
    tree: &Tree<T>,
    mk_extra: &'static F,
) -> Result<BoxedDoubleEndedIter<'static, Result<FilteredChunk<T, V, E>>>> {
    if let Some(root) = &tree.root {
        return Ok(self.traverse_rev0(offset, query, root.clone(), mk_extra));
    }
    anyhow::bail!("Empty tree")
}

pub(crate) fn traverse0<
Q: Query<T> + Clone + Send + 'static,
Expand All @@ -136,6 +152,30 @@ impl<
})
}

pub(crate) fn traverse_rev0<
Q: Query<T> + Clone + Send + 'static,
E: Send + 'static,
F: Fn(IndexRef<T>) -> E + Send + Sync + 'static,
>(
&self,
offset: u64,
query: Q,
index: Arc<Index<T>>,
mk_extra: &'static F,
// ) -> BoxedIter<'static, Result<FilteredChunk<T, V, E>>> {
) -> Box<dyn DoubleEndedIterator<Item = Result<FilteredChunk<T, V, E>>> + Send + 'static> {
let index_stack: SmallVec<[_; 64]> = smallvec![index];
let pos_stack: SmallVec<[_; 32]> = smallvec![(usize::MAX, smallvec![true])];

Box::new(ForestIter {
forest: self.clone(),
offset,
query,
mk_extra,
index_stack,
pos_stack,
})
}
pub(crate) fn dump_graph0<S>(
&self,
parent_id: Option<usize>,
Expand Down
1 change: 1 addition & 0 deletions banyan/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ where
}

/// A boxed, `Send` iterator over items of type `T` that may borrow for `'a`.
pub(crate) type BoxedIter<'a, T> = Box<dyn Iterator<Item = T> + Send + 'a>;
/// A boxed, `Send` double-ended iterator over items of type `T` that may borrow for `'a`.
pub(crate) type BoxedDoubleEndedIter<'a, T> = Box<dyn DoubleEndedIterator<Item = T> + Send + 'a>;

/// Like the one from itertools, but more convenient
pub(crate) enum EitherIter<L, R> {
Expand Down

0 comments on commit 5f7ce1e

Please sign in to comment.