use need_advance_delete instead of pk mismatch
st1page committed Mar 1, 2024
1 parent f7c00ca commit 0e5765e
Showing 1 changed file with 36 additions and 35 deletions.
src/stream/src/executor/sink.rs
@@ -155,13 +155,46 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
             }
         });
 
+        // When the stream key differs from the sink's user-defined primary key columns, the operations can arrive out of order:
+        // stream key: a,b
+        // sink pk: a
+
+        // original:
+        // (1,1) -> (1,2)
+        // (1,2) -> (1,3)
+
+        // mv fragment 1:
+        // delete (1,1)
+
+        // mv fragment 2:
+        // insert (1,2)
+        // delete (1,2)
+
+        // mv fragment 3:
+        // insert (1,3)
+
+        // merge to sink fragment:
+        // insert (1,3)
+        // insert (1,2)
+        // delete (1,2)
+        // delete (1,1)
+        // So we do an additional compaction in the sink executor per barrier:
+
+        // 1. compact all the changes with the stream key;
+        // 2. sink all the delete events and then sink all the insert events.
+
+        // After compacting with the stream key, two events with the same user-defined sink pk must have different stream keys.
+        // So the delete event does not delete the inserted record under our internal streaming SQL semantics.
+        let need_advance_delete =
+            stream_key_sink_pk_mismatch && self.sink_param.sink_type != SinkType::AppendOnly;
+
         let processed_input = Self::process_msg(
             input,
             self.sink_param.sink_type,
             stream_key,
-            stream_key_sink_pk_mismatch,
+            need_advance_delete,
             // NOTE(st1page): reconstructing with the sink pk needs extra cost to buffer a barrier's data, so currently we bind it to the mismatch case.
-            stream_key_sink_pk_mismatch,
+            need_advance_delete,
             self.chunk_size,
             self.input_data_types,
         );
@@ -259,43 +292,11 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
         input: impl MessageStream,
         sink_type: SinkType,
         stream_key: PkIndices,
-        stream_key_sink_pk_mismatch: bool,
+        need_advance_delete: bool,
         re_construct_with_sink_pk: bool,
         chunk_size: usize,
         input_data_types: Vec<DataType>,
     ) {
-        // When the stream key differs from the sink's user-defined primary key columns, the operations can arrive out of order:
-        // stream key: a,b
-        // sink pk: a
-
-        // original:
-        // (1,1) -> (1,2)
-        // (1,2) -> (1,3)
-
-        // mv fragment 1:
-        // delete (1,1)
-
-        // mv fragment 2:
-        // insert (1,2)
-        // delete (1,2)
-
-        // mv fragment 3:
-        // insert (1,3)
-
-        // merge to sink fragment:
-        // insert (1,3)
-        // insert (1,2)
-        // delete (1,2)
-        // delete (1,1)
-        // So we do an additional compaction in the sink executor per barrier:
-
-        // 1. compact all the changes with the stream key;
-        // 2. sink all the delete events and then sink all the insert events.
-
-        // After compacting with the stream key, two events with the same user-defined sink pk must have different stream keys.
-        // So the delete event does not delete the inserted record under our internal streaming SQL semantics.
-        let need_advance_delete = stream_key_sink_pk_mismatch && sink_type != SinkType::AppendOnly;
-
         // need to buffer chunks during one barrier
         if need_advance_delete || re_construct_with_sink_pk {
             let mut chunk_buffer = vec![];
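The moved comment describes a compact-then-reorder step: within one barrier, compact all changes by stream key, then sink every delete before any insert. For illustration only, here is a minimal, self-contained Rust sketch of that idea — not the actual SinkExecutor code, which operates on StreamChunks rather than single rows. The names Row, Op, and compact_and_reorder are hypothetical, and for brevity the whole row serves as the stream key.

use std::collections::HashMap;

// Simplified stand-ins for the executor's stream chunk rows.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Row {
    a: i64, // sink pk column
    b: i64, // together with `a`, forms the stream key
}

#[derive(Clone, Copy, Debug)]
enum Op {
    Insert,
    Delete,
}

// Step 1: compact one barrier's changes by stream key, keeping only the
// net effect per key. Step 2: emit all deletes before all inserts, so a
// delete can never clobber an insert that shares the same sink pk but a
// different stream key.
fn compact_and_reorder(buffered: &[(Op, Row)]) -> Vec<(Op, Row)> {
    // +1 per insert, -1 per delete; 0 means the events cancel out, as the
    // transient insert/delete pair for (1,2) does in the example above.
    let mut net: HashMap<Row, i64> = HashMap::new();
    for &(op, row) in buffered {
        *net.entry(row).or_insert(0) += match op {
            Op::Insert => 1,
            Op::Delete => -1,
        };
    }
    let deletes = net.iter().filter(|&(_, &n)| n < 0).map(|(&r, _)| (Op::Delete, r));
    let inserts = net.iter().filter(|&(_, &n)| n > 0).map(|(&r, _)| (Op::Insert, r));
    deletes.chain(inserts).collect()
}

fn main() {
    // The out-of-order stream from the comment, as merged at the sink fragment.
    let buffered = [
        (Op::Insert, Row { a: 1, b: 3 }),
        (Op::Insert, Row { a: 1, b: 2 }),
        (Op::Delete, Row { a: 1, b: 2 }),
        (Op::Delete, Row { a: 1, b: 1 }),
    ];
    // Prints `Delete (1, 1)` before `Insert (1, 3)`; (1,2) cancels out.
    for (op, row) in compact_and_reorder(&buffered) {
        println!("{:?} ({}, {})", op, row.a, row.b);
    }
}

This buffering costs a full barrier's worth of data, which is why the commit computes need_advance_delete once at the call site: when it is false (e.g. an append-only sink), process_msg can skip the buffering path entirely.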
