Skip to content

Commit

Permalink
Scan Iceberg tables lazily
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Dec 10, 2024
1 parent ffff248 commit 91a07c1
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 36 deletions.
2 changes: 1 addition & 1 deletion scripts/install.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

VERSION="0.22.0"
VERSION="0.22.1"

# Detect OS and architecture
OS=$(uname -s | tr '[:upper:]' '[:lower:]')
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ func NewIcebergReader(config *Config) *IcebergReader {
}

// Schemas logs a debug message and then returns the schema names (and error)
// reported by reader.storage.IcebergSchemas().
func (reader *IcebergReader) Schemas() (icebergSchemas []string, err error) {
	LogDebug(reader.config, "Reading Iceberg schemas...")

	icebergSchemas, err = reader.storage.IcebergSchemas()
	return icebergSchemas, err
}

// SchemaTables logs a debug message and then returns the schema/table pairs
// (and error) reported by reader.storage.IcebergSchemaTables().
func (reader *IcebergReader) SchemaTables() (icebergSchemaTables []SchemaTable, err error) {
	LogDebug(reader.config, "Reading Iceberg tables...")

	icebergSchemaTables, err = reader.storage.IcebergSchemaTables()
	return icebergSchemaTables, err
}

Expand Down
2 changes: 1 addition & 1 deletion src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"
)

const VERSION = "0.22.0"
const VERSION = "0.22.1"

func main() {
config := LoadConfig()
Expand Down
27 changes: 2 additions & 25 deletions src/query_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func NewQueryHandler(config *Config, duckdb *Duckdb, icebergReader *IcebergReade
config: config,
}

queryHandler.createSchemasAndViews()
queryHandler.createSchemas()

return queryHandler
}
Expand All @@ -184,15 +184,6 @@ func (queryHandler *QueryHandler) HandleQuery(originalQuery string) ([]pgproto3.
// https://github.com/duckdb/duckdb/issues/11693
LogWarn(queryHandler.config, "Couldn't handle query via DuckDB:", query+"\n"+err.Error())
return queryHandler.HandleQuery(FALLBACK_SQL_QUERY)
} else if strings.HasPrefix(errorMessage, "Catalog Error: Table with name ") && strings.Contains(errorMessage, " does not exist!") {
// Re-fetch Iceberg tables and try again
LogWarn(queryHandler.config, "Couldn't handle query via DuckDB:", query+"\n"+err.Error())
queryHandler.createSchemasAndViews()
rows, err = queryHandler.duckdb.QueryContext(context.Background(), query)
if err != nil {
LogError(queryHandler.config, "Couldn't handle query via DuckDB:", query+"\n"+err.Error())
return nil, err
}
} else {
LogError(queryHandler.config, "Couldn't handle query via DuckDB:", query+"\n"+err.Error())
return nil, err
Expand Down Expand Up @@ -307,8 +298,7 @@ func (queryHandler *QueryHandler) HandleExecuteQuery(message *pgproto3.Execute,
return queryHandler.rowsToDataMessages(preparedStatement.Rows, preparedStatement.Query)
}

func (queryHandler *QueryHandler) createSchemasAndViews() {
LogInfo(queryHandler.config, "Initializing schemas and tables...")
func (queryHandler *QueryHandler) createSchemas() {
ctx := context.Background()
schemas, err := queryHandler.icebergReader.Schemas()
PanicIfError(err)
Expand All @@ -321,19 +311,6 @@ func (queryHandler *QueryHandler) createSchemasAndViews() {
)
PanicIfError(err)
}

icebergSchemaTables, err := queryHandler.icebergReader.SchemaTables()
PanicIfError(err)

for _, icebergSchemaTable := range icebergSchemaTables {
metadataFilePath := queryHandler.icebergReader.MetadataFilePath(icebergSchemaTable)
_, err := queryHandler.duckdb.ExecContext(
ctx,
"CREATE VIEW IF NOT EXISTS \"$schema\".\"$table\" AS SELECT * FROM iceberg_scan('$metadataFilePath', skip_schema_inference = true)",
map[string]string{"schema": icebergSchemaTable.Schema, "table": icebergSchemaTable.Table, "metadataFilePath": metadataFilePath},
)
PanicIfError(err)
}
}

func (queryHandler *QueryHandler) rowsToDescriptionMessages(rows *sql.Rows, query string) ([]pgproto3.Message, error) {
Expand Down
21 changes: 21 additions & 0 deletions src/query_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"reflect"
"strings"
"testing"

"github.com/jackc/pgx/v5/pgproto3"
Expand Down Expand Up @@ -431,6 +432,26 @@ func TestHandleQuery(t *testing.T) {

})
}

