Skip to content

Commit

Permalink
container: Cache new manifest/config in prepare, add API to query
Browse files Browse the repository at this point in the history
Closes: ostreedev#496

In coreos/rpm-ostree#4486 we
were working on fixing `rpm-ostree upgrade --check` with containers.

However, what we really want here is to *persist* the updated
manifest (and config) that we fetch.  And if we do that, we might
as well just make it part of the current `prepare()` API so it
happens automatically.

In this change, we do so via detached commit metadata.  An important
thing here is that the data is then automatically lifecycle
bound to the merge commit - and the merge commit always
changes when we fetch a new manifest.

In order to do an offline query (e.g. in rpm-ostree we want to
re-synthesize a higher level summary of the queued update)
add an API which allows querying a previously saved cached update.

Hence a flow like this should work:

- OS boots
- OS updater does a background "check for updates" via calling `prepare()`
- OS updater finds an update, and renders metadata to the user
  or orchestration system
- <time passes; OS update is not downloaded - e.g. user is on
   metered data or whatever>
- system reboots for other reasons
- OS updater can re-render the fact that a queued update was
  found *without* touching the network
- User can initiate a full fetch (e.g. including image layers)
  targeting *exactly* the previously prepared fetch.  This
  makes things much more race-free; if the image was GC'd
  in the meantime we correctly fail.
  • Loading branch information
