Skip to content

Commit

Permalink
DAC batch resolver resilience improvement (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
begmaroman authored Mar 11, 2024
1 parent ac5ab5b commit 9ed8d9a
Show file tree
Hide file tree
Showing 9 changed files with 1,423 additions and 277 deletions.
179 changes: 137 additions & 42 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@ var (
// DB defines functions that a DB instance should implement
type DB interface {
BeginStateTransaction(ctx context.Context) (Tx, error)
Exists(ctx context.Context, key common.Hash) bool

StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx sqlx.ExecerContext) error
GetLastProcessedBlock(ctx context.Context, task string) (uint64, error)

StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error
GetUnresolvedBatchKeys(ctx context.Context) ([]types.BatchKey, error)
DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error

Exists(ctx context.Context, key common.Hash) bool
GetOffChainData(ctx context.Context, key common.Hash, dbTx sqlx.QueryerContext) (types.ArgBytes, error)
StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx sqlx.ExecerContext) error
StoreOffChainData(ctx context.Context, od []types.OffChainData, dbTx sqlx.ExecerContext) error
}

Expand Down Expand Up @@ -50,47 +56,107 @@ func (db *pgDB) BeginStateTransaction(ctx context.Context) (Tx, error) {
return db.pg.BeginTxx(ctx, nil)
}

