
feat: compact stream chunk with sink key when sink key mismatch #15345

Merged
merged 15 commits on Mar 22, 2024
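For context, a minimal sketch of how the reworked compactor is driven after this change, mirroring the unit tests in this diff. The `risingwave_common::array` re-export paths, the chunk size of 256, and the `Int64` column types are assumptions for illustration only:

```rust
use risingwave_common::array::{DataType, StreamChunk, StreamChunkCompactor};

// Compact the chunks buffered for one epoch, keyed by the sink key columns
// (which may differ from the stream key, hence this PR).
fn compact_for_sink(chunks: Vec<StreamChunk>) -> Vec<StreamChunk> {
    let key_indices = vec![0, 1];
    let compactor = StreamChunkCompactor::new(key_indices, chunks);

    // New in this PR: rebuild the chunks so that each key contributes at most one
    // Insert/Delete/Update, with at most `chunk_size` rows per output chunk.
    compactor.reconstructed_compacted_chunks(
        256,
        vec![DataType::Int64, DataType::Int64, DataType::Int64],
    )

    // Alternatively, `compactor.into_compacted_chunks()` keeps the original chunks
    // and only rewrites their ops and visibility, as before this PR.
}
```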
260 changes: 218 additions & 42 deletions src/common/src/array/compact_chunk.rs
@@ -21,19 +21,17 @@ use itertools::Itertools;
use prehash::{new_prehashed_map_with_capacity, Passthru, Prehashed};

use super::stream_chunk::{OpRowMutRef, StreamChunkMut};
use super::stream_chunk_builder::StreamChunkBuilder;
use super::stream_record::Record;
use super::DataType;
use crate::array::{Op, RowRef, StreamChunk};
use crate::row::{Project, RowExt};
use crate::util::hash_util::Crc32FastBuilder;

/// Compact the stream chunks with just modify the `Ops` and visibility of the chunk. Currently, two
/// transformation will be applied
/// - remove intermediate operation of the same key. The operations of the same stream key will only
/// have three kind of patterns Insert, Delete or Update.
/// - For the update (-old row, +old row), when old row is exactly same. The two rowOp will be
/// removed.
/// A helper to compact stream chunks by just modifying the `Ops` and visibility of the chunk.
pub struct StreamChunkCompactor {
chunks: Vec<StreamChunk>,
stream_key: Vec<usize>,
key: Vec<usize>,
}

struct OpRowMutRefTuple<'a> {
@@ -82,24 +80,119 @@ impl<'a> OpRowMutRefTuple<'a> {
type OpRowMap<'a, 'b> =
HashMap<Prehashed<Project<'b, RowRef<'a>>>, OpRowMutRefTuple<'a>, BuildHasherDefault<Passthru>>;

impl StreamChunkCompactor {
pub fn new(stream_key: Vec<usize>) -> Self {
#[derive(Clone, Debug)]
pub enum RowOp<'a> {
Insert(RowRef<'a>),
Delete(RowRef<'a>),
/// (old_value, new_value)
Update((RowRef<'a>, RowRef<'a>)),
}

pub struct RowOpMap<'a, 'b> {
map: HashMap<Prehashed<Project<'b, RowRef<'a>>>, RowOp<'a>, BuildHasherDefault<Passthru>>,
}

impl<'a, 'b> RowOpMap<'a, 'b> {
fn with_capacity(estimate_size: usize) -> Self {
Self {
stream_key,
chunks: vec![],
map: new_prehashed_map_with_capacity(estimate_size),
}
}

pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
(self.chunks, self.stream_key)
pub fn insert(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
let entry = self.map.entry(k);
match entry {
Entry::Vacant(e) => {
e.insert(RowOp::Insert(v));
}
Entry::Occupied(mut e) => match e.get() {
RowOp::Delete(ref old_v) => {
e.insert(RowOp::Update((*old_v, v)));
}
RowOp::Insert(_) => {
Contributor: Can we log more information in the pk to help debug?
tracing::warn!("double insert for the same pk");
e.insert(RowOp::Insert(v));
}
RowOp::Update((ref old_v, _)) => {
tracing::warn!("double insert for the same pk");
e.insert(RowOp::Update((*old_v, v)));
}
},
}
}

pub fn delete(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
let entry = self.map.entry(k);
match entry {
Entry::Vacant(e) => {
e.insert(RowOp::Delete(v));
}
Entry::Occupied(mut e) => match e.get() {
RowOp::Insert(_) => {
e.remove();
}
RowOp::Update((ref prev, _)) => {
e.insert(RowOp::Delete(*prev));
}
RowOp::Delete(_) => {
tracing::warn!("double delete for the same pk");
e.insert(RowOp::Delete(v));
}
},
}
}

pub fn into_chunks(self, chunk_size: usize, data_types: Vec<DataType>) -> Vec<StreamChunk> {
let mut ret = vec![];
let mut builder = StreamChunkBuilder::new(chunk_size, data_types);
for (_, row_op) in self.map {
match row_op {
RowOp::Insert(row) => {
if let Some(c) = builder.append_record(Record::Insert { new_row: row }) {
ret.push(c)
}
}
RowOp::Delete(row) => {
if let Some(c) = builder.append_record(Record::Delete { old_row: row }) {
ret.push(c)
}
}
RowOp::Update((old, new)) => {
if old == new {
continue;
}
if let Some(c) = builder.append_record(Record::Update {
old_row: old,
new_row: new,
}) {
ret.push(c)
}
}
}
}
if let Some(c) = builder.take() {
ret.push(c);
}
ret
}
}
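For readability, the merge rules implemented by `insert` and `delete` above can be summarized as a transition table; this is derived directly from the code in this diff and is illustrative only:

```rust
// Current entry      + Insert(new)                        + Delete(row)
// ----------------   ---------------------------------    ------------------------------------
// (vacant)           Insert(new)                          Delete(row)
// Insert(_)          Insert(new)   [warn: double insert]  entry removed (the +/- pair cancels)
// Delete(old)        Update((old, new))                   Delete(row)   [warn: double delete]
// Update((old, _))   Update((old, new))  [warn]           Delete(old)
//
// `into_chunks` additionally skips an Update whose old and new rows are identical.
```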

pub fn push_chunk(&mut self, c: StreamChunk) {
self.chunks.push(c);
impl StreamChunkCompactor {
pub fn new(key: Vec<usize>, chunks: Vec<StreamChunk>) -> Self {
Self { chunks, key }
}

/// Compact a chunk by modifying the ops and the visibility of a stream chunk. All UPDATE INSERT
/// and UPDATE DELETE will be converted to INSERT and DELETE, and dropped according to
pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
(self.chunks, self.key)
}

/// Compact a chunk by modifying the ops and the visibility of the stream chunk.
/// Currently, two transformations are applied:
/// - Intermediate operations on the same key are removed, so the operations on a stream key are
///   reduced to one of three patterns: Insert, Delete, or Update.
/// - For an Update pair (-old row, +new row) whose old and new rows are exactly the same, both
///   row ops are removed.
/// All UPDATE INSERT and UPDATE DELETE ops are converted to INSERT and DELETE, and dropped
/// according to certain rules (see `merge_insert` and `merge_delete` for more details).
pub fn into_compacted_chunks(self) -> impl Iterator<Item = StreamChunk> {
let (chunks, key_indices) = self.into_inner();
@@ -123,8 +216,8 @@ impl StreamChunkCompactor {
for (row, mut op_row) in c.to_rows_mut() {
op_row.set_op(op_row.op().normalize_update());
let hash = hash_values[row.index()];
let stream_key = row.project(&key_indices);
match op_row_map.entry(Prehashed::new(stream_key, hash)) {
let key = row.project(&key_indices);
match op_row_map.entry(Prehashed::new(key, hash)) {
Entry::Vacant(v) => {
v.insert(OpRowMutRefTuple {
previous: None,
@@ -153,11 +246,47 @@ impl StreamChunkCompactor {
}
chunks.into_iter().map(|(_, c)| c.into())
}

/// Re-construct the stream chunks and compact them by the key.
pub fn reconstructed_compacted_chunks(
self,
chunk_size: usize,
data_types: Vec<DataType>,
) -> Vec<StreamChunk> {
let (chunks, key_indices) = self.into_inner();

let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
let chunks: Vec<(_, _, _)> = chunks
.into_iter()
.map(|c| {
let (c, ops) = c.into_parts();
let hash_values = c
.get_hash_values(&key_indices, Crc32FastBuilder)
.into_iter()
.map(|hash| hash.value())
.collect_vec();
(hash_values, ops, c)
})
.collect_vec();
let mut map = RowOpMap::with_capacity(estimate_size);
for (hash_values, ops, c) in &chunks {
for row in c.rows() {
let hash = hash_values[row.index()];
let op = ops[row.index()];
let key = row.project(&key_indices);
let k = Prehashed::new(key, hash);
match op {
Op::Insert | Op::UpdateInsert => map.insert(k, row),
Op::Delete | Op::UpdateDelete => map.delete(k, row),
}
}
}
map.into_chunks(chunk_size, data_types)
}
}

pub fn merge_chunk_row(stream_chunk: StreamChunk, pk_indices: &[usize]) -> StreamChunk {
let mut compactor = StreamChunkCompactor::new(pk_indices.to_vec());
compactor.push_chunk(stream_chunk);
let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), vec![stream_chunk]);
compactor.into_compacted_chunks().next().unwrap()
}
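`merge_chunk_row` keeps its public signature, so its callers are unaffected by the constructor change. A minimal usage sketch, assuming the same `risingwave_common::array` re-exports; the column layout and values are illustrative:

```rust
use risingwave_common::array::{merge_chunk_row, StreamChunk};

fn example() -> StreamChunk {
    // Collapse redundant ops on the same primary key (column 0) within one chunk.
    let chunk = StreamChunk::from_pretty(
        " I I
        + 1 1
        - 1 1
        + 2 2",
    );
    // The +/- pair on pk 1 cancels out (both rows become invisible);
    // only `+ 2 2` remains visible in the result.
    merge_chunk_row(chunk, &[0])
}
```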

@@ -170,27 +299,29 @@ mod tests {
#[test]
fn test_merge_chunk_row() {
let pk_indices = [0, 1];
let mut compactor = StreamChunkCompactor::new(pk_indices.to_vec());
compactor.push_chunk(StreamChunk::from_pretty(
" I I I
- 1 1 1
+ 1 1 2
+ 2 5 7
+ 4 9 2
- 2 5 7
+ 2 5 5
- 6 6 9
+ 6 6 9
- 9 9 1",
));
compactor.push_chunk(StreamChunk::from_pretty(
" I I I
- 6 6 9
+ 9 9 9
- 9 9 4
+ 2 2 2
+ 9 9 1",
));
let chunks = vec![
StreamChunk::from_pretty(
" I I I
- 1 1 1
+ 1 1 2
+ 2 5 7
+ 4 9 2
- 2 5 7
+ 2 5 5
- 6 6 9
+ 6 6 9
- 9 9 1",
),
StreamChunk::from_pretty(
" I I I
- 6 6 9
+ 9 9 9
- 9 9 4
+ 2 2 2
+ 9 9 1",
),
];
let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);
let mut iter = compactor.into_compacted_chunks();
assert_eq!(
iter.next().unwrap().compact(),
@@ -213,4 +344,49 @@ mod tests {

assert_eq!(iter.next(), None);
}

#[test]
fn test_compact_chunk_row() {
let pk_indices = [0, 1];
let chunks = vec![
StreamChunk::from_pretty(
" I I I
- 1 1 1
+ 1 1 2
+ 2 5 7
+ 4 9 2
- 2 5 7
+ 2 5 5
- 6 6 9
+ 6 6 9
- 9 9 1",
),
StreamChunk::from_pretty(
" I I I
- 6 6 9
+ 9 9 9
- 9 9 4
+ 2 2 2
+ 9 9 1",
),
];
let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);

let chunks = compactor.reconstructed_compacted_chunks(
100,
vec![DataType::Int64, DataType::Int64, DataType::Int64],
);
assert_eq!(
chunks.into_iter().next().unwrap(),
StreamChunk::from_pretty(
" I I I
+ 2 5 5
- 6 6 9
+ 4 9 2
U- 1 1 1
U+ 1 1 2
+ 2 2 2",
)
);
}
}