Skip to content

Commit

Permalink
[nexus] Sled Expungement causes PhysicalDisk expungement (#5371)
Browse files Browse the repository at this point in the history
Fixes #5369

This updates the "expunge sled" endpoint which is part of the internal
API, but the same implementation should still apply when the endpoint is
exposed to operators.
  • Loading branch information
smklein authored Apr 1, 2024
1 parent afb2e9a commit ac11458
Showing 1 changed file with 220 additions and 60 deletions.
280 changes: 220 additions & 60 deletions nexus/db-queries/src/db/datastore/sled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,9 @@ impl DataStore {
/// sufficient warning to the operator.
///
/// This is idempotent, and it returns the old policy of the sled.
///
/// Calling this function also implicitly marks the disks attached to a sled
/// as "expunged".
pub async fn sled_set_policy_to_expunged(
&self,
opctx: &OpContext,
Expand All @@ -348,73 +351,127 @@ impl DataStore {
&self,
opctx: &OpContext,
authz_sled: &authz::Sled,
new_policy: SledPolicy,
new_sled_policy: SledPolicy,
check: ValidateTransition,
) -> Result<SledPolicy, TransitionError> {
use db::schema::sled::dsl;

opctx.authorize(authz::Action::Modify, authz_sled).await?;

let sled_id = authz_sled.id();
let query = diesel::update(dsl::sled)
.filter(dsl::time_deleted.is_null())
.filter(dsl::id.eq(sled_id));

let t = SledTransition::Policy(new_policy);
let valid_old_policies = t.valid_old_policies();
let valid_old_states = t.valid_old_states();

let query = match check {
ValidateTransition::Yes => query
.filter(dsl::sled_policy.eq_any(
valid_old_policies.into_iter().map(to_db_sled_policy),
))
.filter(
dsl::sled_state.eq_any(valid_old_states.iter().copied()),
)
.into_boxed(),
#[cfg(test)]
ValidateTransition::No => query.into_boxed(),
};
let err = OptionalError::new();
let conn = self.pool_connection_authorized(opctx).await?;
let policy = self
.transaction_retry_wrapper("sled_set_policy")
.transaction(&conn, |conn| {
let err = err.clone();

let query = query
.set((
dsl::sled_policy.eq(to_db_sled_policy(new_policy)),
dsl::time_modified.eq(Utc::now()),
))
.check_if_exists::<Sled>(sled_id);
async move {
let t = SledTransition::Policy(new_sled_policy);
let valid_old_policies = t.valid_old_policies();
let valid_old_states = t.valid_old_states();

use db::schema::sled::dsl;
let query = diesel::update(dsl::sled)
.filter(dsl::time_deleted.is_null())
.filter(dsl::id.eq(sled_id));

let query = match check {
ValidateTransition::Yes => query
.filter(
dsl::sled_policy.eq_any(
valid_old_policies
.into_iter()
.map(to_db_sled_policy),
),
)
.filter(
dsl::sled_state
.eq_any(valid_old_states.iter().copied()),
)
.into_boxed(),
#[cfg(test)]
ValidateTransition::No => query.into_boxed(),
};

let query = query
.set((
dsl::sled_policy
.eq(to_db_sled_policy(new_sled_policy)),
dsl::time_modified.eq(Utc::now()),
))
.check_if_exists::<Sled>(sled_id);

let result = query.execute_and_check(&conn).await?;

let old_policy = match (check, result.status) {
(ValidateTransition::Yes, UpdateStatus::Updated) => {
result.found.policy()
}
(
ValidateTransition::Yes,
UpdateStatus::NotUpdatedButExists,
) => {
// Two reasons this can happen:
// 1. An idempotent update: this is treated as a success.
// 2. Invalid state transition: a failure.
//
// To differentiate between the two, check that the new policy
// is the same as the old policy, and that the old state is
// valid.
if result.found.policy() == new_sled_policy
&& valid_old_states
.contains(&result.found.state())
{
result.found.policy()
} else {
return Err(err.bail(
TransitionError::InvalidTransition {
current: result.found,
transition: SledTransition::Policy(
new_sled_policy,
),
},
));
}
}
#[cfg(test)]
(ValidateTransition::No, _) => result.found.policy(),
};

// When a sled is expunged, the associated disks with that
// sled should also be implicitly set to expunged.
let new_disk_policy = match new_sled_policy {
SledPolicy::InService { .. } => None,
SledPolicy::Expunged => {
Some(nexus_db_model::PhysicalDiskPolicy::Expunged)
}
};
if let Some(new_disk_policy) = new_disk_policy {
use db::schema::physical_disk::dsl as physical_disk_dsl;
diesel::update(physical_disk_dsl::physical_disk)
.filter(physical_disk_dsl::time_deleted.is_null())
.filter(physical_disk_dsl::sled_id.eq(sled_id))
.set(
physical_disk_dsl::disk_policy
.eq(new_disk_policy),
)
.execute_async(&conn)
.await?;
}

let result = query
.execute_and_check(&*self.pool_connection_authorized(opctx).await?)
Ok(old_policy)
}
})
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;

match (check, result.status) {
(ValidateTransition::Yes, UpdateStatus::Updated) => {
Ok(result.found.policy())
}
(ValidateTransition::Yes, UpdateStatus::NotUpdatedButExists) => {
// Two reasons this can happen:
// 1. An idempotent update: this is treated as a success.
// 2. Invalid state transition: a failure.
//
// To differentiate between the two, check that the new policy
// is the same as the old policy, and that the old state is
// valid.
if result.found.policy() == new_policy
&& valid_old_states.contains(&result.found.state())
{
Ok(result.found.policy())
} else {
Err(TransitionError::InvalidTransition {
current: result.found,
transition: SledTransition::Policy(new_policy),
})
.map_err(|e| {
if let Some(err) = err.take() {
return err;
}
}
#[cfg(test)]
(ValidateTransition::No, _) => Ok(result.found.policy()),
}
TransitionError::from(public_error_from_diesel(
e,
ErrorHandler::Server,
))
})?;
Ok(policy)
}

/// Marks the state of the sled as decommissioned, as believed by Nexus.
Expand Down Expand Up @@ -675,6 +732,9 @@ mod test {
use anyhow::{Context, Result};
use itertools::Itertools;
use nexus_db_model::Generation;
use nexus_db_model::PhysicalDisk;
use nexus_db_model::PhysicalDiskKind;
use nexus_db_model::PhysicalDiskPolicy;
use nexus_test_utils::db::test_setup_database;
use nexus_types::identity::Asset;
use omicron_common::api::external;
Expand Down Expand Up @@ -967,6 +1027,107 @@ mod test {
logctx.cleanup_successful();
}

async fn lookup_physical_disk(
datastore: &DataStore,
id: Uuid,
) -> PhysicalDisk {
use db::schema::physical_disk::dsl;
dsl::physical_disk
.filter(dsl::id.eq(id))
.filter(dsl::time_deleted.is_null())
.select(PhysicalDisk::as_select())
.get_result_async(
&*datastore
.pool_connection_for_tests()
.await
.expect("No connection"),
)
.await
.expect("Failed to lookup physical disk")
}

#[tokio::test]
async fn test_sled_expungement_also_expunges_disks() {
let logctx =
dev::test_setup_log("test_sled_expungement_also_expunges_disks");
let mut db = test_setup_database(&logctx.log).await;

let (opctx, datastore) = datastore_test(&logctx, &db).await;

// Set up a sled to test against.
let sled = datastore.sled_upsert(test_new_sled_update()).await.unwrap();
let sled_id = sled.id();

// Add a couple disks to this sled.
//
// (Note: This isn't really enough DB fakery to actually provision e.g.
// Crucible regions, but it creates enough of a control plane object to
// be associated with the Sled by UUID)
let disk1 = PhysicalDisk::new(
Uuid::new_v4(),
"vendor1".to_string(),
"serial1".to_string(),
"model1".to_string(),
PhysicalDiskKind::U2,
sled_id,
);
let disk2 = PhysicalDisk::new(
Uuid::new_v4(),
"vendor2".to_string(),
"serial2".to_string(),
"model2".to_string(),
PhysicalDiskKind::U2,
sled_id,
);

datastore
.physical_disk_upsert(&opctx, disk1.clone())
.await
.expect("Failed to upsert physical disk");
datastore
.physical_disk_upsert(&opctx, disk2.clone())
.await
.expect("Failed to upsert physical disk");

// Confirm the disks are "in-service".
//
// We verify this state because it should be changing below.
assert_eq!(
PhysicalDiskPolicy::InService,
lookup_physical_disk(&datastore, disk1.id()).await.disk_policy
);
assert_eq!(
PhysicalDiskPolicy::InService,
lookup_physical_disk(&datastore, disk2.id()).await.disk_policy
);

// Expunge the sled. As a part of this process, the query should UPDATE
// the physical_disk table.
sled_set_policy(
&opctx,
&datastore,
sled_id,
SledPolicy::Expunged,
ValidateTransition::Yes,
Expected::Ok(SledPolicy::provisionable()),
)
.await
.expect("Could not expunge sled");

// Observe that the disk state is now expunged
assert_eq!(
PhysicalDiskPolicy::Expunged,
lookup_physical_disk(&datastore, disk1.id()).await.disk_policy
);
assert_eq!(
PhysicalDiskPolicy::Expunged,
lookup_physical_disk(&datastore, disk2.id()).await.disk_policy
);

db.cleanup().await.unwrap();
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_sled_transitions() {
// Test valid and invalid state and policy transitions.
Expand Down Expand Up @@ -1199,8 +1360,7 @@ mod test {
/// Tests listing large numbers of sleds via the batched interface
#[tokio::test]
async fn sled_list_batch() {
let logctx =
dev::test_setup_log("sled_reservation_create_non_provisionable");
let logctx = dev::test_setup_log("sled_list_batch");
let mut db = test_setup_database(&logctx.log).await;
let (opctx, datastore) = datastore_test(&logctx, &db).await;

Expand Down

0 comments on commit ac11458

Please sign in to comment.