refactor: use high watermark to finish backfill faster
xxchan committed Sep 2, 2024
1 parent c4fed7c commit 0eaf49f
Showing 6 changed files with 232 additions and 51 deletions.
95 changes: 59 additions & 36 deletions e2e_test/source_inline/kafka/shared_source.slt
@@ -6,6 +6,29 @@ SET rw_enable_shared_source TO true;
system ok
rpk topic create shared_source -p 4

# Test creating a source before producing data.
statement ok
create source s_before_produce (v1 int, v2 varchar) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'shared_source',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
create materialized view mv_before_produce as select * from s_before_produce;

sleep 1s

# All partitions start with backfill_info: NoDataToBackfill, so the backfill finishes immediately.
system ok
internal_table.mjs --name mv_before_produce --type sourcebackfill
----
0,"""Finished"""
1,"""Finished"""
2,"""Finished"""
3,"""Finished"""


system ok
cat << EOF | rpk topic produce shared_source -f "%p %v\n" -p 0
0 {"v1": 1, "v2": "a"}
@@ -21,7 +44,7 @@ create source s0 (v1 int, v2 varchar) with (
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

query I
query ?
select count(*) from rw_internal_tables where name like '%s0%';
----
1
@@ -41,21 +64,24 @@ create materialized view mv_1 as select * from s0;
# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 2s

# SourceExecutor's ingestion started, but it only starts from latest.
# SourceExecutor's ingestion started, but it only starts from latest (offset 1).
system ok
internal_table.mjs --name s0 --type source
----
(empty)


# offset 0 must be backfilled, not from upstream.
# SourceBackfill starts from offset 0, with backfill_info: HasDataToBackfill { latest_offset: "0" } (decided by the Kafka high watermark).
# (meaning upstream already consumed offset 0, so we only need to backfill up to offset 0)
# After backfilling offset 0, it enters the SourceCachingUp state; the backfilling work itself is now done.
# We then wait for SourceExecutor to produce an offset > 0.
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"{""Backfilling"": ""0""}"
1,"{""Backfilling"": ""0""}"
2,"{""Backfilling"": ""0""}"
3,"{""Backfilling"": ""0""}"
0,"{""SourceCachingUp"": ""0""}"
1,"{""SourceCachingUp"": ""0""}"
2,"{""SourceCachingUp"": ""0""}"
3,"{""SourceCachingUp"": ""0""}"


# This does not affect the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor.
@@ -67,23 +93,23 @@ create materialized view mv_2 as select * from s0;

sleep 2s

query IT rowsort
query ?? rowsort
select v1, v2 from s0;
----
1 a
2 b
3 c
4 d

query IT rowsort
query ?? rowsort
select v1, v2 from mv_1;
----
1 a
2 b
3 c
4 d

query IT rowsort
query ?? rowsort
select v1, v2 from mv_2;
----
1 a
@@ -111,7 +137,7 @@ internal_table.mjs --name s0 --type source
3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


query IT rowsort
query ?? rowsort
select v1, v2 from s0;
----
1 a
@@ -123,7 +149,7 @@ select v1, v2 from s0;
4 d
4 dd

query IT rowsort
query ?? rowsort
select v1, v2 from mv_1;
----
1 a
@@ -146,18 +172,14 @@ internal_table.mjs --name s0 --type source
3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


# The result is non-deterministic:
# If the upstream row comes before the backfill row, it will be ignored, and the result state is "{""Backfilling"": ""1""}".
# If the upstream row comes after the backfill row, the result state is Finished.
# Uncomment below and run manually to see the result.

# system ok
# internal_table.mjs --name mv_1 --type sourcebackfill
# ----
# 0,"{""Finished""}"
# 1,"{""Finished""}"
# 2,"{""Finished""}"
# 3,"{""Finished""}"
# Transition from SourceCachingUp to Finished after consuming one upstream message.
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"""Finished"""
1,"""Finished"""
2,"""Finished"""
3,"""Finished"""


system ok
@@ -173,22 +195,30 @@ done

sleep 3s

query IT rowsort
query ?? rowsort
select v1, count(*) from s0 group by v1;
----
1 12
2 12
3 12
4 12

query IT rowsort
query ?? rowsort
select v1, count(*) from mv_1 group by v1;
----
1 12
2 12
3 12
4 12

query ?? rowsort
select v1, count(*) from mv_before_produce group by v1;
----
1 12
2 12
3 12
4 12


# start_offset changed to 11
system ok
@@ -200,15 +230,8 @@ internal_table.mjs --name s0 --type source
3,"{""split_info"": {""partition"": 3, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


# Now it is highly probable that all partitions have finished.
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"""Finished"""
1,"""Finished"""
2,"""Finished"""
3,"""Finished"""


statement ok
drop source s0 cascade;

statement ok
drop source s_before_produce cascade;
27 changes: 27 additions & 0 deletions src/connector/src/source/base.rs
@@ -370,6 +370,33 @@ pub trait SplitReader: Sized + Send {
) -> crate::error::ConnectorResult<Self>;

fn into_stream(self) -> BoxChunkSourceStream;

fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
HashMap::new()
}
}

/// Information used to determine whether we should start and finish source backfill.
///
/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable),
/// perhaps we should ban blocking DDL for it.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
HasDataToBackfill {
/// The last available offsets for each split (**inclusive**).
///
/// This will be used to determine whether source backfill is finished when
/// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise,
/// blocking DDL cannot finish until new messages come.
///
/// When there are upstream messages, we will use the latest offsets from the upstream.
latest_offset: String,
},
/// If there are no messages in the split at all, we don't need to start a backfill.
/// In this case, there will be no messages from the backfill stream either.
/// If we started a backfill anyway, we could not finish it until new messages came.
/// So we mark this as a special case for optimization.
NoDataToBackfill,
}
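
As an editorial illustration of how a caller might consume `SplitReader::backfill_info()` (every type below except `BackfillInfo` is invented for the sketch and is not a RisingWave API), seeding per-split state from this enum explains the behavior seen in the slt test: an empty split (`NoDataToBackfill`) is marked finished immediately, while a non-empty one backfills up to `latest_offset`.

use std::collections::HashMap;

// Hypothetical consumer-side state; only `BackfillInfo` above is real.
#[derive(Debug)]
enum SplitBackfillState {
    Backfilling { target_offset: String },
    Finished,
}

fn initial_states(
    // SplitId is simplified to String for the sketch.
    backfill_info: HashMap<String, BackfillInfo>,
) -> HashMap<String, SplitBackfillState> {
    backfill_info
        .into_iter()
        .map(|(split_id, info)| {
            let state = match info {
                // Data exists between the low and high watermark: backfill up to `latest_offset`.
                BackfillInfo::HasDataToBackfill { latest_offset } => {
                    SplitBackfillState::Backfilling { target_offset: latest_offset }
                }
                // Empty split: nothing to copy, finish right away (the `mv_before_produce` case).
                BackfillInfo::NoDataToBackfill => SplitBackfillState::Finished,
            };
            (split_id, state)
        })
        .collect()
}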

for_all_sources!(impl_connector_properties);
1 change: 1 addition & 0 deletions src/connector/src/source/kafka/enumerator/client.rs
@@ -170,6 +170,7 @@ impl KafkaSplitEnumerator {
self.report_high_watermark(*partition, high);
map.insert(*partition, (low, high));
}
tracing::debug!("fetch kafka watermarks: {map:?}");
Ok(map)
}

34 changes: 31 additions & 3 deletions src/connector/src/source/kafka/source/reader.rs
@@ -34,13 +34,14 @@ use crate::source::kafka::{
KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext, KAFKA_ISOLATION_LEVEL,
};
use crate::source::{
into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SplitId, SplitMetaData,
SplitReader,
into_chunk_stream, BackfillInfo, BoxChunkSourceStream, Column, SourceContextRef, SplitId,
SplitMetaData, SplitReader,
};

pub struct KafkaSplitReader {
consumer: StreamConsumer<RwConsumerContext>,
offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
backfill_info: HashMap<SplitId, BackfillInfo>,
bytes_per_second: usize,
max_num_messages: usize,
parser_config: ParserConfig,
@@ -106,7 +107,7 @@ impl SplitReader for KafkaSplitReader {
let mut tpl = TopicPartitionList::with_capacity(splits.len());

let mut offsets = HashMap::new();

let mut backfill_info = HashMap::new();
for split in splits {
offsets.insert(split.id(), (split.start_offset, split.stop_offset));

@@ -121,7 +122,29 @@
} else {
tpl.add_partition(split.topic.as_str(), split.partition);
}

let (low, high) = consumer
.fetch_watermarks(
split.topic.as_str(),
split.partition,
properties.common.sync_call_timeout,
)
.await?;
tracing::debug!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
// note: low is inclusive, high is exclusive
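// e.g. for a partition holding messages at offsets 0..=4, Kafka reports low = 0 and high = 5,
// so the last offset to backfill is high - 1 = 4; an empty partition reports low == high.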
if low == high {
backfill_info.insert(split.id(), BackfillInfo::NoDataToBackfill);
} else {
debug_assert!(high > 0);
backfill_info.insert(
split.id(),
BackfillInfo::HasDataToBackfill {
latest_offset: (high - 1).to_string(),
},
);
}
}
tracing::debug!("backfill_info: {:?}", backfill_info);

consumer.assign(&tpl)?;

@@ -143,6 +166,7 @@ impl SplitReader for KafkaSplitReader {
Ok(Self {
consumer,
offsets,
backfill_info,
bytes_per_second,
max_num_messages,
parser_config,
@@ -155,6 +179,10 @@ impl SplitReader for KafkaSplitReader {
let source_context = self.source_ctx.clone();
into_chunk_stream(self.into_data_stream(), parser_config, source_context)
}

fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
self.backfill_info.clone()
}
}

impl KafkaSplitReader {