From 4a96721161200fb6679ba17ba88443bf9e772b47 Mon Sep 17 00:00:00 2001
From: Colin Walters <walters@verbum.org>
Date: Sun, 18 Aug 2024 21:36:45 +0000
Subject: [PATCH] wip

---
 rust/composefs-oci/Cargo.toml      |   2 +
 rust/composefs-oci/src/bin/main.rs |  14 ++
 rust/composefs-oci/src/repo.rs     | 215 ++++++++++++++++++++++++++---
 3 files changed, 209 insertions(+), 22 deletions(-)

diff --git a/rust/composefs-oci/Cargo.toml b/rust/composefs-oci/Cargo.toml
index bf1c674b..d8239139 100644
--- a/rust/composefs-oci/Cargo.toml
+++ b/rust/composefs-oci/Cargo.toml
@@ -22,6 +22,8 @@ tar = "0.4.38"
 tokio = { features = ["io-std", "time", "process", "rt", "net"], version = ">= 1.13.0" }
 tokio-util = { features = ["io-util"], version = "0.7" }
 tokio-stream = { features = ["sync"], version = "0.1.8" }
+tracing = "0.1"
+tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
 hex = "0.4.3"
 serde_json = "1.0.117"

diff --git a/rust/composefs-oci/src/bin/main.rs b/rust/composefs-oci/src/bin/main.rs
index b1d3db4d..fc0361d1 100644
--- a/rust/composefs-oci/src/bin/main.rs
+++ b/rust/composefs-oci/src/bin/main.rs
@@ -3,6 +3,20 @@ use anyhow::Result;
 /// The code called after we've done process global init and created
 /// an async runtime.
 async fn async_main() -> Result<()> {
+    // Don't include timestamps: they're noise here, and several log targets
+    // such as journald already record their own.
+    let format = tracing_subscriber::fmt::format()
+        .without_time()
+        .with_target(false)
+        .compact();
+    // Log to stderr by default
+    tracing_subscriber::fmt()
+        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+        .event_format(format)
+        .with_writer(std::io::stderr)
+        .init();
+    tracing::trace!("starting");
     // As you can see, the role of this file is mostly to just be a shim
     // to call into the code that lives in the internal shared library.
     composefs_oci::run_from_iter(std::env::args()).await
diff --git a/rust/composefs-oci/src/repo.rs b/rust/composefs-oci/src/repo.rs
index 40a70ccf..825598a8 100644
--- a/rust/composefs-oci/src/repo.rs
+++ b/rust/composefs-oci/src/repo.rs
@@ -3,15 +3,17 @@ use std::io::{self, Seek, Write};
 use std::ops::Add;
 use std::os::fd::AsFd;
 use std::path::Path;
+use std::sync::atomic::{AtomicU32, AtomicU64};
 use std::sync::{Arc, Mutex, OnceLock};
 
 use anyhow::{Context, Result};
-use camino::Utf8PathBuf;
+use camino::{Utf8Path, Utf8PathBuf};
 use cap_std::fs::Dir;
 use cap_std_ext::cap_tempfile::{TempDir, TempFile};
+use cap_std_ext::cmdext::CapStdExtCommandExt;
 use cap_std_ext::dirext::CapStdExtDirExt;
 use cap_std_ext::{cap_std, cap_tempfile};
-use composefs::dumpfile::Entry;
+use composefs::dumpfile::{Entry, Item, Mtime};
 use composefs::fsverity::Digest;
 use fn_error_context::context;
 use ocidir::cap_std::fs::MetadataExt;
@@ -299,36 +301,37 @@ fn linkat_allow_exists(
     ))
 }
 
-// Rename all regular files from -> to. Non-regular files will be ignored.
+// Rename all regular files and symlinks from -> to; other file types are ignored.
 // If a target file with the given name already exists in "to", the file is left
 // in the "from" directory.
-async fn merge_dir_to(from: Dir, to: Dir) -> Result<()> {
+async fn merge_dir_to(from: Dir, to: Dir) -> Result<u64> {
     let from_to = Arc::new((from, to));
     let mut tasks = tokio::task::JoinSet::new();
+    let mut merged = 0u64;
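+    // Fan out one blocking task per directory entry; the renames are
+    // independent of one another. Regular files are fsync'd before being
+    // renamed into place so a crash can't leave a visible but unsynced
+    // object, and a rename that loses a race (EEXIST) is ignored since
+    // the object store is content-addressed.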
     for ent in from_to.0.entries()? {
         let ent = ent?;
         let ftype = ent.file_type()?;
-        if !ftype.is_file() {
+        if !(ftype.is_file() || ftype.is_symlink()) {
             continue;
         }
+        merged += 1;
         let name = ent.file_name();
         let from_to = Arc::clone(&from_to);
         tasks.spawn_blocking(move || -> Result<()> {
             let from = &from_to.0;
             let to = &from_to.1;
-            let f = from.open(&name)?;
-            f.sync_all().context("fsync")?;
-            match from.rename(&name, &to, &name) {
-                Ok(()) => Ok(()),
-                Err(e) if matches!(e.kind(), std::io::ErrorKind::AlreadyExists) => Ok(()),
-                Err(e) => Err(e.into()),
+            if ftype.is_file() {
+                let f = from.open(&name)?;
+                f.sync_all().context("fsync")?;
             }
+            ignore_rustix_eexist(rustix::fs::renameat(from, &name, &to, &name))?;
+            Ok(())
         });
     }
     while let Some(r) = tasks.join_next().await {
         r.context("join")?.context("Renaming into global")?;
     }
-    Ok(())
+    Ok(merged)
 }
 
 /// An opaque object representing an active transaction on the repository.
@@ -345,6 +348,7 @@ pub struct RepoTransaction {
 
 impl RepoTransaction {
     const TMPROOT: &'static str = "tmp/root";
+    const BY_SHA256_UPLINK: &'static str = "../../";
 
     fn new(repo: &Repo) -> Result<Self> {
         let parent = Arc::clone(&repo.0);
@@ -449,6 +453,7 @@ impl RepoTransaction {
     #[context("Committing objects")]
     // Given two "split checksum" directories, rename all files from -> to
     async fn commit_objects(from: &Dir, to: &Dir) -> Result<()> {
+        let mut merged = 0u64;
         for d in from.entries()? {
             let d = d?;
             if !d.file_type()?.is_dir() {
@@ -460,8 +465,9 @@ impl RepoTransaction {
             };
             let from = from.open_dir(&name).context("tmp objects")?;
             let to = to.open_dir(&name).context("global objects")?;
-            merge_dir_to(from, to).await?;
+            merged += merge_dir_to(from, to).await?;
         }
+        tracing::debug!("Merged: {merged}");
         Ok(())
     }
 
@@ -523,7 +529,7 @@ impl RepoTransaction {
         let descriptor_sha256 = descriptor.sha256()?;
         let tmpf = tmpf.finish_validate(&descriptor)?;
         let mut objpath = self.import_object(tmpf)?.into_string();
-        objpath.insert_str(0, "../");
+        objpath.insert_str(0, Self::BY_SHA256_UPLINK);
         let mut by_sha256_path = String::from(OBJECTS_BY_SHA256);
         append_object_path(&mut by_sha256_path, &descriptor_sha256)?;
         ignore_rustix_eexist(rustix::fs::symlinkat(
@@ -531,9 +537,33 @@ impl RepoTransaction {
             &self.repo.0.dir,
             &by_sha256_path,
         ))?;
+        tracing::debug!(
+            "Added descriptor {} to {by_sha256_path}",
+            descriptor.digest()
+        );
         Ok(())
     }
 
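+    // Inverse of the by-sha256 symlink created when a descriptor is
+    // imported above: the link target has the form `../../ab/cdef...`,
+    // so stripping the uplink prefix and the interior `/` recovers the
+    // object's hex fsverity digest.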
+    #[context("Reading object path of descriptor {}", descriptor.digest())]
+    fn fsverity_digest_for_descriptor(&self, descriptor: &Descriptor) -> Result<String> {
+        let descriptor_sha256 = descriptor.sha256()?;
+        let mut by_sha256_path = String::from(OBJECTS_BY_SHA256);
+        append_object_path(&mut by_sha256_path, &descriptor_sha256)?;
+        let buf = rustix::fs::readlinkat(&self.repo.0.dir, &by_sha256_path, Vec::new())?;
+        let mut buf = buf.into_string()?;
+        if !(buf.chars().all(|c| c.is_ascii())
+            && buf.starts_with(Self::BY_SHA256_UPLINK)
+            && buf.as_bytes().get(Self::BY_SHA256_UPLINK.len() + 2) == Some(&b'/'))
+        {
+            anyhow::bail!("Invalid descriptor symlink: {buf}");
+        }
+        buf.replace_range(0..Self::BY_SHA256_UPLINK.len(), "");
+        buf.remove(2);
+        // Verify that what remains parses as a hex digest
+        let _ = Sha256Hex::new(&buf);
+        Ok(buf)
+    }
+
     #[context("Unpacking regfile")]
     fn unpack_regfile(
         &self,
@@ -566,14 +596,20 @@ impl RepoTransaction {
     // Commit this transaction, returning statistics
     async fn commit(self) -> Result<TransactionStats> {
+        // Temporary debugging: list every file staged in this transaction
+        std::process::Command::new("find")
+            .args(["-type", "f"])
+            .cwd_dir(self.repo.0.dir.try_clone()?)
+            .status()?;
+        // First, handle the objects
+        Self::commit_objects(&self.repo.0.objects, &self.parent.objects).await?;
+        // Then all the derived data and links
         let from_basedir = &self.repo.0.dir;
         let to_basedir = &self.parent.dir;
-        Self::commit_objects(&self.repo.0.objects, &self.parent.objects).await?;
         {
             let from_by_sha256 = from_basedir
                 .open_dir(OBJECTS_BY_SHA256)
                 .context(OBJECTS_BY_SHA256)?;
-            let to_by_sha256 = from_basedir
+            let to_by_sha256 = to_basedir
                 .open_dir(OBJECTS_BY_SHA256)
                 .context(OBJECTS_BY_SHA256)?;
             Self::commit_objects(&from_by_sha256, &to_by_sha256).await?;
@@ -601,6 +637,44 @@ impl RepoTransaction {
     }
 }
 
+fn dir_cfs_entry(path: &Utf8Path) -> Entry<'_> {
+    let item = Item::Directory { size: 0, nlink: 1 };
+    Entry {
+        path: path.into(),
+        uid: 0,
+        gid: 0,
+        mode: libc::S_IFDIR | 0o700,
+        mtime: Mtime { sec: 0, nsec: 0 },
+        item,
+        xattrs: Default::default(),
+    }
+}
+
+fn cfs_entry_for_descriptor(
+    d: &Descriptor,
+    fsverity_digest: &str,
+    path: &Utf8Path,
+) -> Result<Entry<'static>> {
+    let size = d.size().try_into()?;
+    let item = Item::Regular {
+        size,
+        nlink: 1,
+        inline_content: None,
+        fsverity_digest: Some(fsverity_digest.to_string()),
+    };
+    let path = std::path::PathBuf::from(path);
+    let e = Entry {
+        path: path.into(),
+        uid: 0,
+        gid: 0,
+        mode: libc::S_IFREG | 0o400,
+        mtime: Mtime { sec: 0, nsec: 0 },
+        item,
+        xattrs: Default::default(),
+    };
+    Ok(e)
+}
+
 #[derive(Debug)]
 struct RepoInner {
     dir: Dir,
@@ -775,7 +849,7 @@ impl Repo {
         );
 
         if self.has_artifact_manifest(&manifest_descriptor)? {
-            println!("Already stored: {manifest_digest}");
+            tracing::debug!("Already stored: {manifest_digest}");
             return Ok((txn, manifest_descriptor));
         }
 
@@ -807,7 +881,7 @@ impl Repo {
             Ok(acc)
         })?;
 
-        println!("Layers to fetch: {}", layers_to_fetch.len());
+        tracing::debug!("Layers to fetch: {}", layers_to_fetch.len());
         for layer in layers_to_fetch {
             let size = layer.size().try_into().context("Invalid size")?;
             let (blob_reader, driver) = proxy.get_blob(&img, layer.digest(), size).await?;
@@ -825,8 +899,64 @@ impl Repo {
             let _: () = driver?;
             let _: () = import_task.unwrap()?;
         }
-        // SAFETY: We joined all the threads
-        let txn = Arc::into_inner(txn).unwrap();
+        tracing::debug!("Imported all layers");
+
+        let (send_entries, recv_entries) = std::sync::mpsc::sync_channel(5);
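+        // Stream composefs dumpfile entries over a small bounded channel to
+        // a blocking mkcomposefs worker, which writes the artifact image
+        // into a new temporary object while we emit one entry per
+        // descriptor below.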
{ + return Ok(()); + } + let config_fsverity = txn_clone.fsverity_digest_for_descriptor(&config_descriptor)?; + let path = Utf8Path::new("/config.json"); + if let Err(_) = send_entries.send(cfs_entry_for_descriptor( + &config_descriptor, + &config_fsverity, + path, + )?) { + return Ok(()); + } + if let Err(_) = send_entries.send(dir_cfs_entry("/layers".into())) { + return Ok(()); + } + for (i, layer) in manifest_ref.layers().iter().enumerate() { + let digest = txn_clone.fsverity_digest_for_descriptor(&layer)?; + let path = &format!("/layers/{i}"); + if let Err(_) = + send_entries.send(cfs_entry_for_descriptor(&layer, &digest, path.as_ref())?) + { + return Ok(()); + } + } + tracing::debug!("Wrote all cfs entries"); + drop(send_entries); + anyhow::Ok(()) + }; + + let (mkcfs_result, send_result) = tokio::join!(cfs_worker, send_task); + let cfs_objpath = mkcfs_result.unwrap()?; + let _: () = send_result?; // let repo = self.clone(); // tokio::task::spawn_blocking(move || -> Result<_> { @@ -834,6 +964,8 @@ impl Repo { // }) // .await // .unwrap() + // SAFETY: We joined all the threads + let txn = Arc::into_inner(txn).unwrap(); Ok((txn, manifest_descriptor)) } @@ -950,8 +1082,14 @@ mod tests { use std::io::BufWriter; use std::process::Command; + use ocidir::oci_spec::image::{ + ImageConfigurationBuilder, ImageManifest, ImageManifestBuilder, Platform, + }; + use super::*; + const EMPTY_DIFFID: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; + fn new_memfd(buf: &[u8]) -> Result { use rustix::fs::MemfdFlags; let f = rustix::fs::memfd_create("test memfd", MemfdFlags::CLOEXEC)?; @@ -970,8 +1108,6 @@ mod tests { let repo = Repo::init(&td.open_dir("repo")?, false).unwrap(); eprintln!("verity={}", repo.has_verity()); - const EMPTY_DIFFID: &str = - "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; assert!(!repo.has_layer(EMPTY_DIFFID).unwrap()); // A no-op import @@ -1011,4 +1147,39 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_import_ocidir() -> Result<()> { + let td = TempDir::new(cap_std::ambient_authority())?; + let td = &*td; + + // td.create_dir("oci")?; + // let ocidir = ocidir::OciDir::ensure(&td.open_dir("oci")?)?; + + // // A dummy layer + // let mut blobw = ocidir.create_gzip_layer(Default::default())?; + // blobw.write_all(b"pretend this is a tarball")?; + // let blob = blobw.complete()?; + // let mut manifest = ocidir::new_empty_manifest().build().unwrap(); + // let mut config = ImageConfigurationBuilder::default().build().unwrap(); + // ocidir.push_layer(&mut manifest, &mut config, blob, "empty blob", Default::default()); + + // ocidir.insert_manifest_and_config(manifest, config, Some("latest"), Platform::default())?; + + td.create_dir("repo")?; + let repo = Repo::init(&td.open_dir("repo")?, false).unwrap(); + let proxy = containers_image_proxy::ImageProxy::new().await?; + + let txn = repo.new_transaction()?; + let (txn, desc) = repo + .pull_artifact(txn, &proxy, "docker://quay.io/fedora/fedora:40") + .await + .unwrap(); + let r = txn.commit().await.unwrap(); + assert_eq!(r.extant_objects_count, 0); + assert_eq!(r.imported_objects_count, 0); + assert_eq!(r.imported_objects_size, 0); + + Ok(()) + } }