Skip to content

Commit

Permalink
dry forward and backward iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
wngr committed Mar 16, 2021
1 parent da5b67b commit c91803c
Showing 1 changed file with 98 additions and 137 deletions.
235 changes: 98 additions & 137 deletions banyan/src/forest/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ use libipld::cbor::DagCbor;
use smallvec::{smallvec, SmallVec};

use std::{iter, sync::Arc, time::Instant};
enum Mode {
Forward,
Backward,
}
pub(crate) struct ForestIter<T: TreeTypes, V, R, Q: Query<T>, F> {
forest: Forest<T, V, R>,
offset: u64,
query: Q,
mk_extra: F,
index_stack: SmallVec<[Arc<Index<T>>; 64]>,
pos_stack: SmallVec<[(usize, SmallVec<[bool; 64]>); 32]>,
mode: Mode,
}

impl<T: TreeTypes, V, R, Q, E, F> ForestIter<T, V, R, Q, F>
Expand Down Expand Up @@ -51,6 +56,7 @@ where
mk_extra,
index_stack,
pos_stack,
mode: Mode::Forward,
}
}
pub(crate) fn new_rev(
Expand All @@ -70,6 +76,7 @@ where
mk_extra,
index_stack,
pos_stack,
mode: Mode::Backward,
}
}
}
Expand All @@ -86,123 +93,8 @@ where
F: Fn(IndexRef<T>) -> E + Send + Sync + 'static,
{
fn next_back(&mut self) -> Option<Self::Item> {
let res: FilteredChunk<T, V, E> = 'outer: loop {
let head = match self.index_stack.last() {
Some(i) => i,
// Nothing to do ..
_ => return None,
};

let (pos, matching) = self.pos_stack.last_mut().expect("not empty");

// Branch is exhausted: Ascend.
if *pos == 0 {
// Ascend to parent's node
self.index_stack.pop().expect("not empty");
self.pos_stack.pop();

// increase last stack ptr, if there is still something left to
// traverse
if !self.index_stack.is_empty() {
let last = self.pos_stack.last_mut().expect("not empty");
last.0 -= 1;
}
continue;
}

match self.forest.load_node(head) {
Ok(NodeInfo::Branch(index, branch)) => {
// we hit this branch node for the first time. Apply the
// query on its children and store it
if *pos == usize::MAX {
let mut q_matching = smallvec![true; index.summaries.len()];
self.query
.intersecting(self.offset - index.count, index, &mut q_matching);
debug_assert_eq!(branch.children.len(), q_matching.len());
let _ = std::mem::replace(matching, q_matching);
*pos = branch.children.len();
}

if matching[*pos - 1] {
// Descend into next child
self.index_stack
// TODO: clone :-( ?
.push(Arc::new(branch.children[*pos - 1].clone()));
let new_vec: SmallVec<[_; 64]> = smallvec![matching[*pos - 1]];
self.pos_stack.push((usize::MAX, new_vec));
continue 'outer;
} else {
let index = &branch.children[*pos - 1];

self.offset -= index.count();
let placeholder: FilteredChunk<T, V, E> = FilteredChunk {
range: self.offset + index.count()..self.offset,
data: Vec::new(),
extra: (self.mk_extra)(index.as_index_ref()),
};

*pos -= 1;
break placeholder;
}
}

Ok(NodeInfo::Leaf(index, leaf)) => {
let chunk = {
self.offset -= index.keys.count();
let mut matching: SmallVec<[_; 32]> = smallvec![true; index.keys.len()];
self.query.containing(self.offset, index, &mut matching);
let keys = index.select_keys(&matching);
let elems: Vec<V> = match leaf.as_ref().select(&matching) {
Ok(i) => i,
Err(e) => return Some(Err(e)),
};
let mut pairs = keys
.zip(elems)
.map(|((o, k), v)| (o + self.offset, k, v))
.collect::<Vec<_>>();
pairs.reverse();

FilteredChunk {
range: self.offset + index.keys.count()..self.offset,
data: pairs,
extra: (self.mk_extra)(IndexRef::Leaf(index)),
}
};

// Ascend to parent's node
self.index_stack.pop().expect("not empty");
self.pos_stack.pop();
let last = self.pos_stack.last_mut().expect("not empty");
last.0 -= 1;

break chunk;
}

// even for purged leafs and branches or ignored chunks,
// produce a placeholder.
//
// the caller can find out if we skipped purged parts of the
// tree by using an appropriate mk_extra fn, or check
// `data.len()`.
Ok(_) => {
// Ascend to parent's node
let index = self.index_stack.pop().expect("not empty");
self.pos_stack.pop();
let last = self.pos_stack.last_mut().expect("Index stack not empty");
last.0 -= 1;
self.offset -= index.count();

let placeholder: FilteredChunk<T, V, E> = FilteredChunk {
range: self.offset + index.count()..self.offset,
data: Vec::new(),
extra: (self.mk_extra)(index.as_index_ref()),
};
break placeholder;
}
Err(e) => return Some(Err(e)),
};
};
Some(Ok(res))
debug_assert!(matches!(self.mode, Mode::Backward));
self.next()
}
}

