From 91a07c106d874428bf16d9cc56910919110f75b6 Mon Sep 17 00:00:00 2001 From: exAspArk Date: Tue, 10 Dec 2024 11:22:02 -0500 Subject: [PATCH] Scan Iceberg tables lazily --- scripts/install.sh | 2 +- src/iceberg_reader.go | 2 ++ src/main.go | 2 +- src/query_handler.go | 27 ++--------------- src/query_handler_test.go | 21 ++++++++++++++ src/query_parser.go | 38 +++++++++++++++++++++++- src/select_table_remapper.go | 56 ++++++++++++++++++++++++++++++------ 7 files changed, 112 insertions(+), 36 deletions(-) diff --git a/scripts/install.sh b/scripts/install.sh index a7df640..08a29cd 100755 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -1,6 +1,6 @@ #!/bin/bash -VERSION="0.22.0" +VERSION="0.22.1" # Detect OS and architecture OS=$(uname -s | tr '[:upper:]' '[:lower:]') diff --git a/src/iceberg_reader.go b/src/iceberg_reader.go index 9205b28..c5f536a 100644 --- a/src/iceberg_reader.go +++ b/src/iceberg_reader.go @@ -11,10 +11,12 @@ func NewIcebergReader(config *Config) *IcebergReader { } func (reader *IcebergReader) Schemas() (icebergSchemas []string, err error) { + LogDebug(reader.config, "Reading Iceberg schemas...") return reader.storage.IcebergSchemas() } func (reader *IcebergReader) SchemaTables() (icebergSchemaTables []SchemaTable, err error) { + LogDebug(reader.config, "Reading Iceberg tables...") return reader.storage.IcebergSchemaTables() } diff --git a/src/main.go b/src/main.go index d1adfd2..faef59f 100644 --- a/src/main.go +++ b/src/main.go @@ -6,7 +6,7 @@ import ( "time" ) -const VERSION = "0.22.0" +const VERSION = "0.22.1" func main() { config := LoadConfig() diff --git a/src/query_handler.go b/src/query_handler.go index 97f9332..faa97e8 100644 --- a/src/query_handler.go +++ b/src/query_handler.go @@ -164,7 +164,7 @@ func NewQueryHandler(config *Config, duckdb *Duckdb, icebergReader *IcebergReade config: config, } - queryHandler.createSchemasAndViews() + queryHandler.createSchemas() return queryHandler } @@ -184,15 +184,6 @@ func (queryHandler 
*QueryHandler) HandleQuery(originalQuery string) ([]pgproto3. // https://github.com/duckdb/duckdb/issues/11693 LogWarn(queryHandler.config, "Couldn't handle query via DuckDB:", query+"\n"+err.Error()) return queryHandler.HandleQuery(FALLBACK_SQL_QUERY) - } else if strings.HasPrefix(errorMessage, "Catalog Error: Table with name ") && strings.Contains(errorMessage, " does not exist!") { - // Re-fetch Iceberg tables and try again - LogWarn(queryHandler.config, "Couldn't handle query via DuckDB:", query+"\n"+err.Error()) - queryHandler.createSchemasAndViews() - rows, err = queryHandler.duckdb.QueryContext(context.Background(), query) - if err != nil { - LogError(queryHandler.config, "Couldn't handle query via DuckDB:", query+"\n"+err.Error()) - return nil, err - } } else { LogError(queryHandler.config, "Couldn't handle query via DuckDB:", query+"\n"+err.Error()) return nil, err @@ -307,8 +298,7 @@ func (queryHandler *QueryHandler) HandleExecuteQuery(message *pgproto3.Execute, return queryHandler.rowsToDataMessages(preparedStatement.Rows, preparedStatement.Query) } -func (queryHandler *QueryHandler) createSchemasAndViews() { - LogInfo(queryHandler.config, "Initializing schemas and tables...") +func (queryHandler *QueryHandler) createSchemas() { ctx := context.Background() schemas, err := queryHandler.icebergReader.Schemas() PanicIfError(err) @@ -321,19 +311,6 @@ func (queryHandler *QueryHandler) createSchemasAndViews() { ) PanicIfError(err) } - - icebergSchemaTables, err := queryHandler.icebergReader.SchemaTables() - PanicIfError(err) - - for _, icebergSchemaTable := range icebergSchemaTables { - metadataFilePath := queryHandler.icebergReader.MetadataFilePath(icebergSchemaTable) - _, err := queryHandler.duckdb.ExecContext( - ctx, - "CREATE VIEW IF NOT EXISTS \"$schema\".\"$table\" AS SELECT * FROM iceberg_scan('$metadataFilePath', skip_schema_inference = true)", - map[string]string{"schema": icebergSchemaTable.Schema, "table": icebergSchemaTable.Table, 
"metadataFilePath": metadataFilePath}, - ) - PanicIfError(err) - } } func (queryHandler *QueryHandler) rowsToDescriptionMessages(rows *sql.Rows, query string) ([]pgproto3.Message, error) { diff --git a/src/query_handler_test.go b/src/query_handler_test.go index 1ed5974..cc88dc6 100644 --- a/src/query_handler_test.go +++ b/src/query_handler_test.go @@ -2,6 +2,7 @@ package main import ( "reflect" + "strings" "testing" "github.com/jackc/pgx/v5/pgproto3" @@ -431,6 +432,26 @@ func TestHandleQuery(t *testing.T) { }) } + + t.Run("Returns an error if a table does not exist", func(t *testing.T) { + queryHandler := initQueryHandler() + + _, err := queryHandler.HandleQuery("SELECT * FROM non_existent_table") + + if err == nil { + t.Errorf("Expected an error, got nil") + } + + expectedErrorMessage := strings.Join([]string{ + "Catalog Error: Table with name non_existent_table does not exist!", + "Did you mean \"sqlite_temp_master\"?", + "LINE 1: SELECT * FROM non_existent_table", + " ^", + }, "\n") + if err.Error() != expectedErrorMessage { + t.Errorf("Expected the error to be '"+expectedErrorMessage+"', got %v", err.Error()) + } + }) } func TestHandleParseQuery(t *testing.T) { diff --git a/src/query_parser.go b/src/query_parser.go index 8e167bd..1421fc2 100644 --- a/src/query_parser.go +++ b/src/query_parser.go @@ -734,7 +734,7 @@ func (queryParser *QueryParser) IsTableFromPgCatalog(schemaTable SchemaTable) bo // information_schema.tables func (queryParser *QueryParser) IsInformationSchemaTablesTable(schemaTable SchemaTable) bool { - return schemaTable.Schema == PG_SCHEMA_INFORMATION_SCHEMA && schemaTable.Table == PG_TABLE_TABLES + return queryParser.IsTableFromInformationSchema(schemaTable) && schemaTable.Table == PG_TABLE_TABLES } // information_schema.tables -> VALUES(values...) t(columns...) 
@@ -767,6 +767,13 @@ func (queryParser *QueryParser) MakeInformationSchemaTablesNode(database string, //////////////////////////////////////////////////////////////////////////////// +// Other information_schema.* tables +func (queryParser *QueryParser) IsTableFromInformationSchema(schemaTable SchemaTable) bool { + return schemaTable.Schema == PG_SCHEMA_INFORMATION_SCHEMA +} + +//////////////////////////////////////////////////////////////////////////////// + // pg_catalog.pg_get_keywords() func (queryParser *QueryParser) IsPgGetKeywordsFunction(schema string, functionName string) bool { return schema == PG_SCHEMA_PG_CATALOG && functionName == PG_FUNCTION_PG_GET_KEYWORDS @@ -808,6 +815,35 @@ func (queryParser *QueryParser) MakePgGetKeywordsNode() *pgQuery.Node { //////////////////////////////////////////////////////////////////////////////// +// iceberg.table -> FROM iceberg_scan('path', skip_schema_inference = true) +func (queryParser *QueryParser) MakeIcebergTableNode(tablePath string) *pgQuery.Node { + return pgQuery.MakeSimpleRangeFunctionNode([]*pgQuery.Node{ + pgQuery.MakeListNode([]*pgQuery.Node{ + pgQuery.MakeFuncCallNode( + []*pgQuery.Node{ + pgQuery.MakeStrNode("iceberg_scan"), + }, + []*pgQuery.Node{ + pgQuery.MakeAConstStrNode( + tablePath, + 0, + ), + pgQuery.MakeAExprNode( + pgQuery.A_Expr_Kind_AEXPR_OP, + []*pgQuery.Node{pgQuery.MakeStrNode("=")}, + pgQuery.MakeColumnRefNode([]*pgQuery.Node{pgQuery.MakeStrNode("skip_schema_inference")}, 0), + queryParser.MakeAConstBoolNode(true), + 0, + ), + }, + 0, + ), + }), + }) +} + +//////////////////////////////////////////////////////////////////////////////// + // quote_ident() func (queryParser *QueryParser) IsQuoteIdentFunction(functionName string) bool { return functionName == PG_FUNCTION_NAME_QUOTE_INDENT diff --git a/src/select_table_remapper.go b/src/select_table_remapper.go index 20f2aac..e07570c 100644 --- a/src/select_table_remapper.go +++ b/src/select_table_remapper.go @@ -4,17 +4,26 @@ import ( pgQuery 
"github.com/pganalyze/pg_query_go/v5" ) +const ( + PG_SCHEMA_PUBLIC = "public" +) + type SelectTableRemapper struct { - queryParser *QueryParser - icebergReader *IcebergReader - config *Config + queryParser *QueryParser + icebergSchemaTables []SchemaTable + icebergReader *IcebergReader + config *Config } func NewSelectTableRemapper(config *Config, queryParser *QueryParser, icebergReader *IcebergReader) *SelectTableRemapper { + icebergSchemaTables, err := icebergReader.SchemaTables() + PanicIfError(err) + return &SelectTableRemapper{ - queryParser: queryParser, - icebergReader: icebergReader, - config: config, + queryParser: queryParser, + icebergSchemaTables: icebergSchemaTables, + icebergReader: icebergReader, + config: config, } } @@ -54,11 +63,42 @@ func (remapper *SelectTableRemapper) RemapTable(node *pgQuery.Node) *pgQuery.Nod return remapper.overrideTable(node, tableNode) } - // iceberg.table - return node + // Other information_schema.* tables -> returned unchanged + if parser.IsTableFromInformationSchema(schemaTable) { + return node + } + + // iceberg.table -> FROM iceberg_scan('iceberg/schema/table/metadata/v1.metadata.json', skip_schema_inference = true) + if schemaTable.Schema == "" { + schemaTable.Schema = PG_SCHEMA_PUBLIC + } + if !remapper.icebergSchemaTableExists(schemaTable) { + remapper.reloadIceberSchemaTables() + if !remapper.icebergSchemaTableExists(schemaTable) { + return node // Let it return "Catalog Error: Table with name _ does not exist!" 
+ } + } + icebergPath := remapper.icebergReader.MetadataFilePath(schemaTable) + tableNode := parser.MakeIcebergTableNode(icebergPath) + return remapper.overrideTable(node, tableNode) } func (remapper *SelectTableRemapper) overrideTable(node *pgQuery.Node, fromClause *pgQuery.Node) *pgQuery.Node { node = fromClause return node } + +func (remapper *SelectTableRemapper) reloadIceberSchemaTables() { + icebergSchemaTables, err := remapper.icebergReader.SchemaTables() + PanicIfError(err) + remapper.icebergSchemaTables = icebergSchemaTables +} + +func (remapper *SelectTableRemapper) icebergSchemaTableExists(schemaTable SchemaTable) bool { + for _, icebergSchemaTable := range remapper.icebergSchemaTables { + if icebergSchemaTable == schemaTable { + return true + } + } + return false +}