Skip to content

Commit

Permalink
Merge branch 'main' into split_produce_requests
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Sep 10, 2024
2 parents 5c4d7fb + f268a0c commit b33fa9b
Show file tree
Hide file tree
Showing 10 changed files with 484 additions and 424 deletions.
858 changes: 459 additions & 399 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ inherits = "release"
debug = true

[workspace.dependencies]
scylla = { version = "0.13.0", features = ["ssl"] }
scylla = { version = "0.14.0", features = ["ssl"] }
bytes = { version = "1.0.0", features = ["serde"] }
tokio = { version = "1.25.0", features = ["full"] }
tokio-util = { version = "0.7.7", features = ["codec"] }
tokio-openssl = "0.6.2"
itertools = "0.12.0"
itertools = "0.13.0"
openssl = { version = "0.10.36", features = ["vendored"] }
anyhow = "1.0.76"
serde = { version = "1.0.111", features = ["derive"] }
Expand Down Expand Up @@ -55,5 +55,5 @@ typetag = "0.2.5"
aws-throwaway = { version = "0.6.0", default-features = false }
tokio-bin-process = "0.5.0"
ordered-float = { version = "4.0.0", features = ["serde"] }
shell-quote = { default-features = false, version = "0.5.0" }
shell-quote = { default-features = false, features = ["bash"], version = "0.7.0" }
pretty_assertions = "1.4.0"
2 changes: 1 addition & 1 deletion ec2-cargo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ AWS_ACCESS_KEY_ID={} AWS_SECRET_ACCESS_KEY={} cargo {command} {args} 2>&1
fn process_args(mut args: Vec<String>) -> String {
args.remove(0);
args.iter()
.map(|x| String::from_utf8(shell_quote::Bash::quote(x)).unwrap())
.map(|x| String::from_utf8(shell_quote::Bash::quote_vec(x)).unwrap())
.collect::<Vec<_>>()
.join(" ")
}
Expand Down
6 changes: 3 additions & 3 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ anyhow.workspace = true
tokio.workspace = true
tracing.workspace = true
clap.workspace = true
rstest = "0.19.0"
rstest_reuse = "0.6.0"
rstest = "0.22.0"
rstest_reuse = "0.7.0"
cassandra-cpp = { version = "3.0.0", default-features = false }
test-helpers = { path = "../test-helpers" }
redis.workspace = true
Expand Down Expand Up @@ -48,7 +48,7 @@ rustls-pki-types = "1.1.0"
aws-throwaway.workspace = true
windsock = "0.1.0"
regex = "1.7.0"
opensearch = "2.1.0"
opensearch = { version = "2.1.0", default-features = false, features = ["rustls-tls"] }
serde_json = "1.0.103"
time = { version = "0.3.25" }
shell-quote.workspace = true
Expand Down
6 changes: 3 additions & 3 deletions shotover-proxy/benches/windsock/cassandra/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl CassandraSession {
async fn query(&self, query: &str) {
match self {
Self::Scylla(session) => {
session.query(query, ()).await.unwrap();
session.query_unpaged(query, ()).await.unwrap();
}
Self::CdrsTokio(session) => {
session.query(query).await.unwrap();
Expand All @@ -271,7 +271,7 @@ impl CassandraSession {
) -> Result<(), String> {
match self {
Self::Scylla(session) => session
.execute(prepared_statement.as_scylla(), (value,))
.execute_unpaged(prepared_statement.as_scylla(), (value,))
.await
.map_err(|err| format!("{err:?}"))
.map(|_| ()),
Expand Down Expand Up @@ -310,7 +310,7 @@ impl CassandraSession {
) -> Result<(), String> {
match self {
Self::Scylla(session) => session
.execute(prepared_statement.as_scylla(), (value, blob))
.execute_unpaged(prepared_statement.as_scylla(), (value, blob))
.await
.map_err(|err| format!("{err:?}"))
.map(|_| ()),
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/benches/windsock/cloud/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ sudo docker system prune -af"#,
let mut env_args = String::new();
for (key, value) in envs {
let key_value =
String::from_utf8(shell_quote::Bash::quote(&format!("{key}={value}"))).unwrap();
String::from_utf8(shell_quote::Bash::quote_vec(&format!("{key}={value}"))).unwrap();
env_args.push_str(&format!(" -e {key_value}"))
}
let output = self
Expand Down
6 changes: 1 addition & 5 deletions shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ cassandra = [
"dep:aws-config",
"dep:base64",
"dep:serde_json",
"dep:halfbrown",
"dep:chacha20poly1305",
"dep:generic-array",
"dep:hex",
Expand All @@ -32,7 +31,6 @@ kafka = [
"dep:kafka-protocol",
"dep:dashmap",
"dep:xxhash-rust",
"dep:string",
"dep:base64",
"dep:sasl",
]
Expand All @@ -54,7 +52,7 @@ axum = { version = "0.7", default-features = false, features = ["tokio", "tracin
pretty-hex = "0.4.0"
tokio-stream = "0.1.2"
derivative = "2.1.1"
cached = { version = "0.51", features = ["async"], optional = true }
cached = { version = "0.53", features = ["async"], optional = true }
governor = { version = "0.6", default-features = false, features = ["std", "jitter", "quanta"] }
nonzero_ext = "0.3.0"
version-compare = { version = "0.2", optional = true }
Expand Down Expand Up @@ -105,7 +103,6 @@ metrics-exporter-prometheus = { version = "0.15.0", default-features = false }
tracing.workspace = true
tracing-subscriber.workspace = true
tracing-appender.workspace = true
halfbrown = { version = "0.2.1", optional = true }

# Transform dependencies
redis-protocol = { workspace = true, optional = true }
Expand All @@ -123,7 +120,6 @@ rustls = { version = "0.23.0", default-features = false, features = ["tls12"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
rustls-pemfile = "2.0.0"
rustls-pki-types = "1.0.1"
string = { version = "0.3.0", optional = true }
xxhash-rust = { version = "0.8.6", features = ["xxh3"], optional = true }
dashmap = { version = "6.0.0", optional = true }
atoi = { version = "2.0.0", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl TlsConnector {
}

#[derive(Debug)]
pub struct SkipVerifyHostName {
struct SkipVerifyHostName {
verifier: Arc<WebPkiServerVerifier>,
}

Expand Down
18 changes: 11 additions & 7 deletions test-helpers/src/connection/cassandra/connection/scylla.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use cdrs_tokio::frame::message_error::{ErrorBody, ErrorType};
use scylla::batch::Batch;
use scylla::frame::types::Consistency as ScyllaConsistency;
use scylla::frame::value::{CqlDate, CqlDecimal, CqlTime, CqlTimestamp};
use scylla::serialize::value::SerializeCql;
use scylla::serialize::value::SerializeValue;
use scylla::statement::query::Query;
use scylla::transport::errors::{DbError, QueryError};
use scylla::{ExecutionProfile, QueryResult};
Expand Down Expand Up @@ -74,7 +74,11 @@ impl ScyllaConnection {
let statement = prepared_query.as_scylla();
let values = Self::build_values_scylla(values);

let response = self.session.execute(statement, values).await.unwrap();
let response = self
.session
.execute_unpaged(statement, values)
.await
.unwrap();
let tracing_id = response.tracing_id.unwrap();

tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Expand All @@ -101,7 +105,7 @@ impl ScyllaConnection {
}

pub async fn execute_fallible(&self, query: &str) -> Result<Vec<Vec<ResultValue>>, ErrorBody> {
Self::process_scylla_response(self.session.query(query, ()).await)
Self::process_scylla_response(self.session.query_unpaged(query, ()).await)
}

pub async fn execute_with_timestamp(
Expand All @@ -111,7 +115,7 @@ impl ScyllaConnection {
) -> Result<Vec<Vec<ResultValue>>, ErrorBody> {
let mut query = Query::new(query);
query.set_timestamp(Some(timestamp));
Self::process_scylla_response(self.session.query(query, ()).await)
Self::process_scylla_response(self.session.query_unpaged(query, ()).await)
}

pub async fn prepare(&self, query: &str) -> PreparedQuery {
Expand All @@ -134,7 +138,7 @@ impl ScyllaConnection {
});
let values = Self::build_values_scylla(values);

Self::process_scylla_response(self.session.execute(&statement, values).await)
Self::process_scylla_response(self.session.execute_unpaged(&statement, values).await)
}

fn process_scylla_response(
Expand Down Expand Up @@ -167,11 +171,11 @@ impl ScyllaConnection {
}

// TODO: lets return Vec<CqlValue> instead, as it provides better guarantees for correctness
fn build_values_scylla(values: &[ResultValue]) -> Vec<Box<dyn SerializeCql + '_>> {
fn build_values_scylla(values: &[ResultValue]) -> Vec<Box<dyn SerializeValue + '_>> {
values
.iter()
.map(|v| match v {
ResultValue::Int(v) => Box::new(v) as Box<dyn SerializeCql>,
ResultValue::Int(v) => Box::new(v) as Box<dyn SerializeValue>,
ResultValue::Ascii(v) => Box::new(v),
ResultValue::BigInt(v) => Box::new(v),
ResultValue::Blob(v) => Box::new(v),
Expand Down
2 changes: 1 addition & 1 deletion windsock-cloud-docker/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ unzip awscliv2.zip
if x.is_empty() {
String::from("''")
} else {
String::from_utf8(shell_quote::Bash::quote(&x)).unwrap()
String::from_utf8(shell_quote::Bash::quote_vec(&x)).unwrap()
}
})
.collect();
Expand Down

0 comments on commit b33fa9b

Please sign in to comment.