diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index dab645f6b31bc..9efa653d29461 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -411,6 +411,7 @@ impl SourceRender for KafkaSourceConnection { let partition_info = Arc::downgrade(&partition_info); let topic = topic.clone(); let consumer = Arc::clone(&consumer); + let notificator = Arc::clone(&notificator); // We want a fairly low ceiling on our polling frequency, since we rely // on this heartbeat to determine the health of our Kafka connection. @@ -435,6 +436,7 @@ impl SourceRender for KafkaSourceConnection { poll_interval =? poll_interval, "kafka metadata thread: starting..." ); + let mut nparts = 0; while let Some(partition_info) = partition_info.upgrade() { let probe_ts = mz_repr::Timestamp::try_from((now_fn)()).expect("must fit"); @@ -456,13 +458,24 @@ impl SourceRender for KafkaSourceConnection { ); match result { Ok(info) => { + let new_nparts = info.len(); + *partition_info.lock().unwrap() = Some((probe_ts, info)); + + // If the number of partitions has changed, wake up the + // consumer so that it can check if the new parts have + // data. + if nparts != new_nparts { + notificator.notify_one(); + } + nparts = new_nparts; + trace!( - source_id = config.id.to_string(), - worker_id = config.worker_id, - num_workers = config.worker_count, - "kafka metadata thread: updated partition metadata info", - ); + source_id = config.id.to_string(), + worker_id = config.worker_id, + num_workers = config.worker_count, + "kafka metadata thread: updated partition metadata info", + ); // Clear all the health namespaces we know about. // Note that many kafka sources's don't have an ssh tunnel, but