diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs
index cb05bb575b..b9196188d7 100644
--- a/dev-tools/omdb/src/bin/omdb/nexus.rs
+++ b/dev-tools/omdb/src/bin/omdb/nexus.rs
@@ -51,6 +51,7 @@ use nexus_types::internal_api::background::RegionSnapshotReplacementFinishStatus
 use nexus_types::internal_api::background::RegionSnapshotReplacementGarbageCollectStatus;
 use nexus_types::internal_api::background::RegionSnapshotReplacementStartStatus;
 use nexus_types::internal_api::background::RegionSnapshotReplacementStepStatus;
+use nexus_types::internal_api::background::TufArtifactReplicationStatus;
 use nexus_types::inventory::BaseboardId;
 use omicron_uuid_kinds::CollectionUuid;
 use omicron_uuid_kinds::DemoSagaUuid;
@@ -1928,6 +1929,33 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) {
                 }
             }
         };
+    } else if name == "tuf_artifact_replication" {
+        match serde_json::from_value::<TufArtifactReplicationStatus>(
+            details.clone(),
+        ) {
+            Err(error) => eprintln!(
+                "warning: failed to interpret task details: {:?}: {:?}",
+                error, details
+            ),
+            Ok(status) => {
+                const ROWS: &[&str] = &[
+                    "requests ok:",
+                    "requests errored:",
+                    "requests outstanding:",
+                    "local repos:",
+                ];
+                const WIDTH: usize = const_max_len(ROWS);
+                println!("    last execution:");
+                for (label, value) in ROWS.iter().zip([
+                    status.requests_ok,
+                    status.requests_err,
+                    status.requests_outstanding,
+                    status.local_repos,
+                ]) {
+                    println!("      {label:<WIDTH$} {value:>3}");
+                }
+            }
+        }
     } else {
         println!(
             "warning: unknown background task: {:?} \
diff --git a/dev-tools/omdb/tests/env.out b/dev-tools/omdb/tests/env.out
index f6b56e2b7b..86f4d0ee09 100644
--- a/dev-tools/omdb/tests/env.out
+++ b/dev-tools/omdb/tests/env.out
@@ -166,6 +166,10 @@ task: "switch_port_config_manager"
     manages switch port settings for rack switches
 
 
+task: "tuf_artifact_replication"
+    replicate update repo artifacts across sleds
+
+
 task: "v2p_manager"
     manages opte v2p mappings for vpc networking
 
@@ -337,6 +341,10 @@ task: "switch_port_config_manager"
     manages switch port settings for rack switches
 
 
+task: "tuf_artifact_replication"
+    replicate update repo artifacts across sleds
+
+
 task: "v2p_manager"
     manages opte v2p mappings for vpc networking
 
@@ -495,6 +503,10 @@ task: "switch_port_config_manager"
     manages switch port settings for rack switches
 
 
+task: "tuf_artifact_replication"
+    replicate update repo artifacts across sleds
+
+
 task: "v2p_manager"
     manages opte v2p mappings for vpc networking
 
diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out
index 6974c0b36b..9f0e7775ff 100644
--- a/dev-tools/omdb/tests/successes.out
+++ b/dev-tools/omdb/tests/successes.out
@@ -384,6 +384,10 @@ task: "switch_port_config_manager"
     manages switch port settings for rack switches
 
 
+task: "tuf_artifact_replication"
+    replicate update repo artifacts across sleds
+
+
 task: "v2p_manager"
     manages opte v2p mappings for vpc networking
 
@@ -698,6 +702,17 @@ task: "switch_port_config_manager"
     started at (s ago) and ran for ms
 warning: unknown background task: "switch_port_config_manager" (don't know how to interpret details: Object {})
 
+task: "tuf_artifact_replication"
+  configured period: every m
+  currently executing: no
+  last completed activation: , triggered by a periodic timer firing
+    started at (s ago) and ran for ms
+    last execution:
+      requests ok: 0
+      requests errored: 0
+      requests outstanding: 0
+      local repos: 0
+
 task: "v2p_manager"
   configured period: every s
   currently executing: no
@@ -1141,6 +1156,17 @@ task: "switch_port_config_manager"
"switch_port_config_manager" started at (s ago) and ran for ms warning: unknown background task: "switch_port_config_manager" (don't know how to interpret details: Object {}) +task: "tuf_artifact_replication" + configured period: every m + currently executing: no + last completed activation: , triggered by a periodic timer firing + started at (s ago) and ran for ms + last execution: + requests ok: 0 + requests errored: 0 + requests outstanding: 0 + local repos: 0 + task: "v2p_manager" configured period: every s currently executing: no diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index 82362f2f0d..8ac1ead40b 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -418,6 +418,8 @@ pub struct BackgroundTaskConfig { /// configuration for region snapshot replacement finisher task pub region_snapshot_replacement_finish: RegionSnapshotReplacementFinishConfig, + /// configuration for TUF artifact replication task + pub tuf_artifact_replication: TufArtifactReplicationConfig, } #[serde_as] @@ -703,6 +705,14 @@ pub struct RegionSnapshotReplacementFinishConfig { pub period_secs: Duration, } +#[serde_as] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct TufArtifactReplicationConfig { + /// period (in seconds) for periodic activations of this background task + #[serde_as(as = "DurationSeconds")] + pub period_secs: Duration, +} + /// Configuration for a nexus server #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct PackageConfig { @@ -958,6 +968,7 @@ mod test { region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 region_snapshot_replacement_finish.period_secs = 30 + tuf_artifact_replication.period_secs = 60 [default_region_allocation_strategy] type = "random" seed = 0 @@ -1156,6 +1167,10 @@ mod test { RegionSnapshotReplacementFinishConfig { period_secs: Duration::from_secs(30), }, + tuf_artifact_replication: + TufArtifactReplicationConfig { + period_secs: Duration::from_secs(60) + }, }, default_region_allocation_strategy: crate::nexus_config::RegionAllocationStrategy::Random { @@ -1237,6 +1252,7 @@ mod test { region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 region_snapshot_replacement_finish.period_secs = 30 + tuf_artifact_replication.period_secs = 60 [default_region_allocation_strategy] type = "random" "##, diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs index 218cad84da..bac8eb6d95 100644 --- a/nexus/db-model/src/schema.rs +++ b/nexus/db-model/src/schema.rs @@ -907,6 +907,7 @@ table! { sled_policy -> crate::sled_policy::SledPolicyEnum, sled_state -> crate::SledStateEnum, sled_agent_gen -> Int8, + repo_depot_port -> Int4, } } diff --git a/nexus/db-model/src/schema_versions.rs b/nexus/db-model/src/schema_versions.rs index 70450a7776..f37aed1d7a 100644 --- a/nexus/db-model/src/schema_versions.rs +++ b/nexus/db-model/src/schema_versions.rs @@ -17,7 +17,7 @@ use std::collections::BTreeMap; /// /// This must be updated when you change the database schema. Refer to /// schema/crdb/README.adoc in the root of this repository for details. 
-pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(114, 0, 0);
+pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(115, 0, 0);
 
 /// List of all past database schema versions, in *reverse* order
 ///
@@ -29,6 +29,7 @@ static KNOWN_VERSIONS: Lazy<Vec<KnownVersion>> = Lazy::new(|| {
     // |  leaving the first copy as an example for the next person.
     // v
     // KnownVersion::new(next_int, "unique-dirname-with-the-sql-files"),
+    KnownVersion::new(115, "tuf-artifact-replication"),
     KnownVersion::new(114, "crucible-ref-count-records"),
     KnownVersion::new(113, "add-tx-eq"),
     KnownVersion::new(112, "blueprint-dataset"),
diff --git a/nexus/db-model/src/sled.rs b/nexus/db-model/src/sled.rs
index b586ad0fc5..ff0a92282a 100644
--- a/nexus/db-model/src/sled.rs
+++ b/nexus/db-model/src/sled.rs
@@ -81,6 +81,9 @@ pub struct Sled {
     /// This is specifically distinct from `rcgen`, which is incremented by
     /// child resources as part of `DatastoreCollectionConfig`.
     pub sled_agent_gen: Generation,
+
+    // ServiceAddress (Repo Depot API). Uses `ip`.
+    pub repo_depot_port: SqlU16,
 }
 
 impl Sled {
@@ -169,6 +172,7 @@ impl From<Sled> for params::SledAgentInfo {
         };
         Self {
             sa_address: sled.address(),
+            repo_depot_port: sled.repo_depot_port.into(),
             role,
             baseboard: Baseboard {
                 serial: sled.serial_number.clone(),
@@ -220,6 +224,9 @@ pub struct SledUpdate {
     pub ip: ipv6::Ipv6Addr,
     pub port: SqlU16,
 
+    // ServiceAddress (Repo Depot API). Uses `ip`.
+    pub repo_depot_port: SqlU16,
+
     // Generation number - owned and incremented by sled-agent.
     pub sled_agent_gen: Generation,
 }
@@ -228,6 +235,7 @@ impl SledUpdate {
     pub fn new(
         id: Uuid,
         addr: SocketAddrV6,
+        repo_depot_port: u16,
         baseboard: SledBaseboard,
         hardware: SledSystemHardware,
         rack_id: Uuid,
@@ -247,6 +255,7 @@ impl SledUpdate {
             reservoir_size: hardware.reservoir_size,
             ip: addr.ip().into(),
             port: addr.port().into(),
+            repo_depot_port: repo_depot_port.into(),
             sled_agent_gen,
         }
     }
@@ -282,6 +291,7 @@ impl SledUpdate {
             reservoir_size: self.reservoir_size,
             ip: self.ip,
             port: self.port,
+            repo_depot_port: self.repo_depot_port,
             last_used_address,
             sled_agent_gen: self.sled_agent_gen,
         }
diff --git a/nexus/db-queries/src/db/datastore/dataset.rs b/nexus/db-queries/src/db/datastore/dataset.rs
index f931403cb9..31f8d4367b 100644
--- a/nexus/db-queries/src/db/datastore/dataset.rs
+++ b/nexus/db-queries/src/db/datastore/dataset.rs
@@ -375,6 +375,7 @@ mod test {
         let sled = SledUpdate::new(
             *sled_id.as_untyped_uuid(),
             "[::1]:0".parse().unwrap(),
+            0,
             SledBaseboard {
                 serial_number: "test-sn".to_string(),
                 part_number: "test-pn".to_string(),
diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs
index 5bd35fbba9..4fd1bb0c55 100644
--- a/nexus/db-queries/src/db/datastore/mod.rs
+++ b/nexus/db-queries/src/db/datastore/mod.rs
@@ -467,6 +467,7 @@ mod test {
     use nexus_db_model::{to_db_typed_uuid, Generation};
     use nexus_types::external_api::params;
     use nexus_types::silo::DEFAULT_SILO_ID;
+    use omicron_common::address::REPO_DEPOT_PORT;
     use omicron_common::api::external::{
         ByteCount, Error, IdentityMetadataCreateParams, LookupType, Name,
     };
@@ -684,12 +685,14 @@ mod test {
             0,
             0,
         );
+        let bogus_repo_depot_port = 8081;
         let rack_id = Uuid::new_v4();
         let sled_id = SledUuid::new_v4();
 
         let sled_update = SledUpdate::new(
             sled_id.into_untyped_uuid(),
             bogus_addr,
+            bogus_repo_depot_port,
             sled_baseboard_for_test(),
             sled_system_hardware_for_test(),
             rack_id,
@@ -1691,6 +1694,7 @@ mod test {
         let sled1 = db::model::SledUpdate::new(
             sled1_id,
             addr1,
+            REPO_DEPOT_PORT,
             sled_baseboard_for_test(),
             sled_system_hardware_for_test(),
             rack_id,
@@ -1703,6 +1707,7 @@ mod test {
         let sled2 = db::model::SledUpdate::new(
             sled2_id,
             addr2,
+            REPO_DEPOT_PORT,
             sled_baseboard_for_test(),
             sled_system_hardware_for_test(),
             rack_id,
diff --git a/nexus/db-queries/src/db/datastore/physical_disk.rs b/nexus/db-queries/src/db/datastore/physical_disk.rs
index 52c93239aa..11d538f3af 100644
--- a/nexus/db-queries/src/db/datastore/physical_disk.rs
+++ b/nexus/db-queries/src/db/datastore/physical_disk.rs
@@ -343,10 +343,12 @@ mod test {
     async fn create_test_sled(db: &DataStore) -> Sled {
         let sled_id = Uuid::new_v4();
         let addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0);
+        let repo_depot_port = 0;
         let rack_id = Uuid::new_v4();
         let sled_update = SledUpdate::new(
             sled_id,
             addr,
+            repo_depot_port,
             sled_baseboard_for_test(),
             sled_system_hardware_for_test(),
             rack_id,
diff --git a/nexus/db-queries/src/db/datastore/rack.rs b/nexus/db-queries/src/db/datastore/rack.rs
index 74b3440a7d..fd37e74667 100644
--- a/nexus/db-queries/src/db/datastore/rack.rs
+++ b/nexus/db-queries/src/db/datastore/rack.rs
@@ -1239,9 +1239,11 @@ mod test {
 
     async fn create_test_sled(db: &DataStore, sled_id: Uuid) -> Sled {
         let addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0);
+        let repo_depot_port = 0;
         let sled_update = SledUpdate::new(
             sled_id,
             addr,
+            repo_depot_port,
             sled_baseboard_for_test(),
             sled_system_hardware_for_test(),
             rack_id(),
diff --git a/nexus/db-queries/src/db/datastore/sled.rs b/nexus/db-queries/src/db/datastore/sled.rs
index 7cd05d8272..aea2048cee 100644
--- a/nexus/db-queries/src/db/datastore/sled.rs
+++ b/nexus/db-queries/src/db/datastore/sled.rs
@@ -72,6 +72,7 @@ impl DataStore {
                     dsl::time_modified.eq(now),
                     dsl::ip.eq(sled_update.ip),
                     dsl::port.eq(sled_update.port),
+                    dsl::repo_depot_port.eq(sled_update.repo_depot_port),
                     dsl::rack_id.eq(sled_update.rack_id),
                     dsl::is_scrimlet.eq(sled_update.is_scrimlet()),
                     dsl::usable_hardware_threads
@@ -1489,9 +1490,11 @@ pub(in crate::db::datastore) mod test {
     pub(crate) fn test_new_sled_update() -> SledUpdate {
         let sled_id = Uuid::new_v4();
         let addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0);
+        let repo_depot_port = 0;
         SledUpdate::new(
             sled_id,
             addr,
+            repo_depot_port,
             sled_baseboard_for_test(),
             sled_system_hardware_for_test(),
             rack_id(),
diff --git a/nexus/db-queries/src/db/datastore/update.rs b/nexus/db-queries/src/db/datastore/update.rs
index 37339beb62..59d4108133 100644
--- a/nexus/db-queries/src/db/datastore/update.rs
+++ b/nexus/db-queries/src/db/datastore/update.rs
@@ -12,14 +12,15 @@ use crate::context::OpContext;
 use crate::db;
 use crate::db::error::{public_error_from_diesel, ErrorHandler};
 use crate::db::model::SemverVersion;
+use crate::db::pagination::paginated;
 use crate::transaction_retry::OptionalError;
 use async_bb8_diesel::AsyncRunQueryDsl;
 use diesel::prelude::*;
 use diesel::result::Error as DieselError;
 use nexus_db_model::{ArtifactHash, TufArtifact, TufRepo, TufRepoDescription};
 use omicron_common::api::external::{
-    self, CreateResult, LookupResult, LookupType, ResourceType,
-    TufRepoInsertStatus,
+    self, CreateResult, DataPageParams, ListResultVec, LookupResult,
+    LookupType, ResourceType, TufRepoInsertStatus,
 };
 use omicron_uuid_kinds::TufRepoKind;
 use omicron_uuid_kinds::TypedUuid;
@@ -147,6 +148,23 @@ impl DataStore {
             .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;
         Ok(TufRepoDescription { repo, artifacts })
     }
+
+    /// Returns the list of all TUF repo artifacts known to the system.
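+    ///
+    /// Results are paginated by `sha256`. A sketch of how a caller (such as
+    /// the artifact replication background task) pages through the full
+    /// list, assuming a `datastore` and `opctx` in scope:
+    ///
+    /// ```ignore
+    /// let mut paginator = Paginator::new(SQL_BATCH_SIZE);
+    /// while let Some(p) = paginator.next() {
+    ///     let batch = datastore
+    ///         .update_tuf_artifact_list(opctx, &p.current_pagparams())
+    ///         .await?;
+    ///     paginator = p.found_batch(&batch, &|a| a.sha256);
+    ///     // ... use `batch` ...
+    /// }
+    /// ```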
+    pub async fn update_tuf_artifact_list(
+        &self,
+        opctx: &OpContext,
+        pagparams: &DataPageParams<'_, ArtifactHash>,
+    ) -> ListResultVec<TufArtifact> {
+        opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
+
+        use db::schema::tuf_artifact::dsl;
+
+        paginated(dsl::tuf_artifact, dsl::sha256, pagparams)
+            .select(TufArtifact::as_select())
+            .load_async(&*self.pool_connection_authorized(opctx).await?)
+            .await
+            .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
+    }
 }
 
 // This is a separate method mostly to make rustfmt not bail out on long lines
diff --git a/nexus/db-queries/src/db/datastore/vpc.rs b/nexus/db-queries/src/db/datastore/vpc.rs
index 30033e96a2..47dfc91348 100644
--- a/nexus/db-queries/src/db/datastore/vpc.rs
+++ b/nexus/db-queries/src/db/datastore/vpc.rs
@@ -3058,6 +3058,7 @@ mod tests {
             .sled_upsert(SledUpdate::new(
                 sled_id.into_untyped_uuid(),
                 "[::1]:0".parse().unwrap(),
+                0,
                 sled_baseboard_for_test(),
                 sled_system_hardware_for_test(),
                 rack_id,
diff --git a/nexus/examples/config-second.toml b/nexus/examples/config-second.toml
index a955766554..e1b14d346f 100644
--- a/nexus/examples/config-second.toml
+++ b/nexus/examples/config-second.toml
@@ -145,6 +145,7 @@ region_snapshot_replacement_start.period_secs = 30
 region_snapshot_replacement_garbage_collection.period_secs = 30
 region_snapshot_replacement_step.period_secs = 30
 region_snapshot_replacement_finish.period_secs = 30
+tuf_artifact_replication.period_secs = 60
 
 [default_region_allocation_strategy]
 # allocate region on 3 random distinct zpools, on 3 random distinct sleds.
diff --git a/nexus/examples/config.toml b/nexus/examples/config.toml
index ce3dfe5751..dc437e49ff 100644
--- a/nexus/examples/config.toml
+++ b/nexus/examples/config.toml
@@ -131,6 +131,7 @@ region_snapshot_replacement_start.period_secs = 30
 region_snapshot_replacement_garbage_collection.period_secs = 30
 region_snapshot_replacement_step.period_secs = 30
 region_snapshot_replacement_finish.period_secs = 30
+tuf_artifact_replication.period_secs = 60
 
 [default_region_allocation_strategy]
 # allocate region on 3 random distinct zpools, on 3 random distinct sleds.
diff --git a/nexus/reconfigurator/execution/src/lib.rs b/nexus/reconfigurator/execution/src/lib.rs
index 570a3512eb..3df94fa063 100644
--- a/nexus/reconfigurator/execution/src/lib.rs
+++ b/nexus/reconfigurator/execution/src/lib.rs
@@ -736,6 +736,7 @@ mod tests {
             let sled = SledUpdate::new(
                 sled_id.into_untyped_uuid(),
                 "[::1]:0".parse().unwrap(),
+                0,
                 SledBaseboard {
                     serial_number: format!("test-{sled_id}"),
                     part_number: "test-sled".to_string(),
diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs
index ad39777054..a220ab8bae 100644
--- a/nexus/src/app/background/init.rs
+++ b/nexus/src/app/background/init.rs
@@ -117,6 +117,7 @@ use super::tasks::saga_recovery;
 use super::tasks::service_firewall_rules;
 use super::tasks::sync_service_zone_nat::ServiceZoneNatTracker;
 use super::tasks::sync_switch_configuration::SwitchPortSettingsManager;
+use super::tasks::tuf_artifact_replication;
 use super::tasks::v2p_mappings::V2PManager;
 use super::tasks::vpc_routes;
 use super::Activator;
@@ -133,7 +134,9 @@ use omicron_uuid_kinds::OmicronZoneUuid;
 use oximeter::types::ProducerRegistry;
 use std::collections::BTreeMap;
 use std::sync::Arc;
+use tokio::sync::mpsc;
 use tokio::sync::watch;
+use update_common::artifacts::ArtifactsWithPlan;
 use uuid::Uuid;
 
 /// Interface for activating various background tasks and read data that they
@@ -172,6 +175,7 @@ pub struct BackgroundTasks {
     pub task_region_snapshot_replacement_garbage_collection: Activator,
     pub task_region_snapshot_replacement_step: Activator,
     pub task_region_snapshot_replacement_finish: Activator,
+    pub task_tuf_artifact_replication: Activator,
 
     // Handles to activate background tasks that do not get used by Nexus
     // at-large. These background tasks are implementation details as far as
@@ -259,6 +263,7 @@ impl BackgroundTasksInitializer {
             ),
             task_region_snapshot_replacement_step: Activator::new(),
             task_region_snapshot_replacement_finish: Activator::new(),
+            task_tuf_artifact_replication: Activator::new(),
 
             task_internal_dns_propagation: Activator::new(),
             task_external_dns_propagation: Activator::new(),
@@ -325,6 +330,7 @@ impl BackgroundTasksInitializer {
             task_region_snapshot_replacement_garbage_collection,
             task_region_snapshot_replacement_step,
             task_region_snapshot_replacement_finish,
+            task_tuf_artifact_replication,
             // Add new background tasks here. Be sure to use this binding in a
             // call to `Driver::register()` below. That's what actually wires
             // up the Activator to the corresponding background task.
@@ -825,13 +831,28 @@ impl BackgroundTasksInitializer {
                 done",
             period: config.region_snapshot_replacement_finish.period_secs,
             task_impl: Box::new(RegionSnapshotReplacementFinishDetector::new(
-                datastore,
+                datastore.clone(),
             )),
             opctx: opctx.child(BTreeMap::new()),
             watchers: vec![],
             activator: task_region_snapshot_replacement_finish,
         });
 
+        driver.register(TaskDefinition {
+            name: "tuf_artifact_replication",
+            description: "replicate update repo artifacts across sleds",
+            period: config.tuf_artifact_replication.period_secs,
+            task_impl: Box::new(
+                tuf_artifact_replication::ArtifactReplication::new(
+                    datastore.clone(),
+                    args.tuf_artifact_replication_rx,
+                ),
+            ),
+            opctx: opctx.child(BTreeMap::new()),
+            watchers: vec![],
+            activator: task_tuf_artifact_replication,
+        });
+
         driver
     }
 }
@@ -856,6 +877,8 @@ pub struct BackgroundTasksData {
     pub producer_registry: ProducerRegistry,
     /// Helpers for saga recovery
     pub saga_recovery: saga_recovery::SagaRecoveryHelpers<Arc<Nexus>>,
+    /// Channel for TUF repository artifacts to be replicated out to sleds
+    pub tuf_artifact_replication_rx: mpsc::Receiver<ArtifactsWithPlan>,
 }
 
 /// Starts the three DNS-propagation-related background tasks for either
diff --git a/nexus/src/app/background/tasks/blueprint_execution.rs b/nexus/src/app/background/tasks/blueprint_execution.rs
index a9d47af117..42d2690439 100644
--- a/nexus/src/app/background/tasks/blueprint_execution.rs
+++ b/nexus/src/app/background/tasks/blueprint_execution.rs
@@ -322,9 +322,11 @@ mod test {
             let SocketAddr::V6(addr) = server.addr() else {
                 panic!("Expected Ipv6 address. Got {}", server.addr());
             };
+            let bogus_repo_depot_port = 0;
             let update = SledUpdate::new(
                 sled_id.into_untyped_uuid(),
                 addr,
+                bogus_repo_depot_port,
                 SledBaseboard {
                     serial_number: i.to_string(),
                     part_number: "test".into(),
diff --git a/nexus/src/app/background/tasks/inventory_collection.rs b/nexus/src/app/background/tasks/inventory_collection.rs
index 0b361b2014..6565dda312 100644
--- a/nexus/src/app/background/tasks/inventory_collection.rs
+++ b/nexus/src/app/background/tasks/inventory_collection.rs
@@ -400,6 +400,7 @@ mod test {
                 let sled = SledUpdate::new(
                     Uuid::new_v4(),
                     SocketAddrV6::new(Ipv6Addr::LOCALHOST, 1200 + i, 0, 0),
+                    1200 + i,
                     SledBaseboard {
                         serial_number: format!("serial-{}", i),
                         part_number: String::from("fake-sled"),
diff --git a/nexus/src/app/background/tasks/mod.rs b/nexus/src/app/background/tasks/mod.rs
index e4bbbfe6d0..fa9d972aa8 100644
--- a/nexus/src/app/background/tasks/mod.rs
+++ b/nexus/src/app/background/tasks/mod.rs
@@ -34,5 +34,6 @@ pub mod saga_recovery;
 pub mod service_firewall_rules;
 pub mod sync_service_zone_nat;
 pub mod sync_switch_configuration;
+pub mod tuf_artifact_replication;
 pub mod v2p_mappings;
 pub mod vpc_routes;
diff --git a/nexus/src/app/background/tasks/tuf_artifact_replication.rs b/nexus/src/app/background/tasks/tuf_artifact_replication.rs
new file mode 100644
index 0000000000..5f68edde51
--- /dev/null
+++ b/nexus/src/app/background/tasks/tuf_artifact_replication.rs
@@ -0,0 +1,447 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! TUF Repo Depot: Artifact replication across sleds (RFD 424)
+//!
+//! `Nexus::updates_put_repository` accepts a TUF repository, which Nexus
+//! unpacks, verifies, and reasons about the artifacts in. This uses temporary
+//! storage within the Nexus zone, so the update artifacts have to go somewhere.
+//! We've settled for now on "everywhere": a copy of each artifact is stored
+//! on each sled's M.2 devices.
+//!
+//! This background task is responsible for getting locally-stored artifacts
+//! onto sleds, and ensuring all sleds have copies of all artifacts.
+//! `Nexus::updates_put_repository` sends the [`ArtifactsWithPlan`] object to
+//! this task via an [`mpsc`] channel and activates it.
+//!
+//! During each activation:
+//!
+//! 1. The task moves `ArtifactsWithPlan` objects off the `mpsc` channel and
+//!    into a `Vec`.
+//! 2. The task queries the list of artifacts stored on each sled, and compares
+//!    it to the list of artifacts in CockroachDB. Sled artifact storage is
+//!    content-addressed by SHA-256 checksum. Errors are logged but otherwise
+//!    ignored (unless no sleds respond); the task proceeds as if that sled
+//!    has no artifacts. (This means that the task will always be trying to
+//!    replicate artifacts to that sled until it comes back or is pulled out
+//!    of service.)
+//! 3. The task compares the list of artifacts from the database to the list of
+//!    artifacts available locally.
+//! 4. If all the artifacts belonging to an `ArtifactsWithPlan` object have
+//!    been replicated to at least `MIN_SLED_REPLICATION` sleds, the task drops
+//!    the object from its `Vec` (thus cleaning up the local storage of those
+//!    files).
+//! 5. The task generates a list of requests that need to be sent:
+//!    - PUT each locally-stored artifact not present on any sleds to a random
+//!      sled.
+//!    - For each partially-replicated artifact, choose a sled that is missing
+//!      the artifact, and tell it (via `artifact_copy_from_depot`) to fetch
+//!      the artifact from a random sled that has it.
+//!    - DELETE all artifacts no longer tracked in CockroachDB from all sleds
+//!      that have that artifact.
+//! 6. The task randomly chooses requests up to per-activation limits and
+//!    sends them. Up to `MAX_REQUESTS` total requests are sent, with up to
+//!    `MAX_PUT_REQUESTS` PUT requests. Successful and unsuccessful responses
+//!    are logged.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use anyhow::ensure;
+use futures::future::BoxFuture;
+use futures::FutureExt;
+use nexus_auth::context::OpContext;
+use nexus_db_queries::db::{
+    datastore::SQL_BATCH_SIZE, pagination::Paginator, DataStore,
+};
+use nexus_networking::sled_client_from_address;
+use nexus_types::deployment::SledFilter;
+use nexus_types::internal_api::background::TufArtifactReplicationStatus;
+use omicron_common::update::ArtifactHash;
+use omicron_uuid_kinds::{GenericUuid, SledUuid};
+use rand::seq::SliceRandom;
+use slog_error_chain::InlineErrorChain;
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::TryRecvError;
+use update_common::artifacts::{
+    ArtifactsWithPlan, ExtractedArtifactDataHandle,
+};
+
+use crate::app::background::BackgroundTask;
+
+// The maximum number of PUT requests to send during each activation. This is
+// relatively small to avoid significant bandwidth out from this sled.
+const MAX_PUT_REQUESTS: usize = 8;
+// The maximum total number of requests to send during each activation,
+// including PUT requests capped by `MAX_PUT_REQUESTS`.
+const MAX_REQUESTS: usize = 32;
+// The number of sleds that artifacts must be present on before the local copy
+// of artifacts is dropped. This is ignored if there are fewer than this many
+// sleds in the system.
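+// (The activation logic clamps this to the number of in-service sleds via
+// `MIN_SLED_REPLICATION.min(sleds.len())`.)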
+const MIN_SLED_REPLICATION: usize = 3;
+
+pub struct ArtifactReplication {
+    datastore: Arc<DataStore>,
+    local: Vec<ArtifactsWithPlan>,
+    local_rx: mpsc::Receiver<ArtifactsWithPlan>,
+}
+
+struct Sled {
+    id: SledUuid,
+    client: sled_agent_client::Client,
+    depot_base_url: String,
+}
+
+#[derive(Default)]
+struct ArtifactPresence<'a> {
+    sleds: Vec<&'a Sled>,
+    counts: HashMap<SledUuid, usize>,
+    local: Option<ExtractedArtifactDataHandle>,
+}
+
+enum Request<'a> {
+    Put {
+        sled: &'a Sled,
+        handle: ExtractedArtifactDataHandle,
+        hash: ArtifactHash,
+    },
+    CopyFromDepot {
+        target_sled: &'a Sled,
+        depot_sled: &'a Sled,
+        hash: ArtifactHash,
+    },
+    Delete {
+        sled: &'a Sled,
+        hash: String,
+    },
+}
+
+impl BackgroundTask for ArtifactReplication {
+    fn activate<'a>(
+        &'a mut self,
+        opctx: &'a OpContext,
+    ) -> BoxFuture<'a, serde_json::Value> {
+        async {
+            match self.activate_internal(opctx).await {
+                Ok(status) => serde_json::to_value(status).unwrap(),
+                Err(err) => {
+                    let err_string = format!("{:#}", err);
+                    error!(
+                        &opctx.log,
+                        "error during artifact replication";
+                        "error" => &err_string,
+                    );
+                    serde_json::json!({
+                        "error": err_string,
+                    })
+                }
+            }
+        }
+        .boxed()
+    }
+}
+
+impl ArtifactReplication {
+    pub fn new(
+        datastore: Arc<DataStore>,
+        local_rx: mpsc::Receiver<ArtifactsWithPlan>,
+    ) -> ArtifactReplication {
+        ArtifactReplication { datastore, local: Vec::new(), local_rx }
+    }
+
+    async fn activate_internal<'a>(
+        &'a mut self,
+        opctx: &'a OpContext,
+    ) -> anyhow::Result<TufArtifactReplicationStatus> {
+        let log = &opctx.log;
+        let datastore = &self.datastore;
+
+        // Move any received artifacts out of `local_rx` into `local`.
+        loop {
+            match self.local_rx.try_recv() {
+                Ok(artifacts) => self.local.push(artifacts),
+                Err(TryRecvError::Empty) => break,
+                Err(TryRecvError::Disconnected) => {
+                    // If the last sender for this channel is dropped, then the
+                    // `Nexus` type has also been dropped. This is presumably
+                    // a bug.
+                    panic!("artifact replication receiver disconnected");
+                }
+            }
+        }
+
+        // Query the database for artifacts that we ought to have.
+        let mut artifacts = HashMap::new();
+        let mut paginator = Paginator::new(SQL_BATCH_SIZE);
+        while let Some(p) = paginator.next() {
+            let batch = datastore
+                .update_tuf_artifact_list(opctx, &p.current_pagparams())
+                .await?;
+            paginator = p.found_batch(&batch, &|a| a.sha256);
+            for artifact in batch {
+                artifacts
+                    .insert(artifact.sha256.0, ArtifactPresence::default());
+            }
+        }
+
+        // Ask all sled agents to list the artifacts they have, and mark those
+        // artifacts as present on those sleds.
+        let sleds = datastore
+            .sled_list_all_batched(&opctx, SledFilter::InService)
+            .await?
+            .into_iter()
+            .map(|sled| Sled {
+                id: SledUuid::from_untyped_uuid(sled.identity.id),
+                client: sled_client_from_address(
+                    sled.identity.id,
+                    sled.address(),
+                    log,
+                ),
+                depot_base_url: format!(
+                    "http://{}",
+                    sled.address_with_port(sled.repo_depot_port.into())
+                ),
+            })
+            .collect::<Vec<_>>();
+        ensure!(!sleds.is_empty(), "no sleds");
+        let responses =
+            futures::future::join_all(sleds.iter().map(|sled| async move {
+                let response = match sled.client.artifact_list().await {
+                    Ok(response) => response.into_inner(),
+                    Err(err) => {
+                        error!(
+                            log,
+                            "Failed to get artifact list";
+                            "error" => InlineErrorChain::new(&err),
+                            "sled" => sled.client.baseurl(),
+                        );
+                        HashMap::new()
+                    }
+                };
+                (sled, response)
+            }))
+            .await;
+        let mut delete_requests = Vec::new();
+        for (sled, response) in responses {
+            for (hash, count) in response {
+                if let Some(presence) = ArtifactHash::from_str(&hash)
+                    .ok()
+                    .and_then(|hash| artifacts.get_mut(&hash))
+                {
+                    presence.counts.insert(sled.id, count);
+                    presence.sleds.push(sled);
+                } else {
+                    delete_requests.push(Request::Delete { sled, hash });
+                }
+            }
+        }
+
+        // Mark all the artifacts found in `self.local` as locally available.
+        // If all of the artifacts in an `ArtifactsWithPlan` are sufficiently
+        // replicated, drop the `ArtifactsWithPlan`.
+        let sufficient_sleds = MIN_SLED_REPLICATION.min(sleds.len());
+        self.local.retain(|plan| {
+            let mut keep = false;
+            for hash_id in plan.by_id().values().flatten() {
+                if let Some(handle) = plan.get_by_hash(hash_id) {
+                    if let Some(presence) = artifacts.get_mut(&hash_id.hash) {
+                        presence.local = Some(handle);
+                        if presence.sleds.len() < sufficient_sleds {
+                            keep = true;
+                        }
+                    }
+                }
+            }
+            keep
+        });
+
+        // Generate and send a random set of requests up to our per-activation
+        // limits.
+        let (requests, requests_outstanding) =
+            generate_requests(&sleds, artifacts, delete_requests);
+        let futures = requests.iter().map(|request| request.execute(log));
+        let result = futures::future::join_all(futures).await;
+
+        // TODO: If there are any missing artifacts with no known copies,
+        // check the status of the assigned Nexus for its repositories. If the
+        // assigned Nexus is expunged, or if we are the assigned Nexus and we
+        // don't have the artifact, mark the repository as failed.
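+
+        // Report totals for this activation: requests that succeeded or
+        // failed, requests deferred by the per-activation limits, and the
+        // number of repos still held in local storage.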
+        Ok(TufArtifactReplicationStatus {
+            requests_ok: result.iter().filter(|x| **x).count(),
+            requests_err: result.iter().filter(|x| !*x).count(),
+            requests_outstanding,
+            local_repos: self.local.len(),
+        })
+    }
+}
+
+impl<'a> Request<'a> {
+    async fn execute(&self, log: &slog::Logger) -> bool {
+        let (action, sled, hash) = match self {
+            Request::Put { sled, handle, hash } => {
+                let sha256 = hash.to_string();
+                let file = match handle.file().await {
+                    Ok(file) => file,
+                    Err(err) => {
+                        error!(
+                            log,
+                            "Failed to open artifact file";
+                            "error" => &format!("{:#}", err),
+                            "sha256" => &sha256,
+                        );
+                        return false;
+                    }
+                };
+                if let Err(err) = sled.client.artifact_put(&sha256, file).await
+                {
+                    error!(
+                        log,
+                        "Failed to put artifact";
+                        "error" => InlineErrorChain::new(&err),
+                        "sled" => sled.client.baseurl(),
+                        "sha256" => &sha256,
+                    );
+                    return false;
+                }
+                ("PUT".to_owned(), sled, sha256)
+            }
+            Request::CopyFromDepot { target_sled, depot_sled, hash } => {
+                let sha256 = hash.to_string();
+                if let Err(err) = target_sled
+                    .client
+                    .artifact_copy_from_depot(
+                        &sha256,
+                        &sled_agent_client::types::ArtifactCopyFromDepotBody {
+                            depot_base_url: depot_sled.depot_base_url.clone(),
+                        },
+                    )
+                    .await
+                {
+                    error!(
+                        log,
+                        "Failed to request artifact copy from depot";
+                        "error" => InlineErrorChain::new(&err),
+                        "sled" => target_sled.client.baseurl(),
+                        "sha256" => &sha256,
+                    );
+                    return false;
+                }
+                (
+                    format!("copy from depot {}", depot_sled.depot_base_url),
+                    target_sled,
+                    sha256,
+                )
+            }
+            Request::Delete { sled, hash } => {
+                if let Err(err) = sled.client.artifact_delete(&hash).await {
+                    error!(
+                        log,
+                        "Failed to request artifact deletion";
+                        "error" => InlineErrorChain::new(&err),
+                        "sled" => sled.client.baseurl(),
+                        "sha256" => &hash,
+                    );
+                    return false;
+                }
+                ("DELETE".to_owned(), sled, hash.clone())
+            }
+        };
+        info!(
+            log,
+            "Request succeeded";
+            "action" => &action,
+            "sled" => sled.client.baseurl(),
+            "sha256" => &hash,
+        );
+        true
+    }
+}
+
+// Generate the list of requests to send, in priority order: PUTs of artifacts
+// that exist only locally, then DELETEs and first copies via the depot, then
+// extra copies for M.2 redundancy. Returns the requests chosen for this
+// activation and the count of requests left outstanding.
+fn generate_requests<'a>(
+    sleds: &'a [Sled],
+    artifacts: HashMap<ArtifactHash, ArtifactPresence<'a>>,
+    delete_requests: Vec<Request<'a>>,
+) -> (Vec<Request<'a>>, usize) {
+    let mut rng = rand::thread_rng();
+    let mut put_requests = Vec::new();
+    let mut other_requests = delete_requests;
+    let mut low_priority_requests = Vec::new();
+    for (hash, presence) in artifacts {
+        if presence.sleds.is_empty() {
+            if let Some(handle) = presence.local {
+                // Randomly choose a sled to PUT the artifact to.
+                let sled = sleds.choose(&mut rng).expect("sleds is not empty");
+                put_requests.push(Request::Put { sled, handle, hash });
+            }
+        } else {
+            // Randomly choose a sled where the artifact is not present.
+            let missing_sleds = sleds
+                .iter()
+                .filter_map(|sled| {
+                    let count = presence
+                        .counts
+                        .get(&sled.id)
+                        .copied()
+                        .unwrap_or_default();
+                    // TODO: We should check on the number of non-expunged M.2s
+                    // the sled is known to have instead of a hardcoded value.
+                    (count < 2).then_some((sled, count))
+                })
+                .collect::<Vec<_>>();
+            if let Some((target_sled, count)) = missing_sleds.choose(&mut rng) {
+                if *count > 0 {
+                    // This sled doesn't have a full set of copies, but it does
+                    // have _a_ copy, so ensuring that both M.2s have a copy is
+                    // a lower-priority concern.
+                    &mut low_priority_requests
+                } else {
+                    &mut other_requests
+                }
+                .push(Request::CopyFromDepot {
+                    target_sled,
+                    depot_sled: presence
+                        .sleds
+                        .choose(&mut rng)
+                        .expect("presence.sleds is not empty"),
+                    hash,
+                });
+            }
+        }
+    }
+
+    let total =
+        put_requests.len() + other_requests.len() + low_priority_requests.len();
+
+    let mut requests = put_requests;
+    sample_vec(&mut rng, &mut requests, MAX_PUT_REQUESTS);
+    sample_vec(
+        &mut rng,
+        &mut other_requests,
+        MAX_REQUESTS.saturating_sub(requests.len()),
+    );
+    requests.extend(other_requests);
+    sample_vec(
+        &mut rng,
+        &mut low_priority_requests,
+        MAX_REQUESTS.saturating_sub(requests.len()),
+    );
+    requests.extend(low_priority_requests);
+    let outstanding = total - requests.len();
+    (requests, outstanding)
+}
+
+// Reduce `vec` to a uniform random sample of `amount` of its elements, via a
+// partial Fisher-Yates shuffle of the first `amount` positions.
+fn sample_vec<R, T>(rng: &mut R, vec: &mut Vec<T>, amount: usize)
+where
+    R: rand::Rng + ?Sized,
+{
+    let end = vec.len();
+    if amount >= end {
+        return;
+    }
+    for i in 0..amount {
+        // Swap with a random element in `i..end`; including `i` itself keeps
+        // the sample unbiased (an element may stay in place).
+        vec.swap(i, rng.gen_range(i..end));
+    }
+    vec.truncate(amount);
+}
diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs
index e451119bfc..762cde8035 100644
--- a/nexus/src/app/mod.rs
+++ b/nexus/src/app/mod.rs
@@ -43,6 +43,7 @@ use std::net::{IpAddr, Ipv6Addr};
 use std::sync::Arc;
 use std::sync::OnceLock;
 use tokio::sync::mpsc;
+use update_common::artifacts::ArtifactsWithPlan;
 use uuid::Uuid;
 
 // The implementation of Nexus is large, and split into a number of submodules
@@ -216,6 +217,10 @@ pub struct Nexus {
 
     /// List of demo sagas awaiting a request to complete them
     demo_sagas: Arc<std::sync::Mutex<CompletingDemoSagas>>,
+
+    /// Sender for TUF repository artifacts temporarily stored in this zone to
+    /// be replicated out to sleds in the background
+    tuf_artifact_replication_tx: mpsc::Sender<ArtifactsWithPlan>,
 }
 
 impl Nexus {
@@ -297,6 +302,14 @@ impl Nexus {
             saga_create_tx,
         ));
 
+        // Create a channel for replicating repository artifacts. 16 is a
+        // dubious bound for the channel but it seems unlikely that an operator
+        // would want to upload more than one at a time, and at most have two
+        // or three on the system during an upgrade (we've sized the artifact
+        // datasets to fit at most 10 repositories for this reason).
+        let (tuf_artifact_replication_tx, tuf_artifact_replication_rx) =
+            mpsc::channel(16);
+
         let client_state = dpd_client::ClientState {
             tag: String::from("nexus"),
             log: log.new(o!(
@@ -528,6 +541,7 @@ impl Nexus {
             demo_sagas: Arc::new(std::sync::Mutex::new(
                 CompletingDemoSagas::new(),
             )),
+            tuf_artifact_replication_tx,
         };
 
         // TODO-cleanup all the extra Arcs here seems wrong
@@ -582,6 +596,7 @@ impl Nexus {
                     registry: sagas::ACTION_REGISTRY.clone(),
                     sagas_started_rx: saga_recovery_rx,
                 },
+                tuf_artifact_replication_rx,
             },
         );
 
diff --git a/nexus/src/app/sled.rs b/nexus/src/app/sled.rs
index ff4cbc89c5..4f10a7cc66 100644
--- a/nexus/src/app/sled.rs
+++ b/nexus/src/app/sled.rs
@@ -62,6 +62,7 @@ impl super::Nexus {
         let sled = db::model::SledUpdate::new(
             id,
             info.sa_address,
+            info.repo_depot_port,
             db::model::SledBaseboard {
                 serial_number: info.baseboard.serial,
                 part_number: info.baseboard.part,
diff --git a/nexus/src/app/update/mod.rs b/nexus/src/app/update/mod.rs
index d4a47375bc..39598a10db 100644
--- a/nexus/src/app/update/mod.rs
+++ b/nexus/src/app/update/mod.rs
@@ -56,17 +56,30 @@ impl super::Nexus {
             ArtifactsWithPlan::from_stream(body, Some(file_name), &self.log)
                 .await
                 .map_err(|error| error.to_http_error())?;
-
-        // Now store the artifacts in the database.
         let tuf_repo_description = TufRepoDescription::from_external(
             artifacts_with_plan.description().clone(),
         );
-
+        // Move the `ArtifactsWithPlan`, which carries with it the
+        // `Utf8TempDir`s storing the artifacts, into the artifact replication
+        // background task. This is done before the database insert because if
+        // this fails, we shouldn't record the artifacts in the database.
+        self.tuf_artifact_replication_tx
+            .send(artifacts_with_plan)
+            .await
+            .map_err(|err| {
+                Error::internal_error(&format!(
+                    "failed to send artifacts for replication: {err}"
+                ))
+            })?;
+        // Now store the artifacts in the database.
         let response = self
             .db_datastore
             .update_tuf_repo_insert(opctx, tuf_repo_description)
             .await
             .map_err(HttpError::from)?;
+        // Finally, immediately activate the artifact replication task.
+        self.background_tasks.task_tuf_artifact_replication.activate();
+
         Ok(response.into_external())
     }
 
diff --git a/nexus/test-utils/src/background.rs b/nexus/test-utils/src/background.rs
index 32a2f24d9d..5939f6af08 100644
--- a/nexus/test-utils/src/background.rs
+++ b/nexus/test-utils/src/background.rs
@@ -32,6 +32,44 @@ fn most_recent_activate_time(
     }
 }
 
+/// Given the name of a background task, wait for it to complete if it's
+/// running, then return the last polled `BackgroundTask` object. Panics if the
+/// task has never been activated.
+pub async fn wait_background_task(
+    internal_client: &ClientTestContext,
+    task_name: &str,
+) -> BackgroundTask {
+    // Wait for the task to finish
+    let last_task_poll = wait_for_condition(
+        || async {
+            let task = NexusRequest::object_get(
+                internal_client,
+                &format!("/bgtasks/view/{task_name}"),
+            )
+            .execute_and_parse_unwrap::<BackgroundTask>()
+            .await;
+
+            // Wait until the task has actually run and then is idle
+            if matches!(&task.current, CurrentStatus::Idle) {
+                match &task.last {
+                    LastResult::Completed(_) => Ok(task),
+                    LastResult::NeverCompleted => {
+                        panic!("task never activated")
+                    }
+                }
+            } else {
+                Err(CondCheckError::<()>::NotYet)
+            }
+        },
+        &Duration::from_millis(500),
+        &Duration::from_secs(60),
+    )
+    .await
+    .unwrap();
+
+    last_task_poll
+}
+
+/// Given the name of a background task, activate it, then wait for it to
+/// complete. Return the last polled `BackgroundTask` object.
 pub async fn activate_background_task(
@@ -337,3 +375,68 @@ pub async fn run_replacement_tasks_to_completion(
     .await
     .unwrap();
 }
+
+pub async fn wait_tuf_artifact_replication_step(
+    internal_client: &ClientTestContext,
+) -> TufArtifactReplicationStatus {
+    let last_background_task =
+        wait_background_task(&internal_client, "tuf_artifact_replication")
+            .await;
+
+    let LastResult::Completed(last_result_completed) =
+        last_background_task.last
+    else {
+        panic!(
+            "unexpected {:?} returned from tuf_artifact_replication task",
+            last_background_task.last,
+        );
+    };
+
+    let status = serde_json::from_value::<TufArtifactReplicationStatus>(
+        last_result_completed.details,
+    )
+    .unwrap();
+    assert_eq!(status.requests_err, 0);
+    status
+}
+
+pub async fn run_tuf_artifact_replication_step(
+    internal_client: &ClientTestContext,
+) -> TufArtifactReplicationStatus {
+    let last_background_task =
+        activate_background_task(&internal_client, "tuf_artifact_replication")
+            .await;
+
+    let LastResult::Completed(last_result_completed) =
+        last_background_task.last
+    else {
+        panic!(
+            "unexpected {:?} returned from tuf_artifact_replication task",
+            last_background_task.last,
+        );
+    };
+
+    let status = serde_json::from_value::<TufArtifactReplicationStatus>(
+        last_result_completed.details,
+    )
+    .unwrap();
+    assert_eq!(status.requests_err, 0);
+    status
+}
+
+/// Run the `tuf_artifact_replication` task until the status indicates the task
+/// has stabilized: no outstanding requests, and no local repositories. Panics
+/// if the status does not change between runs.
+pub async fn run_tuf_artifact_replication_to_completion(
+    internal_client: &ClientTestContext,
+) {
+    let mut status = run_tuf_artifact_replication_step(internal_client).await;
+    while status.requests_outstanding > 0 || status.local_repos > 0 {
+        let new_status =
+            run_tuf_artifact_replication_step(internal_client).await;
+        if new_status == status {
+            panic!("TUF artifact replication stalled: {new_status:?}");
+        }
+        status = new_status;
+    }
+}
diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml
index 94d22d491f..e1da34469b 100644
--- a/nexus/tests/config.test.toml
+++ b/nexus/tests/config.test.toml
@@ -151,6 +151,9 @@ region_snapshot_replacement_start.period_secs = 60
 region_snapshot_replacement_garbage_collection.period_secs = 60
 region_snapshot_replacement_step.period_secs = 60
 region_snapshot_replacement_finish.period_secs = 60
+# The default activation period for this task is 60s, but we want to activate
+# it manually to test the result of each activation.
+tuf_artifact_replication.period_secs = 600
 
 [default_region_allocation_strategy]
 # we only have one sled in the test environment, so we need to use the
diff --git a/nexus/tests/integration_tests/rack.rs b/nexus/tests/integration_tests/rack.rs
index 2cc7dc8977..f4b67e0afd 100644
--- a/nexus/tests/integration_tests/rack.rs
+++ b/nexus/tests/integration_tests/rack.rs
@@ -118,6 +118,7 @@ async fn test_sled_list_uninitialized(cptestctx: &ControlPlaneTestContext) {
     let sled_uuid = Uuid::new_v4();
     let sa = SledAgentInfo {
         sa_address: "[fd00:1122:3344:0100::1]:8080".parse().unwrap(),
+        repo_depot_port: 8081,
         role: SledRole::Gimlet,
         baseboard,
         usable_hardware_threads: 32,
@@ -211,6 +212,7 @@ async fn test_sled_add(cptestctx: &ControlPlaneTestContext) {
         .sled_upsert(SledUpdate::new(
             sled_id.into_untyped_uuid(),
             "[::1]:0".parse().unwrap(),
+            0,
             SledBaseboard {
                 serial_number: baseboard.serial.clone(),
                 part_number: baseboard.part.clone(),
diff --git a/nexus/tests/integration_tests/updates.rs b/nexus/tests/integration_tests/updates.rs
index 3797df1232..475e63dd97 100644
--- a/nexus/tests/integration_tests/updates.rs
+++ b/nexus/tests/integration_tests/updates.rs
@@ -9,11 +9,14 @@
 use anyhow::{ensure, Context, Result};
 use camino::Utf8Path;
-use camino_tempfile::{Builder, Utf8TempDir, Utf8TempPath};
+use camino_tempfile::{Builder, Utf8TempPath};
 use clap::Parser;
 use dropshot::test_util::LogContext;
 use http::{Method, StatusCode};
 use nexus_config::UpdatesConfig;
+use nexus_test_utils::background::run_tuf_artifact_replication_step;
+use nexus_test_utils::background::run_tuf_artifact_replication_to_completion;
+use nexus_test_utils::background::wait_tuf_artifact_replication_step;
 use nexus_test_utils::http_testing::{AuthnMode, NexusRequest, RequestBuilder};
 use nexus_test_utils::{load_test_config, test_setup, test_setup_with_config};
 use omicron_common::api::external::{
@@ -24,32 +27,15 @@ use omicron_common::api::internal::nexus::KnownArtifactKind;
 use omicron_sled_agent::sim;
 use pretty_assertions::assert_eq;
 use serde::Deserialize;
+use std::collections::HashSet;
 use std::fmt::Debug;
-use std::fs::File;
 use std::io::Write;
 use tufaceous_lib::assemble::{DeserializedManifest, ManifestTweak};
 
-const FAKE_MANIFEST_PATH: &'static str = "../tufaceous/manifests/fake.toml";
-
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
-async fn test_update_uninitialized() -> Result<()> {
+async fn test_repo_upload_uninitialized() -> Result<()> {
     let mut config = load_test_config();
     let logctx = LogContext::new("test_update_uninitialized", &config.pkg.log);
-
-    // Build a fake TUF repo
-    let temp_dir = Utf8TempDir::new()?;
-    let archive_path = temp_dir.path().join("archive.zip");
-
-    let args = tufaceous::Args::try_parse_from([
-        "tufaceous",
-        "assemble",
-        FAKE_MANIFEST_PATH,
-        archive_path.as_str(),
-    ])
-    .context("error parsing args")?;
-
-    args.exec(&logctx.log).await.context("error executing assemble command")?;
-
     let cptestctx = test_setup_with_config::<omicron_nexus::Server>(
         "test_update_uninitialized",
         &mut config,
@@ -59,6 +45,9 @@
     .await;
     let client = &cptestctx.external_client;
 
+    // Build a fake TUF repo
+    let archive_path = make_archive(&logctx.log).await?;
+
     // Attempt to upload the repository to Nexus. This should fail with a 500
     // error because the updates system is not configured.
     {
@@ -72,6 +61,13 @@
             .context("repository upload should have failed with 500 error")?;
     }
 
+    // The artifact replication background task should have nothing to do.
+    let status =
+        run_tuf_artifact_replication_step(&cptestctx.internal_client).await;
+    assert_eq!(status.requests_ok, 0);
+    assert_eq!(status.requests_outstanding, 0);
+    assert_eq!(status.local_repos, 0);
+
     // Attempt to fetch a repository description from Nexus. This should also
     // fail with a 500 error.
     {
@@ -92,7 +88,7 @@
 }
 
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
-async fn test_update_end_to_end() -> Result<()> {
+async fn test_repo_upload() -> Result<()> {
     let mut config = load_test_config();
     config.pkg.updates = Some(UpdatesConfig {
         // XXX: This is currently not used by the update system, but
         trusted_root: "does-not-exist.json".into(),
     });
     let logctx = LogContext::new("test_update_end_to_end", &config.pkg.log);
-
-    // Build a fake TUF repo
-    let temp_dir = Utf8TempDir::new()?;
-    let archive_path = temp_dir.path().join("archive.zip");
-
-    let args = tufaceous::Args::try_parse_from([
-        "tufaceous",
-        "assemble",
-        FAKE_MANIFEST_PATH,
-        archive_path.as_str(),
-    ])
-    .context("error parsing args")?;
-
-    args.exec(&logctx.log).await.context("error executing assemble command")?;
-
     let cptestctx = test_setup_with_config::<omicron_nexus::Server>(
         "test_update_end_to_end",
         &mut config,
@@ -124,6 +105,9 @@
     .await;
     let client = &cptestctx.external_client;
 
+    // Build a fake TUF repo
+    let archive_path = make_archive(&logctx.log).await?;
+
     // Upload the repository to Nexus.
     let mut initial_description = {
         let response =
@@ -138,6 +122,30 @@
         assert_eq!(response.status, TufRepoInsertStatus::Inserted);
         response.recorded
     };
+    let unique_sha256_count = initial_description
+        .artifacts
+        .iter()
+        .map(|artifact| artifact.hash)
+        .collect::<HashSet<_>>()
+        .len();
+
+    // The artifact replication background task should have been activated, and
+    // we should see a local repo, successful tasks, and pending tasks.
+    let status =
+        wait_tuf_artifact_replication_step(&cptestctx.internal_client).await;
+    eprintln!("{status:?}");
+    assert!(status.requests_ok > 0);
+    assert!(status.requests_outstanding > 0);
+    // The total number of requests (ok and outstanding) should be the same
+    // as the number of artifacts.
+    assert_eq!(
+        status.requests_ok + status.requests_outstanding,
+        unique_sha256_count,
+    );
+    assert_eq!(status.local_repos, 1);
+    // Given a few more executions, the task should stabilize.
+    run_tuf_artifact_replication_to_completion(&cptestctx.internal_client)
+        .await;
 
     // Upload the repository to Nexus again. This should return a 200 with an
     // `AlreadyExists` status.
@@ -187,7 +195,15 @@
         "initial description matches fetched description"
     );
 
-    // TODO: attempt to download extracted artifacts.
+    // Even though the repository already exists, the artifacts are sent to the
+    // replication task ahead of database insertion. The task should have run
+    // once, found nothing to do, and deleted the artifacts.
+    let status =
+        wait_tuf_artifact_replication_step(&cptestctx.internal_client).await;
+    eprintln!("{status:?}");
+    assert_eq!(status.requests_ok, 0);
+    assert_eq!(status.requests_outstanding, 0);
+    assert_eq!(status.local_repos, 0);
 
     // Upload a new repository with the same system version but a different
     // version for one of the components. This will produce a different hash,
@@ -197,8 +213,7 @@
         kind: KnownArtifactKind::GimletSp,
         version: "2.0.0".parse().unwrap(),
     }];
-    let archive_path =
-        make_tweaked_archive(&logctx.log, &temp_dir, tweaks).await?;
+    let archive_path = make_tweaked_archive(&logctx.log, tweaks).await?;
 
     let response = make_upload_request(
         client,
@@ -217,6 +232,16 @@
     )?;
     }
 
+    // Even though the repository was rejected, the artifacts are sent to the
+    // replication task ahead of database insertion. The task should have run
+    // once, found nothing to do, and deleted the artifacts.
+    let status =
+        wait_tuf_artifact_replication_step(&cptestctx.internal_client).await;
+    eprintln!("{status:?}");
+    assert_eq!(status.requests_ok, 0);
+    assert_eq!(status.requests_outstanding, 0);
+    assert_eq!(status.local_repos, 0);
+
     // Upload a new repository with a different system version and different
     // contents (but same version) for an artifact.
     {
         let tweaks = &[
             ManifestTweak::ArtifactVersion {
                 kind: KnownArtifactKind::GimletSp,
                 version: "2.0.0".parse().unwrap(),
             },
             ManifestTweak::ArtifactSize {
                 size_delta: 1024,
             },
         ];
-        let archive_path =
-            make_tweaked_archive(&logctx.log, &temp_dir, tweaks).await?;
+        let archive_path = make_tweaked_archive(&logctx.log, tweaks).await?;
 
         let response =
             make_upload_request(client, &archive_path, StatusCode::CONFLICT)
                 .await
@@ -244,12 +268,21 @@
     )?;
     }
 
+    // Even though the repository was rejected, the artifacts are sent to the
+    // replication task ahead of database insertion. The task should have run
+    // once, found nothing to do, and deleted the artifacts.
+    let status =
+        wait_tuf_artifact_replication_step(&cptestctx.internal_client).await;
+    eprintln!("{status:?}");
+    assert_eq!(status.requests_ok, 0);
+    assert_eq!(status.requests_outstanding, 0);
+    assert_eq!(status.local_repos, 0);
+
     // Upload a new repository with a different system version but no other
     // changes. This should be accepted.
     {
         let tweaks = &[ManifestTweak::SystemVersion("2.0.0".parse().unwrap())];
-        let archive_path =
-            make_tweaked_archive(&logctx.log, &temp_dir, tweaks).await?;
+        let archive_path = make_tweaked_archive(&logctx.log, tweaks).await?;
 
         let response =
             make_upload_request(client, &archive_path, StatusCode::OK)
                 .await
@@ -263,35 +296,49 @@
         assert_eq!(response.status, TufRepoInsertStatus::Inserted);
     }
 
+    // No artifacts changed, so the task should have nothing to do and should
+    // delete the local artifacts.
+    let status =
+        wait_tuf_artifact_replication_step(&cptestctx.internal_client).await;
+    eprintln!("{status:?}");
+    assert_eq!(status.requests_ok, 0);
+    assert_eq!(status.requests_outstanding, 0);
+    assert_eq!(status.local_repos, 0);
+
     cptestctx.teardown().await;
     logctx.cleanup_successful();
     Ok(())
 }
 
+async fn make_archive(log: &slog::Logger) -> anyhow::Result<Utf8TempPath> {
+    make_tweaked_archive(log, &[]).await
+}
+
 async fn make_tweaked_archive(
     log: &slog::Logger,
-    temp_dir: &Utf8TempDir,
     tweaks: &[ManifestTweak],
 ) -> anyhow::Result<Utf8TempPath> {
     let manifest = DeserializedManifest::tweaked_fake(tweaks);
-    let manifest_path = temp_dir.path().join("fake2.toml");
-    let mut manifest_file =
-        File::create(&manifest_path).context("error creating manifest file")?;
+    let mut manifest_file = Builder::new()
+        .prefix("manifest")
+        .suffix(".toml")
+        .tempfile()
+        .context("error creating temp file for manifest")?;
     let manifest_to_toml = manifest.to_toml()?;
     manifest_file.write_all(manifest_to_toml.as_bytes())?;
 
     let archive_path = Builder::new()
         .prefix("archive")
         .suffix(".zip")
-        .tempfile_in(temp_dir.path())
-        .context("error creating temp file for tweaked archive")?
+        .tempfile()
+        .context("error creating temp file for archive")?
        .into_temp_path();
 
     let args = tufaceous::Args::try_parse_from([
         "tufaceous",
         "assemble",
-        manifest_path.as_str(),
+        manifest_file.path().as_str(),
         archive_path.as_str(),
     ])
     .context("error parsing args")?;
diff --git a/nexus/types/src/internal_api/background.rs b/nexus/types/src/internal_api/background.rs
index cf3d652587..8ec08149ca 100644
--- a/nexus/types/src/internal_api/background.rs
+++ b/nexus/types/src/internal_api/background.rs
@@ -191,3 +191,19 @@ impl std::fmt::Display for ReincarnatableInstance {
         write!(f, "{instance_id} ({reason})")
     }
 }
+
+/// The status of a `tuf_artifact_replication` background task activation
+#[derive(Debug, Serialize, Deserialize, PartialEq)]
+pub struct TufArtifactReplicationStatus {
+    /// Number of requests handled by sled agents successfully.
+    pub requests_ok: usize,
+    /// Number of requests to sled agents that failed.
+    pub requests_err: usize,
+    /// Number of requests that were not sent during this activation because
+    /// the limits had been reached.
+    pub requests_outstanding: usize,
+    /// The number of repositories this Nexus instance is keeping in local
+    /// storage. Local repositories are deleted once all their artifacts are
+    /// sufficiently replicated.
+    pub local_repos: usize,
+}
diff --git a/nexus/types/src/internal_api/params.rs b/nexus/types/src/internal_api/params.rs
index 32a16788b4..3e05c2d327 100644
--- a/nexus/types/src/internal_api/params.rs
+++ b/nexus/types/src/internal_api/params.rs
@@ -35,6 +35,9 @@ pub struct SledAgentInfo {
     /// The address of the sled agent's API endpoint
     pub sa_address: SocketAddrV6,
 
+    /// The port of the Repo Depot API endpoint, on the same IP as `sa_address`
+    pub repo_depot_port: u16,
+
     /// Describes the responsibilities of the sled
     pub role: SledRole,
 
diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json
index 70b281c143..6d916d31f2 100644
--- a/openapi/nexus-internal.json
+++ b/openapi/nexus-internal.json
@@ -5453,6 +5453,12 @@
           }
         ]
       },
+      "repo_depot_port": {
+        "description": "The port of the Repo Depot API endpoint, on the same IP as `sa_address`",
+        "type": "integer",
+        "format": "uint16",
+        "minimum": 0
+      },
       "reservoir_size": {
         "description": "Amount of RAM dedicated to the VMM reservoir\n\nMust be smaller than \"usable_physical_ram\"",
         "allOf": [
@@ -5492,6 +5498,7 @@
         "baseboard",
         "decommissioned",
         "generation",
+        "repo_depot_port",
         "reservoir_size",
         "role",
         "sa_address",
diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql
index f689c7e9f7..864d03c34d 100644
--- a/schema/crdb/dbinit.sql
+++ b/schema/crdb/dbinit.sql
@@ -188,7 +188,11 @@ CREATE TABLE IF NOT EXISTS omicron.public.sled (
     sled_state omicron.public.sled_state NOT NULL,
 
     /* Generation number owned and incremented by the sled-agent */
-    sled_agent_gen INT8 NOT NULL DEFAULT 1
+    sled_agent_gen INT8 NOT NULL DEFAULT 1,
+
+    /* The bound port of the Repo Depot API server, running on the same IP as
+       the sled agent server. */
+    repo_depot_port INT4 CHECK (repo_depot_port BETWEEN 0 AND 65535) NOT NULL
 );
 
 -- Add an index that ensures a given physical sled (identified by serial and
@@ -2321,12 +2325,17 @@ CREATE TABLE IF NOT EXISTS omicron.public.tuf_repo (
     id UUID PRIMARY KEY,
     time_created TIMESTAMPTZ NOT NULL,
 
+    -- TODO: Repos fetched over HTTP will not have a SHA256 hash; this is an
+    -- implementation detail of our ZIP archives.
     sha256 STRING(64) NOT NULL,
 
     -- The version of the targets.json role that was used to generate the repo.
     targets_role_version INT NOT NULL,
 
     -- The valid_until time for the repo.
+    -- TODO: Figure out timestamp validity policy for uploaded repos vs those
+    -- fetched over HTTP; my (iliana's) current presumption is that we will
+    -- make this NULL for uploaded ZIP archives of repos.
    valid_until TIMESTAMPTZ NOT NULL,
 
     -- The system version described in the TUF repo.
@@ -2371,6 +2380,8 @@ CREATE TABLE IF NOT EXISTS omicron.public.tuf_artifact (
     PRIMARY KEY (name, version, kind)
 );
 
+CREATE INDEX IF NOT EXISTS tuf_artifact_sha256 ON omicron.public.tuf_artifact (sha256);
+
 -- Reflects that a particular artifact was provided by a particular TUF repo.
 -- This is a many-many mapping.
 CREATE TABLE IF NOT EXISTS omicron.public.tuf_repo_artifact (
@@ -4684,7 +4695,7 @@ INSERT INTO omicron.public.db_metadata (
     version,
     target_version
 ) VALUES
-    (TRUE, NOW(), NOW(), '114.0.0', NULL)
+    (TRUE, NOW(), NOW(), '115.0.0', NULL)
 ON CONFLICT DO NOTHING;
 
 COMMIT;
diff --git a/schema/crdb/tuf-artifact-replication/up01.sql b/schema/crdb/tuf-artifact-replication/up01.sql
new file mode 100644
index 0000000000..891940ff4d
--- /dev/null
+++ b/schema/crdb/tuf-artifact-replication/up01.sql
@@ -0,0 +1 @@
+CREATE INDEX IF NOT EXISTS tuf_artifact_sha256 ON omicron.public.tuf_artifact (sha256);
diff --git a/schema/crdb/tuf-artifact-replication/up02.sql b/schema/crdb/tuf-artifact-replication/up02.sql
new file mode 100644
index 0000000000..ad962d2991
--- /dev/null
+++ b/schema/crdb/tuf-artifact-replication/up02.sql
@@ -0,0 +1,4 @@
+ALTER TABLE omicron.public.sled
+    ADD COLUMN IF NOT EXISTS repo_depot_port INT4
+    CHECK (repo_depot_port BETWEEN 0 AND 65535)
+    NOT NULL DEFAULT 0;
diff --git a/schema/crdb/tuf-artifact-replication/up03.sql b/schema/crdb/tuf-artifact-replication/up03.sql
new file mode 100644
index 0000000000..142ac4c60a
--- /dev/null
+++ b/schema/crdb/tuf-artifact-replication/up03.sql
@@ -0,0 +1,2 @@
+ALTER TABLE omicron.public.sled
+    ALTER COLUMN repo_depot_port DROP DEFAULT;
diff --git a/sled-agent/src/artifact_store.rs b/sled-agent/src/artifact_store.rs
index 5b78c0099d..bf2bbf0ccd 100644
--- a/sled-agent/src/artifact_store.rs
+++ b/sled-agent/src/artifact_store.rs
@@ -20,7 +20,7 @@ use std::net::SocketAddrV6;
 use std::str::FromStr;
 use std::time::Duration;
 
-use camino::{Utf8Path, Utf8PathBuf};
+use camino::Utf8PathBuf;
 use camino_tempfile::{NamedUtf8TempFile, Utf8TempPath};
 use dropshot::{
     Body, ConfigDropshot, FreeformBody, HttpError, HttpResponseOk, Path,
@@ -29,7 +29,6 @@ use dropshot::{
 use futures::{Stream, TryStreamExt};
 use http::StatusCode;
 use omicron_common::address::REPO_DEPOT_PORT;
-use omicron_common::disk::{DatasetKind, DatasetsConfig};
 use omicron_common::update::ArtifactHash;
 use repo_depot_api::*;
 use sha2::{Digest, Sha256};
@@ -38,7 +37,7 @@ use sled_storage::dataset::M2_ARTIFACT_DATASET;
 use sled_storage::error::Error as StorageError;
 use sled_storage::manager::StorageHandle;
 use slog::{error, info, Logger};
-use slog_error_chain::SlogInlineError;
+use slog_error_chain::{InlineErrorChain, SlogInlineError};
 use tokio::fs::{File, OpenOptions};
 use tokio::io::AsyncWriteExt;
 
@@ -445,25 +444,10 @@ pub(crate) trait DatasetsManager: Sync {
     ) -> Result<impl Iterator<Item = Utf8PathBuf> + '_, StorageError>;
 }
 
-/// Iterator `.filter().map()` common to `DatasetsManager` implementations.
-pub(crate) fn filter_dataset_mountpoints(
-    config: DatasetsConfig,
-    root: &Utf8Path,
-) -> impl Iterator<Item = Utf8PathBuf> + '_ {
-    config
-        .datasets
-        .into_values()
-        .filter(|dataset| *dataset.name.dataset() == DatasetKind::Update)
-        .map(|dataset| dataset.name.mountpoint(root))
-}
-
 impl DatasetsManager for StorageHandle {
     async fn artifact_storage_paths(
         &self,
     ) -> Result<impl Iterator<Item = Utf8PathBuf> + '_, StorageError> {
-        // TODO: When datasets are managed by Reconfigurator (#6229),
-        // this should be changed to use `self.datasets_config_list()` and
-        // `filter_dataset_mountpoints`.
         Ok(self
             .get_latest_disks()
             .await
@@ -687,25 +671,33 @@ pub(crate) enum Error {
 
 impl From<Error> for HttpError {
     fn from(err: Error) -> HttpError {
         match err {
+            // 4xx errors
+            Error::HashMismatch { .. } => {
+                HttpError::for_bad_request(None, err.to_string())
+            }
+            Error::NotFound { .. } => {
+                HttpError::for_not_found(None, err.to_string())
+            }
             Error::AlreadyInProgress { .. } => HttpError::for_client_error(
diff --git a/sled-agent/src/artifact_store.rs b/sled-agent/src/artifact_store.rs
index 5b78c0099d..bf2bbf0ccd 100644
--- a/sled-agent/src/artifact_store.rs
+++ b/sled-agent/src/artifact_store.rs
@@ -20,7 +20,7 @@ use std::net::SocketAddrV6;
 use std::str::FromStr;
 use std::time::Duration;
 
-use camino::{Utf8Path, Utf8PathBuf};
+use camino::Utf8PathBuf;
 use camino_tempfile::{NamedUtf8TempFile, Utf8TempPath};
 use dropshot::{
     Body, ConfigDropshot, FreeformBody, HttpError, HttpResponseOk, Path,
@@ -29,7 +29,6 @@ use dropshot::{
 use futures::{Stream, TryStreamExt};
 use http::StatusCode;
 use omicron_common::address::REPO_DEPOT_PORT;
-use omicron_common::disk::{DatasetKind, DatasetsConfig};
 use omicron_common::update::ArtifactHash;
 use repo_depot_api::*;
 use sha2::{Digest, Sha256};
@@ -38,7 +37,7 @@ use sled_storage::dataset::M2_ARTIFACT_DATASET;
 use sled_storage::error::Error as StorageError;
 use sled_storage::manager::StorageHandle;
 use slog::{error, info, Logger};
-use slog_error_chain::SlogInlineError;
+use slog_error_chain::{InlineErrorChain, SlogInlineError};
 use tokio::fs::{File, OpenOptions};
 use tokio::io::AsyncWriteExt;
 
@@ -445,25 +444,10 @@ pub(crate) trait DatasetsManager: Sync {
     ) -> Result<impl Iterator<Item = Utf8PathBuf> + '_, StorageError>;
 }
 
-/// Iterator `.filter().map()` common to `DatasetsManager` implementations.
-pub(crate) fn filter_dataset_mountpoints(
-    config: DatasetsConfig,
-    root: &Utf8Path,
-) -> impl Iterator<Item = Utf8PathBuf> + '_ {
-    config
-        .datasets
-        .into_values()
-        .filter(|dataset| *dataset.name.dataset() == DatasetKind::Update)
-        .map(|dataset| dataset.name.mountpoint(root))
-}
-
 impl DatasetsManager for StorageHandle {
     async fn artifact_storage_paths(
         &self,
     ) -> Result<impl Iterator<Item = Utf8PathBuf> + '_, StorageError> {
-        // TODO: When datasets are managed by Reconfigurator (#6229),
-        // this should be changed to use `self.datasets_config_list()` and
-        // `filter_dataset_mountpoints`.
         Ok(self
             .get_latest_disks()
             .await
@@ -687,25 +671,33 @@ pub(crate) enum Error {
 impl From<Error> for HttpError {
     fn from(err: Error) -> HttpError {
         match err {
+            // 4xx errors
+            Error::HashMismatch { .. } => {
+                HttpError::for_bad_request(None, err.to_string())
+            }
+            Error::NotFound { .. } => {
+                HttpError::for_not_found(None, err.to_string())
+            }
             Error::AlreadyInProgress { .. } => HttpError::for_client_error(
                 None,
                 StatusCode::CONFLICT,
                 err.to_string(),
             ),
+
+            // 5xx errors: ensure the error chain is logged
             Error::Body(inner) => inner,
             Error::DatasetConfig(_) | Error::NoUpdateDataset => {
-                HttpError::for_unavail(None, err.to_string())
+                HttpError::for_unavail(
+                    None,
+                    InlineErrorChain::new(&err).to_string(),
+                )
             }
             Error::DepotCopy { .. }
             | Error::File { .. }
             | Error::FileRename { .. }
-            | Error::Join(_) => HttpError::for_internal_error(err.to_string()),
-            Error::HashMismatch { .. } => {
-                HttpError::for_bad_request(None, err.to_string())
-            }
-            Error::NotFound { .. } => {
-                HttpError::for_not_found(None, err.to_string())
-            }
+            | Error::Join(_) => HttpError::for_internal_error(
+                InlineErrorChain::new(&err).to_string(),
+            ),
         }
     }
 }
@@ -765,10 +757,16 @@ mod test {
         &self,
     ) -> Result<impl Iterator<Item = Utf8PathBuf> + '_, StorageError> {
-        Ok(super::filter_dataset_mountpoints(
-            self.datasets.clone(),
-            self.mountpoint_root.path(),
-        ))
+        Ok(self
+            .datasets
+            .datasets
+            .values()
+            .filter(|dataset| {
+                *dataset.name.dataset() == DatasetKind::Update
+            })
+            .map(|dataset| {
+                dataset.name.mountpoint(self.mountpoint_root.path())
+            }))
     }
 }
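`InlineErrorChain` walks `Error::source()` and joins each message, so the 5xx responses above now carry the full cause chain rather than only the top-level `Display` text. A minimal illustration with a hypothetical error type:

    use slog_error_chain::InlineErrorChain;

    #[derive(Debug, thiserror::Error)]
    #[error("failed to rename temporary file")]
    struct RenameError(#[source] std::io::Error);

    fn message(err: &RenameError) -> String {
        // `err.to_string()` yields only "failed to rename temporary file";
        // the chained form appends the source, e.g.
        // "failed to rename temporary file: permission denied".
        InlineErrorChain::new(err).to_string()
    }
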
diff --git a/sled-agent/src/nexus.rs b/sled-agent/src/nexus.rs
index d1646823bb..fbacdce113 100644
--- a/sled-agent/src/nexus.rs
+++ b/sled-agent/src/nexus.rs
@@ -164,6 +164,7 @@ type GetSledAgentInfo = Box<dyn Fn(SledRole) -> SledAgentInfo + Send>;
 pub struct NexusNotifierInput {
     pub sled_id: SledUuid,
     pub sled_address: SocketAddrV6,
+    pub repo_depot_port: u16,
     pub nexus_client: NexusClient,
     pub hardware: HardwareManager,
     pub vmm_reservoir_manager: VmmReservoirManagerHandle,
@@ -248,6 +249,7 @@ impl NexusNotifierTask {
         let NexusNotifierInput {
             sled_id,
             sled_address,
+            repo_depot_port,
             nexus_client,
             hardware,
             vmm_reservoir_manager,
@@ -265,6 +267,7 @@ impl NexusNotifierTask {
         };
         SledAgentInfo {
             sa_address: sled_address.to_string(),
+            repo_depot_port,
             role,
             baseboard: hardware.baseboard().convert(),
             usable_hardware_threads: hardware.online_processor_count(),
@@ -643,6 +646,7 @@ mod test {
         let latest_sled_agent_info =
             Arc::new(std::sync::Mutex::new(SledAgentInfo {
                 sa_address: sa_address.clone(),
+                repo_depot_port: 0,
                 role: nexus_client::types::SledRole::Gimlet,
                 baseboard: Baseboard::new_pc("test".into(), "test".into())
                     .convert(),
diff --git a/sled-agent/src/sim/artifact_store.rs b/sled-agent/src/sim/artifact_store.rs
index d73f5a2880..f1f5ae3e69 100644
--- a/sled-agent/src/sim/artifact_store.rs
+++ b/sled-agent/src/sim/artifact_store.rs
@@ -5,25 +5,29 @@
 //! Implementation of `crate::artifact_store::StorageBackend` for our simulated
 //! storage.
 
-use std::sync::Arc;
-
 use camino_tempfile::Utf8TempDir;
-use futures::lock::Mutex;
+use dropshot::{
+    Body, ConfigDropshot, FreeformBody, HttpError, HttpResponseOk, HttpServer,
+    Path, RequestContext, ServerBuilder,
+};
+use repo_depot_api::*;
 use sled_storage::error::Error as StorageError;
+use std::sync::Arc;
 
-use super::storage::Storage;
-use crate::artifact_store::DatasetsManager;
+use crate::artifact_store::{ArtifactStore, DatasetsManager};
 
+#[derive(Clone)]
 pub(super) struct SimArtifactStorage {
-    root: Utf8TempDir,
-    backend: Arc<Mutex<Storage>>,
+    dirs: Arc<(Utf8TempDir, Utf8TempDir)>,
 }
 
 impl SimArtifactStorage {
-    pub(super) fn new(backend: Arc<Mutex<Storage>>) -> SimArtifactStorage {
+    pub(super) fn new() -> SimArtifactStorage {
         SimArtifactStorage {
-            root: camino_tempfile::tempdir().unwrap(),
-            backend,
+            dirs: Arc::new((
+                camino_tempfile::tempdir().unwrap(),
+                camino_tempfile::tempdir().unwrap(),
+            )),
         }
     }
 }
@@ -33,16 +37,48 @@ impl DatasetsManager for SimArtifactStorage {
         &self,
     ) -> Result<impl Iterator<Item = Utf8PathBuf> + '_, StorageError> {
-        let config = self
-            .backend
-            .lock()
-            .await
-            .datasets_config_list()
-            .await
-            .map_err(|_| StorageError::LedgerNotFound)?;
-        Ok(crate::artifact_store::filter_dataset_mountpoints(
-            config,
-            self.root.path(),
-        ))
+        Ok([self.dirs.0.path().to_owned(), self.dirs.1.path().to_owned()]
+            .into_iter())
+    }
+}
+
+impl ArtifactStore<SimArtifactStorage> {
+    pub(super) fn start(
+        &self,
+        log: &slog::Logger,
+        dropshot_config: &ConfigDropshot,
+    ) -> HttpServer<ArtifactStore<SimArtifactStorage>> {
+        let mut config = dropshot_config.clone();
+        config.bind_address.set_port(0);
+        ServerBuilder::new(
+            repo_depot_api_mod::api_description::<RepoDepotImpl>()
+                .expect("registered entrypoints"),
+            self.clone(),
+            log.new(o!("component" => "dropshot (Repo Depot)")),
+        )
+        .config(config)
+        .start()
+        .unwrap()
+    }
+}
+
+/// Implementation of the Repo Depot API backed by an
+/// `ArtifactStore<SimArtifactStorage>`.
+pub(super) enum RepoDepotImpl {}
+
+impl RepoDepotApi for RepoDepotImpl {
+    type Context = ArtifactStore<SimArtifactStorage>;
+
+    async fn artifact_get_by_sha256(
+        rqctx: RequestContext<Self::Context>,
+        path_params: Path<ArtifactPathParams>,
+    ) -> Result<HttpResponseOk<FreeformBody>, HttpError> {
+        let sha256 = path_params.into_inner().sha256;
+        let file = rqctx.context().get(sha256).await?;
+        let file_access = hyper_staticfile::vfs::TokioFileAccess::new(file);
+        let file_stream =
+            hyper_staticfile::util::FileBytesStream::new(file_access);
+        let body = Body::wrap(hyper_staticfile::Body::Full(file_stream));
+        Ok(HttpResponseOk(FreeformBody(body)))
     }
 }
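With `RepoDepotImpl` serving files, any peer that knows a sled's IP and depot port can pull artifacts over plain HTTP. A hypothetical client-side sketch (not part of this change), assuming the `/artifact/sha256/{sha256}` path shape from repo-depot-api:

    use anyhow::Context;

    async fn fetch_artifact(
        client: &reqwest::Client,
        sled_ip: std::net::Ipv6Addr,
        repo_depot_port: u16,
        sha256: &str,
    ) -> anyhow::Result<bytes::Bytes> {
        // Sled addresses are IPv6, hence the bracketed host.
        let url = format!(
            "http://[{sled_ip}]:{repo_depot_port}/artifact/sha256/{sha256}"
        );
        let response = client.get(&url).send().await?.error_for_status()?;
        response.bytes().await.context("reading artifact body")
    }
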
         let sa_address = http_server.local_addr();
+        let repo_depot_port = sled_agent.repo_depot.local_addr().port();
         let config_clone = config.clone();
         let log_clone = log.clone();
         let task = tokio::spawn(async move {
@@ -129,6 +130,7 @@ impl Server {
                         &config.id,
                         &NexusTypes::SledAgentInfo {
                             sa_address: sa_address.to_string(),
+                            repo_depot_port,
                             role: NexusTypes::SledRole::Scrimlet,
                             baseboard: NexusTypes::Baseboard {
                                 serial: format!(
diff --git a/sled-agent/src/sim/sled_agent.rs b/sled-agent/src/sim/sled_agent.rs
index a5c094ec21..0a706c1c58 100644
--- a/sled-agent/src/sim/sled_agent.rs
+++ b/sled-agent/src/sim/sled_agent.rs
@@ -93,7 +93,8 @@ pub struct SledAgent {
     fake_zones: Mutex<OmicronZonesConfig>,
     instance_ensure_state_error: Mutex<Option<Error>>,
     pub bootstore_network_config: Mutex<EarlyNetworkConfig>,
-    artifacts: ArtifactStore<SimArtifactStorage>,
+    pub(super) repo_depot:
+        dropshot::HttpServer<ArtifactStore<SimArtifactStorage>>,
     pub log: Logger,
 }
 
@@ -176,8 +177,8 @@ impl SledAgent {
             config.storage.ip,
             storage_log,
         )));
-        let artifacts =
-            ArtifactStore::new(&log, SimArtifactStorage::new(storage.clone()));
+        let repo_depot = ArtifactStore::new(&log, SimArtifactStorage::new())
+            .start(&log, &config.dropshot);
 
         Arc::new(SledAgent {
             id,
@@ -207,7 +208,7 @@ impl SledAgent {
                 zones: vec![],
             }),
             instance_ensure_state_error: Mutex::new(None),
-            artifacts,
+            repo_depot,
             log,
             bootstore_network_config,
         })
@@ -570,7 +571,7 @@ impl SledAgent {
     }
 
     pub(super) fn artifact_store(&self) -> &ArtifactStore<SimArtifactStorage> {
-        &self.artifacts
+        self.repo_depot.app_private()
     }
 
     pub async fn vmm_count(&self) -> usize {
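Because `repo_depot_port` is defined to share an IP with `sa_address`, a consumer on the Nexus side can derive the depot's socket address with no extra bookkeeping. A sketch (hypothetical helper, not in this diff):

    use std::net::SocketAddrV6;

    fn repo_depot_addr(
        sa_address: SocketAddrV6,
        repo_depot_port: u16,
    ) -> SocketAddrV6 {
        // Same IP as the sled agent's API endpoint, different port.
        SocketAddrV6::new(*sa_address.ip(), repo_depot_port, 0, 0)
    }
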
diff --git a/sled-agent/src/sled_agent.rs b/sled-agent/src/sled_agent.rs
index 569e78a196..170bc59f63 100644
--- a/sled-agent/src/sled_agent.rs
+++ b/sled-agent/src/sled_agent.rs
@@ -570,11 +570,16 @@ impl SledAgent {
         )
         .await?;
 
+        let repo_depot = ArtifactStore::new(&log, storage_manager.clone())
+            .start(sled_address, &config.dropshot)
+            .await?;
+
         // Spawn a background task for managing notifications to nexus
         // about this sled-agent.
         let nexus_notifier_input = NexusNotifierInput {
             sled_id: request.body.id,
             sled_address: get_sled_address(request.body.subnet),
+            repo_depot_port: repo_depot.local_addr().port(),
             nexus_client: nexus_client.clone(),
             hardware: long_running_task_handles.hardware_manager.clone(),
             vmm_reservoir_manager: vmm_reservoir_manager.clone(),
@@ -596,10 +601,6 @@ impl SledAgent {
             log.new(o!("component" => "ProbeManager")),
         );
 
-        let repo_depot = ArtifactStore::new(&log, storage_manager.clone())
-            .start(sled_address, &config.dropshot)
-            .await?;
-
         let sled_agent = SledAgent {
             inner: Arc::new(SledAgentInner {
                 id: request.body.id,
diff --git a/smf/nexus/multi-sled/config-partial.toml b/smf/nexus/multi-sled/config-partial.toml
index b824f4b7c9..9c319e730f 100644
--- a/smf/nexus/multi-sled/config-partial.toml
+++ b/smf/nexus/multi-sled/config-partial.toml
@@ -71,6 +71,7 @@ region_snapshot_replacement_start.period_secs = 30
 region_snapshot_replacement_garbage_collection.period_secs = 30
 region_snapshot_replacement_step.period_secs = 30
 region_snapshot_replacement_finish.period_secs = 30
+tuf_artifact_replication.period_secs = 60
 
 [default_region_allocation_strategy]
 # by default, allocate across 3 distinct sleds
diff --git a/smf/nexus/single-sled/config-partial.toml b/smf/nexus/single-sled/config-partial.toml
index e966b3eabd..d9feaf8ba6 100644
--- a/smf/nexus/single-sled/config-partial.toml
+++ b/smf/nexus/single-sled/config-partial.toml
@@ -71,6 +71,7 @@ region_snapshot_replacement_start.period_secs = 30
 region_snapshot_replacement_garbage_collection.period_secs = 30
 region_snapshot_replacement_step.period_secs = 30
 region_snapshot_replacement_finish.period_secs = 30
+tuf_artifact_replication.period_secs = 60
 
 [default_region_allocation_strategy]
 # by default, allocate without requirement for distinct sleds.
diff --git a/tufaceous/manifests/fake.toml b/tufaceous/manifests/fake.toml
index a71a5e853f..c3f6404f53 100644
--- a/tufaceous/manifests/fake.toml
+++ b/tufaceous/manifests/fake.toml
@@ -23,7 +23,7 @@ version = "1.0.0"
 [artifact.host.source]
 kind = "composite-host"
 phase_1 = { kind = "fake", size = "512KiB" }
-phase_2 = { kind = "fake", size = "3MiB" }
+phase_2 = { kind = "fake", size = "1MiB" }
 
 [[artifact.trampoline]]
 name = "fake-trampoline"
@@ -31,7 +31,7 @@ version = "1.0.0"
 [artifact.trampoline.source]
 kind = "composite-host"
 phase_1 = { kind = "fake", size = "512KiB" }
-phase_2 = { kind = "fake", size = "3MiB" }
+phase_2 = { kind = "fake", size = "1MiB" }
 
 [[artifact.control_plane]]
 name = "fake-control-plane"
diff --git a/update-common/src/artifacts/extracted_artifacts.rs b/update-common/src/artifacts/extracted_artifacts.rs
index 5ac4a3a395..309b188a9d 100644
--- a/update-common/src/artifacts/extracted_artifacts.rs
+++ b/update-common/src/artifacts/extracted_artifacts.rs
@@ -25,8 +25,8 @@ use tokio_util::io::ReaderStream;
 
 /// Handle to the data of an extracted artifact.
 ///
-/// This does not contain the actual data; use `reader_stream()` to get a new
-/// handle to the underlying file to read it on demand.
+/// This does not contain the actual data; use `file()` or `reader_stream()` to
+/// get a new handle to the underlying file to read it on demand.
 ///
 /// Note that although this type implements `Clone` and that cloning is
 /// relatively cheap, it has additional implications on filesystem cleanup.
@@ -69,20 +69,26 @@ impl ExtractedArtifactDataHandle {
         self.hash_id.hash
     }
 
-    /// Async stream to read the contents of this artifact on demand.
+    /// Opens the file for this artifact.
     ///
     /// This can fail due to I/O errors outside our control (e.g., something
     /// removed the contents of our temporary directory).
-    pub async fn reader_stream(
-        &self,
-    ) -> anyhow::Result<ReaderStream<tokio::fs::File>> {
+    pub async fn file(&self) -> anyhow::Result<tokio::fs::File> {
         let path = path_for_artifact(&self.tempdir, &self.hash_id);
-        let file = tokio::fs::File::open(&path)
+        tokio::fs::File::open(&path)
             .await
-            .with_context(|| format!("failed to open {path}"))?;
+            .with_context(|| format!("failed to open {path}"))
+    }
 
-        Ok(ReaderStream::new(file))
+    /// Async stream to read the contents of this artifact on demand.
+    ///
+    /// This can fail due to I/O errors outside our control (e.g., something
+    /// removed the contents of our temporary directory).
+    pub async fn reader_stream(
+        &self,
+    ) -> anyhow::Result<ReaderStream<tokio::fs::File>> {
+        Ok(ReaderStream::new(self.file().await?))
     }
 }
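Splitting `file()` out of `reader_stream()` lets callers work with the underlying `tokio::fs::File` directly (the sled-agent artifact store wants the file itself, e.g. for `hyper_staticfile`), while stream-based callers are unchanged. Usage sketch with a hypothetical helper (import path assumed from the module layout):

    use update_common::artifacts::ExtractedArtifactDataHandle;

    /// Report the on-disk size of an extracted artifact.
    async fn artifact_len(
        handle: &ExtractedArtifactDataHandle,
    ) -> anyhow::Result<u64> {
        let file = handle.file().await?;
        Ok(file.metadata().await?.len())
    }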