storage/kafka: notify reader on partition info change #30599

Draft · wants to merge 1 commit into base: main

23 changes: 18 additions & 5 deletions src/storage/src/source/kafka.rs
@@ -411,6 +411,7 @@ impl SourceRender for KafkaSourceConnection {
let partition_info = Arc::downgrade(&partition_info);
let topic = topic.clone();
let consumer = Arc::clone(&consumer);
let notificator = Arc::clone(&notificator);

// We want a fairly low ceiling on our polling frequency, since we rely
// on this heartbeat to determine the health of our Kafka connection.
@@ -435,6 +436,7 @@ impl SourceRender for KafkaSourceConnection {
poll_interval =? poll_interval,
"kafka metadata thread: starting..."
);
let mut nparts = 0;
while let Some(partition_info) = partition_info.upgrade() {
let probe_ts =
mz_repr::Timestamp::try_from((now_fn)()).expect("must fit");
@@ -456,13 +458,24 @@ impl SourceRender for KafkaSourceConnection {
);
match result {
Ok(info) => {
let new_nparts = info.len();

*partition_info.lock().unwrap() = Some((probe_ts, info));

// If the number of partitions has changed, wake up the
// consumer so that it can check if the new parts have
// data.
if nparts != new_nparts {
notificator.notify_one();
}
nparts = new_nparts;

trace!(
source_id = config.id.to_string(),
worker_id = config.worker_id,
num_workers = config.worker_count,
"kafka metadata thread: updated partition metadata info",
);

// Clear all the health namespaces we know about.
// Note that many kafka sources don't have an ssh tunnel, but
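
For context, the change follows a common wakeup pattern: the metadata thread remembers the partition count it last observed and, when a probe reports a different count, pings the reader through a shared notifier so it can check the new partitions for data immediately rather than waiting for its next scheduled poll. Below is a minimal standalone sketch of that pattern, assuming a `tokio::sync::Notify` as the notificator and a hypothetical `fetch_partition_count` probe; neither name (beyond `notify_one`) is taken from the PR, and the real source wires the notifier into its own reader loop.

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Notify;
use tokio::time::sleep;

/// Hypothetical stand-in for a metadata probe; in the real source the count
/// comes from fetching topic metadata through the Kafka consumer.
fn fetch_partition_count(step: usize) -> usize {
    // Pretend the topic grows from 4 to 6 partitions partway through.
    if step < 3 {
        4
    } else {
        6
    }
}

#[tokio::main]
async fn main() {
    let notificator = Arc::new(Notify::new());

    // Metadata task: polls partition info on an interval and wakes the reader
    // only when the number of partitions changes, mirroring the
    // `nparts != new_nparts` check in the diff.
    let metadata_notificator = Arc::clone(&notificator);
    let metadata = tokio::spawn(async move {
        let mut nparts = 0;
        for step in 0..6 {
            let new_nparts = fetch_partition_count(step);
            if nparts != new_nparts {
                println!("metadata: partitions {nparts} -> {new_nparts}, notifying reader");
                metadata_notificator.notify_one();
            }
            nparts = new_nparts;
            sleep(Duration::from_millis(50)).await;
        }
    });

    // Reader task: blocks until notified, then re-checks partition metadata.
    // In this sketch it expects exactly two wakeups: 0 -> 4 and 4 -> 6.
    let reader_notificator = Arc::clone(&notificator);
    let reader = tokio::spawn(async move {
        for _ in 0..2 {
            reader_notificator.notified().await;
            println!("reader: woken up, re-reading partition metadata");
        }
    });

    let (m, r) = tokio::join!(metadata, reader);
    m.unwrap();
    r.unwrap();
}
```

One property worth noting with `tokio::sync::Notify` specifically: a `notify_one` issued before the reader reaches `notified().await` is remembered as a single stored permit, so the first wakeup in the sketch is not lost even if the reader task starts late. Whether the notificator in the actual source has the same permit semantics depends on its implementation.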