Skip to content

Commit

Permalink
windsock: add --load-cloud-resources-from-disk and --store-cloud-reso…
Browse files Browse the repository at this point in the history
…urces-to-disk
  • Loading branch information
rukai committed Feb 6, 2024
1 parent 15e030c commit b154c4f
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 64 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ rand_distr = "0.4.1"
clap = { version = "4.0.4", features = ["cargo", "derive"] }
async-trait = "0.1.30"
typetag = "0.2.5"
aws-throwaway = { version = "0.6.0", default-features = false }
aws-throwaway = { version = "0.6.0", default-features = false, git = "https://github.com/shotover/aws-throwaway" }
tokio-bin-process = "0.4.0"
ordered-float = { version = "4.0.0", features = ["serde"] }
hyper = { version = "0.14.14", features = ["server"] }
Expand Down
87 changes: 58 additions & 29 deletions shotover-proxy/benches/windsock/cloud/aws.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use anyhow::Result;
use aws_throwaway::{Aws, Ec2Instance, InstanceType};
use aws_throwaway::{CleanupResources, Ec2InstanceDefinition};
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::fmt::Write;
use std::time::Duration;
use std::{
Expand Down Expand Up @@ -84,14 +86,7 @@ sudo apt-get update
sudo apt-get install -y sysstat"#,
)
.await;
instance
.instance
.ssh()
.push_file(
std::env::current_exe().unwrap().as_ref(),
Path::new("windsock"),
)
.await;
instance.upload_bencher().await;
instance
}

Expand Down Expand Up @@ -121,19 +116,14 @@ curl -sSL https://get.docker.com/ | sudo sh"#,
)
.await;

let local_shotover_path = bin_path!("shotover-proxy");
if include_shotover {
instance
.ssh()
.push_file(local_shotover_path, Path::new("shotover-bin"))
.await;
instance
.ssh()
.push_file(Path::new("config/config.yaml"), Path::new("config.yaml"))
.await;
upload_shotover(&instance).await;
}

Arc::new(Ec2InstanceWithDocker { instance })
Arc::new(Ec2InstanceWithDocker {
instance,
include_shotover,
})
}

pub async fn create_shotover_instance(&self) -> Arc<Ec2InstanceWithShotover> {
Expand All @@ -156,17 +146,8 @@ sudo apt-get install -y sysstat"#,
)
.await;

let local_shotover_path = bin_path!("shotover-proxy");
instance
.instance
.ssh()
.push_file(local_shotover_path, Path::new("shotover-bin"))
.await;
instance
.instance
.ssh()
.push_file(Path::new("config/config.yaml"), Path::new("config.yaml"))
.await;
// TODO: skip if not needed
upload_shotover(&instance.instance).await;

instance
}
Expand All @@ -177,11 +158,22 @@ sudo apt-get install -y sysstat"#,
}

/// Despite the name can also run shotover
#[derive(Serialize, Deserialize)]
pub struct Ec2InstanceWithDocker {
pub instance: Ec2Instance,
pub include_shotover: bool,
}

