Skip to content

Commit

Permalink
Cleanup partially imported disks on CTRL+C
Browse files Browse the repository at this point in the history
If a `disk import` request is interrupted then the disk is likely to be
left in a `ImportReady` or `ImportingFromBulkWrites` state, which
prevents the user from deleting it. The commonly occurs when users
interrupt the request using ^C, and is a frustrating experience.

Add a new `GracefulShutdown` struct to listen for SIGINTs. This will
cancel the executing `Future` on interrupt and run a cleanup task which
will return an error, even if cleanup succeeds. A second SIGINT causes
the process to exit immediately.

Hook calls make in the critical section of the import into the new
system to ensure we delete the partially imported disk.

Closes #651
  • Loading branch information
wfchandler committed Jul 23, 2024
1 parent baea17e commit 4891399
Show file tree
Hide file tree
Showing 3 changed files with 350 additions and 79 deletions.
203 changes: 124 additions & 79 deletions cli/src/cmd_disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

// Copyright 2024 Oxide Computer Company

use crate::shutdown::{Cleanup, GracefulShutdown};
use crate::{eprintln_nopipe, println_nopipe};

use anyhow::bail;
use anyhow::Result;
use anyhow::{bail, Result};
use async_trait::async_trait;
use base64::Engine;
use clap::Parser;
Expand All @@ -16,6 +16,7 @@ use oxide::types::BlockSize;
use oxide::types::ByteCount;
use oxide::types::DiskCreate;
use oxide::types::DiskSource;
use oxide::types::DiskState;
use oxide::types::FinalizeDisk;
use oxide::types::ImageCreate;
use oxide::types::ImageSource;
Expand All @@ -33,6 +34,7 @@ use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::{self, Duration};

/// Create a disk from a file upload
///
Expand Down Expand Up @@ -210,6 +212,50 @@ impl CmdDiskImport {

Ok(())
}

async fn get_disk_state(&self, client: &Client) -> Result<DiskState> {
let response = client
.disk_view()
.project(&self.project)
.disk(NameOrId::Name(self.disk.clone()))
.send()
.await?;

Ok(response.into_inner().state)
}

// A shutdown may race with the newly created disk transitioning from Created
// to ImportReady state. Wait until the disk has moved past Created before
// attempting tear it down.
async fn wait_for_disk(&self, client: &Client) -> Result<DiskState> {
const RETRY_CT: usize = 10;
const RETRY_DELAY: Duration = Duration::from_millis(500);

let mut disk_state = self.get_disk_state(client).await?;

for _ in 0..RETRY_CT {
if !matches!(disk_state, DiskState::Creating) {
return Ok(disk_state);
}

time::sleep(RETRY_DELAY).await;
disk_state = self.get_disk_state(client).await?;
}

bail!("disk remained in Creating state for more than 5 seconds")
}

async fn remove_failed_disk(&self, client: &Client) -> Result<()> {
let disk_state = self.wait_for_disk(client).await?;

// TODO hook this into the progress bar.
if matches!(disk_state, DiskState::ImportingFromBulkWrites) {
self.unwind_disk_bulk_write_stop(client).await?;
}

self.unwind_disk_finalize(client).await?;
self.unwind_disk_delete(client).await
}
}

