refactor: use high watermark to finish backfill faster #18342

Merged
95 changes: 59 additions & 36 deletions e2e_test/source_inline/kafka/shared_source.slt
@@ -6,6 +6,29 @@ SET rw_enable_shared_source TO true;
system ok
rpk topic create shared_source -p 4

# Test create source before producing data.
statement ok
create source s_before_produce (v1 int, v2 varchar) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'shared_source',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
create materialized view mv_before_produce as select * from s_before_produce;

sleep 2s

# All partitions start with backfill_info: NoDataToBackfill, so backfill finishes immediately.
system ok
internal_table.mjs --name mv_before_produce --type sourcebackfill
----
0,"""Finished"""
1,"""Finished"""
2,"""Finished"""
3,"""Finished"""


system ok
cat << EOF | rpk topic produce shared_source -f "%p %v\n" -p 0
0 {"v1": 1, "v2": "a"}
@@ -21,7 +44,7 @@ create source s0 (v1 int, v2 varchar) with (
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

query I
query ?
Contributor:
When do we need to use ? instead of I?

Member Author:
Currently sqllogictest --override will produce ?. The character doesn't have any meaning now; any character will pass the test.

select count(*) from rw_internal_tables where name like '%s0%';
----
1
@@ -41,21 +64,24 @@ create materialized view mv_1 as select * from s0;
# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 2s

# SourceExecutor's ingestion started, but it only starts from latest.
# SourceExecutor's ingestion started, but it only starts from latest (offset 1).
system ok
internal_table.mjs --name s0 --type source
----
(empty)


# offset 0 must be backfilled, not from upstream.
# SourceBackfill starts from offset 0, with backfill_info: HasDataToBackfill { latest_offset: "0" } (decided by kafka high watermark).
Contributor:
In this case, what will the high watermark value be?

Member Author:
1
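
To spell out the arithmetic behind that answer (an illustrative aside, not part of the PR's diff): Kafka's high watermark is the offset that the next produced message will receive, so a partition holding a single message at offset 0 reports low = 0 and high = 1, and the last offset to backfill is high - 1 = 0.

fn main() {
    // One message produced at offset 0: the broker reports these watermarks.
    let (low, high) = (0i64, 1i64); // low is inclusive, high is exclusive
    assert_ne!(low, high); // the partition is not empty, so there is data to backfill
    let latest_offset = high - 1; // last offset to backfill, inclusive
    assert_eq!(latest_offset, 0);
    println!("backfill target: offset {latest_offset}");
}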

# (meaning upstream already consumed offset 0, so we only need to backfill to offset 0)
# After backfilling offset 0, it enters SourceCachingUp state. Now the backfill is finished.
# We wait for SourceExecutor to produce offset > 0.
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"{""Backfilling"": ""0""}"
1,"{""Backfilling"": ""0""}"
2,"{""Backfilling"": ""0""}"
3,"{""Backfilling"": ""0""}"
0,"{""SourceCachingUp"": ""0""}"
1,"{""SourceCachingUp"": ""0""}"
2,"{""SourceCachingUp"": ""0""}"
3,"{""SourceCachingUp"": ""0""}"


# This does not affect the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor.
@@ -67,23 +93,23 @@ create materialized view mv_2 as select * from s0;

sleep 2s

query IT rowsort
query ?? rowsort
select v1, v2 from s0;
----
1 a
2 b
3 c
4 d

query IT rowsort
query ?? rowsort
select v1, v2 from mv_1;
----
1 a
2 b
3 c
4 d

query IT rowsort
query ?? rowsort
select v1, v2 from mv_2;
----
1 a
@@ -111,7 +137,7 @@ internal_table.mjs --name s0 --type source
3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


query IT rowsort
query ?? rowsort
select v1, v2 from s0;
----
1 a
@@ -123,7 +149,7 @@ select v1, v2 from s0;
4 d
4 dd

query IT rowsort
query ?? rowsort
select v1, v2 from mv_1;
----
1 a
@@ -146,18 +172,14 @@ internal_table.mjs --name s0 --type source
3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


# The result is non-deterministic:
# If the upstream row comes before the backfill row, it will be ignored, and the result state is "{""Backfilling"": ""1""}".
# If the upstream row comes after the backfill row, the result state is Finished.
# Uncomment below and run manually to see the result.

# system ok
# internal_table.mjs --name mv_1 --type sourcebackfill
# ----
# 0,"{""Finished""}"
# 1,"{""Finished""}"
# 2,"{""Finished""}"
# 3,"{""Finished""}"
# Transition from SourceCachingUp to Finished after consuming one upstream message.
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"""Finished"""
1,"""Finished"""
2,"""Finished"""
3,"""Finished"""


system ok
@@ -173,22 +195,30 @@ done

sleep 3s

query IT rowsort
query ?? rowsort
select v1, count(*) from s0 group by v1;
----
1 12
2 12
3 12
4 12

query IT rowsort
query ?? rowsort
select v1, count(*) from mv_1 group by v1;
----
1 12
2 12
3 12
4 12

query ?? rowsort
select v1, count(*) from mv_before_produce group by v1;
----
1 12
2 12
3 12
4 12


# start_offset changed to 11
system ok
@@ -200,15 +230,8 @@ internal_table.mjs --name s0 --type source
3,"{""split_info"": {""partition"": 3, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


# Now it is highly probable that all partitions have finished.
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"""Finished"""
1,"""Finished"""
2,"""Finished"""
3,"""Finished"""


statement ok
drop source s0 cascade;

statement ok
drop source s_before_produce cascade;
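
The per-partition states asserted in this test (Backfilling, SourceCachingUp, Finished) come from the source backfill executor's per-split state machine. Below is a rough, self-contained sketch of those states and of the transition this PR targets, moving to SourceCachingUp as soon as the high-watermark-derived target offset is backfilled instead of waiting for a newer upstream message; the names and field types are approximations, not the actual executor code.

#[derive(Debug, PartialEq)]
enum BackfillState {
    /// Still consuming historical data; holds the latest backfilled offset, if any.
    Backfilling(Option<i64>),
    /// Reached the high-watermark target; waiting for the upstream
    /// SourceExecutor to catch up past this offset.
    SourceCachingUp(i64),
    Finished,
}

/// Sketch of the transition this PR adds: finish the backfill phase as soon as
/// the target offset (derived from the Kafka high watermark) is reached.
fn on_backfilled_offset(state: BackfillState, offset: i64, target: i64) -> BackfillState {
    match state {
        BackfillState::Backfilling(_) if offset >= target => BackfillState::SourceCachingUp(offset),
        BackfillState::Backfilling(_) => BackfillState::Backfilling(Some(offset)),
        other => other,
    }
}

/// Once upstream ingestion passes the cached-up offset, the split is Finished.
fn on_upstream_offset(state: BackfillState, upstream_offset: i64) -> BackfillState {
    match state {
        BackfillState::SourceCachingUp(cached) if upstream_offset > cached => BackfillState::Finished,
        other => other,
    }
}

fn main() {
    // Partition 0 in the test above: one message, so the backfill target is offset 0.
    let state = BackfillState::Backfilling(None);
    let state = on_backfilled_offset(state, 0, 0);
    assert_eq!(state, BackfillState::SourceCachingUp(0));
    // A later upstream message at offset 1 lets the split finish.
    assert_eq!(on_upstream_offset(state, 1), BackfillState::Finished);
    println!("state machine sketch OK");
}
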
27 changes: 27 additions & 0 deletions src/connector/src/source/base.rs
@@ -370,6 +370,33 @@ pub trait SplitReader: Sized + Send {
) -> crate::error::ConnectorResult<Self>;

fn into_stream(self) -> BoxChunkSourceStream;

fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
HashMap::new()
}
}

/// Information used to determine whether we should start and finish source backfill.
///
/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable),
/// perhaps we should ban blocking DDL for it.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
HasDataToBackfill {
/// The last available offsets for each split (**inclusive**).
///
/// This will be used to determine whether source backfill is finished when
/// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise,
/// blocking DDL cannot finish until new messages come.
///
/// When there are upstream messages, we will use the latest offsets from the upstream.
latest_offset: String,
},
/// If there are no messages in the split at all, we don't need to start backfill.
/// In this case, there will be no message from the backfill stream either.
/// If we started backfill, we cannot finish it until new messages come.
/// So we mark this a special case for optimization.
NoDataToBackfill,
}

for_all_sources!(impl_connector_properties);
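
For context on how the new trait method might be consumed (the helper below is hypothetical and uses local stand-ins for the crate's SplitId and BackfillInfo types): a caller can ask the reader for backfill_info() right after construction and decide per split whether any backfill is needed, while connectors that keep the default implementation simply report an empty map.

use std::collections::HashMap;

// Local stand-ins for the crate types, for illustration only.
type SplitId = String;

#[derive(Debug, Clone)]
enum BackfillInfo {
    HasDataToBackfill { latest_offset: String },
    NoDataToBackfill,
}

/// Hypothetical helper: returns the inclusive backfill target for a split,
/// or None if the split has nothing to backfill up front.
fn backfill_target(infos: &HashMap<SplitId, BackfillInfo>, split: &SplitId) -> Option<String> {
    match infos.get(split) {
        Some(BackfillInfo::HasDataToBackfill { latest_offset }) => Some(latest_offset.clone()),
        // Empty partition: the backfill can finish immediately.
        Some(BackfillInfo::NoDataToBackfill) => None,
        // Connector kept the default backfill_info() (empty map); a real caller
        // would fall back to the previous upstream-driven behavior.
        None => None,
    }
}

fn main() {
    let split0: SplitId = "shared_source-0".to_string();
    let split1: SplitId = "shared_source-1".to_string();
    let mut infos = HashMap::new();
    infos.insert(split0.clone(), BackfillInfo::HasDataToBackfill { latest_offset: "0".to_string() });
    infos.insert(split1.clone(), BackfillInfo::NoDataToBackfill);

    assert_eq!(backfill_target(&infos, &split0), Some("0".to_string()));
    assert_eq!(backfill_target(&infos, &split1), None);
    println!("backfill decisions: ok");
}
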
1 change: 1 addition & 0 deletions src/connector/src/source/kafka/enumerator/client.rs
@@ -170,6 +170,7 @@ impl KafkaSplitEnumerator {
self.report_high_watermark(*partition, high);
map.insert(*partition, (low, high));
}
tracing::debug!("fetch kafka watermarks: {map:?}");
Ok(map)
}

34 changes: 31 additions & 3 deletions src/connector/src/source/kafka/source/reader.rs
@@ -34,13 +34,14 @@ use crate::source::kafka::{
KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext, KAFKA_ISOLATION_LEVEL,
};
use crate::source::{
into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SplitId, SplitMetaData,
SplitReader,
into_chunk_stream, BackfillInfo, BoxChunkSourceStream, Column, SourceContextRef, SplitId,
SplitMetaData, SplitReader,
};

pub struct KafkaSplitReader {
consumer: StreamConsumer<RwConsumerContext>,
offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
backfill_info: HashMap<SplitId, BackfillInfo>,
bytes_per_second: usize,
max_num_messages: usize,
parser_config: ParserConfig,
@@ -106,7 +107,7 @@ impl SplitReader for KafkaSplitReader {
let mut tpl = TopicPartitionList::with_capacity(splits.len());

let mut offsets = HashMap::new();

let mut backfill_info = HashMap::new();
for split in splits {
offsets.insert(split.id(), (split.start_offset, split.stop_offset));

@@ -121,7 +122,29 @@
} else {
tpl.add_partition(split.topic.as_str(), split.partition);
}

let (low, high) = consumer
.fetch_watermarks(
split.topic.as_str(),
split.partition,
properties.common.sync_call_timeout,
)
.await?;
tracing::debug!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
// note: low is inclusive, high is exclusive
if low == high {
backfill_info.insert(split.id(), BackfillInfo::NoDataToBackfill);
} else {
debug_assert!(high > 0);
backfill_info.insert(
split.id(),
BackfillInfo::HasDataToBackfill {
latest_offset: (high - 1).to_string(),
},
);
}
}
tracing::debug!("backfill_info: {:?}", backfill_info);

consumer.assign(&tpl)?;

@@ -143,6 +166,7 @@
Ok(Self {
consumer,
offsets,
backfill_info,
bytes_per_second,
max_num_messages,
parser_config,
@@ -155,6 +179,10 @@
let source_context = self.source_ctx.clone();
into_chunk_stream(self.into_data_stream(), parser_config, source_context)
}

fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
self.backfill_info.clone()
}
}

impl KafkaSplitReader {