From 83cc9bd142a35e8928746c2b842acbb0ed320c7a Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Mon, 14 Oct 2024 20:41:40 +0800 Subject: [PATCH] chore: Fix rebase --- src/hooks/utility.rs | 51 ++-------------------------- src/hooks/utility/view.rs | 71 +++++++++++++++++++++++++++++++++++++++ tests/scan.rs | 24 ++++--------- 3 files changed, 79 insertions(+), 67 deletions(-) create mode 100644 src/hooks/utility/view.rs diff --git a/src/hooks/utility.rs b/src/hooks/utility.rs index c38762a1..ee732367 100644 --- a/src/hooks/utility.rs +++ b/src/hooks/utility.rs @@ -19,23 +19,20 @@ #![allow(deprecated)] mod explain; mod prepare; +mod view; use std::ptr::null_mut; use anyhow::{bail, Result}; use pgrx::{pg_sys, AllocatedByRust, HookResult, PgBox}; use sqlparser::{ast::Statement, dialect::PostgreSqlDialect, parser::Parser}; -use std::ops::ControlFlow; +use view::view_query; use pg_sys::NodeTag; -use pgrx::*; -use sqlparser::ast::visit_relations; use explain::explain_query; use prepare::*; -use crate::duckdb::connection::{execute, view_exists}; - use super::query::*; type ProcessUtilityHook = fn( @@ -150,50 +147,6 @@ fn is_support_utility(stmt_type: NodeTag) -> bool { || stmt_type == pg_sys::NodeTag::T_ExecuteStmt } -fn view_query(query_string: &core::ffi::CStr) -> Result { - // Use the current scheme if the schema is not provided in the query. 
- let current_schema = get_postgres_current_schema(); - // Set DuckDB search path according search path in Postgres - set_search_path_by_pg()?; - - let dialect = PostgreSqlDialect {}; - let statements = Parser::parse_sql(&dialect, query_string.to_str()?)?; - // visit statements, capturing relations (table names) - let mut visited = vec![]; - - visit_relations(&statements, |relation| { - visited.push(relation.clone()); - ControlFlow::<()>::Continue(()) - }); - - for relation in visited.iter() { - let (schema_name, relation_name) = if relation.0.len() == 1 { - (current_schema.clone(), relation.0[0].to_string()) - } else if relation.0.len() == 2 { - (relation.0[0].to_string(), relation.0[1].to_string()) - } else if relation.0.len() == 3 { - // pg_analytics does not create view with database name now - error!( - "pg_analytics does not support creating view with database name: {}", - relation.0[0].to_string() - ); - } else { - bail!("unexpected relation name: {:?}", relation.0); - }; - - // If the table does not exist in DuckDB, do not push down the query to DuckDB - if !view_exists(&relation_name, &schema_name)? { - fallback_warning!(format!( - "{schema_name}.{relation_name} does not exist in DuckDB" - )); - return Ok(true); - } - } - // Push down the view creation query to DuckDB - execute(query_string.to_str()?, [])?; - Ok(true) -} - fn parse_query_from_utility_stmt(query_string: &core::ffi::CStr) -> Result { let query_string = query_string.to_str()?; diff --git a/src/hooks/utility/view.rs b/src/hooks/utility/view.rs new file mode 100644 index 00000000..8cee26b2 --- /dev/null +++ b/src/hooks/utility/view.rs @@ -0,0 +1,71 @@ +// Copyright (c) 2023-2024 Retake, Inc. 
+// +// This file is part of ParadeDB - Postgres for Search and Analytics +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +use anyhow::{bail, Result}; +use sqlparser::{dialect::PostgreSqlDialect, parser::Parser}; +use std::ops::ControlFlow; + +use pgrx::*; +use sqlparser::ast::visit_relations; + +use crate::duckdb::connection::{execute, view_exists}; + +use super::{get_postgres_current_schema, set_search_path_by_pg}; +/// Executes `query_string` in DuckDB if `view_exists` finds every relation it references; otherwise only emits a fallback warning. Returns `Ok(true)` in both cases. +pub fn view_query(query_string: &core::ffi::CStr) -> Result { + // Use the current schema when the query does not provide one. 
+ let current_schema = get_postgres_current_schema(); + // Set the DuckDB search path according to the search path in Postgres + set_search_path_by_pg()?; + + let dialect = PostgreSqlDialect {}; + let statements = Parser::parse_sql(&dialect, query_string.to_str()?)?; + // Visit the parsed statements, capturing the relations (table names) they reference + let mut visited = vec![]; + + visit_relations(&statements, |relation| { + visited.push(relation.clone()); + ControlFlow::<()>::Continue(()) + }); + + for relation in visited.iter() { + let (schema_name, relation_name) = if relation.0.len() == 1 { + (current_schema.clone(), relation.0[0].to_string()) + } else if relation.0.len() == 2 { + (relation.0[0].to_string(), relation.0[1].to_string()) + } else if relation.0.len() == 3 { + // pg_analytics does not currently support views qualified with a database name + error!( + "pg_analytics does not support creating view with database name: {}", + relation.0[0].to_string() + ); + } else { + bail!("unexpected relation name: {:?}", relation.0); + }; + + // If the relation does not exist in DuckDB, warn and return without pushing the query down to DuckDB + if !view_exists(&relation_name, &schema_name)? 
{ + fallback_warning!(format!( + "{schema_name}.{relation_name} does not exist in DuckDB" + )); + return Ok(true); + } + } + // Every referenced relation exists in DuckDB, so push the view creation query down to it + execute(query_string.to_str()?, [])?; + Ok(true) +} diff --git a/tests/scan.rs b/tests/scan.rs index 73e754c2..2835fdf6 100644 --- a/tests/scan.rs +++ b/tests/scan.rs @@ -559,7 +559,6 @@ async fn test_executor_hook_search_path(mut conn: PgConnection, tempdir: TempDir Ok(()) } - #[rstest] async fn test_prepare_stmt_execute(#[future(awt)] s3: S3, mut conn: PgConnection) -> Result<()> { NycTripsTable::setup().execute(&mut conn); @@ -572,9 +571,11 @@ async fn test_prepare_stmt_execute(#[future(awt)] s3: S3, mut conn: PgConnection s3.create_bucket(S3_TRIPS_BUCKET).await?; s3.put_rows(S3_TRIPS_BUCKET, S3_TRIPS_KEY, &rows).await?; - let mut writer = ArrowWriter::try_new(parquet_file, stored_batch.schema(), None).unwrap(); - writer.write(&stored_batch)?; - writer.close()?; + NycTripsTable::setup_s3_listing_fdw( + &s3.url.clone(), + &format!("s3://{S3_TRIPS_BUCKET}/{S3_TRIPS_KEY}"), + ) + .execute(&mut conn); r#"PREPARE test_query(int) AS SELECT count(*) FROM trips WHERE "VendorID" = $1;"# .execute(&mut conn); @@ -589,19 +590,6 @@ async fn test_prepare_stmt_execute(#[future(awt)] s3: S3, mut conn: PgConnection assert!("EXECUTE test_query(3)".execute_result(&mut conn).is_err()); - // cannot fully pushdown to the DuckDB - "CREATE TABLE t1 (a int);".execute(&mut conn); - "INSERT INTO t1 VALUES (1);".execute(&mut conn); - r#" - CREATE VIEW primitive_join_view AS - SELECT * - FROM primitive - JOIN t1 ON t1.a = primitive.int32_col - "# - .execute(&mut conn); - - let res: (i32,) = "SELECT int32_col FROM primitive_join_view".fetch_one(&mut conn); - assert_eq!(res.0, 1); Ok(()) } @@ -695,4 +683,4 @@ async fn test_view_foreign_table(mut conn: PgConnection, tempdir: TempDir) -> Re let res: (i32,) = "SELECT int32_col FROM primitive_join_view".fetch_one(&mut conn); assert_eq!(res.0, 1); Ok(()) -} \ No newline at end 
of file +}