diff --git a/Cargo.lock b/Cargo.lock index 0805c875c..bd8a1f753 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -527,9 +527,9 @@ dependencies = [ [[package]] name = "aws-throwaway" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d5955140d4b11b9e3cb8c21546885b68472a3f20f6d8af58f4fbea8747f2e5b" +checksum = "64948b59109942d202323bf8e091f167c598d10cca102d63c2b8c4d885d6f08d" dependencies = [ "anyhow", "async-trait", diff --git a/shotover-proxy/benches/windsock/cloud/aws.rs b/shotover-proxy/benches/windsock/cloud/aws.rs index a25bc56c9..f204ccef3 100644 --- a/shotover-proxy/benches/windsock/cloud/aws.rs +++ b/shotover-proxy/benches/windsock/cloud/aws.rs @@ -1,6 +1,8 @@ +use anyhow::Result; use aws_throwaway::{Aws, Ec2Instance, InstanceType}; use aws_throwaway::{CleanupResources, Ec2InstanceDefinition}; use regex::Regex; +use serde::{Deserialize, Serialize}; use std::fmt::Write; use std::time::Duration; use std::{ @@ -34,38 +36,47 @@ impl AwsInstances { .await } - pub async fn create_bencher_instances(&self, count: usize) -> Vec> { + pub async fn create_bencher_instances( + &self, + benches_will_run: bool, + count: usize, + ) -> Vec> { let mut futures = vec![]; for _ in 0..count { - futures.push(self.create_bencher_instance()); + futures.push(self.create_bencher_instance(benches_will_run)); } futures::future::join_all(futures).await } pub async fn create_shotover_instances( &self, + benches_will_run: bool, count: usize, ) -> Vec> { let mut futures = vec![]; for _ in 0..count { - futures.push(self.create_shotover_instance()); + futures.push(self.create_shotover_instance(benches_will_run)); } futures::future::join_all(futures).await } pub async fn create_docker_instances( &self, + benches_will_run: bool, include_shotover: bool, count: usize, ) -> Vec> { let mut futures = vec![]; for _ in 0..count { - futures.push(self.create_docker_instance(include_shotover)); + futures.push(self.create_docker_instance(benches_will_run, include_shotover)); } futures::future::join_all(futures).await } - pub async fn create_bencher_instance(&self) -> Arc { + pub async fn create_bencher_instance( + &self, + benches_will_run: bool, + ) -> Arc { let instance = Arc::new(Ec2InstanceWithBencher { instance: self .aws @@ -84,19 +95,17 @@ sudo apt-get update sudo apt-get install -y sysstat"#, ) .await; - instance - .instance - .ssh() - .push_file( - std::env::current_exe().unwrap().as_ref(), - Path::new("windsock"), - ) - .await; + + if benches_will_run { + instance.upload_bencher().await; + } + instance } pub async fn create_docker_instance( &self, + benches_will_run: bool, include_shotover: bool, ) -> Arc { let instance = self @@ -121,22 +130,20 @@ curl -sSL https://get.docker.com/ | sudo sh"#, ) .await; - let local_shotover_path = bin_path!("shotover-proxy"); - if include_shotover { - instance - .ssh() - .push_file(local_shotover_path, Path::new("shotover-bin")) - .await; - instance - .ssh() - .push_file(Path::new("config/config.yaml"), Path::new("config.yaml")) - .await; + if include_shotover && benches_will_run { + upload_shotover(&instance).await; } - Arc::new(Ec2InstanceWithDocker { instance }) + Arc::new(Ec2InstanceWithDocker { + instance, + include_shotover, + }) } - pub async fn create_shotover_instance(&self) -> Arc { + pub async fn create_shotover_instance( + &self, + benches_will_run: bool, + ) -> Arc { let instance = Arc::new(Ec2InstanceWithShotover { instance: self .aws @@ -156,17 +163,9 @@ sudo apt-get install -y sysstat"#, ) .await; - let local_shotover_path = bin_path!("shotover-proxy"); - instance - .instance - .ssh() - .push_file(local_shotover_path, Path::new("shotover-bin")) - .await; - instance - .instance - .ssh() - .push_file(Path::new("config/config.yaml"), Path::new("config.yaml")) - .await; + if benches_will_run { + upload_shotover(&instance.instance).await; + } instance } @@ -177,11 +176,22 @@ sudo apt-get install -y sysstat"#, } /// Despite the name can also run shotover +#[derive(Serialize, Deserialize)] pub struct Ec2InstanceWithDocker { pub instance: Ec2Instance, + pub include_shotover: bool, } impl Ec2InstanceWithDocker { + pub async fn reinit(&mut self) -> Result<()> { + self.instance.init().await?; + if self.include_shotover { + upload_shotover(&self.instance).await; + } + + Ok(()) + } + pub async fn run_container(&self, image: &str, envs: &[(String, String)]) { // cleanup old resources // TODO: we need a way to ensure there are no shotover resources running. @@ -277,11 +287,28 @@ fn get_compatible_instance_type() -> InstanceType { } } +#[derive(Serialize, Deserialize)] pub struct Ec2InstanceWithBencher { pub instance: Ec2Instance, } impl Ec2InstanceWithBencher { + pub async fn reinit(&mut self) -> Result<()> { + self.instance.init().await?; + self.upload_bencher().await; + Ok(()) + } + + pub async fn upload_bencher(&self) { + self.instance + .ssh() + .push_file( + std::env::current_exe().unwrap().as_ref(), + Path::new("windsock"), + ) + .await; + } + pub async fn run_bencher(&self, args: &str, name: &str) { self.instance .ssh() @@ -294,11 +321,19 @@ impl Ec2InstanceWithBencher { } } +#[derive(Serialize, Deserialize)] pub struct Ec2InstanceWithShotover { pub instance: Ec2Instance, } impl Ec2InstanceWithShotover { + pub async fn reinit(&mut self) -> Result<()> { + self.instance.init().await?; + upload_shotover(&self.instance).await; + + Ok(()) + } + pub async fn run_shotover(self: Arc, topology: &str) -> RunningShotover { self.instance .ssh() @@ -388,3 +423,23 @@ impl RunningShotover { } } } + +pub async fn upload_shotover(instance: &Ec2Instance) { + let local_shotover_path = bin_path!("shotover-proxy"); + + // a leftover shotover-bin process can prevent uploading to shotover-bin + // so we need to kill any such processes before uploading + instance + .ssh() + .shell("killall -w shotover-bin > /dev/null || true") + .await; + + instance + .ssh() + .push_file(local_shotover_path, Path::new("shotover-bin")) + .await; + instance + .ssh() + .push_file(Path::new("config/config.yaml"), Path::new("config.yaml")) + .await; +} diff --git a/shotover-proxy/benches/windsock/cloud/mod.rs b/shotover-proxy/benches/windsock/cloud/mod.rs index 8d6b34899..0a89f0b7f 100644 --- a/shotover-proxy/benches/windsock/cloud/mod.rs +++ b/shotover-proxy/benches/windsock/cloud/mod.rs @@ -6,9 +6,14 @@ pub use aws::{ Ec2InstanceWithBencher, Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover, }; +use anyhow::Result; use async_trait::async_trait; use aws::AwsInstances; -use std::sync::Arc; +use futures::StreamExt; +use futures::{stream::FuturesUnordered, Future}; +use serde::{Deserialize, Serialize}; +use std::pin::Pin; +use std::{path::Path, sync::Arc}; use windsock::cloud::{BenchInfo, Cloud}; pub struct AwsCloud { @@ -37,7 +42,11 @@ impl Cloud for AwsCloud { println!("All AWS throwaway resources have been deleted"); } - async fn create_resources(&mut self, required: Vec) -> CloudResources { + async fn create_resources( + &mut self, + required: Vec, + benches_will_run: bool, + ) -> CloudResources { let required = required.into_iter().fold( CloudResourcesRequired::default(), CloudResourcesRequired::combine, @@ -51,17 +60,19 @@ impl Cloud for AwsCloud { let aws = self.aws.as_ref().unwrap(); let (docker, mut bencher, shotover) = futures::join!( aws.create_docker_instances( + benches_will_run, required.include_shotover_in_docker_instance, required.docker_instance_count ), - aws.create_bencher_instances(1), - aws.create_shotover_instances(required.shotover_instance_count) + aws.create_bencher_instances(benches_will_run, 1), + aws.create_shotover_instances(benches_will_run, required.shotover_instance_count) ); let bencher = bencher.pop(); CloudResources { shotover, docker, bencher, + required, } } @@ -97,9 +108,82 @@ impl Cloud for AwsCloud { // TODO: spin up background tokio task to delete unneeded EC2 instances once we add the functionality to aws_throwaway } + + async fn store_resources_file(&mut self, path: &Path, resources: CloudResources) { + let resources = CloudResourcesDisk { + shotover: resources + .shotover + .into_iter() + .map(|x| { + Arc::into_inner(x) + .expect("A bench is still referencing an Ec2InstanceWithShotover") + }) + .collect(), + docker: resources + .docker + .into_iter() + .map(|x| { + Arc::into_inner(x) + .expect("A bench is still referencing an Ec2InstanceWithDocker") + }) + .collect(), + bencher: resources.bencher.map(|x| { + Arc::into_inner(x).expect("A bench is still referencing an Ec2InstanceWithBencher") + }), + required: resources.required, + }; + std::fs::write(path, serde_json::to_string(&resources).unwrap()).unwrap(); + } + + async fn load_resources_file( + &mut self, + path: &Path, + required_resources: Vec, + ) -> CloudResources { + let required_resources = required_resources.into_iter().fold( + CloudResourcesRequired::default(), + CloudResourcesRequired::combine, + ); + let mut resources: CloudResourcesDisk = + serde_json::from_str(&std::fs::read_to_string(path).unwrap()).unwrap(); + + if resources.required != required_resources { + panic!( + "Stored resources do not meet the requirements of this bench run. Maybe try rerunning --store-cloud-resources-file\nloaded:\n{:?}\nrequired:\n{:?}", + resources.required, + required_resources + ); + } + + { + let mut futures = + FuturesUnordered::> + Send>>>::new(); + for instance in &mut resources.bencher { + futures.push(Box::pin(instance.reinit())); + } + for instance in &mut resources.docker { + futures.push(Box::pin(instance.reinit())); + } + + for instance in &mut resources.shotover { + futures.push(Box::pin(instance.reinit())); + } + + while let Some(result) = futures.next().await { + let _: () = result.unwrap(); + } + } + + CloudResources { + shotover: resources.shotover.into_iter().map(Arc::new).collect(), + docker: resources.docker.into_iter().map(Arc::new).collect(), + bencher: resources.bencher.map(Arc::new), + required: resources.required, + } + } } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub struct CloudResourcesRequired { pub shotover_instance_count: usize, pub docker_instance_count: usize, @@ -125,4 +209,13 @@ pub struct CloudResources { pub shotover: Vec>, pub docker: Vec>, pub bencher: Option>, + pub required: CloudResourcesRequired, +} + +#[derive(Serialize, Deserialize)] +pub struct CloudResourcesDisk { + pub shotover: Vec, + pub docker: Vec, + pub bencher: Option, + pub required: CloudResourcesRequired, } diff --git a/windsock/src/cli.rs b/windsock/src/cli.rs index ddd1f6b29..94a2e0ef3 100644 --- a/windsock/src/cli.rs +++ b/windsock/src/cli.rs @@ -68,6 +68,18 @@ pub struct Args { #[clap(long, verbatim_doc_comment)] pub cleanup_cloud_resources: bool, + /// Skip running of benches. + /// Skip automatic deletion of cloud resources on bench run completion. + /// Instead, just create cloud resources and write details of the resources to disk so they may be restored via `--load-cloud-resources-file` + #[clap(long, verbatim_doc_comment)] + pub store_cloud_resources_file: bool, + + /// Skip automatic creation of cloud resources on bench run completion. + /// Skip automatic deletion of cloud resources on bench run completion. + /// Instead, details of the resources are loaded from disk as saved via a previous run using `--store-cloud-resources-file` + #[clap(long, verbatim_doc_comment)] + pub load_cloud_resources_file: bool, + /// Display results from the last benchmark run by: /// Comparing various benches against a specific base bench. /// diff --git a/windsock/src/cloud.rs b/windsock/src/cloud.rs index 004fbc821..82b4dd379 100644 --- a/windsock/src/cloud.rs +++ b/windsock/src/cloud.rs @@ -1,3 +1,5 @@ +use std::path::Path; + use async_trait::async_trait; /// Implement this to give windsock some control over your cloud. @@ -16,9 +18,32 @@ pub trait Cloud { /// This is called once at start up before running any benches. /// The implementation must return an object containing all the requested cloud resources. /// The `required_resources` contains the `CloudResourcesRequired` returned by each bench that will be executed in this run. + /// + /// benches_will_run: + /// * true - the benches will be run, ensure they have everything they need to complete succesfully. + /// * false - the benches will not be run, due to `--store-cloud-resources-file`, you can skip uploading anything that will be reuploaded when restoring the resources async fn create_resources( &mut self, required_resources: Vec, + benches_will_run: bool, + ) -> Self::CloudResources; + + /// Construct a file at the provided path that will allow restoring the passed resources + /// + /// It is gauranteed this will be called after all the benches have completed. + async fn store_resources_file(&mut self, path: &Path, resources: Self::CloudResources); + + /// Restore the resources from the data in the passed file. + /// It is the same file path that was passed to [`Cloud::store_resources_file`] + /// + /// The implementation should panic when the loaded messages cannot meet the requirements of the passed `required_sources`. + /// This is done rather than loading the required resources from disk as this case usually represents a user error. + /// Loading from disk is used for more consistent results across benches but the user cannot hope to get consistent results while changing the benches that will be run. + /// They are better off recreating the resources from scratch in this case. + async fn load_resources_file( + &mut self, + path: &Path, + required_resources: Vec, ) -> Self::CloudResources; /// This is called once at start up before running any benches. @@ -32,6 +57,7 @@ pub trait Cloud { /// This is called before running each bench. /// Use it to destroy or create resources as needed. + /// However, this method will not be called when `--save-resources-file` or `--load-resources-file` is set. /// /// It is recommended to create all resources within create_resources for faster completion time, but it may be desirable in some circumstances to create some of them here. /// It is recommended to always destroy resources that will never be used again here. @@ -65,5 +91,12 @@ impl Cloud for NoCloud { type CloudResourcesRequired = (); type CloudResources = (); async fn cleanup_resources(&mut self) {} - async fn create_resources(&mut self, _requests: Vec<()>) {} + async fn create_resources(&mut self, _requests: Vec<()>, _benches_will_run: bool) {} + async fn store_resources_file(&mut self, _path: &Path, _resources: ()) {} + async fn load_resources_file( + &mut self, + _path: &Path, + _required_resources: Vec, + ) { + } } diff --git a/windsock/src/data.rs b/windsock/src/data.rs index 7fc28d0a0..76242e92c 100644 --- a/windsock/src/data.rs +++ b/windsock/src/data.rs @@ -12,3 +12,7 @@ pub fn windsock_path() -> PathBuf { PathBuf::from("windsock_data") } + +pub fn cloud_resources_path() -> PathBuf { + windsock_path().join("cloud_resources") +} diff --git a/windsock/src/lib.rs b/windsock/src/lib.rs index f3c5b3fc3..4381d24c8 100644 --- a/windsock/src/lib.rs +++ b/windsock/src/lib.rs @@ -8,6 +8,7 @@ mod report; mod tables; pub use bench::{Bench, BenchParameters, BenchTask, Profiling}; +use data::cloud_resources_path; pub use report::{ ExternalReport, LatencyPercentile, Metric, OperationsReport, PubSubReport, Report, ReportArchive, @@ -20,7 +21,7 @@ use clap::Parser; use cli::Args; use cloud::{BenchInfo, Cloud}; use filter::Filter; -use std::process::exit; +use std::{path::Path, process::exit}; use tokio::runtime::Runtime; pub struct Windsock { @@ -156,35 +157,52 @@ impl Windsock Result<()> { ReportArchive::clear_last_run(); + let resources_path = cloud_resources_path(); - match self.benches.iter_mut().find(|x| x.tags.get_name() == name) { + let bench = match self.benches.iter_mut().find(|x| x.tags.get_name() == name) { Some(bench) => { if args .profilers .iter() .all(|x| bench.supported_profilers.contains(x)) { - let resources = if args.cloud { - Some( - self.cloud - .create_resources(vec![bench.required_cloud_resources()]) - .await, - ) - } else { - None - }; bench - .orchestrate(&args, running_in_release, resources) - .await; } else { return Err(anyhow!("Specified bench {name:?} was requested to run with the profilers {:?} but it only supports the profilers {:?}", args.profilers, bench.supported_profilers)); } } None => return Err(anyhow!("Specified bench {name:?} does not exist.")), + }; + + let resources = if args.cloud { + let resources = vec![bench.required_cloud_resources()]; + Some(if args.load_cloud_resources_file { + self.cloud + .load_resources_file(&resources_path, resources) + .await + } else { + self.cloud + .create_resources(resources, !args.store_cloud_resources_file) + .await + }) + } else { + None + }; + + if args.store_cloud_resources_file { + println!( + "Cloud resources have been created in preparation for running the bench:\n {name}" + ); + println!("Make sure to use `--cleanup-cloud-resources` when you are finished with these resources."); + } else { + bench + .orchestrate(&args, running_in_release, resources.clone()) + .await; } if args.cloud { - self.cloud.cleanup_resources().await; + self.cleanup_cloud_resources(resources, &args, &resources_path) + .await; } Ok(()) } @@ -196,6 +214,7 @@ impl Windsock Result<()> { ReportArchive::clear_last_run(); let filter = parse_filter(&args)?; + let resources_path = cloud_resources_path(); let mut bench_infos = vec![]; for bench in &mut self.benches { @@ -219,35 +238,73 @@ impl Windsock, + args: &Args, + resources_path: &Path, + ) { + if args.store_cloud_resources_file { + if let Some(resources) = resources { + self.cloud + .store_resources_file(resources_path, resources) + .await; + } + } else if args.load_cloud_resources_file { + println!("Cloud resources have not been cleaned up."); + println!( + "Make sure to use `--cleanup-cloud-resources` when you are finished with them." + ); + } else { + std::fs::remove_file(resources_path).ok(); + self.cloud.cleanup_resources().await; + } + } + async fn run_filtered_benches_local( &mut self, args: Args, diff --git a/windsock/src/tables.rs b/windsock/src/tables.rs index d22942b2f..c2cca1444 100644 --- a/windsock/src/tables.rs +++ b/windsock/src/tables.rs @@ -196,7 +196,7 @@ fn base(reports: &[ReportColumn], table_type: &str) { .iter() .any(|x| x.current.operations_report.is_some()) { - rows.push(Row::Heading("(Opns) Operations".to_owned())); + rows.push(Row::Heading("Opns (Operations)".to_owned())); rows.push(Row::measurements(reports, "Total Opns", |report| { report.operations_report.as_ref().map(|report| { (