diff --git a/.github/workflows/windsock_benches.yaml b/.github/workflows/windsock_benches.yaml index 664bb5035..14d4ba550 100644 --- a/.github/workflows/windsock_benches.yaml +++ b/.github/workflows/windsock_benches.yaml @@ -26,7 +26,7 @@ jobs: run: | cargo windsock --bench-length-seconds 5 --operations-per-second 100 cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers flamegraph --name cassandra,compression=none,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single - cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers flamegraph --name kafka,shotover=standard,size=1B,topology=single + cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers sys_monitor --name kafka,shotover=standard,size=1B,topology=single # windsock/examples/cassandra.rs - this can stay here until windsock is moved to its own repo cargo run --release --example cassandra -- --bench-length-seconds 5 --operations-per-second 100 diff --git a/Cargo.lock b/Cargo.lock index e4d516af9..37431b3b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,9 +66,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41" +checksum = "86b8f9420f797f2d9e935edf629310eb938a0d839f984e25327f3c7eed22300c" dependencies = [ "memchr", ] @@ -135,9 +135,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" +checksum = "c677ab05e09154296dd37acecd46420c17b9713e8366facafa8fc0885167cf4c" dependencies = [ "anstyle", "windows-sys 0.48.0", @@ -271,7 +271,7 @@ dependencies = [ "http", "hyper", "ring", - "time 0.3.24", + "time 0.3.25", "tokio", "tower", "tracing", @@ -460,7 +460,7 @@ dependencies = [ "percent-encoding", "regex", "sha2 0.10.7", - "time 0.3.24", + "time 0.3.25", "tracing", ] @@ -567,7 +567,7 @@ dependencies = [ "itoa", "num-integer", "ryu", - "time 0.3.24", + "time 0.3.25", ] [[package]] @@ -929,7 +929,7 @@ dependencies = [ "num", "snap", "thiserror", - "time 0.3.24", + "time 0.3.25", "uuid", ] @@ -950,9 +950,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.80" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51f1226cd9da55587234753d1245dd5b132343ea240f26b6a9003d68706141ba" +checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01" dependencies = [ "libc", ] @@ -1067,9 +1067,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.3.19" +version = "4.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd304a20bff958a57f04c4e96a2e7594cc4490a0e809cbd48bb6437edaa452d" +checksum = "c27cdf28c0f604ba3f512b0c9a409f8de8513e4816705deb0498b627e7c3a3fd" dependencies = [ "clap_builder", "clap_derive", @@ -1078,9 +1078,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.3.19" +version = "4.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01c6a3f08f1fe5662a35cfe393aec09c4df95f60ee93b7556505260f75eee9e1" +checksum = "08a9f1ab5e9f01a9b81f202e8562eb9a10de70abf9eaeac1be465c28b75aa4aa" dependencies = [ "anstream", "anstyle", @@ -1162,9 +1162,9 @@ dependencies = [ [[package]] name = "const-oid" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "795bc6e66a8e340f075fcf6227e417a2dc976b92b91f3cdc778bb858778b6747" +checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" [[package]] name = "convert_case" @@ -1220,7 +1220,7 @@ dependencies = [ "cassandra-protocol", "futures-util", "http", - "rustls 0.21.5", + "rustls 0.21.6", "rustls-pemfile", "tokio", "tokio-tungstenite 0.19.0", @@ -1548,9 +1548,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8810e7e2cf385b1e9b50d68264908ec367ba642c96d02edfe61c39e88e2a3c01" +checksum = "7684a49fb1af197853ef7b2ee694bc1f5b4179556f1e5710e1760c5db6f5e929" dependencies = [ "serde", ] @@ -1674,9 +1674,9 @@ dependencies = [ [[package]] name = "docker-compose-runner" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a02a210319584c67a336a5d964b23f8598cb101eb8e5929b2c65b2a4f769451c" +checksum = "f9b25dd136b29813ea84b16d2ff9c48fdd727a6a6f3fcf736523bb5d946c41e7" dependencies = [ "anyhow", "regex", @@ -1696,6 +1696,7 @@ name = "ec2-cargo" version = "0.1.0" dependencies = [ "aws-throwaway", + "cargo_metadata", "clap", "rustyline", "shellfish", @@ -1965,7 +1966,7 @@ dependencies = [ "pretty_env_logger", "rand 0.8.5", "redis-protocol", - "rustls 0.21.5", + "rustls 0.21.6", "rustls-native-certs", "rustls-webpki 0.100.1", "semver", @@ -3088,9 +3089,9 @@ dependencies = [ [[package]] name = "openssl" -version = "0.10.55" +version = "0.10.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d" +checksum = "729b745ad4a5575dd06a3e1af1414bd330ee561c01b3899eb584baeaa8def17e" dependencies = [ "bitflags 1.3.2", "cfg-if", @@ -3120,18 +3121,18 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-src" -version = "111.26.0+1.1.1u" +version = "111.27.0+1.1.1v" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efc62c9f12b22b8f5208c23a7200a442b2e5999f8bdf80233852122b5a4f6f37" +checksum = "06e8f197c82d7511c5b014030c9b1efeda40d7d5f99d23b4ceed3524a5e63f02" dependencies = [ "cc", ] [[package]] name = "openssl-sys" -version = "0.9.90" +version = "0.9.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6" +checksum = "866b5f16f90776b9bb8dc1e1802ac6f0513de3a7a7465867bfbc563dc737faac" dependencies = [ "cc", "libc", @@ -3263,18 +3264,18 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "pin-project" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", @@ -3283,9 +3284,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.10" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" +checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05" [[package]] name = "pin-utils" @@ -3600,7 +3601,7 @@ checksum = "ffbe84efe2f38dea12e9bfc1f65377fdf03e53a18cb3b995faedf7934c7e785b" dependencies = [ "pem", "ring", - "time 0.3.24", + "time 0.3.25", "yasna", ] @@ -3701,13 +3702,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.1" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" +checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.4", + "regex-automata 0.3.6", "regex-syntax 0.7.4", ] @@ -3722,9 +3723,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7b6d6190b7594385f61bd3911cd1be99dfddcfc365a4160cc2ab5bff4aed294" +checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69" dependencies = [ "aho-corasick", "memchr", @@ -4053,9 +4054,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.4" +version = "0.38.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" +checksum = "172891ebdceb05aa0005f533a6cbfca599ddd7d966f6f5d4d9b2e70478e70399" dependencies = [ "bitflags 2.3.3", "errno", @@ -4078,13 +4079,13 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.5" +version = "0.21.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36" +checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb" dependencies = [ "log", "ring", - "rustls-webpki 0.101.2", + "rustls-webpki 0.101.3", "sct", ] @@ -4121,9 +4122,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.2" +version = "0.101.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59" +checksum = "261e9e0888cba427c3316e6322805653c9425240b6fd96cee7cb671ab70ab8d0" dependencies = [ "ring", "untrusted", @@ -4314,18 +4315,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.180" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea67f183f058fe88a4e3ec6e2788e003840893b91bac4559cabedd00863b3ed" +checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.180" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24e744d7782b686ab3b73267ef05697159cc0e5abbed3f47f9933165e5219036" +checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" dependencies = [ "proc-macro2", "quote", @@ -4368,7 +4369,7 @@ dependencies = [ "serde", "serde_json", "serde_with_macros", - "time 0.3.24", + "time 0.3.25", ] [[package]] @@ -4479,7 +4480,7 @@ checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" [[package]] name = "shotover" -version = "0.1.10" +version = "0.1.11" dependencies = [ "anyhow", "async-recursion", @@ -4498,6 +4499,7 @@ dependencies = [ "clap", "cql3-parser", "crc16", + "criterion", "csv", "dashmap", "derivative", @@ -4523,14 +4525,14 @@ dependencies = [ "redis-protocol", "rusoto_kms", "rusoto_signature", - "rustls 0.21.5", + "rustls 0.21.6", "rustls-pemfile", - "rustls-webpki 0.101.2", + "rustls-webpki 0.101.3", "serde", "serde_json", "serde_yaml", "string", - "strum_macros 0.25.1", + "strum_macros 0.25.2", "thiserror", "tokio", "tokio-openssl", @@ -4549,7 +4551,7 @@ dependencies = [ [[package]] name = "shotover-proxy" -version = "0.1.10" +version = "0.1.11" dependencies = [ "anyhow", "async-once-cell", @@ -4563,7 +4565,6 @@ dependencies = [ "chacha20poly1305", "clap", "cql-ws", - "criterion", "csv", "fred", "futures", @@ -4585,6 +4586,7 @@ dependencies = [ "serde_json", "shotover", "test-helpers", + "time 0.3.25", "tokio", "tokio-bin-process", "tokio-util", @@ -4757,7 +4759,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" dependencies = [ - "strum_macros 0.25.1", + "strum_macros 0.25.2", ] [[package]] @@ -4775,9 +4777,9 @@ dependencies = [ [[package]] name = "strum_macros" -version = "0.25.1" +version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6069ca09d878a33f883cc06aaa9718ede171841d3832450354410b718b097232" +checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059" dependencies = [ "heck 0.4.1", "proc-macro2", @@ -4826,9 +4828,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.7.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5486094ee78b2e5038a6382ed7645bc084dc2ec433426ca4c3cb61e2007b8998" +checksum = "dc02fddf48964c42031a0b3fe0428320ecf3a73c401040fc0096f97794310651" dependencies = [ "cfg-if", "fastrand 2.0.0", @@ -4918,9 +4920,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b79eabcd964882a646b3584543ccabeae7869e9ac32a46f6f22b7a5bd405308b" +checksum = "b0fdd63d58b18d663fbdf70e049f00a22c8e42be082203be7f26589213cd75ea" dependencies = [ "deranged", "itoa", @@ -4971,11 +4973,10 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.29.1" +version = "1.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" +checksum = "2d3ce25f50619af8b0aec2eb23deebe84249e19e2ddd393a6e16e3300a6dadfd" dependencies = [ - "autocfg", "backtrace", "bytes", "libc", @@ -4984,7 +4985,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.4.9", + "socket2 0.5.3", "tokio-macros", "windows-sys 0.48.0", ] @@ -5070,7 +5071,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.5", + "rustls 0.21.6", "tokio", ] @@ -5092,7 +5093,7 @@ source = "git+https://github.com/conorbros/tokio-tungstenite#909b1ca04e02dfd0c7c dependencies = [ "futures-util", "log", - "rustls 0.21.5", + "rustls 0.21.6", "rustls-native-certs", "tokio", "tokio-rustls 0.24.1", @@ -5190,7 +5191,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e" dependencies = [ "crossbeam-channel", - "time 0.3.24", + "time 0.3.25", "tracing-subscriber", ] @@ -5296,7 +5297,7 @@ dependencies = [ "httparse", "log", "rand 0.8.5", - "rustls 0.21.5", + "rustls 0.21.6", "sha1", "thiserror", "url", @@ -5801,14 +5802,15 @@ dependencies = [ "scylla", "serde", "strum 0.25.0", + "time 0.3.25", "tokio", ] [[package]] name = "winnow" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f46aab759304e4d7b2075a9aecba26228bb073ee8c50db796b2c72c676b5d807" +checksum = "acaaa1190073b2b101e15083c38ee8ec891b5e05cbee516521e94ec008f61e64" dependencies = [ "memchr", ] @@ -5854,7 +5856,7 @@ checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" dependencies = [ "bit-vec", "num-bigint 0.4.3", - "time 0.3.24", + "time 0.3.25", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a6a7575f2..d8c3cde76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,4 +52,4 @@ rand_distr = "0.4.1" clap = { version = "4.0.4", features = ["cargo", "derive"] } async-trait = "0.1.30" typetag = "0.2.5" -aws-throwaway = "0.1.0" +aws-throwaway = "0.1.1" diff --git a/README.md b/README.md index 7b4d40a2b..cdbe424c9 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,8 @@ Shotover logo

