Skip to content

Commit

Permalink
Merge pull request #969 from PratikMahajan/fix-sig-log-retry
Browse files Browse the repository at this point in the history
OTA-1378,OTA-1379: add retry logic for pulling images and less logs for sigs
  • Loading branch information
openshift-merge-bot[bot] authored Oct 24, 2024
2 parents 4de7870 + 31ceb1d commit 894e8e8
Showing 1 changed file with 117 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ use std::io::Read;
use std::iter::Iterator;
use std::path::{Path, PathBuf};
use std::string::String;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tar::Archive;
use tokio::time::sleep;
use url::Url;

use dkregistry::mediatypes::MediaTypes::{ManifestList, ManifestV2S1Signed, ManifestV2S2};
Expand Down Expand Up @@ -315,31 +318,36 @@ pub async fn fetch_releases(
Arc::new(FuturesMutex::new(Vec::with_capacity(estimated_releases)))
};

// releases containing signatures that we want to ignore
let ignored_sig_releases = Arc::new(AtomicUsize::new(0));
// releases that had some issues and were not included in the graph
let ignored_releases = Arc::new(AtomicUsize::new(0));
tags.try_for_each_concurrent(concurrency, |tag| {
let registry_client = registry_client.clone();
let cache = cache.clone();
let releases = releases.clone();
let sig_releases = ignored_sig_releases.clone();
let skip_releases = ignored_releases.clone();

async move {
let (arch, manifestref, mut layers_digests) =
match get_manifest_layers(tag.to_owned(), &repo, &registry_client).await
{
// todo: we're ignoring the images which we do not understand with this change.
// this change is required because we dont want cincinnati to error out on encountering
// cosign signatures.
// unintended consequence of this change can be that cincinnati continues with a log when it encounters
// incorrect image whereas it should error out. Cincinnati won't include this image in the update graph.
// We should ideally try to ignore only the cosign .sig signatures. this change acts a
// stopgap till we teach cincinnati to deal with signatures
match get_manifest_layers(tag.to_owned(), &repo, &registry_client).await {
Ok(result) => result,
Err(e) => {
warn!(
"error fetching manifest and manifestref for {}:{}: {}, ignoring this image",
&repo,
&tag,
e
if tag.contains(".sig") {
debug!(
"encountered a signature for {}:{}: {}, ignoring this image",
&repo, &tag, e
);
sig_releases.fetch_add(1, Ordering::SeqCst);
return Ok(());
}
error!(
"fetching manifest and manifestref for {}:{}: {}",
&repo, &tag, e
);
return Ok(()) ;
skip_releases.fetch_add(1, Ordering::SeqCst);
return Ok(());
}
};

Expand Down Expand Up @@ -389,6 +397,22 @@ pub async fn fetch_releases(
})
.await?;

let ignored_sig_count = ignored_sig_releases.load(Ordering::SeqCst);
if ignored_sig_count > 0 {
info!(
"ignored {} releases containing only signatures",
ignored_sig_count
);
}

let ignored_release_count = ignored_releases.load(Ordering::SeqCst);
if ignored_release_count > 0 {
warn!(
"ignored {} releases and were not included in graph",
ignored_release_count
);
}

let releases = Arc::<
FuturesMutex<Vec<cincinnati::plugins::internal::graph_builder::release::Release>>,
>::try_unwrap(releases)
Expand Down Expand Up @@ -505,17 +529,49 @@ async fn get_manifest_and_ref(
registry_client: &dkregistry::v2::Client,
) -> Result<(String, dkregistry::v2::manifest::Manifest, String), Error> {
trace!("[{}] Processing {}", &tag, &repo);
let (manifest, manifestref) = registry_client
.get_manifest_and_ref(&repo, &tag)
.map_err(|e| {
format_err!(
"fetching manifest and manifestref for {}:{}: {}",
&repo,
&tag,
e
)
})
.await?;

let (manifest, manifestref) = {
let mut attempts = 0;
const MAX_ATTEMPTS: u32 = 3;
const RETRY_DELAY: Duration = Duration::from_secs(1);

loop {
attempts += 1;

match registry_client
.get_manifest_and_ref(&repo, &tag)
.map_err(|e| {
format_err!(
"fetching manifest and manifestref for {}:{}: {}",
&repo,
&tag,
e
)
})
.await
{
Ok(manifest_and_ref) => break manifest_and_ref,
Err(e) => {
// signatures are not identified by dkregistry and not useful for cincinnati graph, dont retry and return error
if tag.contains(".sig") {
return Err(e);
}

if attempts >= MAX_ATTEMPTS {
return Err(e);
}

warn!(
"getting manifest and manifestref failed (attempt {}/{}): {}, retrying...",
attempts, MAX_ATTEMPTS, e
);

// Wait before retrying
sleep(RETRY_DELAY).await;
}
}
}
};

let manifestref =
manifestref.ok_or_else(|| format_err!("no manifestref found for {}:{}", &repo, &tag))?;
Expand All @@ -537,17 +593,41 @@ async fn find_first_release_metadata(
trace!("[{}] Downloading layer {}", &tag, &layer_digest);
let (repo, tag) = (repo.clone(), tag.clone());

let blob = registry_client
.get_blob(&repo, &layer_digest)
.map_err(|e| {
format_err!(
"fetching blob for repo {} with layer_digest {}: {}",
&repo,
&layer_digest,
e
)
})
.await?;
let blob = {
let mut attempts = 0;
const MAX_ATTEMPTS: u32 = 3;
const RETRY_DELAY: Duration = Duration::from_secs(1);
loop {
attempts += 1;
match registry_client
.get_blob(&repo, &layer_digest)
.map_err(|e| {
format_err!(
"fetching blob for repo {} with layer_digest {}: {}",
&repo,
&layer_digest,
e
)
})
.await
{
Ok(layer_blob) => break layer_blob,
Err(e) => {
if attempts >= MAX_ATTEMPTS {
return Err(e);
}

warn!(
"getting blob failed (attempt {}/{}): {}, retrying...",
attempts, MAX_ATTEMPTS, e
);

// Wait before retrying
sleep(RETRY_DELAY).await;
}
}
}
};

let metadata_filename = "release-manifests/release-metadata";

Expand Down

0 comments on commit 894e8e8

Please sign in to comment.