From a9f55e57b4e3721b2a2fad9229c6a5b3f356cce8 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 22 Oct 2024 15:23:39 +1100 Subject: [PATCH] Fix sasl with python-kafka --- shotover-proxy/tests/kafka_int_tests/mod.rs | 44 ++-- shotover/benches/benches/codec/kafka.rs | 12 +- shotover/src/codec/kafka.rs | 241 +++++++++++++++--- shotover/src/codec/mod.rs | 17 +- shotover/src/frame/kafka.rs | 10 +- shotover/src/frame/mod.rs | 7 +- shotover/src/message/mod.rs | 17 ++ .../kafka/sink_cluster/kafka_node.rs | 43 +++- .../src/transforms/kafka/sink_cluster/mod.rs | 7 +- test-helpers/src/connection/kafka/python.rs | 52 ++++ 10 files changed, 369 insertions(+), 81 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index a7b559f38..3eaabb642 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -9,6 +9,7 @@ use test_cases::produce_consume_partitions1; use test_cases::produce_consume_partitions3; use test_cases::{assert_topic_creation_is_denied_due_to_acl, setup_basic_user_acls}; use test_helpers::connection::kafka::node::run_node_smoke_test_scram; +use test_helpers::connection::kafka::python::run_python_smoke_test_sasl_scram; use test_helpers::connection::kafka::{KafkaConnectionBuilder, KafkaDriver}; use test_helpers::docker_compose::docker_compose; use test_helpers::shotover_process::{Count, EventMatcher}; @@ -37,7 +38,7 @@ async fn passthrough_standard(#[case] driver: KafkaDriver) { } #[tokio::test] -async fn passthrough_nodejs() { +async fn passthrough_nodejs_and_python() { let _docker_compose = docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml"); let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology.yaml") @@ -45,23 +46,6 @@ async fn passthrough_nodejs() { .await; test_helpers::connection::kafka::node::run_node_smoke_test("127.0.0.1:9192").await; - - tokio::time::timeout( - Duration::from_secs(10), - shotover.shutdown_and_then_consume_events(&[]), - ) - .await - .expect("Shotover did not shutdown within 10s"); -} - -#[tokio::test] -async fn passthrough_python() { - let _docker_compose = - docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml"); - let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology.yaml") - .start() - .await; - test_helpers::connection::kafka::python::run_python_smoke_test("127.0.0.1:9192").await; tokio::time::timeout( @@ -206,6 +190,27 @@ async fn passthrough_sasl_plain(#[case] driver: KafkaDriver) { shotover.shutdown_and_then_consume_events(&[]).await; } +#[cfg(feature = "alpha-transforms")] +#[rstest] +#[tokio::test] +async fn passthrough_sasl_plain_python() { + let _docker_compose = + docker_compose("tests/test-configs/kafka/passthrough-sasl-plain/docker-compose.yaml"); + let shotover = + shotover_process("tests/test-configs/kafka/passthrough-sasl-plain/topology.yaml") + .start() + .await; + + test_helpers::connection::kafka::python::run_python_smoke_test_sasl_plain( + "127.0.0.1:9192", + "user", + "password", + ) + .await; + + shotover.shutdown_and_then_consume_events(&[]).await; +} + #[rstest] #[case::java(KafkaDriver::Java)] #[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear @@ -745,7 +750,7 @@ async fn assert_connection_fails_with_incorrect_password(driver: KafkaDriver, us #[rstest] #[tokio::test] -async fn cluster_sasl_scram_over_mtls_nodejs() { +async fn 
cluster_sasl_scram_over_mtls_nodejs_and_python() { test_helpers::cert::generate_kafka_test_certs(); let _docker_compose = @@ -757,6 +762,7 @@ async fn cluster_sasl_scram_over_mtls_nodejs() { .await; run_node_smoke_test_scram("127.0.0.1:9192", "super_user", "super_password").await; + run_python_smoke_test_sasl_scram("127.0.0.1:9192", "super_user", "super_password").await; tokio::time::timeout( Duration::from_secs(10), diff --git a/shotover/benches/benches/codec/kafka.rs b/shotover/benches/benches/codec/kafka.rs index c1ab3bea3..323577786 100644 --- a/shotover/benches/benches/codec/kafka.rs +++ b/shotover/benches/benches/codec/kafka.rs @@ -1,7 +1,7 @@ use bytes::{Bytes, BytesMut}; use criterion::{criterion_group, BatchSize, Criterion}; use shotover::codec::kafka::KafkaCodecBuilder; -use shotover::codec::{CodecBuilder, CodecState, Direction}; +use shotover::codec::{CodecBuilder, CodecState, Direction, KafkaCodecState}; use shotover::message::Message; use tokio_util::codec::{Decoder, Encoder}; @@ -77,9 +77,10 @@ fn criterion_benchmark(c: &mut Criterion) { { let mut message = Message::from_bytes( Bytes::from(message.to_vec()), - CodecState::Kafka { + CodecState::Kafka(KafkaCodecState { request_header: None, - }, + raw_sasl: None, + }), ); // force the message to be parsed and clear raw message message.frame(); @@ -113,9 +114,10 @@ fn criterion_benchmark(c: &mut Criterion) { for (message, _) in KAFKA_REQUESTS { let mut message = Message::from_bytes( Bytes::from(message.to_vec()), - CodecState::Kafka { + CodecState::Kafka(KafkaCodecState { request_header: None, - }, + raw_sasl: None, + }), ); // force the message to be parsed and clear raw message message.frame(); diff --git a/shotover/src/codec/kafka.rs b/shotover/src/codec/kafka.rs index 3ce03910b..e61a9bb45 100644 --- a/shotover/src/codec/kafka.rs +++ b/shotover/src/codec/kafka.rs @@ -1,10 +1,14 @@ use super::{message_latency, CodecWriteError, Direction}; -use crate::codec::{CodecBuilder, CodecReadError, CodecState}; -use crate::frame::MessageType; +use crate::codec::{CodecBuilder, CodecReadError, CodecState, KafkaCodecState}; +use crate::frame::kafka::KafkaFrame; +use crate::frame::{Frame, MessageType}; use crate::message::{Encodable, Message, MessageId, Messages}; use anyhow::{anyhow, Result}; use bytes::BytesMut; -use kafka_protocol::messages::ApiKey; +use kafka_protocol::messages::{ + ApiKey, RequestHeader as RequestHeaderProtocol, RequestKind, ResponseHeader, ResponseKind, + SaslAuthenticateRequest, SaslAuthenticateResponse, +}; use metrics::Histogram; use std::sync::mpsc; use std::time::Instant; @@ -12,6 +16,7 @@ use tokio_util::codec::{Decoder, Encoder}; #[derive(Copy, Clone, Debug, PartialEq)] pub struct RequestHeader { + // TODO: this should be i16??? 
     pub api_key: ApiKey,
     pub version: i16,
 }
@@ -57,15 +62,25 @@ impl CodecBuilder for KafkaCodecBuilder {
     }
 }
 
+#[derive(Debug)]
 pub struct RequestInfo {
     header: RequestHeader,
     id: MessageId,
+    expect_raw_sasl: Option<SaslType>,
+}
+
+#[derive(Debug, Clone, PartialEq, Copy)]
+pub enum SaslType {
+    Plain,
+    ScramMessage1,
+    ScramMessage2,
 }
 
 pub struct KafkaDecoder {
     // Some when Sink (because it receives responses)
     request_header_rx: Option<mpsc::Receiver<RequestInfo>>,
     direction: Direction,
+    expect_raw_sasl: Option<SaslType>,
 }
 
 impl KafkaDecoder {
@@ -76,12 +91,13 @@ impl KafkaDecoder {
         KafkaDecoder {
             request_header_rx,
             direction,
+            expect_raw_sasl: None,
         }
     }
 }
 
 fn get_length_of_full_message(src: &BytesMut) -> Option<usize> {
-    if src.len() > 4 {
+    if src.len() >= 4 {
         let size = u32::from_be_bytes(src[0..4].try_into().unwrap()) as usize + 4;
         if size <= src.len() {
             Some(size)
@@ -106,28 +122,133 @@ impl Decoder for KafkaDecoder {
             self.direction,
             pretty_hex::pretty_hex(&bytes)
         );
-        let message = if let Some(rx) = self.request_header_rx.as_ref() {
-            let RequestInfo { header, id } = rx
-                .recv()
-                .map_err(|_| CodecReadError::Parser(anyhow!("kafka encoder half was lost")))?;
-            let mut message = Message::from_bytes_at_instant(
-                bytes.freeze(),
-                CodecState::Kafka {
-                    request_header: Some(header),
+
+        struct Meta {
+            request_header: RequestHeader,
+            message_id: Option<MessageId>,
+        }
+
+        let request_info = self
+            .request_header_rx
+            .as_ref()
+            .map(|rx| {
+                rx.recv()
+                    .map_err(|_| CodecReadError::Parser(anyhow!("kafka encoder half was lost")))
+            })
+            .transpose()?;
+
+        let message = if self.expect_raw_sasl.is_some() {
+            // Convert the unframed raw sasl into a framed sasl
+            // This allows transforms to correctly parse the message and inspect the sasl request
+            let kafka_frame = match self.direction {
+                Direction::Source => KafkaFrame::Request {
+                    header: RequestHeaderProtocol::default()
+                        .with_request_api_key(ApiKey::SaslAuthenticateKey as i16),
+                    body: RequestKind::SaslAuthenticate(
+                        SaslAuthenticateRequest::default().with_auth_bytes(bytes.freeze()),
+                    ),
                 },
-                Some(received_at),
-            );
-            message.set_request_id(id);
-            message
-        } else {
-            Message::from_bytes_at_instant(
-                bytes.freeze(),
-                CodecState::Kafka {
-                    request_header: None,
+                Direction::Sink => KafkaFrame::Response {
+                    version: 0,
+                    header: ResponseHeader::default(),
+                    body: ResponseKind::SaslAuthenticate(
+                        SaslAuthenticateResponse::default().with_auth_bytes(bytes.freeze()),
+                        // TODO: we need to set with_error_code
+                    ),
                 },
+            };
+            let codec_state = CodecState::Kafka(KafkaCodecState {
+                request_header: None,
+                raw_sasl: self.expect_raw_sasl,
+            });
+            self.expect_raw_sasl = match self.expect_raw_sasl {
+                Some(SaslType::Plain) => None,
+                Some(SaslType::ScramMessage1) => Some(SaslType::ScramMessage2),
+                Some(SaslType::ScramMessage2) => None,
+                None => None,
+            };
+            Message::from_frame_and_codec_state_at_instant(
+                Frame::Kafka(kafka_frame),
+                codec_state,
                 Some(received_at),
             )
+        } else {
+            let meta = if let Some(RequestInfo {
+                header,
+                id,
+                expect_raw_sasl,
+            }) = request_info
+            {
+                if let Some(expect_raw_sasl) = expect_raw_sasl {
+                    self.expect_raw_sasl = Some(expect_raw_sasl);
+                }
+                Meta {
+                    request_header: header,
+                    message_id: Some(id),
+                }
+            } else {
+                Meta {
+                    request_header: RequestHeader {
+                        api_key: ApiKey::try_from(i16::from_be_bytes(
+                            bytes[4..6].try_into().unwrap(),
+                        ))
+                        .unwrap(),
+                        version: i16::from_be_bytes(bytes[6..8].try_into().unwrap()),
+                    },
+                    message_id: None,
+                }
+            };
+            let mut message = if let Some(id) = meta.message_id.as_ref() {
+                let mut message = Message::from_bytes_at_instant(
+                    bytes.freeze(),
+                    CodecState::Kafka(KafkaCodecState {
+                        request_header: Some(meta.request_header),
+                        raw_sasl: None,
+                    }),
+                    Some(received_at),
+                );
+                message.set_request_id(*id);
+                message
+            } else {
+                Message::from_bytes_at_instant(
+                    bytes.freeze(),
+                    CodecState::Kafka(KafkaCodecState {
+                        request_header: None,
+                        raw_sasl: None,
+                    }),
+                    Some(received_at),
+                )
+            };
+
+            if meta.request_header.api_key == ApiKey::SaslHandshakeKey
+                && meta.request_header.version == 0
+            {
+                // Only parse the full frame once we manually check it's a v0 sasl handshake
+                if let Some(Frame::Kafka(KafkaFrame::Request {
+                    body: RequestKind::SaslHandshake(sasl_handshake),
+                    ..
+                })) = message.frame()
+                {
+                    self.expect_raw_sasl = Some(match sasl_handshake.mechanism.as_str() {
+                        "PLAIN" => SaslType::Plain,
+                        "SCRAM-SHA-512" => SaslType::ScramMessage1,
+                        "SCRAM-SHA-256" => SaslType::ScramMessage1,
+                        mechanism => {
+                            return Err(CodecReadError::Parser(anyhow!(
+                                "Unknown sasl mechanism {mechanism}"
+                            )))
+                        }
+                    });
+
+                    // Clear raw bytes of the message to force the encoder to encode from frame.
+                    // This is needed because the encoder only has access to the frame if it does not have any raw bytes,
+                    // and the encoder needs to inspect the frame to set its own sasl state.
+                    message.invalidate_cache();
+                }
+            }
+            message
         };
+
         Ok(Some(vec![message]))
     } else {
         Ok(None)
@@ -167,28 +288,86 @@ impl Encoder<Messages> for KafkaEncoder {
         let response_is_dummy = m.response_is_dummy();
         let id = m.id();
         let received_at = m.received_from_source_or_sink_at;
+        let codec_state = m.codec_state.as_kafka();
+        let mut expect_raw_sasl = None;
 
         let result = match m.into_encodable() {
             Encodable::Bytes(bytes) => {
                 dst.extend_from_slice(&bytes);
                 Ok(())
             }
-            Encodable::Frame(frame) => frame.into_kafka().unwrap().encode(dst),
+            Encodable::Frame(frame) => {
+                if codec_state.raw_sasl.is_some() {
+                    match frame {
+                        Frame::Kafka(KafkaFrame::Request {
+                            body: RequestKind::SaslAuthenticate(body),
+                            ..
+                        }) => {
+                            dst.extend_from_slice(&body.auth_bytes);
+                        }
+                        Frame::Kafka(KafkaFrame::Response {
+                            body: ResponseKind::SaslAuthenticate(body),
+                            ..
+                        }) => {
+                            dst.extend_from_slice(&body.auth_bytes);
+                        }
+                        _ => unreachable!("not expected {frame:?}"),
+                    }
+                    Ok(())
+                } else {
+                    let frame = frame.into_kafka().unwrap();
+                    // it is guaranteed that all v0 SaslHandshakes will be in a parsed state since we parse them in the KafkaDecoder.
+                    if let KafkaFrame::Request {
+                        body: RequestKind::SaslHandshake(sasl_handshake),
+                        header,
+                    } = &frame
+                    {
+                        if header.request_api_version == 0 {
+                            expect_raw_sasl = Some(match sasl_handshake.mechanism.as_str() {
+                                "PLAIN" => SaslType::Plain,
+                                "SCRAM-SHA-512" => SaslType::ScramMessage1,
+                                "SCRAM-SHA-256" => SaslType::ScramMessage1,
+                                mechanism => {
+                                    return Err(CodecWriteError::Encoder(anyhow!(
+                                        "Unknown sasl mechanism {mechanism}"
+                                    )))
+                                }
+                            });
+                        }
+                    }
+                    frame.encode(dst)
+                }
+            }
         };
 
         // Skip if the message wrote nothing to dst, possibly due to being a dummy message,
         // or if it will generate a dummy response
         if !dst[start..].is_empty() && !response_is_dummy {
             if let Some(tx) = self.request_header_tx.as_ref() {
-                let api_key = i16::from_be_bytes(dst[start + 4..start + 6].try_into().unwrap());
-                let version = i16::from_be_bytes(dst[start + 6..start + 8].try_into().unwrap());
-                let api_key = ApiKey::try_from(api_key).map_err(|_| {
-                    CodecWriteError::Encoder(anyhow!("unknown api key {api_key}"))
-                })?;
-                tx.send(RequestInfo {
-                    header: RequestHeader { api_key, version },
+                let header = if codec_state.raw_sasl.is_some() {
+                    RequestHeader {
+                        api_key: ApiKey::SaslAuthenticateKey,
+                        version: 0,
+                    }
+                } else {
+                    let api_key =
+                        i16::from_be_bytes(dst[start + 4..start + 6].try_into().unwrap());
+                    let version =
+                        i16::from_be_bytes(dst[start + 6..start + 8].try_into().unwrap());
+                    // TODO: handle unknown API key
+                    let api_key = ApiKey::try_from(api_key).map_err(|_| {
+                        CodecWriteError::Encoder(anyhow!("unknown api key {api_key}"))
+                    })?;
+
+                    RequestHeader { api_key, version }
+                };
+
+                let request_info = RequestInfo {
+                    header,
                     id,
-                })
-                .map_err(|e| CodecWriteError::Encoder(anyhow!(e)))?;
+                    expect_raw_sasl,
+                };
+                tx.send(request_info)
+                    .map_err(|e| CodecWriteError::Encoder(anyhow!(e)))?;
             }
         }
diff --git a/shotover/src/codec/mod.rs b/shotover/src/codec/mod.rs
index f750ab526..6a373cfe5 100644
--- a/shotover/src/codec/mod.rs
+++ b/shotover/src/codec/mod.rs
@@ -6,6 +6,8 @@
 use cassandra_protocol::compression::Compression;
 use core::fmt;
 #[cfg(feature = "kafka")]
 use kafka::RequestHeader;
+#[cfg(feature = "kafka")]
+use kafka::SaslType;
 use metrics::{histogram, Histogram};
 use tokio_util::codec::{Decoder, Encoder};
@@ -53,9 +55,7 @@ pub enum CodecState {
     #[cfg(feature = "redis")]
     Redis,
     #[cfg(feature = "kafka")]
-    Kafka {
-        request_header: Option<RequestHeader>,
-    },
+    Kafka(KafkaCodecState),
     Dummy,
     #[cfg(feature = "opensearch")]
     OpenSearch,
@@ -73,9 +73,9 @@ impl CodecState {
     }
 
     #[cfg(feature = "kafka")]
-    pub fn as_kafka(&self) -> Option<RequestHeader> {
+    pub fn as_kafka(&self) -> KafkaCodecState {
         match self {
-            CodecState::Kafka { request_header } => *request_header,
+            CodecState::Kafka(state) => *state,
             _ => {
                 panic!("This is a {self:?}, expected CodecState::Kafka")
             }
@@ -83,6 +83,13 @@
     }
 }
 
+#[cfg(feature = "kafka")]
+#[derive(Debug, Clone, PartialEq, Copy)]
+pub struct KafkaCodecState {
+    pub request_header: Option<RequestHeader>,
+    pub raw_sasl: Option<SaslType>,
+}
+
 #[derive(Debug)]
 pub enum CodecReadError {
     /// The codec failed to parse a received message
diff --git a/shotover/src/frame/kafka.rs b/shotover/src/frame/kafka.rs
index 68d373648..c08773b1a 100644
--- a/shotover/src/frame/kafka.rs
+++ b/shotover/src/frame/kafka.rs
@@ -1,4 +1,5 @@
 use crate::codec::kafka::RequestHeader as CodecRequestHeader;
+use crate::codec::KafkaCodecState;
 use anyhow::{anyhow, Context, Result};
 use bytes::{BufMut, Bytes, BytesMut};
 use kafka_protocol::messages::{ApiKey, RequestHeader, ResponseHeader};
@@ -68,15 +69,12 @@ impl Display for KafkaFrame {
 }
 
 impl KafkaFrame {
-    pub fn from_bytes(
-        mut bytes: Bytes,
-        request_header: Option<CodecRequestHeader>,
-    ) -> Result<Self> {
+    pub fn from_bytes(mut bytes: Bytes, codec_state: KafkaCodecState) -> Result<Self> {
         // remove length header
         let _ = bytes.split_to(4);
 
-        match request_header {
-            Some(request_header) => KafkaFrame::parse_response(bytes, request_header),
+        match &codec_state.request_header {
+            Some(request_header) => KafkaFrame::parse_response(bytes, *request_header),
             None => KafkaFrame::parse_request(bytes),
         }
     }
diff --git a/shotover/src/frame/mod.rs b/shotover/src/frame/mod.rs
index c138c100e..51964faea 100644
--- a/shotover/src/frame/mod.rs
+++ b/shotover/src/frame/mod.rs
@@ -1,6 +1,8 @@
 //! parsed AST-like representations of messages
 use crate::codec::CodecState;
+#[cfg(feature = "kafka")]
+use crate::codec::KafkaCodecState;
 use anyhow::{anyhow, Result};
 use bytes::Bytes;
 #[cfg(feature = "cassandra")]
@@ -94,9 +96,10 @@ impl Frame {
             #[cfg(feature = "redis")]
             Frame::Redis(_) => CodecState::Redis,
             #[cfg(feature = "kafka")]
-            Frame::Kafka(_) => CodecState::Kafka {
+            Frame::Kafka(_) => CodecState::Kafka(KafkaCodecState {
                 request_header: None,
-            },
+                raw_sasl: None,
+            }),
             Frame::Dummy => CodecState::Dummy,
             #[cfg(feature = "opensearch")]
             Frame::OpenSearch(_) => CodecState::OpenSearch,
diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs
index 160f7888f..64fe897cd 100644
--- a/shotover/src/message/mod.rs
+++ b/shotover/src/message/mod.rs
@@ -158,6 +158,23 @@ impl Message {
         }
     }
 
+    /// This method should be called when you have just a Frame of a message.
+    /// This is expected to be used by transforms that are generating custom messages.
+    /// Providing just the Frame results in better performance when only the Frame is available.
+    pub fn from_frame_and_codec_state_at_instant(
+        frame: Frame,
+        codec_state: CodecState,
+        received_from_source_or_sink_at: Option<Instant>,
+    ) -> Self {
+        Message {
+            codec_state,
+            inner: Some(MessageInner::Modified { frame }),
+            received_from_source_or_sink_at,
+            id: rand::random(),
+            request_id: None,
+        }
+    }
+
     /// This method should be called when generating a new request travelling down a seperate chain to an original request.
     /// The generated request will share the same MessageId as the message it is diverged from.
     pub fn from_frame_diverged(frame: Frame, diverged_from: &Message) -> Self {
diff --git a/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs b/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs
index 9c06dc9f0..5811da4c2 100644
--- a/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs
@@ -117,7 +117,7 @@ impl ConnectionFactory {
                 scram_over_mtls.original_scram_state,
                 OriginalScramState::AuthSuccess
             ) {
-                // The original connection is authorized, so we are free to make authorize more session
+                // The original connection is authorized, so we are free to authorize more sessions
                 self.perform_tokenauth_scram_exchange(scram_over_mtls, connection)
                     .await
                     .context("Failed to perform delegation token SCRAM exchange")
@@ -144,10 +144,19 @@ impl ConnectionFactory {
         scram_over_mtls: &AuthorizeScramOverMtls,
         connection: &mut SinkConnection,
     ) -> Result<()> {
-        let mut auth_requests = self.auth_requests.clone();
-
         // send/receive SaslHandshake
-        connection.send(vec![auth_requests.remove(0)])?;
+        let mut sasl_handshake_request = self.auth_requests.first().unwrap().clone();
+        if let Some(Frame::Kafka(KafkaFrame::Request { header, .. })) =
+            sasl_handshake_request.frame()
+        {
+            // If the request is version 0 it requires SaslAuthenticate messages to be sent as raw bytes which is impossible.
+            // So instead force it to version 1.
+            if header.request_api_version == 0 {
+                header.request_api_version = 1;
+                sasl_handshake_request.invalidate_cache();
+            }
+        }
+        connection.send(vec![sasl_handshake_request])?;
         let mut handshake_response = connection.recv().await?.pop().unwrap();
         if let Some(Frame::Kafka(KafkaFrame::Response {
             body: ResponseBody::SaslHandshake(handshake_response),
@@ -176,19 +185,35 @@ impl ConnectionFactory {
         )
         .map_err(|x| anyhow!("{x:?}"))?
.with_first_extensions("tokenauth=true".to_owned()); - connection.send(vec![Self::create_auth_request(scram.initial())])?; + connection + .send(vec![Self::create_auth_request(scram.initial())]) + .context("Failed to send first SCRAM request")?; // SCRAM server-first - let first_scram_response = connection.recv().await?.pop().unwrap(); + let first_scram_response = connection + .recv() + .await + .context("Failed to receive first scram response")? + .pop() + .unwrap(); let first_scram_response = Self::process_auth_response(first_scram_response) .context("first response to delegation token SCRAM reported an error")?; // SCRAM client-final - let final_scram_request = scram.response(&first_scram_response)?; - connection.send(vec![Self::create_auth_request(final_scram_request)])?; + let final_scram_request = scram + .response(&first_scram_response) + .context("Failed to generate final scram request")?; + connection + .send(vec![Self::create_auth_request(final_scram_request)]) + .context("Failed to send final SCRAM request")?; // SCRAM server-final - let final_scram_response = connection.recv().await?.pop().unwrap(); + let final_scram_response = connection + .recv() + .await + .context("Failed to receive second scram response")? + .pop() + .unwrap(); let final_scram_response = Self::process_auth_response(final_scram_response) .context("final response to delegation token SCRAM reported an error")?; scram diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 980bf379d..d03a568ad 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -2972,10 +2972,9 @@ impl KafkaSinkCluster { metadata.controller_id = shotover_node.broker_id; } else { - return Err(anyhow!( - "Invalid metadata, controller points at unknown broker {:?}", - metadata.controller_id - )); + // controller is either -1 or an unknown broker + // In both cases it is reasonable to set to -1 to indicate the controller is unknown. 
+ metadata.controller_id = BrokerId(-1); } Ok(()) diff --git a/test-helpers/src/connection/kafka/python.rs b/test-helpers/src/connection/kafka/python.rs index 4013c782f..90c7e62b1 100644 --- a/test-helpers/src/connection/kafka/python.rs +++ b/test-helpers/src/connection/kafka/python.rs @@ -28,6 +28,58 @@ pub async fn run_python_smoke_test(address: &str) { .unwrap(); } +pub async fn run_python_smoke_test_sasl_plain(address: &str, user: &str, password: &str) { + ensure_uv_is_installed().await; + + let project_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("src/connection/kafka/python"); + let uv_binary = uv_binary_path(); + let config = format!( + r#"{{ + 'bootstrap_servers': ["{address}"], + 'security_protocol': "SASL_PLAINTEXT", + 'sasl_mechanism': "PLAIN", + 'sasl_plain_username': "{user}", + 'sasl_plain_password': "{password}", +}}"# + ); + tokio::time::timeout( + Duration::from_secs(60), + run_command_async( + &project_dir, + uv_binary.to_str().unwrap(), + &["run", "main.py", &config], + ), + ) + .await + .unwrap(); +} + +pub async fn run_python_smoke_test_sasl_scram(address: &str, user: &str, password: &str) { + ensure_uv_is_installed().await; + + let project_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("src/connection/kafka/python"); + let uv_binary = uv_binary_path(); + let config = format!( + r#"{{ + 'bootstrap_servers': ["{address}"], + 'security_protocol': "SASL_PLAINTEXT", + 'sasl_mechanism': "SCRAM-SHA-256", + 'sasl_plain_username': "{user}", + 'sasl_plain_password': "{password}", +}}"# + ); + tokio::time::timeout( + Duration::from_secs(60), + run_command_async( + &project_dir, + uv_binary.to_str().unwrap(), + &["run", "main.py", &config], + ), + ) + .await + .unwrap(); +} + /// Install a specific version of UV to: /// * avoid developers having to manually install an external tool /// * avoid issues due to a different version being installed