refactor: use high watermark to finish backfill faster
xxchan committed Aug 30, 2024
1 parent c4fed7c commit 6fb622b
Showing 5 changed files with 165 additions and 15 deletions.
27 changes: 27 additions & 0 deletions src/connector/src/source/base.rs
@@ -370,6 +370,33 @@ pub trait SplitReader: Sized + Send {
) -> crate::error::ConnectorResult<Self>;

fn into_stream(self) -> BoxChunkSourceStream;

fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
HashMap::new()
}
}

/// Information used to determine whether we should start and finish source backfill.
///
/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable),
/// perhaps we should ban blocking DDL for it.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
HasDataToBackfill {
/// The last available offsets for each split (**inclusive**).
///
/// This will be used to determine whether source backfill is finished when
/// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise,
/// blocking DDL cannot finish until new messages come.
///
/// When there are upstream messages, we will use the latest offsets from the upstream.
latest_offset: String,
},
/// If there are no messages in the split at all, we don't need to start backfill.
/// In this case, there will be no messages from the backfill stream either.
/// If we started backfill anyway, we could not finish it until new messages arrived.
/// So we mark this as a special case for optimization.
NoDataToBackfill,
}

for_all_sources!(impl_connector_properties);
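To make the semantics above concrete, here is a minimal standalone sketch of how a connector could map a watermark pair onto `BackfillInfo`. The local `BackfillInfo` mirrors the enum added in this file, and `from_watermarks` is a hypothetical helper (the actual mapping for Kafka lives in `KafkaSplitReader::new` below); it assumes Kafka-style semantics, where `low` is the first available offset (inclusive) and `high` is one past the last produced offset (exclusive).

```rust
use std::collections::HashMap;

// Local mirror of the enum above, kept minimal so this sketch compiles on its own.
#[derive(Debug, Clone, PartialEq)]
enum BackfillInfo {
    HasDataToBackfill { latest_offset: String },
    NoDataToBackfill,
}

// Hypothetical helper: derive `BackfillInfo` from a Kafka-style watermark pair,
// where `low` is the first available offset (inclusive) and `high` is one past
// the last produced offset (exclusive).
fn from_watermarks(low: i64, high: i64) -> BackfillInfo {
    if low == high {
        // Nothing fetchable in the split (never produced, or fully expired by retention).
        BackfillInfo::NoDataToBackfill
    } else {
        // `high` is exclusive, so the last message to backfill sits at `high - 1`.
        BackfillInfo::HasDataToBackfill {
            latest_offset: (high - 1).to_string(),
        }
    }
}

fn main() {
    let mut info: HashMap<String, BackfillInfo> = HashMap::new();
    info.insert("topic-0".into(), from_watermarks(0, 5)); // offsets 0..=4 exist
    info.insert("topic-1".into(), from_watermarks(7, 7)); // empty or fully expired
    assert_eq!(
        info["topic-0"],
        BackfillInfo::HasDataToBackfill { latest_offset: "4".into() }
    );
    assert_eq!(info["topic-1"], BackfillInfo::NoDataToBackfill);
}
```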
1 change: 1 addition & 0 deletions src/connector/src/source/kafka/enumerator/client.rs
@@ -170,6 +170,7 @@ impl KafkaSplitEnumerator {
self.report_high_watermark(*partition, high);
map.insert(*partition, (low, high));
}
tracing::debug!("fetch kafka watermarks: {map:?}");
Ok(map)
}

34 changes: 31 additions & 3 deletions src/connector/src/source/kafka/source/reader.rs
@@ -34,13 +34,14 @@ use crate::source::kafka::{
KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext, KAFKA_ISOLATION_LEVEL,
};
use crate::source::{
into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SplitId, SplitMetaData,
SplitReader,
into_chunk_stream, BackfillInfo, BoxChunkSourceStream, Column, SourceContextRef, SplitId,
SplitMetaData, SplitReader,
};

pub struct KafkaSplitReader {
consumer: StreamConsumer<RwConsumerContext>,
offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
backfill_info: HashMap<SplitId, BackfillInfo>,
bytes_per_second: usize,
max_num_messages: usize,
parser_config: ParserConfig,
@@ -106,7 +107,7 @@ impl SplitReader for KafkaSplitReader {
let mut tpl = TopicPartitionList::with_capacity(splits.len());

let mut offsets = HashMap::new();

let mut backfill_info = HashMap::new();
for split in splits {
offsets.insert(split.id(), (split.start_offset, split.stop_offset));

@@ -121,7 +122,29 @@
} else {
tpl.add_partition(split.topic.as_str(), split.partition);
}

let (low, high) = consumer
.fetch_watermarks(
split.topic.as_str(),
split.partition,
properties.common.sync_call_timeout,
)
.await?;
tracing::debug!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
// note: low is inclusive, high is exclusive
if low == high {
backfill_info.insert(split.id(), BackfillInfo::NoDataToBackfill);
} else {
debug_assert!(high > 0);
backfill_info.insert(
split.id(),
BackfillInfo::HasDataToBackfill {
latest_offset: (high - 1).to_string(),
},
);
}
}
tracing::debug!("backfill_info: {:?}", backfill_info);

consumer.assign(&tpl)?;

@@ -143,6 +166,7 @@ impl SplitReader for KafkaSplitReader {
Ok(Self {
consumer,
offsets,
backfill_info,
bytes_per_second,
max_num_messages,
parser_config,
@@ -155,6 +179,10 @@ impl SplitReader for KafkaSplitReader {
let source_context = self.source_ctx.clone();
into_chunk_stream(self.into_data_stream(), parser_config, source_context)
}

fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
self.backfill_info.clone()
}
}

impl KafkaSplitReader {
72 changes: 70 additions & 2 deletions src/connector/src/source/reader/reader.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Context;
@@ -34,8 +35,9 @@ use crate::source::filesystem::opendal_source::{
};
use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
use crate::source::{
create_split_reader, BoxChunkSourceStream, BoxTryStream, Column, ConnectorProperties,
ConnectorState, SourceColumnDesc, SourceContext, SplitReader, WaitCheckpointTask,
create_split_reader, BackfillInfo, BoxChunkSourceStream, BoxTryStream, Column,
ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitId, SplitReader,
WaitCheckpointTask,
};
use crate::{dispatch_source_prop, WithOptionsSecResolved};

@@ -129,6 +131,72 @@ impl SourceReader {
})
}

pub async fn build_stream_for_backfill(
&self,
state: ConnectorState,
column_ids: Vec<ColumnId>,
source_ctx: Arc<SourceContext>,
) -> ConnectorResult<(BoxChunkSourceStream, HashMap<SplitId, BackfillInfo>)> {
let Some(splits) = state else {
return Ok((pending().boxed(), HashMap::new()));
};
let config = self.config.clone();
let columns = self.get_target_columns(column_ids)?;

let data_gen_columns = Some(
columns
.iter()
.map(|col| Column {
name: col.name.clone(),
data_type: col.data_type.clone(),
is_visible: col.is_visible(),
})
.collect_vec(),
);

let parser_config = ParserConfig {
specific: self.parser_config.clone(),
common: CommonParserConfig {
rw_columns: columns,
},
};

let support_multiple_splits = config.support_multiple_splits();
dispatch_source_prop!(config, prop, {
let readers = if support_multiple_splits {
tracing::debug!(
"spawning connector split reader for multiple splits {:?}",
splits
);
let reader =
create_split_reader(*prop, splits, parser_config, source_ctx, data_gen_columns)
.await?;

vec![reader]
} else {
let to_reader_splits = splits.into_iter().map(|split| vec![split]);
try_join_all(to_reader_splits.into_iter().map(|splits| {
tracing::debug!(?splits, "spawning connector split reader");
let props = prop.clone();
let data_gen_columns = data_gen_columns.clone();
let parser_config = parser_config.clone();
// TODO: is this reader split across multiple threads...? Realistically, we want
// source_ctx to live in a single actor.
let source_ctx = source_ctx.clone();
create_split_reader(*props, splits, parser_config, source_ctx, data_gen_columns)
}))
.await?
};

let backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();

Ok((
select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
backfill_info,
))
})
}

/// Build `SplitReader`s and then `BoxChunkSourceStream` from the given `ConnectorState` (`SplitImpl`s).
pub async fn build_stream(
&self,
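`build_stream_for_backfill` collects the per-reader `backfill_info()` maps by flattening them into a single map. A tiny standalone sketch of that merge, with `String` and `i64` standing in for `SplitId` and `BackfillInfo`; since each split is assigned to exactly one reader, flattening neither drops nor overwrites entries.

```rust
use std::collections::HashMap;

// Each split reader reports info only for the splits it owns, and split
// assignment is disjoint across readers, so flattening the per-reader maps
// cannot lose or overwrite entries.
fn merge(per_reader: Vec<HashMap<String, i64>>) -> HashMap<String, i64> {
    per_reader.into_iter().flatten().collect()
}

fn main() {
    let reader_a = HashMap::from([("topic-0".to_string(), 41)]);
    let reader_b = HashMap::from([("topic-1".to_string(), 99)]);
    let merged = merge(vec![reader_a, reader_b]);
    assert_eq!(merged.len(), 2);
    assert_eq!(merged["topic-0"], 41);
}
```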
46 changes: 36 additions & 10 deletions src/stream/src/executor/source/source_backfill_executor.rs
@@ -27,7 +27,8 @@ use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::JsonbVal;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::{
BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
BackfillInfo, BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
SplitMetaData,
};
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;
@@ -43,6 +44,7 @@ use crate::executor::{AddMutation, UpdateMutation};
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
/// `None` means not started yet. It's the initial state.
/// XXX: perhaps we can also set it to the low watermark instead of `None`
Backfilling(Option<String>),
/// Backfill is stopped at this offset (inclusive). Source needs to filter out messages before this offset.
SourceCachingUp(String),
@@ -90,6 +92,8 @@ pub struct SourceBackfillExecutorInner<S: StateStore> {

/// Local variables used in the backfill stage.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram about how it works.
///
/// Note: all of the fields should contain all available splits, so we can safely `unwrap()` the result of `get()`.
#[derive(Debug)]
struct BackfillStage {
@@ -99,8 +103,8 @@ struct BackfillStage {
/// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`).
splits: Vec<SplitImpl>,
/// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
/// TODO: initialize this with high watermark so that we can finish backfilling even when upstream
/// doesn't emit any data.
/// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it)
/// so that we can finish backfilling even when upstream doesn't emit any data.
target_offsets: HashMap<SplitId, Option<String>>,
}

@@ -259,7 +263,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
&self,
source_desc: &SourceDesc,
splits: Vec<SplitImpl>,
) -> StreamExecutorResult<BoxChunkSourceStream> {
) -> StreamExecutorResult<(BoxChunkSourceStream, HashMap<SplitId, BackfillInfo>)> {
let column_ids = source_desc
.columns
.iter()
@@ -278,12 +282,22 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
source_desc.source.config.clone(),
None,
);
let stream = source_desc

// We check the watermarks to decide whether we need to backfill at all.
// e.g., when there's a Kafka topic-partition without any data,
// we don't need to backfill it. But if we did not check here,
// the executor could only learn it's finished once new data comes in.
// For blocking DDL, that would be annoying.

let (stream, backfill_info) = source_desc
.source
.build_stream(Some(splits), column_ids, Arc::new(source_ctx))
.build_stream_for_backfill(Some(splits), column_ids, Arc::new(source_ctx))
.await
.map_err(StreamExecutorError::connector_error)?;
Ok(apply_rate_limit(stream, self.rate_limit_rps).boxed())
Ok((
apply_rate_limit(stream, self.rate_limit_rps).boxed(),
backfill_info,
))
}

#[try_stream(ok = Message, error = StreamExecutorError)]
@@ -347,13 +361,25 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
// Return the ownership of `stream_source_core` to the source executor.
self.stream_source_core = core;

let source_chunk_reader = self
let (source_chunk_reader, backfill_info) = self
.build_stream_source_reader(
&source_desc,
backfill_stage.get_latest_unfinished_splits()?,
)
.instrument_await("source_build_reader")
.await?;
for (split_id, info) in &backfill_info {
match info {
BackfillInfo::NoDataToBackfill => {
*backfill_stage.states.get_mut(split_id).unwrap() = BackfillState::Finished;
}
BackfillInfo::HasDataToBackfill { latest_offset } => {
// Note: later we will override it with the offset from the source message, which may be smaller than this value.
*backfill_stage.target_offsets.get_mut(split_id).unwrap() =
Some(latest_offset.clone());
}
}
}

fn select_strategy(_: &mut ()) -> PollNext {
futures::stream::PollNext::Left
@@ -432,7 +458,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
self.actor_ctx.fragment_id.to_string(),
]);

let reader = self
let (reader, _backfill_info) = self
.build_stream_source_reader(
&source_desc,
backfill_stage.get_latest_unfinished_splits()?,
@@ -514,7 +540,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
);

// Replace the source reader with a new one of the new state.
let reader = self
let (reader, _backfill_info) = self
.build_stream_source_reader(
&source_desc,
latest_unfinished_splits,
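As a rough illustration of what the seeded `target_offsets` buy us, here is a standalone sketch of a finished-check over the backfill states. `BackfillState` and `split_finished` below are simplified stand-ins, not the executor's actual logic (which also persists state and handles per-connector offset formats), and the numeric comparison assumes Kafka-style integer offsets. With targets seeded from `BackfillInfo`, a split with no data is finished immediately, and a split whose backfilled offset reaches the high-watermark target can finish without waiting for new upstream messages.

```rust
use std::collections::HashMap;

// Simplified stand-in for the executor's per-split bookkeeping.
#[derive(Debug, PartialEq)]
enum BackfillState {
    Backfilling(Option<String>), // latest backfilled offset; `None` = not started
    Finished,
}

// Hypothetical check: a split is done once its backfilled offset has reached the
// target seeded from `BackfillInfo::HasDataToBackfill` (numeric comparison,
// assuming Kafka-style integer offsets).
fn split_finished(state: &BackfillState, target: &Option<String>) -> bool {
    match (state, target) {
        (BackfillState::Finished, _) => true,
        (BackfillState::Backfilling(Some(cur)), Some(tgt)) => {
            cur.parse::<i64>().unwrap_or(i64::MIN) >= tgt.parse::<i64>().unwrap_or(i64::MAX)
        }
        _ => false, // not started yet, or no known target: keep backfilling
    }
}

fn main() {
    let states = HashMap::from([
        ("p0".to_string(), BackfillState::Finished), // was NoDataToBackfill
        ("p1".to_string(), BackfillState::Backfilling(Some("4".into()))),
    ]);
    let targets: HashMap<String, Option<String>> = HashMap::from([
        ("p0".to_string(), None),
        ("p1".to_string(), Some("4".to_string())), // seeded from HasDataToBackfill
    ]);
    let all_done = states
        .iter()
        .all(|(id, st)| split_finished(st, &targets[id.as_str()]));
    assert!(all_done);
}
```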