impl Ec2InstanceWithDocker {
/// Reinitializes the underlying instance and, if this instance was created with
/// `include_shotover`, uploads shotover to it again.
///
/// # Errors
/// Returns the error from `Ec2Instance::init` if it fails.
pub async fn reinit(&mut self) -> Result<()> {
    self.instance.init().await?;

    // Nothing else to restore when this instance only runs docker containers.
    if !self.include_shotover {
        return Ok(());
    }

    upload_shotover(&self.instance).await;
    Ok(())
}

pub async fn run_container(&self, image: &str, envs: &[(String, String)]) {
// cleanup old resources
// TODO: we need a way to ensure there are no shotover resources running.
Expand Down Expand Up @@ -277,11 +269,28 @@ fn get_compatible_instance_type() -> InstanceType {
}
}

/// An EC2 instance that the bencher (the currently running windsock executable) is uploaded to.
///
/// Derives `Serialize`/`Deserialize` so that it can be written to and read back from disk.
#[derive(Serialize, Deserialize)]
pub struct Ec2InstanceWithBencher {
/// Handle to the underlying EC2 instance.
pub instance: Ec2Instance,
}

impl Ec2InstanceWithBencher {
/// Reinitializes the underlying instance and then uploads the bencher to it again.
///
/// # Errors
/// Returns the error from `Ec2Instance::init` if it fails.
pub async fn reinit(&mut self) -> Result<()> {
    self.instance.init().await?;

    // The bencher executable may have changed since it was last uploaded, so push it again.
    let upload = self.upload_bencher();
    upload.await;

    Ok(())
}

/// Pushes the currently running executable to the instance over ssh as `windsock`.
///
/// # Panics
/// Panics if the path of the current executable cannot be determined.
pub async fn upload_bencher(&self) {
    let ssh = self.instance.ssh();
    let local_bencher = std::env::current_exe().unwrap();
    let remote_bencher = Path::new("windsock");

    ssh.push_file(local_bencher.as_ref(), remote_bencher).await;
}

pub async fn run_bencher(&self, args: &str, name: &str) {
self.instance
.ssh()
Expand All @@ -294,11 +303,19 @@ impl Ec2InstanceWithBencher {
}
}

/// An EC2 instance that shotover is uploaded to and run on.
///
/// Derives `Serialize`/`Deserialize` so that it can be written to and read back from disk.
#[derive(Serialize, Deserialize)]
pub struct Ec2InstanceWithShotover {
/// Handle to the underlying EC2 instance.
pub instance: Ec2Instance,
}

impl Ec2InstanceWithShotover {
/// Reinitializes the underlying instance and then uploads shotover to it again.
///
/// # Errors
/// Returns the error from `Ec2Instance::init` if it fails.
pub async fn reinit(&mut self) -> Result<()> {
    let instance = &mut self.instance;
    instance.init().await?;

    // Push the shotover binary and config again now that the instance is usable.
    upload_shotover(instance).await;
    Ok(())
}

pub async fn run_shotover(self: Arc<Self>, topology: &str) -> RunningShotover {
self.instance
.ssh()
Expand Down Expand Up @@ -388,3 +405,15 @@ impl RunningShotover {
}
}
}

/// Pushes the local shotover binary and `config/config.yaml` to `instance` over ssh.
///
/// The binary is stored remotely as `shotover-bin` and the config as `config.yaml`.
pub async fn upload_shotover(instance: &Ec2Instance) {
    // Resolve the local binary first, before any file is transferred.
    let shotover_binary = bin_path!("shotover-proxy");
    let remote_binary = Path::new("shotover-bin");
    instance
        .ssh()
        .push_file(shotover_binary, remote_binary)
        .await;

    let local_config = Path::new("config/config.yaml");
    let remote_config = Path::new("config.yaml");
    instance.ssh().push_file(local_config, remote_config).await;
}
92 changes: 90 additions & 2 deletions shotover-proxy/benches/windsock/cloud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ pub use aws::{
Ec2InstanceWithBencher, Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover,
};

use anyhow::Result;
use async_trait::async_trait;
use aws::AwsInstances;
use std::sync::Arc;
use futures::StreamExt;
use futures::{stream::FuturesUnordered, Future};
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use std::{path::Path, sync::Arc};
use windsock::cloud::{BenchInfo, Cloud};

pub struct AwsCloud {
Expand Down Expand Up @@ -62,6 +67,7 @@ impl Cloud for AwsCloud {
shotover,
docker,
bencher,
required,
}
}

Expand Down Expand Up @@ -97,9 +103,82 @@ impl Cloud for AwsCloud {

// TODO: spin up background tokio task to delete unneeded EC2 instances once we add the functionality to aws_throwaway
}

async fn store_resources_to_disk(&mut self, path: &Path, resources: CloudResources) {
let resources = CloudResourcesDisk {
shotover: resources
.shotover
.into_iter()
.map(|x| {
Arc::into_inner(x)
.expect("A bench is still referencing an Ec2InstanceWithShotover")
})
.collect(),
docker: resources
.docker
.into_iter()
.map(|x| {
Arc::into_inner(x)
.expect("A bench is still referencing an Ec2InstanceWithDocker")
})
.collect(),
bencher: resources.bencher.map(|x| {
Arc::into_inner(x).expect("A bench is still referencing an Ec2InstanceWithBencher")
}),
required: resources.required,
};
std::fs::write(path, serde_json::to_string(&resources).unwrap()).unwrap();
}

/// Restores cloud resources from the JSON file at `path` and reinitializes every instance.
///
/// All entries of `required_resources` are merged via `CloudResourcesRequired::combine` and
/// compared against the requirements recorded in the file. The stored instances are then
/// reinitialized concurrently and returned wrapped in `Arc`s.
///
/// # Panics
/// * If the file cannot be read or does not contain valid stored resources.
/// * If the stored requirements differ from the combined requirements of this run.
/// * If reinitializing any instance fails.
async fn load_resources_from_disk(
    &mut self,
    path: &Path,
    required_resources: Vec<CloudResourcesRequired>,
) -> CloudResources {
    let required_resources = required_resources.into_iter().fold(
        CloudResourcesRequired::default(),
        CloudResourcesRequired::combine,
    );

    // A missing or corrupt file is the most likely user error here, so point the user at the fix
    // instead of panicking with a bare io/serde error.
    let stored = std::fs::read_to_string(path).unwrap_or_else(|err| {
        panic!(
            "Failed to read stored cloud resources from {}: {}\nMaybe try running --store-cloud-resources-to-disk first",
            path.display(),
            err
        )
    });
    let mut resources: CloudResourcesDisk = serde_json::from_str(&stored).unwrap_or_else(|err| {
        panic!(
            "Failed to parse stored cloud resources from {}: {}\nMaybe try rerunning --store-cloud-resources-to-disk",
            path.display(),
            err
        )
    });

    if resources.required != required_resources {
        panic!(
            "Stored resources do not meet the requirements of this bench run. Maybe try rerunning --store-cloud-resources-to-disk\nloaded:\n{:?}\nrequired:\n{:?}",
            resources.required,
            required_resources
        );
    }

    // Scoped so the mutable borrows held by the futures end before `resources` is consumed below.
    {
        // The reinit futures have different concrete types, so they are boxed to share one collection.
        let mut futures =
            FuturesUnordered::<Pin<Box<dyn Future<Output = Result<()>> + Send>>>::new();
        for instance in &mut resources.bencher {
            futures.push(Box::pin(instance.reinit()));
        }
        for instance in &mut resources.docker {
            futures.push(Box::pin(instance.reinit()));
        }

        for instance in &mut resources.shotover {
            futures.push(Box::pin(instance.reinit()));
        }

        // Drive all reinitializations concurrently, panicking on the first failure observed.
        while let Some(result) = futures.next().await {
            let _: () = result.unwrap();
        }
    }

    CloudResources {
        shotover: resources.shotover.into_iter().map(Arc::new).collect(),
        docker: resources.docker.into_iter().map(Arc::new).collect(),
        bencher: resources.bencher.map(Arc::new),
        required: resources.required,
    }
}
}

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct CloudResourcesRequired {
pub shotover_instance_count: usize,
pub docker_instance_count: usize,
Expand All @@ -125,4 +204,13 @@ pub struct CloudResources {
pub shotover: Vec<Arc<Ec2InstanceWithShotover>>,
pub docker: Vec<Arc<Ec2InstanceWithDocker>>,
pub bencher: Option<Arc<Ec2InstanceWithBencher>>,
pub required: CloudResourcesRequired,
}

/// The form of `CloudResources` that is written to and read from disk.
///
/// It holds the same fields as `CloudResources` but owns each instance directly instead of
/// through an `Arc`.
#[derive(Serialize, Deserialize)]
pub struct CloudResourcesDisk {
/// Instances that shotover is uploaded to.
pub shotover: Vec<Ec2InstanceWithShotover>,
/// Instances that run docker containers.
pub docker: Vec<Ec2InstanceWithDocker>,
/// The instance that the bencher is uploaded to, if any.
pub bencher: Option<Ec2InstanceWithBencher>,
/// The requirements stored alongside the instances; compared against the current run's
/// requirements when loading.
pub required: CloudResourcesRequired,
}
12 changes: 12 additions & 0 deletions windsock/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ pub struct Args {
#[clap(long, verbatim_doc_comment)]
pub cleanup_cloud_resources: bool,

/// Skip running of benches.
/// Skip automatic deletion of cloud resources on bench run completion.
/// Instead, just create cloud resources and write details of the resources to disk so they may be restored via `--load-cloud-resources-from-disk`
#[clap(long, verbatim_doc_comment)]
pub store_cloud_resources_to_disk: bool,

/// Skip automatic creation of cloud resources at the start of the bench run.
/// Skip automatic deletion of cloud resources on bench run completion.
/// Instead, details of the resources are loaded from disk as saved via a previous run using `--store-cloud-resources-to-disk`
#[clap(long, verbatim_doc_comment)]
pub load_cloud_resources_from_disk: bool,

/// Display results from the last benchmark run by:
/// Comparing various benches against a specific base bench.
///
Expand Down
28 changes: 28 additions & 0 deletions windsock/src/cloud.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::path::Path;

use async_trait::async_trait;

/// Implement this to give windsock some control over your cloud.
Expand All @@ -21,6 +23,24 @@ pub trait Cloud {
required_resources: Vec<Self::CloudResourcesRequired>,
) -> Self::CloudResources;

/// Construct a file at the provided path that will allow restoring the passed resources
///
/// It is guaranteed this will be called after all the benches have completed.
async fn store_resources_to_disk(&mut self, path: &Path, resources: Self::CloudResources);

/// Restore the resources from the data in the passed file.
/// It is the same file path that was passed to [`Cloud::store_resources_to_disk`]
///
/// The implementation should panic when the loaded resources cannot meet the requirements of the passed `required_resources`.
/// This is done rather than loading the required resources from disk as this case usually represents a user error.
/// Loading from disk is used for more consistent results across benches but the user cannot hope to get consistent results while changing the benches that will be run.
/// They are better off recreating the resources from scratch in this case.
async fn load_resources_from_disk(
&mut self,
path: &Path,
required_resources: Vec<Self::CloudResourcesRequired>,
) -> Self::CloudResources;

/// This is called once at start up before running any benches.
/// The returned Vec specifies the order in which to run benches.
fn order_benches(
Expand All @@ -32,6 +52,7 @@ pub trait Cloud {

/// This is called before running each bench.
/// Use it to destroy or create resources as needed.
/// However, this method will not be called when `--store-cloud-resources-to-disk` or `--load-cloud-resources-from-disk` is set.
///
/// It is recommended to create all resources within create_resources for faster completion time, but it may be desirable in some circumstances to create some of them here.
/// It is recommended to always destroy resources that will never be used again here.
Expand Down Expand Up @@ -66,4 +87,11 @@ impl Cloud for NoCloud {
type CloudResources = ();
async fn cleanup_resources(&mut self) {}
async fn create_resources(&mut self, _requests: Vec<()>) {}
// No-op: `NoCloud` has `()` as its resources, so there is nothing to write to disk.
async fn store_resources_to_disk(&mut self, _path: &Path, _resources: ()) {}
// No-op: `NoCloud` has `()` as its resources, so nothing is read from disk and `()` is returned.
async fn load_resources_from_disk(
&mut self,
_path: &Path,
_required_resources: Vec<Self::CloudResourcesRequired>,
) {
}
}
4 changes: 4 additions & 0 deletions windsock/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ pub fn windsock_path() -> PathBuf {

PathBuf::from("windsock_data")
}

/// Returns the path `cloud_resources` inside the directory returned by `windsock_path`.
pub fn cloud_resources_path() -> PathBuf {
    let mut path = windsock_path();
    path.push("cloud_resources");
    path
}
Loading

0 comments on commit b154c4f

Please sign in to comment.