#[async_trait]
Expand Down Expand Up @@ -280,43 +326,47 @@ impl crate::AuthenticatedCmd for CmdDiskImport {
// to the above check.
const UPLOAD_TASKS: usize = 8;

let mut shutdown = GracefulShutdown::new(
"Cleaning up partially imported disk",
"See https://docs.oxide.computer/guides/troubleshooting#_cant_delete_disk_after_canceled_image_import for instructions on removing the disk",
);

// Create the disk in state "importing blocks"
client
.disk_create()
.project(&self.project)
.body(DiskCreate {
name: self.disk.clone(),
description: self.description.clone(),
disk_source: DiskSource::ImportingBlocks {
block_size: disk_block_size.clone(),
shutdown
.run_with_cleanup(
client
.disk_create()
.project(&self.project)
.body(DiskCreate {
name: self.disk.clone(),
description: self.description.clone(),
disk_source: DiskSource::ImportingBlocks {
block_size: disk_block_size.clone(),
},
size: disk_size.into(),
})
.send(),
Cleanup {
future: self.remove_failed_disk(client),
context: "creating new disk failed",
},
size: disk_size.into(),
})
.send()
)
.await?;

// Start the bulk write process
let start_bulk_write_response = client
.disk_bulk_write_import_start()
.project(&self.project)
.disk(self.disk.clone())
.send()
.await;

if let Err(e) = start_bulk_write_response {
eprintln_nopipe!("starting the bulk write process failed with {:?}", e);

// If this fails, the disk is in state import-ready. Finalize it so
// it can be deleted.
self.unwind_disk_finalize(client).await?;

// The finalize succeeded, so delete the disk.
self.unwind_disk_delete(client).await?;

// Finalizing and deleting the disk succeeded, so return the
// original error.
return Err(e.into());
}
shutdown
.run_with_cleanup(
client
.disk_bulk_write_import_start()
.project(&self.project)
.disk(self.disk.clone())
.send(),
Cleanup {
future: self.remove_failed_disk(client),
context: "starting the bulk write process failed",
},
)
.await?;

// Create one tokio task for each thread that will upload file chunks
let mut handles: Vec<tokio::task::JoinHandle<Result<()>>> =
Expand Down Expand Up @@ -400,6 +450,11 @@ impl crate::AuthenticatedCmd for CmdDiskImport {

offset += CHUNK_SIZE;
i += 1;

if shutdown.shutdown_requested()? {
self.remove_failed_disk(client).await?;
bail!("user canceled request");
}
};

for tx in senders {
Expand All @@ -410,9 +465,7 @@ impl crate::AuthenticatedCmd for CmdDiskImport {
// some part of reading from the disk and sending to the upload
// threads failed, so unwind. stop the bulk write process, finalize
// the disk, then delete it.
self.unwind_disk_bulk_write_stop(client).await?;
self.unwind_disk_finalize(client).await?;
self.unwind_disk_delete(client).await?;
self.remove_failed_disk(client).await?;

// return the original error
return Err(e);
Expand All @@ -427,56 +480,48 @@ impl crate::AuthenticatedCmd for CmdDiskImport {
if results.iter().any(|x| x.is_err()) {
// If any of the upload threads returned an error, unwind the disk.
eprintln_nopipe!("one of the upload threads failed");
self.unwind_disk_bulk_write_stop(client).await?;
self.unwind_disk_finalize(client).await?;
self.unwind_disk_delete(client).await?;
self.remove_failed_disk(client).await?;
bail!("one of the upload threads failed");
}

// wait for upload threads to complete, then finish the progress bar
pb.finish();

// Stop the bulk write process
let stop_bulk_write_response = client
.disk_bulk_write_import_stop()
.project(&self.project)
.disk(self.disk.clone())
.send()
.await;

if let Err(e) = stop_bulk_write_response {
eprintln_nopipe!("stopping the bulk write process failed with {:?}", e);

// Attempt to unwind the disk, although it will probably fail - the
// first step is to stop the bulk write process!
self.unwind_disk_bulk_write_stop(client).await?;
self.unwind_disk_finalize(client).await?;
self.unwind_disk_delete(client).await?;

return Err(e.into());
}

// Finalize the disk, optionally making a snapshot
let request = client
.disk_finalize_import()
.project(&self.project)
.disk(self.disk.clone())
.body(FinalizeDisk {
snapshot_name: self.snapshot.clone(),
});

let finalize_response = request.send().await;

if let Err(e) = finalize_response {
eprintln_nopipe!("finalizing the disk failed with {:?}", e);

// Attempt to unwind the disk, although it will probably fail - the
// first step is to finalize the disk!
self.unwind_disk_finalize(client).await?;
self.unwind_disk_delete(client).await?;
shutdown
.run_with_cleanup(
client
.disk_bulk_write_import_stop()
.project(&self.project)
.disk(self.disk.clone())
.send(),
Cleanup {
// Attempt to unwind the disk, although it will probably fail - the
// first step is to stop the bulk write process!
future: self.remove_failed_disk(client),
context: "stopping the bulk write process failed",
},
)
.await?;

return Err(e.into());
}
shutdown
.run_with_cleanup(
client
.disk_finalize_import()
.project(&self.project)
.disk(self.disk.clone())
.body(FinalizeDisk {
snapshot_name: self.snapshot.clone(),
})
.send(),
Cleanup {
// Attempt to unwind the disk, although it will probably fail.
// first step is to finalize the disk!
future: self.remove_failed_disk(client),
context: "finalizing the disk failed",
},
)
.await?;

// optionally, make an image out of that snapshot
if let Some(image_name) = &self.image {
Expand Down
1 change: 1 addition & 0 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod cmd_instance;
mod cmd_net;
mod cmd_timeseries;
mod context;
mod shutdown;

mod cmd_version;
#[allow(unused_mut)]
Expand Down
Loading

0 comments on commit 4891399

Please sign in to comment.