Fix sasl with python-kafka
rukai committed Oct 24, 2024
1 parent 9930709 commit a9f55e5
Showing 10 changed files with 369 additions and 81 deletions.
44 changes: 25 additions & 19 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -9,6 +9,7 @@ use test_cases::produce_consume_partitions1;
use test_cases::produce_consume_partitions3;
use test_cases::{assert_topic_creation_is_denied_due_to_acl, setup_basic_user_acls};
use test_helpers::connection::kafka::node::run_node_smoke_test_scram;
use test_helpers::connection::kafka::python::run_python_smoke_test_sasl_scram;
use test_helpers::connection::kafka::{KafkaConnectionBuilder, KafkaDriver};
use test_helpers::docker_compose::docker_compose;
use test_helpers::shotover_process::{Count, EventMatcher};
@@ -37,31 +38,14 @@ async fn passthrough_standard(#[case] driver: KafkaDriver) {
}

#[tokio::test]
async fn passthrough_nodejs() {
async fn passthrough_nodejs_and_python() {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology.yaml")
.start()
.await;

test_helpers::connection::kafka::node::run_node_smoke_test("127.0.0.1:9192").await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

#[tokio::test]
async fn passthrough_python() {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology.yaml")
.start()
.await;

test_helpers::connection::kafka::python::run_python_smoke_test("127.0.0.1:9192").await;

tokio::time::timeout(
@@ -206,6 +190,27 @@ async fn passthrough_sasl_plain(#[case] driver: KafkaDriver) {
shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "alpha-transforms")]
#[rstest]
#[tokio::test]
async fn passthrough_sasl_plain_python() {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-sasl-plain/docker-compose.yaml");
let shotover =
shotover_process("tests/test-configs/kafka/passthrough-sasl-plain/topology.yaml")
.start()
.await;

test_helpers::connection::kafka::python::run_python_smoke_test_sasl_plain(
"127.0.0.1:9192",
"user",
"password",
)
.await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[rstest]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
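
The new `passthrough_sasl_plain_python` test above drives the python-kafka client through shotover with SASL/PLAIN credentials. The Python helper itself is not part of this diff; as a rough illustration only (the topic name, payload, and use of the kafka-python `KafkaProducer`/`KafkaConsumer` API are assumptions, not the actual `test_helpers` code), a SASL/PLAIN smoke test against the proxy might look like:

```python
# Hypothetical sketch of a python-kafka SASL/PLAIN smoke test; not the actual
# test_helpers implementation. Topic name and payload are illustrative.
from kafka import KafkaConsumer, KafkaProducer


def run_sasl_plain_smoke_test(address, user, password):
    common = dict(
        bootstrap_servers=address,
        security_protocol="SASL_PLAINTEXT",
        sasl_mechanism="PLAIN",
        sasl_plain_username=user,
        sasl_plain_password=password,
    )

    # Produce one record through shotover and wait for the broker ack.
    producer = KafkaProducer(**common)
    producer.send("python_smoke_test", b"hello from python").get(timeout=10)
    producer.close()

    # Consume it back through shotover to prove the SASL session works end to end.
    consumer = KafkaConsumer(
        "python_smoke_test",
        auto_offset_reset="earliest",
        consumer_timeout_ms=10_000,
        **common,
    )
    assert next(iter(consumer)).value == b"hello from python"
    consumer.close()


if __name__ == "__main__":
    run_sasl_plain_smoke_test("127.0.0.1:9192", "user", "password")
```
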
Expand Down Expand Up @@ -745,7 +750,7 @@ async fn assert_connection_fails_with_incorrect_password(driver: KafkaDriver, us

#[rstest]
#[tokio::test]
async fn cluster_sasl_scram_over_mtls_nodejs() {
async fn cluster_sasl_scram_over_mtls_nodejs_and_python() {
test_helpers::cert::generate_kafka_test_certs();

let _docker_compose =
@@ -757,6 +762,7 @@ async fn cluster_sasl_scram_over_mtls_nodejs() {
.await;

run_node_smoke_test_scram("127.0.0.1:9192", "super_user", "super_password").await;
run_python_smoke_test_sasl_scram("127.0.0.1:9192", "super_user", "super_password").await;

tokio::time::timeout(
Duration::from_secs(10),
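
As with the plaintext case, `run_python_smoke_test_sasl_scram` exercises the python-kafka client, this time authenticating to shotover with SASL/SCRAM. A minimal sketch of the kind of connection setup involved (the SCRAM-SHA-256 mechanism, plaintext client-to-shotover transport, topic name, and payload are assumptions; the real helper lives in test_helpers and is not shown in this diff):

```python
# Hypothetical sketch of a python-kafka SASL/SCRAM connection through shotover;
# not the actual test_helpers code. SCRAM-SHA-256 is an assumed mechanism.
from kafka import KafkaProducer


def scram_smoke_test(address, user, password):
    producer = KafkaProducer(
        bootstrap_servers=address,
        security_protocol="SASL_PLAINTEXT",  # assumed: client -> shotover without TLS in this test
        sasl_mechanism="SCRAM-SHA-256",
        sasl_plain_username=user,  # kafka-python reuses these fields for SCRAM
        sasl_plain_password=password,
    )
    # A successful ack shows the SCRAM handshake was proxied correctly.
    producer.send("python_scram_smoke_test", b"hello").get(timeout=10)
    producer.close()


if __name__ == "__main__":
    scram_smoke_test("127.0.0.1:9192", "super_user", "super_password")
```
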
12 changes: 7 additions & 5 deletions shotover/benches/benches/codec/kafka.rs
@@ -1,7 +1,7 @@
use bytes::{Bytes, BytesMut};
use criterion::{criterion_group, BatchSize, Criterion};
use shotover::codec::kafka::KafkaCodecBuilder;
use shotover::codec::{CodecBuilder, CodecState, Direction};
use shotover::codec::{CodecBuilder, CodecState, Direction, KafkaCodecState};
use shotover::message::Message;
use tokio_util::codec::{Decoder, Encoder};

@@ -77,9 +77,10 @@ fn criterion_benchmark(c: &mut Criterion) {
{
let mut message = Message::from_bytes(
Bytes::from(message.to_vec()),
CodecState::Kafka {
CodecState::Kafka(KafkaCodecState {
request_header: None,
},
raw_sasl: None,
}),
);
// force the message to be parsed and clear raw message
message.frame();
@@ -113,9 +114,10 @@ for (message, _) in KAFKA_REQUESTS {
for (message, _) in KAFKA_REQUESTS {
let mut message = Message::from_bytes(
Bytes::from(message.to_vec()),
CodecState::Kafka {
CodecState::Kafka(KafkaCodecState {
request_header: None,
},
raw_sasl: None,
}),
);
// force the message to be parsed and clear raw message
message.frame();
(diffs for the remaining 8 changed files not shown)