-[![Rust](https://github.com/shotover/shotover-proxy/workflows/Rust/badge.svg)](https://github.com/shotover/shotover-proxy/actions?query=workflow%3ARust) +[![Crates.io](https://img.shields.io/crates/v/shotover.svg)](https://crates.io/crates/shotover) +[![Docs](https://docs.rs/shotover/badge.svg)](https://docs.rs/shotover) [![dependency status](https://deps.rs/repo/github/shotover/shotover-proxy/status.svg)](https://deps.rs/repo/github/shotover/shotover-proxy) ## Documentation diff --git a/changelog.md b/changelog.md index 0852c7b7a..656832123 100644 --- a/changelog.md +++ b/changelog.md @@ -3,6 +3,17 @@ Any breaking changes to the `topology.yaml` or `shotover` rust API should be documented here. This assists us in knowing when to make the next release a breaking release and assists users with making upgrades to new breaking releases. +## 0.1.11 + +### topology.yaml + +* No recorded changes + +### shotover rust api + +* `shotover::message_value` is now `shotover::frame::value` +* `shotover::message_value::MessageValue` is now `shotover::frame::value::GenericValue` + ## 0.1.10 ### topology.yaml diff --git a/custom-transforms-example/src/redis_get_rewrite.rs b/custom-transforms-example/src/redis_get_rewrite.rs index ff61b2b06..a9b679127 100644 --- a/custom-transforms-example/src/redis_get_rewrite.rs +++ b/custom-transforms-example/src/redis_get_rewrite.rs @@ -6,6 +6,7 @@ use shotover::message::Messages; use shotover::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper}; #[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] pub struct RedisGetRewriteConfig { pub result: String, } diff --git a/ec2-cargo/Cargo.toml b/ec2-cargo/Cargo.toml index 55a1b731c..c45ca7aba 100644 --- a/ec2-cargo/Cargo.toml +++ b/ec2-cargo/Cargo.toml @@ -15,3 +15,4 @@ aws-throwaway.workspace = true tracing-appender.workspace = true shellfish = { version = "0.8.0", features = ["async"] } rustyline = "11.0.0" +cargo_metadata = "0.15.4" diff --git a/ec2-cargo/src/main.rs b/ec2-cargo/src/main.rs index 77837186e..fc85dc1a7 100644 --- a/ec2-cargo/src/main.rs +++ b/ec2-cargo/src/main.rs @@ -1,4 +1,5 @@ use aws_throwaway::{ec2_instance::Ec2Instance, Aws, InstanceType}; +use cargo_metadata::{Metadata, MetadataCommand}; use clap::Parser; use rustyline::DefaultEditor; use shellfish::{async_fn, handler::DefaultAsyncHandler, Command, Shell}; @@ -7,7 +8,7 @@ use tracing_subscriber::EnvFilter; /// Spins up an EC2 instance and then presents a shell from which you can run `cargo test` on the ec2 instance. /// -/// TODO: Every time the shell runs a cargo command, any local changes to the repo are reuploaded to the ec2 instance. +/// Every time the shell runs a cargo command, any local changes to the repo are reuploaded to the ec2 instance. /// /// When the shell is exited all created EC2 instances are destroyed. #[derive(Parser, Clone)] @@ -30,6 +31,7 @@ async fn main() { .with_writer(non_blocking) .init(); + let cargo_meta = MetadataCommand::new().exec().unwrap(); let args = Args::parse(); if args.cleanup { Aws::cleanup_resources_static().await; @@ -74,8 +76,16 @@ docker compose "$@" ' | sudo dd of=/bin/docker-compose sudo chmod +x /bin/docker-compose -git clone https://github.com/shotover/shotover-proxy echo "export RUST_BACKTRACE=1" >> .profile +echo "export CARGO_TERM_COLOR=always" >> .profile +echo 'source "$HOME/.cargo/env"' >> .profile + +source .profile +if [ "$(uname -m)" = "aarch64" ]; then + curl -LsSf https://get.nexte.st/latest/linux-arm | tar zxf - -C ${CARGO_HOME:-~/.cargo}/bin +else + curl -LsSf https://get.nexte.st/latest/linux | tar zxf - -C ${CARGO_HOME:-~/.cargo}/bin +fi "#).await; while let Some(line) = receiver.recv().await { println!("{}", line) @@ -84,7 +94,10 @@ echo "export RUST_BACKTRACE=1" >> .profile println!("Finished creating instance."); let mut shell = Shell::new_with_async_handler( - State { instance }, + State { + cargo_meta, + instance, + }, "ec2-cargo$ ", DefaultAsyncHandler::default(), DefaultEditor::new().unwrap(), @@ -111,6 +124,7 @@ echo "export RUST_BACKTRACE=1" >> .profile } async fn test(state: &mut State, mut args: Vec) -> Result<(), Box> { + rsync_shotover(state).await; args.remove(0); let args = args.join(" "); let mut receiver = state @@ -118,8 +132,9 @@ async fn test(state: &mut State, mut args: Vec) -> Result<(), Box&1 +source .profile +cd shotover +cargo nextest run {} 2>&1 "#, args )) @@ -131,6 +146,57 @@ RUST_BACKTRACE=1 ~/.cargo/bin/cargo test --color always {} 2>&1 Ok(()) } +async fn rsync_shotover(state: &State) { + let instance = &state.instance; + let target_dir = &state.cargo_meta.target_directory; + let project_root_dir = &state.cargo_meta.workspace_root; + + let key_path = target_dir.join("ec2-cargo-privatekey"); + tokio::fs::remove_file(&key_path).await.ok(); + tokio::fs::write(&key_path, instance.client_private_key()) + .await + .unwrap(); + let output = tokio::process::Command::new("chmod") + .args(&["400".to_owned(), format!("{}", key_path)]) + .output() + .await + .unwrap(); + if !output.status.success() { + let stdout = String::from_utf8(output.stdout).unwrap(); + let stderr = String::from_utf8(output.stderr).unwrap(); + panic!("chmod failed:\nstdout:\n{stdout}\nstderr:\n{stderr}") + } + + let known_hosts_path = target_dir.join("ec2-cargo-known_hosts"); + tokio::fs::write(&known_hosts_path, instance.openssh_known_hosts_line()) + .await + .unwrap(); + + let address = instance.public_ip(); + let output = tokio::process::Command::new("rsync") + .args(&[ + "--delete".to_owned(), + "--exclude".to_owned(), + "target".to_owned(), + "-e".to_owned(), + format!( + "ssh -i {} -o 'UserKnownHostsFile {}'", + key_path, known_hosts_path + ), + "-ra".to_owned(), + format!("{}/", project_root_dir), // trailing slash means copy the contents of the directory instead of the directory itself + format!("ubuntu@{address}:/home/ubuntu/shotover"), + ]) + .output() + .await + .unwrap(); + if !output.status.success() { + let stdout = String::from_utf8(output.stdout).unwrap(); + let stderr = String::from_utf8(output.stderr).unwrap(); + panic!("rsync failed:\nstdout:\n{stdout}\nstderr:\n{stderr}") + } +} + fn ssh_instructions(state: &mut State, mut _args: Vec) -> Result<(), Box> { println!( "Run the following to ssh into the EC2 instance:\n{}", @@ -141,5 +207,6 @@ fn ssh_instructions(state: &mut State, mut _args: Vec) -> Result<(), Box } struct State { + cargo_meta: Metadata, instance: Ec2Instance, } diff --git a/shotover-proxy/Cargo.toml b/shotover-proxy/Cargo.toml index 8d10a503e..ab272c1a7 100644 --- a/shotover-proxy/Cargo.toml +++ b/shotover-proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shotover-proxy" -version = "0.1.10" +version = "0.1.11" authors = ["Ben "] edition = "2021" license = "Apache-2.0" @@ -16,7 +16,6 @@ anyhow.workspace = true tokio.workspace = true tracing.workspace = true clap.workspace = true -criterion = { version = "0.5.0", features = ["async_tokio"] } rstest = "0.18.0" cassandra-cpp = { version = "2.0.0" } test-helpers = { path = "../test-helpers" } @@ -53,10 +52,7 @@ regex = "1.7.0" cql-ws = { git = "https://github.com/conorbros/cql-ws" } opensearch = "2.1.0" serde_json = "1.0.103" - -[[bench]] -name = "benches" -harness = false +time = { version = "0.3.25" } [features] # Include WIP alpha transforms in the public API diff --git a/shotover-proxy/build.rs b/shotover-proxy/build.rs index 8484a5049..a566c2b4c 100644 --- a/shotover-proxy/build.rs +++ b/shotover-proxy/build.rs @@ -3,5 +3,5 @@ use std::env; fn main() { let profile = env::var("PROFILE").unwrap(); println!("cargo:rustc-env=PROFILE={profile}"); - println!("cargo:rerun-if-env-changed=PROFILE"); + println!("cargo:rerun-if-changed=build.rs"); } diff --git a/shotover-proxy/examples/windsock/aws/mod.rs b/shotover-proxy/examples/windsock/aws/mod.rs index a97a2dc60..1ba783913 100644 --- a/shotover-proxy/examples/windsock/aws/mod.rs +++ b/shotover-proxy/examples/windsock/aws/mod.rs @@ -52,6 +52,15 @@ impl WindsockAws { .create_ec2_instance(InstanceType::M6aLarge, 8) .await, }); + instance + .instance + .ssh() + .shell( + r#" +sudo apt-get update +sudo apt-get install -y sysstat"#, + ) + .await; instance .instance .ssh() @@ -85,6 +94,7 @@ until sudo apt-get update -qq do sleep 1 done +sudo apt-get install -y sysstat curl -sSL https://get.docker.com/ | sudo sh"#, ) .await; @@ -108,6 +118,15 @@ curl -sSL https://get.docker.com/ | sudo sh"#, .create_ec2_instance(InstanceType::M6aLarge, 8) .await, }); + instance + .instance + .ssh() + .shell( + r#" +sudo apt-get update +sudo apt-get install -y sysstat"#, + ) + .await; // PROFILE is set in build.rs from PROFILE listed in https://doc.rust-lang.org/cargo/reference/environment-variables.html#environment-variables-cargo-sets-for-build-scripts let profile = if env!("PROFILE") == "release" { diff --git a/shotover-proxy/examples/windsock/cassandra.rs b/shotover-proxy/examples/windsock/cassandra.rs index cc956dfc9..d36e1075e 100644 --- a/shotover-proxy/examples/windsock/cassandra.rs +++ b/shotover-proxy/examples/windsock/cassandra.rs @@ -1,10 +1,11 @@ use crate::{ aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover}, common::{rewritten_file, Shotover}, - profilers::ProfilerRunner, + profilers::{self, CloudProfilerRunner, ProfilerRunner}, }; use anyhow::Result; use async_trait::async_trait; +use aws_throwaway::ec2_instance::Ec2Instance; use cdrs_tokio::{ cluster::{ session::{ @@ -419,7 +420,7 @@ impl Bench for CassandraBench { } fn supported_profilers(&self) -> Vec { - ProfilerRunner::supported_profilers(self.shotover) + profilers::supported_profilers(self.shotover) } fn cores_required(&self) -> usize { @@ -429,7 +430,7 @@ impl Bench for CassandraBench { async fn orchestrate_cloud( &self, _running_in_release: bool, - _profiling: Profiling, + profiling: Profiling, parameters: BenchParameters, ) -> Result<()> { let aws = crate::aws::WindsockAws::get().await; @@ -451,6 +452,24 @@ impl Bench for CassandraBench { let cassandra_ip = cassandra_instance1.instance.private_ip().to_string(); let shotover_ip = shotover_instance.instance.private_ip().to_string(); + let mut profiler_instances: HashMap = + [("bencher".to_owned(), &bench_instance.instance)].into(); + if let Shotover::ForcedMessageParsed | Shotover::Standard = self.shotover { + profiler_instances.insert("shotover".to_owned(), &shotover_instance.instance); + } + match self.topology { + Topology::Cluster3 => { + profiler_instances.insert("cassandra1".to_owned(), &cassandra_instance1.instance); + profiler_instances.insert("cassandra2".to_owned(), &cassandra_instance2.instance); + profiler_instances.insert("cassandra3".to_owned(), &cassandra_instance3.instance); + } + Topology::Single => { + profiler_instances.insert("cassandra".to_owned(), &cassandra_instance1.instance); + } + } + let mut profiler = + CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await; + let cassandra_nodes = vec![ AwsNodeInfo { instance: cassandra_instance1.clone(), @@ -487,6 +506,8 @@ impl Bench for CassandraBench { .run_bencher(&self.run_args(&destination, ¶meters), &self.name()) .await; + profiler.finish(); + if let Some(running_shotover) = running_shotover { running_shotover.shutdown().await; } @@ -524,7 +545,7 @@ impl Bench for CassandraBench { panic!("Mocked cassandra database does not provide a clustered mode") } }; - let mut profiler = ProfilerRunner::new(profiling); + let mut profiler = ProfilerRunner::new(self.name(), profiling); let shotover = match self.shotover { Shotover::Standard => Some( ShotoverProcessBuilder::new_with_topology(&format!("{config_dir}/topology.yaml")) diff --git a/shotover-proxy/examples/windsock/common.rs b/shotover-proxy/examples/windsock/common.rs index 6e1f64b67..cf505bfea 100644 --- a/shotover-proxy/examples/windsock/common.rs +++ b/shotover-proxy/examples/windsock/common.rs @@ -1,4 +1,4 @@ -use anyhow::anyhow; +use anyhow::Context; use std::path::Path; #[derive(Clone, Copy)] @@ -24,7 +24,7 @@ impl Shotover { pub async fn rewritten_file(path: &Path, find_replace: &[(&str, &str)]) -> String { let mut text = tokio::fs::read_to_string(path) .await - .map_err(|e| anyhow!(e).context(format!("Failed to read from {path:?}"))) + .with_context(|| format!("Failed to read from {path:?}")) .unwrap(); for (find, replace) in find_replace { text = text.replace(find, replace); diff --git a/shotover-proxy/examples/windsock/kafka.rs b/shotover-proxy/examples/windsock/kafka.rs index e6bfb349e..838b5e45c 100644 --- a/shotover-proxy/examples/windsock/kafka.rs +++ b/shotover-proxy/examples/windsock/kafka.rs @@ -1,8 +1,9 @@ use crate::aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover}; use crate::common::{rewritten_file, Shotover}; -use crate::profilers::ProfilerRunner; +use crate::profilers::{self, CloudProfilerRunner, ProfilerRunner}; use anyhow::Result; use async_trait::async_trait; +use aws_throwaway::ec2_instance::Ec2Instance; use futures::StreamExt; use rdkafka::config::ClientConfig; use rdkafka::consumer::{Consumer, StreamConsumer}; @@ -58,13 +59,13 @@ impl Bench for KafkaBench { } fn supported_profilers(&self) -> Vec { - ProfilerRunner::supported_profilers(self.shotover) + profilers::supported_profilers(self.shotover) } async fn orchestrate_cloud( &self, _running_in_release: bool, - _profiling: Profiling, + profiling: Profiling, parameters: BenchParameters, ) -> Result<()> { let aws = crate::aws::WindsockAws::get().await; @@ -75,6 +76,17 @@ impl Bench for KafkaBench { aws.create_shotover_instance() ); + let mut profiler_instances: HashMap = [ + ("bencher".to_owned(), &bench_instance.instance), + ("kafka".to_owned(), &kafka_instance.instance), + ] + .into(); + if let Shotover::ForcedMessageParsed | Shotover::Standard = self.shotover { + profiler_instances.insert("shotover".to_owned(), &shotover_instance.instance); + } + let mut profiler = + CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await; + let kafka_ip = kafka_instance.instance.private_ip().to_string(); let shotover_ip = shotover_instance.instance.private_ip().to_string(); @@ -93,6 +105,8 @@ impl Bench for KafkaBench { .run_bencher(&self.run_args(&destination_ip, ¶meters), &self.name()) .await; + profiler.finish(); + if let Some(running_shotover) = running_shotover { running_shotover.shutdown().await; } @@ -108,7 +122,7 @@ impl Bench for KafkaBench { let config_dir = "tests/test-configs/kafka/bench"; let _compose = docker_compose(&format!("{}/docker-compose.yaml", config_dir)); - let mut profiler = ProfilerRunner::new(profiling); + let mut profiler = ProfilerRunner::new(self.name(), profiling); let shotover = match self.shotover { Shotover::Standard => Some( ShotoverProcessBuilder::new_with_topology(&format!("{config_dir}/topology.yaml")) diff --git a/shotover-proxy/examples/windsock/profilers.rs b/shotover-proxy/examples/windsock/profilers.rs index f35f9a718..40ec69143 100644 --- a/shotover-proxy/examples/windsock/profilers.rs +++ b/shotover-proxy/examples/windsock/profilers.rs @@ -1,23 +1,37 @@ use crate::common::Shotover; -use std::path::PathBuf; +use aws_throwaway::ec2_instance::Ec2Instance; +use std::{collections::HashMap, path::PathBuf}; use test_helpers::{flamegraph::Perf, shotover_process::BinProcess}; +use tokio::sync::mpsc::UnboundedReceiver; use windsock::Profiling; +mod sar; + pub struct ProfilerRunner { + bench_name: String, run_flamegraph: bool, + run_sys_monitor: bool, results_path: PathBuf, perf: Option, + sys_monitor: Option>, } impl ProfilerRunner { - pub fn new(profiling: Profiling) -> Self { + pub fn new(bench_name: String, profiling: Profiling) -> Self { let run_flamegraph = profiling .profilers_to_use .contains(&"flamegraph".to_owned()); + let run_sys_monitor = profiling + .profilers_to_use + .contains(&"sys_monitor".to_owned()); + ProfilerRunner { + bench_name, run_flamegraph, + run_sys_monitor, results_path: profiling.results_path, perf: None, + sys_monitor: None, } } @@ -34,6 +48,11 @@ impl ProfilerRunner { } else { None }; + self.sys_monitor = if self.run_sys_monitor { + Some(sar::run_sar_local()) + } else { + None + }; } pub fn shotover_profile(&self) -> Option<&'static str> { @@ -43,14 +62,6 @@ impl ProfilerRunner { None } } - - pub fn supported_profilers(shotover: Shotover) -> Vec { - if let Shotover::None = shotover { - vec![] - } else { - vec!["flamegraph".to_owned()] - } - } } impl Drop for ProfilerRunner { @@ -58,5 +69,55 @@ impl Drop for ProfilerRunner { if let Some(perf) = self.perf.take() { perf.flamegraph(); } + if let Some(mut rx) = self.sys_monitor.take() { + sar::insert_sar_results_to_bench_archive(&self.bench_name, "", sar::parse_sar(&mut rx)); + } + } +} + +pub struct CloudProfilerRunner { + bench_name: String, + monitor_instances: HashMap>, +} + +impl CloudProfilerRunner { + pub async fn new( + bench_name: String, + profiling: Profiling, + instances: HashMap, + ) -> Self { + let run_sys_monitor = profiling + .profilers_to_use + .contains(&"sys_monitor".to_owned()); + + let mut monitor_instances = HashMap::new(); + if run_sys_monitor { + for (name, instance) in instances { + monitor_instances.insert(name, sar::run_sar_remote(instance).await); + } + } + + CloudProfilerRunner { + bench_name, + monitor_instances, + } + } + + pub fn finish(&mut self) { + for (name, instance_rx) in &mut self.monitor_instances { + sar::insert_sar_results_to_bench_archive( + &self.bench_name, + name, + sar::parse_sar(instance_rx), + ); + } + } +} + +pub fn supported_profilers(shotover: Shotover) -> Vec { + if let Shotover::None = shotover { + vec!["sys_monitor".to_owned()] + } else { + vec!["flamegraph".to_owned(), "sys_monitor".to_owned()] } } diff --git a/shotover-proxy/examples/windsock/profilers/sar.rs b/shotover-proxy/examples/windsock/profilers/sar.rs new file mode 100644 index 000000000..7514c7b47 --- /dev/null +++ b/shotover-proxy/examples/windsock/profilers/sar.rs @@ -0,0 +1,184 @@ +//! This module provides abstractions for getting system usage from the unix command `sar`, on ubuntu it is contained within the package `sysstat`. + +use aws_throwaway::ec2_instance::Ec2Instance; +use std::{collections::HashMap, process::Stdio}; +use time::OffsetDateTime; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + process::Command, + sync::mpsc::{unbounded_channel, UnboundedReceiver}, +}; +use windsock::{Goal, Metric, ReportArchive}; + +/// Reads the bench archive for `bench_name` from disk. +/// Inserts the passed sar metrics for `instance_name`. +/// Then writes the resulting archive back to disk +pub fn insert_sar_results_to_bench_archive( + bench_name: &str, + instance_name: &str, + mut sar: ParsedSar, +) { + let mut report = ReportArchive::load(bench_name).unwrap(); + + // The bench will start after sar has started so we need to throw away all sar metrics that were recorded before the bench started. + let time_diff = report.bench_started_at - sar.started_at; + let inital_values_to_discard = time_diff.as_seconds_f32().round() as usize; + for values in sar.named_values.values_mut() { + values.drain(0..inital_values_to_discard); + } + + // use short names so we can keep each call on one line. + let p = instance_name; + let s = &sar; + report.metrics.extend([ + metric(s, p, "CPU User", "%", "%user", Goal::SmallerIsBetter), + metric(s, p, "CPU System", "%", "%system", Goal::SmallerIsBetter), + metric(s, p, "CPU Nice", "%", "%nice", Goal::SmallerIsBetter), + metric(s, p, "CPU IO Wait", "%", "%iowait", Goal::SmallerIsBetter), + metric(s, p, "CPU Steal", "%", "%steal", Goal::SmallerIsBetter), + metric(s, p, "CPU Idle", "%", "%idle", Goal::BiggerIsBetter), + metric_with_formatter( + s, + p, + "Memory Used", + |value| { + // sar calls this a KB (kilobyte) but its actually a KiB (kibibyte) + let value_kib: f32 = value.parse().unwrap(); + let value_mib = value_kib / 1024.0; + format!("{} MiB", value_mib) + }, + "kbmemused", + Goal::SmallerIsBetter, + ), + ]); + + report.save(); +} + +/// Shortcut for common metric formatting case +fn metric( + sar: &ParsedSar, + prefix: &str, + name: &str, + unit: &str, + sar_name: &str, + goal: Goal, +) -> Metric { + metric_with_formatter(sar, prefix, name, |x| format!("{x}{unit}"), sar_name, goal) +} + +/// Take a sars metric and transform it into a metric that can be stored in a bench archive +fn metric_with_formatter String>( + sar: &ParsedSar, + prefix: &str, + name: &str, + value_formatter: F, + sar_name: &str, + goal: Goal, +) -> Metric { + let name = if prefix.is_empty() { + name.to_owned() + } else { + format!("{prefix} - {name}") + }; + Metric::EachSecond { + name, + values: sar + .named_values + .get(sar_name) + .ok_or_else(|| format!("No key {} in {:?}", sar_name, sar.named_values)) + .unwrap() + .iter() + .map(|x| (x.parse().unwrap(), value_formatter(x), goal)) + .collect(), + } +} + +/// parse lines of output from the sar command which looks like: +/// ```text +/// Linux 6.4.8-arch1-1 (memes) 09/08/23 _x86_64_ (24 CPU) +/// +/// 12:19:51 CPU %user %nice %system %iowait %steal %idle +/// 12:19:52 all 4.39 0.00 0.17 0.00 0.00 95.44 +/// +/// 12:19:51 kbmemfree kbavail kbmemused %memused kbbuffers kbcached kbcommit %commit kbactive kbinact kbdirty +/// 12:19:52 10848136 17675452 14406672 43.91 482580 6566224 20441872 62.30 13626936 7304044 76 +/// +/// 12:19:52 CPU %user %nice %system %iowait %steal %idle +/// 12:19:53 all 4.45 0.00 0.50 0.12 0.00 94.92 +/// +/// 12:19:52 kbmemfree kbavail kbmemused %memused kbbuffers kbcached kbcommit %commit kbactive kbinact kbdirty +/// 12:19:53 10827924 17655248 14426872 43.97 482592 6566224 20441924 62.30 13649508 7304056 148 +/// ``` +pub fn parse_sar(rx: &mut UnboundedReceiver) -> ParsedSar { + let mut named_values = HashMap::new(); + + // read date command + let Ok(started_at) = rx.try_recv() else { return ParsedSar { started_at: OffsetDateTime::UNIX_EPOCH, named_values: HashMap::new()}}; + let started_at = + OffsetDateTime::from_unix_timestamp_nanos(started_at.parse().unwrap()).unwrap(); + + // skip header + if rx.try_recv().is_err() { + return ParsedSar { + started_at: OffsetDateTime::UNIX_EPOCH, + named_values: HashMap::new(), + }; + } + + // keep reading until we exhaust the receiver + loop { + let Ok(_blank_line) = rx.try_recv() else { return ParsedSar { started_at, named_values } }; + let Ok(header) = rx.try_recv() else { return ParsedSar { started_at, named_values } }; + let Ok(data) = rx.try_recv() else { return ParsedSar { started_at, named_values } }; + for (head, data) in header + .split_whitespace() + .zip(data.split_whitespace()) + .skip(1) + { + named_values + .entry(head.to_owned()) + .or_default() + .push(data.to_owned()); + } + } +} + +pub struct ParsedSar { + /// The time in UTC at which the sar command was started + started_at: OffsetDateTime, + /// The key contains the name of the metric + /// The value contains a list of values recorded for that metric over the runtime of sar + named_values: HashMap>, +} + +const SAR_COMMAND: &str = "date +%s%N; sar -r -u 1"; + +/// Run the sar command on the local machine. +/// Each line of output is returned via the `UnboundedReceiver` +pub fn run_sar_local() -> UnboundedReceiver { + let (tx, rx) = unbounded_channel(); + tokio::spawn(async move { + let mut child = Command::new("bash") + .args(["-c", SAR_COMMAND]) + .stdout(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .unwrap(); + let mut reader = BufReader::new(child.stdout.take().unwrap()).lines(); + while let Some(line) = reader.next_line().await.unwrap() { + if tx.send(line).is_err() { + child.kill().await.unwrap(); + return; + } + } + }); + + rx +} + +/// Run the sar command over ssh on the passed instance. +/// Each line of output is returned via the `UnboundedReceiver` +pub async fn run_sar_remote(instance: &Ec2Instance) -> UnboundedReceiver { + instance.ssh().shell_stdout_lines(SAR_COMMAND).await +} diff --git a/shotover-proxy/examples/windsock/redis.rs b/shotover-proxy/examples/windsock/redis.rs index a73401b26..6ad3ff077 100644 --- a/shotover-proxy/examples/windsock/redis.rs +++ b/shotover-proxy/examples/windsock/redis.rs @@ -1,10 +1,11 @@ use crate::{ aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover, WindsockAws}, common::{rewritten_file, Shotover}, - profilers::ProfilerRunner, + profilers::{self, CloudProfilerRunner, ProfilerRunner}, }; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; +use aws_throwaway::ec2_instance::Ec2Instance; use fred::{ prelude::*, rustls::{Certificate, ClientConfig, PrivateKey, RootCertStore}, @@ -101,7 +102,7 @@ impl Bench for RedisBench { } fn supported_profilers(&self) -> Vec { - ProfilerRunner::supported_profilers(self.shotover) + profilers::supported_profilers(self.shotover) } fn cores_required(&self) -> usize { @@ -111,7 +112,7 @@ impl Bench for RedisBench { async fn orchestrate_cloud( &self, _running_in_release: bool, - _profiling: Profiling, + profiling: Profiling, parameters: BenchParameters, ) -> Result<()> { let aws = WindsockAws::get().await; @@ -122,6 +123,24 @@ impl Bench for RedisBench { aws.create_shotover_instance() ); + let mut profiler_instances: HashMap = + [("bencher".to_owned(), &bench_instance.instance)].into(); + if let Shotover::ForcedMessageParsed | Shotover::Standard = self.shotover { + profiler_instances.insert("shotover".to_owned(), &shotover_instance.instance); + } + match &redis_instances { + RedisCluster::Cluster3 { instances, .. } => { + for (i, instance) in instances.iter().enumerate() { + profiler_instances.insert(format!("redis{i}"), &instance.instance); + } + } + RedisCluster::Single(instance) => { + profiler_instances.insert("redis".to_owned(), &instance.instance); + } + } + let mut profiler = + CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await; + let redis_ip = redis_instances.private_ips()[0].to_string(); let shotover_ip = shotover_instance.instance.private_ip().to_string(); @@ -148,6 +167,8 @@ impl Bench for RedisBench { .run_bencher(&self.run_args(&destination_ip, ¶meters), &self.name()) .await; + profiler.finish(); + if let Some(running_shotover) = running_shotover { running_shotover.shutdown().await; } @@ -180,7 +201,7 @@ impl Bench for RedisBench { (RedisTopology::Cluster3, Encryption::Tls) => "tests/test-configs/redis-cluster-tls", }; let _compose = docker_compose(&format!("{config_dir}/docker-compose.yaml")); - let mut profiler = ProfilerRunner::new(profiling); + let mut profiler = ProfilerRunner::new(self.name(), profiling); let shotover = match self.shotover { Shotover::Standard => Some( ShotoverProcessBuilder::new_with_topology(&format!("{config_dir}/topology.yaml")) @@ -283,7 +304,7 @@ impl Bench for RedisBench { fn load_certs(path: &str) -> Vec { load_certs_inner(path) - .map_err(|err| anyhow!(err).context(format!("Failed to read certs at {path:?}"))) + .with_context(|| format!("Failed to read certs at {path:?}")) .unwrap() } fn load_certs_inner(path: &str) -> Result> { @@ -294,7 +315,7 @@ fn load_certs_inner(path: &str) -> Result> { fn load_private_key(path: &str) -> PrivateKey { load_private_key_inner(path) - .map_err(|err| anyhow!(err).context(format!("Failed to read private key at {path:?}"))) + .with_context(|| format!("Failed to read private key at {path:?}")) .unwrap() } fn load_private_key_inner(path: &str) -> Result { @@ -310,7 +331,7 @@ fn load_private_key_inner(path: &str) -> Result { fn load_ca(path: &str) -> RootCertStore { load_ca_inner(path) - .map_err(|e| e.context(format!("Failed to load CA at {path:?}"))) + .with_context(|| format!("Failed to load CA at {path:?}")) .unwrap() } fn load_ca_inner(path: &str) -> Result { diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster/single_rack_v4.rs b/shotover-proxy/tests/cassandra_int_tests/cluster/single_rack_v4.rs index 29a5e6348..2ea187adb 100644 --- a/shotover-proxy/tests/cassandra_int_tests/cluster/single_rack_v4.rs +++ b/shotover-proxy/tests/cassandra_int_tests/cluster/single_rack_v4.rs @@ -277,7 +277,7 @@ pub async fn test_topology_task(ca_path: Option<&str>, cassandra_port: Option, _nonce: Nonce, diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index 26a088a85..6effcb8f8 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shotover" -version = "0.1.10" +version = "0.1.11" authors = ["Ben "] edition = "2021" license = "Apache-2.0" @@ -93,3 +93,10 @@ rustls-webpki = "0.101.1" string = "0.3.0" xxhash-rust = { version = "0.8.6", features = ["xxh3"] } dashmap = "5.4.0" + +[dev-dependencies] +criterion = { version = "0.5.0", features = ["async_tokio"] } + +[[bench]] +name = "benches" +harness = false diff --git a/shotover-proxy/benches/benches/chain.rs b/shotover/benches/benches/chain.rs similarity index 100% rename from shotover-proxy/benches/benches/chain.rs rename to shotover/benches/benches/chain.rs diff --git a/shotover-proxy/benches/benches/codec.rs b/shotover/benches/benches/codec.rs similarity index 100% rename from shotover-proxy/benches/benches/codec.rs rename to shotover/benches/benches/codec.rs diff --git a/shotover-proxy/benches/benches/kafka_requests/fetch.bin b/shotover/benches/benches/kafka_requests/fetch.bin similarity index 100% rename from shotover-proxy/benches/benches/kafka_requests/fetch.bin rename to shotover/benches/benches/kafka_requests/fetch.bin diff --git a/shotover-proxy/benches/benches/kafka_requests/list_offsets.bin b/shotover/benches/benches/kafka_requests/list_offsets.bin similarity index 100% rename from shotover-proxy/benches/benches/kafka_requests/list_offsets.bin rename to shotover/benches/benches/kafka_requests/list_offsets.bin diff --git a/shotover-proxy/benches/benches/kafka_requests/metadata.bin b/shotover/benches/benches/kafka_requests/metadata.bin similarity index 100% rename from shotover-proxy/benches/benches/kafka_requests/metadata.bin rename to shotover/benches/benches/kafka_requests/metadata.bin diff --git a/shotover-proxy/benches/benches/kafka_requests/produce.bin b/shotover/benches/benches/kafka_requests/produce.bin similarity index 100% rename from shotover-proxy/benches/benches/kafka_requests/produce.bin rename to shotover/benches/benches/kafka_requests/produce.bin diff --git a/shotover-proxy/benches/benches/main.rs b/shotover/benches/benches/main.rs similarity index 100% rename from shotover-proxy/benches/benches/main.rs rename to shotover/benches/benches/main.rs diff --git a/shotover/src/config/chain.rs b/shotover/src/config/chain.rs index ed76e91a8..47c87cc0a 100644 --- a/shotover/src/config/chain.rs +++ b/shotover/src/config/chain.rs @@ -7,6 +7,7 @@ use std::fmt::{self, Debug}; use std::iter; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct TransformChainConfig( #[serde(rename = "TransformChain", deserialize_with = "vec_transform_config")] pub Vec>, diff --git a/shotover/src/config/mod.rs b/shotover/src/config/mod.rs index bcb7b6ec9..96bbf8c6d 100644 --- a/shotover/src/config/mod.rs +++ b/shotover/src/config/mod.rs @@ -1,10 +1,11 @@ -use anyhow::{anyhow, Context, Result}; +use anyhow::{Context, Result}; use serde::Deserialize; pub mod chain; pub mod topology; #[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] pub struct Config { pub main_log_level: String, pub observability_interface: String, @@ -12,9 +13,9 @@ pub struct Config { impl Config { pub fn from_file(filepath: String) -> Result { - let file = std::fs::File::open(&filepath).map_err(|err| { - anyhow!(err).context(format!("Couldn't open the config file {}", &filepath)) - })?; - serde_yaml::from_reader(file).context(format!("Failed to parse config file {}", &filepath)) + let file = std::fs::File::open(&filepath) + .with_context(|| format!("Couldn't open the config file {}", &filepath))?; + serde_yaml::from_reader(file) + .with_context(|| format!("Failed to parse config file {}", &filepath)) } } diff --git a/shotover/src/config/topology.rs b/shotover/src/config/topology.rs index a85d5bfd4..99d71f3ae 100644 --- a/shotover/src/config/topology.rs +++ b/shotover/src/config/topology.rs @@ -9,6 +9,7 @@ use tokio::sync::watch; use tracing::info; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct Topology { pub sources: HashMap, pub chain_config: HashMap, @@ -17,13 +18,12 @@ pub struct Topology { impl Topology { pub fn from_file(filepath: &str) -> Result { - let file = std::fs::File::open(filepath).map_err(|err| { - anyhow!(err).context(format!("Couldn't open the topology file {}", filepath)) - })?; + let file = std::fs::File::open(filepath) + .with_context(|| format!("Couldn't open the topology file {}", filepath))?; let deserializer = serde_yaml::Deserializer::from_reader(file); serde_yaml::with::singleton_map_recursive::deserialize(deserializer) - .context(format!("Failed to parse topology file {}", filepath)) + .with_context(|| format!("Failed to parse topology file {}", filepath)) } async fn build_chains(&self) -> Result>> { @@ -79,8 +79,8 @@ impl Topology { &mut source_config .get_source(chain, trigger_shutdown_rx.clone()) .await - .map_err(|e| { - e.context(format!("Failed to initialize source {source_name}")) + .with_context(|| { + format!("Failed to initialize source {source_name}") })?, ); } else { diff --git a/shotover/src/frame/kafka.rs b/shotover/src/frame/kafka.rs index 316e174c6..a250124db 100644 --- a/shotover/src/frame/kafka.rs +++ b/shotover/src/frame/kafka.rs @@ -263,19 +263,23 @@ impl KafkaFrame { } fn decode(bytes: &mut Bytes, version: i16) -> Result { - T::decode(bytes, version).context(format!( - "Failed to decode {} v{} body", - std::any::type_name::(), - version - )) + T::decode(bytes, version).with_context(|| { + format!( + "Failed to decode {} v{} body", + std::any::type_name::(), + version + ) + }) } fn encode(encodable: T, bytes: &mut BytesMut, version: i16) -> Result<()> { - encodable.encode(bytes, version).context(format!( - "Failed to encode {} v{} body", - std::any::type_name::(), - version - )) + encodable.encode(bytes, version).with_context(|| { + format!( + "Failed to encode {} v{} body", + std::any::type_name::(), + version + ) + }) } /// This function is a helper to workaround a really degenerate rust compiler case. diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index 93b749f4f..953aa2ce8 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -426,6 +426,7 @@ pub enum Encodable { } #[derive(PartialEq, Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] pub enum QueryType { Read, Write, diff --git a/shotover/src/observability/mod.rs b/shotover/src/observability/mod.rs index 744333c1d..dd96acfe8 100644 --- a/shotover/src/observability/mod.rs +++ b/shotover/src/observability/mod.rs @@ -1,5 +1,5 @@ use crate::runner::ReloadHandle; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use bytes::Bytes; use hyper::{ service::{make_service_fn, service_fn}, @@ -104,7 +104,7 @@ impl LogFilterHttpExporter { let address = self.address; Server::try_bind(&address) - .map_err(|e| anyhow!(e).context(format!("Failed to bind to {}", address)))? + .with_context(|| format!("Failed to bind to {}", address))? .serve(make_svc) .await .map_err(|e| anyhow!(e)) diff --git a/shotover/src/sources/cassandra.rs b/shotover/src/sources/cassandra.rs index 212c944ef..a5f10a4ed 100644 --- a/shotover/src/sources/cassandra.rs +++ b/shotover/src/sources/cassandra.rs @@ -12,6 +12,7 @@ use tokio::task::JoinHandle; use tracing::{error, info}; #[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] pub struct CassandraConfig { pub listen_addr: String, pub connection_limit: Option, diff --git a/shotover/src/sources/kafka.rs b/shotover/src/sources/kafka.rs index 88f3b65e4..a8ca3fb11 100644 --- a/shotover/src/sources/kafka.rs +++ b/shotover/src/sources/kafka.rs @@ -11,6 +11,7 @@ use tokio::task::JoinHandle; use tracing::{error, info}; #[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] pub struct KafkaConfig { pub listen_addr: String, pub connection_limit: Option, diff --git a/shotover/src/sources/mod.rs b/shotover/src/sources/mod.rs index 0b7f082b4..a2f41b959 100644 --- a/shotover/src/sources/mod.rs +++ b/shotover/src/sources/mod.rs @@ -12,6 +12,7 @@ pub mod kafka; pub mod redis; #[derive(Deserialize, Debug, Clone, Copy)] +#[serde(deny_unknown_fields)] pub enum Transport { Tcp, WebSocket, @@ -35,6 +36,7 @@ impl Source { } #[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] pub enum SourceConfig { Cassandra(CassandraConfig), Redis(RedisConfig), diff --git a/shotover/src/sources/redis.rs b/shotover/src/sources/redis.rs index 505ec3edd..9354b0838 100644 --- a/shotover/src/sources/redis.rs +++ b/shotover/src/sources/redis.rs @@ -11,6 +11,7 @@ use tokio::task::JoinHandle; use tracing::{error, info}; #[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] pub struct RedisConfig { pub listen_addr: String, pub connection_limit: Option, diff --git a/shotover/src/tcp.rs b/shotover/src/tcp.rs index 1ced83693..dde11acc5 100644 --- a/shotover/src/tcp.rs +++ b/shotover/src/tcp.rs @@ -1,6 +1,6 @@ //! Use to establish a TCP connection to a DB in a sink transform -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use std::time::Duration; use tokio::{ net::{TcpStream, ToSocketAddrs}, @@ -18,7 +18,5 @@ pub async fn tcp_stream( "destination {destination:?} did not respond to connection attempt within {connect_timeout:?}" ) })? - .map_err(|e| { - anyhow!(e).context(format!("Failed to connect to destination {destination:?}")) - }) + .with_context(|| format!("Failed to connect to destination {destination:?}")) } diff --git a/shotover/src/tls.rs b/shotover/src/tls.rs index 045e70e5d..eeb1af7fe 100644 --- a/shotover/src/tls.rs +++ b/shotover/src/tls.rs @@ -19,6 +19,7 @@ use tokio_rustls::server::TlsStream as TlsStreamServer; use tokio_rustls::{TlsAcceptor as RustlsAcceptor, TlsConnector as RustlsConnector}; #[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] pub struct TlsAcceptorConfig { /// Path to the certificate authority in PEM format pub certificate_authority_path: Option, @@ -74,27 +75,25 @@ impl TlsAcceptor { pub fn new(tls_config: TlsAcceptorConfig) -> Result { let client_cert_verifier = if let Some(path) = tls_config.certificate_authority_path.as_ref() { - let root_cert_store = load_ca(path).map_err(|err| { - anyhow!(err).context(format!( - "Failed to read file {path} configured at 'certificate_authority_path'" - )) + let root_cert_store = load_ca(path).with_context(|| { + format!("Failed to read file {path} configured at 'certificate_authority_path'") })?; AllowAnyAuthenticatedClient::new(root_cert_store).boxed() } else { NoClientAuth::boxed() }; - let private_key = load_private_key(&tls_config.private_key_path).map_err(|err| { - anyhow!(err).context(format!( + let private_key = load_private_key(&tls_config.private_key_path).with_context(|| { + format!( "Failed to read file {} configured at 'private_key_path", tls_config.private_key_path, - )) + ) })?; - let certs = load_certs(&tls_config.certificate_path).map_err(|err| { - anyhow!(err).context(format!( + let certs = load_certs(&tls_config.certificate_path).with_context(|| { + format!( "Failed to read file {} configured at 'certificate_path'", tls_config.private_key_path, - )) + ) })?; let config = rustls::ServerConfig::builder() @@ -122,6 +121,7 @@ impl TlsAcceptor { } #[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] pub struct TlsConnectorConfig { /// Path to the certificate authority in PEM format pub certificate_authority_path: String, @@ -140,21 +140,20 @@ pub struct TlsConnector { impl TlsConnector { pub fn new(tls_config: TlsConnectorConfig) -> Result { - let root_cert_store = load_ca(&tls_config.certificate_authority_path).map_err(|err| { - anyhow!(err).context(format!( - "Failed to read file {} configured at 'certificate_authority_path'", - tls_config.certificate_authority_path, - )) - })?; + let root_cert_store = + load_ca(&tls_config.certificate_authority_path).with_context(|| { + format!( + "Failed to read file {} configured at 'certificate_authority_path'", + tls_config.certificate_authority_path, + ) + })?; let private_key = tls_config .private_key_path .as_ref() .map(|path| { - load_private_key(path).map_err(|err| { - anyhow!(err).context(format!( - "Failed to read file {path} configured at 'private_key_path", - )) + load_private_key(path).with_context(|| { + format!("Failed to read file {path} configured at 'private_key_path",) }) }) .transpose()?; @@ -162,10 +161,8 @@ impl TlsConnector { .certificate_path .as_ref() .map(|path| { - load_certs(path).map_err(|err| { - anyhow!(err).context(format!( - "Failed to read file {path} configured at 'certificate_path'", - )) + load_certs(path).with_context(|| { + format!("Failed to read file {path} configured at 'certificate_path'",) }) }) .transpose()?; diff --git a/shotover/src/transforms/cassandra/peers_rewrite.rs b/shotover/src/transforms/cassandra/peers_rewrite.rs index e22138426..0ce804585 100644 --- a/shotover/src/transforms/cassandra/peers_rewrite.rs +++ b/shotover/src/transforms/cassandra/peers_rewrite.rs @@ -14,6 +14,7 @@ use cql3_parser::select::SelectElement; use serde::Deserialize; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct CassandraPeersRewriteConfig { pub port: u16, } diff --git a/shotover/src/transforms/cassandra/sink_cluster/mod.rs b/shotover/src/transforms/cassandra/sink_cluster/mod.rs index 257700e0f..400f1e0c7 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/mod.rs @@ -48,6 +48,7 @@ const SYSTEM_KEYSPACES: [IdentifierRef<'static>; 3] = [ ]; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct CassandraSinkClusterConfig { /// contact points must be within the specified data_center and rack. /// If this is not followed, shotover's invariants will still be upheld but shotover will communicate with a @@ -181,6 +182,7 @@ impl TransformBuilder for CassandraSinkClusterBuilder { } #[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] pub struct ShotoverNode { pub address: SocketAddr, pub data_center: String, diff --git a/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs b/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs index 896686d2d..77d9860ab 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs @@ -123,10 +123,8 @@ impl NodePool { nodes.shuffle(rng); get_accessible_node(connection_factory, nodes) .await - .map_err(|err| { - err.context(format!( - "Failed to open a connection to any nodes in the rack {rack:?}" - )) + .with_context(|| { + format!("Failed to open a connection to any nodes in the rack {rack:?}") }) } diff --git a/shotover/src/transforms/cassandra/sink_single.rs b/shotover/src/transforms/cassandra/sink_single.rs index f0652f179..7c2fdba62 100644 --- a/shotover/src/transforms/cassandra/sink_single.rs +++ b/shotover/src/transforms/cassandra/sink_single.rs @@ -16,6 +16,7 @@ use tokio::sync::{mpsc, oneshot}; use tracing::trace; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct CassandraSinkSingleConfig { #[serde(rename = "remote_address")] pub address: String, diff --git a/shotover/src/transforms/coalesce.rs b/shotover/src/transforms/coalesce.rs index 700237154..cb57ca153 100644 --- a/shotover/src/transforms/coalesce.rs +++ b/shotover/src/transforms/coalesce.rs @@ -14,6 +14,7 @@ pub struct Coalesce { } #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct CoalesceConfig { pub flush_when_buffered_message_count: Option, pub flush_when_millis_since_last_flush: Option, diff --git a/shotover/src/transforms/debug/force_parse.rs b/shotover/src/transforms/debug/force_parse.rs index d74702f69..4892c7a2f 100644 --- a/shotover/src/transforms/debug/force_parse.rs +++ b/shotover/src/transforms/debug/force_parse.rs @@ -15,6 +15,7 @@ use serde::Deserialize; /// Messages that pass through this transform will be parsed. /// Must be individually enabled at the request or response level. #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct DebugForceParseConfig { pub parse_requests: bool, pub parse_responses: bool, @@ -37,6 +38,7 @@ impl TransformConfig for DebugForceParseConfig { /// Messages that pass through this transform will be parsed and then reencoded. /// Must be individually enabled at the request or response level. #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct DebugForceEncodeConfig { pub encode_requests: bool, pub encode_responses: bool, diff --git a/shotover/src/transforms/debug/log_to_file.rs b/shotover/src/transforms/debug/log_to_file.rs index fcc12d7d7..bb5117c78 100644 --- a/shotover/src/transforms/debug/log_to_file.rs +++ b/shotover/src/transforms/debug/log_to_file.rs @@ -1,6 +1,6 @@ use crate::message::{Encodable, Message}; use crate::transforms::{Transform, TransformBuilder, Transforms, Wrapper}; -use anyhow::{anyhow, Context, Result}; +use anyhow::{Context, Result}; use async_trait::async_trait; use serde::Deserialize; use std::path::{Path, PathBuf}; @@ -9,6 +9,7 @@ use std::sync::Arc; use tracing::{error, info}; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct DebugLogToFileConfig; #[cfg(feature = "alpha-transforms")] @@ -97,9 +98,9 @@ async fn log_message(message: &Message, path: &Path) -> Result<()> { info!("Logged message to {:?}", path); match message.clone().into_encodable() { Encodable::Bytes(bytes) => { - tokio::fs::write(path, bytes).await.map_err(|e| { - anyhow!(e).context(format!("failed to write message to disk at {path:?}")) - })?; + tokio::fs::write(path, bytes) + .await + .with_context(|| format!("failed to write message to disk at {path:?}"))?; } Encodable::Frame(_) => { error!("Failed to log message because it was a frame. Ensure this Transform is the first transform in the main chain to ensure it only receives unmodified messages.") diff --git a/shotover/src/transforms/debug/printer.rs b/shotover/src/transforms/debug/printer.rs index e976f90ee..640298536 100644 --- a/shotover/src/transforms/debug/printer.rs +++ b/shotover/src/transforms/debug/printer.rs @@ -6,6 +6,7 @@ use serde::Deserialize; use tracing::info; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct DebugPrinterConfig; #[typetag::deserialize(name = "DebugPrinter")] diff --git a/shotover/src/transforms/debug/returner.rs b/shotover/src/transforms/debug/returner.rs index be5cfeddd..96398a4f7 100644 --- a/shotover/src/transforms/debug/returner.rs +++ b/shotover/src/transforms/debug/returner.rs @@ -6,6 +6,7 @@ use async_trait::async_trait; use serde::Deserialize; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct DebugReturnerConfig { #[serde(flatten)] response: Response, @@ -20,6 +21,7 @@ impl TransformConfig for DebugReturnerConfig { } #[derive(Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] pub enum Response { #[serde(skip)] Message(Messages), diff --git a/shotover/src/transforms/distributed/tuneable_consistency_scatter.rs b/shotover/src/transforms/distributed/tuneable_consistency_scatter.rs index 64de9dfb6..cc5249c47 100644 --- a/shotover/src/transforms/distributed/tuneable_consistency_scatter.rs +++ b/shotover/src/transforms/distributed/tuneable_consistency_scatter.rs @@ -12,6 +12,7 @@ use std::collections::HashMap; use tracing::{error, warn}; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct TuneableConsistencyScatterConfig { pub route_map: HashMap, pub write_consistency: i32, diff --git a/shotover/src/transforms/filter.rs b/shotover/src/transforms/filter.rs index f7799f94e..47c5c4aef 100644 --- a/shotover/src/transforms/filter.rs +++ b/shotover/src/transforms/filter.rs @@ -15,6 +15,7 @@ pub struct QueryTypeFilter { } #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct QueryTypeFilterConfig { pub filter: QueryType, } diff --git a/shotover/src/transforms/kafka/sink_cluster.rs b/shotover/src/transforms/kafka/sink_cluster.rs index b8baab612..d2ad26b35 100644 --- a/shotover/src/transforms/kafka/sink_cluster.rs +++ b/shotover/src/transforms/kafka/sink_cluster.rs @@ -12,8 +12,9 @@ use async_trait::async_trait; use dashmap::DashMap; use kafka_protocol::messages::metadata_request::MetadataRequestTopic; use kafka_protocol::messages::{ - ApiKey, FindCoordinatorRequest, GroupId, HeartbeatRequest, JoinGroupRequest, MetadataRequest, - MetadataResponse, OffsetFetchRequest, RequestHeader, SyncGroupRequest, TopicName, + ApiKey, BrokerId, FindCoordinatorRequest, GroupId, HeartbeatRequest, JoinGroupRequest, + MetadataRequest, MetadataResponse, OffsetFetchRequest, RequestHeader, SyncGroupRequest, + TopicName, }; use kafka_protocol::protocol::{Builder, StrBytes}; use rand::rngs::SmallRng; @@ -29,6 +30,7 @@ use tokio::time::timeout; use uuid::Uuid; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct KafkaSinkClusterConfig { pub first_contact_points: Vec, pub shotover_nodes: Vec, @@ -60,8 +62,8 @@ pub struct KafkaSinkClusterBuilder { shotover_nodes: Vec, connect_timeout: Duration, read_timeout: Option, - coordinator_broker_id: Arc>, - topics: Arc>, + group_to_coordinator_broker: Arc>, + topics: Arc>, nodes_shared: Arc>>, } @@ -91,7 +93,7 @@ impl KafkaSinkClusterBuilder { shotover_nodes, connect_timeout: Duration::from_millis(connect_timeout_ms), read_timeout: receive_timeout, - coordinator_broker_id: Arc::new(DashMap::new()), + group_to_coordinator_broker: Arc::new(DashMap::new()), topics: Arc::new(DashMap::new()), nodes_shared: Arc::new(RwLock::new(vec![])), } @@ -108,7 +110,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder { read_timeout: self.read_timeout, nodes: vec![], nodes_shared: self.nodes_shared.clone(), - coordinator_broker_id: self.coordinator_broker_id.clone(), + group_to_coordinator_broker: self.group_to_coordinator_broker.clone(), topics: self.topics.clone(), rng: SmallRng::from_rng(rand::thread_rng()).unwrap(), }) @@ -131,8 +133,8 @@ pub struct KafkaSinkCluster { read_timeout: Option, nodes: Vec, nodes_shared: Arc>>, - coordinator_broker_id: Arc>, - topics: Arc>, + group_to_coordinator_broker: Arc>, + topics: Arc>, rng: SmallRng, } @@ -147,7 +149,7 @@ impl Transform for KafkaSinkCluster { Ok(KafkaNode { connection: None, kafka_address: KafkaAddress::from_str(address)?, - broker_id: -1, + broker_id: BrokerId(-1), }) }) .collect(); @@ -182,15 +184,16 @@ impl Transform for KafkaSinkCluster { } impl KafkaSinkCluster { - fn store_topic(&self, topics: &mut Vec, topic: TopicName) { - if self.topics.get(&topic.0).is_none() && !topics.contains(&topic.0) { - topics.push(topic.0); + fn store_topic(&self, topics: &mut Vec, topic: TopicName) { + if self.topics.get(&topic).is_none() && !topics.contains(&topic) { + topics.push(topic); } } - fn store_group(&self, groups: &mut Vec, group_id: GroupId) { - if self.coordinator_broker_id.get(&group_id.0).is_none() && !groups.contains(&group_id.0) { - groups.push(group_id.0); + fn store_group(&self, groups: &mut Vec, group_id: GroupId) { + if self.group_to_coordinator_broker.get(&group_id).is_none() && !groups.contains(&group_id) + { + groups.push(group_id); } } @@ -249,105 +252,15 @@ impl KafkaSinkCluster { } for group in groups { - let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request { - header: RequestHeader::builder() - .request_api_key(ApiKey::FindCoordinatorKey as i16) - .request_api_version(2) - .correlation_id(0) - .client_id(None) - .unknown_tagged_fields(Default::default()) - .build() - .unwrap(), - body: RequestBody::FindCoordinator( - FindCoordinatorRequest::builder() - .coordinator_keys(vec![]) - .key_type(0) - .key(group.clone()) - .unknown_tagged_fields(Default::default()) - .build() - .unwrap(), - ), - })); - - let connection = self - .nodes - .choose_mut(&mut self.rng) - .unwrap() - .get_connection(self.connect_timeout) - .await?; - let (tx, rx) = oneshot::channel(); - connection - .send(Request { - message: request, - return_chan: Some(tx), - }) - .map_err(|_| anyhow!("Failed to send"))?; - let mut response = rx.await.unwrap().response.unwrap(); - match response.frame() { - Some(Frame::Kafka(KafkaFrame::Response { - body: ResponseBody::FindCoordinator(coordinator), - .. - })) => { - self.coordinator_broker_id - .insert(group, coordinator.node_id.0); - } - other => { - return Err(anyhow!( - "Unexpected message returned to metadata request {other:?}" - )) - } - } + let node = self.find_coordinator_of_group(group.clone()).await?; + self.group_to_coordinator_broker + .insert(group, node.broker_id); + self.add_node_if_new(node).await; } if !topics.is_empty() { - let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request { - header: RequestHeader::builder() - .request_api_key(ApiKey::MetadataKey as i16) - .request_api_version(4) - .correlation_id(0) - .client_id(None) - .unknown_tagged_fields(Default::default()) - .build() - .unwrap(), - body: RequestBody::Metadata( - MetadataRequest::builder() - .topics(Some( - topics - .into_iter() - .map(|name| { - MetadataRequestTopic::builder() - .name(Some(TopicName(name))) - .topic_id(Uuid::nil()) - .unknown_tagged_fields(Default::default()) - .build() - .unwrap() - }) - .collect(), - )) - .allow_auto_topic_creation(false) - .include_cluster_authorized_operations(false) - .include_topic_authorized_operations(false) - .unknown_tagged_fields(Default::default()) - .build() - .unwrap(), - ), - })); - - let connection = self - .nodes - .choose_mut(&mut self.rng) - .unwrap() - .get_connection(self.connect_timeout) - .await?; - let (tx, rx) = oneshot::channel(); - connection - .send(Request { - message: request, - return_chan: Some(tx), - }) - .map_err(|_| anyhow!("Failed to send"))?; - let mut response = rx.await.unwrap().response.unwrap(); - match response.frame() { + let mut metadata = self.get_metadata_of_topics(topics).await?; + match metadata.frame() { Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::Metadata(metadata), .. @@ -464,28 +377,28 @@ impl KafkaSinkCluster { body: RequestBody::Heartbeat(heartbeat), .. })) => { - let group_id = heartbeat.group_id.0.clone(); + let group_id = heartbeat.group_id.clone(); results.push(self.route_to_coordinator(message, group_id).await?); } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::SyncGroup(sync_group), .. })) => { - let group_id = sync_group.group_id.0.clone(); + let group_id = sync_group.group_id.clone(); results.push(self.route_to_coordinator(message, group_id).await?); } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::OffsetFetch(offset_fetch), .. })) => { - let group_id = offset_fetch.group_id.0.clone(); + let group_id = offset_fetch.group_id.clone(); results.push(self.route_to_coordinator(message, group_id).await?); } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::JoinGroup(join_group), .. })) => { - let group_id = join_group.group_id.0.clone(); + let group_id = join_group.group_id.clone(); results.push(self.route_to_coordinator(message, group_id).await?); } @@ -511,6 +424,109 @@ impl KafkaSinkCluster { Ok(results) } + async fn find_coordinator_of_group(&mut self, group: GroupId) -> Result { + let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request { + header: RequestHeader::builder() + .request_api_key(ApiKey::FindCoordinatorKey as i16) + .request_api_version(2) + .correlation_id(0) + .client_id(None) + .unknown_tagged_fields(Default::default()) + .build() + .unwrap(), + body: RequestBody::FindCoordinator( + FindCoordinatorRequest::builder() + .coordinator_keys(vec![]) + .key_type(0) + .key(group.0) + .unknown_tagged_fields(Default::default()) + .build() + .unwrap(), + ), + })); + + let connection = self + .nodes + .choose_mut(&mut self.rng) + .unwrap() + .get_connection(self.connect_timeout) + .await?; + let (tx, rx) = oneshot::channel(); + connection + .send(Request { + message: request, + return_chan: Some(tx), + }) + .map_err(|_| anyhow!("Failed to send"))?; + let mut response = rx.await.unwrap().response.unwrap(); + match response.frame() { + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::FindCoordinator(coordinator), + .. + })) => Ok(KafkaNode { + broker_id: coordinator.node_id, + kafka_address: KafkaAddress { + host: coordinator.host.clone(), + port: coordinator.port, + }, + connection: None, + }), + other => Err(anyhow!( + "Unexpected message returned to metadata request {other:?}" + )), + } + } + + async fn get_metadata_of_topics(&mut self, topics: Vec) -> Result { + let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request { + header: RequestHeader::builder() + .request_api_key(ApiKey::MetadataKey as i16) + .request_api_version(4) + .correlation_id(0) + .client_id(None) + .unknown_tagged_fields(Default::default()) + .build() + .unwrap(), + body: RequestBody::Metadata( + MetadataRequest::builder() + .topics(Some( + topics + .into_iter() + .map(|name| { + MetadataRequestTopic::builder() + .name(Some(name)) + .topic_id(Uuid::nil()) + .unknown_tagged_fields(Default::default()) + .build() + .unwrap() + }) + .collect(), + )) + .allow_auto_topic_creation(false) + .include_cluster_authorized_operations(false) + .include_topic_authorized_operations(false) + .unknown_tagged_fields(Default::default()) + .build() + .unwrap(), + ), + })); + + let connection = self + .nodes + .choose_mut(&mut self.rng) + .unwrap() + .get_connection(self.connect_timeout) + .await?; + let (tx, rx) = oneshot::channel(); + connection + .send(Request { + message: request, + return_chan: Some(tx), + }) + .map_err(|_| anyhow!("Failed to send"))?; + Ok(rx.await.unwrap().response.unwrap()) + } + async fn receive_responses( &mut self, find_coordinator_requests: &[FindCoordinator], @@ -540,8 +556,8 @@ impl KafkaSinkCluster { if *version <= 3 { if request.key_type == 0 { - self.coordinator_broker_id - .insert(request.key.clone(), find_coordinator.node_id.0); + self.group_to_coordinator_broker + .insert(GroupId(request.key.clone()), find_coordinator.node_id); } rewrite_address( &self.shotover_nodes, @@ -551,8 +567,10 @@ impl KafkaSinkCluster { } else { for coordinator in &mut find_coordinator.coordinators { if request.key_type == 0 { - self.coordinator_broker_id - .insert(coordinator.key.clone(), find_coordinator.node_id.0); + self.group_to_coordinator_broker.insert( + GroupId(coordinator.key.clone()), + find_coordinator.node_id, + ); } rewrite_address( &self.shotover_nodes, @@ -597,11 +615,11 @@ impl KafkaSinkCluster { async fn route_to_coordinator( &mut self, message: Message, - group_name: StrBytes, + group_id: GroupId, ) -> Result> { let mut connection = None; for node in &mut self.nodes { - if let Some(broker_id) = self.coordinator_broker_id.get(&group_name) { + if let Some(broker_id) = self.group_to_coordinator_broker.get(&group_id) { if node.broker_id == *broker_id { connection = Some(node.get_connection(self.connect_timeout).await?.clone()); } @@ -610,7 +628,7 @@ impl KafkaSinkCluster { let connection = match connection { Some(connection) => connection, None => { - tracing::warn!("no known coordinator for {group_name:?}, routing message to a random node so that a NOT_COORDINATOR or similar error is returned to the client"); + tracing::warn!("no known coordinator for {group_id:?}, routing message to a random node so that a NOT_COORDINATOR or similar error is returned to the client"); self.nodes .choose_mut(&mut self.rng) .unwrap() @@ -631,29 +649,20 @@ impl KafkaSinkCluster { async fn process_metadata(&mut self, metadata: &MetadataResponse) { for (id, broker) in &metadata.brokers { - let new = self - .nodes_shared - .read() - .await - .iter() - .all(|node| node.broker_id != **id); - if new { - let host = broker.host.clone(); - let port = broker.port; - let node = KafkaNode { - broker_id: **id, - kafka_address: KafkaAddress { host, port }, - connection: None, - }; - self.nodes_shared.write().await.push(node); - - self.update_local_nodes().await; - } + let node = KafkaNode { + broker_id: *id, + kafka_address: KafkaAddress { + host: broker.host.clone(), + port: broker.port, + }, + connection: None, + }; + self.add_node_if_new(node).await; } for topic in &metadata.topics { self.topics.insert( - topic.0.clone().0, + topic.0.clone(), Topic { partitions: topic .1 @@ -668,6 +677,20 @@ impl KafkaSinkCluster { ); } } + + async fn add_node_if_new(&mut self, new_node: KafkaNode) { + let new = self + .nodes_shared + .read() + .await + .iter() + .all(|node| node.broker_id != new_node.broker_id); + if new { + self.nodes_shared.write().await.push(new_node); + + self.update_local_nodes().await; + } + } } async fn read_responses(responses: Vec>) -> Result { @@ -698,7 +721,7 @@ fn rewrite_address(shotover_nodes: &[KafkaAddress], host: &mut StrBytes, port: & #[derive(Clone)] struct KafkaNode { - broker_id: i32, + broker_id: BrokerId, kafka_address: KafkaAddress, connection: Option, } diff --git a/shotover/src/transforms/kafka/sink_single.rs b/shotover/src/transforms/kafka/sink_single.rs index b0e1120bc..36a61df4e 100644 --- a/shotover/src/transforms/kafka/sink_single.rs +++ b/shotover/src/transforms/kafka/sink_single.rs @@ -15,6 +15,7 @@ use tokio::sync::{mpsc, oneshot}; use tokio::time::timeout; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct KafkaSinkSingleConfig { #[serde(rename = "remote_address")] pub address: String, diff --git a/shotover/src/transforms/load_balance.rs b/shotover/src/transforms/load_balance.rs index 7fb8c0782..12373ada1 100644 --- a/shotover/src/transforms/load_balance.rs +++ b/shotover/src/transforms/load_balance.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use tokio::sync::Mutex; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct ConnectionBalanceAndPoolConfig { pub name: String, pub max_connections: usize, diff --git a/shotover/src/transforms/null.rs b/shotover/src/transforms/null.rs index db93a56ab..6a984f41a 100644 --- a/shotover/src/transforms/null.rs +++ b/shotover/src/transforms/null.rs @@ -5,6 +5,7 @@ use async_trait::async_trait; use serde::Deserialize; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct NullSinkConfig; #[typetag::deserialize(name = "NullSink")] diff --git a/shotover/src/transforms/parallel_map.rs b/shotover/src/transforms/parallel_map.rs index 33aa8e6f9..b73cd56c6 100644 --- a/shotover/src/transforms/parallel_map.rs +++ b/shotover/src/transforms/parallel_map.rs @@ -64,6 +64,7 @@ where } #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct ParallelMapConfig { pub parallelism: u32, pub chain: TransformChainConfig, diff --git a/shotover/src/transforms/protect/key_management.rs b/shotover/src/transforms/protect/key_management.rs index ff66d0959..9a4d63cc2 100644 --- a/shotover/src/transforms/protect/key_management.rs +++ b/shotover/src/transforms/protect/key_management.rs @@ -18,6 +18,7 @@ pub enum KeyManager { } #[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] pub enum KeyManagerConfig { AWSKms { region: String, diff --git a/shotover/src/transforms/protect/local_kek.rs b/shotover/src/transforms/protect/local_kek.rs index 6999c790f..0a8641988 100644 --- a/shotover/src/transforms/protect/local_kek.rs +++ b/shotover/src/transforms/protect/local_kek.rs @@ -13,6 +13,7 @@ pub struct LocalKeyManagement { } #[derive(Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] pub struct DEKStructure { pub nonce: Nonce, pub key: Vec, diff --git a/shotover/src/transforms/protect/mod.rs b/shotover/src/transforms/protect/mod.rs index 3b00584a2..e34ef6450 100644 --- a/shotover/src/transforms/protect/mod.rs +++ b/shotover/src/transforms/protect/mod.rs @@ -21,6 +21,7 @@ mod local_kek; mod pkcs_11; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct ProtectConfig { pub keyspace_table_columns: HashMap>>, pub key_manager: KeyManagerConfig, diff --git a/shotover/src/transforms/query_counter.rs b/shotover/src/transforms/query_counter.rs index 0617a0ef2..2e9028631 100644 --- a/shotover/src/transforms/query_counter.rs +++ b/shotover/src/transforms/query_counter.rs @@ -14,6 +14,7 @@ pub struct QueryCounter { } #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct QueryCounterConfig { pub name: String, } diff --git a/shotover/src/transforms/redis/cache.rs b/shotover/src/transforms/redis/cache.rs index 5e4a000d6..854b246bb 100644 --- a/shotover/src/transforms/redis/cache.rs +++ b/shotover/src/transforms/redis/cache.rs @@ -49,12 +49,14 @@ enum CacheableState { } #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct TableCacheSchemaConfig { partition_key: Vec, range_key: Vec, } #[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] pub struct TableCacheSchema { partition_key: Vec, range_key: Vec, @@ -74,6 +76,7 @@ impl From<&TableCacheSchemaConfig> for TableCacheSchema { } #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct RedisConfig { pub caching_schema: HashMap, pub chain: TransformChainConfig, diff --git a/shotover/src/transforms/redis/cluster_ports_rewrite.rs b/shotover/src/transforms/redis/cluster_ports_rewrite.rs index e5117ca49..1d3cc0496 100644 --- a/shotover/src/transforms/redis/cluster_ports_rewrite.rs +++ b/shotover/src/transforms/redis/cluster_ports_rewrite.rs @@ -8,6 +8,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use serde::Deserialize; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct RedisClusterPortsRewriteConfig { pub new_port: u16, } diff --git a/shotover/src/transforms/redis/sink_cluster.rs b/shotover/src/transforms/redis/sink_cluster.rs index 890630075..645d04d7d 100644 --- a/shotover/src/transforms/redis/sink_cluster.rs +++ b/shotover/src/transforms/redis/sink_cluster.rs @@ -37,6 +37,7 @@ const SLOT_SIZE: usize = 16384; type ChannelMap = HashMap>>; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct RedisSinkClusterConfig { pub first_contact_points: Vec, pub direct_destination: Option, diff --git a/shotover/src/transforms/redis/sink_single.rs b/shotover/src/transforms/redis/sink_single.rs index 4f49814be..ecaaa5743 100644 --- a/shotover/src/transforms/redis/sink_single.rs +++ b/shotover/src/transforms/redis/sink_single.rs @@ -21,6 +21,7 @@ use tokio_util::codec::{FramedRead, FramedWrite}; use tracing::Instrument; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct RedisSinkSingleConfig { #[serde(rename = "remote_address")] pub address: String, diff --git a/shotover/src/transforms/redis/timestamp_tagging.rs b/shotover/src/transforms/redis/timestamp_tagging.rs index 486b7a1c2..24613d68e 100644 --- a/shotover/src/transforms/redis/timestamp_tagging.rs +++ b/shotover/src/transforms/redis/timestamp_tagging.rs @@ -12,6 +12,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use tracing::{debug, trace}; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct RedisTimestampTaggerConfig; #[typetag::deserialize(name = "RedisTimestampTagger")] diff --git a/shotover/src/transforms/tee.rs b/shotover/src/transforms/tee.rs index d972a8e3b..fb9d16965 100644 --- a/shotover/src/transforms/tee.rs +++ b/shotover/src/transforms/tee.rs @@ -63,21 +63,27 @@ impl TransformBuilder for TeeBuilder { } fn validate(&self) -> Vec { + let mut errors = self + .tx + .validate() + .iter() + .map(|x| format!(" {x}")) + .collect::>(); + if let ConsistencyBehaviorBuilder::SubchainOnMismatch(mismatch_chain) = &self.behavior { - let mut errors = mismatch_chain + let sub_errors = mismatch_chain .validate() .iter() .map(|x| format!(" {x}")) .collect::>(); + errors.extend(sub_errors) + } - if !errors.is_empty() { - errors.insert(0, format!("{}:", self.get_name())); - } - - errors - } else { - vec![] + if !errors.is_empty() { + errors.insert(0, format!("{}:", self.get_name())); } + + errors } } @@ -96,6 +102,7 @@ pub enum ConsistencyBehavior { } #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct TeeConfig { pub behavior: Option, pub timeout_micros: Option, @@ -104,6 +111,7 @@ pub struct TeeConfig { } #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub enum ConsistencyBehaviorConfig { Ignore, FailOnMismatch, @@ -200,34 +208,64 @@ mod tests { use crate::transforms::null::NullSinkConfig; #[tokio::test] - async fn test_validate_no_subchain() { - { - let config = TeeConfig { - behavior: Some(ConsistencyBehaviorConfig::Ignore), - timeout_micros: None, - chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]), - buffer_size: None, - }; - let transform = config.get_builder("".to_owned()).await.unwrap(); - let result = transform.validate(); - assert_eq!(result, Vec::::new()); - } + async fn test_validate_subchain_valid() { + let config = TeeConfig { + behavior: None, + timeout_micros: None, + chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]), + buffer_size: None, + }; - { - let config = TeeConfig { - behavior: Some(ConsistencyBehaviorConfig::FailOnMismatch), - timeout_micros: None, - chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]), - buffer_size: None, - }; - let transform = config.get_builder("".to_owned()).await.unwrap(); - let result = transform.validate(); - assert_eq!(result, Vec::::new()); - } + let transform = config.get_builder("".to_owned()).await.unwrap(); + let result = transform.validate(); + assert_eq!(result, Vec::::new()); + } + + #[tokio::test] + async fn test_validate_subchain_invalid() { + let config = TeeConfig { + behavior: None, + timeout_micros: None, + chain: TransformChainConfig(vec![Box::new(NullSinkConfig), Box::new(NullSinkConfig)]), + buffer_size: None, + }; + + let transform = config.get_builder("".to_owned()).await.unwrap(); + let result = transform.validate().join("\n"); + let expected = r#"Tee: + tee_chain: + Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain."#; + assert_eq!(result, expected); + } + + #[tokio::test] + async fn test_validate_behaviour_ignore() { + let config = TeeConfig { + behavior: Some(ConsistencyBehaviorConfig::Ignore), + timeout_micros: None, + chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]), + buffer_size: None, + }; + let transform = config.get_builder("".to_owned()).await.unwrap(); + let result = transform.validate(); + assert_eq!(result, Vec::::new()); } #[tokio::test] - async fn test_validate_invalid_chain() { + async fn test_validate_behaviour_fail_on_mismatch() { + let config = TeeConfig { + behavior: Some(ConsistencyBehaviorConfig::FailOnMismatch), + timeout_micros: None, + chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]), + buffer_size: None, + }; + let transform = config.get_builder("".to_owned()).await.unwrap(); + let result = transform.validate(); + assert_eq!(result, Vec::::new()); + } + + #[tokio::test] + async fn test_validate_behaviour_subchain_on_mismatch_invalid() { let config = TeeConfig { behavior: Some(ConsistencyBehaviorConfig::SubchainOnMismatch( TransformChainConfig(vec![Box::new(NullSinkConfig), Box::new(NullSinkConfig)]), @@ -238,13 +276,15 @@ mod tests { }; let transform = config.get_builder("".to_owned()).await.unwrap(); - let result = transform.validate(); - let expected = vec!["Tee:", " mismatch_chain:", " Terminating transform \"NullSink\" is not last in chain. Terminating transform must be last in chain."]; + let result = transform.validate().join("\n"); + let expected = r#"Tee: + mismatch_chain: + Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain."#; assert_eq!(result, expected); } #[tokio::test] - async fn test_validate_valid_chain() { + async fn test_validate_behaviour_subchain_on_mismatch_valid() { let config = TeeConfig { behavior: Some(ConsistencyBehaviorConfig::SubchainOnMismatch( TransformChainConfig(vec![Box::new(NullSinkConfig)]), diff --git a/shotover/src/transforms/throttling.rs b/shotover/src/transforms/throttling.rs index 3c73d828c..6f88469d9 100644 --- a/shotover/src/transforms/throttling.rs +++ b/shotover/src/transforms/throttling.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use super::Transforms; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct RequestThrottlingConfig { pub max_requests_per_second: NonZeroU32, } diff --git a/test-helpers/Cargo.toml b/test-helpers/Cargo.toml index 269925f8c..372a6e7c1 100644 --- a/test-helpers/Cargo.toml +++ b/test-helpers/Cargo.toml @@ -32,4 +32,4 @@ tracing-subscriber.workspace = true serde_yaml.workspace = true anyhow.workspace = true rcgen.workspace = true -docker-compose-runner = "0.1.0" +docker-compose-runner = "0.2.0" diff --git a/test-helpers/src/connection/redis_connection.rs b/test-helpers/src/connection/redis_connection.rs index f0d410bc2..d4c9ad9d6 100644 --- a/test-helpers/src/connection/redis_connection.rs +++ b/test-helpers/src/connection/redis_connection.rs @@ -1,4 +1,4 @@ -use anyhow::anyhow; +use anyhow::Context; use openssl::ssl::{SslConnector, SslFiletype, SslMethod}; use redis::aio::AsyncStream; use redis::Client; @@ -13,9 +13,7 @@ pub fn new(port: u16) -> redis::Connection { let connection = Client::open((address, port)) .unwrap() .get_connection() - .map_err(|e| { - anyhow!(e).context(format!("Failed to create redis connection to port {port}")) - }) + .with_context(|| format!("Failed to create redis connection to port {port}")) .unwrap(); connection .set_read_timeout(Some(Duration::from_secs(10))) @@ -27,11 +25,7 @@ pub async fn new_async(address: &str, port: u16) -> redis::aio::Connection { let stream = Box::pin( tokio::net::TcpStream::connect((address, port)) .await - .map_err(|e| { - anyhow!(e).context(format!( - "Failed to create async redis connection to port {port}" - )) - }) + .with_context(|| format!("Failed to create async redis connection to port {port}")) .unwrap(), ); new_async_inner(Box::pin(stream) as Pin>).await diff --git a/test-helpers/src/docker_compose.rs b/test-helpers/src/docker_compose.rs index 4fada9a8c..0198f818f 100644 --- a/test-helpers/src/docker_compose.rs +++ b/test-helpers/src/docker_compose.rs @@ -32,40 +32,49 @@ pub fn get_image_waiters() -> &'static [Image] { Image { name: "motoserver/moto", log_regex_to_wait_for: r"Press CTRL\+C to quit", + timeout: 120, }, Image { name: "library/redis:5.0.9", log_regex_to_wait_for: r"Ready to accept connections", + timeout: 120, }, Image { name: "library/redis:6.2.5", log_regex_to_wait_for: r"Ready to accept connections", + timeout: 120, }, Image { name: "bitnami/redis-cluster:6.2.12-debian-11-r26", //`Cluster state changed` is created by the node services //`Cluster correctly created` is created by the init service log_regex_to_wait_for: r"Cluster state changed|Cluster correctly created", + timeout: 120, }, Image { name: "bitnami/cassandra:4.0.6", log_regex_to_wait_for: r"Startup complete", + timeout: 120, }, Image { name: "shotover/cassandra-test:4.0.6-r1", log_regex_to_wait_for: r"Startup complet", + timeout: 120, }, Image { name: "shotover/cassandra-test:3.11.13-r1", log_regex_to_wait_for: r"Startup complete", + timeout: 120, }, Image { name: "bitnami/kafka:3.4.0-debian-11-r22", log_regex_to_wait_for: r"Kafka Server started", + timeout: 120, }, Image { name: "opensearchproject/opensearch:2.9.0", log_regex_to_wait_for: r"Node '(?s)(.*)' initialized", + timeout: 120, }, ] } diff --git a/windsock/Cargo.toml b/windsock/Cargo.toml index 14c845786..7f4576f4d 100644 --- a/windsock/Cargo.toml +++ b/windsock/Cargo.toml @@ -13,9 +13,10 @@ bincode.workspace = true clap.workspace = true console = "0.15.5" copy_dir = "0.1.2" -docker-compose-runner = "0.1.0" +docker-compose-runner = "0.2.0" serde = { workspace = true, features = ["derive"] } strum = { version = "0.25.0", features = ["derive"] } +time = { version = "0.3.25", features = ["serde"] } tokio.workspace = true [dev-dependencies] diff --git a/windsock/build.rs b/windsock/build.rs index 8484a5049..a566c2b4c 100644 --- a/windsock/build.rs +++ b/windsock/build.rs @@ -3,5 +3,5 @@ use std::env; fn main() { let profile = env::var("PROFILE").unwrap(); println!("cargo:rustc-env=PROFILE={profile}"); - println!("cargo:rerun-if-env-changed=PROFILE"); + println!("cargo:rerun-if-changed=build.rs"); } diff --git a/windsock/examples/cassandra.rs b/windsock/examples/cassandra.rs index d88eb5882..bd6a6db54 100644 --- a/windsock/examples/cassandra.rs +++ b/windsock/examples/cassandra.rs @@ -142,6 +142,7 @@ fn get_image_waiters() -> &'static [Image] { &[Image { name: "bitnami/cassandra:4.0.6", log_regex_to_wait_for: r"Startup complete", + timeout: 120, }] } diff --git a/windsock/src/bench.rs b/windsock/src/bench.rs index f396fd505..733d7eb05 100644 --- a/windsock/src/bench.rs +++ b/windsock/src/bench.rs @@ -176,8 +176,6 @@ pub trait Bench { fn run_args_vec(internal_run: String, bench_parameters: &BenchParameters) -> Vec { let mut args = vec![]; - args.push("--disable-release-safety-check".to_owned()); - args.push("--bench-length-seconds".to_owned()); args.push(bench_parameters.runtime_seconds.to_string()); diff --git a/windsock/src/cli.rs b/windsock/src/cli.rs index 9a8d06084..004b93fa8 100644 --- a/windsock/src/cli.rs +++ b/windsock/src/cli.rs @@ -107,10 +107,6 @@ pub struct Args { #[clap(long, verbatim_doc_comment)] pub baseline_compare_by_tags: Option, - /// Prevent release mode safety check from triggering to allow running in debug mode - #[clap(long, verbatim_doc_comment)] - pub disable_release_safety_check: bool, - /// Not for human use. Call this from your bench orchestration method to launch your bencher. #[clap(long, verbatim_doc_comment)] pub internal_run: Option, diff --git a/windsock/src/lib.rs b/windsock/src/lib.rs index ae0af5bd9..ce25e1c96 100644 --- a/windsock/src/lib.rs +++ b/windsock/src/lib.rs @@ -7,7 +7,8 @@ mod report; mod tables; pub use bench::{Bench, BenchParameters, BenchTask, Profiling}; -pub use report::{Report, ReportArchive}; +pub use report::{Metric, Report, ReportArchive}; +pub use tables::Goal; use anyhow::{anyhow, Result}; use bench::BenchState; @@ -63,9 +64,6 @@ impl Windsock { let args = cli::Args::parse(); let running_in_release = self.running_in_release; - if !args.disable_release_safety_check && !running_in_release { - panic!("Windsock was not run with a configured release profile, maybe try running with the `--release` flag. Failing that check the release profiles provided in `Windsock::new(..)`."); - } if args.cleanup_cloud_resources { let rt = create_runtime(None); rt.block_on(self.cloud.cleanup_resources()); diff --git a/windsock/src/report.rs b/windsock/src/report.rs index c75a30ed5..9dd45aeb4 100644 --- a/windsock/src/report.rs +++ b/windsock/src/report.rs @@ -1,8 +1,9 @@ -use crate::{bench::Tags, data::windsock_path}; +use crate::{bench::Tags, data::windsock_path, Goal}; use anyhow::{anyhow, Result}; use serde::{Deserialize, Serialize}; use std::{io::ErrorKind, path::PathBuf, time::Duration}; use strum::{EnumCount, EnumIter, IntoEnumIterator}; +use time::OffsetDateTime; use tokio::sync::mpsc::UnboundedReceiver; #[derive(Debug, Serialize, Deserialize)] @@ -94,8 +95,10 @@ type Percentiles = [Duration; Percentile::COUNT]; pub struct ReportArchive { pub(crate) running_in_release: bool, pub(crate) tags: Tags, + pub bench_started_at: OffsetDateTime, pub(crate) operations_report: Option, pub(crate) pubsub_report: Option, + pub metrics: Vec, pub(crate) error_messages: Vec, } @@ -130,6 +133,47 @@ pub(crate) struct PubSubReport { pub(crate) backlog_each_second: Vec, } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum Metric { + Total { + name: String, + compare: f64, + value: String, + goal: Goal, + }, + EachSecond { + name: String, + values: Vec<(f64, String, Goal)>, + }, +} + +impl Metric { + pub(crate) fn identifier(&self) -> MetricIdentifier { + match self { + Metric::Total { name, .. } => MetricIdentifier::Total { + name: name.to_owned(), + }, + Metric::EachSecond { name, .. } => MetricIdentifier::EachSecond { + name: name.to_owned(), + }, + } + } + + #[allow(clippy::len_without_is_empty)] + pub(crate) fn len(&self) -> usize { + match self { + Metric::Total { .. } => 1, + Metric::EachSecond { values, .. } => values.len(), + } + } +} + +#[derive(PartialEq)] +pub enum MetricIdentifier { + Total { name: String }, + EachSecond { name: String }, +} + fn error_message_insertion(messages: &mut Vec, new_message: String) { if !messages.contains(&new_message) { if messages.len() <= 5 { @@ -187,7 +231,7 @@ impl ReportArchive { reports } - fn save(&self) { + pub fn save(&self) { let path = self.path(); std::fs::create_dir_all(path.parent().unwrap()).unwrap(); std::fs::write(&path, bincode::serialize(self).unwrap()) @@ -241,7 +285,7 @@ pub(crate) async fn report_builder( running_in_release: bool, ) -> ReportArchive { let mut finished_in = None; - let mut started = false; + let mut started = None; let mut pubsub_report = None; let mut operations_report = None; let mut operation_times = vec![]; @@ -253,11 +297,11 @@ pub(crate) async fn report_builder( while let Some(report) = rx.recv().await { match report { Report::Start => { - started = true; + started = Some(OffsetDateTime::now_utc()); } Report::QueryCompletedIn(duration) => { let report = operations_report.get_or_insert_with(OperationsReport::default); - if started { + if started.is_some() { report.total += 1; total_operation_time += duration; operation_times.push(duration); @@ -272,7 +316,7 @@ pub(crate) async fn report_builder( message, } => { let report = operations_report.get_or_insert_with(OperationsReport::default); - if started { + if started.is_some() { error_message_insertion(&mut error_messages, message); report.total_errors += 1; total_operation_time += completed_in; @@ -280,7 +324,7 @@ pub(crate) async fn report_builder( } Report::ProduceCompletedIn(duration) => { let report = pubsub_report.get_or_insert_with(PubSubReport::default); - if started { + if started.is_some() { report.total_backlog += 1; report.total_produce += 1; total_produce_time += duration; @@ -296,7 +340,7 @@ pub(crate) async fn report_builder( message, } => { let report = pubsub_report.get_or_insert_with(PubSubReport::default); - if started { + if started.is_some() { error_message_insertion(&mut error_messages, message); report.total_produce_error += 1; total_produce_time += completed_in; @@ -304,7 +348,7 @@ pub(crate) async fn report_builder( } Report::ConsumeCompleted => { let report = pubsub_report.get_or_insert_with(PubSubReport::default); - if started { + if started.is_some() { report.total_backlog -= 1; report.total_consume += 1; match report.consume_each_second.last_mut() { @@ -315,7 +359,7 @@ pub(crate) async fn report_builder( } Report::ConsumeErrored { message } => { let report = pubsub_report.get_or_insert_with(PubSubReport::default); - if started { + if started.is_some() { error_message_insertion(&mut error_messages, message); report.total_consume_error += 1; } @@ -335,7 +379,7 @@ pub(crate) async fn report_builder( } } Report::FinishedIn(duration) => { - if !started { + if started.is_none() { panic!("The bench never returned Report::Start") } finished_in = Some(duration); @@ -376,11 +420,13 @@ pub(crate) async fn report_builder( } let archive = ReportArchive { + bench_started_at: started.unwrap(), running_in_release, tags, pubsub_report, error_messages, operations_report, + metrics: vec![], }; archive.save(); archive diff --git a/windsock/src/tables.rs b/windsock/src/tables.rs index 5242ea550..3b9f3a459 100644 --- a/windsock/src/tables.rs +++ b/windsock/src/tables.rs @@ -1,10 +1,12 @@ use crate::{ bench::Tags, filter::Filter, - report::{Percentile, ReportArchive}, + report::{MetricIdentifier, Percentile, ReportArchive}, + Metric, }; -use anyhow::Result; +use anyhow::{Context, Result}; use console::{pad_str, pad_str_with, style, Alignment}; +use serde::{Deserialize, Serialize}; use std::{collections::HashSet, time::Duration}; use strum::IntoEnumIterator; @@ -52,7 +54,7 @@ pub fn results_by_name(names: &str) -> Result<()> { pub fn baseline_compare_by_tags(arg: &str) -> Result<()> { let filter = Filter::from_query(arg) - .map_err(|e| e.context(format!("Failed to parse tag filter from {:?}", arg)))?; + .with_context(|| format!("Failed to parse tag filter from {:?}", arg))?; let archives: Result> = ReportArchive::reports_in_last_run() .iter() .filter(|name| filter.matches(&Tags::from_name(name))) @@ -72,7 +74,7 @@ pub fn compare_by_tags(arg: &str) -> Result<()> { let tag_args = tag_args.join(" "); let filter = Filter::from_query(&tag_args) - .map_err(|e| e.context(format!("Failed to parse tag filter from {:?}", tag_args)))?; + .with_context(|| format!("Failed to parse tag filter from {:?}", tag_args))?; let archives: Result> = ReportArchive::reports_in_last_run() .iter() .filter(|name| **name != base_name && filter.matches(&Tags::from_name(name))) @@ -100,7 +102,7 @@ pub fn compare_by_tags(arg: &str) -> Result<()> { pub fn results_by_tags(arg: &str) -> Result<()> { let filter = Filter::from_query(arg) - .map_err(|e| e.context(format!("Failed to parse tag filter from {:?}", arg)))?; + .with_context(|| format!("Failed to parse tag filter from {:?}", arg))?; let archives: Result> = ReportArchive::reports_in_last_run() .iter() .filter(|name| filter.matches(&Tags::from_name(name))) @@ -503,6 +505,63 @@ fn base(reports: &[ReportColumn], table_type: &str) { } } + let mut metrics_to_display = vec![]; + for report in reports { + for metric in &report.current.metrics { + if !metrics_to_display.contains(&metric.identifier()) { + metrics_to_display.push(metric.identifier()) + } + } + } + for metric_identifier in metrics_to_display { + match &metric_identifier { + MetricIdentifier::Total { name } => { + rows.push(Row::measurements(reports, name, |report| { + report + .metrics + .iter() + .find(|metric| metric.identifier() == metric_identifier) + .map(|metric| match metric { + Metric::EachSecond { .. } => unreachable!(), + Metric::Total { + compare, + value, + goal, + .. + } => (*compare, value.to_owned(), *goal), + }) + })); + } + MetricIdentifier::EachSecond { name } => { + rows.push(Row::Heading(format!("{name} Each Second"))); + for i in 0..reports + .iter() + .map(|x| { + x.current + .metrics + .iter() + .find(|x| x.identifier() == metric_identifier) + .map(|metric| metric.len()) + .unwrap_or(0) + }) + .max() + .unwrap() + { + rows.push(Row::measurements(reports, &i.to_string(), |report| { + report + .metrics + .iter() + .find(|x| x.identifier() == metric_identifier) + .and_then(|metric| match metric { + Metric::Total { .. } => unreachable!(), + Metric::EachSecond { values, .. } => values.get(i).cloned(), + }) + })); + } + } + } + } + // the width of the legend column let legend_width: usize = rows .iter() @@ -514,7 +573,7 @@ fn base(reports: &[ReportColumn], table_type: &str) { }) .max() .unwrap_or(10); - // the width of the comparison compoenent of each column + // the width of the comparison component of each column let comparison_widths: Vec = reports .iter() .enumerate() @@ -712,7 +771,8 @@ struct Measurement { color: Color, } -enum Goal { +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub enum Goal { BiggerIsBetter, SmallerIsBetter, }