Skip to content

Commit

Permalink
model executor for postgres and mysql to duckdb (#6083)
Browse files Browse the repository at this point in the history
* model executor for postgres/mysql to duckdb

* fix postgres unit test

* table name is not guaranteed to be present for all models

* review comments

* also allow inline variables

* also add dsn for postgres

* also add dsn for postgres

* Update runtime/drivers/duckdb/model_executor_sqlstore_self.go

Co-authored-by: Benjamin Egelund-Müller <[email protected]>

---------

Co-authored-by: Benjamin Egelund-Müller <[email protected]>
  • Loading branch information
k-anshul and begelundmuller committed Jan 6, 2025
1 parent 750d6a0 commit 7b5590c
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 9 deletions.
3 changes: 3 additions & 0 deletions runtime/drivers/duckdb/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,9 @@ func (c *connection) AsModelExecutor(instanceID string, opts *drivers.ModelExecu
if f, ok := opts.InputHandle.AsFileStore(); ok && opts.InputConnector == "local_file" {
return &localFileToSelfExecutor{c, f}, true
}
if opts.InputHandle.Driver() == "mysql" || opts.InputHandle.Driver() == "postgres" {
return &sqlStoreToSelfExecutor{c}, true
}
if _, ok := opts.InputHandle.AsObjectStore(); ok {
return &objectStoreToSelfExecutor{c}, true
}
Expand Down
118 changes: 118 additions & 0 deletions runtime/drivers/duckdb/model_executor_sqlstore_self.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package duckdb

import (
"context"
"fmt"
"strings"

"github.com/mitchellh/mapstructure"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/drivers/mysql"
"github.com/rilldata/rill/runtime/drivers/postgres"
)

// sqlStoreToSelfInputProps are the model input properties accepted when the
// input connector is an external SQL store. They are decoded with mapstructure
// using the keys given in the struct tags.
type sqlStoreToSelfInputProps struct {
	// SQL is the query to run against the external database (required).
	SQL string `mapstructure:"sql"`
	// DSN is an inline connection string; mutually exclusive with DatabaseURL.
	DSN string `mapstructure:"dsn"`
	// DatabaseURL is an alternative key for the inline connection string.
	DatabaseURL string `mapstructure:"database_url"`
}

// resolveDSN returns the inline connection string: DSN when it is set,
// otherwise DatabaseURL (which may itself be empty).
func (p *sqlStoreToSelfInputProps) resolveDSN() string {
	if p.DSN == "" {
		return p.DatabaseURL
	}
	return p.DSN
}

// Validate reports an error when the 'sql' property is missing or when both
// 'dsn' and 'database_url' are provided at the same time.
func (p *sqlStoreToSelfInputProps) Validate() error {
	switch {
	case p.SQL == "":
		return fmt.Errorf("missing property 'sql'")
	case p.DSN != "" && p.DatabaseURL != "":
		return fmt.Errorf("cannot set both 'dsn' and 'database_url'")
	default:
		return nil
	}
}

// sqlStoreToSelfExecutor is the model executor used when the input handle's
// driver is "mysql" or "postgres" and the output is this DuckDB connection.
type sqlStoreToSelfExecutor struct {
// c is the DuckDB connection the model is executed against.
c *connection
}

// Compile-time check that sqlStoreToSelfExecutor satisfies drivers.ModelExecutor.
var _ drivers.ModelExecutor = &sqlStoreToSelfExecutor{}

// Concurrency reports the concurrency this executor will run with. Any desired
// value above 1 is rejected with (0, false); every other value yields (1, true).
func (e *sqlStoreToSelfExecutor) Concurrency(desired int) (int, bool) {
	if desired <= 1 {
		return 1, true
	}
	return 0, false
}

// Execute parses and validates the user-supplied input properties, rewrites
// them into DuckDB model input properties (see modelInputProperties), and then
// delegates the actual work to a selfToSelfExecutor on the same connection.
// The caller's opts are not modified: a shallow copy carries the rewritten
// input properties.
func (e *sqlStoreToSelfExecutor) Execute(ctx context.Context, opts *drivers.ModelExecuteOptions) (*drivers.ModelResult, error) {
	var parsed sqlStoreToSelfInputProps
	if err := mapstructure.WeakDecode(opts.InputProperties, &parsed); err != nil {
		return nil, fmt.Errorf("failed to parse input properties: %w", err)
	}
	if err := parsed.Validate(); err != nil {
		return nil, fmt.Errorf("invalid input properties: %w", err)
	}

	// Translate the external-store properties into properties DuckDB can run.
	rewritten, err := e.modelInputProperties(opts.ModelName, opts.InputConnector, opts.InputHandle, &parsed)
	if err != nil {
		return nil, err
	}

	// Shallow-copy the options so only the input properties differ.
	delegated := *opts
	delegated.InputProperties = rewritten

	return (&selfToSelfExecutor{c: e.c}).Execute(ctx, &delegated)
}

// modelInputProperties builds the DuckDB model input properties that pull data
// from an external MySQL or Postgres database.
//
// The result, returned as a map decoded from ModelInputProperties, contains:
//   - PreExec: installs and loads the DuckDB extension and ATTACHes the
//     external database read-only under a name derived from modelName and
//     inputConnector;
//   - SQL: runs the user's query through mysql_query / postgres_query;
//   - PostExec: DETACHes the external database again.
//
// The connection string is taken from the inline `dsn` / `database_url`
// property when set, otherwise from the input connector's config. An error is
// returned when no connection string can be resolved or when the input
// handle's driver is neither "mysql" nor "postgres".
func (e *sqlStoreToSelfExecutor) modelInputProperties(modelName, inputConnector string, inputHandle drivers.Handle, inputProps *sqlStoreToSelfInputProps) (map[string]any, error) {
	m := &ModelInputProperties{}
	dbName := fmt.Sprintf("%s__%s", modelName, inputConnector)
	safeDBName := safeName(dbName)

	// Drop a single trailing semicolon. Surrounding whitespace is trimmed first
	// so that queries ending in ";\n" (typical for YAML block scalars) are
	// handled as well.
	userQuery := strings.TrimSuffix(strings.TrimSpace(inputProps.SQL), ";")

	switch inputHandle.Driver() {
	case "mysql":
		dsn := inputProps.resolveDSN()
		if dsn == "" {
			// May be configured via a connector. Decode into a value rather than a
			// nil pointer so a missing connector config cannot lead to a nil
			// dereference below.
			var config mysql.ConfigProperties
			if err := mapstructure.Decode(inputHandle.Config(), &config); err != nil {
				return nil, fmt.Errorf("failed to parse mysql connector config: %w", err)
			}
			// NOTE(review): only the connector DSN goes through rewriteMySQLDSN; an
			// inline DSN is used as given — confirm this asymmetry is intended.
			dsn = rewriteMySQLDSN(config.DSN)
		}
		if dsn == "" {
			return nil, fmt.Errorf("must set `dsn` for models that transfer data from `mysql` to `duckdb`")
		}
		m.PreExec = fmt.Sprintf("INSTALL 'MYSQL'; LOAD 'MYSQL'; ATTACH %s AS %s (TYPE mysql, READ_ONLY)", safeSQLString(dsn), safeDBName)
		m.SQL = fmt.Sprintf("SELECT * FROM mysql_query(%s, %s)", safeSQLString(dbName), safeSQLString(userQuery))
	case "postgres":
		dsn := inputProps.resolveDSN()
		if dsn == "" {
			// May be configured via a connector; see the mysql case for why this is
			// decoded into a value.
			var config postgres.ConfigProperties
			if err := mapstructure.Decode(inputHandle.Config(), &config); err != nil {
				return nil, fmt.Errorf("failed to parse postgres connector config: %w", err)
			}
			dsn = config.ResolveDSN()
		}
		if dsn == "" {
			return nil, fmt.Errorf("must set `database_url` or `dsn` for models that transfer data from `postgres` to `duckdb`")
		}
		m.PreExec = fmt.Sprintf("INSTALL 'POSTGRES'; LOAD 'POSTGRES'; ATTACH %s AS %s (TYPE postgres, READ_ONLY)", safeSQLString(dsn), safeDBName)
		m.SQL = fmt.Sprintf("SELECT * FROM postgres_query(%s, %s)", safeSQLString(dbName), safeSQLString(userQuery))
	default:
		return nil, fmt.Errorf("internal error: unsupported external database: %s", inputHandle.Driver())
	}

	// Always detach the external database once the model query has run.
	m.PostExec = fmt.Sprintf("DETACH %s", safeDBName)

	propsMap := make(map[string]any)
	if err := mapstructure.Decode(m, &propsMap); err != nil {
		return nil, err
	}
	return propsMap, nil
}
10 changes: 10 additions & 0 deletions runtime/drivers/duckdb/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,17 @@ func (c *connection) InsertTableAsSelect(ctx context.Context, name, sql string,

if opts.Strategy == drivers.IncrementalStrategyAppend {
err = db.MutateTable(ctx, name, func(ctx context.Context, conn *sqlx.Conn) error {
if opts.BeforeInsert != "" {
_, err := conn.ExecContext(ctx, opts.BeforeInsert)
if err != nil {
return err
}
}
_, err := conn.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s %s (%s\n)", safeSQLName(name), byNameClause, sql))
if opts.AfterInsert != "" {
_, afterInsertExecErr := conn.ExecContext(ctx, opts.AfterInsert)
return errors.Join(err, afterInsertExecErr)
}
return err
})
return c.checkErr(err)
Expand Down
75 changes: 68 additions & 7 deletions runtime/drivers/duckdb/transporter_mysql_to_duckDB_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package duckdb
package duckdb_test

import (
"context"
"database/sql"
"fmt"
"testing"
"time"

"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/storage"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"go.uber.org/zap"

"fmt"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
_ "github.com/rilldata/rill/runtime/drivers/duckdb"
_ "github.com/rilldata/rill/runtime/drivers/mysql"
)

var mysqlInitStmt = `
Expand Down Expand Up @@ -98,6 +99,10 @@ func TestMySQLToDuckDBTransfer(t *testing.T) {
t.Run("AllDataTypes", func(t *testing.T) {
allMySQLDataTypesTest(t, db, dsn)
})

t.Run("model_executor_mysql_to_duckDB", func(t *testing.T) {
mysqlToDuckDB(t, fmt.Sprintf("host=%s port=%v database=mydb user=myuser password=mypassword", host, port.Int()))
})
}

func allMySQLDataTypesTest(t *testing.T, db *sql.DB, dsn string) {
Expand All @@ -109,7 +114,11 @@ func allMySQLDataTypesTest(t *testing.T, db *sql.DB, dsn string) {
require.NoError(t, err)
olap, _ := to.AsOLAP("")

tr := newDuckDBToDuckDB(to.(*connection), "mysql", zap.NewNop())
inputHandle, err := drivers.Open("mysql", "default", map[string]any{"dsn": dsn}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)

tr, ok := to.AsTransporter(inputHandle, to)
require.True(t, ok)
err = tr.Transfer(ctx, map[string]any{"sql": "select * from all_data_types_table;", "db": dsn}, map[string]any{"table": "sink"}, &drivers.TransferOptions{})
require.NoError(t, err)
res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "select count(*) from sink"})
Expand All @@ -123,3 +132,55 @@ func allMySQLDataTypesTest(t *testing.T, db *sql.DB, dsn string) {
require.NoError(t, res.Close())
require.NoError(t, to.Close())
}

// mysqlToDuckDB runs the DuckDB model executor with a MySQL input handle and
// checks that the rows selected from all_data_types_table land in the DuckDB
// table "sink". dsn is the connection string passed both to the MySQL handle
// and as the inline `dsn` model property.
func mysqlToDuckDB(t *testing.T, dsn string) {
	duckDB, err := drivers.Open("duckdb", "default", map[string]any{"data_dir": t.TempDir()}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
	require.NoError(t, err)

	inputHandle, err := drivers.Open("mysql", "default", map[string]any{"dsn": dsn}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
	require.NoError(t, err)
	// Release the source handle even when an assertion below aborts the test.
	t.Cleanup(func() {
		if err := inputHandle.Close(); err != nil {
			t.Errorf("closing mysql input handle: %v", err)
		}
	})

	opts := &drivers.ModelExecutorOptions{
		InputHandle:     inputHandle,
		InputConnector:  "mysql",
		OutputHandle:    duckDB,
		OutputConnector: "duckdb",
		Env: &drivers.ModelEnv{
			AllowHostAccess: false,
			StageChanges:    true,
		},
		PreliminaryInputProperties: map[string]any{
			"sql": "SELECT * FROM all_data_types_table;",
			"dsn": dsn,
		},
		PreliminaryOutputProperties: map[string]any{
			"table": "sink",
		},
	}

	me, ok := duckDB.AsModelExecutor("default", opts)
	require.True(t, ok)

	execOpts := &drivers.ModelExecuteOptions{
		ModelExecutorOptions: opts,
		InputProperties:      opts.PreliminaryInputProperties,
		OutputProperties:     opts.PreliminaryOutputProperties,
	}
	_, err = me.Execute(context.Background(), execOpts)
	require.NoError(t, err)

	olap, ok := duckDB.AsOLAP("default")
	require.True(t, ok)

	res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "select count(*) from sink"})
	require.NoError(t, err)
	// Require a row so the count assertion cannot be skipped silently.
	require.True(t, res.Next())
	var count int
	require.NoError(t, res.Rows.Scan(&count))
	require.Equal(t, 2, count)
	require.NoError(t, res.Close())
	// TODO : verify this is a table once information_schema is fixed
	require.NoError(t, duckDB.Close())
}
88 changes: 86 additions & 2 deletions runtime/drivers/duckdb/transporter_postgres_to_duckDB_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package duckdb
package duckdb_test

import (
"context"
Expand All @@ -14,6 +14,7 @@ import (

// Load postgres driver
_ "github.com/jackc/pgx/v5/stdlib"
_ "github.com/rilldata/rill/runtime/drivers/duckdb"
_ "github.com/rilldata/rill/runtime/drivers/postgres"
)

Expand Down Expand Up @@ -61,6 +62,7 @@ func TestTransfer(t *testing.T) {
defer db.Close()

t.Run("AllDataTypes", func(t *testing.T) { allDataTypesTest(t, db, pg.DatabaseURL) })
t.Run("model_executor_postgres_to_duckDB", func(t *testing.T) { pgxToDuckDB(t, db, pg.DatabaseURL) })
}

func allDataTypesTest(t *testing.T, db *sql.DB, dbURL string) {
Expand All @@ -72,7 +74,12 @@ func allDataTypesTest(t *testing.T, db *sql.DB, dbURL string) {
require.NoError(t, err)
olap, _ := to.AsOLAP("")

tr := newDuckDBToDuckDB(to.(*connection), "postgres", zap.NewNop())
inputHandle, err := drivers.Open("postgres", "default", map[string]any{"database_url": dbURL}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)

tr, ok := to.AsTransporter(inputHandle, to)
require.True(t, ok)

err = tr.Transfer(ctx, map[string]any{"sql": "select * from all_datatypes;", "db": dbURL}, map[string]any{"table": "sink"}, &drivers.TransferOptions{})
require.NoError(t, err)
res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "select count(*) from sink"})
Expand All @@ -86,3 +93,80 @@ func allDataTypesTest(t *testing.T, db *sql.DB, dbURL string) {
require.NoError(t, res.Close())
require.NoError(t, to.Close())
}

// pgxToDuckDB runs the DuckDB model executor with a Postgres input handle.
// It first performs a full run and expects one row in the DuckDB table "sink",
// then inserts a newer row into Postgres, deletes the older rows, performs an
// incremental run with a filtered query, and expects "sink" to hold two rows.
// pgdb is a direct connection used to mutate the Postgres source; dbURL is the
// connection string given to the Postgres handle and the inline `dsn` property.
func pgxToDuckDB(t *testing.T, pgdb *sql.DB, dbURL string) {
	duckDB, err := drivers.Open("duckdb", "default", map[string]any{"data_dir": t.TempDir()}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
	require.NoError(t, err)

	inputHandle, err := drivers.Open("postgres", "default", map[string]any{"database_url": dbURL}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
	require.NoError(t, err)
	// Release the source handle even when an assertion below aborts the test.
	t.Cleanup(func() {
		if err := inputHandle.Close(); err != nil {
			t.Errorf("closing postgres input handle: %v", err)
		}
	})

	opts := &drivers.ModelExecutorOptions{
		InputHandle:     inputHandle,
		InputConnector:  "postgres",
		OutputHandle:    duckDB,
		OutputConnector: "duckdb",
		Env: &drivers.ModelEnv{
			AllowHostAccess: false,
			StageChanges:    true,
		},
		PreliminaryInputProperties: map[string]any{
			"sql": "SELECT * FROM all_datatypes;",
			"dsn": dbURL,
		},
		PreliminaryOutputProperties: map[string]any{
			"table": "sink",
		},
	}

	me, ok := duckDB.AsModelExecutor("default", opts)
	require.True(t, ok)

	execOpts := &drivers.ModelExecuteOptions{
		ModelExecutorOptions: opts,
		InputProperties:      opts.PreliminaryInputProperties,
		OutputProperties:     opts.PreliminaryOutputProperties,
	}

	_, err = me.Execute(context.Background(), execOpts)
	require.NoError(t, err)

	olap, ok := duckDB.AsOLAP("default")
	require.True(t, ok)

	// requireSinkCount asserts the number of rows currently in "sink". It
	// requires a result row so the assertion cannot be skipped silently.
	requireSinkCount := func(want int) {
		res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "select count(*) from sink"})
		require.NoError(t, err)
		require.True(t, res.Next())
		var count int
		require.NoError(t, res.Rows.Scan(&count))
		require.Equal(t, want, count)
		require.NoError(t, res.Close())
	}
	requireSinkCount(1)

	// ingest some more data in postgres
	_, err = pgdb.Exec("INSERT INTO all_datatypes(uuid, created_at) VALUES (gen_random_uuid(), '2024-01-02 12:46:55');")
	require.NoError(t, err)

	// drop older data from postgres
	_, err = pgdb.Exec("DELETE FROM all_datatypes WHERE created_at < '2024-01-01 00:00:00';")
	require.NoError(t, err)

	// incremental run
	execOpts.IncrementalRun = true
	execOpts.InputProperties["sql"] = "SELECT * FROM all_datatypes WHERE created_at > '2024-01-01 00:00:00';"
	_, err = me.Execute(context.Background(), execOpts)
	require.NoError(t, err)

	// The row from the first run plus the newly inserted row.
	requireSinkCount(2)

	require.NoError(t, duckDB.Close())
}
4 changes: 4 additions & 0 deletions runtime/drivers/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ var spec = drivers.Spec{

type driver struct{}

// ConfigProperties describes the mysql connector configuration. It is exported
// so other packages can decode a handle's config map into it with mapstructure.
type ConfigProperties struct {
// DSN is the connection string, read from the "dsn" config key.
DSN string `mapstructure:"dsn"`
}

func (d driver) Open(instanceID string, config map[string]any, st *storage.Client, ac *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if instanceID == "" {
return nil, errors.New("mysql driver can't be shared")
Expand Down
Loading

0 comments on commit 7b5590c

Please sign in to comment.