Skip to content

Commit

Permalink
wip: Support tuple parameters like (x, y) = $1
Browse files Browse the repository at this point in the history
Change-Id: Iac3977d6841887422020af6ecba325d9199f4914
  • Loading branch information
ethowitz committed May 24, 2024
1 parent 6617599 commit 28bab0d
Show file tree
Hide file tree
Showing 10 changed files with 299 additions and 1 deletion.
15 changes: 15 additions & 0 deletions logictests/tuples.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
statement ok
CREATE TABLE t (x int, y int, z int)

statement ok
INSERT INTO t (x, y, z) VALUES (1, 1, 3), (1, 2, -1), (3, 2, 1)

query III rowsort
SELECT * FROM t WHERE (x, y, z) = (1, 2, -1);
----
1
2
-1

statement error
SELECT * FROM t WHERE (x, y, z) = (1, 2);
10 changes: 9 additions & 1 deletion readyset-adapter/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ where
data: DB::PrepareData<'_>,
event: &mut QueryExecutionEvent,
) -> Result<PrepareResultInner<DB>, DB::Error> {
let do_noria = select_meta.should_do_noria;
let do_noria = &select_meta.should_do_noria;
let do_migrate = select_meta.must_migrate;

let up_prep: OptionFuture<_> = self
Expand Down Expand Up @@ -1356,6 +1356,7 @@ where
ticket: Option<Timestamp>,
event: &mut QueryExecutionEvent,
) -> ReadySetResult<QueryResult<'a, DB>> {
println!("execute noria");
use noria_connector::PrepareResult::*;

event.destination = Some(QueryDestination::Readyset);
Expand Down Expand Up @@ -1391,6 +1392,7 @@ where
event: &mut QueryExecutionEvent,
is_fallback: bool,
) -> Result<QueryResult<'a, DB>, DB::Error> {
println!("executing upstream");
let upstream = upstream.as_mut().ok_or_else(|| {
ReadySetError::Internal("This condition requires an upstream connector".to_string())
})?;
Expand Down Expand Up @@ -1453,6 +1455,7 @@ where
"Error received from noria, sending query to fallback");
}

println!("here1");
Self::execute_upstream(upstream, upstream_prep, params, exec_meta, event, true)
.await
}
Expand Down Expand Up @@ -1667,6 +1670,7 @@ where

