diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
index 91aea55a2dc31..97c08d098f6c9 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
@@ -248,10 +248,13 @@ async fn test_ddl_cancel() -> Result<()> {
     Ok(())
 }
 
-#[tokio::test]
-async fn test_high_barrier_latency_cancel() -> Result<()> {
+/// When cancelling a stream job under high latency,
+/// the cancel should take a long time to take effect.
+/// If we trigger a recovery however, the cancel should take effect immediately,
+/// since cancel will immediately drop the table fragment.
+async fn test_high_barrier_latency_cancel(config: Configuration) -> Result<()> {
     init_logger();
-    let mut cluster = Cluster::start(Configuration::for_scale()).await?;
+    let mut cluster = Cluster::start(config).await?;
     let mut session = cluster.start_session();
 
     // Join 2 fact tables together to create a high barrier latency scenario.
@@ -280,7 +283,7 @@ async fn test_high_barrier_latency_cancel() -> Result<()> {
     cluster
         .kill_nodes_and_restart(["compute-1", "compute-2", "compute-3"], 2)
         .await;
-    sleep(Duration::from_secs(3)).await;
+    sleep(Duration::from_secs(2)).await;
 
     tracing::debug!("killed cn, waiting recovery");
 
@@ -305,9 +308,26 @@ async fn test_high_barrier_latency_cancel() -> Result<()> {
 
     tracing::info!("restarted cn: trigger stream job recovery");
 
+    // Make sure there's some progress first.
+    loop {
+        // Wait until at least 0.01% of records are ingested.
+        let progress = session
+            .run("select progress from rw_catalog.rw_ddl_progress;")
+            .await
+            .unwrap();
+        tracing::info!(progress, "get progress before cancel stream job");
+        let progress = progress.replace('%', "");
+        let progress = progress.parse::<f64>().unwrap();
+        if progress > 0.01 {
+            break;
+        } else {
+            sleep(Duration::from_micros(1)).await;
+        }
+    }
     // Loop in case the cancel gets dropped after
     // cn kill, before it drops the table fragment.
-    for _ in 0..5 {
+    for iteration in 0..5 {
+        tracing::info!(iteration, "cancelling stream job");
         let mut session2 = cluster.start_session();
         let handle = tokio::spawn(async move {
             let result = cancel_stream_jobs(&mut session2).await;
@@ -343,6 +363,16 @@ async fn test_high_barrier_latency_cancel() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn test_high_barrier_latency_cancel_for_arrangement_backfill() -> Result<()> {
+    test_high_barrier_latency_cancel(Configuration::for_arrangement_backfill()).await
+}
+
+#[tokio::test]
+async fn test_high_barrier_latency_cancel_for_no_shuffle() -> Result<()> {
+    test_high_barrier_latency_cancel(Configuration::for_scale_no_shuffle()).await
+}
+
 // When cluster stop, foreground ddl job must be cancelled.
 #[tokio::test]
 async fn test_foreground_ddl_no_recovery() -> Result<()> {
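
The key addition in this diff is the polling loop that waits for rw_catalog.rw_ddl_progress to report non-trivial progress before the cancel is issued. Below is a minimal sketch of that pattern factored into a standalone helper; it is an illustration only and not part of the diff. The name wait_for_ddl_progress is hypothetical, and the Session type and its run method are assumed to behave as the test code above implies (run returning the progress column as a string such as "2.21%").

    // Hypothetical helper (not part of the diff): poll rw_ddl_progress until the
    // reported progress exceeds `threshold_pct` (a percentage value, e.g. 0.01).
    async fn wait_for_ddl_progress(session: &mut Session, threshold_pct: f64) {
        loop {
            let progress = session
                .run("select progress from rw_catalog.rw_ddl_progress;")
                .await
                .unwrap();
            tracing::info!(progress, "polled ddl progress");
            // The catalog reports progress as a string like "2.21%", so strip
            // the percent sign and parse it as a float before comparing.
            let progress = progress.replace('%', "").parse::<f64>().unwrap();
            if progress > threshold_pct {
                return;
            }
            // Back off briefly before polling again.
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    }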