diff --git a/Cargo.lock b/Cargo.lock index 0805c875c..7f85b9d35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -528,8 +528,7 @@ dependencies = [ [[package]] name = "aws-throwaway" version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d5955140d4b11b9e3cb8c21546885b68472a3f20f6d8af58f4fbea8747f2e5b" +source = "git+https://github.com/shotover/aws-throwaway#c66baac6f9f14cdaaa39dc61e884b04a9e106ffb" dependencies = [ "anyhow", "async-trait", @@ -2233,7 +2232,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.5", "tokio", "tower-service", "tracing", diff --git a/Cargo.toml b/Cargo.toml index a990d586f..d411cbf08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,7 +61,7 @@ rand_distr = "0.4.1" clap = { version = "4.0.4", features = ["cargo", "derive"] } async-trait = "0.1.30" typetag = "0.2.5" -aws-throwaway = { version = "0.6.0", default-features = false } +aws-throwaway = { version = "0.6.0", default-features = false, git = "https://github.com/shotover/aws-throwaway" } tokio-bin-process = "0.4.0" ordered-float = { version = "4.0.0", features = ["serde"] } hyper = { version = "0.14.14", features = ["server"] } diff --git a/shotover-proxy/benches/windsock/cloud/aws.rs b/shotover-proxy/benches/windsock/cloud/aws.rs index a25bc56c9..c316a63ae 100644 --- a/shotover-proxy/benches/windsock/cloud/aws.rs +++ b/shotover-proxy/benches/windsock/cloud/aws.rs @@ -1,6 +1,8 @@ +use anyhow::Result; use aws_throwaway::{Aws, Ec2Instance, InstanceType}; use aws_throwaway::{CleanupResources, Ec2InstanceDefinition}; use regex::Regex; +use serde::{Deserialize, Serialize}; use std::fmt::Write; use std::time::Duration; use std::{ @@ -84,14 +86,7 @@ sudo apt-get update sudo apt-get install -y sysstat"#, ) .await; - instance - .instance - .ssh() - .push_file( - std::env::current_exe().unwrap().as_ref(), - Path::new("windsock"), - ) - .await; + instance.upload_bencher().await; instance } @@ -121,19 +116,14 @@ curl -sSL https://get.docker.com/ | sudo sh"#, ) .await; - let local_shotover_path = bin_path!("shotover-proxy"); if include_shotover { - instance - .ssh() - .push_file(local_shotover_path, Path::new("shotover-bin")) - .await; - instance - .ssh() - .push_file(Path::new("config/config.yaml"), Path::new("config.yaml")) - .await; + upload_shotover(&instance).await; } - Arc::new(Ec2InstanceWithDocker { instance }) + Arc::new(Ec2InstanceWithDocker { + instance, + include_shotover, + }) } pub async fn create_shotover_instance(&self) -> Arc { @@ -156,17 +146,8 @@ sudo apt-get install -y sysstat"#, ) .await; - let local_shotover_path = bin_path!("shotover-proxy"); - instance - .instance - .ssh() - .push_file(local_shotover_path, Path::new("shotover-bin")) - .await; - instance - .instance - .ssh() - .push_file(Path::new("config/config.yaml"), Path::new("config.yaml")) - .await; + // TODO: skip if not needed + upload_shotover(&instance.instance).await; instance } @@ -177,11 +158,22 @@ sudo apt-get install -y sysstat"#, } /// Despite the name can also run shotover +#[derive(Serialize, Deserialize)] pub struct Ec2InstanceWithDocker { pub instance: Ec2Instance, + pub include_shotover: bool, } impl Ec2InstanceWithDocker { + pub async fn reinit(&mut self) -> Result<()> { + self.instance.init().await?; + if self.include_shotover { + upload_shotover(&self.instance).await; + } + + Ok(()) + } + pub async fn run_container(&self, image: &str, envs: &[(String, String)]) { // cleanup old resources // TODO: we need a way to ensure there are no shotover resources running. @@ -277,11 +269,28 @@ fn get_compatible_instance_type() -> InstanceType { } } +#[derive(Serialize, Deserialize)] pub struct Ec2InstanceWithBencher { pub instance: Ec2Instance, } impl Ec2InstanceWithBencher { + pub async fn reinit(&mut self) -> Result<()> { + self.instance.init().await?; + self.upload_bencher().await; + Ok(()) + } + + pub async fn upload_bencher(&self) { + self.instance + .ssh() + .push_file( + std::env::current_exe().unwrap().as_ref(), + Path::new("windsock"), + ) + .await; + } + pub async fn run_bencher(&self, args: &str, name: &str) { self.instance .ssh() @@ -294,11 +303,19 @@ impl Ec2InstanceWithBencher { } } +#[derive(Serialize, Deserialize)] pub struct Ec2InstanceWithShotover { pub instance: Ec2Instance, } impl Ec2InstanceWithShotover { + pub async fn reinit(&mut self) -> Result<()> { + self.instance.init().await?; + upload_shotover(&self.instance).await; + + Ok(()) + } + pub async fn run_shotover(self: Arc, topology: &str) -> RunningShotover { self.instance .ssh() @@ -388,3 +405,15 @@ impl RunningShotover { } } } + +pub async fn upload_shotover(instance: &Ec2Instance) { + let local_shotover_path = bin_path!("shotover-proxy"); + instance + .ssh() + .push_file(local_shotover_path, Path::new("shotover-bin")) + .await; + instance + .ssh() + .push_file(Path::new("config/config.yaml"), Path::new("config.yaml")) + .await; +} diff --git a/shotover-proxy/benches/windsock/cloud/mod.rs b/shotover-proxy/benches/windsock/cloud/mod.rs index 8d6b34899..d9c4dbca0 100644 --- a/shotover-proxy/benches/windsock/cloud/mod.rs +++ b/shotover-proxy/benches/windsock/cloud/mod.rs @@ -6,9 +6,14 @@ pub use aws::{ Ec2InstanceWithBencher, Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover, }; +use anyhow::Result; use async_trait::async_trait; use aws::AwsInstances; -use std::sync::Arc; +use futures::StreamExt; +use futures::{stream::FuturesUnordered, Future}; +use serde::{Deserialize, Serialize}; +use std::pin::Pin; +use std::{path::Path, sync::Arc}; use windsock::cloud::{BenchInfo, Cloud}; pub struct AwsCloud { @@ -62,6 +67,7 @@ impl Cloud for AwsCloud { shotover, docker, bencher, + required, } } @@ -97,9 +103,82 @@ impl Cloud for AwsCloud { // TODO: spin up background tokio task to delete unneeded EC2 instances once we add the functionality to aws_throwaway } + + async fn store_resources_to_disk(&mut self, path: &Path, resources: CloudResources) { + let resources = CloudResourcesDisk { + shotover: resources + .shotover + .into_iter() + .map(|x| { + Arc::into_inner(x) + .expect("A bench is still referencing an Ec2InstanceWithShotover") + }) + .collect(), + docker: resources + .docker + .into_iter() + .map(|x| { + Arc::into_inner(x) + .expect("A bench is still referencing an Ec2InstanceWithDocker") + }) + .collect(), + bencher: resources.bencher.map(|x| { + Arc::into_inner(x).expect("A bench is still referencing an Ec2InstanceWithBencher") + }), + required: resources.required, + }; + std::fs::write(path, serde_json::to_string(&resources).unwrap()).unwrap(); + } + + async fn load_resources_from_disk( + &mut self, + path: &Path, + required_resources: Vec, + ) -> CloudResources { + let required_resources = required_resources.into_iter().fold( + CloudResourcesRequired::default(), + CloudResourcesRequired::combine, + ); + let mut resources: CloudResourcesDisk = + serde_json::from_str(&std::fs::read_to_string(path).unwrap()).unwrap(); + + if resources.required != required_resources { + panic!( + "Stored resources do not meet the requirements of this bench run. Maybe try rerunning --store-cloud-resources-to-disk\nloaded:\n{:?}\nrequired:\n{:?}", + resources.required, + required_resources + ); + } + + { + let mut futures = + FuturesUnordered::> + Send>>>::new(); + for instance in &mut resources.bencher { + futures.push(Box::pin(instance.reinit())); + } + for instance in &mut resources.docker { + futures.push(Box::pin(instance.reinit())); + } + + for instance in &mut resources.shotover { + futures.push(Box::pin(instance.reinit())); + } + + while let Some(result) = futures.next().await { + let _: () = result.unwrap(); + } + } + + CloudResources { + shotover: resources.shotover.into_iter().map(Arc::new).collect(), + docker: resources.docker.into_iter().map(Arc::new).collect(), + bencher: resources.bencher.map(Arc::new), + required: resources.required, + } + } } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub struct CloudResourcesRequired { pub shotover_instance_count: usize, pub docker_instance_count: usize, @@ -125,4 +204,13 @@ pub struct CloudResources { pub shotover: Vec>, pub docker: Vec>, pub bencher: Option>, + pub required: CloudResourcesRequired, +} + +#[derive(Serialize, Deserialize)] +pub struct CloudResourcesDisk { + pub shotover: Vec, + pub docker: Vec, + pub bencher: Option, + pub required: CloudResourcesRequired, } diff --git a/windsock/src/cli.rs b/windsock/src/cli.rs index ddd1f6b29..d6b5ffe38 100644 --- a/windsock/src/cli.rs +++ b/windsock/src/cli.rs @@ -68,6 +68,18 @@ pub struct Args { #[clap(long, verbatim_doc_comment)] pub cleanup_cloud_resources: bool, + /// Skip running of benches. + /// Skip automatic deletion of cloud resources on bench run completion. + /// Instead, just create cloud resources and write details of the resources to disk so they may be restored via `--load-cloud-resources-from-disk` + #[clap(long, verbatim_doc_comment)] + pub store_cloud_resources_to_disk: bool, + + /// Skip automatic creation of cloud resources on bench run completion. + /// Skip automatic deletion of cloud resources on bench run completion. + /// Instead, details of the resources are loaded from disk as saved via a previous run using `--store-cloud-resources-to-disk` + #[clap(long, verbatim_doc_comment)] + pub load_cloud_resources_from_disk: bool, + /// Display results from the last benchmark run by: /// Comparing various benches against a specific base bench. /// diff --git a/windsock/src/cloud.rs b/windsock/src/cloud.rs index 004fbc821..04c6df4e1 100644 --- a/windsock/src/cloud.rs +++ b/windsock/src/cloud.rs @@ -1,3 +1,5 @@ +use std::path::Path; + use async_trait::async_trait; /// Implement this to give windsock some control over your cloud. @@ -21,6 +23,24 @@ pub trait Cloud { required_resources: Vec, ) -> Self::CloudResources; + /// Construct a file at the provided path that will allow restoring the passed resources + /// + /// It is gauranteed this will be called after all the benches have completed. + async fn store_resources_to_disk(&mut self, path: &Path, resources: Self::CloudResources); + + /// Restore the resources from the data in the passed file. + /// It is the same file path that was passed to [`Cloud::store_resources_to_disk`] + /// + /// The implementation should panic when the loaded messages cannot meet the requirements of the passed `required_sources`. + /// This is done rather than loading the required resources from disk as this case usually represents a user error. + /// Loading from disk is used for more consistent results across benches but the user cannot hope to get consistent results while changing the benches that will be run. + /// They are better off recreating the resources from scratch in this case. + async fn load_resources_from_disk( + &mut self, + path: &Path, + required_resources: Vec, + ) -> Self::CloudResources; + /// This is called once at start up before running any benches. /// The returned Vec specifies the order in which to run benches. fn order_benches( @@ -32,6 +52,7 @@ pub trait Cloud { /// This is called before running each bench. /// Use it to destroy or create resources as needed. + /// However, this method will not be called when `--save-resources-to-disk` or `--load-resources-from-disk` is set. /// /// It is recommended to create all resources within create_resources for faster completion time, but it may be desirable in some circumstances to create some of them here. /// It is recommended to always destroy resources that will never be used again here. @@ -66,4 +87,11 @@ impl Cloud for NoCloud { type CloudResources = (); async fn cleanup_resources(&mut self) {} async fn create_resources(&mut self, _requests: Vec<()>) {} + async fn store_resources_to_disk(&mut self, _path: &Path, _resources: ()) {} + async fn load_resources_from_disk( + &mut self, + _path: &Path, + _required_resources: Vec, + ) { + } } diff --git a/windsock/src/data.rs b/windsock/src/data.rs index 7fc28d0a0..76242e92c 100644 --- a/windsock/src/data.rs +++ b/windsock/src/data.rs @@ -12,3 +12,7 @@ pub fn windsock_path() -> PathBuf { PathBuf::from("windsock_data") } + +pub fn cloud_resources_path() -> PathBuf { + windsock_path().join("cloud_resources") +} diff --git a/windsock/src/lib.rs b/windsock/src/lib.rs index f3c5b3fc3..42dd1e003 100644 --- a/windsock/src/lib.rs +++ b/windsock/src/lib.rs @@ -8,6 +8,7 @@ mod report; mod tables; pub use bench::{Bench, BenchParameters, BenchTask, Profiling}; +use data::cloud_resources_path; pub use report::{ ExternalReport, LatencyPercentile, Metric, OperationsReport, PubSubReport, Report, ReportArchive, @@ -20,7 +21,7 @@ use clap::Parser; use cli::Args; use cloud::{BenchInfo, Cloud}; use filter::Filter; -use std::process::exit; +use std::{path::Path, process::exit}; use tokio::runtime::Runtime; pub struct Windsock { @@ -156,35 +157,50 @@ impl Windsock Result<()> { ReportArchive::clear_last_run(); + let resources_path = cloud_resources_path(); - match self.benches.iter_mut().find(|x| x.tags.get_name() == name) { + let bench = match self.benches.iter_mut().find(|x| x.tags.get_name() == name) { Some(bench) => { if args .profilers .iter() .all(|x| bench.supported_profilers.contains(x)) { - let resources = if args.cloud { - Some( - self.cloud - .create_resources(vec![bench.required_cloud_resources()]) - .await, - ) - } else { - None - }; bench - .orchestrate(&args, running_in_release, resources) - .await; } else { return Err(anyhow!("Specified bench {name:?} was requested to run with the profilers {:?} but it only supports the profilers {:?}", args.profilers, bench.supported_profilers)); } } None => return Err(anyhow!("Specified bench {name:?} does not exist.")), + }; + + let resources = if args.cloud { + let resources = vec![bench.required_cloud_resources()]; + Some(if args.load_cloud_resources_from_disk { + self.cloud + .load_resources_from_disk(&resources_path, resources) + .await + } else { + self.cloud.create_resources(resources).await + }) + } else { + None + }; + + if args.store_cloud_resources_to_disk { + println!( + "Cloud resources have been created in preparation for running the bench:\n {name}" + ); + println!("Make sure to use `--cleanup-cloud-resources` when you are finished with these resources."); + } else { + bench + .orchestrate(&args, running_in_release, resources.clone()) + .await; } if args.cloud { - self.cloud.cleanup_resources().await; + self.cleanup_cloud_resources(resources, &args, &resources_path) + .await; } Ok(()) } @@ -196,6 +212,7 @@ impl Windsock Result<()> { ReportArchive::clear_last_run(); let filter = parse_filter(&args)?; + let resources_path = cloud_resources_path(); let mut bench_infos = vec![]; for bench in &mut self.benches { @@ -219,35 +236,71 @@ impl Windsock, + args: &Args, + resources_path: &Path, + ) { + if args.store_cloud_resources_to_disk { + if let Some(resources) = resources { + self.cloud + .store_resources_to_disk(resources_path, resources) + .await; + } + } else if args.load_cloud_resources_from_disk { + println!("Cloud resources have not been cleaned up."); + println!( + "Make sure to use `--cleanup-cloud-resources` when you are finished with them." + ); + } else { + std::fs::remove_file(resources_path).ok(); + self.cloud.cleanup_resources().await; + } + } + async fn run_filtered_benches_local( &mut self, args: Args,