From 6fb622b64fcd4ae42b2501b72f9eff3eb2ca0b7d Mon Sep 17 00:00:00 2001
From: xxchan
Date: Fri, 30 Aug 2024 17:42:31 +0800
Subject: [PATCH] refactor: use high watermark to finish backfill faster

---
 src/connector/src/source/base.rs              | 27 +++++++
 .../src/source/kafka/enumerator/client.rs     |  1 +
 .../src/source/kafka/source/reader.rs         | 34 ++++++++-
 src/connector/src/source/reader/reader.rs     | 72 ++++++++++++++++++-
 .../source/source_backfill_executor.rs        | 46 +++++++++---
 5 files changed, 165 insertions(+), 15 deletions(-)

diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index 38c2f25eb0336..d2b5aa1e88b4c 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -370,6 +370,33 @@ pub trait SplitReader: Sized + Send {
     ) -> crate::error::ConnectorResult<Self>;

     fn into_stream(self) -> BoxChunkSourceStream;
+
+    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
+        HashMap::new()
+    }
+}
+
+/// Information used to determine whether we should start and finish source backfill.
+///
+/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable),
+/// perhaps we should ban blocking DDL for it.
+#[derive(Debug, Clone)]
+pub enum BackfillInfo {
+    HasDataToBackfill {
+        /// The last available offsets for each split (**inclusive**).
+        ///
+        /// This will be used to determine whether source backfill is finished when
+        /// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise,
+        /// blocking DDL cannot finish until new messages come.
+        ///
+        /// When there are upstream messages, we will use the latest offsets from the upstream.
+        latest_offset: String,
+    },
+    /// If there are no messages in the split at all, we don't need to start backfill.
+    /// In this case, there will be no message from the backfill stream either.
+    /// If we started backfill, we cannot finish it until new messages come.
+    /// So we mark this as a special case for optimization.
+    NoDataToBackfill,
 }

 for_all_sources!(impl_connector_properties);
diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs
index ff007076c1338..5551c12b433b3 100644
--- a/src/connector/src/source/kafka/enumerator/client.rs
+++ b/src/connector/src/source/kafka/enumerator/client.rs
@@ -170,6 +170,7 @@ impl KafkaSplitEnumerator {
             self.report_high_watermark(*partition, high);
             map.insert(*partition, (low, high));
         }
+        tracing::debug!("fetch kafka watermarks: {map:?}");
         Ok(map)
     }

diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index 5ace1820b4249..72d4c36377c81 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -34,13 +34,14 @@ use crate::source::kafka::{
     KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext, KAFKA_ISOLATION_LEVEL,
 };
 use crate::source::{
-    into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SplitId, SplitMetaData,
-    SplitReader,
+    into_chunk_stream, BackfillInfo, BoxChunkSourceStream, Column, SourceContextRef, SplitId,
+    SplitMetaData, SplitReader,
 };

 pub struct KafkaSplitReader {
     consumer: StreamConsumer<RwConsumerContext>,
     offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
+    backfill_info: HashMap<SplitId, BackfillInfo>,
     bytes_per_second: usize,
     max_num_messages: usize,
     parser_config: ParserConfig,
@@ -106,7 +107,7 @@ impl SplitReader for KafkaSplitReader {
         let mut tpl = TopicPartitionList::with_capacity(splits.len());

         let mut offsets = HashMap::new();
-
+        let mut backfill_info = HashMap::new();
         for split in splits {
             offsets.insert(split.id(), (split.start_offset, split.stop_offset));

@@ -121,7 +122,29 @@ impl SplitReader for KafkaSplitReader {
             } else {
                 tpl.add_partition(split.topic.as_str(), split.partition);
             }
+
+            let (low, high) = consumer
+                .fetch_watermarks(
+                    split.topic.as_str(),
+                    split.partition,
+                    properties.common.sync_call_timeout,
+                )
+                .await?;
+            tracing::debug!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
+            // note: low is inclusive, high is exclusive
+            if low == high {
+                backfill_info.insert(split.id(), BackfillInfo::NoDataToBackfill);
+            } else {
+                debug_assert!(high > 0);
+                backfill_info.insert(
+                    split.id(),
+                    BackfillInfo::HasDataToBackfill {
+                        latest_offset: (high - 1).to_string(),
+                    },
+                );
+            }
         }
+        tracing::debug!("backfill_info: {:?}", backfill_info);

         consumer.assign(&tpl)?;

@@ -143,6 +166,7 @@ impl SplitReader for KafkaSplitReader {
         Ok(Self {
             consumer,
             offsets,
+            backfill_info,
             bytes_per_second,
             max_num_messages,
             parser_config,
@@ -155,6 +179,10 @@ impl SplitReader for KafkaSplitReader {
         let source_context = self.source_ctx.clone();
         into_chunk_stream(self.into_data_stream(), parser_config, source_context)
     }
+
+    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
+        self.backfill_info.clone()
+    }
 }

 impl KafkaSplitReader {
diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs
index 61468bd72a4b6..95764792c0025 100644
--- a/src/connector/src/source/reader/reader.rs
+++ b/src/connector/src/source/reader/reader.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+use std::collections::HashMap;
 use std::sync::Arc;

 use anyhow::Context;
@@ -34,8 +35,9 @@ use crate::source::filesystem::opendal_source::{
 };
 use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
 use crate::source::{
-    create_split_reader, BoxChunkSourceStream, BoxTryStream, Column, ConnectorProperties,
-    ConnectorState, SourceColumnDesc, SourceContext, SplitReader, WaitCheckpointTask,
+    create_split_reader, BackfillInfo, BoxChunkSourceStream, BoxTryStream, Column,
+    ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitId, SplitReader,
+    WaitCheckpointTask,
 };
 use crate::{dispatch_source_prop, WithOptionsSecResolved};

@@ -129,6 +131,72 @@ impl SourceReader {
         })
     }

+    pub async fn build_stream_for_backfill(
+        &self,
+        state: ConnectorState,
+        column_ids: Vec<ColumnId>,
+        source_ctx: Arc<SourceContext>,
+    ) -> ConnectorResult<(BoxChunkSourceStream, HashMap<SplitId, BackfillInfo>)> {
+        let Some(splits) = state else {
+            return Ok((pending().boxed(), HashMap::new()));
+        };
+        let config = self.config.clone();
+        let columns = self.get_target_columns(column_ids)?;
+
+        let data_gen_columns = Some(
+            columns
+                .iter()
+                .map(|col| Column {
+                    name: col.name.clone(),
+                    data_type: col.data_type.clone(),
+                    is_visible: col.is_visible(),
+                })
+                .collect_vec(),
+        );
+
+        let parser_config = ParserConfig {
+            specific: self.parser_config.clone(),
+            common: CommonParserConfig {
+                rw_columns: columns,
+            },
+        };
+
+        let support_multiple_splits = config.support_multiple_splits();
+        dispatch_source_prop!(config, prop, {
+            let readers = if support_multiple_splits {
+                tracing::debug!(
+                    "spawning connector split reader for multiple splits {:?}",
+                    splits
+                );
+                let reader =
+                    create_split_reader(*prop, splits, parser_config, source_ctx, data_gen_columns)
+                        .await?;
+
+                vec![reader]
+            } else {
+                let to_reader_splits = splits.into_iter().map(|split| vec![split]);
+                try_join_all(to_reader_splits.into_iter().map(|splits| {
+                    tracing::debug!(?splits, "spawning connector split reader");
+                    let props = prop.clone();
+                    let data_gen_columns = data_gen_columns.clone();
+                    let parser_config = parser_config.clone();
+                    // TODO: is this reader split across multiple threads...? Realistically, we want
+                    // source_ctx to live in a single actor.
+                    let source_ctx = source_ctx.clone();
+                    create_split_reader(*props, splits, parser_config, source_ctx, data_gen_columns)
+                }))
+                .await?
+            };
+
+            let backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
+
+            Ok((
+                select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
+                backfill_info,
+            ))
+        })
+    }
+
     /// Build `SplitReader`s and then `BoxChunkSourceStream` from the given `ConnectorState` (`SplitImpl`s).
     pub async fn build_stream(
         &self,
diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs
index bda1e36132502..e53c1bd2f3d64 100644
--- a/src/stream/src/executor/source/source_backfill_executor.rs
+++ b/src/stream/src/executor/source/source_backfill_executor.rs
@@ -27,7 +27,8 @@ use risingwave_common::system_param::reader::SystemParamsRead;
 use risingwave_common::types::JsonbVal;
 use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
 use risingwave_connector::source::{
-    BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
+    BackfillInfo, BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
+    SplitMetaData,
 };
 use serde::{Deserialize, Serialize};
 use thiserror_ext::AsReport;
@@ -43,6 +44,7 @@ use crate::executor::{AddMutation, UpdateMutation};
 #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
 pub enum BackfillState {
     /// `None` means not started yet. It's the initial state.
+    /// XXX: perhaps we can also set this to the low watermark instead of `None`.
     Backfilling(Option<String>),
     /// Backfill is stopped at this offset (inclusive). Source needs to filter out messages before this offset.
     SourceCachingUp(String),
@@ -90,6 +92,8 @@ pub struct SourceBackfillExecutorInner {

 /// Local variables used in the backfill stage.
 ///
+/// See for a state diagram about how it works.
+///
 /// Note: all of the fields should contain all available splits, and we can `unwrap()` safely when `get()`.
 #[derive(Debug)]
 struct BackfillStage {
@@ -99,8 +103,8 @@ struct BackfillStage {
     /// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`).
     splits: Vec<SplitImpl>,
     /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
-    /// TODO: initialize this with high watermark so that we can finish backfilling even when upstream
-    /// doesn't emit any data.
+    /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it)
+    /// so that we can finish backfilling even when upstream doesn't emit any data.
     target_offsets: HashMap<SplitId, Option<String>>,
 }

@@ -259,7 +263,7 @@ impl SourceBackfillExecutorInner {
         &self,
         source_desc: &SourceDesc,
         splits: Vec<SplitImpl>,
-    ) -> StreamExecutorResult<BoxChunkSourceStream> {
+    ) -> StreamExecutorResult<(BoxChunkSourceStream, HashMap<SplitId, BackfillInfo>)> {
         let column_ids = source_desc
             .columns
             .iter()
@@ -278,12 +282,22 @@ impl SourceBackfillExecutorInner {
             source_desc.source.config.clone(),
             None,
         );
-        let stream = source_desc
+
+        // We will check the watermark to decide whether we need to backfill.
+        // e.g., when there's a Kafka topic-partition without any data,
+        // we don't need to backfill at all. But if we do not check here,
+        // the executor can only know it's finished when data comes in.
+        // For blocking DDL, this would be annoying.
+
+        let (stream, backfill_info) = source_desc
             .source
-            .build_stream(Some(splits), column_ids, Arc::new(source_ctx))
+            .build_stream_for_backfill(Some(splits), column_ids, Arc::new(source_ctx))
             .await
             .map_err(StreamExecutorError::connector_error)?;
-        Ok(apply_rate_limit(stream, self.rate_limit_rps).boxed())
+        Ok((
+            apply_rate_limit(stream, self.rate_limit_rps).boxed(),
+            backfill_info,
+        ))
     }

     #[try_stream(ok = Message, error = StreamExecutorError)]
@@ -347,13 +361,25 @@ impl SourceBackfillExecutorInner {
         // Return the ownership of `stream_source_core` to the source executor.
         self.stream_source_core = core;

-        let source_chunk_reader = self
+        let (source_chunk_reader, backfill_info) = self
             .build_stream_source_reader(
                 &source_desc,
                 backfill_stage.get_latest_unfinished_splits()?,
             )
             .instrument_await("source_build_reader")
             .await?;
+        for (split_id, info) in &backfill_info {
+            match info {
+                BackfillInfo::NoDataToBackfill => {
+                    *backfill_stage.states.get_mut(split_id).unwrap() = BackfillState::Finished;
+                }
+                BackfillInfo::HasDataToBackfill { latest_offset } => {
+                    // Note: later we will override it with the offset from the source message, and it may become smaller than this value.
+                    *backfill_stage.target_offsets.get_mut(split_id).unwrap() =
+                        Some(latest_offset.clone());
+                }
+            }
+        }

         fn select_strategy(_: &mut ()) -> PollNext {
             futures::stream::PollNext::Left
@@ -432,7 +458,7 @@ impl SourceBackfillExecutorInner {
                             self.actor_ctx.fragment_id.to_string(),
                         ]);

-                        let reader = self
+                        let (reader, _backfill_info) = self
                             .build_stream_source_reader(
                                 &source_desc,
                                 backfill_stage.get_latest_unfinished_splits()?,
@@ -514,7 +540,7 @@ impl SourceBackfillExecutorInner {
                         );

                         // Replace the source reader with a new one of the new state.
-                        let reader = self
+                        let (reader, _backfill_info) = self
                             .build_stream_source_reader(
                                 &source_desc,
                                 latest_unfinished_splits,
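
The sketch below is not part of the patch; it is a minimal, self-contained condensation of the idea the diff implements: the connector maps each split's (low, high) watermarks to a `BackfillInfo`, and the backfill executor uses that map to finish empty splits immediately and to seed `target_offsets` for the rest. The types are simplified stand-ins for the real `SplitId`, `BackfillInfo`, and `BackfillState`, and the helper functions `backfill_info_from_watermarks` and `seed_backfill_stage` are hypothetical names; in the patch this logic lives in `KafkaSplitReader::new` and in `SourceBackfillExecutorInner` respectively.

use std::collections::HashMap;

type SplitId = String;

#[derive(Debug, Clone)]
enum BackfillInfo {
    HasDataToBackfill { latest_offset: String },
    NoDataToBackfill,
}

#[derive(Debug, Clone, PartialEq)]
enum BackfillState {
    Backfilling(Option<String>),
    Finished,
}

// Connector side: map a split's (low, high) watermarks to `BackfillInfo`.
// `high` is exclusive, so `high - 1` is the last offset that actually exists.
fn backfill_info_from_watermarks(low: i64, high: i64) -> BackfillInfo {
    if low == high {
        // Empty split: nothing to backfill, so backfill can finish immediately.
        BackfillInfo::NoDataToBackfill
    } else {
        debug_assert!(high > 0);
        BackfillInfo::HasDataToBackfill {
            latest_offset: (high - 1).to_string(),
        }
    }
}

// Executor side: seed the backfill stage from the reader's `backfill_info()` map.
fn seed_backfill_stage(
    backfill_info: &HashMap<SplitId, BackfillInfo>,
    states: &mut HashMap<SplitId, BackfillState>,
    target_offsets: &mut HashMap<SplitId, Option<String>>,
) {
    for (split_id, info) in backfill_info {
        match info {
            // No data at all: mark the split finished before backfill even starts.
            BackfillInfo::NoDataToBackfill => {
                states.insert(split_id.clone(), BackfillState::Finished);
            }
            // Otherwise the high watermark is only the initial target; it may later be
            // overwritten by offsets observed on upstream messages.
            BackfillInfo::HasDataToBackfill { latest_offset } => {
                target_offsets.insert(split_id.clone(), Some(latest_offset.clone()));
            }
        }
    }
}

fn main() {
    let mut states = HashMap::from([
        ("topic-0".to_string(), BackfillState::Backfilling(None)),
        ("topic-1".to_string(), BackfillState::Backfilling(None)),
    ]);
    let mut target_offsets: HashMap<SplitId, Option<String>> =
        HashMap::from([("topic-0".to_string(), None), ("topic-1".to_string(), None)]);

    // e.g. partition 0 is empty; partition 1 holds offsets 0..=41 (high watermark 42).
    let backfill_info = HashMap::from([
        ("topic-0".to_string(), backfill_info_from_watermarks(0, 0)),
        ("topic-1".to_string(), backfill_info_from_watermarks(0, 42)),
    ]);

    seed_backfill_stage(&backfill_info, &mut states, &mut target_offsets);
    assert_eq!(states["topic-0"], BackfillState::Finished);
    assert_eq!(target_offsets["topic-1"], Some("41".to_string()));
    println!("states: {states:?}");
    println!("target_offsets: {target_offsets:?}");
}

Connectors that cannot report a per-split end offset can simply keep the default `backfill_info()` implementation (an empty map), which preserves the previous behavior where backfill can only finish after new upstream messages arrive.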