Skip to content

Commit

Permalink
chore: Fix rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
Weijun-H committed Oct 14, 2024
1 parent 7183b7d commit 83cc9bd
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 67 deletions.
51 changes: 2 additions & 49 deletions src/hooks/utility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,20 @@
#![allow(deprecated)]
mod explain;
mod prepare;
mod view;

use std::ptr::null_mut;

use anyhow::{bail, Result};
use pgrx::{pg_sys, AllocatedByRust, HookResult, PgBox};
use sqlparser::{ast::Statement, dialect::PostgreSqlDialect, parser::Parser};
use std::ops::ControlFlow;
use view::view_query;

use pg_sys::NodeTag;
use pgrx::*;
use sqlparser::ast::visit_relations;

use explain::explain_query;
use prepare::*;

use crate::duckdb::connection::{execute, view_exists};

use super::query::*;

type ProcessUtilityHook = fn(
Expand Down Expand Up @@ -150,50 +147,6 @@ fn is_support_utility(stmt_type: NodeTag) -> bool {
|| stmt_type == pg_sys::NodeTag::T_ExecuteStmt
}

fn view_query(query_string: &core::ffi::CStr) -> Result<bool> {
// Use the current scheme if the schema is not provided in the query.
let current_schema = get_postgres_current_schema();
// Set DuckDB search path according search path in Postgres
set_search_path_by_pg()?;

let dialect = PostgreSqlDialect {};
let statements = Parser::parse_sql(&dialect, query_string.to_str()?)?;
// visit statements, capturing relations (table names)
let mut visited = vec![];

visit_relations(&statements, |relation| {
visited.push(relation.clone());
ControlFlow::<()>::Continue(())
});

for relation in visited.iter() {
let (schema_name, relation_name) = if relation.0.len() == 1 {
(current_schema.clone(), relation.0[0].to_string())
} else if relation.0.len() == 2 {
(relation.0[0].to_string(), relation.0[1].to_string())
} else if relation.0.len() == 3 {
// pg_analytics does not create view with database name now
error!(
"pg_analytics does not support creating view with database name: {}",
relation.0[0].to_string()
);
} else {
bail!("unexpected relation name: {:?}", relation.0);
};

// If the table does not exist in DuckDB, do not push down the query to DuckDB
if !view_exists(&relation_name, &schema_name)? {
fallback_warning!(format!(
"{schema_name}.{relation_name} does not exist in DuckDB"
));
return Ok(true);
}
}
// Push down the view creation query to DuckDB
execute(query_string.to_str()?, [])?;
Ok(true)
}

fn parse_query_from_utility_stmt(query_string: &core::ffi::CStr) -> Result<String> {
let query_string = query_string.to_str()?;

Expand Down
71 changes: 71 additions & 0 deletions src/hooks/utility/view.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2023-2024 Retake, Inc.
//
// This file is part of ParadeDB - Postgres for Search and Analytics
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use anyhow::{bail, Result};
use sqlparser::{dialect::PostgreSqlDialect, parser::Parser};
use std::ops::ControlFlow;

use pgrx::*;
use sqlparser::ast::visit_relations;

use crate::duckdb::connection::{execute, view_exists};

use super::{get_postgres_current_schema, set_search_path_by_pg};

pub fn view_query(query_string: &core::ffi::CStr) -> Result<bool> {
// Use the current scheme if the schema is not provided in the query.
let current_schema = get_postgres_current_schema();
// Set DuckDB search path according search path in Postgres
set_search_path_by_pg()?;

let dialect = PostgreSqlDialect {};
let statements = Parser::parse_sql(&dialect, query_string.to_str()?)?;
// visit statements, capturing relations (table names)
let mut visited = vec![];

visit_relations(&statements, |relation| {
visited.push(relation.clone());
ControlFlow::<()>::Continue(())
});

for relation in visited.iter() {
let (schema_name, relation_name) = if relation.0.len() == 1 {
(current_schema.clone(), relation.0[0].to_string())
} else if relation.0.len() == 2 {
(relation.0[0].to_string(), relation.0[1].to_string())
} else if relation.0.len() == 3 {
// pg_analytics does not create view with database name now
error!(
"pg_analytics does not support creating view with database name: {}",
relation.0[0].to_string()
);
} else {
bail!("unexpected relation name: {:?}", relation.0);
};

// If the table does not exist in DuckDB, do not push down the query to DuckDB
if !view_exists(&relation_name, &schema_name)? {
fallback_warning!(format!(
"{schema_name}.{relation_name} does not exist in DuckDB"
));
return Ok(true);
}
}
// Push down the view creation query to DuckDB
execute(query_string.to_str()?, [])?;
Ok(true)
}
24 changes: 6 additions & 18 deletions tests/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,6 @@ async fn test_executor_hook_search_path(mut conn: PgConnection, tempdir: TempDir
Ok(())
}


#[rstest]
async fn test_prepare_stmt_execute(#[future(awt)] s3: S3, mut conn: PgConnection) -> Result<()> {
NycTripsTable::setup().execute(&mut conn);
Expand All @@ -572,9 +571,11 @@ async fn test_prepare_stmt_execute(#[future(awt)] s3: S3, mut conn: PgConnection
s3.create_bucket(S3_TRIPS_BUCKET).await?;
s3.put_rows(S3_TRIPS_BUCKET, S3_TRIPS_KEY, &rows).await?;

let mut writer = ArrowWriter::try_new(parquet_file, stored_batch.schema(), None).unwrap();
writer.write(&stored_batch)?;
writer.close()?;
NycTripsTable::setup_s3_listing_fdw(
&s3.url.clone(),
&format!("s3://{S3_TRIPS_BUCKET}/{S3_TRIPS_KEY}"),
)
.execute(&mut conn);

r#"PREPARE test_query(int) AS SELECT count(*) FROM trips WHERE "VendorID" = $1;"#
.execute(&mut conn);
Expand All @@ -589,19 +590,6 @@ async fn test_prepare_stmt_execute(#[future(awt)] s3: S3, mut conn: PgConnection

assert!("EXECUTE test_query(3)".execute_result(&mut conn).is_err());

// cannot fully pushdown to the DuckDB
"CREATE TABLE t1 (a int);".execute(&mut conn);
"INSERT INTO t1 VALUES (1);".execute(&mut conn);
r#"
CREATE VIEW primitive_join_view AS
SELECT *
FROM primitive
JOIN t1 ON t1.a = primitive.int32_col
"#
.execute(&mut conn);

let res: (i32,) = "SELECT int32_col FROM primitive_join_view".fetch_one(&mut conn);
assert_eq!(res.0, 1);
Ok(())
}

Expand Down Expand Up @@ -695,4 +683,4 @@ async fn test_view_foreign_table(mut conn: PgConnection, tempdir: TempDir) -> Re
let res: (i32,) = "SELECT int32_col FROM primitive_join_view".fetch_one(&mut conn);
assert_eq!(res.0, 1);
Ok(())
}
}

0 comments on commit 83cc9bd

Please sign in to comment.