feat: compact stream chunk with sink key when sink key mismatch #15345

Merged: 15 commits, Mar 22, 2024
src/common/src/array/compact_chunk.rs (238 changes: 207 additions & 31 deletions)
@@ -21,6 +21,9 @@ use itertools::Itertools;
use prehash::{new_prehashed_map_with_capacity, Passthru, Prehashed};

use super::stream_chunk::{OpRowMutRef, StreamChunkMut};
use super::stream_chunk_builder::StreamChunkBuilder;
use super::stream_record::Record;
use super::DataType;
use crate::array::{Op, RowRef, StreamChunk};
use crate::row::{Project, RowExt};
use crate::util::hash_util::Crc32FastBuilder;
@@ -82,20 +85,110 @@ impl<'a> OpRowMutRefTuple<'a>
type OpRowMap<'a, 'b> =
    HashMap<Prehashed<Project<'b, RowRef<'a>>>, OpRowMutRefTuple<'a>, BuildHasherDefault<Passthru>>;

#[derive(Clone, Debug)]
pub enum RowOp<'a> {
    Insert(RowRef<'a>),
    Delete(RowRef<'a>),
    /// (old_value, new_value)
    Update((RowRef<'a>, RowRef<'a>)),
}

pub struct RowOpMap<'a, 'b> {
    map: HashMap<Prehashed<Project<'b, RowRef<'a>>>, RowOp<'a>, BuildHasherDefault<Passthru>>,
}

impl<'a, 'b> RowOpMap<'a, 'b> {
    fn with_capacity(estimate_size: usize) -> Self {
        Self {
            map: new_prehashed_map_with_capacity(estimate_size),
        }
    }

    pub fn insert(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
        let entry = self.map.entry(k);
        match entry {
            Entry::Vacant(e) => {
                e.insert(RowOp::Insert(v));
            }
            Entry::Occupied(mut e) => match e.get() {
                RowOp::Delete(ref old_v) => {
                    e.insert(RowOp::Update((*old_v, v)));
                }
                RowOp::Insert(_) => {

                    [Review comment from a Contributor: Can we log more information in the pk to help debug?]

                    tracing::warn!("double insert for the same pk");
                    e.insert(RowOp::Insert(v));
                }
                RowOp::Update((ref old_v, _)) => {
                    tracing::warn!("double insert for the same pk");
                    e.insert(RowOp::Update((*old_v, v)));
                }
            },
        }
    }
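
    // Re the review ask above ("log more information in the pk"): a hypothetical
    // variant could attach the offending key as a structured tracing field,
    // assuming the key type implements `Debug`, e.g.:
    //
    //     tracing::warn!(pk = ?e.key(), "double insert for the same pk");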

    pub fn delete(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
        let entry = self.map.entry(k);
        match entry {
            Entry::Vacant(e) => {
                e.insert(RowOp::Delete(v));
            }
            Entry::Occupied(mut e) => match e.get() {
                RowOp::Insert(_) => {
                    e.remove();
                }
                RowOp::Update((ref prev, _)) => {
                    e.insert(RowOp::Delete(*prev));
                }
                RowOp::Delete(_) => {
                    tracing::warn!("double delete for the same pk");
                    e.insert(RowOp::Delete(v));
                }
            },
        }
    }

    pub fn into_chunks(self, chunk_size: usize, data_types: Vec<DataType>) -> Vec<StreamChunk> {
        let mut ret = vec![];
        let mut builder = StreamChunkBuilder::new(chunk_size, data_types);
        for (_, row_op) in self.map {
            match row_op {
                RowOp::Insert(row) => {
                    if let Some(c) = builder.append_record(Record::Insert { new_row: row }) {
                        ret.push(c)
                    }
                }
                RowOp::Delete(row) => {
                    if let Some(c) = builder.append_record(Record::Delete { old_row: row }) {
                        ret.push(c)
                    }
                }
                RowOp::Update((old, new)) => {
                    if old == new {
                        continue;
                    }
                    if let Some(c) = builder.append_record(Record::Update {
                        old_row: old,
                        new_row: new,
                    }) {
                        ret.push(c)
                    }
                }
            }
        }
        if let Some(c) = builder.take() {
            ret.push(c);
        }
        ret
    }
}
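
Aside (not part of this diff): RowOpMap is effectively a per-stream-key state machine that folds a sequence of inserts and deletes into one net Insert, Delete, or Update per key. Below is a minimal, self-contained sketch of the same folding rules over plain i64 values and u64 keys; fold_ops and its event encoding are illustrative, not the crate's API.

use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
enum RowOp {
    Insert(i64),
    Delete(i64),
    Update((i64, i64)), // (old_value, new_value)
}

/// Fold (key, is_insert, value) events into one net operation per key,
/// mirroring RowOpMap::insert / RowOpMap::delete above.
fn fold_ops(events: &[(u64, bool, i64)]) -> HashMap<u64, RowOp> {
    let mut map: HashMap<u64, RowOp> = HashMap::new();
    for &(key, is_insert, v) in events {
        let prev = map.remove(&key);
        let next = match (prev, is_insert) {
            (None, true) => Some(RowOp::Insert(v)),
            (None, false) => Some(RowOp::Delete(v)),
            (Some(RowOp::Delete(old)), true) => Some(RowOp::Update((old, v))),
            (Some(RowOp::Insert(_)), false) => None, // insert then delete cancels out
            (Some(RowOp::Update((old, _))), false) => Some(RowOp::Delete(old)),
            // the "double insert" / "double delete" cases warned about above:
            (Some(RowOp::Insert(_)), true) => Some(RowOp::Insert(v)),
            (Some(RowOp::Update((old, _))), true) => Some(RowOp::Update((old, v))),
            (Some(RowOp::Delete(_)), false) => Some(RowOp::Delete(v)),
        };
        if let Some(op) = next {
            map.insert(key, op);
        }
    }
    map
}

fn main() {
    // The (9, 9) stream key from the tests below sees: Delete 1, Insert 9,
    // Delete 4, Insert 1. That folds to Update((1, 1)), which into_chunks
    // then drops entirely because old == new.
    let folded = fold_ops(&[(9, false, 1), (9, true, 9), (9, false, 4), (9, true, 1)]);
    assert_eq!(folded[&9], RowOp::Update((1, 1)));
}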

impl StreamChunkCompactor {
    pub fn new(stream_key: Vec<usize>, chunks: Vec<StreamChunk>) -> Self {
        Self { chunks, stream_key }
    }

    pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
        (self.chunks, self.stream_key)
    }

    /// Compact a chunk by modifying the ops and the visibility of a stream chunk. All UPDATE INSERT
@@ -153,11 +246,47 @@ impl StreamChunkCompactor {
        }
        chunks.into_iter().map(|(_, c)| c.into())
    }

    /// Re-constructs the stream chunks to compact them by the stream key.
    pub fn reconstructed_compacted_chunks(
        self,
        chunk_size: usize,
        data_types: Vec<DataType>,
    ) -> Vec<StreamChunk> {
        let (chunks, key_indices) = self.into_inner();

        let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
        let chunks: Vec<(_, _, _)> = chunks
            .into_iter()
            .map(|c| {
                let (c, ops) = c.into_parts();
                let hash_values = c
                    .get_hash_values(&key_indices, Crc32FastBuilder)
                    .into_iter()
                    .map(|hash| hash.value())
                    .collect_vec();
                (hash_values, ops, c)
            })
            .collect_vec();

        let mut map = RowOpMap::with_capacity(estimate_size);
        for (hash_values, ops, c) in &chunks {
            for row in c.rows() {
                let hash = hash_values[row.index()];
                let op = ops[row.index()];
                let stream_key = row.project(&key_indices);
                let k = Prehashed::new(stream_key, hash);
                match op {
                    Op::Insert | Op::UpdateInsert => map.insert(k, row),
                    Op::Delete | Op::UpdateDelete => map.delete(k, row),
                }
            }
        }
        map.into_chunks(chunk_size, data_types)
    }
}
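
Aside (not part of this diff): reconstructed_compacted_chunks hashes each row's stream key exactly once (get_hash_values with Crc32FastBuilder) and wraps key and hash in Prehashed, so the map's Passthru build-hasher can reuse the cached value on every probe instead of rehashing the projected row. A rough standalone sketch of that pass-through-hash pattern using only std follows; PassthroughHasher and PrehashedKey are hypothetical stand-ins for the prehash crate's Passthru and Prehashed.

use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hash, Hasher};

/// A hasher that treats the written u64 as the final hash value, so keys
/// carrying a precomputed hash are never hashed a second time.
#[derive(Default)]
struct PassthroughHasher(u64);

impl Hasher for PassthroughHasher {
    fn finish(&self) -> u64 {
        self.0
    }
    fn write(&mut self, _bytes: &[u8]) {
        unreachable!("only precomputed u64 hashes are expected")
    }
    fn write_u64(&mut self, n: u64) {
        self.0 = n;
    }
}

/// A key bundled with its precomputed hash, analogous to Prehashed.
struct PrehashedKey<K> {
    key: K,
    hash: u64,
}

impl<K> Hash for PrehashedKey<K> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        state.write_u64(self.hash); // feed the cached hash straight through
    }
}

impl<K: PartialEq> PartialEq for PrehashedKey<K> {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}
impl<K: Eq> Eq for PrehashedKey<K> {}

fn main() {
    type Map<V> = HashMap<PrehashedKey<(i64, i64)>, V, BuildHasherDefault<PassthroughHasher>>;
    let mut map: Map<&str> = Map::default();
    // In the real code the hash comes from the CRC32 of the projected stream key.
    map.insert(PrehashedKey { key: (1, 1), hash: 42 }, "row");
    assert_eq!(map.get(&PrehashedKey { key: (1, 1), hash: 42 }), Some(&"row"));
}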

pub fn merge_chunk_row(stream_chunk: StreamChunk, pk_indices: &[usize]) -> StreamChunk {
    let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), vec![stream_chunk]);
    compactor.into_compacted_chunks().next().unwrap()
}

@@ -170,27 +299,29 @@ mod tests {
    #[test]
    fn test_merge_chunk_row() {
        let pk_indices = [0, 1];
        let chunks = vec![
            StreamChunk::from_pretty(
                " I I I
                - 1 1 1
                + 1 1 2
                + 2 5 7
                + 4 9 2
                - 2 5 7
                + 2 5 5
                - 6 6 9
                + 6 6 9
                - 9 9 1",
            ),
            StreamChunk::from_pretty(
                " I I I
                - 6 6 9
                + 9 9 9
                - 9 9 4
                + 2 2 2
                + 9 9 1",
            ),
        ];
        let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);
        let mut iter = compactor.into_compacted_chunks();
        assert_eq!(
            iter.next().unwrap().compact(),
@@ -213,4 +344,49 @@

        assert_eq!(iter.next(), None);
    }

    #[test]
    fn test_compact_chunk_row() {
        let pk_indices = [0, 1];
        let chunks = vec![
            StreamChunk::from_pretty(
                " I I I
                - 1 1 1
                + 1 1 2
                + 2 5 7
                + 4 9 2
                - 2 5 7
                + 2 5 5
                - 6 6 9
                + 6 6 9
                - 9 9 1",
            ),
            StreamChunk::from_pretty(
                " I I I
                - 6 6 9
                + 9 9 9
                - 9 9 4
                + 2 2 2
                + 9 9 1",
            ),
        ];
        let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);

        let chunks = compactor.reconstructed_compacted_chunks(
            100,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
        );
        assert_eq!(
            chunks.into_iter().next().unwrap(),
            StreamChunk::from_pretty(
                " I I I
                + 2 5 5
                - 6 6 9
                + 4 9 2
                U- 1 1 1
                U+ 1 1 2
                + 2 2 2",
            )
        );
    }
}