Skip to content

Commit

Permalink
Dynamically re-initialize schemas and tables
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Dec 5, 2024
1 parent 2ad3b34 commit e3285d8
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 32 deletions.
2 changes: 1 addition & 1 deletion scripts/install.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

VERSION="0.17.1"
VERSION="0.18.0"

# Detect OS and architecture
OS=$(uname -s | tr '[:upper:]' '[:lower:]')
Expand Down
2 changes: 1 addition & 1 deletion src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"
)

const VERSION = "0.17.1"
const VERSION = "0.18.0"

func main() {
config := LoadConfig()
Expand Down
81 changes: 51 additions & 30 deletions src/query_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (

// QueryHandler bundles the collaborators needed to execute incoming SQL
// queries against DuckDB and to keep DuckDB's schemas and views in sync with
// the tables reported by the Iceberg reader.
type QueryHandler struct {
// duckdb is the handle queries and DDL statements are executed through
// (via its QueryContext and ExecContext methods).
duckdb *Duckdb
// icebergReader supplies the list of schemas, the list of schema tables and
// each table's metadata file path when schemas and views are (re-)created.
icebergReader *IcebergReader
// selectRemapper is constructed from the same config and icebergReader.
// NOTE(review): presumably rewrites SELECT statements before execution;
// its usage is not visible here, so confirm against HandleQuery.
selectRemapper *SelectRemapper
// config is passed to the logging helpers (LogInfo, LogWarn, LogError).
config *Config
}
Expand Down Expand Up @@ -156,36 +157,16 @@ func (nullArray NullArray) String() string {
////////////////////////////////////////////////////////////////////////////////////////////////////

func NewQueryHandler(config *Config, duckdb *Duckdb, icebergReader *IcebergReader) *QueryHandler {
ctx := context.Background()

schemas, err := icebergReader.Schemas()
PanicIfError(err)
for _, schema := range schemas {
_, err := duckdb.ExecContext(
ctx,
"CREATE SCHEMA IF NOT EXISTS \"$schema\"",
map[string]string{"schema": schema},
)
PanicIfError(err)
}

icebergSchemaTables, err := icebergReader.SchemaTables()
PanicIfError(err)
for _, icebergSchemaTable := range icebergSchemaTables {
metadataFilePath := icebergReader.MetadataFilePath(icebergSchemaTable)
_, err := duckdb.ExecContext(
ctx,
"CREATE VIEW IF NOT EXISTS \"$schema\".\"$table\" AS SELECT * FROM iceberg_scan('$metadataFilePath', skip_schema_inference = true)",
map[string]string{"schema": icebergSchemaTable.Schema, "table": icebergSchemaTable.Table, "metadataFilePath": metadataFilePath},
)
PanicIfError(err)
}

return &QueryHandler{
queryHandler := &QueryHandler{
duckdb: duckdb,
icebergReader: icebergReader,
selectRemapper: &SelectRemapper{config: config, icebergReader: icebergReader},
config: config,
}

queryHandler.createSchemasAndViews()

return queryHandler
}

func (queryHandler *QueryHandler) HandleQuery(originalQuery string) ([]pgproto3.Message, error) {
Expand All @@ -197,14 +178,25 @@ func (queryHandler *QueryHandler) HandleQuery(originalQuery string) ([]pgproto3.

rows, err := queryHandler.duckdb.QueryContext(context.Background(), query)
if err != nil {
LogError(queryHandler.config, "Couldn't handle query via DuckDB:", query+"\n"+err.Error())
errorMessage := err.Error()

if err.Error() == "Binder Error: UNNEST requires a single list as input" {
if errorMessage == "Binder Error: UNNEST requires a single list as input" {
// https://github.com/duckdb/duckdb/issues/11693
LogWarn(queryHandler.config, "Couldn't handle query via DuckDB:", query+"\n"+err.Error())
return queryHandler.HandleQuery(FALLBACK_SQL_QUERY)
} else if strings.HasPrefix(errorMessage, "Catalog Error: Table with name ") && strings.Contains(errorMessage, " does not exist!") {
// Re-fetch Iceberg tables and try again
LogWarn(queryHandler.config, "Couldn't handle query via DuckDB:", query+"\n"+err.Error())
queryHandler.createSchemasAndViews()
rows, err = queryHandler.duckdb.QueryContext(context.Background(), query)
if err != nil {
LogError(queryHandler.config, "Couldn't handle query via DuckDB:", query+"\n"+err.Error())
return nil, err
}
} else {
LogError(queryHandler.config, "Couldn't handle query via DuckDB:", query+"\n"+err.Error())
return nil, err
}

return nil, err
}
defer rows.Close()

Expand Down Expand Up @@ -315,6 +307,35 @@ func (queryHandler *QueryHandler) HandleExecuteQuery(message *pgproto3.Execute,
return queryHandler.rowsToDataMessages(preparedStatement.Rows, preparedStatement.Query)
}

// createSchemasAndViews mirrors what the Iceberg reader currently reports into
// DuckDB. It first issues CREATE SCHEMA IF NOT EXISTS for every schema, then
// CREATE VIEW IF NOT EXISTS for every schema table, with each view selecting
// from iceberg_scan over that table's metadata file.
//
// Both statements use IF NOT EXISTS, so schemas and views that already exist
// are left as they are. Any error from the Iceberg reader or from DuckDB is
// handed to PanicIfError.
func (queryHandler *QueryHandler) createSchemasAndViews() {
	const (
		createSchemaSQL = "CREATE SCHEMA IF NOT EXISTS \"$schema\""
		createViewSQL   = "CREATE VIEW IF NOT EXISTS \"$schema\".\"$table\" AS SELECT * FROM iceberg_scan('$metadataFilePath', skip_schema_inference = true)"
	)

	LogInfo(queryHandler.config, "Initializing schemas and tables...")

	background := context.Background()
	reader := queryHandler.icebergReader
	db := queryHandler.duckdb

	// Schemas go first so the views created below have somewhere to live.
	schemaNames, err := reader.Schemas()
	PanicIfError(err)
	for _, schemaName := range schemaNames {
		args := map[string]string{"schema": schemaName}
		_, err = db.ExecContext(background, createSchemaSQL, args)
		PanicIfError(err)
	}

	// One view per Iceberg table, pointing at the table's metadata file.
	schemaTables, err := reader.SchemaTables()
	PanicIfError(err)
	for _, schemaTable := range schemaTables {
		args := map[string]string{
			"schema":           schemaTable.Schema,
			"table":            schemaTable.Table,
			"metadataFilePath": reader.MetadataFilePath(schemaTable),
		}
		_, err = db.ExecContext(background, createViewSQL, args)
		PanicIfError(err)
	}
}

func (queryHandler *QueryHandler) rowsToDescriptionMessages(rows *sql.Rows, query string) ([]pgproto3.Message, error) {
cols, err := rows.ColumnTypes()
if err != nil {
Expand Down

0 comments on commit e3285d8

Please sign in to comment.