Skip to content

Commit

Permalink
refactor shotover windsock cloud (#1431)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Jan 24, 2024
1 parent 994bcea commit 6574120
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 65 deletions.
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ rand_distr.workspace = true
async-trait.workspace = true
tracing-subscriber.workspace = true
tracing-appender.workspace = true
async-once-cell = "0.5.2"
fred = { version = "8.0.0", features = ["enable-rustls"] }
tokio-bin-process.workspace = true
rustls-pemfile = "2.0.0"
Expand Down
6 changes: 3 additions & 3 deletions shotover-proxy/benches/windsock/cassandra.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
aws::{
cloud::{CloudResources, CloudResourcesRequired},
Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover,
cloud::{
CloudResources, CloudResourcesRequired, Ec2InstanceWithDocker, Ec2InstanceWithShotover,
RunningShotover,
},
common::{self, Shotover},
profilers::{self, CloudProfilerRunner, ProfilerRunner},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
//! Windsock specific logic built on top of aws_throwaway
pub mod cloud;

use async_once_cell::OnceCell;
use aws_throwaway::{Aws, Ec2Instance, InstanceType};
use aws_throwaway::{CleanupResources, Ec2InstanceDefinition};
use regex::Regex;
Expand All @@ -19,23 +14,22 @@ use windsock::ReportArchive;

static AWS_THROWAWAY_TAG: &str = "windsock";

static AWS: OnceCell<WindsockAws> = OnceCell::new();

/// TODO: move WindsockAws into a private module so only AwsCloud has access to it.
pub struct WindsockAws {
pub struct AwsInstances {
aws: Aws,
}

impl WindsockAws {
pub async fn get() -> &'static Self {
AWS.get_or_init(async move {
WindsockAws {
aws: Aws::builder(CleanupResources::WithAppTag(AWS_THROWAWAY_TAG.to_owned()))
.build()
.await,
}
})
.await
impl AwsInstances {
pub async fn new() -> Self {
AwsInstances {
aws: Aws::builder(CleanupResources::WithAppTag(AWS_THROWAWAY_TAG.to_owned()))
.build()
.await,
}
}

pub async fn cleanup() {
Aws::cleanup_resources_static(CleanupResources::WithAppTag(AWS_THROWAWAY_TAG.to_owned()))
.await
}

pub async fn create_bencher_instances(&self, count: usize) -> Vec<Arc<Ec2InstanceWithBencher>> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,49 +1,53 @@
use super::{
Ec2InstanceWithBencher, Ec2InstanceWithDocker, Ec2InstanceWithShotover, AWS, AWS_THROWAWAY_TAG,
//! Windsock specific logic built on top of aws_throwaway
mod aws;

pub use aws::{
Ec2InstanceWithBencher, Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover,
};

use async_trait::async_trait;
use aws_throwaway::{Aws, CleanupResources};
use aws::AwsInstances;
use std::sync::Arc;
use windsock::cloud::{BenchInfo, Cloud};

pub struct AwsCloud;
pub struct AwsCloud {
aws: Option<AwsInstances>,
}

impl AwsCloud {
pub fn new_boxed() -> Box<
dyn Cloud<CloudResourcesRequired = CloudResourcesRequired, CloudResources = CloudResources>,
> {
Box::new(AwsCloud)
Box::new(AwsCloud { aws: None })
}
}

#[async_trait(?Send)]
impl Cloud for AwsCloud {
type CloudResourcesRequired = CloudResourcesRequired;
type CloudResources = CloudResources;
async fn cleanup_resources(&self) {
match AWS.get() {
async fn cleanup_resources(&mut self) {
match &self.aws {
// AWS is initialized, it'll be faster to cleanup resources making use of the initialization
Some(aws) => aws.cleanup_resources().await,
// AWS is not initialized, it'll be faster to cleanup resources skipping initialization
None => {
Aws::cleanup_resources_static(CleanupResources::WithAppTag(
AWS_THROWAWAY_TAG.to_owned(),
))
.await
}
None => AwsInstances::cleanup().await,
}
}

async fn create_resources(&self, required: Vec<CloudResourcesRequired>) -> CloudResources {
async fn create_resources(&mut self, required: Vec<CloudResourcesRequired>) -> CloudResources {
let required = required.into_iter().fold(
CloudResourcesRequired::default(),
CloudResourcesRequired::combine,
);
println!("Creating AWS resources: {required:#?}");

// TODO: make Option<WindsockAws> field of AwsCloud
let aws = crate::aws::WindsockAws::get().await;
if self.aws.is_none() {
self.aws = Some(crate::cloud::AwsInstances::new().await);
}

let aws = self.aws.as_ref().unwrap();
let (docker, mut bencher, shotover) = futures::join!(
aws.create_docker_instances(
required.include_shotover_in_docker_instance,
Expand All @@ -61,15 +65,15 @@ impl Cloud for AwsCloud {
}

fn order_benches(
&self,
&mut self,
benches: Vec<BenchInfo<CloudResourcesRequired>>,
) -> Vec<BenchInfo<CloudResourcesRequired>> {
// TODO: put benches with most resources first
benches
}

async fn adjust_resources(
&self,
&mut self,
_benches: &[BenchInfo<CloudResourcesRequired>],
_bench_index: usize,
resources: &mut CloudResources,
Expand Down
10 changes: 6 additions & 4 deletions shotover-proxy/benches/windsock/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::aws::cloud::{CloudResources, CloudResourcesRequired};
use crate::aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover};
use crate::cloud::{
CloudResources, CloudResourcesRequired, Ec2InstanceWithDocker, Ec2InstanceWithShotover,
RunningShotover,
};
use crate::common::{self, Shotover};
use crate::profilers::{self, CloudProfilerRunner, ProfilerRunner};
use crate::shotover::shotover_process_custom_topology;
Expand Down Expand Up @@ -157,7 +159,7 @@ impl KafkaBench {
&self,
shotover_instance: Option<Arc<Ec2InstanceWithShotover>>,
kafka_instance: Arc<Ec2InstanceWithDocker>,
) -> Option<crate::aws::RunningShotover> {
) -> Option<RunningShotover> {
match self.shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
let shotover_instance = shotover_instance.unwrap();
Expand All @@ -176,7 +178,7 @@ impl KafkaBench {
async fn run_aws_shotover_colocated_with_kafka(
&self,
instance: Arc<Ec2InstanceWithDocker>,
) -> Option<crate::aws::RunningShotover> {
) -> Option<RunningShotover> {
let ip = instance.instance.private_ip().to_string();
match self.shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
Expand Down
8 changes: 4 additions & 4 deletions shotover-proxy/benches/windsock/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod aws;
mod cassandra;
mod cloud;
mod common;
#[cfg(feature = "rdkafka-driver-tests")]
mod kafka;
Expand All @@ -12,8 +12,8 @@ use crate::common::*;
#[cfg(feature = "rdkafka-driver-tests")]
use crate::kafka::*;
use crate::redis::*;
use aws::cloud::CloudResources;
use aws::cloud::CloudResourcesRequired;
use cloud::CloudResources;
use cloud::CloudResourcesRequired;
use std::path::Path;
use tracing_subscriber::EnvFilter;
use windsock::{Bench, Windsock};
Expand Down Expand Up @@ -150,7 +150,7 @@ fn main() {
.chain(kafka_benches)
.chain(redis_benches)
.collect(),
aws::cloud::AwsCloud::new_boxed(),
cloud::AwsCloud::new_boxed(),
&["release"],
)
.run();
Expand Down
6 changes: 3 additions & 3 deletions shotover-proxy/benches/windsock/redis.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
aws::{
cloud::{CloudResources, CloudResourcesRequired},
Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover,
cloud::{
CloudResources, CloudResourcesRequired, Ec2InstanceWithDocker, Ec2InstanceWithShotover,
RunningShotover,
},
common::{self, Shotover},
profilers::{self, CloudProfilerRunner, ProfilerRunner},
Expand Down
12 changes: 6 additions & 6 deletions windsock/src/cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ pub trait Cloud {

/// Cleanup all cloud resources created by windsock.
/// You should destroy not just resources created during this bench run but also resources created in past bench runs that might have missed cleanup due to a panic.
async fn cleanup_resources(&self);
async fn cleanup_resources(&mut self);

/// This is called once at start up before running any benches.
/// The implementation must return an object containing all the requested cloud resources.
/// The `required_resources` contains the `CloudResourcesRequired` returned by each bench that will be executed in this run.
async fn create_resources(
&self,
&mut self,
required_resources: Vec<Self::CloudResourcesRequired>,
) -> Self::CloudResources;

/// This is called once at start up before running any benches.
/// The returned Vec specifies the order in which to run benches.
fn order_benches(
&self,
&mut self,
benches: Vec<BenchInfo<Self::CloudResourcesRequired>>,
) -> Vec<BenchInfo<Self::CloudResourcesRequired>> {
benches
Expand All @@ -36,7 +36,7 @@ pub trait Cloud {
/// It is recommended to create all resources within create_resources for faster completion time, but it may be desirable in some circumstances to create some of them here.
/// It is recommended to always destroy resources that will never be used again here.
async fn adjust_resources(
&self,
&mut self,
_benches: &[BenchInfo<Self::CloudResourcesRequired>],
_bench_index: usize,
_resources: &mut Self::CloudResources,
Expand Down Expand Up @@ -64,6 +64,6 @@ impl NoCloud {
impl Cloud for NoCloud {
type CloudResourcesRequired = ();
type CloudResources = ();
async fn cleanup_resources(&self) {}
async fn create_resources(&self, _requests: Vec<()>) {}
async fn cleanup_resources(&mut self) {}
async fn create_resources(&mut self, _requests: Vec<()>) {}
}

0 comments on commit 6574120

Please sign in to comment.