From 327d0360c37e37a474c47b2f1746edad93d4faec Mon Sep 17 00:00:00 2001
From: Jan Teske
Date: Mon, 25 Nov 2024 14:33:04 +0100
Subject: [PATCH] storage/kafka: use separate consumer for metadata probing

---
 src/storage/src/source/kafka.rs | 40 +++++++++++++++++++++++++--------
 1 file changed, 31 insertions(+), 9 deletions(-)

diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs
index 9efa653d29461..39da413a93ac4 100644
--- a/src/storage/src/source/kafka.rs
+++ b/src/storage/src/source/kafka.rs
@@ -324,7 +324,8 @@ impl SourceRender for KafkaSourceConnection {
         let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
         let health_status = Arc::new(Mutex::new(Default::default()));
         let notificator = Arc::new(Notify::new());
-        let consumer: Result<BaseConsumer<_>, _> = connection
+
+        let reader_consumer: Result<BaseConsumer<_>, _> = connection
             .create_with_context(
                 &config.config,
                 GlueConsumerContext {
@@ -363,9 +364,29 @@ impl SourceRender for KafkaSourceConnection {
             )
             .await;
 
-        let consumer = match consumer {
-            Ok(consumer) => Arc::new(consumer),
-            Err(e) => {
+        // Consumers use a single connection to talk to the upstream, so if we'd use the
+        // same consumer in the reader and the metadata thread, metadata probes issued by
+        // the latter could be delayed by data fetches issued by the former. We avoid that
+        // by giving the metadata thread its own consumer.
+        let metadata_consumer: Result<BaseConsumer<_>, _> = connection
+            .create_with_context(
+                &config.config,
+                MzClientContext::default(),
+                &btreemap! {
+                    // Use the user-configured topic metadata refresh
+                    // interval.
+                    "topic.metadata.refresh.interval.ms" =>
+                        topic_metadata_refresh_interval
+                            .as_millis()
+                            .to_string(),
+                },
+                InTask::Yes,
+            )
+            .await;
+
+        let (reader_consumer, metadata_consumer) = match (reader_consumer, metadata_consumer) {
+            (Ok(r), Ok(m)) => (r, m),
+            (Err(e), _) | (_, Err(e)) => {
                 let update = HealthStatusUpdate::halting(
                     format!(
                         "failed creating kafka consumer: {}",
@@ -395,6 +416,8 @@ impl SourceRender for KafkaSourceConnection {
             }
         };
 
+        let reader_consumer = Arc::new(reader_consumer);
+
         // Note that we wait for this AFTER we downgrade to the source `resume_upper`. This
         // allows downstream operators (namely, the `reclock_operator`) to downgrade to the
         // `resume_upper`, which is necessary for this basic form of backpressure to work.
@@ -410,7 +433,6 @@ impl SourceRender for KafkaSourceConnection {
         let metadata_thread_handle = {
             let partition_info = Arc::downgrade(&partition_info);
             let topic = topic.clone();
-            let consumer = Arc::clone(&consumer);
             let notificator = Arc::clone(&notificator);
 
             // We want a fairly low ceiling on our polling frequency, since we rely
@@ -441,7 +463,7 @@ impl SourceRender for KafkaSourceConnection {
 
                     let probe_ts = mz_repr::Timestamp::try_from((now_fn)()).expect("must fit");
                     let result = fetch_partition_info(
-                        consumer.client(),
+                        metadata_consumer.client(),
                         &topic,
                         config
                             .config
@@ -492,7 +514,7 @@ impl SourceRender for KafkaSourceConnection {
                         ));
 
                         let ssh_status =
-                            consumer.client().context().tunnel_status();
+                            metadata_consumer.client().context().tunnel_status();
                         let ssh_status = match ssh_status {
                             SshTunnelStatus::Running => {
                                 Some(HealthStatusUpdate::running())
@@ -529,7 +551,7 @@ impl SourceRender for KafkaSourceConnection {
             source_name: config.name.clone(),
             id: config.id,
             partition_consumers: Vec::new(),
-            consumer: Arc::clone(&consumer),
+            consumer: Arc::clone(&reader_consumer),
             worker_id: config.worker_id,
             worker_count: config.worker_count,
             last_offsets: outputs
@@ -553,7 +575,7 @@ impl SourceRender for KafkaSourceConnection {
         let offset_committer = KafkaResumeUpperProcessor {
             config: config.clone(),
             topic_name: topic.clone(),
-            consumer,
+            consumer: reader_consumer,
             progress_statistics: Arc::clone(&reader.progress_statistics),
         };
 
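Note: the new code comment in the patch carries the rationale: a consumer funnels its
requests over a single connection to the upstream, so when one consumer serves both the
reader and the metadata thread, metadata probes can queue behind bulky data fetches.
Below is a minimal, self-contained sketch of the same two-consumer pattern, written
against the rust-rdkafka crate directly rather than Materialize's connection and context
wrappers; the broker address, topic, and group id are placeholder values.

use std::thread;
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::message::Message;

// Placeholder connection details; substitute your own.
const BROKERS: &str = "localhost:9092";
const TOPIC: &str = "example-topic";

fn new_consumer() -> BaseConsumer {
    ClientConfig::new()
        .set("bootstrap.servers", BROKERS)
        .set("group.id", "example-group")
        .create()
        .expect("failed to create consumer")
}

fn main() {
    // Two consumers, hence two broker connections: data fetches issued by
    // the reader cannot delay metadata probes, and vice versa.
    let reader_consumer = new_consumer();
    let metadata_consumer = new_consumer();

    // Metadata thread: probe the partition count on its own connection.
    let probe = thread::spawn(move || {
        for _ in 0..10 {
            match metadata_consumer.fetch_metadata(Some(TOPIC), Duration::from_secs(5)) {
                Ok(metadata) => {
                    for t in metadata.topics() {
                        println!("{}: {} partitions", t.name(), t.partitions().len());
                    }
                }
                Err(e) => eprintln!("metadata probe failed: {e}"),
            }
            thread::sleep(Duration::from_secs(1));
        }
    });

    // Reader: fetch records over the other connection.
    reader_consumer.subscribe(&[TOPIC]).expect("subscribe failed");
    for _ in 0..100 {
        if let Some(Ok(msg)) = reader_consumer.poll(Duration::from_millis(100)) {
            println!("offset {}: {} bytes", msg.offset(), msg.payload().map_or(0, |p| p.len()));
        }
    }

    probe.join().unwrap();
}

The patch makes the same trade in context: one extra TCP connection per source, in
exchange for probe latency that is independent of fetch traffic.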