Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

windsock: add --load-cloud-resources-from-disk and --store-cloud-resources-to-disk #1453

Merged
merged 1 commit into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

127 changes: 91 additions & 36 deletions shotover-proxy/benches/windsock/cloud/aws.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use anyhow::Result;
use aws_throwaway::{Aws, Ec2Instance, InstanceType};
use aws_throwaway::{CleanupResources, Ec2InstanceDefinition};
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::fmt::Write;
use std::time::Duration;
use std::{
Expand Down Expand Up @@ -34,38 +36,47 @@ impl AwsInstances {
.await
}

pub async fn create_bencher_instances(&self, count: usize) -> Vec<Arc<Ec2InstanceWithBencher>> {
pub async fn create_bencher_instances(
&self,
benches_will_run: bool,
count: usize,
) -> Vec<Arc<Ec2InstanceWithBencher>> {
let mut futures = vec![];
for _ in 0..count {
futures.push(self.create_bencher_instance());
futures.push(self.create_bencher_instance(benches_will_run));
}
futures::future::join_all(futures).await
}

pub async fn create_shotover_instances(
&self,
benches_will_run: bool,
count: usize,
) -> Vec<Arc<Ec2InstanceWithShotover>> {
let mut futures = vec![];
for _ in 0..count {
futures.push(self.create_shotover_instance());
futures.push(self.create_shotover_instance(benches_will_run));
}
futures::future::join_all(futures).await
}

pub async fn create_docker_instances(
&self,
benches_will_run: bool,
include_shotover: bool,
count: usize,
) -> Vec<Arc<Ec2InstanceWithDocker>> {
let mut futures = vec![];
for _ in 0..count {
futures.push(self.create_docker_instance(include_shotover));
futures.push(self.create_docker_instance(benches_will_run, include_shotover));
}
futures::future::join_all(futures).await
}

pub async fn create_bencher_instance(&self) -> Arc<Ec2InstanceWithBencher> {
pub async fn create_bencher_instance(
&self,
benches_will_run: bool,
) -> Arc<Ec2InstanceWithBencher> {
let instance = Arc::new(Ec2InstanceWithBencher {
instance: self
.aws
Expand All @@ -84,19 +95,17 @@ sudo apt-get update
sudo apt-get install -y sysstat"#,
)
.await;
instance
.instance
.ssh()
.push_file(
std::env::current_exe().unwrap().as_ref(),
Path::new("windsock"),
)
.await;

if benches_will_run {
instance.upload_bencher().await;
}

instance
}

pub async fn create_docker_instance(
&self,
benches_will_run: bool,
include_shotover: bool,
) -> Arc<Ec2InstanceWithDocker> {
let instance = self
Expand All @@ -121,22 +130,20 @@ curl -sSL https://get.docker.com/ | sudo sh"#,
)
.await;

let local_shotover_path = bin_path!("shotover-proxy");
if include_shotover {
instance
.ssh()
.push_file(local_shotover_path, Path::new("shotover-bin"))
.await;
instance
.ssh()
.push_file(Path::new("config/config.yaml"), Path::new("config.yaml"))
.await;
if include_shotover && benches_will_run {
upload_shotover(&instance).await;
}

Arc::new(Ec2InstanceWithDocker { instance })
Arc::new(Ec2InstanceWithDocker {
instance,
include_shotover,
})
}

pub async fn create_shotover_instance(&self) -> Arc<Ec2InstanceWithShotover> {
pub async fn create_shotover_instance(
&self,
benches_will_run: bool,
) -> Arc<Ec2InstanceWithShotover> {
let instance = Arc::new(Ec2InstanceWithShotover {
instance: self
.aws
Expand All @@ -156,17 +163,9 @@ sudo apt-get install -y sysstat"#,
)
.await;

let local_shotover_path = bin_path!("shotover-proxy");
instance
.instance
.ssh()
.push_file(local_shotover_path, Path::new("shotover-bin"))
.await;
instance
.instance
.ssh()
.push_file(Path::new("config/config.yaml"), Path::new("config.yaml"))
.await;
if benches_will_run {
upload_shotover(&instance.instance).await;
}

instance
}
Expand All @@ -177,11 +176,22 @@ sudo apt-get install -y sysstat"#,
}

/// Despite the name, this instance can also run shotover.
///
/// Derives `Serialize`/`Deserialize` so that it can be stored inside `CloudResourcesDisk`.
#[derive(Serialize, Deserialize)]
pub struct Ec2InstanceWithDocker {
/// The underlying EC2 instance.
pub instance: Ec2Instance,
/// When true, `reinit` re-uploads the shotover binary and config to this instance.
pub include_shotover: bool,
}

impl Ec2InstanceWithDocker {
/// Prepares an already existing instance for use again.
///
/// Runs `init` on the underlying EC2 instance and then, only when this instance was
/// created with `include_shotover`, uploads shotover to it again.
///
/// # Errors
/// Returns the error produced by `Ec2Instance::init`, if any.
pub async fn reinit(&mut self) -> Result<()> {
    self.instance.init().await?;

    // Instances created without shotover have nothing further to restore.
    if !self.include_shotover {
        return Ok(());
    }

    upload_shotover(&self.instance).await;
    Ok(())
}

pub async fn run_container(&self, image: &str, envs: &[(String, String)]) {
// cleanup old resources
// TODO: we need a way to ensure there are no shotover resources running.
Expand Down Expand Up @@ -277,11 +287,28 @@ fn get_compatible_instance_type() -> InstanceType {
}
}

/// An EC2 instance that the windsock bencher executable is uploaded to and run on.
///
/// Derives `Serialize`/`Deserialize` so that it can be stored inside `CloudResourcesDisk`.
#[derive(Serialize, Deserialize)]
pub struct Ec2InstanceWithBencher {
/// The underlying EC2 instance.
pub instance: Ec2Instance,
}

impl Ec2InstanceWithBencher {
/// Prepares an already existing instance for use again.
///
/// Runs `init` on the underlying EC2 instance and then uploads the bencher executable.
///
/// # Errors
/// Returns the error produced by `Ec2Instance::init`, if any.
pub async fn reinit(&mut self) -> Result<()> {
self.instance.init().await?;
// Always push the current executable so the instance runs the same build as this process.
self.upload_bencher().await;
Ok(())
}

/// Copies the currently running executable to the instance under the name `windsock`.
///
/// # Panics
/// Panics if the path of the current executable cannot be determined.
pub async fn upload_bencher(&self) {
    // The bencher that runs remotely is this very binary.
    let local_windsock = std::env::current_exe().unwrap();

    self.instance
        .ssh()
        .push_file(local_windsock.as_path(), Path::new("windsock"))
        .await;
}

pub async fn run_bencher(&self, args: &str, name: &str) {
self.instance
.ssh()
Expand All @@ -294,11 +321,19 @@ impl Ec2InstanceWithBencher {
}
}

/// An EC2 instance that the shotover binary and config are uploaded to.
///
/// Derives `Serialize`/`Deserialize` so that it can be stored inside `CloudResourcesDisk`.
#[derive(Serialize, Deserialize)]
pub struct Ec2InstanceWithShotover {
/// The underlying EC2 instance.
pub instance: Ec2Instance,
}

impl Ec2InstanceWithShotover {
/// Prepares an already existing instance for use again.
///
/// Runs `init` on the underlying EC2 instance and then uploads shotover to it.
///
/// # Errors
/// Returns the error produced by `Ec2Instance::init`, if any.
pub async fn reinit(&mut self) -> Result<()> {
self.instance.init().await?;
// Always push the shotover binary and config again after init.
upload_shotover(&self.instance).await;

Ok(())
}

pub async fn run_shotover(self: Arc<Self>, topology: &str) -> RunningShotover {
self.instance
.ssh()
Expand Down Expand Up @@ -388,3 +423,23 @@ impl RunningShotover {
}
}
}

/// Uploads shotover to `instance` over ssh.
///
/// The `shotover-proxy` binary located via `bin_path!` is stored remotely as `shotover-bin`
/// and the local `config/config.yaml` is stored remotely as `config.yaml`.
/// Any `shotover-bin` process still running on the instance is killed first.
pub async fn upload_shotover(instance: &Ec2Instance) {
let local_shotover_path = bin_path!("shotover-proxy");

// a leftover shotover-bin process can prevent uploading to shotover-bin
// so we need to kill any such processes before uploading
//
// `|| true` keeps the command successful when there was no such process to kill.
instance
.ssh()
.shell("killall -w shotover-bin > /dev/null || true")
.await;

instance
.ssh()
.push_file(local_shotover_path, Path::new("shotover-bin"))
.await;
instance
.ssh()
.push_file(Path::new("config/config.yaml"), Path::new("config.yaml"))
.await;
}
103 changes: 98 additions & 5 deletions shotover-proxy/benches/windsock/cloud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ pub use aws::{
Ec2InstanceWithBencher, Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover,
};

use anyhow::Result;
use async_trait::async_trait;
use aws::AwsInstances;
use std::sync::Arc;
use futures::StreamExt;
use futures::{stream::FuturesUnordered, Future};
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use std::{path::Path, sync::Arc};
use windsock::cloud::{BenchInfo, Cloud};

pub struct AwsCloud {
Expand Down Expand Up @@ -37,7 +42,11 @@ impl Cloud for AwsCloud {
println!("All AWS throwaway resources have been deleted");
}

async fn create_resources(&mut self, required: Vec<CloudResourcesRequired>) -> CloudResources {
async fn create_resources(
&mut self,
required: Vec<CloudResourcesRequired>,
benches_will_run: bool,
) -> CloudResources {
let required = required.into_iter().fold(
CloudResourcesRequired::default(),
CloudResourcesRequired::combine,
Expand All @@ -51,17 +60,19 @@ impl Cloud for AwsCloud {
let aws = self.aws.as_ref().unwrap();
let (docker, mut bencher, shotover) = futures::join!(
aws.create_docker_instances(
benches_will_run,
required.include_shotover_in_docker_instance,
required.docker_instance_count
),
aws.create_bencher_instances(1),
aws.create_shotover_instances(required.shotover_instance_count)
aws.create_bencher_instances(benches_will_run, 1),
aws.create_shotover_instances(benches_will_run, required.shotover_instance_count)
);
let bencher = bencher.pop();
CloudResources {
shotover,
docker,
bencher,
required,
}
}

Expand Down Expand Up @@ -97,9 +108,82 @@ impl Cloud for AwsCloud {

// TODO: spin up background tokio task to delete unneeded EC2 instances once we add the functionality to aws_throwaway
}

/// Writes `resources` to `path` as JSON so that a later run can pick them up again
/// through `load_resources_file`.
///
/// Every instance is taken out of its `Arc` before being serialized.
///
/// # Panics
/// - if any instance is still referenced from somewhere else (the `Arc` is not unique),
/// - if the resources cannot be serialized to JSON,
/// - if the file at `path` cannot be written.
async fn store_resources_file(&mut self, path: &Path, resources: CloudResources) {
    let resources = CloudResourcesDisk {
        shotover: resources
            .shotover
            .into_iter()
            .map(|x| {
                Arc::into_inner(x)
                    .expect("A bench is still referencing an Ec2InstanceWithShotover")
            })
            .collect(),
        docker: resources
            .docker
            .into_iter()
            .map(|x| {
                Arc::into_inner(x)
                    .expect("A bench is still referencing an Ec2InstanceWithDocker")
            })
            .collect(),
        bencher: resources.bencher.map(|x| {
            Arc::into_inner(x).expect("A bench is still referencing an Ec2InstanceWithBencher")
        }),
        required: resources.required,
    };

    // The trait signature has no way to report an error, so failures still panic,
    // but the message names the operation and the file instead of a bare `unwrap`.
    let json = serde_json::to_string(&resources)
        .unwrap_or_else(|err| panic!("Failed to serialize cloud resources to JSON: {err}"));
    std::fs::write(path, json).unwrap_or_else(|err| {
        panic!(
            "Failed to write cloud resources to {}: {err}",
            path.display()
        )
    });
}

/// Loads the resources previously written by `store_resources_file` from `path`,
/// reinitializes every stored instance and returns them wrapped in `Arc`s.
///
/// `required_resources` is combined into a single `CloudResourcesRequired` and must be
/// equal to the requirements recorded in the file.
///
/// # Panics
/// - if the file at `path` cannot be read or does not contain valid resources JSON,
/// - if the stored requirements differ from the combined `required_resources`,
/// - if reinitializing any instance fails.
async fn load_resources_file(
    &mut self,
    path: &Path,
    required_resources: Vec<CloudResourcesRequired>,
) -> CloudResources {
    let required_resources = required_resources.into_iter().fold(
        CloudResourcesRequired::default(),
        CloudResourcesRequired::combine,
    );

    // The trait signature has no way to report an error, so failures still panic,
    // but the message names the file and hints at how to (re)create it.
    let json = std::fs::read_to_string(path).unwrap_or_else(|err| {
        panic!(
            "Failed to read stored cloud resources from {}: {err}\nMaybe try rerunning --store-cloud-resources-file",
            path.display()
        )
    });
    let mut resources: CloudResourcesDisk = serde_json::from_str(&json).unwrap_or_else(|err| {
        panic!(
            "Failed to parse stored cloud resources from {}: {err}\nMaybe try rerunning --store-cloud-resources-file",
            path.display()
        )
    });

    if resources.required != required_resources {
        panic!(
            "Stored resources do not meet the requirements of this bench run. Maybe try rerunning --store-cloud-resources-file\nloaded:\n{:?}\nrequired:\n{:?}",
            resources.required,
            required_resources
        );
    }

    // Scoped so the mutable borrows of `resources` held by the reinit futures
    // end before the instances are moved into the returned `CloudResources`.
    {
        let mut futures =
            FuturesUnordered::<Pin<Box<dyn Future<Output = Result<()>> + Send>>>::new();
        for instance in &mut resources.bencher {
            futures.push(Box::pin(instance.reinit()));
        }
        for instance in &mut resources.docker {
            futures.push(Box::pin(instance.reinit()));
        }

        for instance in &mut resources.shotover {
            futures.push(Box::pin(instance.reinit()));
        }

        // Wait for every reinit to complete, panicking on the first failure observed.
        while let Some(result) = futures.next().await {
            let _: () = result.unwrap();
        }
    }

    CloudResources {
        shotover: resources.shotover.into_iter().map(Arc::new).collect(),
        docker: resources.docker.into_iter().map(Arc::new).collect(),
        bencher: resources.bencher.map(Arc::new),
        required: resources.required,
    }
}
}

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct CloudResourcesRequired {
pub shotover_instance_count: usize,
pub docker_instance_count: usize,
Expand All @@ -125,4 +209,13 @@ pub struct CloudResources {
pub shotover: Vec<Arc<Ec2InstanceWithShotover>>,
pub docker: Vec<Arc<Ec2InstanceWithDocker>>,
pub bencher: Option<Arc<Ec2InstanceWithBencher>>,
pub required: CloudResourcesRequired,
}

/// The serializable form of `CloudResources`.
///
/// Holds the same fields as `CloudResources` but with every instance owned directly
/// instead of behind an `Arc`. `store_resources_file` writes it to disk as JSON and
/// `load_resources_file` reads it back.
#[derive(Serialize, Deserialize)]
pub struct CloudResourcesDisk {
/// Instances that shotover is uploaded to.
pub shotover: Vec<Ec2InstanceWithShotover>,
/// Instances that run docker containers.
pub docker: Vec<Ec2InstanceWithDocker>,
/// The instance the bencher runs on, if one was created.
pub bencher: Option<Ec2InstanceWithBencher>,
/// The combined requirements these resources were created for;
/// `load_resources_file` compares this against the requirements of the current run.
pub required: CloudResourcesRequired,
}
Loading
Loading