// StoreOffChainData stores and array of key values in the Db
func (db *pgDB) StoreOffChainData(ctx context.Context, od []types.OffChainData, dbTx sqlx.ExecerContext) error {
const storeOffChainDataSQL = `
INSERT INTO data_node.offchain_data (key, value)
// StoreLastProcessedBlock stores a record of a block processed by the synchronizer for named task
func (db *pgDB) StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx sqlx.ExecerContext) error {
const storeLastProcessedBlockSQL = `
INSERT INTO data_node.sync_tasks (task, block)
VALUES ($1, $2)
ON CONFLICT (key) DO NOTHING;
ON CONFLICT (task) DO UPDATE
SET block = EXCLUDED.block, processed = NOW();
`

for _, d := range od {
if _, err := dbTx.ExecContext(
ctx, storeOffChainDataSQL,
d.Key.Hex(),
common.Bytes2Hex(d.Value),
); err != nil {
return err
}
if _, err := db.execer(dbTx).ExecContext(ctx, storeLastProcessedBlockSQL, task, block); err != nil {
return err
}

return nil
}

// GetOffChainData returns the value identified by the key
func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash, dbTx sqlx.QueryerContext) (types.ArgBytes, error) {
const getOffchainDataSQL = `
SELECT value
FROM data_node.offchain_data
WHERE key = $1 LIMIT 1;
`
// GetLastProcessedBlock returns the latest block successfully processed by the synchronizer for named task
func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) {
const getLastProcessedBlockSQL = "SELECT block FROM data_node.sync_tasks WHERE task = $1;"

var (
hexValue string
lastBlock uint64
)

if err := dbTx.QueryRowxContext(ctx, getOffchainDataSQL, key.Hex()).Scan(&hexValue); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrStateNotSynchronized
if err := db.pg.QueryRowContext(ctx, getLastProcessedBlockSQL, task).Scan(&lastBlock); err != nil {
return 0, err
}

return lastBlock, nil
}

// StoreUnresolvedBatchKeys stores unresolved batch keys in the database
func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error {
const storeUnresolvedBatchesSQL = `
INSERT INTO data_node.unresolved_batches (num, hash)
VALUES ($1, $2)
ON CONFLICT (num, hash) DO NOTHING;
`

execer := db.execer(dbTx)
for _, bk := range bks {
if _, err := execer.ExecContext(
ctx, storeUnresolvedBatchesSQL,
bk.Number,
bk.Hash.Hex(),
); err != nil {
return err
}
}

return nil
}

// GetUnresolvedBatchKeys returns the unresolved batch keys from the database
func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context) ([]types.BatchKey, error) {
const getUnresolvedBatchKeysSQL = "SELECT num, hash FROM data_node.unresolved_batches;"

rows, err := db.pg.QueryxContext(ctx, getUnresolvedBatchKeysSQL)
if err != nil {
return nil, err
}

return common.FromHex(hexValue), nil
defer rows.Close()

var bks []types.BatchKey
for rows.Next() {
bk := struct {
Number uint64 `db:"num"`
Hash string `db:"hash"`
}{}
if err = rows.StructScan(&bk); err != nil {
return nil, err
}

bks = append(bks, types.BatchKey{
Number: bk.Number,
Hash: common.HexToHash(bk.Hash),
})
}

return bks, nil
}

// DeleteUnresolvedBatchKeys deletes the unresolved batch keys from the database
func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error {
const deleteUnresolvedBatchKeysSQL = `
DELETE FROM data_node.unresolved_batches
WHERE num = $1 AND hash = $2;
`

for _, bk := range bks {
if _, err := db.execer(dbTx).ExecContext(
ctx, deleteUnresolvedBatchKeysSQL,
bk.Number,
bk.Hash.Hex(),
); err != nil {
return err
}
}

return nil
}

// Exists checks if a key exists in offchain data table
Expand All @@ -108,33 +174,62 @@ func (db *pgDB) Exists(ctx context.Context, key common.Hash) bool {
return count > 0
}

// StoreLastProcessedBlock stores a record of a block processed by the synchronizer for named task
func (db *pgDB) StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx sqlx.ExecerContext) error {
const storeLastProcessedBlockSQL = `
INSERT INTO data_node.sync_tasks (task, block)
// StoreOffChainData stores and array of key values in the Db
func (db *pgDB) StoreOffChainData(ctx context.Context, od []types.OffChainData, dbTx sqlx.ExecerContext) error {
const storeOffChainDataSQL = `
INSERT INTO data_node.offchain_data (key, value)
VALUES ($1, $2)
ON CONFLICT (task) DO UPDATE
SET block = EXCLUDED.block, processed = NOW();
ON CONFLICT (key) DO NOTHING;
`

if _, err := dbTx.ExecContext(ctx, storeLastProcessedBlockSQL, task, block); err != nil {
return err
execer := db.execer(dbTx)
for _, d := range od {
if _, err := execer.ExecContext(
ctx, storeOffChainDataSQL,
d.Key.Hex(),
common.Bytes2Hex(d.Value),
); err != nil {
return err
}
}

return nil
}

// GetLastProcessedBlock returns the latest block successfully processed by the synchronizer for named task
func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) {
const getLastProcessedBlockSQL = "SELECT block FROM data_node.sync_tasks WHERE task = $1;"
// GetOffChainData returns the value identified by the key
func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash, dbTx sqlx.QueryerContext) (types.ArgBytes, error) {
const getOffchainDataSQL = `
SELECT value
FROM data_node.offchain_data
WHERE key = $1 LIMIT 1;
`

var (
lastBlock uint64
hexValue string
)

if err := db.pg.QueryRowContext(ctx, getLastProcessedBlockSQL, task).Scan(&lastBlock); err != nil {
return 0, err
if err := db.querier(dbTx).QueryRowxContext(ctx, getOffchainDataSQL, key.Hex()).Scan(&hexValue); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrStateNotSynchronized
}
return nil, err
}

return lastBlock, nil
return common.FromHex(hexValue), nil
}

func (db *pgDB) execer(dbTx sqlx.ExecerContext) sqlx.ExecerContext {
if dbTx != nil {
return dbTx
}

return db.pg
}

func (db *pgDB) querier(dbTx sqlx.QueryerContext) sqlx.QueryerContext {
if dbTx != nil {
return dbTx
}

return db.pg
}
Loading

0 comments on commit 9ed8d9a

Please sign in to comment.