Skip to content

Commit

Permalink
sql,pg: catch db backend failure
Browse files Browse the repository at this point in the history
  • Loading branch information
jchappelow committed Nov 18, 2024
1 parent 2f8259d commit 6804aa5
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 7 deletions.
5 changes: 3 additions & 2 deletions build/package/docker/postgres.dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM postgres:16.4
FROM postgres:16.5

# Inject the init script that makes the kwild superuser and a kwild database
# owned by that kwild user, as well as a kwil_test_db database for tests.
Expand All @@ -14,4 +14,5 @@ COPY ./pginit.sql /docker-entrypoint-initdb.d/init.sql

# Override the default entrypoint/command to include the additional configuration
CMD ["postgres", "-c", "wal_level=logical", "-c", "max_wal_senders=10", "-c", "max_replication_slots=10", \
"-c", "track_commit_timestamp=true", "-c", "wal_sender_timeout=0", "-c", "max_prepared_transactions=2"]
"-c", "track_commit_timestamp=true", "-c", "wal_sender_timeout=0", "-c", "max_prepared_transactions=2", \
"-c", "max_locks_per_transaction=256", "-c", "max_connections=128"]
17 changes: 17 additions & 0 deletions common/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,30 @@ package sql
import (
"context"
"errors"
"strings"

"github.com/jackc/pgx/v5/pgconn"
)

var (
ErrNoTransaction = errors.New("no transaction")
ErrNoRows = errors.New("no rows in result set")
ErrDBFailure = errors.New("database failure") // fatal! should kill node
)

func IsFatalDBError(err error) bool {
if errors.Is(err, ErrDBFailure) {
return true
}
// In case it wasn't already joined with ErrDBFailure, catch a fatal PgError by code.
var pgErr *pgconn.PgError
return errors.As(err, &pgErr) &&
(strings.HasPrefix(pgErr.Code, "53") || // Class 53 — Insufficient Resources
strings.HasPrefix(pgErr.Code, "58") || // Class 58 — System Error (errors external to PostgreSQL itself)
strings.HasPrefix(pgErr.Code, "XX")) // Class XX — Internal Error (internal_error, data_corrupted, index_corrupted)

}

