fix(integration_tests): separate test for arrangement backfill cancel with high barrier latency (#15011)

kwannoel authored Feb 7, 2024
1 parent 8e3c526 commit 8735491
Showing 1 changed file with 35 additions and 5 deletions.
@@ -248,10 +248,13 @@ async fn test_ddl_cancel() -> Result<()> {
     Ok(())
 }
 
-#[tokio::test]
-async fn test_high_barrier_latency_cancel() -> Result<()> {
+/// When cancelling a stream job under high latency,
+/// the cancel should take a long time to take effect.
+/// If we trigger a recovery however, the cancel should take effect immediately,
+/// since cancel will immediately drop the table fragment.
+async fn test_high_barrier_latency_cancel(config: Configuration) -> Result<()> {
     init_logger();
-    let mut cluster = Cluster::start(Configuration::for_scale()).await?;
+    let mut cluster = Cluster::start(config).await?;
     let mut session = cluster.start_session();
 
     // Join 2 fact tables together to create a high barrier latency scenario.
@@ -280,7 +283,7 @@ async fn test_high_barrier_latency_cancel() -> Result<()> {
     cluster
         .kill_nodes_and_restart(["compute-1", "compute-2", "compute-3"], 2)
         .await;
-    sleep(Duration::from_secs(3)).await;
+    sleep(Duration::from_secs(2)).await;
 
     tracing::debug!("killed cn, waiting recovery");
 
@@ -305,9 +308,26 @@ async fn test_high_barrier_latency_cancel() -> Result<()> {
 
     tracing::info!("restarted cn: trigger stream job recovery");
 
+    // Make sure there's some progress first.
+    loop {
+        // Wait until at least 10% of records are ingested.
+        let progress = session
+            .run("select progress from rw_catalog.rw_ddl_progress;")
+            .await
+            .unwrap();
+        tracing::info!(progress, "get progress before cancel stream job");
+        let progress = progress.replace('%', "");
+        let progress = progress.parse::<f64>().unwrap();
+        if progress > 0.01 {
+            break;
+        } else {
+            sleep(Duration::from_micros(1)).await;
+        }
+    }
     // Loop in case the cancel gets dropped after
     // cn kill, before it drops the table fragment.
-    for _ in 0..5 {
+    for iteration in 0..5 {
+        tracing::info!(iteration, "cancelling stream job");
         let mut session2 = cluster.start_session();
         let handle = tokio::spawn(async move {
             let result = cancel_stream_jobs(&mut session2).await;
@@ -343,6 +363,16 @@ async fn test_high_barrier_latency_cancel() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn test_high_barrier_latency_cancel_for_arrangement_backfill() -> Result<()> {
+    test_high_barrier_latency_cancel(Configuration::for_arrangement_backfill()).await
+}
+
+#[tokio::test]
+async fn test_high_barrier_latency_cancel_for_no_shuffle() -> Result<()> {
+    test_high_barrier_latency_cancel(Configuration::for_scale_no_shuffle()).await
+}
+
 // When cluster stop, foreground ddl job must be cancelled.
 #[tokio::test]
 async fn test_foreground_ddl_no_recovery() -> Result<()> {
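
A note on the progress check added in the third hunk: the test busy-waits on rw_catalog.rw_ddl_progress until the backfill has made some visible progress, and only then issues the cancel. As a rough sketch of the same idea with a bounded number of polls, a helper might look like the following. The helper name, the threshold and poll interval, and the assumption that Session::run returns the progress cell as a string such as "12.34%" (matching how the test parses it) are illustrative, not part of this commit.

use std::time::Duration;

use anyhow::{bail, Result};
use tokio::time::sleep;

// Hypothetical helper, not in the commit: poll rw_ddl_progress until the
// reported progress exceeds `threshold_percent`, giving up after `max_polls`
// attempts instead of looping forever.
async fn wait_for_ddl_progress(
    session: &mut Session, // assumed: the simulation harness Session used by the test
    threshold_percent: f64,
    max_polls: usize,
) -> Result<()> {
    for _ in 0..max_polls {
        let progress = session
            .run("select progress from rw_catalog.rw_ddl_progress;")
            .await?;
        // Strip the trailing '%' and compare against the threshold.
        let percent: f64 = progress.trim_end_matches('%').parse()?;
        if percent > threshold_percent {
            return Ok(());
        }
        sleep(Duration::from_millis(100)).await;
    }
    bail!("stream job made no visible progress after {max_polls} polls")
}

A bounded wait like this fails with a clear error if recovery stalls, whereas the unbounded loop in the test leans on the harness-level timeout to surface a hang.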

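Similarly, the retry loop around the cancel (for iteration in 0..5) exists because a cancel issued while compute nodes are being killed can be dropped before it removes the table fragment. A standalone sketch of that pattern follows; Cluster, Session, and cancel_stream_jobs mirror the shapes used in the test, and the assumption that the cancel returns the number of cancelled jobs is mine, not the commit's.

use anyhow::{bail, Result};

// Hypothetical sketch, not in the commit: retry the cancel a bounded number of
// times, spawning each attempt on a fresh session so it runs to completion even
// if it races with an ongoing recovery.
async fn cancel_with_retry(cluster: &mut Cluster, attempts: usize) -> Result<usize> {
    for iteration in 0..attempts {
        tracing::info!(iteration, "cancelling stream job");
        let mut session = cluster.start_session();
        let handle = tokio::spawn(async move { cancel_stream_jobs(&mut session).await });
        match handle.await {
            // The cancel went through and cancelled at least one job.
            Ok(Ok(cancelled)) if cancelled > 0 => return Ok(cancelled),
            // Join error, cancel error, or nothing to cancel yet: try again.
            _ => {}
        }
    }
    bail!("cancel did not take effect after {attempts} attempts")
}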