diff --git a/pixi.lock b/pixi.lock index d35e19d83..72fb9d057 100644 --- a/pixi.lock +++ b/pixi.lock @@ -1,3 +1,4 @@ +version: 2 metadata: content_hash: linux-64: e90c2ee71ad70fc0a1c8302029533a7d1498f2bffcd0eaa8d2934700e775dc1d @@ -10231,4 +10232,3 @@ package: license_family: BSD size: 343428 timestamp: 1693151615801 -version: 1 diff --git a/src/install.rs b/src/install.rs index 36fadc5df..da337550f 100644 --- a/src/install.rs +++ b/src/install.rs @@ -1,8 +1,11 @@ use crate::default_retry_policy; -use crate::progress::{default_progress_style, finished_progress_style, global_multi_progress}; +use crate::progress::{ + default_progress_style, finished_progress_style, global_multi_progress, + ProgressBarMessageFormatter, +}; use futures::future::ready; use futures::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; -use indicatif::ProgressBar; +use itertools::Itertools; use miette::{IntoDiagnostic, WrapErr}; use rattler::install::{ link_package, InstallDriver, InstallOptions, Transaction, TransactionOperation, @@ -10,6 +13,7 @@ use rattler::install::{ use rattler::package_cache::PackageCache; use rattler_conda_types::{PrefixRecord, RepoDataRecord}; use rattler_networking::AuthenticatedClient; +use std::cmp::Ordering; use std::io::ErrorKind; use std::path::{Path, PathBuf}; use std::time::Duration; @@ -49,23 +53,61 @@ pub async fn execute_transaction( .with_prefix("downloading"), ); pb.enable_steady_tick(Duration::from_millis(100)); - Some(pb) + Some(ProgressBarMessageFormatter::new(pb)) } else { None }; // Create a progress bar to track all operations. let total_operations = transaction.operations.len(); - let link_pb = multi_progress.add( - indicatif::ProgressBar::new(total_operations as u64) - .with_style(default_progress_style()) - .with_finish(indicatif::ProgressFinish::WithMessage("Done!".into())) - .with_prefix("linking"), - ); - link_pb.enable_steady_tick(Duration::from_millis(100)); + let link_pb = { + let pb = multi_progress.add( + indicatif::ProgressBar::new(total_operations as u64) + .with_style(default_progress_style()) + .with_finish(indicatif::ProgressFinish::WithMessage("Done!".into())) + .with_prefix("linking"), + ); + pb.enable_steady_tick(Duration::from_millis(100)); + ProgressBarMessageFormatter::new(pb) + }; + + // Sort the operations to try to optimize the installation time. + let sorted_operations = transaction + .operations + .iter() + .enumerate() + .sorted_unstable_by(|&(a_idx, a), &(b_idx, b)| { + // Sort the operations so we first install packages and then remove them. We do it in + // this order because downloading takes time so we want to do that as soon as possible + match (a.record_to_install(), b.record_to_install()) { + (Some(a), Some(b)) => { + // If we have two packages sort them by size, the biggest goes first. + let a_size = a.package_record.size.or(a.package_record.legacy_bz2_size); + let b_size = b.package_record.size.or(b.package_record.legacy_bz2_size); + if let (Some(a_size), Some(b_size)) = (a_size, b_size) { + match a_size.cmp(&b_size) { + Ordering::Less => return Ordering::Greater, + Ordering::Greater => return Ordering::Less, + Ordering::Equal => {} + } + } + } + (Some(_), None) => { + return Ordering::Less; + } + (None, Some(_)) => { + return Ordering::Greater; + } + _ => {} + } + + // Otherwise keep the original order as much as possible. + a_idx.cmp(&b_idx) + }) + .map(|(_, op)| op); // Perform all transactions operations in parallel. - let result = stream::iter(transaction.operations.iter()) + let result = stream::iter(sorted_operations.into_iter()) .map(Ok) .try_for_each_concurrent(50, |op| { let target_prefix = target_prefix.clone(); @@ -93,9 +135,9 @@ pub async fn execute_transaction( // Clear progress bars if let Some(download_pb) = download_pb { - download_pb.finish_and_clear(); + download_pb.into_progress_bar().finish_and_clear(); } - link_pb.finish_and_clear(); + link_pb.into_progress_bar().finish_and_clear(); result } @@ -108,8 +150,8 @@ async fn execute_operation( download_client: AuthenticatedClient, package_cache: &PackageCache, install_driver: &InstallDriver, - download_pb: Option<&ProgressBar>, - link_pb: &ProgressBar, + download_pb: Option<&ProgressBarMessageFormatter>, + link_pb: &ProgressBarMessageFormatter, op: &TransactionOperation, install_options: &InstallOptions, ) -> miette::Result<()> { @@ -119,7 +161,20 @@ async fn execute_operation( // Create a future to remove the existing package let remove_future = if let Some(remove_record) = remove_record { - remove_package_from_environment(target_prefix, remove_record).left_future() + link_pb + .wrap( + format!( + "removing {} {}", + &remove_record + .repodata_record + .package_record + .name + .as_source(), + &remove_record.repodata_record.package_record.version + ), + remove_package_from_environment(target_prefix, remove_record), + ) + .left_future() } else { ready(Ok(())).right_future() }; @@ -127,6 +182,15 @@ async fn execute_operation( // Create a future to download the package let cached_package_dir_fut = if let Some(install_record) = install_record { async { + let task = if let Some(pb) = download_pb { + Some( + pb.start(install_record.package_record.name.as_source().to_string()) + .await, + ) + } else { + None + }; + // Make sure the package is available in the package cache. let result = package_cache .get_or_fetch_from_url_with_retry( @@ -140,7 +204,8 @@ async fn execute_operation( .into_diagnostic(); // Increment the download progress bar. - if let Some(pb) = download_pb { + if let Some(task) = task { + let pb = task.finish().await; pb.inc(1); if pb.length() == Some(pb.position()) { pb.set_style(finished_progress_style()); @@ -159,20 +224,24 @@ async fn execute_operation( // If there is a package to install, do that now. if let Some((record, package_dir)) = install_package { - install_package_to_environment( - target_prefix, - package_dir, - record.clone(), - install_driver, - install_options, - ) - .await?; + link_pb + .wrap( + record.package_record.name.as_source().to_string(), + install_package_to_environment( + target_prefix, + package_dir, + record.clone(), + install_driver, + install_options, + ), + ) + .await?; } // Increment the link progress bar since we finished a step! - link_pb.inc(1); - if link_pb.length() == Some(link_pb.position()) { - link_pb.set_style(finished_progress_style()); + link_pb.progress_bar().inc(1); + if link_pb.progress_bar().length() == Some(link_pb.progress_bar().position()) { + link_pb.progress_bar().set_style(finished_progress_style()); } Ok(()) diff --git a/src/progress.rs b/src/progress.rs index 49f8c7ee5..968234553 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -112,6 +112,7 @@ pub async fn await_in_progress>( #[derive(Debug, Clone)] pub struct ProgressBarMessageFormatter { sender: Sender, + pb: ProgressBar, } enum Operation { @@ -122,6 +123,7 @@ enum Operation { pub struct ScopedTask { name: String, sender: Option>, + pb: ProgressBar, } impl Drop for ScopedTask { @@ -136,7 +138,7 @@ impl Drop for ScopedTask { impl ScopedTask { /// Finishes the execution of the task. - pub async fn finish(mut self) { + pub async fn finish(mut self) -> ProgressBar { // Send the finished operation. If this fails the receiving end was most likely already // closed and we can just ignore the error. if let Some(sender) = self.sender.take() { @@ -144,12 +146,19 @@ impl ScopedTask { .send(Operation::Finished(std::mem::take(&mut self.name))) .await; } + self.pb.clone() + } + + /// Returns the progress bar associated with the task + pub fn progress_bar(&self) -> &ProgressBar { + &self.pb } } impl ProgressBarMessageFormatter { /// Construct a new instance that will update the given progress bar. pub fn new(progress_bar: ProgressBar) -> Self { + let pb = progress_bar.clone(); let (tx, mut rx) = channel::(20); tokio::spawn(async move { let mut pending = VecDeque::with_capacity(20); @@ -173,7 +182,12 @@ impl ProgressBarMessageFormatter { } } }); - Self { sender: tx } + Self { sender: tx, pb } + } + + /// Returns the associated progress bar + pub fn progress_bar(&self) -> &ProgressBar { + &self.pb } /// Adds the start of another task to the progress bar and returns an object that is used to @@ -187,6 +201,21 @@ impl ProgressBarMessageFormatter { ScopedTask { name: op, sender: Some(self.sender.clone()), + pb: self.pb.clone(), } } + + /// Wraps an future into a task which starts when the task starts and ends when the future + /// returns. + pub async fn wrap>(&self, name: impl Into, fut: F) -> T { + let task = self.start(name.into()).await; + let result = fut.await; + task.finish().await; + result + } + + /// Convert this instance into the underlying progress bar. + pub fn into_progress_bar(self) -> ProgressBar { + self.pb + } }