Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OTA-1378,OTA-1379: add retry logic for pulling images and less logs for sigs #969

Merged
merged 3 commits into from
Oct 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ use std::io::Read;
use std::iter::Iterator;
use std::path::{Path, PathBuf};
use std::string::String;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tar::Archive;
use tokio::time::sleep;
use url::Url;

use dkregistry::mediatypes::MediaTypes::{ManifestList, ManifestV2S1Signed, ManifestV2S2};
Expand Down Expand Up @@ -315,31 +318,36 @@ pub async fn fetch_releases(
Arc::new(FuturesMutex::new(Vec::with_capacity(estimated_releases)))
};

// releases containing signatures that we want to ignore
let ignored_sig_releases = Arc::new(AtomicUsize::new(0));
// releases that had some issues and were not included in the graph
let ignored_releases = Arc::new(AtomicUsize::new(0));
tags.try_for_each_concurrent(concurrency, |tag| {
let registry_client = registry_client.clone();
let cache = cache.clone();
let releases = releases.clone();
let sig_releases = ignored_sig_releases.clone();
let skip_releases = ignored_releases.clone();

async move {
let (arch, manifestref, mut layers_digests) =
match get_manifest_layers(tag.to_owned(), &repo, &registry_client).await
{
// todo: we're ignoring the images which we do not understand with this change.
// this change is required because we dont want cincinnati to error out on encountering
// cosign signatures.
// unintended consequence of this change can be that cincinnati continues with a log when it encounters
// incorrect image whereas it should error out. Cincinnati won't include this image in the update graph.
// We should ideally try to ignore only the cosign .sig signatures. this change acts a
// stopgap till we teach cincinnati to deal with signatures
match get_manifest_layers(tag.to_owned(), &repo, &registry_client).await {
Ok(result) => result,
Err(e) => {
warn!(
"error fetching manifest and manifestref for {}:{}: {}, ignoring this image",
&repo,
&tag,
e
if tag.contains(".sig") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tag-based discovery is one option for finding Sigstore signatures. In the future, we might move to listing referrers (containers/image#2030). But if we do, the failure modes for this line:

  • Misclassifying a non-sig as a Sigstore signature because it happens to use this tag structure, or
  • Misclassifying a Sigstore signature as a non-sig, non-release ignored image,

both seem low, so 🤷, I'm ok with this heuristic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for misclassifying non-sig to sig will always be a risk if we do string comparison. but imo should be rare.
if signature gets classifies as non-sig, the logs should bring that into notice. Not a lot worried about the mismatch.
We can also change this logic when we get listing referrers in dkregistry and pull it downstream to cincinnati.

debug!(
"encountered a signature for {}:{}: {}, ignoring this image",
PratikMahajan marked this conversation as resolved.
Show resolved Hide resolved
&repo, &tag, e
);
sig_releases.fetch_add(1, Ordering::SeqCst);
return Ok(());
}
error!(
"fetching manifest and manifestref for {}:{}: {}",
&repo, &tag, e
);
return Ok(()) ;
skip_releases.fetch_add(1, Ordering::SeqCst);
return Ok(());
}
};

Expand Down Expand Up @@ -389,6 +397,22 @@ pub async fn fetch_releases(
})
.await?;

let ignored_sig_count = ignored_sig_releases.load(Ordering::SeqCst);
if ignored_sig_count > 0 {
info!(
"ignored {} releases containing only signatures",
ignored_sig_count
);
}

let ignored_release_count = ignored_releases.load(Ordering::SeqCst);
if ignored_release_count > 0 {
warn!(
"ignored {} releases and were not included in graph",
ignored_release_count
);
}

let releases = Arc::<
FuturesMutex<Vec<cincinnati::plugins::internal::graph_builder::release::Release>>,
>::try_unwrap(releases)
Expand Down Expand Up @@ -505,17 +529,49 @@ async fn get_manifest_and_ref(
registry_client: &dkregistry::v2::Client,
) -> Result<(String, dkregistry::v2::manifest::Manifest, String), Error> {
trace!("[{}] Processing {}", &tag, &repo);
let (manifest, manifestref) = registry_client
.get_manifest_and_ref(&repo, &tag)
.map_err(|e| {
format_err!(
"fetching manifest and manifestref for {}:{}: {}",
&repo,
&tag,
e
)
})
.await?;

let (manifest, manifestref) = {
let mut attempts = 0;
const MAX_ATTEMPTS: u32 = 3;
const RETRY_DELAY: Duration = Duration::from_secs(1);

loop {
attempts += 1;

match registry_client
.get_manifest_and_ref(&repo, &tag)
.map_err(|e| {
format_err!(
"fetching manifest and manifestref for {}:{}: {}",
&repo,
&tag,
e
)
})
.await
{
Ok(manifest_and_ref) => break manifest_and_ref,
Err(e) => {
// signatures are not identified by dkregistry and not useful for cincinnati graph, dont retry and return error
if tag.contains(".sig") {
return Err(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and then this error bubbles up and is converted to a debug message via the .sig branch of fetch_releases's get_manifest_layers handling in 022a8d6.

}

if attempts >= MAX_ATTEMPTS {
return Err(e);
}

warn!(
"getting manifest and manifestref failed (attempt {}/{}): {}, retrying...",
attempts, MAX_ATTEMPTS, e
);

// Wait before retrying
sleep(RETRY_DELAY).await;
}
}
}
};

let manifestref =
manifestref.ok_or_else(|| format_err!("no manifestref found for {}:{}", &repo, &tag))?;
Expand All @@ -537,17 +593,41 @@ async fn find_first_release_metadata(
trace!("[{}] Downloading layer {}", &tag, &layer_digest);
let (repo, tag) = (repo.clone(), tag.clone());

let blob = registry_client
.get_blob(&repo, &layer_digest)
.map_err(|e| {
format_err!(
"fetching blob for repo {} with layer_digest {}: {}",
&repo,
&layer_digest,
e
)
})
.await?;
let blob = {
let mut attempts = 0;
const MAX_ATTEMPTS: u32 = 3;
const RETRY_DELAY: Duration = Duration::from_secs(1);
loop {
attempts += 1;
match registry_client
.get_blob(&repo, &layer_digest)
.map_err(|e| {
format_err!(
"fetching blob for repo {} with layer_digest {}: {}",
&repo,
&layer_digest,
e
)
})
.await
{
Ok(layer_blob) => break layer_blob,
Err(e) => {
if attempts >= MAX_ATTEMPTS {
return Err(e);
}

warn!(
"getting blob failed (attempt {}/{}): {}, retrying...",
attempts, MAX_ATTEMPTS, e
);

// Wait before retrying
sleep(RETRY_DELAY).await;
}
}
}
};

let metadata_filename = "release-manifests/release-metadata";

Expand Down