diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs
index 49794c79ccf3..39ea3b5bba42 100644
--- a/src/stream/src/executor/sink.rs
+++ b/src/stream/src/executor/sink.rs
@@ -155,13 +155,46 @@ impl SinkExecutor {
             }
         });
 
+        // When the stream key is different from the user-defined primary key columns of the sink, the operations could be out of order, e.g.
+        // stream key: a,b
+        // sink pk: a
+
+        // original:
+        // (1,1) -> (1,2)
+        // (1,2) -> (1,3)
+
+        // mv fragment 1:
+        // delete (1,1)
+
+        // mv fragment 2:
+        // insert (1,2)
+        // delete (1,2)
+
+        // mv fragment 3:
+        // insert (1,3)
+
+        // merge to sink fragment:
+        // insert (1,3)
+        // insert (1,2)
+        // delete (1,2)
+        // delete (1,1)
+        // So we do an additional compaction in the sink executor per barrier:
+
+        // 1. Compact all the changes with the stream key.
+        // 2. Sink all the delete events and then sink all the insert events.
+
+        // After compacting with the stream key, any two events with the same user-defined sink pk must have different stream keys.
+        // So the delete event is not meant to delete the inserted record under our internal streaming SQL semantics, which makes it safe to sink the deletes first.
+        let need_advance_delete =
+            stream_key_sink_pk_mismatch && self.sink_param.sink_type != SinkType::AppendOnly;
+
         let processed_input = Self::process_msg(
             input,
             self.sink_param.sink_type,
             stream_key,
-            stream_key_sink_pk_mismatch,
+            need_advance_delete,
             // NOTE(st1page): reconstructing with the sink pk needs the extra cost of buffering a barrier's data, so currently we bind it to the mismatch case.
-            stream_key_sink_pk_mismatch,
+            need_advance_delete,
             self.chunk_size,
             self.input_data_types,
         );
@@ -259,43 +292,11 @@ impl SinkExecutor {
         input: impl MessageStream,
         sink_type: SinkType,
         stream_key: PkIndices,
-        stream_key_sink_pk_mismatch: bool,
+        need_advance_delete: bool,
         re_construct_with_sink_pk: bool,
         chunk_size: usize,
         input_data_types: Vec<DataType>,
     ) {
-        // When stream key is different from the user defined primary key columns for sinks. The operations could be out of order
-        // stream key: a,b
-        // sink pk: a
-
-        // original:
-        // (1,1) -> (1,2)
-        // (1,2) -> (1,3)
-
-        // mv fragment 1:
-        // delete (1,1)
-
-        // mv fragment 2:
-        // insert (1,2)
-        // delete (1,2)
-
-        // mv fragment 3:
-        // insert (1,3)
-
-        // merge to sink fragment:
-        // insert (1,3)
-        // insert (1,2)
-        // delete (1,2)
-        // delete (1,1)
-        // So we do additional compaction in the sink executor per barrier.
-
-        // 1. compact all the chanes with the stream key.
-        // 2. sink all the delete events and then sink all insert evernt.
-
-        // after compacting with the stream key, the two event with the same used defined sink pk must have different stream key.
-        // So the delete event is not to delete the inserted record in our internal streaming SQL semantic.
-        let need_advance_delete = stream_key_sink_pk_mismatch && sink_type != SinkType::AppendOnly;
-
         // need to buffer chunks during one barrier
         if need_advance_delete || re_construct_with_sink_pk {
             let mut chunk_buffer = vec![];
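
To make the two-step compaction in the moved comment concrete, here is a minimal standalone sketch. It models events as plain `Vec`-based rows; `Op`, `Row`, and `compact_per_barrier` are hypothetical stand-ins for illustration, not the executor's actual `StreamChunk` machinery, which compacts whole chunks rather than individual rows.

```rust
// A simplified model of the per-barrier compaction described above:
// compact one barrier's events by stream key, then emit all deletes
// before all inserts. Not RisingWave's real types or API.

use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Op {
    Insert,
    Delete,
}

type Row = Vec<i64>;

fn compact_per_barrier(events: Vec<(Op, Row)>, stream_key: &[usize]) -> Vec<(Op, Row)> {
    // Step 1: compact with the stream key. For each key, keep the first
    // delete (the row existed before the barrier) and the last insert;
    // an insert followed by a delete within the same barrier cancels out.
    let mut first_delete: HashMap<Row, Row> = HashMap::new();
    let mut last_insert: HashMap<Row, Row> = HashMap::new();
    for (op, row) in events {
        let key: Row = stream_key.iter().map(|&i| row[i]).collect();
        match op {
            Op::Delete => {
                if last_insert.remove(&key).is_none() {
                    first_delete.entry(key).or_insert(row);
                }
            }
            Op::Insert => {
                last_insert.insert(key, row);
            }
        }
    }
    // Step 2: sink all delete events, then all insert events. After
    // compaction, a delete and an insert sharing the same sink pk must
    // have different stream keys, so the delete (on the stale stream
    // key) can never clobber the freshly inserted row.
    first_delete
        .into_values()
        .map(|r| (Op::Delete, r))
        .chain(last_insert.into_values().map(|r| (Op::Insert, r)))
        .collect()
}

fn main() {
    // The out-of-order merge from the comment: stream key = columns
    // (0, 1), sink pk = column 0.
    let merged = vec![
        (Op::Insert, vec![1, 3]),
        (Op::Insert, vec![1, 2]),
        (Op::Delete, vec![1, 2]),
        (Op::Delete, vec![1, 1]),
    ];
    // Prints Delete (1,1) before Insert (1,3); the (1,2) pair cancels.
    println!("{:?}", compact_per_barrier(merged, &[0, 1]));
}
```

On this simplified model, the merged events from the comment compact to exactly `delete (1,1)` followed by `insert (1,3)`, which is why sinking deletes first (the behavior `need_advance_delete` enables) leaves the sink in the intended final state.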