Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add python-kafka integration test #1782

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ async fn passthrough_nodejs() {
.expect("Shotover did not shutdown within 10s");
}

#[tokio::test]
async fn passthrough_python() {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology.yaml")
.start()
.await;

test_helpers::connection::kafka::python::run_python_smoke_test("127.0.0.1:9192").await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
Expand Down
1 change: 1 addition & 0 deletions test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
pub mod cpp;
pub mod java;
pub mod node;
pub mod python;

use anyhow::Result;
#[cfg(feature = "kafka-cpp-driver-tests")]
Expand Down
26 changes: 6 additions & 20 deletions test-helpers/src/connection/kafka/node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::path::Path;

use crate::run_command_async;

pub async fn run_node_smoke_test(address: &str) {
let dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("src/connection/kafka/node");
let config = format!(
Expand All @@ -8,8 +10,8 @@ pub async fn run_node_smoke_test(address: &str) {
brokers: ["{address}"],
}})"#
);
run_command(&dir, "npm", &["install"]).await;
run_command(&dir, "npm", &["start", &config]).await;
run_command_async(&dir, "npm", &["install"]).await;
run_command_async(&dir, "npm", &["start", &config]).await;
}

pub async fn run_node_smoke_test_scram(address: &str, user: &str, password: &str) {
Expand All @@ -25,22 +27,6 @@ pub async fn run_node_smoke_test_scram(address: &str, user: &str, password: &str
}}
}})"#
);
run_command(&dir, "npm", &["install"]).await;
run_command(&dir, "npm", &["start", &config]).await;
}

async fn run_command(current_dir: &Path, command: &str, args: &[&str]) -> String {
let output = tokio::process::Command::new(command)
.args(args)
.current_dir(current_dir)
.output()
.await
.unwrap();

let stdout = String::from_utf8(output.stdout).unwrap();
let stderr = String::from_utf8(output.stderr).unwrap();
if !output.status.success() {
panic!("command {command} {args:?} failed:\nstdout:\n{stdout}\nstderr:\n{stderr}")
}
stdout
run_command_async(&dir, "npm", &["install"]).await;
run_command_async(&dir, "npm", &["start", &config]).await;
}
63 changes: 63 additions & 0 deletions test-helpers/src/connection/kafka/python.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use tokio::process::Command;

use crate::run_command_async;
use std::{
path::{Path, PathBuf},
time::Duration,
};

pub async fn run_python_smoke_test(address: &str) {
ensure_uv_is_installed().await;

let project_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("src/connection/kafka/python");
let uv_binary = uv_binary_path();
let config = format!(
r#"{{
'bootstrap_servers': ["{address}"],
}}"#
);
tokio::time::timeout(
Duration::from_secs(60),
run_command_async(
&project_dir,
uv_binary.to_str().unwrap(),
&["run", "main.py", &config],
),
)
.await
.unwrap();
}

/// Install a specific version of UV to:
/// * avoid developers having to manually install an external tool
/// * avoid issues due to a different version being installed
pub async fn ensure_uv_is_installed() {
let uv_binary = uv_binary_path();

if let Ok(output) = Command::new(uv_binary).arg("--help").output().await {
if output.status.success() {
// already correctly installed
return;
}
}

run_command_async(
Path::new("."),
"bash",
&[
"-c",
// Install to a custom path to avoid overwriting any UV already installed by the user.
// Specifically uses `..` instead of absolute path to avoid spaces messing up the bash script.
"curl -LsSf https://astral.sh/uv/0.4.6/install.sh | env INSTALLER_NO_MODIFY_PATH=1 UV_INSTALL_DIR=../target/uv sh",
],
)
.await
}

/// The path the uv binary is installed to
fn uv_binary_path() -> PathBuf {
Path::new(env!("CARGO_MANIFEST_DIR"))
.parent()
.unwrap()
.join("target/uv/bin/uv")
}
1 change: 1 addition & 0 deletions test-helpers/src/connection/kafka/python/.python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.12
30 changes: 30 additions & 0 deletions test-helpers/src/connection/kafka/python/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from kafka import KafkaConsumer
from kafka import KafkaProducer
import sys

def main():
config = eval(sys.argv[1])
print("Running kafka-python script with config:")
print(config)

producer = KafkaProducer(**config)
producer.send('test_topic', b'some_message_bytes').get(timeout=10)
producer.send('test_topic', b'another_message').get(timeout=10)

consumer = KafkaConsumer('test_topic', auto_offset_reset='earliest', **config)

msg = next(consumer)
assert(msg.topic == "test_topic")
assert(msg.value == b"some_message_bytes")
assert(msg.offset == 0)

msg = next(consumer)
assert(msg.topic == "test_topic")
assert(msg.value == b"another_message")
assert(msg.offset == 1)

print("kafka-python script passed all test cases")


if __name__ == "__main__":
main()
7 changes: 7 additions & 0 deletions test-helpers/src/connection/kafka/python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[project]
name = "kafka-smoke-test"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
"kafka-python-ng>=2.2.3",
]
22 changes: 22 additions & 0 deletions test-helpers/src/connection/kafka/python/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions test-helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod shotover_process;
mod test_tracing;

use anyhow::{anyhow, Result};
use std::path::Path;
use subprocess::{Exec, Redirection};

/// Runs a command and returns the output as a string.
Expand Down Expand Up @@ -36,3 +37,16 @@ pub fn run_command(command: &str, args: &[&str]) -> Result<String> {
))
}
}

pub async fn run_command_async(current_dir: &Path, command: &str, args: &[&str]) {
let output = tokio::process::Command::new(command)
.args(args)
.current_dir(current_dir)
.status()
.await
.unwrap();

if !output.success() {
panic!("command {command} {args:?} failed. See above output.")
}
}
Loading