// ResultSet is the result of a query or execution.
// It contains the returned columns and the rows.
type ResultSet struct {
Expand Down
65 changes: 65 additions & 0 deletions common/sql/sql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package sql

import (
"errors"
"testing"

"github.com/jackc/pgx/v5/pgconn"
)

func TestIsFatalDBError(t *testing.T) {
tests := []struct {
name string
err error
want bool
}{
{
name: "nil error",
err: nil,
want: false,
},
{
name: "ErrDBFailure",
err: ErrDBFailure,
want: true,
},
{
name: "wrapped ErrDBFailure",
err: errors.Join(errors.New("wrapped"), ErrDBFailure),
want: true,
},
{
name: "insufficient resources error",
err: &pgconn.PgError{Code: "53100"},
want: true,
},
{
name: "system error",
err: &pgconn.PgError{Code: "58000"},
want: true,
},
{
name: "internal error",
err: &pgconn.PgError{Code: "XX000"},
want: true,
},
{
name: "non-fatal pg error",
err: &pgconn.PgError{Code: "23505"},
want: false,
},
{
name: "generic error",
err: errors.New("some error"),
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsFatalDBError(tt.err); got != tt.want {
t.Errorf("IsFatalDBError() = %v, want %v", got, tt.want)
}
})
}
}
7 changes: 7 additions & 0 deletions internal/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,13 @@ func (a *AbciApp) FinalizeBlock(ctx context.Context, req *abciTypes.RequestFinal

abciRes := &abciTypes.ExecTxResult{}
if txRes.Error != nil {
// Ensure the node halt if a fatal DB error occurs (e.g. out of memory/disk,
// corruption, etc.), rather than allowing non-determinism with a failed txn.
if sql.IsFatalDBError(txRes.Error) {
return nil, txRes.Error
}

// If the transaction failed for user reasons, we still want to include it in the block.
abciRes.Log = txRes.Error.Error()
a.log.Debug("failed to execute transaction", zap.Error(txRes.Error))
} else {
Expand Down
13 changes: 11 additions & 2 deletions internal/sql/pg/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ func query(ctx context.Context, oidToDataType map[uint32]*datatype, cq connQuery

rows, err := q(ctx, stmt, args...)
if err != nil {
if sql.IsFatalDBError(err) {
return nil, errors.Join(err, sql.ErrDBFailure)
}
if errors.Is(err, pgx.ErrNoRows) {
return nil, sql.ErrNoRows
}
Expand Down Expand Up @@ -199,6 +202,9 @@ func query(ctx context.Context, oidToDataType map[uint32]*datatype, cq connQuery
}
return decodeFromPG(pgxVals, oids, oidToDataType)
})
if sql.IsFatalDBError(err) { // would probably happen above when executing, but maybe here too
return nil, errors.Join(err, sql.ErrDBFailure)
}
if errors.Is(err, pgx.ErrNoRows) {
return nil, sql.ErrNoRows
}
Expand Down Expand Up @@ -236,10 +242,13 @@ func queryTx(ctx context.Context, oidToDataType map[uint32]*datatype, dbTx txBeg
return resSet, err
}

func queryRowFunc(ctx context.Context, conn *pgx.Conn, sql string,
func queryRowFunc(ctx context.Context, conn *pgx.Conn, stmt string,
scans []any, fn func() error, args ...any) error {
rows, _ := conn.Query(ctx, sql, args...)
rows, _ := conn.Query(ctx, stmt, args...)
_, err := pgx.ForEachRow(rows, scans, fn)
if sql.IsFatalDBError(err) {
err = errors.Join(err, sql.ErrDBFailure)
}
return err
}

Expand Down
19 changes: 16 additions & 3 deletions internal/sql/pg/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package pg

import (
"context"
"errors"

"github.com/jackc/pgx/v5"
common "github.com/kwilteam/kwil-db/common/sql"
Expand Down Expand Up @@ -44,21 +45,33 @@ func (tx *nestedTx) BeginTx(ctx context.Context) (common.Tx, error) {
}

func (tx *nestedTx) Query(ctx context.Context, stmt string, args ...any) (*common.ResultSet, error) {
return query(ctx, tx.oidTypes, tx.Tx, stmt, args...)
resSet, err := query(ctx, tx.oidTypes, tx.Tx, stmt, args...)
if errors.Is(err, common.ErrDBFailure) {
tx.Tx.Conn().Close(ctx) // do not allow the outer txn to commit!
}
return resSet, err
}

// Execute is now literally identical to Query in both semantics and syntax. We
// might remove one or the other in this context (transaction methods).
func (tx *nestedTx) Execute(ctx context.Context, stmt string, args ...any) (*common.ResultSet, error) {
return query(ctx, tx.oidTypes, tx.Tx, stmt, args...)
resSet, err := query(ctx, tx.oidTypes, tx.Tx, stmt, args...)
if errors.Is(err, common.ErrDBFailure) {
tx.Tx.Conn().Close(ctx) // do not allow the outer txn to commit!
}
return resSet, err
}

// QueryScanFn satisfies sql.QueryScanner.
func (tx *nestedTx) QueryScanFn(ctx context.Context, sql string,
scans []any, fn func() error, args ...any) error {

conn := tx.Conn()
return queryRowFunc(ctx, conn, sql, scans, fn, args...)
err := queryRowFunc(ctx, conn, sql, scans, fn, args...)
if errors.Is(err, common.ErrDBFailure) {
tx.Tx.Conn().Close(ctx) // do not allow the outer txn to commit!
}
return err
}

// AccessMode returns the access mode of the transaction.
Expand Down

0 comments on commit 6804aa5

Please sign in to comment.