diff --git a/build/package/docker/postgres.dockerfile b/build/package/docker/postgres.dockerfile index 9306611aa..f189c08cb 100644 --- a/build/package/docker/postgres.dockerfile +++ b/build/package/docker/postgres.dockerfile @@ -1,4 +1,4 @@ -FROM postgres:16.4 +FROM postgres:16.5 # Inject the init script that makes the kwild superuser and a kwild database # owned by that kwild user, as well as a kwil_test_db database for tests. @@ -14,4 +14,5 @@ COPY ./pginit.sql /docker-entrypoint-initdb.d/init.sql # Override the default entrypoint/command to include the additional configuration CMD ["postgres", "-c", "wal_level=logical", "-c", "max_wal_senders=10", "-c", "max_replication_slots=10", \ - "-c", "track_commit_timestamp=true", "-c", "wal_sender_timeout=0", "-c", "max_prepared_transactions=2"] + "-c", "track_commit_timestamp=true", "-c", "wal_sender_timeout=0", "-c", "max_prepared_transactions=2", \ + "-c", "max_locks_per_transaction=256", "-c", "max_connections=128"] diff --git a/common/sql/sql.go b/common/sql/sql.go index 088ea8197..8f4972d48 100644 --- a/common/sql/sql.go +++ b/common/sql/sql.go @@ -5,13 +5,30 @@ package sql import ( "context" "errors" + "strings" + + "github.com/jackc/pgx/v5/pgconn" ) var ( ErrNoTransaction = errors.New("no transaction") ErrNoRows = errors.New("no rows in result set") + ErrDBFailure = errors.New("database failure") // fatal! should kill node ) +func IsFatalDBError(err error) bool { + if errors.Is(err, ErrDBFailure) { + return true + } + // In case it wasn't already joined with ErrDBFailure, catch a fatal PgError by code. 
+ var pgErr *pgconn.PgError + return errors.As(err, &pgErr) && + (strings.HasPrefix(pgErr.Code, "53") || // Class 53 — Insufficient Resources + strings.HasPrefix(pgErr.Code, "58") || // Class 58 — System Error (errors external to PostgreSQL itself) + strings.HasPrefix(pgErr.Code, "XX")) // Class XX — Internal Error (internal_error, data_corrupted, index_corrupted) + +} + // ResultSet is the result of a query or execution. // It contains the returned columns and the rows. type ResultSet struct { diff --git a/common/sql/sql_test.go b/common/sql/sql_test.go new file mode 100644 index 000000000..4b778765e --- /dev/null +++ b/common/sql/sql_test.go @@ -0,0 +1,65 @@ +package sql + +import ( + "errors" + "testing" + + "github.com/jackc/pgx/v5/pgconn" +) + +func TestIsFatalDBError(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "nil error", + err: nil, + want: false, + }, + { + name: "ErrDBFailure", + err: ErrDBFailure, + want: true, + }, + { + name: "wrapped ErrDBFailure", + err: errors.Join(errors.New("wrapped"), ErrDBFailure), + want: true, + }, + { + name: "insufficient resources error", + err: &pgconn.PgError{Code: "53100"}, + want: true, + }, + { + name: "system error", + err: &pgconn.PgError{Code: "58000"}, + want: true, + }, + { + name: "internal error", + err: &pgconn.PgError{Code: "XX000"}, + want: true, + }, + { + name: "non-fatal pg error", + err: &pgconn.PgError{Code: "23505"}, + want: false, + }, + { + name: "generic error", + err: errors.New("some error"), + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := IsFatalDBError(tt.err); got != tt.want { + t.Errorf("IsFatalDBError() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/abci/abci.go b/internal/abci/abci.go index 82e0e3239..bdedbacf0 100644 --- a/internal/abci/abci.go +++ b/internal/abci/abci.go @@ -676,6 +676,13 @@ func (a *AbciApp) FinalizeBlock(ctx context.Context, req 
*abciTypes.RequestFinal abciRes := &abciTypes.ExecTxResult{} if txRes.Error != nil { + // Ensure the node halts if a fatal DB error occurs (e.g. out of memory/disk, + // corruption, etc.), rather than allowing non-determinism with a failed txn. + if sql.IsFatalDBError(txRes.Error) { + return nil, txRes.Error + } + + // If the transaction failed for user reasons, we still want to include it in the block. abciRes.Log = txRes.Error.Error() a.log.Debug("failed to execute transaction", zap.Error(txRes.Error)) } else { diff --git a/internal/sql/pg/query.go b/internal/sql/pg/query.go index d9d7a9505..b79685990 100644 --- a/internal/sql/pg/query.go +++ b/internal/sql/pg/query.go @@ -171,6 +171,9 @@ func query(ctx context.Context, oidToDataType map[uint32]*datatype, cq connQuery rows, err := q(ctx, stmt, args...) if err != nil { + if sql.IsFatalDBError(err) { + return nil, errors.Join(err, sql.ErrDBFailure) + } if errors.Is(err, pgx.ErrNoRows) { return nil, sql.ErrNoRows } @@ -199,6 +202,9 @@ func query(ctx context.Context, oidToDataType map[uint32]*datatype, cq connQuery } return decodeFromPG(pgxVals, oids, oidToDataType) }) + if sql.IsFatalDBError(err) { // would probably happen above when executing, but maybe here too + return nil, errors.Join(err, sql.ErrDBFailure) + } if errors.Is(err, pgx.ErrNoRows) { return nil, sql.ErrNoRows } @@ -236,10 +242,13 @@ func queryTx(ctx context.Context, oidToDataType map[uint32]*datatype, dbTx txBeg return resSet, err } -func queryRowFunc(ctx context.Context, conn *pgx.Conn, sql string, +func queryRowFunc(ctx context.Context, conn *pgx.Conn, stmt string, scans []any, fn func() error, args ...any) error { - rows, _ := conn.Query(ctx, sql, args...) + rows, _ := conn.Query(ctx, stmt, args...) 
_, err := pgx.ForEachRow(rows, scans, fn) + if sql.IsFatalDBError(err) { + err = errors.Join(err, sql.ErrDBFailure) + } return err } diff --git a/internal/sql/pg/tx.go b/internal/sql/pg/tx.go index 906f11cdd..44ee5c5e9 100644 --- a/internal/sql/pg/tx.go +++ b/internal/sql/pg/tx.go @@ -4,6 +4,7 @@ package pg import ( "context" + "errors" "github.com/jackc/pgx/v5" common "github.com/kwilteam/kwil-db/common/sql" @@ -44,13 +45,21 @@ func (tx *nestedTx) BeginTx(ctx context.Context) (common.Tx, error) { } func (tx *nestedTx) Query(ctx context.Context, stmt string, args ...any) (*common.ResultSet, error) { - return query(ctx, tx.oidTypes, tx.Tx, stmt, args...) + resSet, err := query(ctx, tx.oidTypes, tx.Tx, stmt, args...) + if errors.Is(err, common.ErrDBFailure) { + tx.Tx.Conn().Close(ctx) // do not allow the outer txn to commit! + } + return resSet, err } // Execute is now literally identical to Query in both semantics and syntax. We // might remove one or the other in this context (transaction methods). func (tx *nestedTx) Execute(ctx context.Context, stmt string, args ...any) (*common.ResultSet, error) { - return query(ctx, tx.oidTypes, tx.Tx, stmt, args...) + resSet, err := query(ctx, tx.oidTypes, tx.Tx, stmt, args...) + if errors.Is(err, common.ErrDBFailure) { + tx.Tx.Conn().Close(ctx) // do not allow the outer txn to commit! + } + return resSet, err } // QueryScanFn satisfies sql.QueryScanner. @@ -58,7 +67,11 @@ func (tx *nestedTx) QueryScanFn(ctx context.Context, sql string, scans []any, fn func() error, args ...any) error { conn := tx.Conn() - return queryRowFunc(ctx, conn, sql, scans, fn, args...) + err := queryRowFunc(ctx, conn, sql, scans, fn, args...) + if errors.Is(err, common.ErrDBFailure) { + tx.Tx.Conn().Close(ctx) // do not allow the outer txn to commit! + } + return err } // AccessMode returns the access mode of the transaction.