Skip to content

Commit

Permalink
Allow including / excluding PG schemas when syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Dec 9, 2024
1 parent 39079e3 commit 5273ade
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 66 deletions.
2 changes: 2 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ BEMIDB_STORAGE_PATH=./iceberg
PG_DATABASE_URL=postgres://[USER]:[PASSWORD]@localhost:5432/[DATABASE]
# PG_SYNC_INTERVAL=1h
# PG_SCHEMA_PREFIX=mydb_
# PG_INCLUDE_SCHEMAS=public
# PG_EXCLUDE_SCHEMAS=auth
# PG_INCLUDE_TABLES=public.users,public.posts
# PG_EXCLUDE_TABLES=public.logs
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ psql postgres://localhost:54321/bemidb -c \
| `--aws-secret-access-key` | `AWS_SECRET_ACCESS_KEY` | | AWS secret access key. Required if storage type is `S3` |
| `--pg-database-url` | `PG_DATABASE_URL` | | PostgreSQL database URL to sync |
| `--pg-sync-interval` | `PG_SYNC_INTERVAL` | | Interval between syncs. Valid units: `ns`, `us`/`µs`, `ms`, `s`, `m`, `h` |
| `--pg-exclude-schemas` | `PG_EXCLUDE_SCHEMAS` | | List of schemas to exclude from sync. Comma-separated |
| `--pg-include-schemas` | `PG_INCLUDE_SCHEMAS` | | List of schemas to include in sync. Comma-separated |
| `--pg-exclude-tables` | `PG_EXCLUDE_TABLES` | | List of tables to exclude from sync. Comma-separated `schema.table` |
| `--pg-include-tables` | `PG_INCLUDE_TABLES` | | List of tables to include in sync. Comma-separated `schema.table` |
| `--pg-schema-prefix` | `PG_SCHEMA_PREFIX` | | Prefix for PostgreSQL schema names |
Expand Down
2 changes: 1 addition & 1 deletion scripts/install.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

VERSION="0.20.1"
VERSION="0.21.0"