let result = match &cached_statement.prep.inner {
PrepareResultInner::Noria(prep) => {
println!("executing noria");
Self::execute_noria(noria, prep, params, ticket, &mut event)
.await
.map_err(Into::into)
Expand All @@ -1678,9 +1682,11 @@ where
.query_status_cache
.inlined_cache_miss(cached_statement.as_view_request()?, params.to_vec())
}
println!("here2");
Self::execute_upstream(upstream, prep, params, exec_meta, &mut event, false).await
}
PrepareResultInner::Both(.., uprep) if should_fallback => {
println!("here3");
Self::execute_upstream(upstream, uprep, params, exec_meta, &mut event, false).await
}
PrepareResultInner::Both(nprep, uprep) => {
Expand Down Expand Up @@ -1868,6 +1874,7 @@ where
}
}
// Now migrate the new query
println!("doing adapter rewrites");
adapter_rewrites::process_query(&mut stmt, self.noria.rewrite_params())?;
let migration_state = match self
.noria
Expand Down Expand Up @@ -2506,6 +2513,7 @@ where
event: &mut QueryExecutionEvent,
processed_query_params: ProcessedQueryParams,
) -> Result<QueryResult<'a, DB>, DB::Error> {
println!("adhoc");
let mut status = status.unwrap_or(QueryStatus {
migration_state: MigrationState::Unsupported,
execution_info: None,
Expand Down
2 changes: 2 additions & 0 deletions readyset-adapter/src/backend/noria_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1396,6 +1396,7 @@ impl NoriaConnector {
create_if_not_exist: bool,
override_schema_search_path: Option<Vec<SqlIdentifier>>,
) -> ReadySetResult<PrepareResult> {
println!("noria prepare select");
// extract parameter columns *for the client*
// note that we have to do this *before* processing the query, otherwise the
// client will be confused about the number of parameters it's supposed to
Expand Down Expand Up @@ -1484,6 +1485,7 @@ impl NoriaConnector {
ticket: Option<Timestamp>,
event: &mut readyset_client_metrics::QueryExecutionEvent,
) -> ReadySetResult<QueryResult<'_>> {
println!("execute select");
let start = Instant::now();
let (qname, processed_query_params, params) = match ctx {
ExecuteSelectContext::Prepared {
Expand Down
1 change: 1 addition & 0 deletions readyset-client/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1539,6 +1539,7 @@ impl ReaderHandle {
blocking_read: bool,
dialect: Dialect,
) -> ReadySetResult<ViewQuery> {
dbg!(&raw_keys);
trace!("select::lookup");

let (keys, filters) = if raw_keys.is_empty() {
Expand Down
63 changes: 63 additions & 0 deletions readyset-psql/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2076,6 +2076,69 @@ WHERE
shutdown_tx.shutdown().await;
}

async fn assert_last_statement_readyset(conn: &Client) {
let (destination, status) = match conn
.simple_query("EXPLAIN LAST STATEMENT")
.await
.unwrap()
.into_iter()
.next()
.unwrap()
{
SimpleQueryMessage::Row(row) => (
row.get(0).unwrap().to_owned(),
row.get(1).unwrap().to_owned(),
),
_ => panic!(),
};

assert_eq!(destination, "readyset");
assert_eq!(status, "ok");
}

#[tokio::test(flavor = "multi_thread")]
async fn tuple_cache_reuse() {
let (opts, _handle, shutdown_tx) = setup().await;
let conn = connect(opts).await;

conn.simple_query("DROP TABLE IF EXISTS t").await.unwrap();
conn.simple_query("CREATE TABLE t (x int, y int, z int)")
.await
.unwrap();
conn.simple_query("INSERT INTO t (x, y, z) VALUES (1, 1, 3), (1, 2, -1), (3, 2, 1)")
.await
.unwrap();

eventually!(conn
.simple_query("CREATE CACHE FROM SELECT * FROM t WHERE (x, y, z) = $1")
.await
.is_ok());

eventually! {
let len = conn
.query("SELECT * FROM t WHERE (x, y, z) = (1, 1, 3)", &[])
.await
.unwrap()
.len();
len == 1
}

assert_last_statement_readyset(&conn).await;

eventually! {
let len = conn
.query("SELECT * FROM t WHERE x = 3 AND y = 2 AND z = 1", &[])
.await
.unwrap()
.len();
len == 1
}

assert_last_statement_readyset(&conn).await;

shutdown_tx.shutdown().await;
}

mod http_tests {
use super::*;
#[tokio::test(flavor = "multi_thread")]
Expand Down
1 change: 1 addition & 0 deletions readyset-server/src/controller/mir_to_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ pub(super) fn mir_node_to_flow_parts(
internal!("Encountered dependent join when lowering to dataflow")
}
MirNodeInner::ViewKey { ref key } => {
println!("mir to flow");
return Err(ReadySetError::UnsupportedPlaceholders {
placeholders: key.mapped_ref(
|ViewKeyColumn {
Expand Down
1 change: 1 addition & 0 deletions readyset-server/src/controller/sql/mir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2308,6 +2308,7 @@ impl SqlToMirConverter {
&[final_node],
)
} else if let Ok(placeholders) = unsupported_placeholders.try_into() {
println!("named query to mir");
return Err(ReadySetError::UnsupportedPlaceholders { placeholders });
} else {
final_node
Expand Down
182 changes: 182 additions & 0 deletions readyset-sql-passes/src/adapter_rewrites/expand_tuples.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
use std::mem;

use nom_sql::analysis::visit_mut::{self, VisitorMut};
use nom_sql::{BinaryOperator, Expr, ItemPlaceholder, Literal, SelectStatement};
use readyset_errors::{invalid_query_err, ReadySetError};

#[derive(Default)]
struct ExpandTuplesVisitor;

impl<'ast> VisitorMut<'ast> for ExpandTuplesVisitor {
type Error = ReadySetError;

// TODO ethan we need to make sure this only touches the where clause; seems like visit_expr is
// called in HAVING clauses as well
fn visit_expr(&mut self, expression: &'ast mut Expr) -> Result<(), Self::Error> {
match expression {
Expr::BinaryOp {
// TODO ethan test that using ROW(...) works in actual postgres/mysql
lhs: box Expr::Row {
exprs: lhs_exprs, ..
},
op: BinaryOperator::Equal,
rhs: box Expr::Row {
exprs: rhs_exprs, ..
},
} => {
debug_assert!(!lhs_exprs.is_empty());
debug_assert!(!rhs_exprs.is_empty());

if lhs_exprs.len() == rhs_exprs.len() {
if !lhs_exprs.is_empty() {
let last_expr = Expr::BinaryOp {
lhs: Box::new(lhs_exprs.pop().unwrap()),
op: BinaryOperator::Equal,
rhs: Box::new(rhs_exprs.pop().unwrap()),
};
let lhs_iter = lhs_exprs.iter_mut();
let rhs_iter = rhs_exprs.iter_mut();

// create new expr
// iterate over zipped lhs and rhs, anding everything tg
let new_expression = lhs_iter
.zip(rhs_iter)
.map(|(lhs, rhs)| Expr::BinaryOp {
lhs: Box::new(lhs.take()),
op: BinaryOperator::Equal,
rhs: Box::new(rhs.take()),
})
.rfold(last_expr, |acc, expr| Expr::BinaryOp {
lhs: Box::new(expr),
op: BinaryOperator::And,
rhs: Box::new(acc),
});
let _ = mem::replace(expression, new_expression);
}
} else {
return Err(invalid_query_err!(
"Cannot compare row expressions of unequal lengths"
));
}
}
// TODO ethan test with empty Row; also do same for above
Expr::BinaryOp {
lhs: box Expr::Row {
exprs: lhs_exprs, ..
},
op: BinaryOperator::Equal,
rhs: box Expr::Literal(Literal::Placeholder(_)),
} if lhs_exprs.iter().all(|expr| matches!(expr, Expr::Column(_))) => {
let last_expr = lhs_exprs.pop().unwrap();
let lhs_iter = lhs_exprs.iter_mut();

let last_expr = Expr::BinaryOp {
lhs: Box::new(last_expr),
op: BinaryOperator::Equal,
rhs: Box::new(Expr::Literal(Literal::Placeholder(
ItemPlaceholder::QuestionMark,
))),
};

let new_expression = lhs_iter
.map(|lhs| Expr::BinaryOp {
lhs: Box::new(lhs.take()),
op: BinaryOperator::Equal,
rhs: Box::new(Expr::Literal(Literal::Placeholder(
ItemPlaceholder::QuestionMark,
))),
})
.rfold(last_expr, |acc, expr| Expr::BinaryOp {
lhs: Box::new(expr),
op: BinaryOperator::And,
rhs: Box::new(acc),
});
let _ = mem::replace(expression, new_expression);
}
_ => (),
};

visit_mut::walk_expr(self, expression)?;
Ok(())
}
}

pub(super) fn expand_tuples(query: &mut SelectStatement) {
let mut visitor = ExpandTuplesVisitor;
visitor.visit_select_statement(query).unwrap();
}

#[cfg(test)]
mod tests {
use nom_sql::{Dialect, DialectDisplay};

use super::*;

const DIALECT: Dialect = Dialect::PostgreSQL;

fn try_parse_select_statement(q: &str) -> Result<SelectStatement, String> {
nom_sql::parse_select_statement(DIALECT, q)
}

fn parse_select_statement(q: &str) -> SelectStatement {
try_parse_select_statement(q).unwrap()
}

fn assert_expected(query: &str, expected: &str) {
let mut actual = parse_select_statement(query);
super::expand_tuples(&mut actual);

assert_eq!(
expected,
actual.where_clause.unwrap().display(DIALECT).to_string(),
);
}

#[test]
fn test_placeholder() {
assert_expected(
"SELECT * FROM t WHERE (w, x) = $1",
r#"(("w" = ?) AND ("x" = ?))"#,
);
}

#[test]
fn test_placeholder_many_columns() {
assert_expected(
"SELECT * FROM t WHERE (w, x, y, z) = $1",
r#"(("w" = ?) AND (("x" = ?) AND (("y" = ?) AND ("z" = ?))))"#,
);
}

#[test]
fn test_placeholder_row_syntax() {
assert_expected(
"SELECT * FROM t WHERE ROW(w, x, y, z) = $1",
r#"(("w" = ?) AND (("x" = ?) AND (("y" = ?) AND ("z" = ?))))"#,
);
}

#[test]
fn test_tuple_equal_tuple() {
assert_expected(
"SELECT * FROM t WHERE (w, x) = (1, 2)",
r#"(("w" = 1) AND ("x" = 2))"#,
);
}

#[test]
fn test_tuple_equal_tuple_many_columns() {
assert_expected(
"SELECT * FROM t WHERE (w, x, y, z) = (1, 2, 3, 4)",
r#"(("w" = 1) AND (("x" = 2) AND (("y" = 3) AND ("z" = 4))))"#,
);
}

#[test]
fn test_tuple_equal_tuple_row_syntax() {
assert_expected(
"SELECT * FROM t WHERE ROW(w, x, y, z) = ROW(1, 2, 3, 4)",
r#"(("w" = 1) AND (("x" = 2) AND (("y" = 3) AND ("z" = 4))))"#,
);
}
}
Loading

0 comments on commit 28bab0d

Please sign in to comment.