cgwalters committed Sep 13, 2023
1 parent 454dd88 commit 615a632
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 31 deletions.
20 changes: 19 additions & 1 deletion lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
use anyhow::{Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use cap_std::fs::Dir;
use cap_std_ext::cap_std;
use cap_std_ext::prelude::CapStdExtDirExt;
use clap::{Parser, Subcommand};
use fn_error_context::context;
use io_lifetimes::AsFd;
Expand Down Expand Up @@ -220,6 +222,12 @@ pub(crate) enum ContainerImageOpts {
/// Don't display progress
#[clap(long)]
quiet: bool,

/// Just check for an updated manifest, but do not download associated container layers.
/// If an updated manifest is found, a file at the provided path will be created and contain
/// the new manifest.
#[clap(long)]
check: Option<Utf8PathBuf>,
},

/// Output metadata about an already stored container image.
Expand Down Expand Up @@ -668,6 +676,7 @@ async fn container_store(
imgref: &OstreeImageReference,
proxyopts: ContainerProxyOpts,
quiet: bool,
check: Option<Utf8PathBuf>,
) -> Result<()> {
let mut imp = ImageImporter::new(repo, imgref, proxyopts.into()).await?;
let prep = match imp.prepare().await? {
Expand All @@ -680,6 +689,14 @@ async fn container_store(
if let Some(warning) = prep.deprecated_warning() {
print_deprecated_warning(warning).await;
}
if let Some(check) = check.as_deref() {
let rootfs = Dir::open_ambient_dir("/", cap_std::ambient_authority())?;
rootfs.atomic_replace_with(check.as_str().trim_start_matches('/'), |w| {
serde_json::to_writer(w, &prep.manifest).context("Serializing manifest")
})?;
// In check mode, we're done
return Ok(());
}
if let Some(previous_state) = prep.previous_state.as_ref() {
let diff = ManifestDiff::new(&previous_state.manifest, &prep.manifest);
diff.print();
Expand Down Expand Up @@ -899,9 +916,10 @@ async fn run_from_opt(opt: Opt) -> Result<()> {
imgref,
proxyopts,
quiet,
check,
} => {
let repo = parse_repo(&repo)?;
container_store(&repo, &imgref, proxyopts, quiet).await
container_store(&repo, &imgref, proxyopts, quiet, check).await
}
ContainerImageOpts::History { repo, imgref } => {
let repo = parse_repo(&repo)?;
Expand Down
184 changes: 154 additions & 30 deletions lib/src/container/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,11 @@ fn timestamp_of_manifest_or_config(
}

impl ImageImporter {
/// The metadata key used in ostree commit metadata to serialize
const CACHED_KEY_MANIFEST_DIGEST: &str = "ostree-ext.cached.manifest-digest";
const CACHED_KEY_MANIFEST: &str = "ostree-ext.cached.manifest";
const CACHED_KEY_CONFIG: &str = "ostree-ext.cached.config";

/// Create a new importer.
#[context("Creating importer")]
pub async fn new(
Expand Down Expand Up @@ -498,11 +503,71 @@ impl ImageImporter {
}

/// Determine if there is a new manifest, and if so return its digest.
/// This will also serialize the new manifest and configuration into
/// metadata associated with the image, so that invocations of `[query_cached]`
/// can re-fetch it without accessing the network.
#[context("Preparing import")]
pub async fn prepare(&mut self) -> Result<PrepareResult> {
self.prepare_internal(false).await
}

/// Determine if there is a new manifest using only cached state, and if so return metadata.
#[context("Preparing import")]
pub fn query_cached(&mut self) -> Result<Option<Box<PreparedImport>>> {
let previous_state =
if let Some(previous_state) = try_query_image_ref(&self.repo, &self.imgref.imgref)? {
previous_state
} else {
// If there's no previous container image, there can't be a cached state.
return Ok(None);
};
let previous_imageid = Some(previous_state.manifest.config().digest().clone());

let commitmeta = if let Some(meta) = self
.repo
.read_commit_detached_metadata(&previous_state.merge_commit, gio::Cancellable::NONE)?
{
glib::VariantDict::new(Some(&meta))
} else {
// Older ostree-ext releases won't have written cached metadata.
return Ok(None);
};

// Try to retrieve the manifest digest key from the commit detached metadata.
let manifest_digest =
if let Some(d) = commitmeta.lookup::<String>(Self::CACHED_KEY_MANIFEST_DIGEST)? {
d
} else {
// It's possible that something *else* wrote detached metadata; gracefully handle that.
return Ok(None);
};

// If we found the cached manifest digest key, then we must have the manifest and config;
// otherwise that's an error.
let manifest = commitmeta.lookup_value(Self::CACHED_KEY_MANIFEST, None);
let manifest: oci_image::ImageManifest = manifest
.as_ref()
.and_then(|v| v.str())
.map(serde_json::from_str)
.transpose()?
.ok_or_else(|| anyhow!("Expected cached manifest {}", Self::CACHED_KEY_MANIFEST))?;
let config = commitmeta.lookup_value(Self::CACHED_KEY_CONFIG, None);
let config: oci_image::ImageConfiguration = config
.as_ref()
.and_then(|v| v.str())
.map(serde_json::from_str)
.transpose()?
.ok_or_else(|| anyhow!("Expected cached manifest {}", Self::CACHED_KEY_CONFIG))?;
self.create_prepared_import(
manifest_digest,
manifest,
config,
Some(previous_state),
previous_imageid,
)
.map(Some)
}

/// Create a channel receiver that will get notifications for layer fetches.
pub fn request_progress(&mut self) -> Receiver<ImportProgress> {
assert!(self.layer_progress.is_none());
Expand All @@ -521,6 +586,81 @@ impl ImageImporter {
r
}

/// Serialize the metadata about a pending fetch as detached metadata on the commit object,
/// so it can be retrieved later offline
#[context("Writing cached pending manifest")]
pub(crate) async fn cache_pending(
&self,
commit: &str,
manifest_digest: &str,
manifest: &ImageManifest,
config: &ImageConfiguration,
) -> Result<()> {
let commitmeta = glib::VariantDict::new(None);
commitmeta.insert(Self::CACHED_KEY_MANIFEST_DIGEST, manifest_digest);
let cached_manifest = serde_json::to_string(manifest).context("Serializing manifest")?;
commitmeta.insert(Self::CACHED_KEY_MANIFEST, cached_manifest);
let cached_config = serde_json::to_string(config).context("Serializing config")?;
commitmeta.insert(Self::CACHED_KEY_CONFIG, cached_config);
let commitmeta = commitmeta.to_variant();
// Clone these to move into blocking method
let commit = commit.to_string();
let repo = self.repo.clone();
crate::tokio_util::spawn_blocking_cancellable_flatten(move |cancellable| {
repo.write_commit_detached_metadata(&commit, Some(&commitmeta), Some(cancellable))
.map_err(anyhow::Error::msg)
})
.await
}

/// Given existing metadata (manifest, config, previous image statE) generate a PreparedImport structure
/// which e.g. includes a diff of the layers.
fn create_prepared_import(
&mut self,
manifest_digest: String,
manifest: ImageManifest,
config: ImageConfiguration,
previous_state: Option<Box<LayeredImageState>>,
previous_imageid: Option<String>,
) -> Result<Box<PreparedImport>> {
let config_labels = super::labels_of(&config);
if self.require_bootable {
let bootable_key = *ostree::METADATA_KEY_BOOTABLE;
let bootable = config_labels.map_or(false, |l| l.contains_key(bootable_key));
if !bootable {
anyhow::bail!("Target image does not have {bootable_key} label");
}
}

let (commit_layer, component_layers, remaining_layers) =
parse_manifest_layout(&manifest, &config)?;

let query = |l: &Descriptor| query_layer(&self.repo, l.clone());
let commit_layer = query(commit_layer)?;
let component_layers = component_layers
.into_iter()
.map(query)
.collect::<Result<Vec<_>>>()?;
let remaining_layers = remaining_layers
.into_iter()
.map(query)
.collect::<Result<Vec<_>>>()?;

let previous_manifest_digest = previous_state.as_ref().map(|s| s.manifest_digest.clone());
let imp = PreparedImport {
manifest_digest,
manifest,
config,
previous_state,
previous_manifest_digest,
previous_imageid,
ostree_layers: component_layers,
ostree_commit_layer: commit_layer,
layers: remaining_layers,
};
Ok(Box::new(imp))
}

/// Determine if there is a new manifest, and if so return its digest.
#[context("Fetching manifest")]
pub(crate) async fn prepare_internal(&mut self, verify_layers: bool) -> Result<PrepareResult> {
Expand Down Expand Up @@ -559,43 +699,27 @@ impl ImageImporter {
};

let config = self.proxy.fetch_config(&self.proxy_img).await?;
let config_labels = super::labels_of(&config);

if self.require_bootable {
let bootable_key = *ostree::METADATA_KEY_BOOTABLE;
let bootable = config_labels.map_or(false, |l| l.contains_key(bootable_key));
if !bootable {
anyhow::bail!("Target image does not have {bootable_key} label");
}
// If there is a currently fetched image, cache the new pending manifest+config
// as detached commit metadata, so that future fetches can query it offline.
if let Some(previous_state) = previous_state.as_ref() {
self.cache_pending(
previous_state.merge_commit.as_str(),
manifest_digest.as_str(),
&manifest,
&config,
)
.await?;
}

let (commit_layer, component_layers, remaining_layers) =
parse_manifest_layout(&manifest, &config)?;

let query = |l: &Descriptor| query_layer(&self.repo, l.clone());
let commit_layer = query(commit_layer)?;
let component_layers = component_layers
.into_iter()
.map(query)
.collect::<Result<Vec<_>>>()?;
let remaining_layers = remaining_layers
.into_iter()
.map(query)
.collect::<Result<Vec<_>>>()?;

let previous_manifest_digest = previous_state.as_ref().map(|s| s.manifest_digest.clone());
let imp = PreparedImport {
manifest,
let imp = self.create_prepared_import(
manifest_digest,
manifest,
config,
previous_state,
previous_manifest_digest,
previous_imageid,
ostree_layers: component_layers,
ostree_commit_layer: commit_layer,
layers: remaining_layers,
};
Ok(PrepareResult::Ready(Box::new(imp)))
)?;
Ok(PrepareResult::Ready(imp))
}

/// Extract the base ostree commit.
Expand Down
14 changes: 14 additions & 0 deletions lib/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ async fn test_container_chunked() -> Result<()> {

let mut imp =
store::ImageImporter::new(fixture.destrepo(), &imgref, Default::default()).await?;
assert!(imp.query_cached()?.is_none());
let prep = match imp.prepare().await.context("Init prep derived")? {
store::PrepareResult::AlreadyPresent(_) => panic!("should not be already imported"),
store::PrepareResult::Ready(r) => r,
Expand Down Expand Up @@ -748,6 +749,14 @@ async fn test_container_chunked() -> Result<()> {
.unwrap()
.is_none()
);
// Verify there are no updates.
let mut imp =
store::ImageImporter::new(fixture.destrepo(), &imgref, Default::default()).await?;
match imp.prepare().await? {
store::PrepareResult::AlreadyPresent(_) => {}
store::PrepareResult::Ready(_) => panic!("should be already imported"),
};
assert!(imp.query_cached()?.is_none());

const ADDITIONS: &str = indoc::indoc! { "
r usr/bin/bash bash-v0
Expand All @@ -765,6 +774,11 @@ r usr/bin/bash bash-v0
store::PrepareResult::AlreadyPresent(_) => panic!("should not be already imported"),
store::PrepareResult::Ready(r) => r,
};
let cached_prep = imp.query_cached()?.expect("cached update");
assert_eq!(
cached_prep.manifest_digest.as_str(),
prep.manifest_digest.as_str()
);
let to_fetch = prep.layers_to_fetch().collect::<Result<Vec<_>>>()?;
assert_eq!(to_fetch.len(), 2);
assert_eq!(expected_digest, prep.manifest_digest.as_str());
Expand Down

0 comments on commit 615a632

Please sign in to comment.