diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index e79381847..a7b559f38 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -54,6 +54,24 @@ async fn passthrough_nodejs() { .expect("Shotover did not shutdown within 10s"); } +#[tokio::test] +async fn passthrough_python() { + let _docker_compose = + docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml"); + let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology.yaml") + .start() + .await; + + test_helpers::connection::kafka::python::run_python_smoke_test("127.0.0.1:9192").await; + + tokio::time::timeout( + Duration::from_secs(10), + shotover.shutdown_and_then_consume_events(&[]), + ) + .await + .expect("Shotover did not shutdown within 10s"); +} + #[rstest] #[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] #[case::java(KafkaDriver::Java)] diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 0c3886c35..8d6b3ae70 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -8,6 +8,7 @@ use std::{ pub mod cpp; pub mod java; pub mod node; +pub mod python; use anyhow::Result; #[cfg(feature = "kafka-cpp-driver-tests")] diff --git a/test-helpers/src/connection/kafka/node.rs b/test-helpers/src/connection/kafka/node.rs index 2ee105693..771a95ea2 100644 --- a/test-helpers/src/connection/kafka/node.rs +++ b/test-helpers/src/connection/kafka/node.rs @@ -1,5 +1,7 @@ use std::path::Path; +use crate::run_command_async; + pub async fn run_node_smoke_test(address: &str) { let dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("src/connection/kafka/node"); let config = format!( @@ -8,8 +10,8 @@ pub async fn run_node_smoke_test(address: &str) { brokers: ["{address}"], }})"# ); - run_command(&dir, "npm", &["install"]).await; - run_command(&dir, "npm", &["start", &config]).await; + run_command_async(&dir, "npm", &["install"]).await; + run_command_async(&dir, "npm", &["start", &config]).await; } pub async fn run_node_smoke_test_scram(address: &str, user: &str, password: &str) { @@ -25,22 +27,6 @@ pub async fn run_node_smoke_test_scram(address: &str, user: &str, password: &str }} }})"# ); - run_command(&dir, "npm", &["install"]).await; - run_command(&dir, "npm", &["start", &config]).await; -} - -async fn run_command(current_dir: &Path, command: &str, args: &[&str]) -> String { - let output = tokio::process::Command::new(command) - .args(args) - .current_dir(current_dir) - .output() - .await - .unwrap(); - - let stdout = String::from_utf8(output.stdout).unwrap(); - let stderr = String::from_utf8(output.stderr).unwrap(); - if !output.status.success() { - panic!("command {command} {args:?} failed:\nstdout:\n{stdout}\nstderr:\n{stderr}") - } - stdout + run_command_async(&dir, "npm", &["install"]).await; + run_command_async(&dir, "npm", &["start", &config]).await; } diff --git a/test-helpers/src/connection/kafka/python.rs b/test-helpers/src/connection/kafka/python.rs new file mode 100644 index 000000000..4a44a9033 --- /dev/null +++ b/test-helpers/src/connection/kafka/python.rs @@ -0,0 +1,64 @@ +use tokio::process::Command; + +use crate::run_command_async; +use std::{ + path::{Path, PathBuf}, + time::Duration, +}; + +pub async fn run_python_smoke_test(address: &str) { + ensure_uv_is_installed().await; + + let project_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("src/connection/kafka/python"); + let uv_binary = uv_binary_path(); + let config = format!( + r#"{{ + 'bootstrap_servers': ["{address}"], +}}"# + ); + tokio::time::timeout( + Duration::from_secs(60), + run_command_async( + &project_dir, + uv_binary.to_str().unwrap(), + &["run", "main.py", &config], + ), + ) + .await + .unwrap(); +} + +/// Install a specific version of UV to: +/// * avoid developers having to manually install an external tool +/// * avoid issues due to a different version being installed +pub async fn ensure_uv_is_installed() { + let uv_binary = uv_binary_path(); + + if let Ok(output) = Command::new(uv_binary).arg("--help").output().await { + if output.status.success() { + // already correctly installed + return; + } + } + + // Install to this custom path to avoid overwriting any UV already installed by the user. + // Specifically uses `..` instead of absolute path to avoid spaces messing up the bash script + let path = "../target/uv"; + + run_command_async( + Path::new("."), + "bash", + &[ + "-c", + &format!("curl -LsSf https://astral.sh/uv/0.4.6/install.sh | env INSTALLER_NO_MODIFY_PATH=1 UV_INSTALL_DIR={path} sh"), + ], + ) + .await +} + +fn uv_binary_path() -> PathBuf { + Path::new(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap() + .join("target/uv/bin/uv") +} diff --git a/test-helpers/src/connection/kafka/python/.python-version b/test-helpers/src/connection/kafka/python/.python-version new file mode 100644 index 000000000..e4fba2183 --- /dev/null +++ b/test-helpers/src/connection/kafka/python/.python-version @@ -0,0 +1 @@ +3.12 diff --git a/test-helpers/src/connection/kafka/python/main.py b/test-helpers/src/connection/kafka/python/main.py new file mode 100644 index 000000000..927383fe6 --- /dev/null +++ b/test-helpers/src/connection/kafka/python/main.py @@ -0,0 +1,30 @@ +from kafka import KafkaConsumer +from kafka import KafkaProducer +import sys + +def main(): + config = eval(sys.argv[1]) + print("Running kafka-python script with config:") + print(config) + + producer = KafkaProducer(**config) + producer.send('test_topic', b'some_message_bytes').get(timeout=10) + producer.send('test_topic', b'another_message').get(timeout=10) + + consumer = KafkaConsumer('test_topic', auto_offset_reset='earliest', **config) + + msg = next(consumer) + assert(msg.topic == "test_topic") + assert(msg.value == b"some_message_bytes") + assert(msg.offset == 0) + + msg = next(consumer) + assert(msg.topic == "test_topic") + assert(msg.value == b"another_message") + assert(msg.offset == 1) + + print("kafka-python script passed all test cases") + + +if __name__ == "__main__": + main() diff --git a/test-helpers/src/connection/kafka/python/pyproject.toml b/test-helpers/src/connection/kafka/python/pyproject.toml new file mode 100644 index 000000000..d0190b401 --- /dev/null +++ b/test-helpers/src/connection/kafka/python/pyproject.toml @@ -0,0 +1,9 @@ +[project] +name = "python" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "kafka-python-ng>=2.2.3", +] diff --git a/test-helpers/src/connection/kafka/python/uv.lock b/test-helpers/src/connection/kafka/python/uv.lock new file mode 100644 index 000000000..3ff2f933f --- /dev/null +++ b/test-helpers/src/connection/kafka/python/uv.lock @@ -0,0 +1,22 @@ +version = 1 +requires-python = ">=3.12" + +[[package]] +name = "kafka-python-ng" +version = "2.2.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ce/04/1d65bdf3f0103a08710e226b851de4b357ac702f1cadabf6128bab7518a7/kafka_python_ng-2.2.3.tar.gz", hash = "sha256:f79f28e10ade9b5a9860b2ec15b7cc8dc510d5702f5a399430478cff5f93a05a", size = 330644 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0f/61/22e778f642465a157c449782300d8817ebbc106794a8a7ebe88cbb846b05/kafka_python_ng-2.2.3-py2.py3-none-any.whl", hash = "sha256:adc6e82147c441ca4ae1f22e291fc08efab0d10971cbd4aa1481d2ffa38e9480", size = 232824 }, +] + +[[package]] +name = "python" +version = "0.1.0" +source = { virtual = "." } +dependencies = [ + { name = "kafka-python-ng" }, +] + +[package.metadata] +requires-dist = [{ name = "kafka-python-ng", specifier = ">=2.2.3" }] diff --git a/test-helpers/src/lib.rs b/test-helpers/src/lib.rs index b704c8f48..ba3a52bc4 100644 --- a/test-helpers/src/lib.rs +++ b/test-helpers/src/lib.rs @@ -7,6 +7,7 @@ pub mod shotover_process; mod test_tracing; use anyhow::{anyhow, Result}; +use std::path::Path; use subprocess::{Exec, Redirection}; /// Runs a command and returns the output as a string. @@ -36,3 +37,16 @@ pub fn run_command(command: &str, args: &[&str]) -> Result { )) } } + +pub async fn run_command_async(current_dir: &Path, command: &str, args: &[&str]) { + let output = tokio::process::Command::new(command) + .args(args) + .current_dir(current_dir) + .status() + .await + .unwrap(); + + if !output.success() { + panic!("command {command} {args:?} failed. See above output.") + } +}