diff --git a/cli/src/cmd_disk.rs b/cli/src/cmd_disk.rs index f3892f96..41d1b763 100644 --- a/cli/src/cmd_disk.rs +++ b/cli/src/cmd_disk.rs @@ -4,10 +4,10 @@ // Copyright 2024 Oxide Computer Company +use crate::shutdown::{Cleanup, GracefulShutdown}; use crate::{eprintln_nopipe, println_nopipe}; -use anyhow::bail; -use anyhow::Result; +use anyhow::{bail, Result}; use async_trait::async_trait; use base64::Engine; use clap::Parser; @@ -16,6 +16,7 @@ use oxide::types::BlockSize; use oxide::types::ByteCount; use oxide::types::DiskCreate; use oxide::types::DiskSource; +use oxide::types::DiskState; use oxide::types::FinalizeDisk; use oxide::types::ImageCreate; use oxide::types::ImageSource; @@ -33,6 +34,7 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::mpsc; +use tokio::time::{self, Duration}; /// Create a disk from a file upload /// @@ -210,6 +212,50 @@ impl CmdDiskImport { Ok(()) } + + async fn get_disk_state(&self, client: &Client) -> Result { + let response = client + .disk_view() + .project(&self.project) + .disk(NameOrId::Name(self.disk.clone())) + .send() + .await?; + + Ok(response.into_inner().state) + } + + // A shutdown may race with the newly created disk transitioning from Creating + // to ImportReady state. Wait until the disk has moved past Creating before + // attempting to tear it down. 
+ async fn wait_for_disk(&self, client: &Client) -> Result { + const RETRY_CT: usize = 10; + const RETRY_DELAY: Duration = Duration::from_millis(500); + + let mut disk_state = self.get_disk_state(client).await?; + + for _ in 0..RETRY_CT { + if !matches!(disk_state, DiskState::Creating) { + return Ok(disk_state); + } + + time::sleep(RETRY_DELAY).await; + disk_state = self.get_disk_state(client).await?; + } + + bail!("disk remained in Creating state for more than 5 seconds") + } + + async fn remove_failed_disk(&self, client: &Client) -> Result<()> { + let disk_state = self.wait_for_disk(client).await?; + + // TODO hook this into the progress bar. + if matches!(disk_state, DiskState::ImportingFromBulkWrites) { + self.unwind_disk_bulk_write_stop(client).await?; + } + + self.unwind_disk_finalize(client).await?; + self.unwind_disk_delete(client).await + } } #[async_trait] @@ -280,43 +326,47 @@ impl crate::AuthenticatedCmd for CmdDiskImport { // to the above check. const UPLOAD_TASKS: usize = 8; + let mut shutdown = GracefulShutdown::new( + "Cleaning up partially imported disk", + "See https://docs.oxide.computer/guides/troubleshooting#_cant_delete_disk_after_canceled_image_import for instructions on removing the disk", + ); + // Create the disk in state "importing blocks" - client - .disk_create() - .project(&self.project) - .body(DiskCreate { - name: self.disk.clone(), - description: self.description.clone(), - disk_source: DiskSource::ImportingBlocks { - block_size: disk_block_size.clone(), + shutdown + .run_with_cleanup( + client + .disk_create() + .project(&self.project) + .body(DiskCreate { + name: self.disk.clone(), + description: self.description.clone(), + disk_source: DiskSource::ImportingBlocks { + block_size: disk_block_size.clone(), + }, + size: disk_size.into(), + }) + .send(), + Cleanup { + future: self.remove_failed_disk(client), + context: "creating new disk failed", }, - size: disk_size.into(), - }) - .send() + ) .await?; // Start the bulk write process 
- let start_bulk_write_response = client - .disk_bulk_write_import_start() - .project(&self.project) - .disk(self.disk.clone()) - .send() - .await; - - if let Err(e) = start_bulk_write_response { - eprintln_nopipe!("starting the bulk write process failed with {:?}", e); - - // If this fails, the disk is in state import-ready. Finalize it so - // it can be deleted. - self.unwind_disk_finalize(client).await?; - - // The finalize succeeded, so delete the disk. - self.unwind_disk_delete(client).await?; - - // Finalizing and deleting the disk succeeded, so return the - // original error. - return Err(e.into()); - } + shutdown + .run_with_cleanup( + client + .disk_bulk_write_import_start() + .project(&self.project) + .disk(self.disk.clone()) + .send(), + Cleanup { + future: self.remove_failed_disk(client), + context: "starting the bulk write process failed", + }, + ) + .await?; // Create one tokio task for each thread that will upload file chunks let mut handles: Vec>> = @@ -400,6 +450,11 @@ impl crate::AuthenticatedCmd for CmdDiskImport { offset += CHUNK_SIZE; i += 1; + + if shutdown.shutdown_requested()? { + self.remove_failed_disk(client).await?; + bail!("user canceled request"); + } }; for tx in senders { @@ -410,9 +465,7 @@ impl crate::AuthenticatedCmd for CmdDiskImport { // some part of reading from the disk and sending to the upload // threads failed, so unwind. stop the bulk write process, finalize // the disk, then delete it. - self.unwind_disk_bulk_write_stop(client).await?; - self.unwind_disk_finalize(client).await?; - self.unwind_disk_delete(client).await?; + self.remove_failed_disk(client).await?; // return the original error return Err(e); @@ -427,9 +480,7 @@ impl crate::AuthenticatedCmd for CmdDiskImport { if results.iter().any(|x| x.is_err()) { // If any of the upload threads returned an error, unwind the disk. 
eprintln_nopipe!("one of the upload threads failed"); - self.unwind_disk_bulk_write_stop(client).await?; - self.unwind_disk_finalize(client).await?; - self.unwind_disk_delete(client).await?; + self.remove_failed_disk(client).await?; bail!("one of the upload threads failed"); } @@ -437,46 +488,40 @@ impl crate::AuthenticatedCmd for CmdDiskImport { pb.finish(); // Stop the bulk write process - let stop_bulk_write_response = client - .disk_bulk_write_import_stop() - .project(&self.project) - .disk(self.disk.clone()) - .send() - .await; - - if let Err(e) = stop_bulk_write_response { - eprintln_nopipe!("stopping the bulk write process failed with {:?}", e); - - // Attempt to unwind the disk, although it will probably fail - the - // first step is to stop the bulk write process! - self.unwind_disk_bulk_write_stop(client).await?; - self.unwind_disk_finalize(client).await?; - self.unwind_disk_delete(client).await?; - - return Err(e.into()); - } - - // Finalize the disk, optionally making a snapshot - let request = client - .disk_finalize_import() - .project(&self.project) - .disk(self.disk.clone()) - .body(FinalizeDisk { - snapshot_name: self.snapshot.clone(), - }); - - let finalize_response = request.send().await; - - if let Err(e) = finalize_response { - eprintln_nopipe!("finalizing the disk failed with {:?}", e); - - // Attempt to unwind the disk, although it will probably fail - the - // first step is to finalize the disk! - self.unwind_disk_finalize(client).await?; - self.unwind_disk_delete(client).await?; + shutdown + .run_with_cleanup( + client + .disk_bulk_write_import_stop() + .project(&self.project) + .disk(self.disk.clone()) + .send(), + Cleanup { + // Attempt to unwind the disk, although it will probably fail - the + // first step is to stop the bulk write process! 
+ future: self.remove_failed_disk(client), + context: "stopping the bulk write process failed", + }, + ) + .await?; - return Err(e.into()); - } + shutdown + .run_with_cleanup( + client + .disk_finalize_import() + .project(&self.project) + .disk(self.disk.clone()) + .body(FinalizeDisk { + snapshot_name: self.snapshot.clone(), + }) + .send(), + Cleanup { + // Attempt to unwind the disk, although it will probably fail - the + // first step is to finalize the disk! + future: self.remove_failed_disk(client), + context: "finalizing the disk failed", + }, + ) + .await?; // optionally, make an image out of that snapshot if let Some(image_name) = &self.image { diff --git a/cli/src/main.rs b/cli/src/main.rs index 4acbedae..e073f220 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -31,6 +31,7 @@ mod cmd_instance; mod cmd_net; mod cmd_timeseries; mod context; +mod shutdown; mod cmd_version; #[allow(unused_mut)] diff --git a/cli/src/shutdown.rs b/cli/src/shutdown.rs new file mode 100644 index 00000000..a1061c0a --- /dev/null +++ b/cli/src/shutdown.rs @@ -0,0 +1,225 @@ +use crate::eprintln_nopipe; + +use anyhow::{anyhow, Error, Result}; +use std::future::Future; +use tokio::signal; + +/// GracefulShutdown listens for CTRL+C events and executes futures with +/// a cleanup action on failure or interrupt. A second CTRL+C will cause the +/// process to exit immediately. +#[derive(Debug)] +pub struct GracefulShutdown { + /// The channel which will provide notification that a CTRL+C signal + /// has been received. + rx: tokio::sync::watch::Receiver<()>, +} + +impl GracefulShutdown { + /// Construct a new GracefulShutdown and start listening for CTRL+C events. 
+ pub fn new(cancel_msg: &'static str, exit_msg: &'static str) -> Self { + let (tx, rx) = tokio::sync::watch::channel(()); + + tokio::spawn(async move { + let mut force_shutdown = false; + loop { + signal::ctrl_c().await.expect("Failed to listen for CTRL+C"); + + if force_shutdown { + eprintln_nopipe!("Shutting down immediately.\n{exit_msg}."); + std::process::exit(130); + } + force_shutdown = true; + + eprintln_nopipe!("{cancel_msg}. Press CTRL+C again to exit immediately."); + tx.send(()).expect("Failed to write to shutdown channel"); + } + }); + + Self { rx } + } + + /// Execute the provided `Future` while listening for a CTRL+C event. + /// If the `Future` fails, or CTRL+C is received, run the cleanup task. + /// If cleanup is executed, then an error will be returned even if it + /// completes successfully. The `Future` will not be executed if a + /// CTRL+C event has already been received. + pub async fn run_with_cleanup( + &mut self, + future: F, + cleanup: Cleanup, + ) -> Result + where + F: Future>, + E: Into, + C: Future>, + { + let cancel_err = anyhow!("user canceled request"); + + if self.rx.has_changed()? { + return Err(cleanup.run(cancel_err).await); + } + + tokio::select! { + result = future => { + match result { + Ok(o) => Ok(o), + Err(e) => { + Err(cleanup.run(e.into()).await) + } + } + }, + _ = self.rx.changed() => { + Err(cleanup.run(cancel_err).await) + } + } + } + + /// Whether the caller should start to unwind the command being executed. + pub fn shutdown_requested(&self) -> Result { + self.rx.has_changed().map_err(|e| e.into()) + } +} + +/// A `Future` to be run on interrupt or failure. 
+#[derive(Debug)] +pub struct Cleanup +where + C: Future>, +{ + pub future: C, + pub context: &'static str, +} + +impl Cleanup +where + C: Future>, +{ + async fn run(self, source_err: Error) -> Error { + match self.future.await { + Ok(()) => source_err.context(self.context), + Err(e) => source_err.context(e).context(self.context), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use anyhow::Error; + + #[tokio::test] + async fn test_shutdown_future_ok() { + let mut gs = GracefulShutdown::new("foo", "bar"); + + let result: Result<(), Error> = gs + .run_with_cleanup::<_, (), _, Error>( + async { Ok(()) }, + Cleanup { + future: async { Err(anyhow!("ran cleanup unexpectedly")) }, + context: "bar", + }, + ) + .await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_shutdown_future_failed_cleanup_ok() { + let mut gs = GracefulShutdown::new("foo", "bar"); + + let result: Result<(), Error> = gs + .run_with_cleanup( + async { Err(anyhow!("future failed")) }, + Cleanup { + future: async { Ok(()) }, + context: "failed to frobnicate", + }, + ) + .await; + + let err = match result { + Ok(_) => panic!("unexpected OK"), + Err(e) => format!("{e:#}"), + }; + assert_eq!("failed to frobnicate: future failed", err); + } + + #[tokio::test] + async fn test_shutdown_future_failed_cleanup_failed() { + let mut gs = GracefulShutdown::new("foo", "bar"); + + let result: Result<(), Error> = gs + .run_with_cleanup( + async { Err(anyhow!("future failed")) }, + Cleanup { + future: async { Err(anyhow!("cleanup failed")) }, + context: "failed to frobnicate", + }, + ) + .await; + + let err = match result { + Ok(_) => panic!("unexpected OK"), + Err(e) => format!("{e:#}"), + }; + assert_eq!("failed to frobnicate: cleanup failed: future failed", err); + } + + #[tokio::test] + async fn test_shutdown_cancel_cleanup_ok() { + let mut gs = GracefulShutdown::new("foo", "bar"); + gs.rx.mark_changed(); + + let result: Result<(), Error> = gs + .run_with_cleanup::<_, (), _, Error>( + 
async { Ok(()) }, + Cleanup { + future: async { Ok(()) }, + context: "failed to frobnicate", + }, + ) + .await; + + let err = match result { + Ok(_) => panic!("unexpected OK"), + Err(e) => format!("{e:#}"), + }; + assert_eq!("failed to frobnicate: user canceled request", err); + } + + #[tokio::test] + async fn test_shutdown_cancel_cleanup_failed() { + let mut gs = GracefulShutdown::new("foo", "bar"); + gs.rx.mark_changed(); + + let result: Result<(), Error> = gs + .run_with_cleanup::<_, (), _, Error>( + async { Ok(()) }, + Cleanup { + future: async { Err(anyhow!("cleanup failed")) }, + context: "failed to frobnicate", + }, + ) + .await; + + let err = match result { + Ok(_) => panic!("unexpected OK"), + Err(e) => format!("{e:#}"), + }; + assert_eq!( + "failed to frobnicate: cleanup failed: user canceled request", + err + ); + } + + #[tokio::test] + async fn test_shutdown_requested() { + let mut gs = GracefulShutdown::new("foo", "bar"); + + assert!(!gs.shutdown_requested().unwrap()); + + gs.rx.mark_changed(); + + assert!(gs.shutdown_requested().unwrap()); + } +} diff --git a/cli/tests/test_disk_import.rs b/cli/tests/test_disk_import.rs index 01df7dd3..a01b5c54 100644 --- a/cli/tests/test_disk_import.rs +++ b/cli/tests/test_disk_import.rs @@ -423,169 +423,6 @@ fn test_disk_import_image_exists_already() { image_view_mock.assert(); } -// A disk import where the bulk_import_start fails -#[test] -fn test_disk_import_bulk_import_start_fail() { - let mut src = rand::rngs::SmallRng::seed_from_u64(425); - let server = MockServer::start(); - - let disk_view_mock = server.disk_view(|when, then| { - when.into_inner().any_request(); - then.client_error( - 404, - &oxide::types::Error { - error_code: None, - message: "disk not found".into(), - request_id: Uuid::mock_value(&mut src).unwrap().to_string(), - }, - ); - }); - - let disk_create_mock = server.disk_create(|when, then| { - when.into_inner().any_request(); - then.created(&Disk { - name: 
"test-disk-import-bulk-import-start-fail".parse().unwrap(), - ..Disk::mock_value(&mut src).unwrap() - }); - }); - - let start_bulk_write_mock = server.disk_bulk_write_import_start(|when, then| { - when.into_inner().any_request(); - then.server_error( - 503, - &oxide::types::Error { - error_code: None, - message: "I can't do that Dave".into(), - request_id: Uuid::mock_value(&mut src).unwrap().to_string(), - }, - ); - }); - - let unwind_finalize_mock = server.disk_finalize_import(|when, then| { - when.into_inner().any_request(); - then.no_content(); - }); - - let unwind_disk_delete_mock = server.disk_delete(|when, then| { - when.into_inner().any_request(); - then.no_content(); - }); - - let test_file = Testfile::new_random(CHUNK_SIZE * 2).unwrap(); - - Command::cargo_bin("oxide") - .unwrap() - .env("RUST_BACKTRACE", "1") - .env("OXIDE_HOST", server.url("")) - .env("OXIDE_TOKEN", "test_disk_import_bulk_import_start_fail") - .arg("disk") - .arg("import") - .arg("--project") - .arg("myproj") - .arg("--description") - .arg("disk description") - .arg("--path") - .arg(test_file.path()) - .arg("--disk") - .arg("test-disk-import-bulk-import-start-fail") - .assert() - .failure(); - - disk_view_mock.assert(); - disk_create_mock.assert(); - start_bulk_write_mock.assert(); - unwind_finalize_mock.assert(); - unwind_disk_delete_mock.assert(); -} - -// A disk import where the bulk_write_import fails -#[test] -fn test_disk_import_bulk_write_import_fail() { - let mut src = rand::rngs::SmallRng::seed_from_u64(426); - let server = MockServer::start(); - - let disk_view_mock = server.disk_view(|when, then| { - when.into_inner().any_request(); - then.client_error( - 404, - &oxide::types::Error { - error_code: None, - message: "disk not found".into(), - request_id: Uuid::mock_value(&mut src).unwrap().to_string(), - }, - ); - }); - - let disk_create_mock = server.disk_create(|when, then| { - when.into_inner().any_request(); - then.created(&Disk { - name: 
"test-disk-import-bulk-write-import-fail".parse().unwrap(), - ..Disk::mock_value(&mut src).unwrap() - }); - }); - - let start_bulk_write_mock = server.disk_bulk_write_import_start(|when, then| { - when.into_inner().any_request(); - then.no_content(); - }); - - let disk_bulk_write_mock = server.disk_bulk_write_import(|when, then| { - when.into_inner().any_request(); - then.server_error( - 503, - &oxide::types::Error { - error_code: None, - message: "I can't do that Dave".into(), - request_id: Uuid::mock_value(&mut src).unwrap().to_string(), - }, - ); - }); - - let unwind_stop_bulk_write_mock = server.disk_bulk_write_import_stop(|when, then| { - when.into_inner().any_request(); - then.no_content(); - }); - - let unwind_finalize_mock = server.disk_finalize_import(|when, then| { - when.into_inner().any_request(); - then.no_content(); - }); - - let unwind_disk_delete_mock = server.disk_delete(|when, then| { - when.into_inner().any_request(); - then.no_content(); - }); - - let test_file = Testfile::new_random(CHUNK_SIZE * 2).unwrap(); - - Command::cargo_bin("oxide") - .unwrap() - .env("RUST_BACKTRACE", "1") - .env("OXIDE_HOST", server.url("")) - .env("OXIDE_TOKEN", "test_disk_import_bulk_write_import_fail") - .arg("disk") - .arg("import") - .arg("--project") - .arg("myproj") - .arg("--description") - .arg("disk description") - .arg("--path") - .arg(test_file.path()) - .arg("--disk") - .arg("test-disk-import-bulk-write-import-fail") - .assert() - .failure(); - - disk_view_mock.assert(); - disk_create_mock.assert(); - start_bulk_write_mock.assert(); - // there are 8 upload tasks, and both will receive a 500 on their POSTs - disk_bulk_write_mock.assert_hits(2); - unwind_stop_bulk_write_mock.assert(); - unwind_finalize_mock.assert(); - unwind_disk_delete_mock.assert(); -} - // A disk import where the requested block size is invalid #[test] fn test_disk_import_bad_block_size() {