storage/kafka: use separate consumer for metadata probing
teskje committed Nov 25, 2024
1 parent 51ec033 commit 327d036
Showing 1 changed file with 31 additions and 9 deletions.
src/storage/src/source/kafka.rs
@@ -324,7 +324,8 @@ impl SourceRender for KafkaSourceConnection {
let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
let health_status = Arc::new(Mutex::new(Default::default()));
let notificator = Arc::new(Notify::new());
- let consumer: Result<BaseConsumer<_>, _> = connection
+
+ let reader_consumer: Result<BaseConsumer<_>, _> = connection
.create_with_context(
&config.config,
GlueConsumerContext {
@@ -363,9 +364,29 @@ impl SourceRender for KafkaSourceConnection {
)
.await;

- let consumer = match consumer {
- Ok(consumer) => Arc::new(consumer),
- Err(e) => {
+ // Consumers use a single connection to talk to the upstream, so if we'd use the
+ // same consumer in the reader and the metadata thread, metadata probes issued by
+ // the latter could be delayed by data fetches issued by the former. We avoid that
+ // by giving the metadata thread its own consumer.
+ let metadata_consumer: Result<BaseConsumer<_>, _> = connection
+ .create_with_context(
+ &config.config,
+ MzClientContext::default(),
+ &btreemap! {
+ // Use the user-configured topic metadata refresh
+ // interval.
+ "topic.metadata.refresh.interval.ms" =>
+ topic_metadata_refresh_interval
+ .as_millis()
+ .to_string(),
+ },
+ InTask::Yes,
+ )
+ .await;
+
+ let (reader_consumer, metadata_consumer) = match (reader_consumer, metadata_consumer) {
+ (Ok(r), Ok(m)) => (r, m),
+ (Err(e), _) | (_, Err(e)) => {
let update = HealthStatusUpdate::halting(
format!(
"failed creating kafka consumer: {}",
@@ -395,6 +416,8 @@ impl SourceRender for KafkaSourceConnection {
}
};

+ let reader_consumer = Arc::new(reader_consumer);
+
// Note that we wait for this AFTER we downgrade to the source `resume_upper`. This
// allows downstream operators (namely, the `reclock_operator`) to downgrade to the
// `resume_upper`, which is necessary for this basic form of backpressure to work.
@@ -410,7 +433,6 @@ impl SourceRender for KafkaSourceConnection {
let metadata_thread_handle = {
let partition_info = Arc::downgrade(&partition_info);
let topic = topic.clone();
- let consumer = Arc::clone(&consumer);
let notificator = Arc::clone(&notificator);

// We want a fairly low ceiling on our polling frequency, since we rely
@@ -441,7 +463,7 @@ impl SourceRender for KafkaSourceConnection {
let probe_ts =
mz_repr::Timestamp::try_from((now_fn)()).expect("must fit");
let result = fetch_partition_info(
- consumer.client(),
+ metadata_consumer.client(),
&topic,
config
.config
@@ -492,7 +514,7 @@ impl SourceRender for KafkaSourceConnection {
));

let ssh_status =
- consumer.client().context().tunnel_status();
+ metadata_consumer.client().context().tunnel_status();
let ssh_status = match ssh_status {
SshTunnelStatus::Running => {
Some(HealthStatusUpdate::running())
@@ -529,7 +551,7 @@ impl SourceRender for KafkaSourceConnection {
source_name: config.name.clone(),
id: config.id,
partition_consumers: Vec::new(),
- consumer: Arc::clone(&consumer),
+ consumer: Arc::clone(&reader_consumer),
worker_id: config.worker_id,
worker_count: config.worker_count,
last_offsets: outputs
@@ -553,7 +575,7 @@ impl SourceRender for KafkaSourceConnection {
let offset_committer = KafkaResumeUpperProcessor {
config: config.clone(),
topic_name: topic.clone(),
- consumer,
+ consumer: reader_consumer,
progress_statistics: Arc::clone(&reader.progress_statistics),
};

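For context, the pattern this commit adopts can be reproduced with rdkafka directly: create one BaseConsumer for reading records and a second one whose only job is metadata probing, so probes never queue behind data fetches on a shared broker connection. The sketch below is illustrative only and uses plain rdkafka rather than Materialize's connection.create_with_context wrapper; the broker address, group ids, topic name, and the 30000 ms refresh interval are placeholder assumptions, not values from this commit.

use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;

fn create_consumers(brokers: &str, topic: &str) -> KafkaResult<()> {
    // Consumer used exclusively for fetching records.
    let reader: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("group.id", "example-reader")
        .create()?;

    // A second consumer, and therefore a second broker connection, used only
    // for metadata probes so they are not queued behind data fetches.
    let metadata_probe: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("group.id", "example-metadata")
        // Placeholder interval; the commit forwards the user-configured
        // topic metadata refresh interval here.
        .set("topic.metadata.refresh.interval.ms", "30000")
        .create()?;

    // Metadata requests travel over the dedicated consumer's connection.
    let metadata = metadata_probe.fetch_metadata(Some(topic), Duration::from_secs(10))?;
    for t in metadata.topics() {
        println!("topic {} has {} partition(s)", t.name(), t.partitions().len());
    }

    // The reader's connection stays dedicated to record fetching.
    reader.subscribe(&[topic])?;
    let _message = reader.poll(Duration::from_millis(100));

    Ok(())
}

The trade-off is one additional broker connection in exchange for metadata probes that are not delayed by in-flight data fetches, which is the rationale stated in the comment the commit adds.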
