From ba36c797d1779358caa87d2b55cd514399cbd46d Mon Sep 17 00:00:00 2001 From: iliana etaoin Date: Mon, 18 Nov 2024 21:08:19 +0000 Subject: [PATCH] background task --- dev-tools/omdb/src/bin/omdb/nexus.rs | 28 ++ dev-tools/omdb/tests/env.out | 12 + dev-tools/omdb/tests/successes.out | 26 + nexus-config/src/nexus_config.rs | 16 + nexus/db-model/src/schema_versions.rs | 3 +- nexus/db-queries/src/db/datastore/update.rs | 22 +- nexus/examples/config-second.toml | 1 + nexus/examples/config.toml | 1 + nexus/src/app/background/init.rs | 25 +- nexus/src/app/background/tasks/mod.rs | 1 + .../tasks/tuf_artifact_replication.rs | 447 ++++++++++++++++++ nexus/src/app/mod.rs | 15 + nexus/src/app/update/mod.rs | 19 +- nexus/tests/config.test.toml | 1 + nexus/types/src/internal_api/background.rs | 16 + schema/crdb/dbinit.sql | 9 +- schema/crdb/tuf-artifact-replication/up.sql | 1 + smf/nexus/multi-sled/config-partial.toml | 1 + smf/nexus/single-sled/config-partial.toml | 1 + .../src/artifacts/extracted_artifacts.rs | 24 +- 20 files changed, 652 insertions(+), 17 deletions(-) create mode 100644 nexus/src/app/background/tasks/tuf_artifact_replication.rs create mode 100644 schema/crdb/tuf-artifact-replication/up.sql diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index cb05bb575b..b9196188d7 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -51,6 +51,7 @@ use nexus_types::internal_api::background::RegionSnapshotReplacementFinishStatus use nexus_types::internal_api::background::RegionSnapshotReplacementGarbageCollectStatus; use nexus_types::internal_api::background::RegionSnapshotReplacementStartStatus; use nexus_types::internal_api::background::RegionSnapshotReplacementStepStatus; +use nexus_types::internal_api::background::TufArtifactReplicationStatus; use nexus_types::inventory::BaseboardId; use omicron_uuid_kinds::CollectionUuid; use omicron_uuid_kinds::DemoSagaUuid; @@ -1928,6 +1929,33 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) { } } }; + } else if name == "tuf_artifact_replication" { + match serde_json::from_value::( + details.clone(), + ) { + Err(error) => eprintln!( + "warning: failed to interpret task details: {:?}: {:?}", + error, details + ), + Ok(status) => { + const ROWS: &[&str] = &[ + "requests ok:", + "requests errored:", + "requests outstanding:", + "local repos:", + ]; + const WIDTH: usize = const_max_len(ROWS); + println!(" last execution:"); + for (label, value) in ROWS.iter().zip([ + status.requests_ok, + status.requests_err, + status.requests_outstanding, + status.local_repos, + ]) { + println!(" {label:3}"); + } + } + } } else { println!( "warning: unknown background task: {:?} \ diff --git a/dev-tools/omdb/tests/env.out b/dev-tools/omdb/tests/env.out index f6b56e2b7b..86f4d0ee09 100644 --- a/dev-tools/omdb/tests/env.out +++ b/dev-tools/omdb/tests/env.out @@ -166,6 +166,10 @@ task: "switch_port_config_manager" manages switch port settings for rack switches +task: "tuf_artifact_replication" + replicate update repo artifacts across sleds + + task: "v2p_manager" manages opte v2p mappings for vpc networking @@ -337,6 +341,10 @@ task: "switch_port_config_manager" manages switch port settings for rack switches +task: "tuf_artifact_replication" + replicate update repo artifacts across sleds + + task: "v2p_manager" manages opte v2p mappings for vpc networking @@ -495,6 +503,10 @@ task: "switch_port_config_manager" manages switch port settings for rack switches +task: "tuf_artifact_replication" + replicate update repo artifacts across sleds + + task: "v2p_manager" manages opte v2p mappings for vpc networking diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index 6974c0b36b..9f0e7775ff 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -384,6 +384,10 @@ task: "switch_port_config_manager" manages switch port settings for rack switches +task: "tuf_artifact_replication" + replicate update repo artifacts across sleds + + task: "v2p_manager" manages opte v2p mappings for vpc networking @@ -698,6 +702,17 @@ task: "switch_port_config_manager" started at (s ago) and ran for ms warning: unknown background task: "switch_port_config_manager" (don't know how to interpret details: Object {}) +task: "tuf_artifact_replication" + configured period: every m + currently executing: no + last completed activation: , triggered by a periodic timer firing + started at (s ago) and ran for ms + last execution: + requests ok: 0 + requests errored: 0 + requests outstanding: 0 + local repos: 0 + task: "v2p_manager" configured period: every s currently executing: no @@ -1141,6 +1156,17 @@ task: "switch_port_config_manager" started at (s ago) and ran for ms warning: unknown background task: "switch_port_config_manager" (don't know how to interpret details: Object {}) +task: "tuf_artifact_replication" + configured period: every m + currently executing: no + last completed activation: , triggered by a periodic timer firing + started at (s ago) and ran for ms + last execution: + requests ok: 0 + requests errored: 0 + requests outstanding: 0 + local repos: 0 + task: "v2p_manager" configured period: every s currently executing: no diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index 82362f2f0d..8ac1ead40b 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -418,6 +418,8 @@ pub struct BackgroundTaskConfig { /// configuration for region snapshot replacement finisher task pub region_snapshot_replacement_finish: RegionSnapshotReplacementFinishConfig, + /// configuration for TUF artifact replication task + pub tuf_artifact_replication: TufArtifactReplicationConfig, } #[serde_as] @@ -703,6 +705,14 @@ pub struct RegionSnapshotReplacementFinishConfig { pub period_secs: Duration, } +#[serde_as] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct TufArtifactReplicationConfig { + /// period (in seconds) for periodic activations of this background task + #[serde_as(as = "DurationSeconds")] + pub period_secs: Duration, +} + /// Configuration for a nexus server #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct PackageConfig { @@ -958,6 +968,7 @@ mod test { region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 region_snapshot_replacement_finish.period_secs = 30 + tuf_artifact_replication.period_secs = 60 [default_region_allocation_strategy] type = "random" seed = 0 @@ -1156,6 +1167,10 @@ mod test { RegionSnapshotReplacementFinishConfig { period_secs: Duration::from_secs(30), }, + tuf_artifact_replication: + TufArtifactReplicationConfig { + period_secs: Duration::from_secs(60) + }, }, default_region_allocation_strategy: crate::nexus_config::RegionAllocationStrategy::Random { @@ -1237,6 +1252,7 @@ mod test { region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 region_snapshot_replacement_finish.period_secs = 30 + tuf_artifact_replication.period_secs = 60 [default_region_allocation_strategy] type = "random" "##, diff --git a/nexus/db-model/src/schema_versions.rs b/nexus/db-model/src/schema_versions.rs index 70450a7776..f37aed1d7a 100644 --- a/nexus/db-model/src/schema_versions.rs +++ b/nexus/db-model/src/schema_versions.rs @@ -17,7 +17,7 @@ use std::collections::BTreeMap; /// /// This must be updated when you change the database schema. Refer to /// schema/crdb/README.adoc in the root of this repository for details. -pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(114, 0, 0); +pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(115, 0, 0); /// List of all past database schema versions, in *reverse* order /// @@ -29,6 +29,7 @@ static KNOWN_VERSIONS: Lazy> = Lazy::new(|| { // | leaving the first copy as an example for the next person. // v // KnownVersion::new(next_int, "unique-dirname-with-the-sql-files"), + KnownVersion::new(115, "tuf-artifact-replication"), KnownVersion::new(114, "crucible-ref-count-records"), KnownVersion::new(113, "add-tx-eq"), KnownVersion::new(112, "blueprint-dataset"), diff --git a/nexus/db-queries/src/db/datastore/update.rs b/nexus/db-queries/src/db/datastore/update.rs index 37339beb62..59d4108133 100644 --- a/nexus/db-queries/src/db/datastore/update.rs +++ b/nexus/db-queries/src/db/datastore/update.rs @@ -12,14 +12,15 @@ use crate::context::OpContext; use crate::db; use crate::db::error::{public_error_from_diesel, ErrorHandler}; use crate::db::model::SemverVersion; +use crate::db::pagination::paginated; use crate::transaction_retry::OptionalError; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; use diesel::result::Error as DieselError; use nexus_db_model::{ArtifactHash, TufArtifact, TufRepo, TufRepoDescription}; use omicron_common::api::external::{ - self, CreateResult, LookupResult, LookupType, ResourceType, - TufRepoInsertStatus, + self, CreateResult, DataPageParams, ListResultVec, LookupResult, + LookupType, ResourceType, TufRepoInsertStatus, }; use omicron_uuid_kinds::TufRepoKind; use omicron_uuid_kinds::TypedUuid; @@ -147,6 +148,23 @@ impl DataStore { .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; Ok(TufRepoDescription { repo, artifacts }) } + + /// Returns the list of all TUF repo artifacts known to the system. + pub async fn update_tuf_artifact_list( + &self, + opctx: &OpContext, + pagparams: &DataPageParams<'_, ArtifactHash>, + ) -> ListResultVec { + opctx.authorize(authz::Action::Read, &authz::FLEET).await?; + + use db::schema::tuf_artifact::dsl; + + paginated(dsl::tuf_artifact, dsl::sha256, pagparams) + .select(TufArtifact::as_select()) + .load_async(&*self.pool_connection_authorized(opctx).await?) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } } // This is a separate method mostly to make rustfmt not bail out on long lines diff --git a/nexus/examples/config-second.toml b/nexus/examples/config-second.toml index a955766554..e1b14d346f 100644 --- a/nexus/examples/config-second.toml +++ b/nexus/examples/config-second.toml @@ -145,6 +145,7 @@ region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 region_snapshot_replacement_finish.period_secs = 30 +tuf_artifact_replication.period_secs = 60 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. diff --git a/nexus/examples/config.toml b/nexus/examples/config.toml index ce3dfe5751..dc437e49ff 100644 --- a/nexus/examples/config.toml +++ b/nexus/examples/config.toml @@ -131,6 +131,7 @@ region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 region_snapshot_replacement_finish.period_secs = 30 +tuf_artifact_replication.period_secs = 60 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index ad39777054..a220ab8bae 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -117,6 +117,7 @@ use super::tasks::saga_recovery; use super::tasks::service_firewall_rules; use super::tasks::sync_service_zone_nat::ServiceZoneNatTracker; use super::tasks::sync_switch_configuration::SwitchPortSettingsManager; +use super::tasks::tuf_artifact_replication; use super::tasks::v2p_mappings::V2PManager; use super::tasks::vpc_routes; use super::Activator; @@ -133,7 +134,9 @@ use omicron_uuid_kinds::OmicronZoneUuid; use oximeter::types::ProducerRegistry; use std::collections::BTreeMap; use std::sync::Arc; +use tokio::sync::mpsc; use tokio::sync::watch; +use update_common::artifacts::ArtifactsWithPlan; use uuid::Uuid; /// Interface for activating various background tasks and read data that they @@ -172,6 +175,7 @@ pub struct BackgroundTasks { pub task_region_snapshot_replacement_garbage_collection: Activator, pub task_region_snapshot_replacement_step: Activator, pub task_region_snapshot_replacement_finish: Activator, + pub task_tuf_artifact_replication: Activator, // Handles to activate background tasks that do not get used by Nexus // at-large. These background tasks are implementation details as far as @@ -259,6 +263,7 @@ impl BackgroundTasksInitializer { ), task_region_snapshot_replacement_step: Activator::new(), task_region_snapshot_replacement_finish: Activator::new(), + task_tuf_artifact_replication: Activator::new(), task_internal_dns_propagation: Activator::new(), task_external_dns_propagation: Activator::new(), @@ -325,6 +330,7 @@ impl BackgroundTasksInitializer { task_region_snapshot_replacement_garbage_collection, task_region_snapshot_replacement_step, task_region_snapshot_replacement_finish, + task_tuf_artifact_replication, // Add new background tasks here. Be sure to use this binding in a // call to `Driver::register()` below. That's what actually wires // up the Activator to the corresponding background task. @@ -825,13 +831,28 @@ impl BackgroundTasksInitializer { done", period: config.region_snapshot_replacement_finish.period_secs, task_impl: Box::new(RegionSnapshotReplacementFinishDetector::new( - datastore, + datastore.clone(), )), opctx: opctx.child(BTreeMap::new()), watchers: vec![], activator: task_region_snapshot_replacement_finish, }); + driver.register(TaskDefinition { + name: "tuf_artifact_replication", + description: "replicate update repo artifacts across sleds", + period: config.tuf_artifact_replication.period_secs, + task_impl: Box::new( + tuf_artifact_replication::ArtifactReplication::new( + datastore.clone(), + args.tuf_artifact_replication_rx, + ), + ), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_tuf_artifact_replication, + }); + driver } } @@ -856,6 +877,8 @@ pub struct BackgroundTasksData { pub producer_registry: ProducerRegistry, /// Helpers for saga recovery pub saga_recovery: saga_recovery::SagaRecoveryHelpers>, + /// Channel for TUF repository artifacts to be replicated out to sleds + pub tuf_artifact_replication_rx: mpsc::Receiver, } /// Starts the three DNS-propagation-related background tasks for either diff --git a/nexus/src/app/background/tasks/mod.rs b/nexus/src/app/background/tasks/mod.rs index e4bbbfe6d0..fa9d972aa8 100644 --- a/nexus/src/app/background/tasks/mod.rs +++ b/nexus/src/app/background/tasks/mod.rs @@ -34,5 +34,6 @@ pub mod saga_recovery; pub mod service_firewall_rules; pub mod sync_service_zone_nat; pub mod sync_switch_configuration; +pub mod tuf_artifact_replication; pub mod v2p_mappings; pub mod vpc_routes; diff --git a/nexus/src/app/background/tasks/tuf_artifact_replication.rs b/nexus/src/app/background/tasks/tuf_artifact_replication.rs new file mode 100644 index 0000000000..30b7e5be35 --- /dev/null +++ b/nexus/src/app/background/tasks/tuf_artifact_replication.rs @@ -0,0 +1,447 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! TUF Repo Depot: Artifact replication across sleds (RFD 424) +//! +//! `Nexus::updates_put_repository` accepts a TUF repository, which Nexus +//! unpacks, verifies, and reasons about the artifacts in. This uses temporary +//! storage within the Nexus zone, so the update artifacts have to go somewhere. +//! We've settled for now on "everywhere": a copy of each artifact is stored on +//! each sled's M.2 devices. +//! +//! This background task is responsible for getting locally-stored artifacts +//! onto sleds, and ensuring all sleds have copies of all artifacts. +//! `Nexus::updates_put_repository` sends the [`ArtifactsWithPlan`] object to +//! this task via an [`mpsc`] channel and activates it. +//! +//! During each activation: +//! +//! 1. The task moves `ArtifactsWithPlan` objects of the `mpsc` channel and into +//! a `Vec`. +//! 2. The task queries the list of artifacts stored on each sled, and compares +//! it to the list of artifacts in CockroachDB. Sled artifact storage is +//! content-addressed by SHA-256 checksum. Errors are logged but otherwise +//! ignored (unless no sleds respond); the task proceeds as if that sled +//! has no artifacts. (This means that the task will always be trying to +//! replicate artifacts to that sled until it comes back or is pulled out +//! of service.) +//! 3. The task compares the list of artifacts from the database to the list of +//! artifacts available locally. +//! 4. If all the artifacts belonging to an `ArtifactsWithPlan` object have +//! been replicated to at least `MIN_SLED_REPLICATION` sleds, the task drops +//! the object from its `Vec` (thus cleaning up the local storage of those +//! files). +//! 5. The task generates a list of requests that need to be sent: +//! - PUT each locally-stored artifact not present on any sleds to a random +//! sled. +//! - For each partially-replicated artifact, choose a sled that is missing +//! the artifact, and tell it (via `artifact_copy_from_depot`) to fetch the +//! artifact from a random sled that has it. +//! - DELETE all artifacts no longer tracked in CockroachDB from all sleds +//! that have that artifact. +//! 6. The task randomly choose requests up to per-activation limits and +//! sends them. Up to `MAX_REQUESTS` total requests are sent, with up to +//! `MAX_PUT_REQUESTS` PUT requests. Successful and unsuccessful responses +//! are logged. + +use std::collections::HashMap; +use std::str::FromStr; +use std::sync::Arc; + +use anyhow::ensure; +use futures::future::BoxFuture; +use futures::FutureExt; +use nexus_auth::context::OpContext; +use nexus_db_queries::db::{ + datastore::SQL_BATCH_SIZE, pagination::Paginator, DataStore, +}; +use nexus_networking::sled_client_from_address; +use nexus_types::deployment::SledFilter; +use nexus_types::internal_api::background::TufArtifactReplicationStatus; +use omicron_common::{address::REPO_DEPOT_PORT, update::ArtifactHash}; +use omicron_uuid_kinds::{GenericUuid, SledUuid}; +use rand::seq::SliceRandom; +use slog_error_chain::InlineErrorChain; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TryRecvError; +use update_common::artifacts::{ + ArtifactsWithPlan, ExtractedArtifactDataHandle, +}; + +use crate::app::background::BackgroundTask; + +// The maximum number of PUT requests to send during each activation. This is +// relatively small to avoid significant bandwidth out from this sled. +const MAX_PUT_REQUESTS: usize = 8; +// The maximum total number of requests to send during each activation, +// including PUT requests capped by `MAX_PUT_REQUESTS`. +const MAX_REQUESTS: usize = 32; +// The number of sleds that artifacts must be present on before the local copy +// of artifacts is dropped. This is ignored if there are fewer than this many +// sleds in the system. +const MIN_SLED_REPLICATION: usize = 3; + +pub struct ArtifactReplication { + datastore: Arc, + local: Vec, + local_rx: mpsc::Receiver, +} + +struct Sled { + id: SledUuid, + client: sled_agent_client::Client, + depot_base_url: String, +} + +#[derive(Default)] +struct ArtifactPresence<'a> { + sleds: Vec<&'a Sled>, + counts: HashMap, + local: Option, +} + +enum Request<'a> { + Put { + sled: &'a Sled, + handle: ExtractedArtifactDataHandle, + hash: ArtifactHash, + }, + CopyFromDepot { + target_sled: &'a Sled, + depot_sled: &'a Sled, + hash: ArtifactHash, + }, + Delete { + sled: &'a Sled, + hash: String, + }, +} + +impl BackgroundTask for ArtifactReplication { + fn activate<'a>( + &'a mut self, + opctx: &'a OpContext, + ) -> BoxFuture<'a, serde_json::Value> { + async { + match self.activate_internal(opctx).await { + Ok(status) => serde_json::to_value(status).unwrap(), + Err(err) => { + let err_string = format!("{:#}", err); + error!( + &opctx.log, + "error during artifact replication"; + "error" => &err_string, + ); + serde_json::json!({ + "error": err_string, + }) + } + } + } + .boxed() + } +} + +impl ArtifactReplication { + pub fn new( + datastore: Arc, + local_rx: mpsc::Receiver, + ) -> ArtifactReplication { + ArtifactReplication { datastore, local: Vec::new(), local_rx } + } + + async fn activate_internal<'a>( + &'a mut self, + opctx: &'a OpContext, + ) -> anyhow::Result { + let log = &opctx.log; + let datastore = &self.datastore; + + // Move any received artifacts out of `local_rx` into `local`. + loop { + match self.local_rx.try_recv() { + Ok(artifacts) => self.local.push(artifacts), + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + // If the last sender for this channel is dropped, then the + // `Nexus` type has also been dropped. This is presumably + // a bug. + panic!("artifact replication receiver disconnected"); + } + } + } + + // Query the database for artifacts that we ought to have. + let mut artifacts = HashMap::new(); + let mut paginator = Paginator::new(SQL_BATCH_SIZE); + while let Some(p) = paginator.next() { + let batch = datastore + .update_tuf_artifact_list(opctx, &p.current_pagparams()) + .await?; + paginator = p.found_batch(&batch, &|a| a.sha256); + for artifact in batch { + artifacts + .insert(artifact.sha256.0, ArtifactPresence::default()); + } + } + // Ask all sled agents to list the artifacts they have, and mark those + // artifacts as present on those sleds. + let sleds = datastore + .sled_list_all_batched(&opctx, SledFilter::InService) + .await? + .into_iter() + .map(|sled| Sled { + id: SledUuid::from_untyped_uuid(sled.identity.id), + client: sled_client_from_address( + sled.identity.id, + sled.address(), + log, + ), + depot_base_url: format!( + "http://{}", + sled.address_with_port(REPO_DEPOT_PORT) + ), + }) + .collect::>(); + ensure!(!sleds.is_empty(), "no sleds"); + let responses = + futures::future::join_all(sleds.iter().map(|sled| async move { + let response = match sled.client.artifact_list().await { + Ok(response) => response.into_inner(), + Err(err) => { + error!( + log, + "Failed to get artifact list"; + "error" => InlineErrorChain::new(&err), + "sled" => sled.client.baseurl(), + ); + HashMap::new() + } + }; + (sled, response) + })) + .await; + let mut delete_requests = Vec::new(); + for (sled, response) in responses { + for (hash, count) in response { + if let Some(presence) = ArtifactHash::from_str(&hash) + .ok() + .and_then(|hash| artifacts.get_mut(&hash)) + { + presence.counts.insert(sled.id, count); + presence.sleds.push(sled); + } else { + delete_requests.push(Request::Delete { sled, hash }); + } + } + } + + // Mark all the artifacts found in `self.local` as locally available. + // If all of the artifacts in an `ArtifactsWithPlan` are sufficiently + // replicated, drop the `ArtifactsWithPlan`. + let sufficient_sleds = MIN_SLED_REPLICATION.min(sleds.len()); + self.local.retain(|plan| { + let mut keep = false; + for hash_id in plan.by_id().values().flatten() { + if let Some(handle) = plan.get_by_hash(hash_id) { + if let Some(presence) = artifacts.get_mut(&hash_id.hash) { + presence.local = Some(handle); + if presence.sleds.len() < sufficient_sleds { + keep = true; + } + } + } + } + keep + }); + + // Generate and send a random set of requests up to our per-activation + // limits. + let (requests, requests_outstanding) = + generate_requests(&sleds, artifacts, delete_requests); + let futures = requests.iter().map(|request| request.execute(log)); + let result = futures::future::join_all(futures).await; + + // TODO: If there are any missing artifacts with no known copies, + // check the status of the assigned Nexus for its repositories. If the + // assigned Nexus is expunged, or if we are the assigned Nexus and we + // don't have the artifact, mark the repository as failed. + + Ok(TufArtifactReplicationStatus { + requests_ok: result.iter().filter(|x| **x).count(), + requests_err: result.iter().filter(|x| !*x).count(), + requests_outstanding, + local_repos: self.local.len(), + }) + } +} + +impl<'a> Request<'a> { + async fn execute(&self, log: &slog::Logger) -> bool { + let (action, sled, hash) = match self { + Request::Put { sled, handle, hash } => { + let sha256 = hash.to_string(); + let file = match handle.file().await { + Ok(file) => file, + Err(err) => { + error!( + log, + "Failed to open artifact file"; + "error" => &format!("{:#}", err), + "sha256" => &sha256, + ); + return false; + } + }; + if let Err(err) = sled.client.artifact_put(&sha256, file).await + { + error!( + log, + "Failed to put artifact"; + "error" => InlineErrorChain::new(&err), + "sled" => sled.client.baseurl(), + "sha256" => &sha256, + ); + return false; + } + ("PUT".to_owned(), sled, sha256) + } + Request::CopyFromDepot { target_sled, depot_sled, hash } => { + let sha256 = hash.to_string(); + if let Err(err) = target_sled + .client + .artifact_copy_from_depot( + &sha256, + &sled_agent_client::types::ArtifactCopyFromDepotBody { + depot_base_url: depot_sled.depot_base_url.clone(), + }, + ) + .await + { + error!( + log, + "Failed to request artifact copy from depot"; + "error" => InlineErrorChain::new(&err), + "sled" => target_sled.client.baseurl(), + "sha256" => &sha256, + ); + return false; + } + ( + format!("copy from depot {}", depot_sled.depot_base_url), + target_sled, + sha256, + ) + } + Request::Delete { sled, hash } => { + if let Err(err) = sled.client.artifact_delete(&hash).await { + error!( + log, + "Failed to request artifact deletion"; + "error" => InlineErrorChain::new(&err), + "sled" => sled.client.baseurl(), + "sha256" => &hash, + ); + return false; + } + ("DELETE".to_owned(), sled, hash.clone()) + } + }; + info!( + log, + "Request succeeded"; + "action" => &action, + "sled" => sled.client.baseurl(), + "sha256" => &hash, + ); + true + } +} + +fn generate_requests<'a>( + sleds: &'a [Sled], + artifacts: HashMap>, + delete_requests: Vec>, +) -> (Vec>, usize) { + let mut rng = rand::thread_rng(); + let mut put_requests = Vec::new(); + let mut other_requests = delete_requests; + let mut low_priority_requests = Vec::new(); + for (hash, presence) in artifacts { + if presence.sleds.is_empty() { + if let Some(handle) = presence.local { + // Randomly choose a sled to PUT the artifact to. + let sled = sleds.choose(&mut rng).expect("sleds is not empty"); + put_requests.push(Request::Put { sled, handle, hash }); + } + } else { + // Randomly choose a sled where the artifact is not present. + let missing_sleds = sleds + .iter() + .filter_map(|sled| { + let count = presence + .counts + .get(&sled.id) + .copied() + .unwrap_or_default(); + // TODO: We should check on the number of non-expunged M.2s + // the sled is known to have instead of a hardcoded value. + (count < 2).then_some((sled, count)) + }) + .collect::>(); + if let Some((target_sled, count)) = missing_sleds.choose(&mut rng) { + if *count > 0 { + // This sled doesn't have a full set of copies, but it does + // have _a_ copy, so ensuring that both M.2s have a copy is + // a lower-priority concern. + &mut low_priority_requests + } else { + &mut other_requests + } + .push(Request::CopyFromDepot { + target_sled, + depot_sled: presence + .sleds + .choose(&mut rng) + .expect("presence.sleds is not empty"), + hash, + }); + } + } + } + + let total = + put_requests.len() + other_requests.len() + low_priority_requests.len(); + + let mut requests = put_requests; + sample_vec(&mut rng, &mut requests, MAX_PUT_REQUESTS); + sample_vec( + &mut rng, + &mut other_requests, + MAX_REQUESTS.saturating_sub(requests.len()), + ); + requests.extend(other_requests); + sample_vec( + &mut rng, + &mut low_priority_requests, + MAX_REQUESTS.saturating_sub(requests.len()), + ); + requests.extend(low_priority_requests); + let outstanding = total - requests.len(); + (requests, outstanding) +} + +fn sample_vec(rng: &mut R, vec: &mut Vec, amount: usize) +where + R: rand::Rng + ?Sized, +{ + let end = vec.len(); + if amount >= end { + return; + } + for i in 0..amount { + vec.swap(i, rng.gen_range((i + 1)..end)); + } + vec.truncate(amount); +} diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index e451119bfc..762cde8035 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -43,6 +43,7 @@ use std::net::{IpAddr, Ipv6Addr}; use std::sync::Arc; use std::sync::OnceLock; use tokio::sync::mpsc; +use update_common::artifacts::ArtifactsWithPlan; use uuid::Uuid; // The implementation of Nexus is large, and split into a number of submodules @@ -216,6 +217,10 @@ pub struct Nexus { /// List of demo sagas awaiting a request to complete them demo_sagas: Arc>, + + /// Sender for TUF repository artifacts temporarily stored in this zone to + /// be replicated out to sleds in the background + tuf_artifact_replication_tx: mpsc::Sender, } impl Nexus { @@ -297,6 +302,14 @@ impl Nexus { saga_create_tx, )); + // Create a channel for replicating repository artifacts. 16 is a + // dubious bound for the channel but it seems unlikely that an operator + // would want to upload more than one at a time, and at most have two + // or three on the system during an upgrade (we've sized the artifact + // datasets to fit at most 10 repositories for this reason). + let (tuf_artifact_replication_tx, tuf_artifact_replication_rx) = + mpsc::channel(16); + let client_state = dpd_client::ClientState { tag: String::from("nexus"), log: log.new(o!( @@ -528,6 +541,7 @@ impl Nexus { demo_sagas: Arc::new(std::sync::Mutex::new( CompletingDemoSagas::new(), )), + tuf_artifact_replication_tx, }; // TODO-cleanup all the extra Arcs here seems wrong @@ -582,6 +596,7 @@ impl Nexus { registry: sagas::ACTION_REGISTRY.clone(), sagas_started_rx: saga_recovery_rx, }, + tuf_artifact_replication_rx, }, ); diff --git a/nexus/src/app/update/mod.rs b/nexus/src/app/update/mod.rs index d4a47375bc..39598a10db 100644 --- a/nexus/src/app/update/mod.rs +++ b/nexus/src/app/update/mod.rs @@ -56,17 +56,30 @@ impl super::Nexus { ArtifactsWithPlan::from_stream(body, Some(file_name), &self.log) .await .map_err(|error| error.to_http_error())?; - - // Now store the artifacts in the database. let tuf_repo_description = TufRepoDescription::from_external( artifacts_with_plan.description().clone(), ); - + // Move the `ArtifactsWithPlan`, which carries with it the + // `Utf8TempDir`s storing the artifacts, into the artifact replication + // background task. This is done before the database insert because if + // this fails, we shouldn't record the artifacts in the database. + self.tuf_artifact_replication_tx + .send(artifacts_with_plan) + .await + .map_err(|err| { + Error::internal_error(&format!( + "failed to send artifacts for replication: {err}" + )) + })?; + // Now store the artifacts in the database. let response = self .db_datastore .update_tuf_repo_insert(opctx, tuf_repo_description) .await .map_err(HttpError::from)?; + // Finally, immediately activate the artifact replication task. + self.background_tasks.task_tuf_artifact_replication.activate(); + Ok(response.into_external()) } diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml index 94d22d491f..481a85f1a9 100644 --- a/nexus/tests/config.test.toml +++ b/nexus/tests/config.test.toml @@ -151,6 +151,7 @@ region_snapshot_replacement_start.period_secs = 60 region_snapshot_replacement_garbage_collection.period_secs = 60 region_snapshot_replacement_step.period_secs = 60 region_snapshot_replacement_finish.period_secs = 60 +tuf_artifact_replication.period_secs = 60 [default_region_allocation_strategy] # we only have one sled in the test environment, so we need to use the diff --git a/nexus/types/src/internal_api/background.rs b/nexus/types/src/internal_api/background.rs index cf3d652587..2893121d8d 100644 --- a/nexus/types/src/internal_api/background.rs +++ b/nexus/types/src/internal_api/background.rs @@ -191,3 +191,19 @@ impl std::fmt::Display for ReincarnatableInstance { write!(f, "{instance_id} ({reason})") } } + +/// The status of a `tuf_artifact_replication` background task activation +#[derive(Debug, Serialize, Deserialize)] +pub struct TufArtifactReplicationStatus { + /// Number of requests handled by sled agents successfully. + pub requests_ok: usize, + /// Number of requests to sled agents that failed. + pub requests_err: usize, + /// Number of requests that were not sent during this activation because the + /// limits had been reached. + pub requests_outstanding: usize, + /// The number of repositories this Nexus instance is keeping in local + /// storage. Local repositories are deleted once all their artifacts are + /// sufficiently replicated. + pub local_repos: usize, +} diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index f689c7e9f7..9e25ae3af3 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -2321,12 +2321,17 @@ CREATE TABLE IF NOT EXISTS omicron.public.tuf_repo ( id UUID PRIMARY KEY, time_created TIMESTAMPTZ NOT NULL, + -- TODO: Repos fetched over HTTP will not have a SHA256 hash; this is an + -- implementation detail of our ZIP archives. sha256 STRING(64) NOT NULL, -- The version of the targets.json role that was used to generate the repo. targets_role_version INT NOT NULL, -- The valid_until time for the repo. + -- TODO: Figure out timestamp validity policy for uploaded repos vs those + -- fetched over HTTP; my (iliana's) current presumption is that we will make + -- this NULL for uploaded ZIP archives of repos. valid_until TIMESTAMPTZ NOT NULL, -- The system version described in the TUF repo. @@ -2371,6 +2376,8 @@ CREATE TABLE IF NOT EXISTS omicron.public.tuf_artifact ( PRIMARY KEY (name, version, kind) ); +CREATE INDEX IF NOT EXISTS tuf_artifact_sha256 ON omicron.public.tuf_artifact (sha256); + -- Reflects that a particular artifact was provided by a particular TUF repo. -- This is a many-many mapping. CREATE TABLE IF NOT EXISTS omicron.public.tuf_repo_artifact ( @@ -4684,7 +4691,7 @@ INSERT INTO omicron.public.db_metadata ( version, target_version ) VALUES - (TRUE, NOW(), NOW(), '114.0.0', NULL) + (TRUE, NOW(), NOW(), '115.0.0', NULL) ON CONFLICT DO NOTHING; COMMIT; diff --git a/schema/crdb/tuf-artifact-replication/up.sql b/schema/crdb/tuf-artifact-replication/up.sql new file mode 100644 index 0000000000..891940ff4d --- /dev/null +++ b/schema/crdb/tuf-artifact-replication/up.sql @@ -0,0 +1 @@ +CREATE INDEX IF NOT EXISTS tuf_artifact_sha256 ON omicron.public.tuf_artifact (sha256); diff --git a/smf/nexus/multi-sled/config-partial.toml b/smf/nexus/multi-sled/config-partial.toml index b824f4b7c9..9c319e730f 100644 --- a/smf/nexus/multi-sled/config-partial.toml +++ b/smf/nexus/multi-sled/config-partial.toml @@ -71,6 +71,7 @@ region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 region_snapshot_replacement_finish.period_secs = 30 +tuf_artifact_replication.period_secs = 60 [default_region_allocation_strategy] # by default, allocate across 3 distinct sleds diff --git a/smf/nexus/single-sled/config-partial.toml b/smf/nexus/single-sled/config-partial.toml index e966b3eabd..d9feaf8ba6 100644 --- a/smf/nexus/single-sled/config-partial.toml +++ b/smf/nexus/single-sled/config-partial.toml @@ -71,6 +71,7 @@ region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 region_snapshot_replacement_finish.period_secs = 30 +tuf_artifact_replication.period_secs = 60 [default_region_allocation_strategy] # by default, allocate without requirement for distinct sleds. diff --git a/update-common/src/artifacts/extracted_artifacts.rs b/update-common/src/artifacts/extracted_artifacts.rs index 5ac4a3a395..309b188a9d 100644 --- a/update-common/src/artifacts/extracted_artifacts.rs +++ b/update-common/src/artifacts/extracted_artifacts.rs @@ -25,8 +25,8 @@ use tokio_util::io::ReaderStream; /// Handle to the data of an extracted artifact. /// -/// This does not contain the actual data; use `reader_stream()` to get a new -/// handle to the underlying file to read it on demand. +/// This does not contain the actual data; use `file()` or `reader_stream()` to +/// get a new handle to the underlying file to read it on demand. /// /// Note that although this type implements `Clone` and that cloning is /// relatively cheap, it has additional implications on filesystem cleanup. @@ -69,20 +69,26 @@ impl ExtractedArtifactDataHandle { self.hash_id.hash } - /// Async stream to read the contents of this artifact on demand. + /// Opens the file for this artifact. /// /// This can fail due to I/O errors outside our control (e.g., something /// removed the contents of our temporary directory). - pub async fn reader_stream( - &self, - ) -> anyhow::Result> { + pub async fn file(&self) -> anyhow::Result { let path = path_for_artifact(&self.tempdir, &self.hash_id); - let file = tokio::fs::File::open(&path) + tokio::fs::File::open(&path) .await - .with_context(|| format!("failed to open {path}"))?; + .with_context(|| format!("failed to open {path}")) + } - Ok(ReaderStream::new(file)) + /// Async stream to read the contents of this artifact on demand. + /// + /// This can fail due to I/O errors outside our control (e.g., something + /// removed the contents of our temporary directory). + pub async fn reader_stream( + &self, + ) -> anyhow::Result> { + Ok(ReaderStream::new(self.file().await?)) } }