wip
cgwalters committed Aug 18, 2024
1 parent 7217fbb commit 4a96721
Showing 3 changed files with 209 additions and 22 deletions.
2 changes: 2 additions & 0 deletions rust/composefs-oci/Cargo.toml
@@ -22,6 +22,8 @@ tar = "0.4.38"
tokio = { features = ["io-std", "time", "process", "rt", "net"], version = ">= 1.13.0" }
tokio-util = { features = ["io-util"], version = "0.7" }
tokio-stream = { features = ["sync"], version = "0.1.8" }
tracing = "0.1"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
hex = "0.4.3"
serde_json = "1.0.117"

14 changes: 14 additions & 0 deletions rust/composefs-oci/src/bin/main.rs
@@ -3,6 +3,20 @@ use anyhow::Result;
/// The code called after we've done process global init and created
/// an async runtime.
async fn async_main() -> Result<()> {
// Don't include timestamps and the like because they're not very useful
// here and add noise; log targets such as journald already provide
// timestamps of their own.
let format = tracing_subscriber::fmt::format()
.without_time()
.with_target(false)
.compact();
// Log to stderr by default
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.event_format(format)
.with_writer(std::io::stderr)
.init();
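// Note: EnvFilter::from_default_env() reads the standard RUST_LOG
// environment variable, so e.g. RUST_LOG=trace will surface the
// trace event emitted below.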
tracing::trace!("starting");
// As you can see, this file is mostly just a shim that calls into the
// code living in the internal shared library.
composefs_oci::run_from_iter(std::env::args()).await
215 changes: 193 additions & 22 deletions rust/composefs-oci/src/repo.rs
@@ -3,15 +3,17 @@ use std::io::{self, Seek, Write};
use std::ops::Add;
use std::os::fd::AsFd;
use std::path::Path;
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::sync::{Arc, Mutex, OnceLock};

use anyhow::{Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use cap_std::fs::Dir;
use cap_std_ext::cap_tempfile::{TempDir, TempFile};
use cap_std_ext::cmdext::CapStdExtCommandExt;
use cap_std_ext::dirext::CapStdExtDirExt;
use cap_std_ext::{cap_std, cap_tempfile};
use composefs::dumpfile::{Entry, Item, Mtime};
use composefs::fsverity::Digest;
use fn_error_context::context;
use ocidir::cap_std::fs::MetadataExt;
@@ -299,36 +301,37 @@ fn linkat_allow_exists(
))
}

// Rename all regular files and symlinks from -> to; other file types are
// ignored. If a target file with the given name already exists in "to",
// the file is left in the "from" directory.
async fn merge_dir_to(from: Dir, to: Dir) -> Result<u64> {
let from_to = Arc::new((from, to));
let mut tasks = tokio::task::JoinSet::new();
let mut merged = 0u64;
for ent in from_to.0.entries()? {
let ent = ent?;
let ftype = ent.file_type()?;
if !(ftype.is_file() || ftype.is_symlink()) {
continue;
}
merged += 1;
let name = ent.file_name();
let from_to = Arc::clone(&from_to);
tasks.spawn_blocking(move || -> Result<()> {
let from = &from_to.0;
let to = &from_to.1;
if ftype.is_file() {
let f = from.open(&name)?;
f.sync_all().context("fsync")?;
}
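// Tolerate EEXIST: an object with the same content-addressed name may
// already have been committed by a concurrent transaction.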
ignore_rustix_eexist(rustix::fs::renameat(from, &name, &to, &name))?;
Ok(())
});
}
while let Some(r) = tasks.join_next().await {
r.context("join")?.context("Renaming into global")?;
}
Ok(merged)
}

/// An opaque object representing an active transaction on the repository.
@@ -345,6 +348,7 @@ pub struct RepoTransaction {

impl RepoTransaction {
const TMPROOT: &'static str = "tmp/root";
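// Relative prefix from a link under objects/by-sha256/<prefix>/ back up
// to the top-level objects/ directory.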
const BY_SHA256_UPLINK: &'static str = "../../";

fn new(repo: &Repo) -> Result<Self> {
let parent = Arc::clone(&repo.0);
@@ -449,6 +453,7 @@ impl RepoTransaction {
#[context("Committing objects")]
// Given two "split checksum" directories, rename all files from -> to
async fn commit_objects(from: &Dir, to: &Dir) -> Result<()> {
let mut merged = 0u64;
for d in from.entries()? {
let d = d?;
if !d.file_type()?.is_dir() {
@@ -460,8 +465,9 @@ };
};
let from = from.open_dir(&name).context("tmp objects")?;
let to = to.open_dir(&name).context("global objects")?;
merged += merge_dir_to(from, to).await?;
}
tracing::debug!("Merged: {merged}");
Ok(())
}

@@ -523,17 +529,41 @@ impl RepoTransaction {
let descriptor_sha256 = descriptor.sha256()?;
let tmpf = tmpf.finish_validate(&descriptor)?;
let mut objpath = self.import_object(tmpf)?.into_string();
objpath.insert_str(0, "../");
objpath.insert_str(0, Self::BY_SHA256_UPLINK);
let mut by_sha256_path = String::from(OBJECTS_BY_SHA256);
append_object_path(&mut by_sha256_path, &descriptor_sha256)?;
ignore_rustix_eexist(rustix::fs::symlinkat(
&objpath,
&self.repo.0.dir,
&by_sha256_path,
))?;
tracing::debug!(
"Added descriptor {} to {by_sha256_path}",
descriptor.digest()
);
Ok(())
}

#[context("Reading object path of descriptor {}", descriptor.digest())]
fn fsverity_digest_for_descriptor(&self, descriptor: &Descriptor) -> Result<String> {
let descriptor_sha256 = descriptor.sha256()?;
let mut by_sha256_path = String::from(OBJECTS_BY_SHA256);
append_object_path(&mut by_sha256_path, &descriptor_sha256)?;
let buf = rustix::fs::readlinkat(&self.repo.0.dir, &by_sha256_path, Vec::new())?;
let mut buf = buf.into_string()?;
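// The link target has the form "../../xx/yyyy..." (the uplink back to
// objects/, then the split-checksum fsverity object path); recover the
// digest by dropping the uplink and the interior "/".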
if !(buf.chars().all(|c| c.is_ascii())
&& buf.starts_with(Self::BY_SHA256_UPLINK)
&& buf.bytes().nth(Self::BY_SHA256_UPLINK.len() + 2) == Some(b'/'))
{
anyhow::bail!("Invalid descriptor symlink: {buf}");
}
buf.replace_range(0..Self::BY_SHA256_UPLINK.len(), "");
buf.remove(2);
// Verify that the result parses as a sha256 hex digest
let _ = Sha256Hex::new(&buf);
Ok(buf)
}

#[context("Unpacking regfile")]
fn unpack_regfile<E: std::io::Read>(
&self,
@@ -566,14 +596,20 @@ impl RepoTransaction {

// Commit this transaction, returning statistics
async fn commit(self) -> Result<TransactionStats> {
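// Debugging aid (wip): list all regular files in the transaction's
// repository directory before committing.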
std::process::Command::new("find")
.args(["-type", "f"])
.cwd_dir(self.repo.0.dir.try_clone()?)
.status()?;
// First, handle the objects
Self::commit_objects(&self.repo.0.objects, &self.parent.objects).await?;
// Then all the derived data and links
let from_basedir = &self.repo.0.dir;
let to_basedir = &self.parent.dir;
{
let from_by_sha256 = from_basedir
.open_dir(OBJECTS_BY_SHA256)
.context(OBJECTS_BY_SHA256)?;
let to_by_sha256 = to_basedir
.open_dir(OBJECTS_BY_SHA256)
.context(OBJECTS_BY_SHA256)?;
Self::commit_objects(&from_by_sha256, &to_by_sha256).await?;
@@ -601,6 +637,44 @@ impl RepoTransaction {
}
}

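// Synthesize a composefs dumpfile entry for a root-owned directory.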
fn dir_cfs_entry(path: &Utf8Path) -> Entry<'_> {
let item = Item::Directory { size: 0, nlink: 1 };
Entry {
path: path.into(),
uid: 0,
gid: 0,
mode: libc::S_IFDIR | 0o700,
mtime: Mtime { sec: 0, nsec: 0 },
item,
xattrs: Default::default(),
}
}

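// Build a composefs dumpfile entry for a regular file whose content is
// backed by the object with the given fsverity digest.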
fn cfs_entry_for_descriptor(
d: &Descriptor,
fsverity_digest: &str,
path: &Utf8Path,
) -> Result<Entry<'static>> {
let size = d.size().try_into()?;
let item = Item::Regular {
size,
nlink: 1,
inline_content: None,
fsverity_digest: Some(fsverity_digest.to_string()),
};
let path = std::path::PathBuf::from(path);
let e = Entry {
path: path.into(),
uid: 0,
gid: 0,
mode: libc::S_IFREG | 0o400,
mtime: Mtime { sec: 0, nsec: 0 },
item,
xattrs: Default::default(),
};
Ok(e)
}

#[derive(Debug)]
struct RepoInner {
dir: Dir,
@@ -775,7 +849,7 @@ impl Repo {
);

if self.has_artifact_manifest(&manifest_descriptor)? {
println!("Already stored: {manifest_digest}");
tracing::debug!("Already stored: {manifest_digest}");
return Ok((txn, manifest_descriptor));
}

@@ -807,7 +881,7 @@ impl Repo {
Ok(acc)
})?;

println!("Layers to fetch: {}", layers_to_fetch.len());
tracing::debug!("Layers to fetch: {}", layers_to_fetch.len());
for layer in layers_to_fetch {
let size = layer.size().try_into().context("Invalid size")?;
let (blob_reader, driver) = proxy.get_blob(&img, layer.digest(), size).await?;
@@ -825,15 +899,73 @@ impl Repo {
let _: () = driver?;
let _: () = import_task.unwrap()?;
}
tracing::debug!("Imported all layers");

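// Stream composefs dumpfile entries to a blocking mkcomposefs worker over
// a bounded channel; the worker writes the image and imports it as an
// object in this transaction.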
let (send_entries, recv_entries) = std::sync::mpsc::sync_channel(5);
let txn_clone = Arc::clone(&txn);
let cfs_worker = tokio::task::spawn_blocking(move || -> Result<_> {
let cfs_object = txn_clone.new_object()?;
let cfs_object_file = cfs_object.as_file().try_clone()?.into_std();
composefs::mkcomposefs::mkcomposefs(Default::default(), recv_entries, cfs_object_file)?;
let cfs_path = txn_clone.import_object(cfs_object)?;
tracing::debug!("Committed artifact: {cfs_path}");
Ok(cfs_path)
});
let txn_clone = Arc::clone(&txn);
let manifest_desc_ref = &manifest_descriptor;
let manifest_ref = &manifest;
let send_task = async move {
let manifest_fsverity = txn_clone.fsverity_digest_for_descriptor(manifest_desc_ref)?;
// If we fail to send on the channel, then we should get an error from the mkcomposefs job
if let Err(_) = send_entries.send(dir_cfs_entry("/".into())) {
return Ok(());
}
let path = Utf8Path::new("/manifest.json");
if let Err(_) = send_entries.send(cfs_entry_for_descriptor(
&manifest_desc_ref,
&manifest_fsverity,
path,
)?) {
return Ok(());
}
let config_fsverity = txn_clone.fsverity_digest_for_descriptor(&config_descriptor)?;
let path = Utf8Path::new("/config.json");
if let Err(_) = send_entries.send(cfs_entry_for_descriptor(
&config_descriptor,
&config_fsverity,
path,
)?) {
return Ok(());
}
if let Err(_) = send_entries.send(dir_cfs_entry("/layers".into())) {
return Ok(());
}
for (i, layer) in manifest_ref.layers().iter().enumerate() {
let digest = txn_clone.fsverity_digest_for_descriptor(&layer)?;
let path = &format!("/layers/{i}");
if let Err(_) =
send_entries.send(cfs_entry_for_descriptor(&layer, &digest, path.as_ref())?)
{
return Ok(());
}
}
tracing::debug!("Wrote all cfs entries");
drop(send_entries);
anyhow::Ok(())
};

let (mkcfs_result, send_result) = tokio::join!(cfs_worker, send_task);
let cfs_objpath = mkcfs_result.unwrap()?;
let _: () = send_result?;

// let repo = self.clone();
// tokio::task::spawn_blocking(move || -> Result<_> {
// repo.as_oci().insert_manifest(manifest, Some("default"), platform)
// })
// .await
// .unwrap()
// SAFETY: We joined all the threads
let txn = Arc::into_inner(txn).unwrap();
Ok((txn, manifest_descriptor))
}

@@ -950,8 +1082,14 @@ mod tests {
use std::io::BufWriter;
use std::process::Command;

use ocidir::oci_spec::image::{
ImageConfigurationBuilder, ImageManifest, ImageManifestBuilder, Platform,
};

use super::*;

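// SHA-256 of zero-length input, i.e. the diff ID of an empty layer.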
const EMPTY_DIFFID: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";

fn new_memfd(buf: &[u8]) -> Result<File> {
use rustix::fs::MemfdFlags;
let f = rustix::fs::memfd_create("test memfd", MemfdFlags::CLOEXEC)?;
@@ -970,8 +1108,6 @@ mod tests {
let repo = Repo::init(&td.open_dir("repo")?, false).unwrap();
eprintln!("verity={}", repo.has_verity());

assert!(!repo.has_layer(EMPTY_DIFFID).unwrap());

// A no-op import
@@ -1011,4 +1147,39 @@

Ok(())
}

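// Note (wip): this test pulls docker://quay.io/fedora/fedora:40 and
// therefore requires network access.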
#[tokio::test]
async fn test_import_ocidir() -> Result<()> {
let td = TempDir::new(cap_std::ambient_authority())?;
let td = &*td;

// td.create_dir("oci")?;
// let ocidir = ocidir::OciDir::ensure(&td.open_dir("oci")?)?;

// // A dummy layer
// let mut blobw = ocidir.create_gzip_layer(Default::default())?;
// blobw.write_all(b"pretend this is a tarball")?;
// let blob = blobw.complete()?;
// let mut manifest = ocidir::new_empty_manifest().build().unwrap();
// let mut config = ImageConfigurationBuilder::default().build().unwrap();
// ocidir.push_layer(&mut manifest, &mut config, blob, "empty blob", Default::default());

// ocidir.insert_manifest_and_config(manifest, config, Some("latest"), Platform::default())?;

td.create_dir("repo")?;
let repo = Repo::init(&td.open_dir("repo")?, false).unwrap();
let proxy = containers_image_proxy::ImageProxy::new().await?;

let txn = repo.new_transaction()?;
let (txn, desc) = repo
.pull_artifact(txn, &proxy, "docker://quay.io/fedora/fedora:40")
.await
.unwrap();
let r = txn.commit().await.unwrap();
assert_eq!(r.extant_objects_count, 0);
assert_eq!(r.imported_objects_count, 0);
assert_eq!(r.imported_objects_size, 0);

Ok(())
}
}
