diff --git a/Cargo.lock b/Cargo.lock index 3a51cc1d2..64fc94dba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -527,13 +527,13 @@ dependencies = [ [[package]] name = "aws-throwaway" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be0da812cb994ccf68233bb00551df33d55b6841f83ab35f98095316be82133c" +version = "0.6.1" +source = "git+https://github.com/rukai/aws-throwaway?branch=allow_reusing_instances#99ca38e7d26b29528700887f9d4b4d00d91c997e" dependencies = [ "anyhow", "async-trait", "base64", + "futures", "russh", "russh-keys", "serde", @@ -2232,7 +2232,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.5", + "socket2 0.4.10", "tokio", "tower-service", "tracing", diff --git a/Cargo.toml b/Cargo.toml index a990d586f..8daa1eae5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,7 +61,7 @@ rand_distr = "0.4.1" clap = { version = "4.0.4", features = ["cargo", "derive"] } async-trait = "0.1.30" typetag = "0.2.5" -aws-throwaway = { version = "0.6.0", default-features = false } +aws-throwaway = { version = "0.6.0", default-features = false, branch = "allow_reusing_instances", git = "https://github.com/rukai/aws-throwaway" } tokio-bin-process = "0.4.0" ordered-float = { version = "4.0.0", features = ["serde"] } hyper = { version = "0.14.14", features = ["server"] } diff --git a/shotover-proxy/benches/windsock/cloud/aws.rs b/shotover-proxy/benches/windsock/cloud/aws.rs index d5b668d3d..316b9c542 100644 --- a/shotover-proxy/benches/windsock/cloud/aws.rs +++ b/shotover-proxy/benches/windsock/cloud/aws.rs @@ -1,6 +1,7 @@ use aws_throwaway::{Aws, Ec2Instance, InstanceType}; use aws_throwaway::{CleanupResources, Ec2InstanceDefinition}; use regex::Regex; +use serde::{Deserialize, Serialize}; use std::fmt::Write; use std::time::Duration; use std::{ @@ -175,6 +176,7 @@ sudo apt-get install -y sysstat"#, } /// Despite the name can also run shotover +#[derive(Serialize, Deserialize)] pub struct Ec2InstanceWithDocker { pub instance: Ec2Instance, } @@ -275,6 +277,7 @@ fn get_compatible_instance_type() -> InstanceType { } } +#[derive(Serialize, Deserialize)] pub struct Ec2InstanceWithBencher { pub instance: Ec2Instance, } @@ -292,6 +295,7 @@ impl Ec2InstanceWithBencher { } } +#[derive(Serialize, Deserialize)] pub struct Ec2InstanceWithShotover { pub instance: Ec2Instance, } diff --git a/shotover-proxy/benches/windsock/cloud/mod.rs b/shotover-proxy/benches/windsock/cloud/mod.rs index 8d6b34899..f28c3ad2e 100644 --- a/shotover-proxy/benches/windsock/cloud/mod.rs +++ b/shotover-proxy/benches/windsock/cloud/mod.rs @@ -8,7 +8,8 @@ pub use aws::{ use async_trait::async_trait; use aws::AwsInstances; -use std::sync::Arc; +use serde::{Deserialize, Serialize}; +use std::{path::Path, sync::Arc}; use windsock::cloud::{BenchInfo, Cloud}; pub struct AwsCloud { @@ -62,6 +63,7 @@ impl Cloud for AwsCloud { shotover, docker, bencher, + required, } } @@ -97,9 +99,62 @@ impl Cloud for AwsCloud { // TODO: spin up background tokio task to delete unneeded EC2 instances once we add the functionality to aws_throwaway } + + async fn store_resources_to_disk(&mut self, path: &Path, resources: CloudResources) { + let resources = CloudResourcesDisk { + shotover: resources + .shotover + .into_iter() + .map(|x| { + Arc::into_inner(x) + .expect("A bench is still referencing an Ec2InstanceWithShotover") + }) + .collect(), + docker: resources + .docker + .into_iter() + .map(|x| { + Arc::into_inner(x) + .expect("A bench is still referencing an Ec2InstanceWithDocker") + }) + .collect(), + bencher: resources.bencher.map(|x| { + Arc::into_inner(x).expect("A bench is still referencing an Ec2InstanceWithBencher") + }), + required: resources.required, + }; + std::fs::write(path, serde_json::to_string(&resources).unwrap()).unwrap(); + } + + async fn load_resources_from_disk( + &mut self, + path: &Path, + required_resources: Vec, + ) -> CloudResources { + let required_resources = required_resources.into_iter().fold( + CloudResourcesRequired::default(), + CloudResourcesRequired::combine, + ); + let resources: CloudResourcesDisk = + serde_json::from_str(&std::fs::read_to_string(path).unwrap()).unwrap(); + if resources.required != required_resources { + panic!( + "Stored resources do not meet the requirements of this bench run. Maybe try rerunning --store-cloud-resources-to-disk\nloaded:\n{:?}\nrequired:\n{:?}", + resources.required, + required_resources + ); + } + + CloudResources { + shotover: resources.shotover.into_iter().map(Arc::new).collect(), + docker: resources.docker.into_iter().map(Arc::new).collect(), + bencher: resources.bencher.map(Arc::new), + required: resources.required, + } + } } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub struct CloudResourcesRequired { pub shotover_instance_count: usize, pub docker_instance_count: usize, @@ -125,4 +180,13 @@ pub struct CloudResources { pub shotover: Vec>, pub docker: Vec>, pub bencher: Option>, + pub required: CloudResourcesRequired, +} + +#[derive(Serialize, Deserialize)] +pub struct CloudResourcesDisk { + pub shotover: Vec, + pub docker: Vec, + pub bencher: Option, + pub required: CloudResourcesRequired, } diff --git a/windsock/src/cli.rs b/windsock/src/cli.rs index ddd1f6b29..d6b5ffe38 100644 --- a/windsock/src/cli.rs +++ b/windsock/src/cli.rs @@ -68,6 +68,18 @@ pub struct Args { #[clap(long, verbatim_doc_comment)] pub cleanup_cloud_resources: bool, + /// Skip running of benches. + /// Skip automatic deletion of cloud resources on bench run completion. + /// Instead, just create cloud resources and write details of the resources to disk so they may be restored via `--load-cloud-resources-from-disk` + #[clap(long, verbatim_doc_comment)] + pub store_cloud_resources_to_disk: bool, + + /// Skip automatic creation of cloud resources on bench run completion. + /// Skip automatic deletion of cloud resources on bench run completion. + /// Instead, details of the resources are loaded from disk as saved via a previous run using `--store-cloud-resources-to-disk` + #[clap(long, verbatim_doc_comment)] + pub load_cloud_resources_from_disk: bool, + /// Display results from the last benchmark run by: /// Comparing various benches against a specific base bench. /// diff --git a/windsock/src/cloud.rs b/windsock/src/cloud.rs index 004fbc821..04c6df4e1 100644 --- a/windsock/src/cloud.rs +++ b/windsock/src/cloud.rs @@ -1,3 +1,5 @@ +use std::path::Path; + use async_trait::async_trait; /// Implement this to give windsock some control over your cloud. @@ -21,6 +23,24 @@ pub trait Cloud { required_resources: Vec, ) -> Self::CloudResources; + /// Construct a file at the provided path that will allow restoring the passed resources + /// + /// It is gauranteed this will be called after all the benches have completed. + async fn store_resources_to_disk(&mut self, path: &Path, resources: Self::CloudResources); + + /// Restore the resources from the data in the passed file. + /// It is the same file path that was passed to [`Cloud::store_resources_to_disk`] + /// + /// The implementation should panic when the loaded messages cannot meet the requirements of the passed `required_sources`. + /// This is done rather than loading the required resources from disk as this case usually represents a user error. + /// Loading from disk is used for more consistent results across benches but the user cannot hope to get consistent results while changing the benches that will be run. + /// They are better off recreating the resources from scratch in this case. + async fn load_resources_from_disk( + &mut self, + path: &Path, + required_resources: Vec, + ) -> Self::CloudResources; + /// This is called once at start up before running any benches. /// The returned Vec specifies the order in which to run benches. fn order_benches( @@ -32,6 +52,7 @@ pub trait Cloud { /// This is called before running each bench. /// Use it to destroy or create resources as needed. + /// However, this method will not be called when `--save-resources-to-disk` or `--load-resources-from-disk` is set. /// /// It is recommended to create all resources within create_resources for faster completion time, but it may be desirable in some circumstances to create some of them here. /// It is recommended to always destroy resources that will never be used again here. @@ -66,4 +87,11 @@ impl Cloud for NoCloud { type CloudResources = (); async fn cleanup_resources(&mut self) {} async fn create_resources(&mut self, _requests: Vec<()>) {} + async fn store_resources_to_disk(&mut self, _path: &Path, _resources: ()) {} + async fn load_resources_from_disk( + &mut self, + _path: &Path, + _required_resources: Vec, + ) { + } } diff --git a/windsock/src/data.rs b/windsock/src/data.rs index 7fc28d0a0..76242e92c 100644 --- a/windsock/src/data.rs +++ b/windsock/src/data.rs @@ -12,3 +12,7 @@ pub fn windsock_path() -> PathBuf { PathBuf::from("windsock_data") } + +pub fn cloud_resources_path() -> PathBuf { + windsock_path().join("cloud_resources") +} diff --git a/windsock/src/lib.rs b/windsock/src/lib.rs index f3c5b3fc3..42dd1e003 100644 --- a/windsock/src/lib.rs +++ b/windsock/src/lib.rs @@ -8,6 +8,7 @@ mod report; mod tables; pub use bench::{Bench, BenchParameters, BenchTask, Profiling}; +use data::cloud_resources_path; pub use report::{ ExternalReport, LatencyPercentile, Metric, OperationsReport, PubSubReport, Report, ReportArchive, @@ -20,7 +21,7 @@ use clap::Parser; use cli::Args; use cloud::{BenchInfo, Cloud}; use filter::Filter; -use std::process::exit; +use std::{path::Path, process::exit}; use tokio::runtime::Runtime; pub struct Windsock { @@ -156,35 +157,50 @@ impl Windsock Result<()> { ReportArchive::clear_last_run(); + let resources_path = cloud_resources_path(); - match self.benches.iter_mut().find(|x| x.tags.get_name() == name) { + let bench = match self.benches.iter_mut().find(|x| x.tags.get_name() == name) { Some(bench) => { if args .profilers .iter() .all(|x| bench.supported_profilers.contains(x)) { - let resources = if args.cloud { - Some( - self.cloud - .create_resources(vec![bench.required_cloud_resources()]) - .await, - ) - } else { - None - }; bench - .orchestrate(&args, running_in_release, resources) - .await; } else { return Err(anyhow!("Specified bench {name:?} was requested to run with the profilers {:?} but it only supports the profilers {:?}", args.profilers, bench.supported_profilers)); } } None => return Err(anyhow!("Specified bench {name:?} does not exist.")), + }; + + let resources = if args.cloud { + let resources = vec![bench.required_cloud_resources()]; + Some(if args.load_cloud_resources_from_disk { + self.cloud + .load_resources_from_disk(&resources_path, resources) + .await + } else { + self.cloud.create_resources(resources).await + }) + } else { + None + }; + + if args.store_cloud_resources_to_disk { + println!( + "Cloud resources have been created in preparation for running the bench:\n {name}" + ); + println!("Make sure to use `--cleanup-cloud-resources` when you are finished with these resources."); + } else { + bench + .orchestrate(&args, running_in_release, resources.clone()) + .await; } if args.cloud { - self.cloud.cleanup_resources().await; + self.cleanup_cloud_resources(resources, &args, &resources_path) + .await; } Ok(()) } @@ -196,6 +212,7 @@ impl Windsock Result<()> { ReportArchive::clear_last_run(); let filter = parse_filter(&args)?; + let resources_path = cloud_resources_path(); let mut bench_infos = vec![]; for bench in &mut self.benches { @@ -219,35 +236,71 @@ impl Windsock, + args: &Args, + resources_path: &Path, + ) { + if args.store_cloud_resources_to_disk { + if let Some(resources) = resources { + self.cloud + .store_resources_to_disk(resources_path, resources) + .await; + } + } else if args.load_cloud_resources_from_disk { + println!("Cloud resources have not been cleaned up."); + println!( + "Make sure to use `--cleanup-cloud-resources` when you are finished with them." + ); + } else { + std::fs::remove_file(resources_path).ok(); + self.cloud.cleanup_resources().await; + } + } + async fn run_filtered_benches_local( &mut self, args: Args,