Skip to content

Commit

Permalink
feat: add progress info to conda install (#470)
Browse files Browse the repository at this point in the history
Add nice progress messages to the conda installation phase as well.
  • Loading branch information
baszalmstra authored Nov 17, 2023
1 parent df0e5ff commit 53fb8e7
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 31 deletions.
2 changes: 1 addition & 1 deletion pixi.lock
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
version: 2
metadata:
content_hash:
linux-64: e90c2ee71ad70fc0a1c8302029533a7d1498f2bffcd0eaa8d2934700e775dc1d
Expand Down Expand Up @@ -10231,4 +10232,3 @@ package:
license_family: BSD
size: 343428
timestamp: 1693151615801
version: 1
125 changes: 97 additions & 28 deletions src/install.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use crate::default_retry_policy;
use crate::progress::{default_progress_style, finished_progress_style, global_multi_progress};
use crate::progress::{
default_progress_style, finished_progress_style, global_multi_progress,
ProgressBarMessageFormatter,
};
use futures::future::ready;
use futures::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use indicatif::ProgressBar;
use itertools::Itertools;
use miette::{IntoDiagnostic, WrapErr};
use rattler::install::{
link_package, InstallDriver, InstallOptions, Transaction, TransactionOperation,
};
use rattler::package_cache::PackageCache;
use rattler_conda_types::{PrefixRecord, RepoDataRecord};
use rattler_networking::AuthenticatedClient;
use std::cmp::Ordering;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::time::Duration;
Expand Down Expand Up @@ -49,23 +53,61 @@ pub async fn execute_transaction(
.with_prefix("downloading"),
);
pb.enable_steady_tick(Duration::from_millis(100));
Some(pb)
Some(ProgressBarMessageFormatter::new(pb))
} else {
None
};

// Create a progress bar to track all operations.
let total_operations = transaction.operations.len();
let link_pb = multi_progress.add(
indicatif::ProgressBar::new(total_operations as u64)
.with_style(default_progress_style())
.with_finish(indicatif::ProgressFinish::WithMessage("Done!".into()))
.with_prefix("linking"),
);
link_pb.enable_steady_tick(Duration::from_millis(100));
let link_pb = {
let pb = multi_progress.add(
indicatif::ProgressBar::new(total_operations as u64)
.with_style(default_progress_style())
.with_finish(indicatif::ProgressFinish::WithMessage("Done!".into()))
.with_prefix("linking"),
);
pb.enable_steady_tick(Duration::from_millis(100));
ProgressBarMessageFormatter::new(pb)
};

// Sort the operations to try to optimize the installation time.
let sorted_operations = transaction
.operations
.iter()
.enumerate()
.sorted_unstable_by(|&(a_idx, a), &(b_idx, b)| {
// Sort the operations so we first install packages and then remove them. We do it in
// this order because downloading takes time so we want to do that as soon as possible
match (a.record_to_install(), b.record_to_install()) {
(Some(a), Some(b)) => {
// If we have two packages sort them by size, the biggest goes first.
let a_size = a.package_record.size.or(a.package_record.legacy_bz2_size);
let b_size = b.package_record.size.or(b.package_record.legacy_bz2_size);
if let (Some(a_size), Some(b_size)) = (a_size, b_size) {
match a_size.cmp(&b_size) {
Ordering::Less => return Ordering::Greater,
Ordering::Greater => return Ordering::Less,
Ordering::Equal => {}
}
}
}
(Some(_), None) => {
return Ordering::Less;
}
(None, Some(_)) => {
return Ordering::Greater;
}
_ => {}
}

// Otherwise keep the original order as much as possible.
a_idx.cmp(&b_idx)
})
.map(|(_, op)| op);