// Querying a table that is neither remapped nor known to DuckDB must surface
// DuckDB's catalog error to the caller unchanged.
t.Run("Returns an error if a table does not exist", func(t *testing.T) {
	queryHandler := initQueryHandler()

	_, err := queryHandler.HandleQuery("SELECT * FROM non_existent_table")

	// Abort the subtest right away when no error came back: the comparison
	// below calls err.Error(), which would panic on a nil error instead of
	// reporting a regular test failure.
	if err == nil {
		t.Fatalf("Expected an error, got nil")
	}

	expectedErrorMessage := strings.Join([]string{
		"Catalog Error: Table with name non_existent_table does not exist!",
		"Did you mean \"sqlite_temp_master\"?",
		"LINE 1: SELECT * FROM non_existent_table",
		" ^",
	}, "\n")
	if err.Error() != expectedErrorMessage {
		// Pass the messages as arguments rather than splicing them into the
		// format string, so a '%' inside a message cannot be misread as a verb.
		t.Errorf("Expected the error to be '%v', got %v", expectedErrorMessage, err.Error())
	}
})
}

func TestHandleParseQuery(t *testing.T) {
Expand Down
38 changes: 37 additions & 1 deletion src/query_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ func (queryParser *QueryParser) IsTableFromPgCatalog(schemaTable SchemaTable) bo

// information_schema.tables
//
// IsInformationSchemaTablesTable reports whether schemaTable lives in the
// information_schema schema (as decided by IsTableFromInformationSchema) and
// its table name equals PG_TABLE_TABLES.
func (queryParser *QueryParser) IsInformationSchemaTablesTable(schemaTable SchemaTable) bool {
	// A single return: the earlier duplicate left a second, unreachable
	// return statement behind it.
	return queryParser.IsTableFromInformationSchema(schemaTable) && schemaTable.Table == PG_TABLE_TABLES
}

// information_schema.tables -> VALUES(values...) t(columns...)
Expand Down Expand Up @@ -767,6 +767,13 @@ func (queryParser *QueryParser) MakeInformationSchemaTablesNode(database string,

////////////////////////////////////////////////////////////////////////////////

// information_schema.* tables
//
// IsTableFromInformationSchema reports whether schemaTable's schema equals
// PG_SCHEMA_INFORMATION_SCHEMA. Only the schema is compared; the table name
// is not inspected.
func (queryParser *QueryParser) IsTableFromInformationSchema(schemaTable SchemaTable) bool {
return schemaTable.Schema == PG_SCHEMA_INFORMATION_SCHEMA
}

////////////////////////////////////////////////////////////////////////////////

// pg_catalog.pg_get_keywords()
func (queryParser *QueryParser) IsPgGetKeywordsFunction(schema string, functionName string) bool {
return schema == PG_SCHEMA_PG_CATALOG && functionName == PG_FUNCTION_PG_GET_KEYWORDS
Expand Down Expand Up @@ -808,6 +815,35 @@ func (queryParser *QueryParser) MakePgGetKeywordsNode() *pgQuery.Node {

////////////////////////////////////////////////////////////////////////////////

// iceberg.table -> FROM iceberg_scan('path', skip_schema_inference = true)
//
// MakeIcebergTableNode builds a range-function node that calls iceberg_scan
// with two arguments: tablePath as a string constant, and the expression
// skip_schema_inference = true.
func (queryParser *QueryParser) MakeIcebergTableNode(tablePath string) *pgQuery.Node {
	// Function name: iceberg_scan
	functionName := []*pgQuery.Node{pgQuery.MakeStrNode("iceberg_scan")}

	// First argument: 'path'
	pathArgument := pgQuery.MakeAConstStrNode(tablePath, 0)

	// Second argument: skip_schema_inference = true
	skipSchemaInferenceArgument := pgQuery.MakeAExprNode(
		pgQuery.A_Expr_Kind_AEXPR_OP,
		[]*pgQuery.Node{pgQuery.MakeStrNode("=")},
		pgQuery.MakeColumnRefNode([]*pgQuery.Node{pgQuery.MakeStrNode("skip_schema_inference")}, 0),
		queryParser.MakeAConstBoolNode(true),
		0,
	)

	icebergScanCall := pgQuery.MakeFuncCallNode(
		functionName,
		[]*pgQuery.Node{pathArgument, skipSchemaInferenceArgument},
		0,
	)

	functionList := pgQuery.MakeListNode([]*pgQuery.Node{icebergScanCall})
	return pgQuery.MakeSimpleRangeFunctionNode([]*pgQuery.Node{functionList})
}

////////////////////////////////////////////////////////////////////////////////

// quote_ident()
func (queryParser *QueryParser) IsQuoteIdentFunction(functionName string) bool {
return functionName == PG_FUNCTION_NAME_QUOTE_INDENT
Expand Down
56 changes: 48 additions & 8 deletions src/select_table_remapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,26 @@ import (
pgQuery "github.com/pganalyze/pg_query_go/v5"
)

// PG_SCHEMA_PUBLIC is the schema name substituted when a table reference
// carries no schema of its own.
const PG_SCHEMA_PUBLIC = "public"

// SelectTableRemapper rewrites table references in SELECT statements. It
// keeps a cached list of the known Iceberg schema/table pairs so it can decide
// whether a reference should be remapped.
type SelectTableRemapper struct {
	// Each field is declared exactly once; repeating a field name in a
	// struct is a compile error.
	queryParser *QueryParser
	// icebergSchemaTables is loaded from icebergReader.SchemaTables() by
	// NewSelectTableRemapper and replaced by reloadIceberSchemaTables.
	icebergSchemaTables []SchemaTable
	icebergReader       *IcebergReader
	config              *Config
}

// NewSelectTableRemapper creates a SelectTableRemapper and eagerly loads the
// list of Iceberg schema/table pairs from icebergReader.SchemaTables().
//
// The load error is handed to PanicIfError (presumably panics on a non-nil
// error; confirm against its definition), so no error is returned.
func NewSelectTableRemapper(config *Config, queryParser *QueryParser, icebergReader *IcebergReader) *SelectTableRemapper {
	icebergSchemaTables, err := icebergReader.SchemaTables()
	PanicIfError(err)

	// Each key appears exactly once; repeating a field name in a composite
	// literal is a compile error.
	return &SelectTableRemapper{
		queryParser:         queryParser,
		icebergSchemaTables: icebergSchemaTables,
		icebergReader:       icebergReader,
		config:              config,
	}
}

Expand Down Expand Up @@ -54,11 +63,42 @@ func (remapper *SelectTableRemapper) RemapTable(node *pgQuery.Node) *pgQuery.Nod
return remapper.overrideTable(node, tableNode)
}

// iceberg.table
return node
// information_schema.* other system tables
if parser.IsTableFromInformationSchema(schemaTable) {
return node
}

// iceberg.table -> FROM iceberg_scan('iceberg/schema/table/metadata/v1.metadata.json', skip_schema_inference = true)
if schemaTable.Schema == "" {
schemaTable.Schema = PG_SCHEMA_PUBLIC
}
if !remapper.icebergSchemaTableExists(schemaTable) {
remapper.reloadIceberSchemaTables()
if !remapper.icebergSchemaTableExists(schemaTable) {
return node // Let it return "Catalog Error: Table with name _ does not exist!"
}
}
icebergPath := remapper.icebergReader.MetadataFilePath(schemaTable)
tableNode := parser.MakeIcebergTableNode(icebergPath)
return remapper.overrideTable(node, tableNode)
}

// overrideTable returns fromClause as the replacement for node. The original
// node value does not contribute to the result.
func (remapper *SelectTableRemapper) overrideTable(node *pgQuery.Node, fromClause *pgQuery.Node) *pgQuery.Node {
	return fromClause
}

// reloadIceberSchemaTables re-reads the schema/table list from the Iceberg
// reader and replaces the remapper's cached copy with it. The read error is
// handed to PanicIfError before the cache is updated.
func (remapper *SelectTableRemapper) reloadIceberSchemaTables() {
	refreshed, loadErr := remapper.icebergReader.SchemaTables()
	PanicIfError(loadErr)

	remapper.icebergSchemaTables = refreshed
}

// icebergSchemaTableExists reports whether schemaTable equals (==) any entry
// in the remapper's cached icebergSchemaTables list.
func (remapper *SelectTableRemapper) icebergSchemaTableExists(schemaTable SchemaTable) bool {
	known := remapper.icebergSchemaTables
	for i := 0; i < len(known); i++ {
		if known[i] == schemaTable {
			return true
		}
	}
	return false
}

0 comments on commit 91a07c1

Please sign in to comment.