Skip to content

Commit

Permalink
storage/kafka: notify reader on partition info change
Browse files Browse the repository at this point in the history
When the number of partitions changed, we need to wake up the reader so
he can assign itself to any new partitions it should consume.
  • Loading branch information
teskje committed Nov 25, 2024
1 parent d0f68ce commit 51ec033
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions src/storage/src/source/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ impl SourceRender for KafkaSourceConnection {
let partition_info = Arc::downgrade(&partition_info);
let topic = topic.clone();
let consumer = Arc::clone(&consumer);
let notificator = Arc::clone(&notificator);

// We want a fairly low ceiling on our polling frequency, since we rely
// on this heartbeat to determine the health of our Kafka connection.
Expand All @@ -435,6 +436,7 @@ impl SourceRender for KafkaSourceConnection {
poll_interval =? poll_interval,
"kafka metadata thread: starting..."
);
let mut nparts = 0;
while let Some(partition_info) = partition_info.upgrade() {
let probe_ts =
mz_repr::Timestamp::try_from((now_fn)()).expect("must fit");
Expand All @@ -456,13 +458,24 @@ impl SourceRender for KafkaSourceConnection {
);
match result {
Ok(info) => {
let new_nparts = info.len();

*partition_info.lock().unwrap() = Some((probe_ts, info));

// If the number of partitions has changed, wake up the
// consumer so that it can check if the new parts have
// data.
if nparts != new_nparts {
notificator.notify_one();
}
nparts = new_nparts;

trace!(
source_id = config.id.to_string(),
worker_id = config.worker_id,
num_workers = config.worker_count,
"kafka metadata thread: updated partition metadata info",
);
source_id = config.id.to_string(),
worker_id = config.worker_id,
num_workers = config.worker_count,
"kafka metadata thread: updated partition metadata info",
);

// Clear all the health namespaces we know about.
// Note that many kafka sources's don't have an ssh tunnel, but
Expand Down

0 comments on commit 51ec033

Please sign in to comment.