diff --git a/src/common/src/array/compact_chunk.rs b/src/common/src/array/compact_chunk.rs
index 256850337a11..a625777b454b 100644
--- a/src/common/src/array/compact_chunk.rs
+++ b/src/common/src/array/compact_chunk.rs
@@ -21,19 +21,17 @@ use itertools::Itertools;
 use prehash::{new_prehashed_map_with_capacity, Passthru, Prehashed};

 use super::stream_chunk::{OpRowMutRef, StreamChunkMut};
+use super::stream_chunk_builder::StreamChunkBuilder;
+use super::stream_record::Record;
+use super::DataType;
 use crate::array::{Op, RowRef, StreamChunk};
 use crate::row::{Project, RowExt};
 use crate::util::hash_util::Crc32FastBuilder;

-/// Compact the stream chunks with just modify the `Ops` and visibility of the chunk. Currently, two
-/// transformation will be applied
-/// - remove intermediate operation of the same key. The operations of the same stream key will only
-///   have three kind of patterns Insert, Delete or Update.
-/// - For the update (-old row, +old row), when old row is exactly same. The two rowOp will be
-///   removed.
+/// A helper that compacts stream chunks by only modifying the `Ops` and visibility of the chunk.
 pub struct StreamChunkCompactor {
     chunks: Vec<StreamChunk>,
-    stream_key: Vec<usize>,
+    key: Vec<usize>,
 }

 struct OpRowMutRefTuple<'a> {
@@ -82,24 +80,119 @@ impl<'a> OpRowMutRefTuple<'a> {

 type OpRowMap<'a, 'b> = HashMap<Prehashed<Project<'b, RowRef<'a>>>, OpRowMutRefTuple<'a>, BuildHasherDefault<Passthru>>;

-impl StreamChunkCompactor {
-    pub fn new(stream_key: Vec<usize>) -> Self {
+#[derive(Clone, Debug)]
+pub enum RowOp<'a> {
+    Insert(RowRef<'a>),
+    Delete(RowRef<'a>),
+    /// (old_value, new_value)
+    Update((RowRef<'a>, RowRef<'a>)),
+}
+
+pub struct RowOpMap<'a, 'b> {
+    map: HashMap<Prehashed<Project<'b, RowRef<'a>>>, RowOp<'a>, BuildHasherDefault<Passthru>>,
+}
+
+impl<'a, 'b> RowOpMap<'a, 'b> {
+    fn with_capacity(estimate_size: usize) -> Self {
         Self {
-            stream_key,
-            chunks: vec![],
+            map: new_prehashed_map_with_capacity(estimate_size),
         }
     }

-    pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
-        (self.chunks, self.stream_key)
+    pub fn insert(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
+        let entry = self.map.entry(k);
+        match entry {
+            Entry::Vacant(e) => {
+                e.insert(RowOp::Insert(v));
+            }
+            Entry::Occupied(mut e) => match e.get() {
+                RowOp::Delete(ref old_v) => {
+                    e.insert(RowOp::Update((*old_v, v)));
+                }
+                RowOp::Insert(_) => {
+                    tracing::warn!("double insert for the same pk");
+                    e.insert(RowOp::Insert(v));
+                }
+                RowOp::Update((ref old_v, _)) => {
+                    tracing::warn!("double insert for the same pk");
+                    e.insert(RowOp::Update((*old_v, v)));
+                }
+            },
+        }
+    }
+
+    pub fn delete(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
+        let entry = self.map.entry(k);
+        match entry {
+            Entry::Vacant(e) => {
+                e.insert(RowOp::Delete(v));
+            }
+            Entry::Occupied(mut e) => match e.get() {
+                RowOp::Insert(_) => {
+                    e.remove();
+                }
+                RowOp::Update((ref prev, _)) => {
+                    e.insert(RowOp::Delete(*prev));
+                }
+                RowOp::Delete(_) => {
+                    tracing::warn!("double delete for the same pk");
+                    e.insert(RowOp::Delete(v));
+                }
+            },
+        }
+    }
+
+    pub fn into_chunks(self, chunk_size: usize, data_types: Vec<DataType>) -> Vec<StreamChunk> {
+        let mut ret = vec![];
+        let mut builder = StreamChunkBuilder::new(chunk_size, data_types);
+        for (_, row_op) in self.map {
+            match row_op {
+                RowOp::Insert(row) => {
+                    if let Some(c) = builder.append_record(Record::Insert { new_row: row }) {
+                        ret.push(c)
+                    }
+                }
+                RowOp::Delete(row) => {
+                    if let Some(c) = builder.append_record(Record::Delete { old_row: row }) {
+                        ret.push(c)
+                    }
+                }
+                RowOp::Update((old, new)) => {
+                    if old == new {
+                        continue;
+                    }
+                    if let Some(c) = builder.append_record(Record::Update {
+                        old_row: old,
+                        new_row: new,
+                    }) {
+                        ret.push(c)
+                    }
+                }
+            }
+        }
+        if let Some(c) = builder.take() {
+            ret.push(c);
+        }
+        ret
     }
+}

-    pub fn push_chunk(&mut self, c: StreamChunk) {
-        self.chunks.push(c);
+impl StreamChunkCompactor {
+    pub fn new(key: Vec<usize>, chunks: Vec<StreamChunk>) -> Self {
+        Self { chunks, key }
     }

-    /// Compact a chunk by modifying the ops and the visibility of a stream chunk. All UPDATE INSERT
-    /// and UPDATE DELETE will be converted to INSERT and DELETE, and dropped according to
+    pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
+        (self.chunks, self.key)
+    }
+
+    /// Compact a chunk by modifying the ops and the visibility of a stream chunk.
+    /// Currently, two transformations will be applied:
+    /// - Remove intermediate operations on the same key, so that each stream key is left with only
+    ///   one of three kinds of patterns: Insert, Delete, or Update.
+    /// - For an update (-old row, +new row) where the old and new rows are exactly the same, both
+    ///   row ops will be removed.
+    /// All UPDATE INSERT and UPDATE DELETE will be converted to INSERT and DELETE, and dropped according to
     /// certain rules (see `merge_insert` and `merge_delete` for more details).
     pub fn into_compacted_chunks(self) -> impl Iterator<Item = StreamChunk> {
         let (chunks, key_indices) = self.into_inner();
@@ -123,8 +216,8 @@ impl StreamChunkCompactor {
         for (row, mut op_row) in c.to_rows_mut() {
             op_row.set_op(op_row.op().normalize_update());
             let hash = hash_values[row.index()];
-            let stream_key = row.project(&key_indices);
-            match op_row_map.entry(Prehashed::new(stream_key, hash)) {
+            let key = row.project(&key_indices);
+            match op_row_map.entry(Prehashed::new(key, hash)) {
                 Entry::Vacant(v) => {
                     v.insert(OpRowMutRefTuple {
                         previous: None,
@@ -153,11 +246,47 @@ impl StreamChunkCompactor {
         }
         chunks.into_iter().map(|(_, c)| c.into())
     }
+
+    /// Re-constructs the stream chunks to compact them by the given key.
+    pub fn reconstructed_compacted_chunks(
+        self,
+        chunk_size: usize,
+        data_types: Vec<DataType>,
+    ) -> Vec<StreamChunk> {
+        let (chunks, key_indices) = self.into_inner();
+
+        let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
+        let chunks: Vec<(_, _, _)> = chunks
+            .into_iter()
+            .map(|c| {
+                let (c, ops) = c.into_parts();
+                let hash_values = c
+                    .get_hash_values(&key_indices, Crc32FastBuilder)
+                    .into_iter()
+                    .map(|hash| hash.value())
+                    .collect_vec();
+                (hash_values, ops, c)
+            })
+            .collect_vec();
+        let mut map = RowOpMap::with_capacity(estimate_size);
+        for (hash_values, ops, c) in &chunks {
+            for row in c.rows() {
+                let hash = hash_values[row.index()];
+                let op = ops[row.index()];
+                let key = row.project(&key_indices);
+                let k = Prehashed::new(key, hash);
+                match op {
+                    Op::Insert | Op::UpdateInsert => map.insert(k, row),
+                    Op::Delete | Op::UpdateDelete => map.delete(k, row),
+                }
+            }
+        }
+        map.into_chunks(chunk_size, data_types)
+    }
 }

 pub fn merge_chunk_row(stream_chunk: StreamChunk, pk_indices: &[usize]) -> StreamChunk {
-    let mut compactor = StreamChunkCompactor::new(pk_indices.to_vec());
-    compactor.push_chunk(stream_chunk);
+    let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), vec![stream_chunk]);
     compactor.into_compacted_chunks().next().unwrap()
 }
@@ -170,27 +299,29 @@ mod tests {
     #[test]
     fn test_merge_chunk_row() {
         let pk_indices = [0, 1];
-        let mut compactor = StreamChunkCompactor::new(pk_indices.to_vec());
-        compactor.push_chunk(StreamChunk::from_pretty(
-            " I I I
-            - 1 1 1
-            + 1 1 2
-            + 2 5 7
-            + 4 9 2
-            - 2 5 7
-            + 2 5 5
-            - 6 6 9
-            + 6 6 9
-            - 9 9 1",
-        ));
-        compactor.push_chunk(StreamChunk::from_pretty(
-            " I I I
-            - 6 6 9
-            + 9 9 9
-            - 9 9 4
-            + 2 2 2
-            + 9 9 1",
-        ));
+        let chunks = vec![
+            StreamChunk::from_pretty(
+                " I I I
+                - 1 1 1
+                + 1 1 2
+                + 2 5 7
+                + 4 9 2
+                - 2 5 7
+                + 2 5 5
+                - 6 6 9
+                + 6 6 9
+                - 9 9 1",
+            ),
+            StreamChunk::from_pretty(
+                " I I I
+                - 6 6 9
+                + 9 9 9
+                - 9 9 4
+                + 2 2 2
+                + 9 9 1",
+            ),
+        ];
+        let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);
         let mut iter = compactor.into_compacted_chunks();
         assert_eq!(
             iter.next().unwrap().compact(),
@@ -213,4 +344,49 @@ mod tests {
         assert_eq!(iter.next(), None);
     }
+
+    #[test]
+    fn test_compact_chunk_row() {
+        let pk_indices = [0, 1];
+        let chunks = vec![
+            StreamChunk::from_pretty(
+                " I I I
+                - 1 1 1
+                + 1 1 2
+                + 2 5 7
+                + 4 9 2
+                - 2 5 7
+                + 2 5 5
+                - 6 6 9
+                + 6 6 9
+                - 9 9 1",
+            ),
+            StreamChunk::from_pretty(
+                " I I I
+                - 6 6 9
+                + 9 9 9
+                - 9 9 4
+                + 2 2 2
+                + 9 9 1",
+            ),
+        ];
+        let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);
+
+        let chunks = compactor.reconstructed_compacted_chunks(
+            100,
+            vec![DataType::Int64, DataType::Int64, DataType::Int64],
+        );
+        assert_eq!(
+            chunks.into_iter().next().unwrap(),
+            StreamChunk::from_pretty(
+                " I I I
+                + 2 5 5
+                - 6 6 9
+                + 4 9 2
+                U- 1 1 1
+                U+ 1 1 2
+                + 2 2 2",
+            )
+        );
+    }
 }
diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs
index 3ce22ee3142e..19e887ebb66f 100644
--- a/src/stream/src/executor/sink.rs
+++ b/src/stream/src/executor/sink.rs
@@ -23,6 +23,7 @@ use risingwave_common::array::stream_chunk::StreamChunkMut;
 use risingwave_common::array::{merge_chunk_row, Op, StreamChunk, StreamChunkCompactor};
 use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
 use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
+use risingwave_common::types::DataType;
 use risingwave_connector::dispatch_sink;
 use risingwave_connector::sink::catalog::SinkType;
 use risingwave_connector::sink::log_store::{
@@ -49,6 +50,8 @@ pub struct SinkExecutor<F: LogStoreFactory> {
     sink_param: SinkParam,
     log_store_factory: F,
     sink_writer_param: SinkWriterParam,
+    chunk_size: usize,
+    input_data_types: Vec<DataType>,
 }

 // Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
@@ -88,6 +91,8 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
         sink_param: SinkParam,
         columns: Vec<ColumnCatalog>,
         log_store_factory: F,
+        chunk_size: usize,
+        input_data_types: Vec<DataType>,
     ) -> StreamExecutorResult<Self> {
         let sink = build_sink(sink_param.clone())?;
         let sink_input_schema: Schema = columns
@@ -115,6 +120,8 @@
             sink_param,
             log_store_factory,
             sink_writer_param,
+            chunk_size,
+            input_data_types,
         })
     }
@@ -148,11 +155,51 @@
             }
         });
+        // When the stream key is different from the user-defined primary key columns of the sink, the operations could be out of order:
+        // stream key: a,b
+        // sink pk: a
+
+        // original:
+        // (1,1) -> (1,2)
+        // (1,2) -> (1,3)
+
+        // mv fragment 1:
+        // delete (1,1)
+
+        // mv fragment 2:
+        // insert (1,2)
+        // delete (1,2)
+
+        // mv fragment 3:
+        // insert (1,3)
+
+        // merge to sink fragment:
+        // insert (1,3)
+        // insert (1,2)
+        // delete (1,2)
+        // delete (1,1)
+        // So we do additional compaction in the sink executor per barrier:
+
+        // 1. Compact all the changes with the stream key.
+        // 2. Sink all the delete events and then sink all the insert events.
+
+        // After compacting with the stream key, two events with the same user-defined sink pk must have different stream keys.
+        // So the delete event will not delete the record just inserted, under our internal streaming SQL semantics.
+        let need_advance_delete =
+            stream_key_sink_pk_mismatch && self.sink_param.sink_type != SinkType::AppendOnly;
+        // NOTE(st1page): reconstructing with the sink pk needs the extra cost of buffering a barrier's data, so currently we only enable it in the mismatch case.
+        let re_construct_with_sink_pk = need_advance_delete
+            && self.sink_param.sink_type == SinkType::Upsert
+            && !self.sink_param.downstream_pk.is_empty();
         let processed_input = Self::process_msg(
             input,
             self.sink_param.sink_type,
             stream_key,
-            stream_key_sink_pk_mismatch,
+            need_advance_delete,
+            re_construct_with_sink_pk,
+            self.chunk_size,
+            self.input_data_types,
+            self.sink_param.downstream_pk.clone(),
         );

         if self.sink.is_sink_into_table() {
@@ -243,72 +290,64 @@
         }
     }

+    #[allow(clippy::too_many_arguments)]
     #[try_stream(ok = Message, error = StreamExecutorError)]
     async fn process_msg(
         input: impl MessageStream,
         sink_type: SinkType,
         stream_key: PkIndices,
-        stream_key_sink_pk_mismatch: bool,
+        need_advance_delete: bool,
+        re_construct_with_sink_pk: bool,
+        chunk_size: usize,
+        input_data_types: Vec<DataType>,
+        down_stream_pk: Vec<usize>,
     ) {
-        // When stream key is different from the user defined primary key columns for sinks. The operations could be out of order
-        // stream key: a,b
-        // sink pk: a
-
-        // original:
-        // (1,1) -> (1,2)
-        // (1,2) -> (1,3)
-
-        // mv fragment 1:
-        // delete (1,1)
-
-        // mv fragment 2:
-        // insert (1,2)
-        // delete (1,2)
-
-        // mv fragment 3:
-        // insert (1,3)
-
-        // merge to sink fragment:
-        // insert (1,3)
-        // insert (1,2)
-        // delete (1,2)
-        // delete (1,1)
-        // So we do additional compaction in the sink executor per barrier.
-
-        // 1. compact all the chanes with the stream key.
-        // 2. sink all the delete events and then sink all insert evernt.
-
-        // after compacting with the stream key, the two event with the same used defined sink pk must have different stream key.
- // So the delete event is not to delete the inserted record in our internal streaming SQL semantic. - if stream_key_sink_pk_mismatch && sink_type != SinkType::AppendOnly { - let mut chunk_buffer = StreamChunkCompactor::new(stream_key.clone()); - let mut watermark = None; + // need to buffer chunks during one barrier + if need_advance_delete || re_construct_with_sink_pk { + let mut chunk_buffer = vec![]; + let mut watermark: Option = None; #[for_await] for msg in input { match msg? { Message::Watermark(w) => watermark = Some(w), Message::Chunk(c) => { - chunk_buffer.push_chunk(c); + chunk_buffer.push(c); } Message::Barrier(barrier) => { - let mut delete_chunks = vec![]; - let mut insert_chunks = vec![]; - for c in mem::replace( - &mut chunk_buffer, - StreamChunkCompactor::new(stream_key.clone()), - ) - .into_compacted_chunks() - { - if sink_type != SinkType::ForceAppendOnly { - // Force append-only by dropping UPDATE/DELETE messages. We do this when the - // user forces the sink to be append-only while it is actually not based on - // the frontend derivation result. - delete_chunks.push(force_delete_only(c.clone())); + let chunks = mem::take(&mut chunk_buffer); + let chunks = if need_advance_delete { + let mut delete_chunks = vec![]; + let mut insert_chunks = vec![]; + + for c in StreamChunkCompactor::new(stream_key.clone(), chunks) + .into_compacted_chunks() + { + if sink_type != SinkType::ForceAppendOnly { + // Force append-only by dropping UPDATE/DELETE messages. We do this when the + // user forces the sink to be append-only while it is actually not based on + // the frontend derivation result. + delete_chunks.push(force_delete_only(c.clone())); + } + insert_chunks.push(force_append_only(c)); } - insert_chunks.push(force_append_only(c)); - } + delete_chunks + .into_iter() + .chain(insert_chunks.into_iter()) + .collect() + } else { + chunks + }; + let chunks = if re_construct_with_sink_pk { + StreamChunkCompactor::new(down_stream_pk.clone(), chunks) + .reconstructed_compacted_chunks( + chunk_size, + input_data_types.clone(), + ) + } else { + chunks + }; - for c in delete_chunks.into_iter().chain(insert_chunks.into_iter()) { + for c in chunks { yield Message::Chunk(c); } if let Some(w) = mem::take(&mut watermark) { @@ -514,6 +553,8 @@ mod test { sink_param, columns.clone(), BoundedInMemLogStoreFactory::new(1), + 1024, + vec![DataType::Int32, DataType::Int32, DataType::Int32], ) .await .unwrap(); @@ -603,7 +644,8 @@ mod test { ))), Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty( " I I I - - 1 1 10", + - 1 1 10 + + 1 1 40", ))), Message::Barrier(Barrier::new_test_barrier(test_epoch(3))), ]) @@ -639,6 +681,8 @@ mod test { sink_param, columns.clone(), BoundedInMemLogStoreFactory::new(1), + 1024, + vec![DataType::Int64, DataType::Int64, DataType::Int64], ) .await .unwrap(); @@ -648,9 +692,6 @@ mod test { // Barrier message. executor.next().await.unwrap().unwrap(); - let chunk_msg = executor.next().await.unwrap().unwrap(); - assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0); - let chunk_msg = executor.next().await.unwrap().unwrap(); assert_eq!( chunk_msg.into_chunk().unwrap().compact(), @@ -663,36 +704,16 @@ mod test { // Barrier message. 
executor.next().await.unwrap().unwrap(); - let chunk_msg = executor.next().await.unwrap().unwrap(); - assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0); - let chunk_msg = executor.next().await.unwrap().unwrap(); - assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0); - let chunk_msg = executor.next().await.unwrap().unwrap(); assert_eq!( chunk_msg.into_chunk().unwrap().compact(), StreamChunk::from_pretty( " I I I - - 1 1 10", + U- 1 1 10 + U+ 1 1 40", ) ); - let chunk_msg = executor.next().await.unwrap().unwrap(); - assert_eq!( - chunk_msg.into_chunk().unwrap().compact(), - StreamChunk::from_pretty( - " I I I - + 1 3 30", - ) - ); - let chunk_msg = executor.next().await.unwrap().unwrap(); - assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0); - let chunk_msg = executor.next().await.unwrap().unwrap(); - assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0); - - // Should not receive the third stream chunk message because the force-append-only sink - // executor will drop all DELETE messages. - // The last barrier message. executor.next().await.unwrap().unwrap(); } @@ -761,6 +782,8 @@ mod test { sink_param, columns, BoundedInMemLogStoreFactory::new(1), + 1024, + vec![DataType::Int64, DataType::Int64], ) .await .unwrap(); diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 82e97342eea8..b583c553ce63 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -105,6 +105,8 @@ impl ExecutorBuilder for SinkExecutorBuilder { state_store: impl StateStore, ) -> StreamResult { let [input_executor]: [_; 1] = params.input.try_into().unwrap(); + let input_data_types = input_executor.info().schema.data_types(); + let chunk_size = params.env.config().developer.chunk_size; let sink_desc = node.sink_desc.as_ref().unwrap(); let sink_type = SinkType::from_proto(sink_desc.get_sink_type().unwrap()); @@ -204,6 +206,8 @@ impl ExecutorBuilder for SinkExecutorBuilder { sink_param, columns, factory, + chunk_size, + input_data_types, ) .await? .boxed() @@ -239,6 +243,8 @@ impl ExecutorBuilder for SinkExecutorBuilder { sink_param, columns, factory, + chunk_size, + input_data_types, ) .await? .boxed()
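For reference, a minimal usage sketch of the reworked compactor API introduced in compact_chunk.rs above; this snippet is not part of the patch. The helper name `compact_one_barrier` and its parameters are illustrative assumptions, while `StreamChunkCompactor::new(key, chunks)` and `reconstructed_compacted_chunks(chunk_size, data_types)` are exactly the calls added by this diff.

    use risingwave_common::array::{StreamChunk, StreamChunkCompactor};
    use risingwave_common::types::DataType;

    /// Hypothetical helper: compacts all chunks buffered within one barrier so that each
    /// stream key contributes at most one record (Insert, Delete, or Update), then re-builds
    /// the surviving records into chunks of at most `chunk_size` rows.
    fn compact_one_barrier(
        stream_key: Vec<usize>,
        data_types: Vec<DataType>,
        chunk_size: usize,
        chunks: Vec<StreamChunk>,
    ) -> Vec<StreamChunk> {
        StreamChunkCompactor::new(stream_key, chunks)
            .reconstructed_compacted_chunks(chunk_size, data_types)
    }

This is the same pattern the sink executor follows per barrier: buffer the barrier's chunks, compact them by the stream key (and optionally re-construct them by the downstream sink pk), and only then emit the resulting chunks downstream.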