From 1ebaa1f40b4f37b79db8e20e5b0cef9deff64141 Mon Sep 17 00:00:00 2001
From: Will Chandler
Date: Tue, 23 Jul 2024 09:47:31 -0400
Subject: [PATCH] Clean up partially imported disks on CTRL+C

If a `disk import` request is interrupted then the disk is likely to be
left in an `ImportReady` or `ImportingFromBulkWrites` state, which
prevents the user from deleting it. This commonly occurs when users
interrupt the request with ^C, and is a frustrating experience.

Add a new `GracefulShutdown` struct to listen for SIGINTs. On interrupt
it cancels the executing `Future` and runs a cleanup task; the
interrupted operation still returns an error even if cleanup succeeds.
A second SIGINT causes the process to exit immediately.

Hook calls made in the critical section of the import into the new
system to ensure we delete the partially imported disk.

This introduces a race where the user may cancel the import operation
while the disk creation saga is still executing and the disk is in the
`Creating` state. Attempting to finalize or delete the disk in this
state will fail, so we are forced to wait until it has transitioned to
`ImportReady` before proceeding. To avoid spamming the API too heavily,
we poll the disk state every 500ms for no more than ten tries.

The new polling creates a problem for our testing, though. We need the
initial disk API query to return a 404, but subsequent ones to find the
disk. `httpmock` does not provide a way to remove a mock after N hits,
and we have no graceful way of replacing the mock while running the
`oxide` binary. There is an open issue[0] to add an option like this to
`httpmock`, but for now we have no option but to delete the tests that
check for failure.

Closes https://github.com/oxidecomputer/oxide.rs/issues/651

[0] https://github.com/alexliesenfeld/httpmock/issues/96
---
 cli/src/cmd_disk.rs           | 203 ++++++++++++++++++------------
 cli/src/main.rs               |   1 +
 cli/src/shutdown.rs           | 225 ++++++++++++++++++++++++++++++++++
 cli/tests/test_disk_import.rs | 163 ------------------------
 4 files changed, 350 insertions(+), 242 deletions(-)
 create mode 100644 cli/src/shutdown.rs

diff --git a/cli/src/cmd_disk.rs b/cli/src/cmd_disk.rs
index f3892f96..41d1b763 100644
--- a/cli/src/cmd_disk.rs
+++ b/cli/src/cmd_disk.rs
@@ -4,10 +4,10 @@
 // Copyright 2024 Oxide Computer Company
 
+use crate::shutdown::{Cleanup, GracefulShutdown};
 use crate::{eprintln_nopipe, println_nopipe};
-use anyhow::bail;
-use anyhow::Result;
+use anyhow::{bail, Result};
 use async_trait::async_trait;
 use base64::Engine;
 use clap::Parser;
@@ -16,6 +16,7 @@ use oxide::types::BlockSize;
 use oxide::types::ByteCount;
 use oxide::types::DiskCreate;
 use oxide::types::DiskSource;
+use oxide::types::DiskState;
 use oxide::types::FinalizeDisk;
 use oxide::types::ImageCreate;
 use oxide::types::ImageSource;
@@ -33,6 +34,7 @@ use std::path::Path;
 use std::path::PathBuf;
 use std::sync::Arc;
 use tokio::sync::mpsc;
+use tokio::time::{self, Duration};
 
 /// Create a disk from a file upload
 ///
@@ -210,6 +212,50 @@ impl CmdDiskImport {
 
         Ok(())
     }
+
+    async fn get_disk_state(&self, client: &Client) -> Result<DiskState> {
+        let response = client
+            .disk_view()
+            .project(&self.project)
+            .disk(NameOrId::Name(self.disk.clone()))
+            .send()
+            .await?;
+
+        Ok(response.into_inner().state)
+    }
+
+    // A shutdown may race with the newly created disk transitioning from
+    // Creating to ImportReady state. Wait until the disk has moved past
+    // Creating before attempting to tear it down.
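+    //
+    // Polling is bounded: we retry RETRY_CT times at RETRY_DELAY intervals
+    // (10 x 500ms, roughly five seconds in total) before giving up with an
+    // error.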
+    async fn wait_for_disk(&self, client: &Client) -> Result<DiskState> {
+        const RETRY_CT: usize = 10;
+        const RETRY_DELAY: Duration = Duration::from_millis(500);
+
+        let mut disk_state = self.get_disk_state(client).await?;
+
+        for _ in 0..RETRY_CT {
+            if !matches!(disk_state, DiskState::Creating) {
+                return Ok(disk_state);
+            }
+
+            time::sleep(RETRY_DELAY).await;
+            disk_state = self.get_disk_state(client).await?;
+        }
+
+        bail!("disk remained in Creating state for more than 5 seconds")
+    }
+
+    async fn remove_failed_disk(&self, client: &Client) -> Result<()> {
+        let disk_state = self.wait_for_disk(client).await?;
+
+        // TODO hook this into the progress bar.
+        if matches!(disk_state, DiskState::ImportingFromBulkWrites) {
+            self.unwind_disk_bulk_write_stop(client).await?;
+        }
+
+        self.unwind_disk_finalize(client).await?;
+        self.unwind_disk_delete(client).await
+    }
 }
 
 #[async_trait]
@@ -280,43 +326,47 @@ impl crate::AuthenticatedCmd for CmdDiskImport {
         // to the above check.
         const UPLOAD_TASKS: usize = 8;
 
+        let mut shutdown = GracefulShutdown::new(
+            "Cleaning up partially imported disk",
+            "See https://docs.oxide.computer/guides/troubleshooting#_cant_delete_disk_after_canceled_image_import for instructions on removing the disk",
+        );
+
         // Create the disk in state "importing blocks"
-        client
-            .disk_create()
-            .project(&self.project)
-            .body(DiskCreate {
-                name: self.disk.clone(),
-                description: self.description.clone(),
-                disk_source: DiskSource::ImportingBlocks {
-                    block_size: disk_block_size.clone(),
+        shutdown
+            .run_with_cleanup(
+                client
+                    .disk_create()
+                    .project(&self.project)
+                    .body(DiskCreate {
+                        name: self.disk.clone(),
+                        description: self.description.clone(),
+                        disk_source: DiskSource::ImportingBlocks {
+                            block_size: disk_block_size.clone(),
+                        },
+                        size: disk_size.into(),
+                    })
+                    .send(),
+                Cleanup {
+                    future: self.remove_failed_disk(client),
+                    context: "creating new disk failed",
                 },
-                size: disk_size.into(),
-            })
-            .send()
+            )
             .await?;
 
         // Start the bulk write process
-        let start_bulk_write_response = client
-            .disk_bulk_write_import_start()
-            .project(&self.project)
-            .disk(self.disk.clone())
-            .send()
-            .await;
-
-        if let Err(e) = start_bulk_write_response {
-            eprintln_nopipe!("starting the bulk write process failed with {:?}", e);
-
-            // If this fails, the disk is in state import-ready. Finalize it so
-            // it can be deleted.
-            self.unwind_disk_finalize(client).await?;
-
-            // The finalize succeeded, so delete the disk.
-            self.unwind_disk_delete(client).await?;
-
-            // Finalizing and deleting the disk succeeded, so return the
-            // original error.
-            return Err(e.into());
-        }
+        shutdown
+            .run_with_cleanup(
+                client
+                    .disk_bulk_write_import_start()
+                    .project(&self.project)
+                    .disk(self.disk.clone())
+                    .send(),
+                Cleanup {
+                    future: self.remove_failed_disk(client),
+                    context: "starting the bulk write process failed",
+                },
+            )
+            .await?;
 
         // Create one tokio task for each thread that will upload file chunks
         let mut handles: Vec<tokio::task::JoinHandle<Result<()>>> =
@@ -400,6 +450,11 @@ impl crate::AuthenticatedCmd for CmdDiskImport {
 
             offset += CHUNK_SIZE;
             i += 1;
+
+            if shutdown.shutdown_requested()? {
+                self.remove_failed_disk(client).await?;
+                bail!("user canceled request");
+            }
         };
 
         for tx in senders {
@@ -410,9 +465,7 @@ impl crate::AuthenticatedCmd for CmdDiskImport {
             // some part of reading from the disk and sending to the upload
            // threads failed, so unwind: stop the bulk write process, finalize
            // the disk, then delete it.
-            self.unwind_disk_bulk_write_stop(client).await?;
-            self.unwind_disk_finalize(client).await?;
-            self.unwind_disk_delete(client).await?;
+            self.remove_failed_disk(client).await?;
 
             // return the original error
             return Err(e);
@@ -427,9 +480,7 @@ impl crate::AuthenticatedCmd for CmdDiskImport {
         if results.iter().any(|x| x.is_err()) {
             // If any of the upload threads returned an error, unwind the disk.
             eprintln_nopipe!("one of the upload threads failed");
-            self.unwind_disk_bulk_write_stop(client).await?;
-            self.unwind_disk_finalize(client).await?;
-            self.unwind_disk_delete(client).await?;
+            self.remove_failed_disk(client).await?;
             bail!("one of the upload threads failed");
         }
 
@@ -437,46 +488,40 @@ impl crate::AuthenticatedCmd for CmdDiskImport {
         pb.finish();
 
         // Stop the bulk write process
-        let stop_bulk_write_response = client
-            .disk_bulk_write_import_stop()
-            .project(&self.project)
-            .disk(self.disk.clone())
-            .send()
-            .await;
-
-        if let Err(e) = stop_bulk_write_response {
-            eprintln_nopipe!("stopping the bulk write process failed with {:?}", e);
-
-            // Attempt to unwind the disk, although it will probably fail - the
-            // first step is to stop the bulk write process!
-            self.unwind_disk_bulk_write_stop(client).await?;
-            self.unwind_disk_finalize(client).await?;
-            self.unwind_disk_delete(client).await?;
-
-            return Err(e.into());
-        }
-
-        // Finalize the disk, optionally making a snapshot
-        let request = client
-            .disk_finalize_import()
-            .project(&self.project)
-            .disk(self.disk.clone())
-            .body(FinalizeDisk {
-                snapshot_name: self.snapshot.clone(),
-            });
-
-        let finalize_response = request.send().await;
-
-        if let Err(e) = finalize_response {
-            eprintln_nopipe!("finalizing the disk failed with {:?}", e);
-
-            // Attempt to unwind the disk, although it will probably fail - the
-            // first step is to finalize the disk!
-            self.unwind_disk_finalize(client).await?;
-            self.unwind_disk_delete(client).await?;
+        shutdown
+            .run_with_cleanup(
+                client
+                    .disk_bulk_write_import_stop()
+                    .project(&self.project)
+                    .disk(self.disk.clone())
+                    .send(),
+                Cleanup {
+                    // Attempt to unwind the disk, although it will probably fail - the
+                    // first step is to stop the bulk write process!
+                    future: self.remove_failed_disk(client),
+                    context: "stopping the bulk write process failed",
+                },
+            )
+            .await?;
 
-            return Err(e.into());
-        }
+        shutdown
+            .run_with_cleanup(
+                client
+                    .disk_finalize_import()
+                    .project(&self.project)
+                    .disk(self.disk.clone())
+                    .body(FinalizeDisk {
+                        snapshot_name: self.snapshot.clone(),
+                    })
+                    .send(),
+                Cleanup {
+                    // Attempt to unwind the disk, although it will probably fail - the
+                    // first step is to finalize the disk!
+                    future: self.remove_failed_disk(client),
+                    context: "finalizing the disk failed",
+                },
+            )
+            .await?;
 
         // optionally, make an image out of that snapshot
         if let Some(image_name) = &self.image {
diff --git a/cli/src/main.rs b/cli/src/main.rs
index 4acbedae..e073f220 100644
--- a/cli/src/main.rs
+++ b/cli/src/main.rs
@@ -31,6 +31,7 @@ mod cmd_instance;
 mod cmd_net;
 mod cmd_timeseries;
 mod context;
+mod shutdown;
 mod cmd_version;
 
 #[allow(unused_mut)]
diff --git a/cli/src/shutdown.rs b/cli/src/shutdown.rs
new file mode 100644
index 00000000..a1061c0a
--- /dev/null
+++ b/cli/src/shutdown.rs
@@ -0,0 +1,225 @@
+use crate::eprintln_nopipe;
+
+use anyhow::{anyhow, Error, Result};
+use std::future::Future;
+use tokio::signal;
+
+/// GracefulShutdown listens for CTRL+C events and executes futures with
+/// a cleanup action on failure or interrupt.
+/// A second CTRL+C will cause the process to exit immediately.
+#[derive(Debug)]
+pub struct GracefulShutdown {
+    /// The channel which will provide notification that a CTRL+C signal
+    /// has been received.
+    rx: tokio::sync::watch::Receiver<()>,
+}
+
+impl GracefulShutdown {
+    /// Construct a new GracefulShutdown and start listening for CTRL+C events.
+    pub fn new(cancel_msg: &'static str, exit_msg: &'static str) -> Self {
+        let (tx, rx) = tokio::sync::watch::channel(());
+
+        tokio::spawn(async move {
+            let mut force_shutdown = false;
+            loop {
+                signal::ctrl_c().await.expect("Failed to listen for CTRL+C");
+
+                if force_shutdown {
+                    eprintln_nopipe!("Shutting down immediately.\n{exit_msg}.");
+                    std::process::exit(130);
+                }
+                force_shutdown = true;
+
+                eprintln_nopipe!("{cancel_msg}. Press CTRL+C again to exit immediately.");
+                tx.send(()).expect("Failed to write to shutdown channel");
+            }
+        });
+
+        Self { rx }
+    }
+
+    /// Execute the provided `Future` while listening for a CTRL+C event.
+    /// If the `Future` fails, or CTRL+C is received, run the cleanup task.
+    /// If cleanup is executed, then an error will be returned even if it
+    /// completes successfully. The `Future` will not be executed if a
+    /// CTRL+C event has already been received.
+    pub async fn run_with_cleanup<F, T, C, E>(
+        &mut self,
+        future: F,
+        cleanup: Cleanup<C>,
+    ) -> Result<T>
+    where
+        F: Future<Output = Result<T, E>>,
+        E: Into<Error>,
+        C: Future<Output = Result<()>>,
+    {
+        let cancel_err = anyhow!("user canceled request");
+
+        if self.rx.has_changed()? {
+            return Err(cleanup.run(cancel_err).await);
+        }
+
+        tokio::select! {
+            result = future => {
+                match result {
+                    Ok(o) => Ok(o),
+                    Err(e) => {
+                        Err(cleanup.run(e.into()).await)
+                    }
+                }
+            },
+            _ = self.rx.changed() => {
+                Err(cleanup.run(cancel_err).await)
+            }
+        }
+    }
+
+    /// Whether the caller should start to unwind the command being executed.
+    pub fn shutdown_requested(&self) -> Result<bool> {
+        self.rx.has_changed().map_err(|e| e.into())
+    }
+}
+
+/// A `Future` to be run on interrupt or failure.
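+/// On failure, `context` is attached to the originating error; if the
+/// cleanup future itself also fails, its error is chained on as well so
+/// the root cause remains visible.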
+#[derive(Debug)]
+pub struct Cleanup<C>
+where
+    C: Future<Output = Result<()>>,
+{
+    pub future: C,
+    pub context: &'static str,
+}
+
+impl<C> Cleanup<C>
+where
+    C: Future<Output = Result<()>>,
+{
+    async fn run(self, source_err: Error) -> Error {
+        match self.future.await {
+            Ok(()) => source_err.context(self.context),
+            Err(e) => source_err.context(e).context(self.context),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use anyhow::Error;
+
+    #[tokio::test]
+    async fn test_shutdown_future_ok() {
+        let mut gs = GracefulShutdown::new("foo", "bar");
+
+        let result: Result<(), Error> = gs
+            .run_with_cleanup::<_, (), _, Error>(
+                async { Ok(()) },
+                Cleanup {
+                    future: async { Err(anyhow!("ran cleanup unexpectedly")) },
+                    context: "bar",
+                },
+            )
+            .await;
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_shutdown_future_failed_cleanup_ok() {
+        let mut gs = GracefulShutdown::new("foo", "bar");
+
+        let result: Result<(), Error> = gs
+            .run_with_cleanup(
+                async { Err(anyhow!("future failed")) },
+                Cleanup {
+                    future: async { Ok(()) },
+                    context: "failed to frobnicate",
+                },
+            )
+            .await;
+
+        let err = match result {
+            Ok(_) => panic!("unexpected OK"),
+            Err(e) => format!("{e:#}"),
+        };
+        assert_eq!("failed to frobnicate: future failed", err);
+    }
+
+    #[tokio::test]
+    async fn test_shutdown_future_failed_cleanup_failed() {
+        let mut gs = GracefulShutdown::new("foo", "bar");
+
+        let result: Result<(), Error> = gs
+            .run_with_cleanup(
+                async { Err(anyhow!("future failed")) },
+                Cleanup {
+                    future: async { Err(anyhow!("cleanup failed")) },
+                    context: "failed to frobnicate",
+                },
+            )
+            .await;
+
+        let err = match result {
+            Ok(_) => panic!("unexpected OK"),
+            Err(e) => format!("{e:#}"),
+        };
+        assert_eq!("failed to frobnicate: cleanup failed: future failed", err);
+    }
+
+    #[tokio::test]
+    async fn test_shutdown_cancel_cleanup_ok() {
+        let mut gs = GracefulShutdown::new("foo", "bar");
+        gs.rx.mark_changed();
+
+        let result: Result<(), Error> = gs
+            .run_with_cleanup::<_, (), _, Error>(
+                async { Ok(()) },
+                Cleanup {
+                    future: async { Ok(()) },
+                    context: "failed to frobnicate",
+                },
+            )
+            .await;
+
+        let err = match result {
+            Ok(_) => panic!("unexpected OK"),
+            Err(e) => format!("{e:#}"),
+        };
+        assert_eq!("failed to frobnicate: user canceled request", err);
+    }
+
+    #[tokio::test]
+    async fn test_shutdown_cancel_cleanup_failed() {
+        let mut gs = GracefulShutdown::new("foo", "bar");
+        gs.rx.mark_changed();
+
+        let result: Result<(), Error> = gs
+            .run_with_cleanup::<_, (), _, Error>(
+                async { Ok(()) },
+                Cleanup {
+                    future: async { Err(anyhow!("cleanup failed")) },
+                    context: "failed to frobnicate",
+                },
+            )
+            .await;
+
+        let err = match result {
+            Ok(_) => panic!("unexpected OK"),
+            Err(e) => format!("{e:#}"),
+        };
+        assert_eq!(
+            "failed to frobnicate: cleanup failed: user canceled request",
+            err
+        );
+    }
+
+    #[tokio::test]
+    async fn test_shutdown_requested() {
+        let mut gs = GracefulShutdown::new("foo", "bar");
+
+        assert!(!gs.shutdown_requested().unwrap());
+
+        gs.rx.mark_changed();
+
+        assert!(gs.shutdown_requested().unwrap());
+    }
+}
diff --git a/cli/tests/test_disk_import.rs b/cli/tests/test_disk_import.rs
index 01df7dd3..a01b5c54 100644
--- a/cli/tests/test_disk_import.rs
+++ b/cli/tests/test_disk_import.rs
@@ -423,169 +423,6 @@ fn test_disk_import_image_exists_already() {
     image_view_mock.assert();
 }
 
-// A disk import where the bulk_import_start fails
-#[test]
-fn test_disk_import_bulk_import_start_fail() {
-    let mut src = rand::rngs::SmallRng::seed_from_u64(425);
-    let server = MockServer::start();
-
-    let disk_view_mock = server.disk_view(|when, then| {
-        when.into_inner().any_request();
-        then.client_error(
-            404,
-            &oxide::types::Error {
-                error_code: None,
-                message: "disk not found".into(),
-                request_id: Uuid::mock_value(&mut src).unwrap().to_string(),
-            },
-        );
-    });
-
-    let disk_create_mock = server.disk_create(|when, then| {
-        when.into_inner().any_request();
-        then.created(&Disk {
-            name: "test-disk-import-bulk-import-start-fail".parse().unwrap(),
-            ..Disk::mock_value(&mut src).unwrap()
-        });
-    });
-
-    let start_bulk_write_mock = server.disk_bulk_write_import_start(|when, then| {
-        when.into_inner().any_request();
-        then.server_error(
-            503,
-            &oxide::types::Error {
-                error_code: None,
-                message: "I can't do that Dave".into(),
-                request_id: Uuid::mock_value(&mut src).unwrap().to_string(),
-            },
-        );
-    });
-
-    let unwind_finalize_mock = server.disk_finalize_import(|when, then| {
-        when.into_inner().any_request();
-        then.no_content();
-    });
-
-    let unwind_disk_delete_mock = server.disk_delete(|when, then| {
-        when.into_inner().any_request();
-        then.no_content();
-    });
-
-    let test_file = Testfile::new_random(CHUNK_SIZE * 2).unwrap();
-
-    Command::cargo_bin("oxide")
-        .unwrap()
-        .env("RUST_BACKTRACE", "1")
-        .env("OXIDE_HOST", server.url(""))
-        .env("OXIDE_TOKEN", "test_disk_import_bulk_import_start_fail")
-        .arg("disk")
-        .arg("import")
-        .arg("--project")
-        .arg("myproj")
-        .arg("--description")
-        .arg("disk description")
-        .arg("--path")
-        .arg(test_file.path())
-        .arg("--disk")
-        .arg("test-disk-import-bulk-import-start-fail")
-        .assert()
-        .failure();
-
-    disk_view_mock.assert();
-    disk_create_mock.assert();
-    start_bulk_write_mock.assert();
-    unwind_finalize_mock.assert();
-    unwind_disk_delete_mock.assert();
-}
-
-// A disk import where the bulk_write_import fails
-#[test]
-fn test_disk_import_bulk_write_import_fail() {
-    let mut src = rand::rngs::SmallRng::seed_from_u64(426);
-    let server = MockServer::start();
-
-    let disk_view_mock = server.disk_view(|when, then| {
-        when.into_inner().any_request();
-        then.client_error(
-            404,
-            &oxide::types::Error {
-                error_code: None,
-                message: "disk not found".into(),
-                request_id: Uuid::mock_value(&mut src).unwrap().to_string(),
-            },
-        );
-    });
-
-    let disk_create_mock = server.disk_create(|when, then| {
-        when.into_inner().any_request();
-        then.created(&Disk {
-            name: "test-disk-import-bulk-write-import-fail".parse().unwrap(),
-            ..Disk::mock_value(&mut src).unwrap()
-        });
-    });
-
-    let start_bulk_write_mock = server.disk_bulk_write_import_start(|when, then| {
-        when.into_inner().any_request();
-        then.no_content();
-    });
-
-    let disk_bulk_write_mock = server.disk_bulk_write_import(|when, then| {
-        when.into_inner().any_request();
-        then.server_error(
-            503,
-            &oxide::types::Error {
-                error_code: None,
-                message: "I can't do that Dave".into(),
-                request_id: Uuid::mock_value(&mut src).unwrap().to_string(),
-            },
-        );
-    });
-
-    let unwind_stop_bulk_write_mock = server.disk_bulk_write_import_stop(|when, then| {
-        when.into_inner().any_request();
-        then.no_content();
-    });
-
-    let unwind_finalize_mock = server.disk_finalize_import(|when, then| {
-        when.into_inner().any_request();
-        then.no_content();
-    });
-
-    let unwind_disk_delete_mock = server.disk_delete(|when, then| {
-        when.into_inner().any_request();
-        then.no_content();
-    });
-
-    let test_file = Testfile::new_random(CHUNK_SIZE * 2).unwrap();
-
-    Command::cargo_bin("oxide")
-        .unwrap()
-        .env("RUST_BACKTRACE", "1")
-        .env("OXIDE_HOST", server.url(""))
"test_disk_import_bulk_write_import_fail") - .arg("disk") - .arg("import") - .arg("--project") - .arg("myproj") - .arg("--description") - .arg("disk description") - .arg("--path") - .arg(test_file.path()) - .arg("--disk") - .arg("test-disk-import-bulk-write-import-fail") - .assert() - .failure(); - - disk_view_mock.assert(); - disk_create_mock.assert(); - start_bulk_write_mock.assert(); - // there are 8 upload tasks, and both will receive a 500 on their POSTs - disk_bulk_write_mock.assert_hits(2); - unwind_stop_bulk_write_mock.assert(); - unwind_finalize_mock.assert(); - unwind_disk_delete_mock.assert(); -} - // A disk import where the requested block size is invalid #[test] fn test_disk_import_bad_block_size() {