feat: compact stream chunk with sink key when sink key mismatch #15345

Merged: 15 commits, Mar 22, 2024
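At a high level, this PR makes `StreamChunkCompactor::new` take all input chunks eagerly (replacing `push_chunk`) and adds `reconstructed_compacted_chunks`, which rebuilds chunks so that each stream key contributes at most one record. Below is a minimal crate-internal usage sketch; whether `StreamChunkCompactor` is re-exported from `crate::array` is an assumption (only the other `crate::array` imports are confirmed by the diff), and the chunk size of 256 is arbitrary:

```rust
use crate::array::{DataType, StreamChunk, StreamChunkCompactor};

/// Hypothetical helper: compact a batch of chunks on the given stream key.
fn compact_by_stream_key(
    chunks: Vec<StreamChunk>,
    stream_key: Vec<usize>,
    data_types: Vec<DataType>,
) -> Vec<StreamChunk> {
    // New constructor shape: chunks are supplied up front instead of pushed one by one.
    let compactor = StreamChunkCompactor::new(stream_key, chunks);
    // Chunk size 256 is arbitrary here; `data_types` must match the chunk schema.
    compactor.reconstructed_compacted_chunks(256, data_types)
}
```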
1 change: 0 additions & 1 deletion in e2e_test/batch/aggregate/two_phase_agg.slt.part

@@ -3,7 +3,6 @@ SET RW_IMPLICIT_FLUSH TO true;

statement ok
SET RW_ENABLE_TWO_PHASE_AGG=false;
-
# This should override `RW_ENABLE_TWO_PHASE_AGG`, enabling it.
statement ok
SET RW_FORCE_TWO_PHASE_AGG=true;
232 changes: 201 additions & 31 deletions in src/common/src/array/compact_chunk.rs

@@ -21,6 +21,9 @@ use itertools::Itertools;
use prehash::{new_prehashed_map_with_capacity, Passthru, Prehashed};

use super::stream_chunk::{OpRowMutRef, StreamChunkMut};
+use super::stream_chunk_builder::StreamChunkBuilder;
+use super::stream_record::Record;
+use super::DataType;
use crate::array::{Op, RowRef, StreamChunk};
use crate::row::{Project, RowExt};
use crate::util::hash_util::Crc32FastBuilder;

@@ -82,20 +85,104 @@ impl<'a> OpRowMutRefTuple<'a> {
type OpRowMap<'a, 'b> =
HashMap<Prehashed<Project<'b, RowRef<'a>>>, OpRowMutRefTuple<'a>, BuildHasherDefault<Passthru>>;

-impl StreamChunkCompactor {
-    pub fn new(stream_key: Vec<usize>) -> Self {
-        Self {
-            stream_key,
-            chunks: vec![],
-        }
-    }
-
-    pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
-        (self.chunks, self.stream_key)
-    }
-
-    pub fn push_chunk(&mut self, c: StreamChunk) {
-        self.chunks.push(c);
-    }
+#[derive(Clone, Debug)]
+pub enum RowOp<'a> {
+    Insert(RowRef<'a>),
+    Delete(RowRef<'a>),
+    /// (old_value, new_value)
+    Update((RowRef<'a>, RowRef<'a>)),
+}
+
+pub struct RowOpMap<'a, 'b> {
+    map: HashMap<Prehashed<Project<'b, RowRef<'a>>>, RowOp<'a>, BuildHasherDefault<Passthru>>,
+}
+
+impl<'a, 'b> RowOpMap<'a, 'b> {
+    fn with_capacity(estimate_size: usize) -> Self {
+        Self {
+            map: new_prehashed_map_with_capacity(estimate_size),
+        }
+    }
+
+    pub fn insert(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
+        let entry = self.map.entry(k);
+        match entry {
+            Entry::Vacant(e) => {
+                e.insert(RowOp::Insert(v));
+            }
+            Entry::Occupied(mut e) => match e.get() {
+                RowOp::Delete(ref old_v) => {
+                    e.insert(RowOp::Update((*old_v, v)));
+                }
+                RowOp::Insert(_) | RowOp::Update(_) => {
+                    tracing::warn!("double insert");
+                }
+            },
+        }
+    }
+
+    pub fn delete(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
+        let entry = self.map.entry(k);
+        match entry {
+            Entry::Vacant(e) => {
+                e.insert(RowOp::Delete(v));
+            }
+            Entry::Occupied(mut e) => match e.get() {
+                RowOp::Insert(_) => {
+                    e.remove();
+                }
+                RowOp::Update((ref prev, _)) => {
+                    e.insert(RowOp::Delete(*prev));
+                }
+                RowOp::Delete(_) => {
+                    tracing::warn!("double delete");
+                }
+            },
+        }
+    }
+
+    pub fn into_chunks(self, chunk_size: usize, data_types: Vec<DataType>) -> Vec<StreamChunk> {
+        let mut ret = vec![];
+        let mut builder = StreamChunkBuilder::new(chunk_size, data_types);
+        for (_, row_op) in self.map {
+            match row_op {
+                RowOp::Insert(row) => {
+                    if let Some(c) = builder.append_record(Record::Insert { new_row: row }) {
+                        ret.push(c)
+                    }
+                }
+                RowOp::Delete(row) => {
+                    if let Some(c) = builder.append_record(Record::Delete { old_row: row }) {
+                        ret.push(c)
+                    }
+                }
+                RowOp::Update((old, new)) => {
+                    if old == new {
+                        continue;
+                    }
+                    if let Some(c) = builder.append_record(Record::Update {
+                        old_row: old,
+                        new_row: new,
+                    }) {
+                        ret.push(c)
+                    }
+                }
+            }
+        }
+        if let Some(c) = builder.take() {
+            ret.push(c);
+        }
+        ret
+    }
+}
+
+impl StreamChunkCompactor {
+    pub fn new(stream_key: Vec<usize>, chunks: Vec<StreamChunk>) -> Self {
+        Self { chunks, stream_key }
+    }
+
+    pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
+        (self.chunks, self.stream_key)
+    }

/// Compact a chunk by modifying the ops and the visibility of a stream chunk. All UPDATE INSERT
@@ -153,11 +240,47 @@ impl StreamChunkCompactor {
}
chunks.into_iter().map(|(_, c)| c.into())
}

+    /// Re-construct the stream chunks to compact them with the stream key.
+    pub fn reconstructed_compacted_chunks(
+        self,
+        chunk_size: usize,
+        data_types: Vec<DataType>,
+    ) -> Vec<StreamChunk> {
+        let (chunks, key_indices) = self.into_inner();
+
+        let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
+        let chunks: Vec<(_, _, _)> = chunks
+            .into_iter()
+            .map(|c| {
+                let (c, ops) = c.into_parts();
+                let hash_values = c
+                    .get_hash_values(&key_indices, Crc32FastBuilder)
+                    .into_iter()
+                    .map(|hash| hash.value())
+                    .collect_vec();
+                (hash_values, ops, c)
+            })
+            .collect_vec();
+
+        let mut map = RowOpMap::with_capacity(estimate_size);
+        for (hash_values, ops, c) in &chunks {
+            for row in c.rows() {
+                let hash = hash_values[row.index()];
+                let op = ops[row.index()];
+                let stream_key = row.project(&key_indices);
+                let k = Prehashed::new(stream_key, hash);
+                match op {
+                    Op::Insert | Op::UpdateInsert => map.insert(k, row),
+                    Op::Delete | Op::UpdateDelete => map.delete(k, row),
+                }
+            }
+        }
+        map.into_chunks(chunk_size, data_types)
+    }
}

pub fn merge_chunk_row(stream_chunk: StreamChunk, pk_indices: &[usize]) -> StreamChunk {
-    let mut compactor = StreamChunkCompactor::new(pk_indices.to_vec());
-    compactor.push_chunk(stream_chunk);
+    let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), vec![stream_chunk]);
    compactor.into_compacted_chunks().next().unwrap()
}
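The collapsing rules in `RowOpMap` are the core of the new path: for a given stream key, Delete then Insert becomes an Update, Insert then Delete cancels out, and a Delete over a pending Update falls back to deleting the original row. Below is a self-contained sketch of the same state machine using plain `std` types, with `i64` standing in for row references and `u64` for the precomputed key hash (illustrative only; the PR operates on `RowRef` values and `Prehashed` keys):

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Collapsed per-key operation, mirroring `RowOp` in this PR.
#[derive(Clone, Copy, Debug, PartialEq)]
enum RowOp {
    Insert(i64),
    Delete(i64),
    /// (old_value, new_value)
    Update(i64, i64),
}

/// Same collapsing rule as `RowOpMap::insert`: a pending Delete plus a new
/// Insert on the same key becomes an Update; a double Insert is ignored.
fn insert(map: &mut HashMap<u64, RowOp>, k: u64, v: i64) {
    match map.entry(k) {
        Entry::Vacant(e) => {
            e.insert(RowOp::Insert(v));
        }
        Entry::Occupied(mut e) => match *e.get() {
            RowOp::Delete(old) => {
                e.insert(RowOp::Update(old, v));
            }
            _ => {} // the PR logs "double insert" here and keeps the entry
        },
    }
}

/// Same collapsing rule as `RowOpMap::delete`: Insert + Delete cancels out;
/// Update + Delete falls back to deleting the original row.
fn delete(map: &mut HashMap<u64, RowOp>, k: u64, _v: i64) {
    match map.entry(k) {
        Entry::Vacant(e) => {
            e.insert(RowOp::Delete(_v));
        }
        Entry::Occupied(mut e) => match *e.get() {
            RowOp::Insert(_) => {
                e.remove();
            }
            RowOp::Update(old, _) => {
                e.insert(RowOp::Delete(old));
            }
            RowOp::Delete(_) => {} // the PR logs "double delete" here
        },
    }
}

fn main() {
    let mut map = HashMap::new();
    delete(&mut map, 1, 10); // Delete(10)
    insert(&mut map, 1, 20); // collapses into Update(10, 20)
    assert_eq!(map[&1], RowOp::Update(10, 20));

    insert(&mut map, 2, 30); // Insert(30)
    delete(&mut map, 2, 30); // cancels out; key 2 is removed entirely
    assert!(!map.contains_key(&2));
}
```

Note the double-insert/double-delete branches: the PR keeps the existing entry and logs a warning, which matters when an upstream emits an inconsistent changelog.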

@@ -170,27 +293,29 @@ mod tests {
#[test]
fn test_merge_chunk_row() {
let pk_indices = [0, 1];
-        let mut compactor = StreamChunkCompactor::new(pk_indices.to_vec());
-        compactor.push_chunk(StreamChunk::from_pretty(
-            " I I I
-            - 1 1 1
-            + 1 1 2
-            + 2 5 7
-            + 4 9 2
-            - 2 5 7
-            + 2 5 5
-            - 6 6 9
-            + 6 6 9
-            - 9 9 1",
-        ));
-        compactor.push_chunk(StreamChunk::from_pretty(
-            " I I I
-            - 6 6 9
-            + 9 9 9
-            - 9 9 4
-            + 2 2 2
-            + 9 9 1",
-        ));
+        let chunks = vec![
+            StreamChunk::from_pretty(
+                " I I I
+                - 1 1 1
+                + 1 1 2
+                + 2 5 7
+                + 4 9 2
+                - 2 5 7
+                + 2 5 5
+                - 6 6 9
+                + 6 6 9
+                - 9 9 1",
+            ),
+            StreamChunk::from_pretty(
+                " I I I
+                - 6 6 9
+                + 9 9 9
+                - 9 9 4
+                + 2 2 2
+                + 9 9 1",
+            ),
+        ];
+        let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);
let mut iter = compactor.into_compacted_chunks();
assert_eq!(
iter.next().unwrap().compact(),
@@ -213,4 +338,49 @@

assert_eq!(iter.next(), None);
}

+    #[test]
+    fn test_compact_chunk_row() {
+        let pk_indices = [0, 1];
+        let chunks = vec![
+            StreamChunk::from_pretty(
+                " I I I
+                - 1 1 1
+                + 1 1 2
+                + 2 5 7
+                + 4 9 2
+                - 2 5 7
+                + 2 5 5
+                - 6 6 9
+                + 6 6 9
+                - 9 9 1",
+            ),
+            StreamChunk::from_pretty(
+                " I I I
+                - 6 6 9
+                + 9 9 9
+                - 9 9 4
+                + 2 2 2
+                + 9 9 1",
+            ),
+        ];
+        let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);
+
+        let chunks = compactor.reconstructed_compacted_chunks(
+            100,
+            vec![DataType::Int64, DataType::Int64, DataType::Int64],
+        );
+        assert_eq!(
+            chunks.into_iter().next().unwrap(),
+            StreamChunk::from_pretty(
+                " I I I
+                + 2 5 5
+                - 6 6 9
+                + 4 9 2
+                U- 1 1 1
+                U+ 1 1 2
+                + 2 2 2",
+            )
+        );
+    }
}