Skip to content

Commit

Permalink
windsock: add --load-cloud-resources-from-disk and --store-cloud-reso…
Browse files Browse the repository at this point in the history
…urces-to-disk
  • Loading branch information
rukai committed Feb 7, 2024
1 parent 34208ba commit 2efdbb1
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 74 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

127 changes: 91 additions & 36 deletions shotover-proxy/benches/windsock/cloud/aws.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use anyhow::Result;
use aws_throwaway::{Aws, Ec2Instance, InstanceType};
use aws_throwaway::{CleanupResources, Ec2InstanceDefinition};
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::fmt::Write;
use std::time::Duration;
use std::{
Expand Down Expand Up @@ -34,38 +36,47 @@ impl AwsInstances {
.await
}

pub async fn create_bencher_instances(&self, count: usize) -> Vec<Arc<Ec2InstanceWithBencher>> {
pub async fn create_bencher_instances(
&self,
benches_will_run: bool,
count: usize,
) -> Vec<Arc<Ec2InstanceWithBencher>> {
let mut futures = vec![];
for _ in 0..count {
futures.push(self.create_bencher_instance());
futures.push(self.create_bencher_instance(benches_will_run));
}
futures::future::join_all(futures).await
}

pub async fn create_shotover_instances(
&self,
benches_will_run: bool,
count: usize,
) -> Vec<Arc<Ec2InstanceWithShotover>> {
let mut futures = vec![];
for _ in 0..count {
futures.push(self.create_shotover_instance());
futures.push(self.create_shotover_instance(benches_will_run));
}
futures::future::join_all(futures).await
}

pub async fn create_docker_instances(
&self,
benches_will_run: bool,
include_shotover: bool,
count: usize,
) -> Vec<Arc<Ec2InstanceWithDocker>> {
let mut futures = vec![];
for _ in 0..count {
futures.push(self.create_docker_instance(include_shotover));
futures.push(self.create_docker_instance(benches_will_run, include_shotover));
}
futures::future::join_all(futures).await
}

pub async fn create_bencher_instance(&self) -> Arc<Ec2InstanceWithBencher> {
pub async fn create_bencher_instance(
&self,
benches_will_run: bool,
) -> Arc<Ec2InstanceWithBencher> {
let instance = Arc::new(Ec2InstanceWithBencher {
instance: self
.aws
Expand All @@ -84,19 +95,17 @@ sudo apt-get update
sudo apt-get install -y sysstat"#,
)
.await;
instance
.instance
.ssh()
.push_file(
std::env::current_exe().unwrap().as_ref(),
Path::new("windsock"),
)
.await;

if benches_will_run {
instance.upload_bencher().await;
}

instance
}

pub async fn create_docker_instance(
&self,
benches_will_run: bool,
include_shotover: bool,
) -> Arc<Ec2InstanceWithDocker> {
let instance = self
Expand All @@ -121,22 +130,20 @@ curl -sSL https://get.docker.com/ | sudo sh"#,
)
.await;

let local_shotover_path = bin_path!("shotover-proxy");
if include_shotover {
instance
.ssh()
.push_file(local_shotover_path, Path::new("shotover-bin"))
.await;
instance
.ssh()
.push_file(Path::new("config/config.yaml"), Path::new("config.yaml"))
.await;
if include_shotover && benches_will_run {
upload_shotover(&instance).await;
}

Arc::new(Ec2InstanceWithDocker { instance })
Arc::new(Ec2InstanceWithDocker {
instance,
include_shotover,
})
}

pub async fn create_shotover_instance(&self) -> Arc<Ec2InstanceWithShotover> {
pub async fn create_shotover_instance(
&self,
benches_will_run: bool,
) -> Arc<Ec2InstanceWithShotover> {
let instance = Arc::new(Ec2InstanceWithShotover {
instance: self
.aws
Expand All @@ -156,17 +163,9 @@ sudo apt-get install -y sysstat"#,
)
.await;

let local_shotover_path = bin_path!("shotover-proxy");
instance
.instance
.ssh()
.push_file(local_shotover_path, Path::new("shotover-bin"))
.await;
instance
.instance
.ssh()
.push_file(Path::new("config/config.yaml"), Path::new("config.yaml"))
.await;
if benches_will_run {
upload_shotover(&instance.instance).await;
}

instance
}
Expand All @@ -177,11 +176,22 @@ sudo apt-get install -y sysstat"#,
}

/// Despite the name can also run shotover
#[derive(Serialize, Deserialize)]
pub struct Ec2InstanceWithDocker {
pub instance: Ec2Instance,
pub include_shotover: bool,
}

impl Ec2InstanceWithDocker {
pub async fn reinit(&mut self) -> Result<()> {
self.instance.init().await?;
if self.include_shotover {
upload_shotover(&self.instance).await;
}

Ok(())
}

pub async fn run_container(&self, image: &str, envs: &[(String, String)]) {
// cleanup old resources
// TODO: we need a way to ensure there are no shotover resources running.
Expand Down Expand Up @@ -277,11 +287,28 @@ fn get_compatible_instance_type() -> InstanceType {
}
}

#[derive(Serialize, Deserialize)]
pub struct Ec2InstanceWithBencher {
pub instance: Ec2Instance,
}

impl Ec2InstanceWithBencher {
pub async fn reinit(&mut self) -> Result<()> {
self.instance.init().await?;
self.upload_bencher().await;
Ok(())
}

pub async fn upload_bencher(&self) {
self.instance
.ssh()
.push_file(
std::env::current_exe().unwrap().as_ref(),
Path::new("windsock"),
)
.await;
}

pub async fn run_bencher(&self, args: &str, name: &str) {
self.instance
.ssh()
Expand All @@ -294,11 +321,19 @@ impl Ec2InstanceWithBencher {
}
}

#[derive(Serialize, Deserialize)]
pub struct Ec2InstanceWithShotover {
pub instance: Ec2Instance,
}

impl Ec2InstanceWithShotover {
pub async fn reinit(&mut self) -> Result<()> {
self.instance.init().await?;
upload_shotover(&self.instance).await;

Ok(())
}

pub async fn run_shotover(self: Arc<Self>, topology: &str) -> RunningShotover {
self.instance
.ssh()
Expand Down Expand Up @@ -388,3 +423,23 @@ impl RunningShotover {
}
}
}

pub async fn upload_shotover(instance: &Ec2Instance) {
let local_shotover_path = bin_path!("shotover-proxy");

// a leftover shotover-bin process can prevent uploading to shotover-bin
// so we need to kill any such processes before uploading
instance
.ssh()
.shell("killall -w shotover-bin > /dev/null || true")
.await;

instance
.ssh()
.push_file(local_shotover_path, Path::new("shotover-bin"))
.await;
instance
.ssh()
.push_file(Path::new("config/config.yaml"), Path::new("config.yaml"))
.await;
}
103 changes: 98 additions & 5 deletions shotover-proxy/benches/windsock/cloud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ pub use aws::{
Ec2InstanceWithBencher, Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover,
};

use anyhow::Result;
use async_trait::async_trait;
use aws::AwsInstances;
use std::sync::Arc;
use futures::StreamExt;
use futures::{stream::FuturesUnordered, Future};
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use std::{path::Path, sync::Arc};
use windsock::cloud::{BenchInfo, Cloud};

pub struct AwsCloud {
Expand Down Expand Up @@ -37,7 +42,11 @@ impl Cloud for AwsCloud {
println!("All AWS throwaway resources have been deleted");
}

async fn create_resources(&mut self, required: Vec<CloudResourcesRequired>) -> CloudResources {
async fn create_resources(
&mut self,
required: Vec<CloudResourcesRequired>,
benches_will_run: bool,
) -> CloudResources {
let required = required.into_iter().fold(
CloudResourcesRequired::default(),
CloudResourcesRequired::combine,
Expand All @@ -51,17 +60,19 @@ impl Cloud for AwsCloud {
let aws = self.aws.as_ref().unwrap();
let (docker, mut bencher, shotover) = futures::join!(
aws.create_docker_instances(
benches_will_run,
required.include_shotover_in_docker_instance,
required.docker_instance_count
),
aws.create_bencher_instances(1),
aws.create_shotover_instances(required.shotover_instance_count)
aws.create_bencher_instances(benches_will_run, 1),
aws.create_shotover_instances(benches_will_run, required.shotover_instance_count)
);
let bencher = bencher.pop();
CloudResources {
shotover,
docker,
bencher,
required,
}
}

Expand Down Expand Up @@ -97,9 +108,82 @@ impl Cloud for AwsCloud {

// TODO: spin up background tokio task to delete unneeded EC2 instances once we add the functionality to aws_throwaway
}

async fn store_resources_file(&mut self, path: &Path, resources: CloudResources) {
let resources = CloudResourcesDisk {
shotover: resources
.shotover
.into_iter()
.map(|x| {
Arc::into_inner(x)
.expect("A bench is still referencing an Ec2InstanceWithShotover")
})
.collect(),
docker: resources
.docker
.into_iter()
.map(|x| {
Arc::into_inner(x)
.expect("A bench is still referencing an Ec2InstanceWithDocker")
})
.collect(),
bencher: resources.bencher.map(|x| {
Arc::into_inner(x).expect("A bench is still referencing an Ec2InstanceWithBencher")
}),
required: resources.required,
};
std::fs::write(path, serde_json::to_string(&resources).unwrap()).unwrap();
}

async fn load_resources_file(
&mut self,
path: &Path,
required_resources: Vec<CloudResourcesRequired>,
) -> CloudResources {
let required_resources = required_resources.into_iter().fold(
CloudResourcesRequired::default(),
CloudResourcesRequired::combine,
);
let mut resources: CloudResourcesDisk =
serde_json::from_str(&std::fs::read_to_string(path).unwrap()).unwrap();

if resources.required != required_resources {
panic!(
"Stored resources do not meet the requirements of this bench run. Maybe try rerunning --store-cloud-resources-file\nloaded:\n{:?}\nrequired:\n{:?}",
resources.required,
required_resources
);
}

{
let mut futures =
FuturesUnordered::<Pin<Box<dyn Future<Output = Result<()>> + Send>>>::new();
for instance in &mut resources.bencher {
futures.push(Box::pin(instance.reinit()));
}
for instance in &mut resources.docker {
futures.push(Box::pin(instance.reinit()));
}

for instance in &mut resources.shotover {
futures.push(Box::pin(instance.reinit()));
}

while let Some(result) = futures.next().await {
let _: () = result.unwrap();
}
}

CloudResources {
shotover: resources.shotover.into_iter().map(Arc::new).collect(),
docker: resources.docker.into_iter().map(Arc::new).collect(),
bencher: resources.bencher.map(Arc::new),
required: resources.required,
}
}
}

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct CloudResourcesRequired {
pub shotover_instance_count: usize,
pub docker_instance_count: usize,
Expand All @@ -125,4 +209,13 @@ pub struct CloudResources {
pub shotover: Vec<Arc<Ec2InstanceWithShotover>>,
pub docker: Vec<Arc<Ec2InstanceWithDocker>>,
pub bencher: Option<Arc<Ec2InstanceWithBencher>>,
pub required: CloudResourcesRequired,
}

#[derive(Serialize, Deserialize)]
pub struct CloudResourcesDisk {
pub shotover: Vec<Ec2InstanceWithShotover>,
pub docker: Vec<Ec2InstanceWithDocker>,
pub bencher: Option<Ec2InstanceWithBencher>,
pub required: CloudResourcesRequired,
}
Loading

0 comments on commit 2efdbb1

Please sign in to comment.