Skip to content

Commit

Permalink
Add size_hint to windows and collect_to operator
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed May 24, 2024
1 parent 3497c5e commit bde7c42
Show file tree
Hide file tree
Showing 8 changed files with 546 additions and 32 deletions.
485 changes: 483 additions & 2 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ indexmap = "2.2.6"
tracing = { version = "0.1.40", features = ["log"] }
quick_cache = "0.5.1"
dashmap = "5.5.3"
arrow = "51.0.0"

[dev-dependencies]
# for the tests
Expand Down
55 changes: 30 additions & 25 deletions src/operator/window/aggr/collect.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,63 @@
//! Window aggregator that collects the elements of each window into a collection.

use super::super::*;
use crate::operator::{Data, DataKey, Operator};
use crate::stream::{KeyedStream, WindowedStream};

#[derive(Clone)]
struct CollectVec<I, O, F>
struct CollectTo<I, O>
where
F: Fn(Vec<I>) -> O,
O: Extend<I> + Default,
{
vec: Vec<I>,
f: F,
_o: PhantomData<O>,
collection: O,
_i: PhantomData<I>,
}

// Manual `Clone` for `CollectTo`: the copy does not carry over the elements
// gathered so far, it always starts from an empty `O::default()` collection.
//
// NOTE(review): cloning an accumulator that has already processed elements
// drops them; presumably only the pristine initial accumulator is ever cloned
// by the window managers -- confirm against the callers.
impl<I, O> Clone for CollectTo<I, O>
where
    O: Extend<I> + Default,
{
    fn clone(&self) -> Self {
        // Not `self.collection.clone()`: `O: Clone` is not among this impl's bounds.
        let collection = O::default();
        Self {
            collection,
            _i: PhantomData,
        }
    }
}

impl<I, O, F> WindowAccumulator for CollectVec<I, O, F>
impl<I, O> WindowAccumulator for CollectTo<I, O>
where
F: Fn(Vec<I>) -> O + Send + Clone + 'static,
O:,
I: Clone + Send + 'static,
O: Clone + Send + 'static,
O: Extend<I> + Default + Clone+ Send + 'static,
{
type In = I;

type Out = O;

#[inline]
fn process(&mut self, el: Self::In) {
self.vec.push(el);
self.collection.extend(std::iter::once(el));
}

#[inline]
fn output(self) -> Self::Out {
(self.f)(self.vec)
self.collection
}
}

impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<Key, Out, OperatorChain, Out, WindowDescr>
impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
where
WindowDescr: WindowDescription,
OperatorChain: Operator<KeyValue<Key, Out>> + 'static,
WindowDescr: WindowDescription<Out>,
OperatorChain: Operator<Out = (Key, Out)> + 'static,
Key: DataKey,
Out: Data + Ord,
{
/// Prefer other aggregators when possible: this one keeps every element of the window in memory.
pub fn map<NewOut: Data, F: Fn(Vec<Out>) -> NewOut + Send + Clone + 'static>(
pub fn collect_to<C: Extend<Out> + Default + Clone + Send + 'static>(
self,
f: F,
) -> KeyedStream<Key, NewOut, impl Operator<KeyValue<Key, NewOut>>> {
let acc = CollectVec::<Out, NewOut, _> {
vec: Default::default(),
f,
_o: PhantomData,
) -> KeyedStream<impl Operator<Out = (Key, C)>> {
let acc = CollectTo::<Out, C> {
collection: Default::default(),
_i: PhantomData,
};
self.add_window_operator("WindowMap", acc)
self.add_window_operator("WindowCollect", acc)
}
}
8 changes: 7 additions & 1 deletion src/operator/window/aggr/collect_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ where
type In = I;

type Out = O;


#[inline]
fn process(&mut self, el: Self::In) {
Expand All @@ -31,6 +32,11 @@ where
fn output(self) -> Self::Out {
(self.f)(self.vec)
}

/// Asks the element buffer (`self.vec`) to reserve room for `size` additional elements.
//
// NOTE(review): a cloned `Vec` is not guaranteed to keep spare capacity, so if
// the accumulator is cloned after this call (the count window's `build` hints
// the initial accumulator before storing it) the reservation may not carry
// over to the clones -- confirm.
#[inline]
fn size_hint(&mut self, size: usize) {
    let buffer = &mut self.vec;
    buffer.reserve(size);
}
}

impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
Expand Down Expand Up @@ -59,6 +65,6 @@ where
f: |v| v,
_o: PhantomData,
};
self.add_window_operator("WindowMap", acc)
self.add_window_operator("WindowCollectVec", acc)
}
}
1 change: 1 addition & 0 deletions src/operator/window/aggr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod fold;
pub(super) use fold::{Fold, FoldFirst};

mod collect_vec;
mod collect;
mod count;
mod join;
mod max;
Expand Down
4 changes: 3 additions & 1 deletion src/operator/window/descr/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ impl<T: Data> WindowDescription<T> for CountWindow {

#[inline]
fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A> {
let mut init = accumulator;
init.size_hint(self.size);
CountWindowManager {
init: accumulator,
init,
size: self.size,
slide: self.slide,
exact: self.exact,
Expand Down
18 changes: 17 additions & 1 deletion src/operator/window/descr/event_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::VecDeque;
use super::super::*;
use crate::operator::{Data, StreamElement, Timestamp};

#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct EventTimeWindowManager<A>
where
A: WindowAccumulator,
Expand All @@ -14,6 +14,22 @@ where
last_watermark: Option<Timestamp>,
ws: VecDeque<Slot<A>>,
}

// Hand-written `Clone` for the event-time window manager; the only bound placed
// on the accumulator type is `A: WindowAccumulator`. Every field is cloned
// individually into the new manager.
impl<A> Clone for EventTimeWindowManager<A>
where
    A: WindowAccumulator,
{
    fn clone(&self) -> Self {
        // Borrow all fields at once, then clone each one into the copy.
        let Self {
            init,
            size,
            slide,
            last_watermark,
            ws,
        } = self;

        Self {
            init: init.clone(),
            size: size.clone(),
            slide: slide.clone(),
            last_watermark: last_watermark.clone(),
            ws: ws.clone(),
        }
    }
}

impl<A: WindowAccumulator> EventTimeWindowManager<A> {
fn alloc_windows(&mut self, ts: Timestamp) {
assert!(self.last_watermark.map(|w| ts >= w).unwrap_or(true));
Expand Down
6 changes: 4 additions & 2 deletions src/operator/window/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ pub trait WindowDescription<T> {
///
/// Convention: output will always be called after at least one element has been processed
pub trait WindowAccumulator: Clone + Send + 'static {
type In: Data;
type Out: Data;
type In: Clone + Send + 'static;
type Out: Clone + Send + 'static;

/// Optional hint about how many elements the accumulator is expected to process.
///
/// Implementations may use it to pre-allocate storage. The default
/// implementation does nothing, so accumulators that cannot benefit from the
/// hint do not need to override it.
#[allow(unused_variables)] // `size` is intentionally ignored by the no-op default
fn size_hint(&mut self, size: usize) {}
/// Process a single input element updating the state of the accumulator
fn process(&mut self, el: Self::In);
/// Finalize the accumulator and produce a result
Expand Down

0 comments on commit bde7c42

Please sign in to comment.