Skip to content

Commit

Permalink
Fix sasl with python-kafka (#1783)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Oct 25, 2024
1 parent 9930709 commit 421ec9e
Show file tree
Hide file tree
Showing 13 changed files with 491 additions and 112 deletions.
101 changes: 70 additions & 31 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use test_cases::produce_consume_partitions1;
use test_cases::produce_consume_partitions3;
use test_cases::{assert_topic_creation_is_denied_due_to_acl, setup_basic_user_acls};
use test_helpers::connection::kafka::node::run_node_smoke_test_scram;
use test_helpers::connection::kafka::python::run_python_bad_auth_sasl_scram;
use test_helpers::connection::kafka::python::run_python_smoke_test_sasl_scram;
use test_helpers::connection::kafka::{KafkaConnectionBuilder, KafkaDriver};
use test_helpers::docker_compose::docker_compose;
use test_helpers::shotover_process::{Count, EventMatcher};
Expand Down Expand Up @@ -37,31 +39,14 @@ async fn passthrough_standard(#[case] driver: KafkaDriver) {
}

#[tokio::test]
async fn passthrough_nodejs() {
async fn passthrough_nodejs_and_python() {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology.yaml")
.start()
.await;

test_helpers::connection::kafka::node::run_node_smoke_test("127.0.0.1:9192").await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

#[tokio::test]
async fn passthrough_python() {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology.yaml")
.start()
.await;

test_helpers::connection::kafka::python::run_python_smoke_test("127.0.0.1:9192").await;

tokio::time::timeout(
Expand Down Expand Up @@ -206,6 +191,27 @@ async fn passthrough_sasl_plain(#[case] driver: KafkaDriver) {
shotover.shutdown_and_then_consume_events(&[]).await;
}

// Integration test: a python-kafka client authenticating with SASL/PLAIN
// through shotover running as a passthrough proxy.
// Gated on "alpha-transforms" since the topology under test uses an alpha transform.
#[cfg(feature = "alpha-transforms")]
#[rstest]
#[tokio::test]
async fn passthrough_sasl_plain_python() {
// Bring up the kafka broker via docker-compose first. The guard binding must
// stay alive for the whole test so the containers are not torn down early.
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-sasl-plain/docker-compose.yaml");
// Start shotover with the matching passthrough-sasl-plain topology.
let shotover =
shotover_process("tests/test-configs/kafka/passthrough-sasl-plain/topology.yaml")
.start()
.await;

// Drive the python client smoke test against shotover's listen address,
// authenticating over SASL/PLAIN.
// NOTE(review): credentials are presumably the ones configured in the
// docker-compose file above — confirm there if this test fails auth.
test_helpers::connection::kafka::python::run_python_smoke_test_sasl_plain(
"127.0.0.1:9192",
"user",
"password",
)
.await;

// Shut shotover down and assert that no unexpected log events were emitted.
shotover.shutdown_and_then_consume_events(&[]).await;
}

#[rstest]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
Expand Down Expand Up @@ -745,25 +751,58 @@ async fn assert_connection_fails_with_incorrect_password(driver: KafkaDriver, us

#[rstest]
#[tokio::test]
async fn cluster_sasl_scram_over_mtls_nodejs() {
async fn cluster_sasl_scram_over_mtls_nodejs_and_python() {
test_helpers::cert::generate_kafka_test_certs();

let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml");
let shotover = shotover_process(
"tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
)
.start()
.await;

run_node_smoke_test_scram("127.0.0.1:9192", "super_user", "super_password").await;
{
let shotover = shotover_process(
"tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
)
.start()
.await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
run_node_smoke_test_scram("127.0.0.1:9192", "super_user", "super_password").await;
run_python_smoke_test_sasl_scram("127.0.0.1:9192", "super_user", "super_password").await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

{
let shotover = shotover_process(
"tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
)
.start()
.await;

run_python_bad_auth_sasl_scram("127.0.0.1:9192", "incorrect_user", "super_password").await;
run_python_bad_auth_sasl_scram("127.0.0.1:9192", "super_user", "incorrect_password").await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[EventMatcher::new()
.with_level(Level::Error)
.with_target("shotover::server")
.with_message(r#"encountered an error when flushing the chain kafka for shutdown
Caused by:
0: KafkaSinkCluster transform failed
1: Failed to receive responses (without sending requests)
2: Outgoing connection had pending requests, those requests/responses are lost so connection recovery cannot be attempted.
3: Failed to receive from ControlConnection
4: The other side of this connection closed the connection"#)
.with_count(Count::Times(2))]),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

#[rstest]
Expand Down
11 changes: 7 additions & 4 deletions shotover/benches/benches/codec/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bytes::{Bytes, BytesMut};
use criterion::{criterion_group, BatchSize, Criterion};
use shotover::codec::kafka::KafkaCodecBuilder;
use shotover::codec::kafka::KafkaCodecState;
use shotover::codec::{CodecBuilder, CodecState, Direction};
use shotover::message::Message;
use tokio_util::codec::{Decoder, Encoder};
Expand Down Expand Up @@ -77,9 +78,10 @@ fn criterion_benchmark(c: &mut Criterion) {
{
let mut message = Message::from_bytes(
Bytes::from(message.to_vec()),
CodecState::Kafka {
CodecState::Kafka(KafkaCodecState {
request_header: None,
},
raw_sasl: false,
}),
);
// force the message to be parsed and clear raw message
message.frame();
Expand Down Expand Up @@ -113,9 +115,10 @@ fn criterion_benchmark(c: &mut Criterion) {
for (message, _) in KAFKA_REQUESTS {
let mut message = Message::from_bytes(
Bytes::from(message.to_vec()),
CodecState::Kafka {
CodecState::Kafka(KafkaCodecState {
request_header: None,
},
raw_sasl: false,
}),
);
// force the message to be parsed and clear raw message
message.frame();
Expand Down
Loading

0 comments on commit 421ec9e

Please sign in to comment.