# Detect OS and architecture
OS=$(uname -s | tr '[:upper:]' '[:lower:]')
Expand Down
73 changes: 48 additions & 25 deletions src/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ const (
ENV_AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID"
ENV_AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY"

ENV_PG_DATABASE_URL = "PG_DATABASE_URL"
ENV_PG_SYNC_INTERVAL = "PG_SYNC_INTERVAL"
ENV_PG_SCHEMA_PREFIX = "PG_SCHEMA_PREFIX"
ENV_PG_INCLUDE_TABLES = "PG_INCLUDE_TABLES"
ENV_PG_EXCLUDE_TABLES = "PG_EXCLUDE_TABLES"
ENV_PG_DATABASE_URL = "PG_DATABASE_URL"
ENV_PG_SYNC_INTERVAL = "PG_SYNC_INTERVAL"
ENV_PG_SCHEMA_PREFIX = "PG_SCHEMA_PREFIX"
ENV_PG_INCLUDE_SCHEMAS = "PG_INCLUDE_SCHEMAS"
ENV_PG_EXCLUDE_SCHEMAS = "PG_EXCLUDE_SCHEMAS"
ENV_PG_INCLUDE_TABLES = "PG_INCLUDE_TABLES"
ENV_PG_EXCLUDE_TABLES = "PG_EXCLUDE_TABLES"

DEFAULT_PORT = "54321"
DEFAULT_DATABASE = "bemidb"
Expand All @@ -51,11 +53,13 @@ type AwsConfig struct {
}

type PgConfig struct {
DatabaseUrl string
SyncInterval string // optional
SchemaPrefix string // optional
IncludeTables *Set // optional
ExcludeTables *Set // optional
DatabaseUrl string
SyncInterval string // optional
SchemaPrefix string // optional
IncludeSchemas *Set // optional
ExcludeSchemas *Set // optional
IncludeTables *Set // optional
ExcludeTables *Set // optional
}

type Config struct {
Expand All @@ -72,9 +76,16 @@ type Config struct {
Pg PgConfig
}

type configParseValues struct {
password string
pgIncludeSchemas string
pgExcludeSchemas string
pgIncludeTables string
pgExcludeTables string
}

var _config Config
var _password string
var _pgIncludeTables, _pgExcludeTables string
var _configParseValues configParseValues

func init() {
registerFlags()
Expand All @@ -85,15 +96,17 @@ func registerFlags() {
flag.StringVar(&_config.Port, "port", os.Getenv(ENV_PORT), "Port for BemiDB to listen on. Default: \""+DEFAULT_PORT+"\"")
flag.StringVar(&_config.Database, "database", os.Getenv(ENV_DATABASE), "Database name. Default: \""+DEFAULT_DATABASE+"\"")
flag.StringVar(&_config.User, "user", os.Getenv(ENV_USER), "Database user. Default: \""+DEFAULT_USER+"\"")
flag.StringVar(&_password, "password", os.Getenv(ENV_PASSWORD), "Database password. Default: \""+DEFAULT_PASSWORD+"\"")
flag.StringVar(&_configParseValues.password, "password", os.Getenv(ENV_PASSWORD), "Database password. Default: \""+DEFAULT_PASSWORD+"\"")
flag.StringVar(&_config.StoragePath, "storage-path", os.Getenv(ENV_STORAGE_PATH), "Path to the storage folder. Default: \""+DEFAULT_STORAGE_PATH+"\"")
flag.StringVar(&_config.InitSqlFilepath, "init-sql", os.Getenv(ENV_INIT_SQL_FILEPATH), "Path to the initialization SQL file. Default: \""+DEFAULT_INIT_SQL_FILEPATH+"\"")
flag.StringVar(&_config.LogLevel, "log-level", os.Getenv(ENV_LOG_LEVEL), "Log level: \"ERROR\", \"WARN\", \"INFO\", \"DEBUG\", \"TRACE\". Default: \""+DEFAULT_LOG_LEVEL+"\"")
flag.StringVar(&_config.StorageType, "storage-type", os.Getenv(ENV_STORAGE_TYPE), "Storage type: \"LOCAL\", \"S3\". Default: \""+DEFAULT_DB_STORAGE_TYPE+"\"")
flag.StringVar(&_config.Pg.SchemaPrefix, "pg-schema-prefix", os.Getenv(ENV_PG_SCHEMA_PREFIX), "(Optional) Prefix for PostgreSQL schema names")
flag.StringVar(&_config.Pg.SyncInterval, "pg-sync-interval", os.Getenv(ENV_PG_SYNC_INTERVAL), "(Optional) Interval between syncs. Valid units: \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
flag.StringVar(&_pgIncludeTables, "pg-include-tables", os.Getenv(ENV_PG_INCLUDE_TABLES), "(Optional) Comma-separated list of tables to include in sync (format: schema.table)")
flag.StringVar(&_pgExcludeTables, "pg-exclude-tables", os.Getenv(ENV_PG_EXCLUDE_TABLES), "(Optional) Comma-separated list of tables to exclude from sync (format: schema.table)")
flag.StringVar(&_configParseValues.pgIncludeSchemas, "pg-include-schemas", os.Getenv(ENV_PG_INCLUDE_SCHEMAS), "(Optional) Comma-separated list of schemas to include in sync")
flag.StringVar(&_configParseValues.pgExcludeSchemas, "pg-exclude-schemas", os.Getenv(ENV_PG_EXCLUDE_SCHEMAS), "(Optional) Comma-separated list of schemas to exclude from sync")
flag.StringVar(&_configParseValues.pgIncludeTables, "pg-include-tables", os.Getenv(ENV_PG_INCLUDE_TABLES), "(Optional) Comma-separated list of tables to include in sync (format: schema.table)")
flag.StringVar(&_configParseValues.pgExcludeTables, "pg-exclude-tables", os.Getenv(ENV_PG_EXCLUDE_TABLES), "(Optional) Comma-separated list of tables to exclude from sync (format: schema.table)")
flag.StringVar(&_config.Pg.DatabaseUrl, "pg-database-url", os.Getenv(ENV_PG_DATABASE_URL), "PostgreSQL database URL to sync")
flag.StringVar(&_config.Aws.Region, "aws-region", os.Getenv(ENV_AWS_REGION), "AWS region")
flag.StringVar(&_config.Aws.S3Bucket, "aws-s3-bucket", os.Getenv(ENV_AWS_S3_BUCKET), "AWS S3 bucket name")
Expand All @@ -116,15 +129,14 @@ func parseFlags() {
if _config.User == "" {
_config.User = DEFAULT_USER
}
if _password == "" {
_password = DEFAULT_PASSWORD
if _configParseValues.password == "" {
_configParseValues.password = DEFAULT_PASSWORD
}
if _password != "" {
if _configParseValues.password != "" {
if _config.User == "" {
panic("Password is set without a user")
}
_config.EncryptedPassword = StringToScramSha256(_password)
_password = ""
_config.EncryptedPassword = StringToScramSha256(_configParseValues.password)
}
if _config.StoragePath == "" {
_config.StoragePath = DEFAULT_STORAGE_PATH
Expand Down Expand Up @@ -156,15 +168,26 @@ func parseFlags() {
panic("AWS secret access key is required")
}
}
if _pgIncludeTables != "" && _pgExcludeTables != "" {
if _configParseValues.pgIncludeSchemas != "" && _configParseValues.pgExcludeSchemas != "" {
panic("Cannot specify both --pg-include-schemas and --pg-exclude-schemas")
}
if _configParseValues.pgIncludeSchemas != "" {
_config.Pg.IncludeSchemas = NewSet(strings.Split(_configParseValues.pgIncludeSchemas, ","))
}
if _configParseValues.pgExcludeSchemas != "" {
_config.Pg.ExcludeSchemas = NewSet(strings.Split(_configParseValues.pgExcludeSchemas, ","))
}
if _configParseValues.pgIncludeTables != "" && _configParseValues.pgExcludeTables != "" {
panic("Cannot specify both --pg-include-tables and --pg-exclude-tables")
}
if _pgIncludeTables != "" {
_config.Pg.IncludeTables = NewSet(strings.Split(_pgIncludeTables, ","))
if _configParseValues.pgIncludeTables != "" {
_config.Pg.IncludeTables = NewSet(strings.Split(_configParseValues.pgIncludeTables, ","))
}
if _pgExcludeTables != "" {
_config.Pg.ExcludeTables = NewSet(strings.Split(_pgExcludeTables, ","))
if _configParseValues.pgExcludeTables != "" {
_config.Pg.ExcludeTables = NewSet(strings.Split(_configParseValues.pgExcludeTables, ","))
}

_configParseValues = configParseValues{}
}

func LoadConfig(reRegisterFlags ...bool) *Config {
Expand Down
91 changes: 55 additions & 36 deletions src/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func TestLoadConfig(t *testing.T) {
if config.Pg.SchemaPrefix != "" {
t.Errorf("Expected schemaPrefix to be empty, got %s", config.Pg.SchemaPrefix)
}
if config.Pg.IncludeSchemas != nil {
t.Errorf("Expected includeSchemas to be empty, got %v", config.Pg.IncludeSchemas)
}
if config.Pg.ExcludeSchemas != nil {
t.Errorf("Expected includeSchemas to be empty, got %v", config.Pg.ExcludeSchemas)
}
if config.Pg.IncludeTables != nil {
t.Errorf("Expected includeTables to be empty, got %v", config.Pg.IncludeTables)
}
Expand Down Expand Up @@ -123,7 +129,8 @@ func TestLoadConfig(t *testing.T) {
t.Setenv("PG_DATABASE_URL", "postgres://user:password@localhost:5432/template1")
t.Setenv("PG_SYNC_INTERVAL", "1h")
t.Setenv("PG_SCHEMA_PREFIX", "mydb_")
t.Setenv("PG_INCLUDE_TABLES", "public.users")
t.Setenv("PG_INCLUDE_SCHEMAS", "public,auth")
t.Setenv("PG_EXCLUDE_TABLES", "public.users,public.secrets")

config := LoadConfig(true)

Expand All @@ -136,8 +143,17 @@ func TestLoadConfig(t *testing.T) {
if config.Pg.SchemaPrefix != "mydb_" {
t.Errorf("Expected schemaPrefix to be empty, got %s", config.Pg.SchemaPrefix)
}
if !config.Pg.IncludeTables.Contains("public.users") {
t.Errorf("Expected includeTables to contain public.users, got %v", config.Pg.IncludeTables)
if !config.Pg.IncludeSchemas.Contains("public") {
t.Errorf("Expected includeSchemas to contain public, got %v", config.Pg.IncludeSchemas)
}
if !config.Pg.IncludeSchemas.Contains("auth") {
t.Errorf("Expected includeSchemas to contain auth, got %v", config.Pg.IncludeSchemas)
}
if !config.Pg.ExcludeTables.Contains("public.users") {
t.Errorf("Expected ExcludeTables to contain public.users, got %v", config.Pg.ExcludeTables)
}
if !config.Pg.ExcludeTables.Contains("public.secrets") {
t.Errorf("Expected ExcludeTables to contain public.secrets, got %v", config.Pg.ExcludeTables)
}
})

Expand All @@ -152,7 +168,8 @@ func TestLoadConfig(t *testing.T) {
"--pg-database-url", "postgres://user:password@localhost:5432/db",
"--pg-sync-interval", "2h30m",
"--pg-schema-prefix", "mydb_",
"--pg-include-tables", "public.users",
"--pg-include-schemas", "public,auth",
"--pg-exclude-tables", "public.users,public.secrets",
})

config := LoadConfig()
Expand Down Expand Up @@ -184,38 +201,49 @@ func TestLoadConfig(t *testing.T) {
if config.Pg.SchemaPrefix != "mydb_" {
t.Errorf("Expected schemaPrefix to be mydb_, got %s", config.Pg.SchemaPrefix)
}
if !config.Pg.IncludeTables.Contains("public.users") {
t.Errorf("Expected includeTables to have public.users, got %v", config.Pg.IncludeTables)
if !config.Pg.IncludeSchemas.Contains("public") {
t.Errorf("Expected IncludeSchemas to have public.users, got %v", config.Pg.IncludeSchemas)
}
if !config.Pg.IncludeSchemas.Contains("auth") {
t.Errorf("Expected includeSchemas to contain auth, got %v", config.Pg.IncludeSchemas)
}
if !config.Pg.ExcludeTables.Contains("public.users") {
t.Errorf("Expected ExcludeTables to have public.users, got %v", config.Pg.ExcludeTables)
}
if !config.Pg.ExcludeTables.Contains("public.secrets") {
t.Errorf("Expected ExcludeTables to have public.secrets, got %v", config.Pg.ExcludeTables)
}
})

t.Run("Handles pg-exclude-tables configuration", func(t *testing.T) {
t.Run("Panics when both include and exclude schemas are specified in env", func(t *testing.T) {
t.Setenv("PG_INCLUDE_SCHEMAS", "public")
t.Setenv("PG_EXCLUDE_SCHEMAS", "auth")

defer func() {
if r := recover(); r == nil {
t.Error("Expected panic when both include and exclude schemas are specified")
}
}()

LoadConfig(true)
})

t.Run("Panics when both include and exclude schemas are specified in args", func(t *testing.T) {
setTestArgs([]string{
"--pg-database-url", "postgres://user:password@localhost:5432/db",
"--pg-exclude-tables", "public.secrets,public.cache",
"--pg-include-schemas", "public",
"--pg-exclude-schemas", "auth",
})
config := LoadConfig(true)

if !config.Pg.ExcludeTables.Contains("public.secrets") {
t.Errorf("Expected ExcludeTables to contain public.secrets, got %v", config.Pg.ExcludeTables)
}
if !config.Pg.ExcludeTables.Contains("public.cache") {
t.Errorf("Expected ExcludeTables to contain public.cache, got %v", config.Pg.ExcludeTables)
}
if config.Pg.IncludeTables != nil {
t.Errorf("Expected IncludeTables to be empty, got %v", config.Pg.IncludeTables)
}
defer func() {
if r := recover(); r == nil {
t.Error("Expected panic when both include and exclude schemas are specified")
}
}()

LoadConfig()
})

t.Run("Panics when both include and exclude tables are specified in env", func(t *testing.T) {
t.Setenv("BEMIDB_PORT", "12345")
t.Setenv("BEMIDB_DATABASE", "mydb")
t.Setenv("BEMIDB_INIT_SQL", "./init/duckdb.sql")
t.Setenv("BEMIDB_STORAGE_PATH", "storage-path")
t.Setenv("BEMIDB_LOG_LEVEL", "ERROR")
t.Setenv("PG_DATABASE_URL", "postgres://user:password@localhost:5432/template1")
t.Setenv("PG_SYNC_INTERVAL", "1h")
t.Setenv("PG_SCHEMA_PREFIX", "mydb_")
t.Setenv("PG_INCLUDE_TABLES", "public.users")
t.Setenv("PG_EXCLUDE_TABLES", "public.orders")

Expand All @@ -230,15 +258,6 @@ func TestLoadConfig(t *testing.T) {

t.Run("Panics when both include and exclude tables are specified in args", func(t *testing.T) {
setTestArgs([]string{
"--port", "12345",
"--database", "mydb",
"--init-sql", "./init/duckdb.sql",
"--storage-path", "storage-path",
"--log-level", "ERROR",
"--storage-type", "local",
"--pg-database-url", "postgres://user:password@localhost:5432/db",
"--pg-sync-interval", "2h30m",
"--pg-schema-prefix", "mydb_",
"--pg-include-tables", "public.users",
"--pg-exclude-tables", "public.orders",
})
Expand Down
4 changes: 1 addition & 3 deletions src/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,7 @@ func loadTestConfig() *Config {
func setTestArgs(args []string) {
// Reset state
_config = Config{}
_password = ""
_pgIncludeTables = ""
_pgExcludeTables = ""
_configParseValues = configParseValues{}

os.Args = append([]string{"cmd"}, args...)
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
Expand Down
2 changes: 1 addition & 1 deletion src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"
)

const VERSION = "0.20.1"
const VERSION = "0.21.0"

func main() {
config := LoadConfig()
Expand Down
10 changes: 10 additions & 0 deletions src/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ func (syncer *Syncer) urlEncodePassword(databaseUrl string) string {
func (syncer *Syncer) shouldSyncTable(schemaTable SchemaTable) bool {
tableId := fmt.Sprintf("%s.%s", schemaTable.Schema, schemaTable.Table)

if syncer.config.Pg.IncludeSchemas != nil {
if !syncer.config.Pg.IncludeSchemas.Contains(schemaTable.Schema) {
return false
}
} else if syncer.config.Pg.ExcludeSchemas != nil {
if syncer.config.Pg.ExcludeSchemas.Contains(schemaTable.Schema) {
return false
}
}

if syncer.config.Pg.IncludeTables != nil {
return syncer.config.Pg.IncludeTables.Contains(tableId)
}
Expand Down

0 comments on commit 5273ade

Please sign in to comment.