diff --git a/build/package/docker/postgres.dockerfile b/build/package/docker/postgres.dockerfile index ddc6144c1..f189c08cb 100644 --- a/build/package/docker/postgres.dockerfile +++ b/build/package/docker/postgres.dockerfile @@ -1,4 +1,4 @@ -FROM postgres:16.2 +FROM postgres:16.5 # Inject the init script that makes the kwild superuser and a kwild database # owned by that kwild user, as well as a kwil_test_db database for tests. @@ -14,4 +14,5 @@ COPY ./pginit.sql /docker-entrypoint-initdb.d/init.sql # Override the default entrypoint/command to include the additional configuration CMD ["postgres", "-c", "wal_level=logical", "-c", "max_wal_senders=10", "-c", "max_replication_slots=10", \ - "-c", "track_commit_timestamp=true", "-c", "wal_sender_timeout=0", "-c", "max_prepared_transactions=2"] + "-c", "track_commit_timestamp=true", "-c", "wal_sender_timeout=0", "-c", "max_prepared_transactions=2", \ + "-c", "max_locks_per_transaction=256", "-c", "max_connections=128"] diff --git a/common/sql/sql.go b/common/sql/sql.go index 534150b21..cff7aa4b0 100644 --- a/common/sql/sql.go +++ b/common/sql/sql.go @@ -5,13 +5,30 @@ package sql import ( "context" "errors" + "strings" + + "github.com/jackc/pgx/v5/pgconn" ) var ( ErrNoTransaction = errors.New("no transaction") ErrNoRows = errors.New("no rows in result set") + ErrDBFailure = errors.New("database failure") // fatal! should kill node ) +func IsFatalDBError(err error) bool { + if errors.Is(err, ErrDBFailure) { + return true + } + // In case it wasn't already joined with ErrDBFailure, catch a fatal PgError by code. + var pgErr *pgconn.PgError + return errors.As(err, &pgErr) && + (strings.HasPrefix(pgErr.Code, "53") || // Class 53 — Insufficient Resources + strings.HasPrefix(pgErr.Code, "58") || // Class 58 — System Error (errors external to PostgreSQL itself) + strings.HasPrefix(pgErr.Code, "XX")) // Class XX — Internal Error (internal_error, data_corrupted, index_corrupted) + +} + // ResultSet is the result of a query or execution. // It contains the returned columns and the rows. type ResultSet struct { diff --git a/common/sql/sql_test.go b/common/sql/sql_test.go new file mode 100644 index 000000000..4b778765e --- /dev/null +++ b/common/sql/sql_test.go @@ -0,0 +1,65 @@ +package sql + +import ( + "errors" + "testing" + + "github.com/jackc/pgx/v5/pgconn" +) + +func TestIsFatalDBError(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "nil error", + err: nil, + want: false, + }, + { + name: "ErrDBFailure", + err: ErrDBFailure, + want: true, + }, + { + name: "wrapped ErrDBFailure", + err: errors.Join(errors.New("wrapped"), ErrDBFailure), + want: true, + }, + { + name: "insufficient resources error", + err: &pgconn.PgError{Code: "53100"}, + want: true, + }, + { + name: "system error", + err: &pgconn.PgError{Code: "58000"}, + want: true, + }, + { + name: "internal error", + err: &pgconn.PgError{Code: "XX000"}, + want: true, + }, + { + name: "non-fatal pg error", + err: &pgconn.PgError{Code: "23505"}, + want: false, + }, + { + name: "generic error", + err: errors.New("some error"), + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := IsFatalDBError(tt.err); got != tt.want { + t.Errorf("IsFatalDBError() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/abci/abci.go b/internal/abci/abci.go index 8c570b3ee..332185e7f 100644 --- a/internal/abci/abci.go +++ b/internal/abci/abci.go @@ -17,6 +17,7 @@ import ( "github.com/kwilteam/kwil-db/common/chain" "github.com/kwilteam/kwil-db/common/chain/forks" "github.com/kwilteam/kwil-db/common/ident" + "github.com/kwilteam/kwil-db/common/sql" "github.com/kwilteam/kwil-db/core/log" "github.com/kwilteam/kwil-db/core/types" "github.com/kwilteam/kwil-db/core/types/serialize" @@ -360,6 +361,13 @@ func (a *AbciApp) FinalizeBlock(ctx context.Context, req *abciTypes.RequestFinal abciRes := &abciTypes.ExecTxResult{} if txRes.Error != nil { + // Ensure the node halt if a fatal DB error occurs (e.g. out of memory/disk, + // corruption, etc.), rather than allowing non-determinism with a failed txn. + if sql.IsFatalDBError(txRes.Error) { + return nil, txRes.Error + } + + // If the transaction failed for user reasons, we still want to include it in the block. abciRes.Log = txRes.Error.Error() a.log.Warn("failed to execute transaction", zap.Error(txRes.Error)) } else { diff --git a/internal/sql/pg/query.go b/internal/sql/pg/query.go index fdad36b29..aedfdc2fd 100644 --- a/internal/sql/pg/query.go +++ b/internal/sql/pg/query.go @@ -175,6 +175,9 @@ func query(ctx context.Context, cq connQueryer, stmt string, args ...any) (*sql. rows, err := q(ctx, stmt, args...) if err != nil { + if sql.IsFatalDBError(err) { + return nil, errors.Join(err, sql.ErrDBFailure) + } if errors.Is(err, pgx.ErrNoRows) { return nil, sql.ErrNoRows } @@ -202,6 +205,9 @@ func query(ctx context.Context, cq connQueryer, stmt string, args ...any) (*sql. } return decodeFromPGType(pgxVals...) }) + if sql.IsFatalDBError(err) { // would probably happen above when executing, but maybe here too + return nil, errors.Join(err, sql.ErrDBFailure) + } if errors.Is(err, pgx.ErrNoRows) { return nil, sql.ErrNoRows } diff --git a/internal/sql/pg/tx.go b/internal/sql/pg/tx.go index fd071c691..c611cf9ec 100644 --- a/internal/sql/pg/tx.go +++ b/internal/sql/pg/tx.go @@ -4,6 +4,7 @@ package pg import ( "context" + "errors" "github.com/jackc/pgx/v5" common "github.com/kwilteam/kwil-db/common/sql" @@ -40,13 +41,21 @@ func (tx *nestedTx) BeginTx(ctx context.Context) (common.Tx, error) { } func (tx *nestedTx) Query(ctx context.Context, stmt string, args ...any) (*common.ResultSet, error) { - return query(ctx, tx.Tx, stmt, args...) + resSet, err := query(ctx, tx.Tx, stmt, args...) + if errors.Is(err, common.ErrDBFailure) { + tx.Tx.Conn().Close(ctx) // do not allow the outer txn to commit! + } + return resSet, err } // Execute is now literally identical to Query in both semantics and syntax. We // might remove one or the other in this context (transaction methods). func (tx *nestedTx) Execute(ctx context.Context, stmt string, args ...any) (*common.ResultSet, error) { - return query(ctx, tx.Tx, stmt, args...) + resSet, err := query(ctx, tx.Tx, stmt, args...) + if errors.Is(err, common.ErrDBFailure) { + tx.Tx.Conn().Close(ctx) // do not allow the outer txn to commit! + } + return resSet, err } // AccessMode returns the access mode of the transaction.