Commit

Merge branch 'main' into make-tls-builder-take-refs

justinweng-instaclustr authored May 30, 2024
2 parents 12555e9 + c1ee55f commit 8eeb7f8
Showing 3 changed files with 117 additions and 38 deletions.
19 changes: 19 additions & 0 deletions shotover/src/message/mod.rs
@@ -212,6 +212,25 @@ impl Message {
}
}

/// Same as [`Message::frame`] but consumes the message and returns an owned [`Frame`].
/// This is useful when a transform generates a request and consumes the response itself, without involving the client.
pub fn into_frame(mut self) -> Option<Frame> {
let (inner, result) = self.inner.take().unwrap().ensure_parsed(self.codec_state);
if let Err(err) = result {
// TODO: If we could include a stacktrace in this error it would be really helpful
tracing::error!("{:?}", err.context("Failed to parse frame"));
return None;
}

match inner {
MessageInner::RawBytes { .. } => {
unreachable!("Cannot be RawBytes because ensure_parsed was called")
}
MessageInner::Parsed { frame, .. } => Some(frame),
MessageInner::Modified { frame } => Some(frame),
}
}

/// Return the shotover assigned MessageId
pub fn id(&self) -> MessageId {
self.id
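The new into_frame complements Message::frame by giving up the borrow: instead of parsing in place and returning a mutable reference, it consumes the Message and hands back the owned Frame. A standalone sketch of that ownership distinction, with deliberately simplified stand-in types rather than the real shotover definitions:

    #[derive(Debug)]
    enum Frame {
        Parsed(String),
    }

    struct Message {
        inner: Option<Frame>,
    }

    impl Message {
        // Like `Message::frame`: lend the frame out, keeping the message alive.
        fn frame(&mut self) -> Option<&mut Frame> {
            self.inner.as_mut()
        }

        // Like `Message::into_frame`: consume the message, return the owned frame.
        fn into_frame(mut self) -> Option<Frame> {
            self.inner.take()
        }
    }

    fn main() {
        let mut msg = Message {
            inner: Some(Frame::Parsed("response".into())),
        };
        // Borrowing first is fine; the message is still whole afterwards.
        if let Some(frame) = msg.frame() {
            println!("borrowed: {frame:?}");
        }
        // Owning the frame suits responses a transform consumes itself:
        // nothing needs to be handed back to the client afterwards.
        println!("owned: {:?}", msg.into_frame());
    }

This is also what lets the scram_over_mtls change below replace frame() plus early returns with a single match: the owned frame can be moved out of directly.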
72 changes: 60 additions & 12 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -22,6 +22,7 @@ use kafka_protocol::messages::{
};
use kafka_protocol::protocol::{Builder, StrBytes};
use kafka_protocol::ResponseError;
use metrics::{counter, Counter};
use node::{ConnectionFactory, KafkaAddress, KafkaNode};
use rand::rngs::SmallRng;
use rand::seq::{IteratorRandom, SliceRandom};
@@ -95,7 +96,7 @@ const NAME: &str = "KafkaSinkCluster";
impl TransformConfig for KafkaSinkClusterConfig {
async fn get_builder(
&self,
_transform_context: TransformContextConfig,
transform_context: TransformContextConfig,
) -> Result<Box<dyn TransformBuilder>> {
let tls = self.tls.as_ref().map(TlsConnector::new).transpose()?;

@@ -119,6 +120,7 @@ impl TransformConfig for KafkaSinkClusterConfig {
shotover_nodes.sort_by_key(|x| x.broker_id);

Ok(Box::new(KafkaSinkClusterBuilder::new(
transform_context.chain_name,
self.first_contact_points.clone(),
&self.authorize_scram_over_mtls,
shotover_nodes,
@@ -152,10 +154,13 @@ pub struct KafkaSinkClusterBuilder {
nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
authorize_scram_over_mtls: Option<AuthorizeScramOverMtlsBuilder>,
tls: Option<TlsConnector>,
out_of_rack_requests: Counter,
}

impl KafkaSinkClusterBuilder {
#[allow(clippy::too_many_arguments)]
pub fn new(
chain_name: String,
first_contact_points: Vec<String>,
authorize_scram_over_mtls: &Option<AuthorizeScramOverMtlsConfig>,
shotover_nodes: Vec<ShotoverNode>,
@@ -182,6 +187,7 @@ impl KafkaSinkClusterBuilder {
topic_by_name: Arc::new(DashMap::new()),
topic_by_id: Arc::new(DashMap::new()),
nodes_shared: Arc::new(RwLock::new(vec![])),
out_of_rack_requests: counter!("shotover_out_of_rack_requests_count", "chain" => chain_name, "transform" => NAME),
tls,
})
}
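The counter! call above follows the usual metrics-crate pattern: create the handle once, with its labels, when the builder is constructed, then hand a cheap clone to every transform instance. A minimal sketch of that pattern (the builder shape is illustrative, not the real KafkaSinkClusterBuilder):

    use metrics::{counter, Counter};

    struct Builder {
        out_of_rack_requests: Counter,
    }

    impl Builder {
        fn new(chain_name: String) -> Self {
            Self {
                // Registers (or looks up) the counter and returns a handle to it.
                out_of_rack_requests: counter!(
                    "shotover_out_of_rack_requests_count",
                    "chain" => chain_name,
                    "transform" => "KafkaSinkCluster"
                ),
            }
        }

        fn build_instance(&self) -> Counter {
            // Counter handles are cheap to clone; every instance built from
            // this builder increments the same underlying counter.
            self.out_of_rack_requests.clone()
        }
    }

    fn main() {
        let builder = Builder::new("example_chain".into());
        builder.build_instance().increment(1);
    }

Doing the registration in the builder rather than per request keeps label allocation off the hot path.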
@@ -192,7 +198,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
Box::new(KafkaSinkCluster {
first_contact_points: self.first_contact_points.clone(),
shotover_nodes: self.shotover_nodes.clone(),
_rack: self.rack.clone(),
rack: self.rack.clone(),
nodes: vec![],
nodes_shared: self.nodes_shared.clone(),
controller_broker: self.controller_broker.clone(),
@@ -214,6 +220,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
temp_responses_buffer: Default::default(),
sasl_mechanism: None,
authorize_scram_over_mtls: self.authorize_scram_over_mtls.as_ref().map(|x| x.build()),
out_of_rack_requests: self.out_of_rack_requests.clone(),
})
}

@@ -251,8 +258,7 @@ impl AtomicBrokerId {
pub struct KafkaSinkCluster {
first_contact_points: Vec<String>,
shotover_nodes: Vec<ShotoverNode>,
// TODO: use this for rack aware routing
_rack: StrBytes,
rack: StrBytes,
nodes: Vec<KafkaNode>,
nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
controller_broker: Arc<AtomicBrokerId>,
@@ -272,6 +278,7 @@ pub struct KafkaSinkCluster {
temp_responses_buffer: Vec<Message>,
sasl_mechanism: Option<String>,
authorize_scram_over_mtls: Option<AuthorizeScramOverMtls>,
out_of_rack_requests: Counter,
}

/// State of a Request/Response is maintained by this enum.
@@ -734,12 +741,30 @@ impl KafkaSinkCluster {
let destination = if let Some(partition) =
topic_meta.partitions.get(partition_index)
{
self.nodes
if let Some(node) = self
.nodes
.iter_mut()
.filter(|node| partition.replica_nodes.contains(&node.broker_id))
.filter(|node| {
partition
.shotover_rack_replica_nodes
.contains(&node.broker_id)
})
.choose(&mut self.rng)
.unwrap()
.broker_id
{
node.broker_id
} else {
self.out_of_rack_requests.increment(1);
self.nodes
.iter_mut()
.filter(|node| {
partition
.external_rack_replica_nodes
.contains(&node.broker_id)
})
.choose(&mut self.rng)
.unwrap()
.broker_id
}
} else {
let partition_len = topic_meta.partitions.len();
let topic_name = Self::format_topic_name(&topic);
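The routing change reads as: try to pick a replica in shotover's own rack; only when none exists, count the miss and fall back to a replica in another rack. A self-contained sketch of that decision (simplified types and the rand crate assumed; the real code selects from KafkaNode values and increments a metrics Counter):

    use rand::seq::IteratorRandom;

    struct Partition {
        shotover_rack_replica_nodes: Vec<i32>,
        external_rack_replica_nodes: Vec<i32>,
    }

    fn pick_destination(
        partition: &Partition,
        rng: &mut impl rand::Rng,
        out_of_rack: &mut u64,
    ) -> i32 {
        if let Some(id) = partition.shotover_rack_replica_nodes.iter().choose(rng) {
            *id
        } else {
            // No in-rack replica: record the miss so operators can spot
            // cross-rack traffic, then route to an external-rack replica.
            *out_of_rack += 1;
            *partition
                .external_rack_replica_nodes
                .iter()
                .choose(rng)
                .expect("partition has at least one replica")
        }
    }

    fn main() {
        let partition = Partition {
            shotover_rack_replica_nodes: vec![],
            external_rack_replica_nodes: vec![4, 5],
        };
        let mut misses = 0;
        let dest = pick_destination(&partition, &mut rand::thread_rng(), &mut misses);
        println!("route to broker {dest}; out-of-rack requests: {misses}");
    }

The sketch shares the real code's assumption that every partition has at least one known replica; the fallback branch panics otherwise.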
@@ -1314,8 +1339,19 @@ impl KafkaSinkCluster {
.iter()
.map(|partition| Partition {
index: partition.partition_index,
leader_id: *partition.leader_id,
replica_nodes: partition.replica_nodes.iter().map(|x| x.0).collect(),
leader_id: partition.leader_id,
shotover_rack_replica_nodes: partition
.replica_nodes
.iter()
.cloned()
.filter(|replica_node_id| self.broker_within_rack(*replica_node_id))
.collect(),
external_rack_replica_nodes: partition
.replica_nodes
.iter()
.cloned()
.filter(|replica_node_id| !self.broker_within_rack(*replica_node_id))
.collect(),
})
.collect();
partitions.sort_by_key(|x| x.index);
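The metadata handling above classifies each replica once, into shotover_rack_replica_nodes or external_rack_replica_nodes, so the per-request routing path never has to re-derive rack membership. The two filter passes could equally be a single Iterator::partition; a sketch of that alternative (in_rack stands in for broker_within_rack):

    fn split_replicas(replica_nodes: &[i32], in_rack: impl Fn(i32) -> bool) -> (Vec<i32>, Vec<i32>) {
        // partition() walks the replicas once, sending each id to one of two Vecs.
        replica_nodes.iter().cloned().partition(|id| in_rack(*id))
    }

    fn main() {
        let (local, external) = split_replicas(&[1, 2, 3], |id| id == 2);
        assert_eq!(local, vec![2]);
        assert_eq!(external, vec![1, 3]);
    }

Either form is correct; the committed version keeps the two fields' construction visually parallel.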
@@ -1543,6 +1579,17 @@ impl KafkaSinkCluster {
self.update_local_nodes().await;
}
}

fn broker_within_rack(&self, broker_id: BrokerId) -> bool {
self.nodes.iter().any(|node| {
node.broker_id == broker_id
&& node
.rack
.as_ref()
.map(|rack| rack == &self.rack)
.unwrap_or(false)
})
}
}
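broker_within_rack treats a broker with no rack label as outside every rack, via map(..).unwrap_or(false). The same check can be written as an Option comparison; a sketch using a stand-in tuple of broker id and optional rack rather than the real KafkaNode:

    fn broker_within_rack(nodes: &[(i32, Option<String>)], broker_id: i32, my_rack: &str) -> bool {
        nodes
            .iter()
            // A node whose rack is None never matches: as_deref() yields None.
            .any(|(id, rack)| *id == broker_id && rack.as_deref() == Some(my_rack))
    }

    fn main() {
        let nodes = [(1, Some("rack-a".to_string())), (2, None)];
        assert!(broker_within_rack(&nodes, 1, "rack-a"));
        assert!(!broker_within_rack(&nodes, 2, "rack-a"));
    }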

fn hash_partition(topic_id: Uuid, partition_index: i32) -> usize {
Expand All @@ -1560,8 +1607,9 @@ struct Topic {
#[derive(Debug, Clone)]
struct Partition {
index: i32,
leader_id: i32,
replica_nodes: Vec<i32>,
leader_id: BrokerId,
shotover_rack_replica_nodes: Vec<BrokerId>,
external_rack_replica_nodes: Vec<BrokerId>,
}

struct FindCoordinator {
64 changes: 38 additions & 26 deletions shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs
@@ -117,7 +117,11 @@ async fn task(
// + From our testing, delegation tokens should be propagated within 0.5s to 1s on unloaded kafka clusters of 15 to 30 nodes.
let token = tokio::time::timeout(
Duration::from_secs(120),
create_delegation_token_for_user(&mut connections, username.clone(), &mut rng),
create_delegation_token_for_user_with_wait(
&mut connections,
username.clone(),
&mut rng,
),
)
.await
.with_context(|| format!("Delegation token creation for {username:?} timed out"))?
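The timeout wraps a fallible operation, so there are two error layers here: the outer Result from tokio::time::timeout (deadline elapsed) and the inner Result from token creation itself. A runnable sketch of unwrapping both with context (tokio and anyhow assumed; the operation and the 120s bound are illustrative):

    use anyhow::Context;
    use std::time::Duration;

    async fn create_token() -> anyhow::Result<&'static str> {
        Ok("token")
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let token = tokio::time::timeout(Duration::from_secs(120), create_token())
            .await
            .context("token creation timed out")? // Err here: the deadline elapsed
            .context("token creation failed")?; // Err here: the operation itself failed
        println!("{token}");
        Ok(())
    }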
@@ -200,11 +204,26 @@ pub enum OriginalScramState {
AuthSuccess,
}

pub async fn create_delegation_token_for_user(
pub async fn create_delegation_token_for_user_with_wait(
connections: &mut [SinkConnection],
username: StrBytes,
rng: &mut SmallRng,
) -> Result<DelegationToken> {
let create_response = create_delegation_token_for_user(connections, &username, rng).await?;
wait_until_delegation_token_ready_on_all_brokers(connections, &create_response, username)
.await?;

Ok(DelegationToken {
token_id: create_response.token_id.as_str().to_owned(),
hmac: StrBytes::from_string(general_purpose::STANDARD.encode(&create_response.hmac)),
})
}

pub async fn create_delegation_token_for_user(
connections: &mut [SinkConnection],
username: &StrBytes,
rng: &mut SmallRng,
) -> Result<CreateDelegationTokenResponse> {
let connection = connections.choose_mut(rng).unwrap();
connection.send(vec![Message::from_frame(Frame::Kafka(
KafkaFrame::Request {
@@ -222,32 +241,25 @@ pub async fn create_delegation_token_for_user(
),
},
))])?;
let mut response = connection.recv().await?.pop().unwrap();
let create_response = if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::CreateDelegationToken(response),
..
})) = response.frame()
{
if let Some(err) = ResponseError::try_from_code(response.error_code) {
return Err(anyhow!(
"kafka responded to CreateDelegationToken with error {err}",
));
} else {
response

let response = connection.recv().await?.pop().unwrap();
match response.into_frame() {
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::CreateDelegationToken(response),
..
})) => {
if let Some(err) = ResponseError::try_from_code(response.error_code) {
Err(anyhow!(
"kafka responded to CreateDelegationToken with error {err}",
))
} else {
Ok(response)
}
}
} else {
return Err(anyhow!(
response => Err(anyhow!(
"Unexpected response to CreateDelegationToken {response:?}"
));
};

wait_until_delegation_token_ready_on_all_brokers(connections, create_response, username)
.await?;

Ok(DelegationToken {
token_id: create_response.token_id.as_str().to_owned(),
hmac: StrBytes::from_string(general_purpose::STANDARD.encode(&create_response.hmac)),
})
)),
}
}
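With into_frame available, the response handling collapses into a single match whose arms each evaluate to a Result, removing both the mutable borrow and the early returns. A distilled sketch of that shape (simplified frame enum, not the kafka_protocol types):

    use anyhow::{anyhow, Result};

    #[derive(Debug)]
    enum Frame {
        CreateDelegationToken { error_code: i16, token_id: String },
        Other,
    }

    fn handle(frame: Option<Frame>) -> Result<String> {
        match frame {
            // Success: no error code, hand the body back to the caller.
            Some(Frame::CreateDelegationToken { error_code: 0, token_id }) => Ok(token_id),
            // The broker answered, but with an error code.
            Some(Frame::CreateDelegationToken { error_code, .. }) => Err(anyhow!(
                "kafka responded to CreateDelegationToken with error {error_code}"
            )),
            // Anything else (including None) is an unexpected response.
            response => Err(anyhow!("Unexpected response to CreateDelegationToken {response:?}")),
        }
    }

    fn main() {
        let ok = Frame::CreateDelegationToken { error_code: 0, token_id: "id".into() };
        assert!(handle(Some(ok)).is_ok());
        assert!(handle(Some(Frame::Other)).is_err());
    }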

async fn wait_until_delegation_token_ready_on_all_brokers(
Expand Down