// Perform all transactions operations in parallel.
let result = stream::iter(transaction.operations.iter())
let result = stream::iter(sorted_operations.into_iter())
.map(Ok)
.try_for_each_concurrent(50, |op| {
let target_prefix = target_prefix.clone();
Expand Down Expand Up @@ -93,9 +135,9 @@ pub async fn execute_transaction(

// Clear progress bars
if let Some(download_pb) = download_pb {
download_pb.finish_and_clear();
download_pb.into_progress_bar().finish_and_clear();
}
link_pb.finish_and_clear();
link_pb.into_progress_bar().finish_and_clear();

result
}
Expand All @@ -108,8 +150,8 @@ async fn execute_operation(
download_client: AuthenticatedClient,
package_cache: &PackageCache,
install_driver: &InstallDriver,
download_pb: Option<&ProgressBar>,
link_pb: &ProgressBar,
download_pb: Option<&ProgressBarMessageFormatter>,
link_pb: &ProgressBarMessageFormatter,
op: &TransactionOperation<PrefixRecord, RepoDataRecord>,
install_options: &InstallOptions,
) -> miette::Result<()> {
Expand All @@ -119,14 +161,36 @@ async fn execute_operation(

// Create a future to remove the existing package
let remove_future = if let Some(remove_record) = remove_record {
remove_package_from_environment(target_prefix, remove_record).left_future()
link_pb
.wrap(
format!(
"removing {} {}",
&remove_record
.repodata_record
.package_record
.name
.as_source(),
&remove_record.repodata_record.package_record.version
),
remove_package_from_environment(target_prefix, remove_record),
)
.left_future()
} else {
ready(Ok(())).right_future()
};

// Create a future to download the package
let cached_package_dir_fut = if let Some(install_record) = install_record {
async {
let task = if let Some(pb) = download_pb {
Some(
pb.start(install_record.package_record.name.as_source().to_string())
.await,
)
} else {
None
};

// Make sure the package is available in the package cache.
let result = package_cache
.get_or_fetch_from_url_with_retry(
Expand All @@ -140,7 +204,8 @@ async fn execute_operation(
.into_diagnostic();

// Increment the download progress bar.
if let Some(pb) = download_pb {
if let Some(task) = task {
let pb = task.finish().await;
pb.inc(1);
if pb.length() == Some(pb.position()) {
pb.set_style(finished_progress_style());
Expand All @@ -159,20 +224,24 @@ async fn execute_operation(

// If there is a package to install, do that now.
if let Some((record, package_dir)) = install_package {
install_package_to_environment(
target_prefix,
package_dir,
record.clone(),
install_driver,
install_options,
)
.await?;
link_pb
.wrap(
record.package_record.name.as_source().to_string(),
install_package_to_environment(
target_prefix,
package_dir,
record.clone(),
install_driver,
install_options,
),
)
.await?;
}

// Increment the link progress bar since we finished a step!
link_pb.inc(1);
if link_pb.length() == Some(link_pb.position()) {
link_pb.set_style(finished_progress_style());
link_pb.progress_bar().inc(1);
if link_pb.progress_bar().length() == Some(link_pb.progress_bar().position()) {
link_pb.progress_bar().set_style(finished_progress_style());
}

Ok(())
Expand Down
33 changes: 31 additions & 2 deletions src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ pub async fn await_in_progress<T, F: Future<Output = T>>(
#[derive(Debug, Clone)]
pub struct ProgressBarMessageFormatter {
sender: Sender<Operation>,
pb: ProgressBar,
}

enum Operation {
Expand All @@ -122,6 +123,7 @@ enum Operation {
pub struct ScopedTask {
name: String,
sender: Option<Sender<Operation>>,
pb: ProgressBar,
}

impl Drop for ScopedTask {
Expand All @@ -136,20 +138,27 @@ impl Drop for ScopedTask {

impl ScopedTask {
/// Finishes the execution of the task.
pub async fn finish(mut self) {
pub async fn finish(mut self) -> ProgressBar {
// Send the finished operation. If this fails the receiving end was most likely already
// closed and we can just ignore the error.
if let Some(sender) = self.sender.take() {
let _ = sender
.send(Operation::Finished(std::mem::take(&mut self.name)))
.await;
}
self.pb.clone()
}

/// Returns the progress bar associated with the task
pub fn progress_bar(&self) -> &ProgressBar {
&self.pb
}
}

impl ProgressBarMessageFormatter {
/// Construct a new instance that will update the given progress bar.
pub fn new(progress_bar: ProgressBar) -> Self {
let pb = progress_bar.clone();
let (tx, mut rx) = channel::<Operation>(20);
tokio::spawn(async move {
let mut pending = VecDeque::with_capacity(20);
Expand All @@ -173,7 +182,12 @@ impl ProgressBarMessageFormatter {
}
}
});
Self { sender: tx }
Self { sender: tx, pb }
}

/// Returns the associated progress bar
pub fn progress_bar(&self) -> &ProgressBar {
&self.pb
}

/// Adds the start of another task to the progress bar and returns an object that is used to
Expand All @@ -187,6 +201,21 @@ impl ProgressBarMessageFormatter {
ScopedTask {
name: op,
sender: Some(self.sender.clone()),
pb: self.pb.clone(),
}
}

/// Wraps an future into a task which starts when the task starts and ends when the future
/// returns.
pub async fn wrap<T, F: Future<Output = T>>(&self, name: impl Into<String>, fut: F) -> T {
let task = self.start(name.into()).await;
let result = fut.await;
task.finish().await;
result
}

/// Convert this instance into the underlying progress bar.
pub fn into_progress_bar(self) -> ProgressBar {
self.pb
}
}

0 comments on commit 53fb8e7

Please sign in to comment.