diff --git a/shotover/src/transforms/kafka/sink_cluster/node.rs b/shotover/src/transforms/kafka/sink_cluster/node.rs index 1cf4581a0..84011703a 100644 --- a/shotover/src/transforms/kafka/sink_cluster/node.rs +++ b/shotover/src/transforms/kafka/sink_cluster/node.rs @@ -7,8 +7,6 @@ use crate::message::Message; use crate::tls::TlsConnector; use crate::transforms::kafka::sink_cluster::SASL_SCRAM_MECHANISMS; use anyhow::{anyhow, Context, Result}; -use base64::engine::general_purpose; -use base64::Engine; use bytes::Bytes; use kafka_protocol::messages::{ApiKey, BrokerId, RequestHeader, SaslAuthenticateRequest}; use kafka_protocol::protocol::{Builder, StrBytes}; @@ -143,10 +141,9 @@ impl ConnectionFactory { } // SCRAM client-first - let hmac = general_purpose::STANDARD.encode(&scram_over_mtls.delegation_token.hmac); let mut scram = Scram::::new( scram_over_mtls.delegation_token.token_id.clone(), - hmac, + scram_over_mtls.delegation_token.hmac.to_string(), ChannelBinding::None, "tokenauth=true".to_owned(), String::new(), diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs index d9796875e..db70b0f5e 100644 --- a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs +++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs @@ -9,11 +9,13 @@ use crate::{ tls::{TlsConnector, TlsConnectorConfig}, }; use anyhow::{anyhow, Context, Result}; +use base64::{engine::general_purpose, Engine}; use futures::stream::FuturesUnordered; use kafka_protocol::{ messages::{ describe_delegation_token_request::DescribeDelegationTokenOwner, ApiKey, - CreateDelegationTokenRequest, DescribeDelegationTokenRequest, RequestHeader, + CreateDelegationTokenRequest, CreateDelegationTokenResponse, + DescribeDelegationTokenRequest, RequestHeader, }, protocol::{Builder, StrBytes}, ResponseError, @@ -176,7 +178,7 @@ impl AuthorizeScramOverMtlsBuilder { token_task: self.token_task.clone(), delegation_token: DelegationToken { token_id: String::new(), - hmac: vec![], + hmac: StrBytes::default(), }, } } @@ -221,7 +223,7 @@ pub async fn create_delegation_token_for_user( }, ))])?; let mut response = connection.recv().await?.pop().unwrap(); - let token = if let Some(Frame::Kafka(KafkaFrame::Response { + let create_response = if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::CreateDelegationToken(response), .. })) = response.frame() @@ -231,10 +233,7 @@ pub async fn create_delegation_token_for_user( "kafka responded to CreateDelegationToken with error {err}", )); } else { - DelegationToken { - token_id: response.token_id.as_str().to_owned(), - hmac: response.hmac.to_vec(), - } + response } } else { return Err(anyhow!( @@ -242,19 +241,23 @@ pub async fn create_delegation_token_for_user( )); }; - wait_until_delegation_token_ready_on_all_brokers(connections, &token, username.clone()).await?; + wait_until_delegation_token_ready_on_all_brokers(connections, create_response, username) + .await?; - Ok(token) + Ok(DelegationToken { + token_id: create_response.token_id.as_str().to_owned(), + hmac: StrBytes::from_string(general_purpose::STANDARD.encode(&create_response.hmac)), + }) } async fn wait_until_delegation_token_ready_on_all_brokers( connections: &mut [SinkConnection], - token: &DelegationToken, + create_response: &CreateDelegationTokenResponse, username: StrBytes, ) -> Result<()> { let connections_len = connections.len(); for (i, connection) in connections.iter_mut().enumerate() { - while !is_delegation_token_ready(connection, token, username.clone()) + while !is_delegation_token_ready(connection, create_response, username.clone()) .await .with_context(|| { format!("Failed to check delegation token was ready. Succesful connections {i}/{connections_len}") @@ -273,7 +276,7 @@ async fn wait_until_delegation_token_ready_on_all_brokers( /// Returns Err(_) if an error occured with the kafka connection. async fn is_delegation_token_ready( connection: &mut SinkConnection, - token: &DelegationToken, + create_response: &CreateDelegationTokenResponse, username: StrBytes, ) -> Result { // TODO: Create a single request Message, convert it into raw bytes, and then reuse for all following requests @@ -312,7 +315,7 @@ async fn is_delegation_token_ready( if response .tokens .iter() - .any(|x| x.hmac == token.hmac && x.token_id.as_str() == token.token_id) + .any(|x| x.hmac == create_response.hmac && x.token_id == create_response.token_id) { Ok(true) } else { @@ -328,6 +331,5 @@ async fn is_delegation_token_ready( #[derive(Clone)] pub struct DelegationToken { pub token_id: String, - // TODO: store as base64 string - pub hmac: Vec, + pub hmac: StrBytes, }