Expand All @@ -228,7 +120,12 @@ where
let (pos, matching) = self.pos_stack.last_mut().expect("not empty");

// Branch is exhausted: Ascend.
if *pos == matching.len() {
let exhausted = match self.mode {
Mode::Forward => *pos == matching.len(),
Mode::Backward => *pos == 0,
};

if exhausted {
// Ascend to parent's node
self.index_stack.pop().expect("not empty");
self.pos_stack.pop();
Expand All @@ -237,71 +134,124 @@ where
// traverse
if !self.index_stack.is_empty() {
let last = self.pos_stack.last_mut().expect("not empty");
last.0 += 1;
match self.mode {
Mode::Forward => {
last.0 += 1;
}
Mode::Backward => {
last.0 -= 1;
}
}
}
continue;
continue 'outer;
}

match self.forest.load_node(head) {
Ok(NodeInfo::Branch(index, branch)) => {
// we hit this branch node for the first time. Apply the
// query on its children and store it
if *pos == 0 {
let new_branch = match self.mode {
Mode::Forward => *pos == 0,
Mode::Backward => *pos == usize::MAX,
};
if new_branch {
let mut q_matching = smallvec![true; index.summaries.len()];
self.query.intersecting(self.offset, index, &mut q_matching);
let start_offset = match self.mode {
Mode::Forward => self.offset,
Mode::Backward => self.offset - index.count,
};
self.query
.intersecting(start_offset, index, &mut q_matching);
debug_assert_eq!(branch.children.len(), q_matching.len());
let _ = std::mem::replace(matching, q_matching);

if matches!(self.mode, Mode::Backward) {
*pos = branch.children.len();
}
}

if matching[*pos] {
let next_idx = match self.mode {
Mode::Forward => *pos,
Mode::Backward => *pos - 1,
};
if matching[next_idx] {
// Descend into next child
self.index_stack
// TODO: clone :-( ?
.push(Arc::new(branch.children[*pos].clone()));
let new_vec: SmallVec<[_; 64]> = smallvec![matching[*pos]];
self.pos_stack.push((0, new_vec));
.push(Arc::new(branch.children[next_idx].clone()));
let new_vec: SmallVec<[_; 64]> = smallvec![matching[next_idx]];
let start_pos = match self.mode {
Mode::Forward => 0,
Mode::Backward => usize::MAX,
};
self.pos_stack.push((start_pos, new_vec));
continue 'outer;
} else {
let index = &branch.children[*pos];

let index = &branch.children[next_idx];

let range = match self.mode {
Mode::Forward => {
self.offset += index.count();
*pos += 1;
self.offset - index.count()..self.offset
}
Mode::Backward => {
self.offset -= index.count();
*pos -= 1;
self.offset + index.count()..self.offset
}
};
let placeholder: FilteredChunk<T, V, E> = FilteredChunk {
range: self.offset..self.offset + index.count(),
range,
data: Vec::new(),
extra: (self.mk_extra)(index.as_index_ref()),
};

self.offset += index.count();
*pos += 1;
break placeholder;
}
}

Ok(NodeInfo::Leaf(index, leaf)) => {
let chunk = {
if matches!(self.mode, Mode::Backward) {
self.offset -= index.keys.count();
}
let mut matching: SmallVec<[_; 32]> = smallvec![true; index.keys.len()];
self.query.containing(self.offset, index, &mut matching);
let keys = index.select_keys(&matching);
let elems: Vec<V> = match leaf.as_ref().select(&matching) {
Ok(i) => i,
Err(e) => return Some(Err(e)),
};
let pairs = keys
let mut pairs = keys
.zip(elems)
.map(|((o, k), v)| (o + self.offset, k, v))
.collect::<Vec<_>>();
if matches!(self.mode, Mode::Backward) {
pairs.reverse();
}
FilteredChunk {
range: self.offset..self.offset + index.keys.count(),
data: pairs,
extra: (self.mk_extra)(IndexRef::Leaf(index)),
}
};
self.offset += index.keys.count();
if matches!(self.mode, Mode::Forward) {
self.offset += index.keys.count();
}

// Ascend to parent's node
self.index_stack.pop().expect("not empty");
self.pos_stack.pop();
let last = self.pos_stack.last_mut().expect("not empty");
last.0 += 1;
match self.mode {
Mode::Forward => {
last.0 += 1;
}
Mode::Backward => {
last.0 -= 1;
}
}

break chunk;
}
Expand All @@ -317,11 +267,22 @@ where
let index = self.index_stack.pop().expect("not empty");
self.pos_stack.pop();
let last = self.pos_stack.last_mut().expect("Index stack not empty");
last.0 += 1;
self.offset += index.count();

let range = match self.mode {
Mode::Forward => {
self.offset += index.count();
last.0 += 1;
self.offset - index.count()..self.offset
}
Mode::Backward => {
self.offset -= index.count();
last.0 -= 1;
self.offset + index.count()..self.offset
}
};

let placeholder: FilteredChunk<T, V, E> = FilteredChunk {
range: self.offset..self.offset + index.count(),
range,
data: Vec::new(),
extra: (self.mk_extra)(index.as_index_ref()),
};
Expand Down

0 comments on commit c91803c

Please sign in to comment.