From b741f505413147f7ea0f2f36dd2a8f1db355fe13 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Wed, 11 Dec 2024 12:17:05 +0800 Subject: [PATCH] revert retry for source reader --- .../src/executor/source/source_executor.rs | 137 ++---------------- 1 file changed, 16 insertions(+), 121 deletions(-) diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index ca98ec99ce6a..dcb469a12e1d 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -13,13 +13,10 @@ // limitations under the License. use std::collections::HashMap; -use std::future::Future; -use std::pin::Pin; use std::time::Duration; use anyhow::anyhow; use either::Either; -use futures::stream::BoxStream; use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::array::ArrayRef; @@ -41,7 +38,6 @@ use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{mpsc, oneshot}; use tokio::time::Instant; -use tracing::Instrument; use super::executor_core::StreamSourceCore; use super::{ @@ -50,7 +46,6 @@ use super::{ }; use crate::common::rate_limit::limited_chunk_size; use crate::executor::prelude::*; -use crate::executor::source::get_infinite_backoff_strategy; use crate::executor::stream_reader::StreamReaderWithPause; use crate::executor::UpdateMutation; @@ -505,98 +500,20 @@ impl SourceExecutor { let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); tracing::debug!(state = ?recover_state, "start with state"); - - let mut barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); - let mut reader_and_splits: Option<(BoxChunkSourceStream, Option>)> = None; - let seek_to_latest = self.is_shared_non_cdc && is_uninitialized; - let source_reader = source_desc.source.clone(); - let (column_ids, source_ctx) = self.prepare_source_stream_build(&source_desc); - let source_ctx = Arc::new(source_ctx); - let mut build_source_stream_fut = Box::pin(async move { - let backoff = get_infinite_backoff_strategy(); - tokio_retry::Retry::spawn(backoff, || async { - match source_reader - .build_stream( - recover_state.clone(), - column_ids.clone(), - source_ctx.clone(), - seek_to_latest, - ) - .await { - Ok((stream, latest_splits)) => Ok((stream, latest_splits)), - Err(e) => { - tracing::warn!(error = %e.as_report(), "failed to build source stream, retrying..."); - Err(e) - } - } - }) - .instrument(tracing::info_span!("build_source_stream_with_retry")) - .await - .expect("Retry build source stream until success.") - }); - - let mut need_resume_after_build = false; - // loop to create source stream until success - loop { - if let Some(barrier) = build_source_stream_and_poll_barrier( - &mut barrier_stream, - &mut reader_and_splits, - &mut build_source_stream_fut, + let (source_chunk_reader, latest_splits) = self + .build_stream_source_reader( + &source_desc, + recover_state, + // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data. + // It's highly probable that the work of scanning historical data cannot be shared, + // so don't waste work on it. + // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297 + // Note that shared CDC source is special. It already starts from latest. + self.is_shared_non_cdc && is_uninitialized, ) - .await? - { - if let Message::Barrier(barrier) = barrier { - if let Some(mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::Throttle(actor_to_apply) => { - if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id) - && *new_rate_limit != self.rate_limit_rps - { - tracing::info!( - "updating rate limit from {:?} to {:?}", - self.rate_limit_rps, - *new_rate_limit - ); - - // update the rate limit option, we will apply the rate limit - // when we finish building the source stream. - self.rate_limit_rps = *new_rate_limit; - } - } - Mutation::Resume => { - // We record the Resume mutation here and postpone the resume of the source stream - // after we have successfully built the source stream. - need_resume_after_build = true; - } - _ => { - // ignore other mutations and output a warn log - tracing::warn!( - "Received a mutation {:?} to be ignored, because we only handle Throttle and Resume before - finish building source stream.", - mutation - ); - } - } - } - - // bump state store epoch - let _ = self.persist_state_and_clear_cache(barrier.epoch).await?; - yield Message::Barrier(barrier); - } else { - unreachable!("Only barrier message is expected when building source stream."); - } - } else { - assert!(reader_and_splits.is_some()); - tracing::info!("source stream created successfully"); - break; - } - } - let (source_chunk_reader, latest_splits) = - reader_and_splits.expect("source chunk reader and splits must be created"); - - let source_chunk_reader = apply_rate_limit(source_chunk_reader, self.rate_limit_rps) - .boxed() - .map_err(StreamExecutorError::connector_error); + .instrument_await("source_build_reader") + .await?; + let source_chunk_reader = source_chunk_reader.map_err(StreamExecutorError::connector_error); if let Some(latest_splits) = latest_splits { // make sure it is written to state table later. // Then even it receives no messages, we can observe it in state table. @@ -608,12 +525,13 @@ impl SourceExecutor { } // Merge the chunks from source and the barriers into a single stream. We prioritize // barriers over source data chunks here. + let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); let mut stream = StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); let mut command_paused = false; - // - If the first barrier requires us to pause on startup and we haven't received a Resume mutation, pause the stream. - if is_pause_on_startup && !need_resume_after_build { + // - If the first barrier requires us to pause on startup, pause the stream. + if is_pause_on_startup { tracing::info!("source paused on startup"); stream.pause_stream(); command_paused = true; @@ -828,29 +746,6 @@ impl SourceExecutor { } } -async fn build_source_stream_and_poll_barrier( - barrier_stream: &mut BoxStream<'static, StreamExecutorResult>, - reader_and_splits: &mut Option<(BoxChunkSourceStream, Option>)>, - build_future: &mut Pin< - Box>)>>, - >, -) -> StreamExecutorResult> { - if reader_and_splits.is_some() { - return Ok(None); - } - - tokio::select! { - biased; - build_ret = &mut *build_future => { - *reader_and_splits = Some(build_ret); - Ok(None) - } - msg = barrier_stream.next() => { - msg.transpose() - } - } -} - impl Execute for SourceExecutor { fn execute(self: Box) -> BoxedMessageStream { if self.stream_source_core.is_some() {