Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Oct 23, 2023
1 parent cf5e196 commit ca6b950
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 72 deletions.
6 changes: 3 additions & 3 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -506,11 +506,11 @@ This is mainly used in conjunction with the `TuneableConsistencyScatter` transfo
This transform sends messages to both the defined sub chain and the remaining down-chain transforms.
The response from the down-chain transform is returned back up-chain but various behaviours can be defined by the `behaviour` field to handle the case when the responses from the sub chain and down-chain do not match.

Tee also exposes an optional HTTP API to switch which chain is the "sub-chain" and which chain is the "primary chain", that being the chain which responses are returned to the client from.
Tee also exposes an optional HTTP API to switch which chain to use as the "result source", that is the chain to return responses from.

`GET` `/switched` will return `"true"` or `"false"` indicating whether the chain has been switched from what was ooriginally defined in the topology file.
`GET` `/result-source` will return `"regular-chain"` or `"tee-chain"` indicating what chain is being used for the result source.

`PUT` `/switch` with the body content as either `true` or `false` will either switch the chain or revert it to the definition in the topology file.
`PUT` `/result-source` with the body content as either `regular-chain` or `tee-chain` to set the result source.

```yaml
- Tee:
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
---
main_log_level: "info,shotover_proxy=info"
main_log_level: "debug,shotover_proxy=debug"
observability_interface: "0.0.0.0:9001"
12 changes: 6 additions & 6 deletions shotover-proxy/tests/transforms/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,20 +234,20 @@ async fn test_switch_main_chain() {
assert_eq!("a", result);

let _ = hyper_request(
format!("http://localhost:{}/switch", switch_port),
format!("http://localhost:{}/result-source", switch_port),
Method::PUT,
Body::from("true"),
Body::from("tee-chain"),
)
.await;

let res = hyper_request(
format!("http://localhost:{}/switched", switch_port),
format!("http://localhost:{}/result-source", switch_port),
Method::GET,
Body::empty(),
)
.await;
let body = read_response_body(res).await.unwrap();
assert_eq!("true", body);
assert_eq!("tee-chain", body);

let result = redis::cmd("SET")
.arg("key")
Expand All @@ -259,9 +259,9 @@ async fn test_switch_main_chain() {
assert_eq!("b", result);

let _ = hyper_request(
format!("http://localhost:{}/switch", switch_port),
format!("http://localhost:{}/result-source", switch_port),
Method::PUT,
Body::from("false"),
Body::from("regular-chain"),
)
.await;

Expand Down
2 changes: 1 addition & 1 deletion shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ metrics-exporter-prometheus = "0.12.0"
tracing.workspace = true
tracing-subscriber.workspace = true
tracing-appender.workspace = true
hyper = { version = "0.14.14", features = ["server"] }
hyper.workspace = true
halfbrown = "0.2.1"

# Transform dependencies
Expand Down
151 changes: 90 additions & 61 deletions shotover/src/transforms/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ use crate::transforms::chain::{BufferedChain, TransformChainBuilder};
use crate::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use atomic_enum::atomic_enum;
use bytes::Bytes;
use hyper::{
service::{make_service_fn, service_fn},
Method, Request, StatusCode, {Body, Response, Server},
};
use metrics::{register_counter, Counter};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::atomic::Ordering;
use std::{convert::Infallible, net::SocketAddr, str, sync::Arc};
use tokio::sync::Mutex;
use tracing::{debug, error, trace, warn};

pub struct TeeBuilder {
Expand All @@ -22,7 +24,6 @@ pub struct TeeBuilder {
pub timeout_micros: Option<u64>,
dropped_messages: Counter,
switch_port: Option<u16>,
switched_chain: Option<Arc<Mutex<bool>>>,
}

pub enum ConsistencyBehaviorBuilder {
Expand All @@ -39,7 +40,6 @@ impl TeeBuilder {
behavior: ConsistencyBehaviorBuilder,
timeout_micros: Option<u64>,
switch_port: Option<u16>,
switched_chain: Option<Arc<Mutex<bool>>>,
) -> Self {
let dropped_messages = register_counter!("tee_dropped_messages", "chain" => "Tee");

Expand All @@ -50,17 +50,18 @@ impl TeeBuilder {
timeout_micros,
dropped_messages,
switch_port,
switched_chain,
}
}
}

impl TransformBuilder for TeeBuilder {
fn build(&self) -> Transforms {
let result_source = Arc::new(AtomicResultSource::new(ResultSource::RegularChain));

if let Some(switch_port) = self.switch_port {
let chain_switch_listener =
ChainSwitchListener::new(SocketAddr::from(([127, 0, 0, 1], switch_port)));
tokio::spawn(chain_switch_listener.async_run(self.switched_chain.clone().unwrap()));
tokio::spawn(chain_switch_listener.async_run(result_source.clone()));
}

Transforms::Tee(Tee {
Expand All @@ -78,7 +79,7 @@ impl TransformBuilder for TeeBuilder {
buffer_size: self.buffer_size,
timeout_micros: self.timeout_micros,
dropped_messages: self.dropped_messages.clone(),
switched_chain: self.switched_chain.clone(),
result_source: result_source.clone(),
})
}

Expand Down Expand Up @@ -117,7 +118,22 @@ pub struct Tee {
pub behavior: ConsistencyBehavior,
pub timeout_micros: Option<u64>,
dropped_messages: Counter,
switched_chain: Option<Arc<Mutex<bool>>>,
result_source: Arc<AtomicResultSource>,
}

#[atomic_enum]
pub enum ResultSource {
RegularChain,
TeeChain,
}

impl fmt::Display for ResultSource {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
ResultSource::RegularChain => write!(f, "regular-chain"),
ResultSource::TeeChain => write!(f, "tee-chain"),
}
}
}

pub enum ConsistencyBehavior {
Expand Down Expand Up @@ -176,7 +192,6 @@ impl TransformConfig for TeeConfig {
behavior,
self.timeout_micros,
self.switch_port,
Some(Arc::new(Mutex::new(false))),
)))
}
}
Expand All @@ -189,53 +204,48 @@ impl Transform for Tee {
}

impl Tee {
async fn is_switched(&mut self) -> bool {
if let Some(switched_chain) = self.switched_chain.as_ref() {
let switched = switched_chain.lock().await;
*switched
} else {
false
}
}

async fn return_response(
&mut self,
tee_result: Messages,
chain_result: Messages,
) -> Result<Messages> {
if self.is_switched().await {
Ok(tee_result)
} else {
Ok(chain_result)
let result_source: ResultSource = self.result_source.load(Ordering::Relaxed);
match result_source {
ResultSource::RegularChain => Ok(chain_result),
ResultSource::TeeChain => Ok(tee_result),
}
}

async fn ignore_behaviour_inner<'a>(
&'a mut self,
requests_wrapper: Wrapper<'a>,
) -> Result<Messages> {
if self.is_switched().await {
let (tee_result, chain_result) = tokio::join!(
self.tx
.process_request(requests_wrapper.clone(), self.timeout_micros),
requests_wrapper.call_next_transform()
);
if let Err(e) = chain_result {
self.dropped_messages.increment(1);
trace!("Tee Ignored error {e}");
let result_source: ResultSource = self.result_source.load(Ordering::Relaxed);
match result_source {
ResultSource::RegularChain => {
let (tee_result, chain_result) = tokio::join!(
self.tx
.process_request_no_return(requests_wrapper.clone(), self.timeout_micros),
requests_wrapper.call_next_transform()
);
if let Err(e) = tee_result {
self.dropped_messages.increment(1);
trace!("Tee Ignored error {e}");
}
chain_result
}
tee_result
} else {
let (tee_result, chain_result) = tokio::join!(
self.tx
.process_request_no_return(requests_wrapper.clone(), self.timeout_micros),
requests_wrapper.call_next_transform()
);
if let Err(e) = tee_result {
self.dropped_messages.increment(1);
trace!("Tee Ignored error {e}");
ResultSource::TeeChain => {
let (tee_result, chain_result) = tokio::join!(
self.tx
.process_request(requests_wrapper.clone(), self.timeout_micros),
requests_wrapper.call_next_transform()
);
if let Err(e) = chain_result {
self.dropped_messages.increment(1);
trace!("Tee Ignored error {e}");
}
tee_result
}
chain_result
}
}

Expand Down Expand Up @@ -334,58 +344,77 @@ impl ChainSwitchListener {
.expect("builder with known status code must not fail")
}

async fn set_switched_chain(body: Bytes, switched_chain: Arc<Mutex<bool>>) -> Result<()> {
let new_switched_chain_state_str = str::from_utf8(body.as_ref())?;
let new_switched_chain_state = new_switched_chain_state_str.parse::<bool>()?;
let mut switched = switched_chain.lock().await;
*switched = new_switched_chain_state;
async fn set_result_source_chain(
body: Bytes,
result_source: Arc<AtomicResultSource>,
) -> Result<()> {
let new_result_source = str::from_utf8(body.as_ref())?;

let new_value = match new_result_source {
"tee-chain" => ResultSource::TeeChain,
"regular-chain" => ResultSource::RegularChain,
_ => {
return Err(anyhow!(
r"Invalid value for result source: {}, should be 'tee-chain' or 'regular-chain'",
new_result_source
))
}
};

debug!("Setting result source to {}", new_value);

result_source.store(new_value, Ordering::Relaxed);
Ok(())
}

async fn async_run(self, switched_chain: Arc<Mutex<bool>>) {
if let Err(err) = self.async_run_inner(switched_chain).await {
async fn async_run(self, result_source: Arc<AtomicResultSource>) {
if let Err(err) = self.async_run_inner(result_source).await {
error!("Error in ChainSwitchListener: {}", err);
}
}

async fn async_run_inner(self, switched_chain: Arc<Mutex<bool>>) -> Result<()> {
async fn async_run_inner(self, result_source: Arc<AtomicResultSource>) -> Result<()> {
let make_svc = make_service_fn(move |_| {
let switched_chain = switched_chain.clone();
let result_source = result_source.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
let switched_chain = switched_chain.clone();
let result_source = result_source.clone();
async move {
let response = match (req.method(), req.uri().path()) {
(&Method::GET, "/switched") => {
let switched_chain: bool = *switched_chain.lock().await;
Self::rsp(StatusCode::OK, switched_chain.to_string())
(&Method::GET, "/result-source") => {
let result_source: ResultSource =
result_source.load(Ordering::Relaxed);
Self::rsp(StatusCode::OK, result_source.to_string())
}
(&Method::PUT, "/switch") => {
(&Method::PUT, "/result-source") => {
match hyper::body::to_bytes(req.into_body()).await {
Ok(body) => {
match Self::set_switched_chain(body, switched_chain.clone())
.await
match Self::set_result_source_chain(
body,
result_source.clone(),
)
.await
{
Err(error) => {
error!(?error, "switching chain failed");
error!(?error, "setting result source failed");
Self::rsp(
StatusCode::BAD_REQUEST,
"switching chain failed",
"setting result source failed",
)
}
Ok(()) => Self::rsp(StatusCode::OK, Body::empty()),
}
}
Err(error) => {
error!(%error, "setting filter failed - Couldn't read bytes");
error!(%error, "setting result source failed - Couldn't read bytes");
Self::rsp(
StatusCode::INTERNAL_SERVER_ERROR,
format!("{error:?}"),
)
}
}
}
_ => Self::rsp(StatusCode::NOT_FOUND, "/switched or /switch"),
_ => Self::rsp(StatusCode::NOT_FOUND, "try /result-source"),
};
Ok::<_, Infallible>(response)
}
Expand Down

0 comments on commit ca6b950

Please sign in to comment.