Commit

feat: compact stream chunk with sink key when sink key mismatch (#15345) (#15860)

Co-authored-by: stonepage <[email protected]>
Co-authored-by: Yuhao Su <[email protected]>
3 people authored Mar 22, 2024
1 parent 910b047 commit 1095a4f
Showing 3 changed files with 325 additions and 120 deletions.
260 changes: 218 additions & 42 deletions src/common/src/array/compact_chunk.rs
@@ -21,19 +21,17 @@ use itertools::Itertools;
 use prehash::{new_prehashed_map_with_capacity, Passthru, Prehashed};
 
 use super::stream_chunk::{OpRowMutRef, StreamChunkMut};
+use super::stream_chunk_builder::StreamChunkBuilder;
+use super::stream_record::Record;
+use super::DataType;
 use crate::array::{Op, RowRef, StreamChunk};
 use crate::row::{Project, RowExt};
 use crate::util::hash_util::Crc32FastBuilder;
 
-/// Compact the stream chunks with just modify the `Ops` and visibility of the chunk. Currently, two
-/// transformation will be applied
-/// - remove intermediate operation of the same key. The operations of the same stream key will only
-///   have three kind of patterns Insert, Delete or Update.
-/// - For the update (-old row, +old row), when old row is exactly same. The two rowOp will be
-///   removed.
+/// A helper to compact the stream chunks with just modify the `Ops` and visibility of the chunk.
 pub struct StreamChunkCompactor {
     chunks: Vec<StreamChunk>,
-    stream_key: Vec<usize>,
+    key: Vec<usize>,
 }
 
 struct OpRowMutRefTuple<'a> {
@@ -82,24 +80,119 @@ impl<'a> OpRowMutRefTuple<'a> {
 type OpRowMap<'a, 'b> =
     HashMap<Prehashed<Project<'b, RowRef<'a>>>, OpRowMutRefTuple<'a>, BuildHasherDefault<Passthru>>;
 
-impl StreamChunkCompactor {
-    pub fn new(stream_key: Vec<usize>) -> Self {
+#[derive(Clone, Debug)]
+pub enum RowOp<'a> {
+    Insert(RowRef<'a>),
+    Delete(RowRef<'a>),
+    /// (old_value, new_value)
+    Update((RowRef<'a>, RowRef<'a>)),
+}
+
+pub struct RowOpMap<'a, 'b> {
+    map: HashMap<Prehashed<Project<'b, RowRef<'a>>>, RowOp<'a>, BuildHasherDefault<Passthru>>,
+}
+
+impl<'a, 'b> RowOpMap<'a, 'b> {
+    fn with_capacity(estimate_size: usize) -> Self {
         Self {
-            stream_key,
-            chunks: vec![],
+            map: new_prehashed_map_with_capacity(estimate_size),
         }
     }
 
-    pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
-        (self.chunks, self.stream_key)
+    pub fn insert(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
+        let entry = self.map.entry(k);
+        match entry {
+            Entry::Vacant(e) => {
+                e.insert(RowOp::Insert(v));
+            }
+            Entry::Occupied(mut e) => match e.get() {
+                RowOp::Delete(ref old_v) => {
+                    e.insert(RowOp::Update((*old_v, v)));
+                }
+                RowOp::Insert(_) => {
+                    tracing::warn!("double insert for the same pk");
+                    e.insert(RowOp::Insert(v));
+                }
+                RowOp::Update((ref old_v, _)) => {
+                    tracing::warn!("double insert for the same pk");
+                    e.insert(RowOp::Update((*old_v, v)));
+                }
+            },
+        }
+    }
+
+    pub fn delete(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
+        let entry = self.map.entry(k);
+        match entry {
+            Entry::Vacant(e) => {
+                e.insert(RowOp::Delete(v));
+            }
+            Entry::Occupied(mut e) => match e.get() {
+                RowOp::Insert(_) => {
+                    e.remove();
+                }
+                RowOp::Update((ref prev, _)) => {
+                    e.insert(RowOp::Delete(*prev));
+                }
+                RowOp::Delete(_) => {
+                    tracing::warn!("double delete for the same pk");
+                    e.insert(RowOp::Delete(v));
+                }
+            },
+        }
+    }
+
+    pub fn into_chunks(self, chunk_size: usize, data_types: Vec<DataType>) -> Vec<StreamChunk> {
+        let mut ret = vec![];
+        let mut builder = StreamChunkBuilder::new(chunk_size, data_types);
+        for (_, row_op) in self.map {
+            match row_op {
+                RowOp::Insert(row) => {
+                    if let Some(c) = builder.append_record(Record::Insert { new_row: row }) {
+                        ret.push(c)
+                    }
+                }
+                RowOp::Delete(row) => {
+                    if let Some(c) = builder.append_record(Record::Delete { old_row: row }) {
+                        ret.push(c)
+                    }
+                }
+                RowOp::Update((old, new)) => {
+                    if old == new {
+                        continue;
+                    }
+                    if let Some(c) = builder.append_record(Record::Update {
+                        old_row: old,
+                        new_row: new,
+                    }) {
+                        ret.push(c)
+                    }
+                }
+            }
+        }
+        if let Some(c) = builder.take() {
+            ret.push(c);
+        }
+        ret
+    }
+}
+
+impl StreamChunkCompactor {
+    pub fn new(key: Vec<usize>, chunks: Vec<StreamChunk>) -> Self {
+        Self { chunks, key }
     }
 
-    pub fn push_chunk(&mut self, c: StreamChunk) {
-        self.chunks.push(c);
+    pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
+        (self.chunks, self.key)
     }
 
-    /// Compact a chunk by modifying the ops and the visibility of a stream chunk. All UPDATE INSERT
-    /// and UPDATE DELETE will be converted to INSERT and DELETE, and dropped according to
+    /// Compact a chunk by modifying the ops and the visibility of a stream chunk.
+    /// Currently, two transformations will be applied:
+    /// - remove intermediate operations on the same key. The operations of the same stream key will
+    ///   only have three kinds of patterns: Insert, Delete or Update.
+    /// - For the update (-old row, +old row), when the two rows are exactly the same, both row ops
+    ///   will be removed.
+    /// All UPDATE INSERT and UPDATE DELETE will be converted to INSERT and DELETE, and dropped according to
     /// certain rules (see `merge_insert` and `merge_delete` for more details).
     pub fn into_compacted_chunks(self) -> impl Iterator<Item = StreamChunk> {
         let (chunks, key_indices) = self.into_inner();
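The folding rules above are spread across two match statements, so here is a compact, self-contained model of them. This is an illustration only, not the crate's API: plain i64 values stand in for RowRefs, and the function names are invented for the sketch.

// Toy model of the per-key folding performed by `RowOpMap` (illustration only).
#[derive(Clone, Debug, PartialEq)]
enum RowOp {
    Insert(i64),
    Delete(i64),
    /// (old_value, new_value)
    Update(i64, i64),
}

// Mirrors `RowOpMap::insert`.
fn apply_insert(prev: Option<RowOp>, new: i64) -> Option<RowOp> {
    use RowOp::*;
    match prev {
        None => Some(Insert(new)),
        // Delete then Insert on the same key folds into an Update.
        Some(Delete(old)) => Some(Update(old, new)),
        // Double insert: keep the newest row (the real code logs a warning).
        Some(Insert(_)) => Some(Insert(new)),
        // Insert on top of an Update keeps the original old value.
        Some(Update(old, _)) => Some(Update(old, new)),
    }
}

// Mirrors `RowOpMap::delete`.
fn apply_delete(prev: Option<RowOp>, row: i64) -> Option<RowOp> {
    use RowOp::*;
    match prev {
        None => Some(Delete(row)),
        // Insert then Delete cancels out: the key disappears from the map.
        Some(Insert(_)) => None,
        // Delete on top of an Update reverts to deleting the original row.
        Some(Update(old, _)) => Some(Delete(old)),
        // Double delete: keep the newest row (the real code logs a warning).
        Some(Delete(_)) => Some(Delete(row)),
    }
}

fn main() {
    // "- 1 1 1" then "+ 1 1 2" on one key => Update(old, new).
    assert_eq!(apply_insert(apply_delete(None, 1), 2), Some(RowOp::Update(1, 2)));
    // "+ 2 5 7" then "- 2 5 7" on one key => the pair cancels out.
    assert_eq!(apply_delete(apply_insert(None, 7), 7), None);
}

Note that when chunks are rebuilt, `into_chunks` additionally drops any `Update` whose old and new rows compare equal.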
@@ -123,8 +216,8 @@ impl StreamChunkCompactor {
         for (row, mut op_row) in c.to_rows_mut() {
             op_row.set_op(op_row.op().normalize_update());
             let hash = hash_values[row.index()];
-            let stream_key = row.project(&key_indices);
-            match op_row_map.entry(Prehashed::new(stream_key, hash)) {
+            let key = row.project(&key_indices);
+            match op_row_map.entry(Prehashed::new(key, hash)) {
                 Entry::Vacant(v) => {
                     v.insert(OpRowMutRefTuple {
                         previous: None,
@@ -153,11 +246,47 @@ impl StreamChunkCompactor {
         }
         chunks.into_iter().map(|(_, c)| c.into())
     }
+
+    /// re-construct the stream chunks to compact them with the key.
+    pub fn reconstructed_compacted_chunks(
+        self,
+        chunk_size: usize,
+        data_types: Vec<DataType>,
+    ) -> Vec<StreamChunk> {
+        let (chunks, key_indices) = self.into_inner();
+
+        let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
+        let chunks: Vec<(_, _, _)> = chunks
+            .into_iter()
+            .map(|c| {
+                let (c, ops) = c.into_parts();
+                let hash_values = c
+                    .get_hash_values(&key_indices, Crc32FastBuilder)
+                    .into_iter()
+                    .map(|hash| hash.value())
+                    .collect_vec();
+                (hash_values, ops, c)
+            })
+            .collect_vec();
+        let mut map = RowOpMap::with_capacity(estimate_size);
+        for (hash_values, ops, c) in &chunks {
+            for row in c.rows() {
+                let hash = hash_values[row.index()];
+                let op = ops[row.index()];
+                let key = row.project(&key_indices);
+                let k = Prehashed::new(key, hash);
+                match op {
+                    Op::Insert | Op::UpdateInsert => map.insert(k, row),
+                    Op::Delete | Op::UpdateDelete => map.delete(k, row),
+                }
+            }
+        }
+        map.into_chunks(chunk_size, data_types)
+    }
 }
 
 pub fn merge_chunk_row(stream_chunk: StreamChunk, pk_indices: &[usize]) -> StreamChunk {
-    let mut compactor = StreamChunkCompactor::new(pk_indices.to_vec());
-    compactor.push_chunk(stream_chunk);
+    let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), vec![stream_chunk]);
     compactor.into_compacted_chunks().next().unwrap()
 }

@@ -170,27 +299,29 @@ mod tests {
     #[test]
     fn test_merge_chunk_row() {
         let pk_indices = [0, 1];
-        let mut compactor = StreamChunkCompactor::new(pk_indices.to_vec());
-        compactor.push_chunk(StreamChunk::from_pretty(
-            " I I I
-            - 1 1 1
-            + 1 1 2
-            + 2 5 7
-            + 4 9 2
-            - 2 5 7
-            + 2 5 5
-            - 6 6 9
-            + 6 6 9
-            - 9 9 1",
-        ));
-        compactor.push_chunk(StreamChunk::from_pretty(
-            " I I I
-            - 6 6 9
-            + 9 9 9
-            - 9 9 4
-            + 2 2 2
-            + 9 9 1",
-        ));
+        let chunks = vec![
+            StreamChunk::from_pretty(
+                " I I I
+                - 1 1 1
+                + 1 1 2
+                + 2 5 7
+                + 4 9 2
+                - 2 5 7
+                + 2 5 5
+                - 6 6 9
+                + 6 6 9
+                - 9 9 1",
+            ),
+            StreamChunk::from_pretty(
+                " I I I
+                - 6 6 9
+                + 9 9 9
+                - 9 9 4
+                + 2 2 2
+                + 9 9 1",
+            ),
+        ];
+        let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);
         let mut iter = compactor.into_compacted_chunks();
         assert_eq!(
             iter.next().unwrap().compact(),
@@ -213,4 +344,49 @@ mod tests {
 
         assert_eq!(iter.next(), None);
     }
+
+    #[test]
+    fn test_compact_chunk_row() {
+        let pk_indices = [0, 1];
+        let chunks = vec![
+            StreamChunk::from_pretty(
+                " I I I
+                - 1 1 1
+                + 1 1 2
+                + 2 5 7
+                + 4 9 2
+                - 2 5 7
+                + 2 5 5
+                - 6 6 9
+                + 6 6 9
+                - 9 9 1",
+            ),
+            StreamChunk::from_pretty(
+                " I I I
+                - 6 6 9
+                + 9 9 9
+                - 9 9 4
+                + 2 2 2
+                + 9 9 1",
+            ),
+        ];
+        let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);
+
+        let chunks = compactor.reconstructed_compacted_chunks(
+            100,
+            vec![DataType::Int64, DataType::Int64, DataType::Int64],
+        );
+        assert_eq!(
+            chunks.into_iter().next().unwrap(),
+            StreamChunk::from_pretty(
+                " I I I
+                + 2 5 5
+                - 6 6 9
+                + 4 9 2
+                U- 1 1 1
+                U+ 1 1 2
+                + 2 2 2",
+            )
+        );
+    }
 }
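For context, below is a minimal sketch of how a sink-side caller might drive the reworked API. It assumes `StreamChunkCompactor` is re-exported from `risingwave_common::array` alongside `StreamChunk` (as the `crate::array` imports above suggest) and that `DataType` lives at `risingwave_common::types`; the function and its parameters are invented for illustration.

// Sketch only: compact a batch of buffered chunks on the *sink* key before
// emitting them downstream. Paths are assumptions based on this file.
use risingwave_common::array::{StreamChunk, StreamChunkCompactor};
use risingwave_common::types::DataType;

fn compact_on_sink_key(
    buffered: Vec<StreamChunk>,
    sink_key_indices: Vec<usize>,
    schema: Vec<DataType>,
) -> Vec<StreamChunk> {
    // The reworked constructor takes the key indices and all chunks up front.
    let compactor = StreamChunkCompactor::new(sink_key_indices, buffered);
    // Unlike `into_compacted_chunks`, which only rewrites ops and visibility
    // in place, this rebuilds the chunks so that each sink key carries at
    // most one Insert, Delete, or Update across the whole batch.
    compactor.reconstructed_compacted_chunks(
        256, // target chunk size
        schema,
    )
}

Per the commit title, this path matters when the upstream stream key differs from the sink's declared key: the chunks must then be re-compacted keyed on the sink key itself before being handed to the sink.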