From 28bab0deef54b3de8cc0f85bbe8a94ed90f110e3 Mon Sep 17 00:00:00 2001 From: Ethan Donowitz Date: Mon, 29 Apr 2024 12:09:08 -0400 Subject: [PATCH] wip: Support tuple parameters like `(x, y)` = $1 Change-Id: Iac3977d6841887422020af6ecba325d9199f4914 --- logictests/tuples.test | 15 ++ readyset-adapter/src/backend.rs | 10 +- .../src/backend/noria_connector.rs | 2 + readyset-client/src/view.rs | 1 + readyset-psql/tests/integration.rs | 63 ++++++ readyset-server/src/controller/mir_to_flow.rs | 1 + readyset-server/src/controller/sql/mir/mod.rs | 1 + .../src/adapter_rewrites/expand_tuples.rs | 182 ++++++++++++++++++ .../src/adapter_rewrites/mod.rs | 24 +++ .../src/detect_unsupported_placeholders.rs | 1 + 10 files changed, 299 insertions(+), 1 deletion(-) create mode 100644 logictests/tuples.test create mode 100644 readyset-sql-passes/src/adapter_rewrites/expand_tuples.rs diff --git a/logictests/tuples.test b/logictests/tuples.test new file mode 100644 index 0000000000..07407cf432 --- /dev/null +++ b/logictests/tuples.test @@ -0,0 +1,15 @@ +statement ok +CREATE TABLE t (x int, y int, z int) + +statement ok +INSERT INTO t (x, y, z) VALUES (1, 1, 3), (1, 2, -1), (3, 2, 1) + +query III rowsort +SELECT * FROM t WHERE (x, y, z) = (1, 2, -1); +---- +1 +2 +-1 + +statement error +SELECT * FROM t WHERE (x, y, z) = (1, 2); diff --git a/readyset-adapter/src/backend.rs b/readyset-adapter/src/backend.rs index 56709099fa..1604961dec 100644 --- a/readyset-adapter/src/backend.rs +++ b/readyset-adapter/src/backend.rs @@ -958,7 +958,7 @@ where data: DB::PrepareData<'_>, event: &mut QueryExecutionEvent, ) -> Result, DB::Error> { - let do_noria = select_meta.should_do_noria; + let do_noria = &select_meta.should_do_noria; let do_migrate = select_meta.must_migrate; let up_prep: OptionFuture<_> = self @@ -1356,6 +1356,7 @@ where ticket: Option, event: &mut QueryExecutionEvent, ) -> ReadySetResult> { + println!("execute noria"); use noria_connector::PrepareResult::*; event.destination = 
Some(QueryDestination::Readyset); @@ -1391,6 +1392,7 @@ where event: &mut QueryExecutionEvent, is_fallback: bool, ) -> Result, DB::Error> { + println!("executing upstream"); let upstream = upstream.as_mut().ok_or_else(|| { ReadySetError::Internal("This condition requires an upstream connector".to_string()) })?; @@ -1453,6 +1455,7 @@ where "Error received from noria, sending query to fallback"); } + println!("here1"); Self::execute_upstream(upstream, upstream_prep, params, exec_meta, event, true) .await } @@ -1667,6 +1670,7 @@ where let result = match &cached_statement.prep.inner { PrepareResultInner::Noria(prep) => { + println!("executing noria"); Self::execute_noria(noria, prep, params, ticket, &mut event) .await .map_err(Into::into) @@ -1678,9 +1682,11 @@ where .query_status_cache .inlined_cache_miss(cached_statement.as_view_request()?, params.to_vec()) } + println!("here2"); Self::execute_upstream(upstream, prep, params, exec_meta, &mut event, false).await } PrepareResultInner::Both(.., uprep) if should_fallback => { + println!("here3"); Self::execute_upstream(upstream, uprep, params, exec_meta, &mut event, false).await } PrepareResultInner::Both(nprep, uprep) => { @@ -1868,6 +1874,7 @@ where } } // Now migrate the new query + println!("doing adapter rewrites"); adapter_rewrites::process_query(&mut stmt, self.noria.rewrite_params())?; let migration_state = match self .noria @@ -2506,6 +2513,7 @@ where event: &mut QueryExecutionEvent, processed_query_params: ProcessedQueryParams, ) -> Result, DB::Error> { + println!("adhoc"); let mut status = status.unwrap_or(QueryStatus { migration_state: MigrationState::Unsupported, execution_info: None, diff --git a/readyset-adapter/src/backend/noria_connector.rs b/readyset-adapter/src/backend/noria_connector.rs index d9d5b45def..1a01af3aba 100644 --- a/readyset-adapter/src/backend/noria_connector.rs +++ b/readyset-adapter/src/backend/noria_connector.rs @@ -1396,6 +1396,7 @@ impl NoriaConnector { create_if_not_exist: bool, 
override_schema_search_path: Option>, ) -> ReadySetResult { + println!("noria prepare select"); // extract parameter columns *for the client* // note that we have to do this *before* processing the query, otherwise the // client will be confused about the number of parameters it's supposed to @@ -1484,6 +1485,7 @@ impl NoriaConnector { ticket: Option, event: &mut readyset_client_metrics::QueryExecutionEvent, ) -> ReadySetResult> { + println!("execute select"); let start = Instant::now(); let (qname, processed_query_params, params) = match ctx { ExecuteSelectContext::Prepared { diff --git a/readyset-client/src/view.rs b/readyset-client/src/view.rs index 35dcd407fd..39939d2012 100644 --- a/readyset-client/src/view.rs +++ b/readyset-client/src/view.rs @@ -1539,6 +1539,7 @@ impl ReaderHandle { blocking_read: bool, dialect: Dialect, ) -> ReadySetResult { + dbg!(&raw_keys); trace!("select::lookup"); let (keys, filters) = if raw_keys.is_empty() { diff --git a/readyset-psql/tests/integration.rs b/readyset-psql/tests/integration.rs index 637ed05165..1bc8eb522f 100644 --- a/readyset-psql/tests/integration.rs +++ b/readyset-psql/tests/integration.rs @@ -2076,6 +2076,69 @@ WHERE shutdown_tx.shutdown().await; } +async fn assert_last_statement_readyset(conn: &Client) { + let (destination, status) = match conn + .simple_query("EXPLAIN LAST STATEMENT") + .await + .unwrap() + .into_iter() + .next() + .unwrap() + { + SimpleQueryMessage::Row(row) => ( + row.get(0).unwrap().to_owned(), + row.get(1).unwrap().to_owned(), + ), + _ => panic!(), + }; + + assert_eq!(destination, "readyset"); + assert_eq!(status, "ok"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn tuple_cache_reuse() { + let (opts, _handle, shutdown_tx) = setup().await; + let conn = connect(opts).await; + + conn.simple_query("DROP TABLE IF EXISTS t").await.unwrap(); + conn.simple_query("CREATE TABLE t (x int, y int, z int)") + .await + .unwrap(); + conn.simple_query("INSERT INTO t (x, y, z) VALUES (1, 1, 3), (1, 
2, -1), (3, 2, 1)") + .await + .unwrap(); + + eventually!(conn + .simple_query("CREATE CACHE FROM SELECT * FROM t WHERE (x, y, z) = $1") + .await + .is_ok()); + + eventually! { + let len = conn + .query("SELECT * FROM t WHERE (x, y, z) = (1, 1, 3)", &[]) + .await + .unwrap() + .len(); + len == 1 + } + + assert_last_statement_readyset(&conn).await; + + eventually! { + let len = conn + .query("SELECT * FROM t WHERE x = 3 AND y = 2 AND z = 1", &[]) + .await + .unwrap() + .len(); + len == 1 + } + + assert_last_statement_readyset(&conn).await; + + shutdown_tx.shutdown().await; +} + mod http_tests { use super::*; #[tokio::test(flavor = "multi_thread")] diff --git a/readyset-server/src/controller/mir_to_flow.rs b/readyset-server/src/controller/mir_to_flow.rs index 4e73a2215a..bf829c3f82 100644 --- a/readyset-server/src/controller/mir_to_flow.rs +++ b/readyset-server/src/controller/mir_to_flow.rs @@ -199,6 +199,7 @@ pub(super) fn mir_node_to_flow_parts( internal!("Encountered dependent join when lowering to dataflow") } MirNodeInner::ViewKey { ref key } => { + println!("mir to flow"); return Err(ReadySetError::UnsupportedPlaceholders { placeholders: key.mapped_ref( |ViewKeyColumn { diff --git a/readyset-server/src/controller/sql/mir/mod.rs b/readyset-server/src/controller/sql/mir/mod.rs index dd94909d90..45986bd8d7 100644 --- a/readyset-server/src/controller/sql/mir/mod.rs +++ b/readyset-server/src/controller/sql/mir/mod.rs @@ -2308,6 +2308,7 @@ impl SqlToMirConverter { &[final_node], ) } else if let Ok(placeholders) = unsupported_placeholders.try_into() { + println!("named query to mir"); return Err(ReadySetError::UnsupportedPlaceholders { placeholders }); } else { final_node diff --git a/readyset-sql-passes/src/adapter_rewrites/expand_tuples.rs b/readyset-sql-passes/src/adapter_rewrites/expand_tuples.rs new file mode 100644 index 0000000000..2b83595592 --- /dev/null +++ b/readyset-sql-passes/src/adapter_rewrites/expand_tuples.rs @@ -0,0 +1,182 @@ +use std::mem; + +use 
nom_sql::analysis::visit_mut::{self, VisitorMut}; +use nom_sql::{BinaryOperator, Expr, ItemPlaceholder, Literal, SelectStatement}; +use readyset_errors::{invalid_query_err, ReadySetError}; + +#[derive(Default)] +struct ExpandTuplesVisitor; + +impl<'ast> VisitorMut<'ast> for ExpandTuplesVisitor { + type Error = ReadySetError; + + // TODO ethan we need to make sure this only touches the where clause; seems like visit_expr is + // called in HAVING clauses as well + fn visit_expr(&mut self, expression: &'ast mut Expr) -> Result<(), Self::Error> { + match expression { + Expr::BinaryOp { + // TODO ethan test that using ROW(...) works in actual postgres/mysql + lhs: box Expr::Row { + exprs: lhs_exprs, .. + }, + op: BinaryOperator::Equal, + rhs: box Expr::Row { + exprs: rhs_exprs, .. + }, + } => { + debug_assert!(!lhs_exprs.is_empty()); + debug_assert!(!rhs_exprs.is_empty()); + + if lhs_exprs.len() == rhs_exprs.len() { + if !lhs_exprs.is_empty() { + let last_expr = Expr::BinaryOp { + lhs: Box::new(lhs_exprs.pop().unwrap()), + op: BinaryOperator::Equal, + rhs: Box::new(rhs_exprs.pop().unwrap()), + }; + let lhs_iter = lhs_exprs.iter_mut(); + let rhs_iter = rhs_exprs.iter_mut(); + + // create new expr + // iterate over zipped lhs and rhs, anding everything tg + let new_expression = lhs_iter + .zip(rhs_iter) + .map(|(lhs, rhs)| Expr::BinaryOp { + lhs: Box::new(lhs.take()), + op: BinaryOperator::Equal, + rhs: Box::new(rhs.take()), + }) + .rfold(last_expr, |acc, expr| Expr::BinaryOp { + lhs: Box::new(expr), + op: BinaryOperator::And, + rhs: Box::new(acc), + }); + let _ = mem::replace(expression, new_expression); + } + } else { + return Err(invalid_query_err!( + "Cannot compare row expressions of unequal lengths" + )); + } + } + // TODO ethan test with empty Row; also do same for above + Expr::BinaryOp { + lhs: box Expr::Row { + exprs: lhs_exprs, .. 
+ }, + op: BinaryOperator::Equal, + rhs: box Expr::Literal(Literal::Placeholder(_)), + } if lhs_exprs.iter().all(|expr| matches!(expr, Expr::Column(_))) => { + let last_expr = lhs_exprs.pop().unwrap(); + let lhs_iter = lhs_exprs.iter_mut(); + + let last_expr = Expr::BinaryOp { + lhs: Box::new(last_expr), + op: BinaryOperator::Equal, + rhs: Box::new(Expr::Literal(Literal::Placeholder( + ItemPlaceholder::QuestionMark, + ))), + }; + + let new_expression = lhs_iter + .map(|lhs| Expr::BinaryOp { + lhs: Box::new(lhs.take()), + op: BinaryOperator::Equal, + rhs: Box::new(Expr::Literal(Literal::Placeholder( + ItemPlaceholder::QuestionMark, + ))), + }) + .rfold(last_expr, |acc, expr| Expr::BinaryOp { + lhs: Box::new(expr), + op: BinaryOperator::And, + rhs: Box::new(acc), + }); + let _ = mem::replace(expression, new_expression); + } + _ => (), + }; + + visit_mut::walk_expr(self, expression)?; + Ok(()) + } +} + +pub(super) fn expand_tuples(query: &mut SelectStatement) { + let mut visitor = ExpandTuplesVisitor; + visitor.visit_select_statement(query).unwrap(); +} + +#[cfg(test)] +mod tests { + use nom_sql::{Dialect, DialectDisplay}; + + use super::*; + + const DIALECT: Dialect = Dialect::PostgreSQL; + + fn try_parse_select_statement(q: &str) -> Result { + nom_sql::parse_select_statement(DIALECT, q) + } + + fn parse_select_statement(q: &str) -> SelectStatement { + try_parse_select_statement(q).unwrap() + } + + fn assert_expected(query: &str, expected: &str) { + let mut actual = parse_select_statement(query); + super::expand_tuples(&mut actual); + + assert_eq!( + expected, + actual.where_clause.unwrap().display(DIALECT).to_string(), + ); + } + + #[test] + fn test_placeholder() { + assert_expected( + "SELECT * FROM t WHERE (w, x) = $1", + r#"(("w" = ?) AND ("x" = ?))"#, + ); + } + + #[test] + fn test_placeholder_many_columns() { + assert_expected( + "SELECT * FROM t WHERE (w, x, y, z) = $1", + r#"(("w" = ?) AND (("x" = ?) AND (("y" = ?) 
AND ("z" = ?))))"#, + ); + } + + #[test] + fn test_placeholder_row_syntax() { + assert_expected( + "SELECT * FROM t WHERE ROW(w, x, y, z) = $1", + r#"(("w" = ?) AND (("x" = ?) AND (("y" = ?) AND ("z" = ?))))"#, + ); + } + + #[test] + fn test_tuple_equal_tuple() { + assert_expected( + "SELECT * FROM t WHERE (w, x) = (1, 2)", + r#"(("w" = 1) AND ("x" = 2))"#, + ); + } + + #[test] + fn test_tuple_equal_tuple_many_columns() { + assert_expected( + "SELECT * FROM t WHERE (w, x, y, z) = (1, 2, 3, 4)", + r#"(("w" = 1) AND (("x" = 2) AND (("y" = 3) AND ("z" = 4))))"#, + ); + } + + #[test] + fn test_tuple_equal_tuple_row_syntax() { + assert_expected( + "SELECT * FROM t WHERE ROW(w, x, y, z) = ROW(1, 2, 3, 4)", + r#"(("w" = 1) AND (("x" = 2) AND (("y" = 3) AND ("z" = 4))))"#, + ); + } +} diff --git a/readyset-sql-passes/src/adapter_rewrites/mod.rs b/readyset-sql-passes/src/adapter_rewrites/mod.rs index 4518132f94..afb5c24c6c 100644 --- a/readyset-sql-passes/src/adapter_rewrites/mod.rs +++ b/readyset-sql-passes/src/adapter_rewrites/mod.rs @@ -1,4 +1,5 @@ mod autoparameterize; +mod expand_tuples; use std::borrow::Cow; use std::cmp::max; @@ -95,6 +96,8 @@ pub fn process_query( query.limit_clause.clone_from(&limit_clause); } + expand_tuples::expand_tuples(query); + let auto_parameters = autoparameterize::auto_parameterize_query(query, params.server_supports_mixed_comparisons); let rewritten_in_conditions = collapse_where_in(query)?; @@ -184,6 +187,7 @@ impl ProcessedQueryParams { where T: Clone + TryFrom + Debug + Default + PartialEq, { + dbg!(&params); let params = if let Some(order_map) = &self.reordered_placeholders { Cow::Owned(reorder_params(params, order_map)?) 
} else { @@ -1566,5 +1570,25 @@ mod tests { vec![vec![1.into(), 1.into()], vec![1.into(), 2.into()]] ); } + + #[test] + fn expand_tuple_with_existing_param() { + let (keys, actual) = process_and_make_keys_postgres( + "SELECT * FROM t WHERE (w, x, y, z) = $1", + vec![1.into(), 2.into(), 3.into(), 4.into()], + ); + let expected = parse_select_statement_postgres( + "SELECT * FROM t WHERE w = $1 AND x = $2 AND y = $3 AND z = $4", + ); + + assert_eq!( + expected, + actual, + "{}", + actual.display(nom_sql::Dialect::PostgreSQL) + ); + + assert_eq!(keys, vec![vec![1.into(), 2.into(), 3.into(), 4.into()]]); + } } } diff --git a/readyset-sql-passes/src/detect_unsupported_placeholders.rs b/readyset-sql-passes/src/detect_unsupported_placeholders.rs index 2c22d444ed..d845446926 100644 --- a/readyset-sql-passes/src/detect_unsupported_placeholders.rs +++ b/readyset-sql-passes/src/detect_unsupported_placeholders.rs @@ -226,6 +226,7 @@ impl DetectUnsupportedPlaceholders for SelectStatement { if unsupported_placeholders.is_empty() { Ok(()) } else { + println!("detect unsupported"); #[allow(clippy::unwrap_used)] // checked that !is_empty() Err(ReadySetError::UnsupportedPlaceholders { placeholders: Vec1::try_from(unsupported_placeholders).unwrap(),