From efddeef9cca2edea099c225aeb12ba9243bb638b Mon Sep 17 00:00:00 2001 From: Pietralberto Mazza <18440657+altafan@users.noreply.github.com> Date: Thu, 1 Feb 2024 12:24:45 +0100 Subject: [PATCH] Add block timestamp to domain.Transaction (#79) * Add block timestamp info to stored txs * Fix interface layer --- internal/core/application/account_service.go | 2 +- .../application/notification_service_test.go | 7 +- internal/core/domain/transaction.go | 6 +- .../core/domain/transaction_repository.go | 3 +- internal/core/domain/transaction_test.go | 3 +- .../blockchain-scanner/electrum/service.go | 15 +++-- .../db/badger/transaction_repository.go | 5 +- .../db/inmemory/transaction_repository.go | 10 +-- ...0240131150155_add_blocktime_field.down.sql | 1 + .../20240131150155_add_blocktime_field.up.sql | 1 + .../db/postgres/sqlc/queries/models.go | 1 + .../db/postgres/sqlc/queries/query.sql.go | 16 +++-- .../storage/db/postgres/sqlc/query.sql | 6 +- .../db/postgres/transaction_repository.go | 64 ++++++++++--------- .../db/test/transaction_repository_test.go | 6 +- .../interfaces/grpc/handler/notification.go | 5 +- internal/interfaces/grpc/handler/util.go | 2 +- 17 files changed, 93 insertions(+), 60 deletions(-) create mode 100644 internal/infrastructure/storage/db/postgres/migration/20240131150155_add_blocktime_field.down.sql create mode 100644 internal/infrastructure/storage/db/postgres/migration/20240131150155_add_blocktime_field.up.sql diff --git a/internal/core/application/account_service.go b/internal/core/application/account_service.go index a621dc01..9a6120e5 100644 --- a/internal/core/application/account_service.go +++ b/internal/core/application/account_service.go @@ -372,7 +372,7 @@ func (as *AccountService) storeQueuedTransactions() { as.log("received confirmed tx %s from channel", tx.TxID) if _, err := txRepo.ConfirmTransaction( - ctx, tx.TxID, tx.BlockHash, tx.BlockHeight, + ctx, tx.TxID, tx.BlockHash, tx.BlockHeight, tx.BlockTime, ); err != nil { as.warn( err, "error while confirming transaction %s for account(s) %s", diff --git a/internal/core/application/notification_service_test.go b/internal/core/application/notification_service_test.go index 370c0d79..36c9e3e9 100644 --- a/internal/core/application/notification_service_test.go +++ b/internal/core/application/notification_service_test.go @@ -82,10 +82,11 @@ func testGetTxChannel(t *testing.T) { time.Sleep(time.Second) - blockHash := randomHex(32) - blockHeight := uint64(randomIntInRange(1, 300)) + blockhash := randomHex(32) + blockheight := uint64(randomIntInRange(1, 300)) + blocktime := time.Now().Unix() repoManager.TransactionRepository().ConfirmTransaction( - ctx, txid, blockHash, blockHeight, + ctx, txid, blockhash, blockheight, blocktime, ) repoManager.TransactionRepository().UpdateTransaction( diff --git a/internal/core/domain/transaction.go b/internal/core/domain/transaction.go index 54b5a792..05a64b11 100644 --- a/internal/core/domain/transaction.go +++ b/internal/core/domain/transaction.go @@ -8,6 +8,7 @@ type Transaction struct { TxHex string BlockHash string BlockHeight uint64 + BlockTime int64 Accounts map[string]struct{} } @@ -17,13 +18,16 @@ func (t *Transaction) IsConfirmed() bool { } // Confirm marks the tx as confirmed. -func (t *Transaction) Confirm(blockHash string, blockHeight uint64) { +func (t *Transaction) Confirm( + blockHash string, blockHeight uint64, blockTime int64, +) { if t.IsConfirmed() { return } t.BlockHash = blockHash t.BlockHeight = blockHeight + t.BlockTime = blockTime } // AddAccount adds the given account to the map of those involved in the tx. diff --git a/internal/core/domain/transaction_repository.go b/internal/core/domain/transaction_repository.go index 884ad715..239669a1 100644 --- a/internal/core/domain/transaction_repository.go +++ b/internal/core/domain/transaction_repository.go @@ -39,7 +39,8 @@ type TransactionRepository interface { // Transaction identified by the given txid. // Generates a TransactionConfirmed event if successful. ConfirmTransaction( - ctx context.Context, txid, blockHash string, blockheight uint64, + ctx context.Context, + txid, blockHash string, blockheight uint64, blocktime int64, ) (bool, error) // GetTransaction returns the Transaction identified by the given txid. GetTransaction(ctx context.Context, txid string) (*Transaction, error) diff --git a/internal/core/domain/transaction_test.go b/internal/core/domain/transaction_test.go index 0e66a7d6..3b7132a2 100644 --- a/internal/core/domain/transaction_test.go +++ b/internal/core/domain/transaction_test.go @@ -2,6 +2,7 @@ package domain_test import ( "testing" + "time" "github.com/stretchr/testify/require" "github.com/vulpemventures/ocean/internal/core/domain" @@ -11,7 +12,7 @@ func TestConfirmTransaction(t *testing.T) { tx := &domain.Transaction{} require.False(t, tx.IsConfirmed()) - tx.Confirm("fa84eb6806daf1b3c495ed30554d80573a39335b2993b66b3cc1afaa53816e47", 1728312) + tx.Confirm("fa84eb6806daf1b3c495ed30554d80573a39335b2993b66b3cc1afaa53816e47", 1728312, time.Now().Unix()) require.True(t, tx.IsConfirmed()) } diff --git a/internal/infrastructure/blockchain-scanner/electrum/service.go b/internal/infrastructure/blockchain-scanner/electrum/service.go index fd29a920..c08a47ce 100644 --- a/internal/infrastructure/blockchain-scanner/electrum/service.go +++ b/internal/infrastructure/blockchain-scanner/electrum/service.go @@ -555,19 +555,22 @@ func (s *service) dbEventHandler(event dbEvent) { chTx := s.getTxChannelByAccount(event.account) txHex, _ := tx.ToHex() - var hash string - var blockHeight uint64 + var blockhash string + var blockheight uint64 + var blocktime int64 if block != nil { - hash = block.hash().String() - blockHeight = uint64(event.tx.Height) + blockhash = block.hash().String() + blockheight = uint64(event.tx.Height) + blocktime = block.timestamp() } go func() { chTx <- &domain.Transaction{ TxID: event.tx.Txid, TxHex: txHex, - BlockHash: hash, - BlockHeight: blockHeight, + BlockHash: blockhash, + BlockHeight: blockheight, + BlockTime: blocktime, Accounts: map[string]struct{}{event.account: {}}, } }() diff --git a/internal/infrastructure/storage/db/badger/transaction_repository.go b/internal/infrastructure/storage/db/badger/transaction_repository.go index 203e51ac..0f384002 100644 --- a/internal/infrastructure/storage/db/badger/transaction_repository.go +++ b/internal/infrastructure/storage/db/badger/transaction_repository.go @@ -55,7 +55,8 @@ func (r *transactionRepository) AddTransaction( } func (r *transactionRepository) ConfirmTransaction( - ctx context.Context, txid, blockHash string, blockheight uint64, + ctx context.Context, + txid, blockHash string, blockheight uint64, blocktime int64, ) (bool, error) { tx, err := r.getTx(ctx, txid) if err != nil { @@ -66,7 +67,7 @@ func (r *transactionRepository) ConfirmTransaction( return false, nil } - tx.Confirm(blockHash, blockheight) + tx.Confirm(blockHash, blockheight, blocktime) if err := r.updateTx(ctx, *tx); err != nil { return false, err diff --git a/internal/infrastructure/storage/db/inmemory/transaction_repository.go b/internal/infrastructure/storage/db/inmemory/transaction_repository.go index 5f7cfaf9..82ae19cd 100644 --- a/internal/infrastructure/storage/db/inmemory/transaction_repository.go +++ b/internal/infrastructure/storage/db/inmemory/transaction_repository.go @@ -46,12 +46,13 @@ func (r *txRepository) AddTransaction( } func (r *txRepository) ConfirmTransaction( - ctx context.Context, txid, blockHash string, blockheight uint64, + ctx context.Context, + txid, blockHash string, blockheight uint64, blocktime int64, ) (bool, error) { r.store.lock.Lock() defer r.store.lock.Unlock() - return r.confirmTx(ctx, txid, blockHash, blockheight) + return r.confirmTx(ctx, txid, blockHash, blockheight, blocktime) } func (r *txRepository) GetTransaction( @@ -106,7 +107,8 @@ func (r *txRepository) addTx( } func (r *txRepository) confirmTx( - ctx context.Context, txid string, blockHash string, blockHeight uint64, + ctx context.Context, + txid string, blockHash string, blockHeight uint64, blocktime int64, ) (bool, error) { tx, err := r.getTx(ctx, txid) if err != nil { @@ -117,7 +119,7 @@ func (r *txRepository) confirmTx( return false, nil } - tx.Confirm(blockHash, blockHeight) + tx.Confirm(blockHash, blockHeight, blocktime) r.store.txs[txid] = tx diff --git a/internal/infrastructure/storage/db/postgres/migration/20240131150155_add_blocktime_field.down.sql b/internal/infrastructure/storage/db/postgres/migration/20240131150155_add_blocktime_field.down.sql new file mode 100644 index 00000000..6c717f81 --- /dev/null +++ b/internal/infrastructure/storage/db/postgres/migration/20240131150155_add_blocktime_field.down.sql @@ -0,0 +1 @@ +ALTER TABLE transaction DROP COLUMN block_time; \ No newline at end of file diff --git a/internal/infrastructure/storage/db/postgres/migration/20240131150155_add_blocktime_field.up.sql b/internal/infrastructure/storage/db/postgres/migration/20240131150155_add_blocktime_field.up.sql new file mode 100644 index 00000000..727aae51 --- /dev/null +++ b/internal/infrastructure/storage/db/postgres/migration/20240131150155_add_blocktime_field.up.sql @@ -0,0 +1 @@ +ALTER TABLE transaction ADD COLUMN block_time BIGINT; \ No newline at end of file diff --git a/internal/infrastructure/storage/db/postgres/sqlc/queries/models.go b/internal/infrastructure/storage/db/postgres/sqlc/queries/models.go index ee58bd6e..df47d3b2 100644 --- a/internal/infrastructure/storage/db/postgres/sqlc/queries/models.go +++ b/internal/infrastructure/storage/db/postgres/sqlc/queries/models.go @@ -37,6 +37,7 @@ type Transaction struct { TxHex string BlockHash string BlockHeight int32 + BlockTime sql.NullInt64 } type TxInputAccount struct { diff --git a/internal/infrastructure/storage/db/postgres/sqlc/queries/query.sql.go b/internal/infrastructure/storage/db/postgres/sqlc/queries/query.sql.go index c742d5ac..8e523535 100644 --- a/internal/infrastructure/storage/db/postgres/sqlc/queries/query.sql.go +++ b/internal/infrastructure/storage/db/postgres/sqlc/queries/query.sql.go @@ -193,7 +193,7 @@ func (q *Queries) GetScript(ctx context.Context, account string) (ExternalScript } const getTransaction = `-- name: GetTransaction :many -SELECT tx_id, tx_hex, block_hash, block_height, id, account_name, fk_tx_id FROM transaction t left join tx_input_account tia on t.tx_id = tia.fk_tx_id WHERE tx_id=$1 +SELECT tx_id, tx_hex, block_hash, block_height, block_time, id, account_name, fk_tx_id FROM transaction t left join tx_input_account tia on t.tx_id = tia.fk_tx_id WHERE tx_id=$1 ` type GetTransactionRow struct { @@ -201,6 +201,7 @@ type GetTransactionRow struct { TxHex string BlockHash string BlockHeight int32 + BlockTime sql.NullInt64 ID sql.NullInt32 AccountName sql.NullString FkTxID sql.NullString @@ -220,6 +221,7 @@ func (q *Queries) GetTransaction(ctx context.Context, txID string) ([]GetTransac &i.TxHex, &i.BlockHash, &i.BlockHeight, + &i.BlockTime, &i.ID, &i.AccountName, &i.FkTxID, @@ -558,8 +560,8 @@ func (q *Queries) InsertScript(ctx context.Context, arg InsertScriptParams) erro } const insertTransaction = `-- name: InsertTransaction :one -INSERT INTO transaction(tx_id,tx_hex,block_hash,block_height) -VALUES($1,$2,$3,$4) RETURNING tx_id, tx_hex, block_hash, block_height +INSERT INTO transaction(tx_id,tx_hex,block_hash,block_height,block_time) +VALUES($1,$2,$3,$4,$5) RETURNING tx_id, tx_hex, block_hash, block_height, block_time ` type InsertTransactionParams struct { @@ -567,6 +569,7 @@ type InsertTransactionParams struct { TxHex string BlockHash string BlockHeight int32 + BlockTime sql.NullInt64 } // TRANSACTION @@ -576,6 +579,7 @@ func (q *Queries) InsertTransaction(ctx context.Context, arg InsertTransactionPa arg.TxHex, arg.BlockHash, arg.BlockHeight, + arg.BlockTime, ) var i Transaction err := row.Scan( @@ -583,6 +587,7 @@ func (q *Queries) InsertTransaction(ctx context.Context, arg InsertTransactionPa &i.TxHex, &i.BlockHash, &i.BlockHeight, + &i.BlockTime, ) return i, err } @@ -810,13 +815,14 @@ func (q *Queries) UpdateAccount(ctx context.Context, arg UpdateAccountParams) (A } const updateTransaction = `-- name: UpdateTransaction :one -UPDATE transaction SET tx_hex=$1,block_hash=$2,block_height=$3 WHERE tx_id=$4 RETURNING tx_id, tx_hex, block_hash, block_height +UPDATE transaction SET tx_hex=$1,block_hash=$2,block_height=$3,block_time=$4 WHERE tx_id=$5 RETURNING tx_id, tx_hex, block_hash, block_height, block_time ` type UpdateTransactionParams struct { TxHex string BlockHash string BlockHeight int32 + BlockTime sql.NullInt64 TxID string } @@ -825,6 +831,7 @@ func (q *Queries) UpdateTransaction(ctx context.Context, arg UpdateTransactionPa arg.TxHex, arg.BlockHash, arg.BlockHeight, + arg.BlockTime, arg.TxID, ) var i Transaction @@ -833,6 +840,7 @@ func (q *Queries) UpdateTransaction(ctx context.Context, arg UpdateTransactionPa &i.TxHex, &i.BlockHash, &i.BlockHeight, + &i.BlockTime, ) return i, err } diff --git a/internal/infrastructure/storage/db/postgres/sqlc/query.sql b/internal/infrastructure/storage/db/postgres/sqlc/query.sql index 1ad156f6..2748c832 100644 --- a/internal/infrastructure/storage/db/postgres/sqlc/query.sql +++ b/internal/infrastructure/storage/db/postgres/sqlc/query.sql @@ -65,15 +65,15 @@ SELECT * FROM utxo WHERE account_name=$1; /* TRANSACTION */ -- name: InsertTransaction :one -INSERT INTO transaction(tx_id,tx_hex,block_hash,block_height) -VALUES($1,$2,$3,$4) RETURNING *; +INSERT INTO transaction(tx_id,tx_hex,block_hash,block_height,block_time) +VALUES($1,$2,$3,$4,$5) RETURNING *; -- name: InsertTransactionInputAccount :one INSERT INTO tx_input_account(account_name, fk_tx_id) VALUES($1,$2) RETURNING *; -- name: UpdateTransaction :one -UPDATE transaction SET tx_hex=$1,block_hash=$2,block_height=$3 WHERE tx_id=$4 RETURNING *; +UPDATE transaction SET tx_hex=$1,block_hash=$2,block_height=$3,block_time=$4 WHERE tx_id=$5 RETURNING *; -- name: DeleteTransactionInputAccounts :exec DELETE FROM tx_input_account WHERE fk_tx_id=$1; diff --git a/internal/infrastructure/storage/db/postgres/transaction_repository.go b/internal/infrastructure/storage/db/postgres/transaction_repository.go index cf72bc74..3ea5c2fa 100644 --- a/internal/infrastructure/storage/db/postgres/transaction_repository.go +++ b/internal/infrastructure/storage/db/postgres/transaction_repository.go @@ -2,6 +2,7 @@ package postgresdb import ( "context" + "database/sql" "errors" "sync" @@ -23,7 +24,9 @@ type txRepositoryPg struct { externalChEvents chan domain.TransactionEvent } -func NewTxRepositoryPgImpl(pgxPool *pgxpool.Pool) domain.TransactionRepository { +func NewTxRepositoryPgImpl( + pgxPool *pgxpool.Pool, +) domain.TransactionRepository { return newTxRepositoryPgImpl(pgxPool) } @@ -38,8 +41,7 @@ func newTxRepositoryPgImpl(pgxPool *pgxpool.Pool) *txRepositoryPg { } func (t *txRepositoryPg) AddTransaction( - ctx context.Context, - trx *domain.Transaction, + ctx context.Context, trx *domain.Transaction, ) (bool, error) { conn, err := t.pgxPool.Acquire(ctx) if err != nil { @@ -55,14 +57,18 @@ func (t *txRepositoryPg) AddTransaction( querierWithTx := t.querier.WithTx(tx) - txPg, err := querierWithTx.InsertTransaction(ctx, queries.InsertTransactionParams{ - TxID: trx.TxID, - TxHex: trx.TxHex, - BlockHash: trx.BlockHash, - BlockHeight: int32(trx.BlockHeight), - }) + txPg, err := querierWithTx.InsertTransaction( + ctx, queries.InsertTransactionParams{ + TxID: trx.TxID, + TxHex: trx.TxHex, + BlockHash: trx.BlockHash, + BlockHeight: int32(trx.BlockHeight), + BlockTime: sql.NullInt64{Int64: trx.BlockTime, Valid: true}, + }, + ) if err != nil { - if pqErr, ok := err.(*pgconn.PgError); pqErr != nil && ok && pqErr.Code == uniqueViolation { + if pqErr, ok := err.(*pgconn.PgError); pqErr != nil && ok && + pqErr.Code == uniqueViolation { return false, nil } else { return false, err @@ -70,10 +76,12 @@ func (t *txRepositoryPg) AddTransaction( } for k := range trx.Accounts { - if _, err := querierWithTx.InsertTransactionInputAccount(ctx, queries.InsertTransactionInputAccountParams{ - AccountName: k, - FkTxID: txPg.TxID, - }); err != nil { + if _, err := querierWithTx.InsertTransactionInputAccount( + ctx, queries.InsertTransactionInputAccountParams{ + AccountName: k, + FkTxID: txPg.TxID, + }, + ); err != nil { return false, err } } @@ -92,9 +100,7 @@ func (t *txRepositoryPg) AddTransaction( func (t *txRepositoryPg) ConfirmTransaction( ctx context.Context, - txid string, - blockHash string, - blockHeight uint64, + txid, blockhash string, blockheight uint64, blocktime int64, ) (bool, error) { tx, err := t.getTx(ctx, txid) if err != nil { @@ -105,7 +111,7 @@ func (t *txRepositoryPg) ConfirmTransaction( return false, nil } - tx.Confirm(blockHash, blockHeight) + tx.Confirm(blockhash, blockheight, blocktime) if err := t.updateTx(ctx, t.querier, *tx); err != nil { return false, err @@ -120,15 +126,13 @@ func (t *txRepositoryPg) ConfirmTransaction( } func (t *txRepositoryPg) GetTransaction( - ctx context.Context, - txid string, + ctx context.Context, txid string, ) (*domain.Transaction, error) { return t.getTx(ctx, txid) } func (t *txRepositoryPg) UpdateTransaction( - ctx context.Context, - txid string, + ctx context.Context, txid string, updateFn func(tx *domain.Transaction) (*domain.Transaction, error), ) error { tx, err := t.getTx(ctx, txid) @@ -166,14 +170,13 @@ func (t *txRepositoryPg) close() { } func (t *txRepositoryPg) updateTx( - ctx context.Context, - querier *queries.Queries, - trx domain.Transaction, + ctx context.Context, querier *queries.Queries, trx domain.Transaction, ) error { if _, err := querier.UpdateTransaction(ctx, queries.UpdateTransactionParams{ TxHex: trx.TxHex, BlockHash: trx.BlockHash, BlockHeight: int32(trx.BlockHeight), + BlockTime: sql.NullInt64{Int64: trx.BlockTime, Valid: true}, TxID: trx.TxID, }); err != nil { return err @@ -184,10 +187,12 @@ func (t *txRepositoryPg) updateTx( } for k := range trx.Accounts { - if _, err := querier.InsertTransactionInputAccount(ctx, queries.InsertTransactionInputAccountParams{ - AccountName: k, - FkTxID: trx.TxID, - }); err != nil { + if _, err := querier.InsertTransactionInputAccount( + ctx, queries.InsertTransactionInputAccountParams{ + AccountName: k, + FkTxID: trx.TxID, + }, + ); err != nil { return err } } @@ -219,6 +224,7 @@ func (t *txRepositoryPg) getTx( TxHex: tx[0].TxHex, BlockHash: tx[0].BlockHash, BlockHeight: uint64(tx[0].BlockHeight), + BlockTime: tx[0].BlockTime.Int64, Accounts: accounts, }, nil } diff --git a/internal/infrastructure/storage/db/test/transaction_repository_test.go b/internal/infrastructure/storage/db/test/transaction_repository_test.go index 8556ce11..ab8c2cc4 100644 --- a/internal/infrastructure/storage/db/test/transaction_repository_test.go +++ b/internal/infrastructure/storage/db/test/transaction_repository_test.go @@ -3,6 +3,7 @@ package db_test import ( "fmt" "testing" + "time" "github.com/stretchr/testify/require" "github.com/vulpemventures/ocean/internal/core/domain" @@ -62,12 +63,13 @@ func testTransactionRepository(t *testing.T, repo domain.TransactionRepository) t.Run("confirm_transaction", func(t *testing.T) { blockHash := randomHex(32) blockHeight := uint64(randomIntInRange(100, 1000)) + blocktime := time.Now().Unix() - done, err := repo.ConfirmTransaction(ctx, txid, blockHash, blockHeight) + done, err := repo.ConfirmTransaction(ctx, txid, blockHash, blockHeight, blocktime) require.NoError(t, err) require.True(t, done) - done, err = repo.ConfirmTransaction(ctx, txid, blockHash, blockHeight) + done, err = repo.ConfirmTransaction(ctx, txid, blockHash, blockHeight, blocktime) require.NoError(t, err) require.False(t, done) diff --git a/internal/interfaces/grpc/handler/notification.go b/internal/interfaces/grpc/handler/notification.go index 42743346..6634c5c8 100644 --- a/internal/interfaces/grpc/handler/notification.go +++ b/internal/interfaces/grpc/handler/notification.go @@ -38,8 +38,9 @@ func (n notification) TransactionNotifications( var blockDetails *pb.BlockDetails if e.Transaction.IsConfirmed() { blockDetails = &pb.BlockDetails{ - Hash: e.Transaction.BlockHash, - Height: e.Transaction.BlockHeight, + Hash: e.Transaction.BlockHash, + Height: e.Transaction.BlockHeight, + Timestamp: e.Transaction.BlockTime, } } if err := stream.Send(&pb.TransactionNotificationsResponse{ diff --git a/internal/interfaces/grpc/handler/util.go b/internal/interfaces/grpc/handler/util.go index 0548318a..364eb252 100644 --- a/internal/interfaces/grpc/handler/util.go +++ b/internal/interfaces/grpc/handler/util.go @@ -109,7 +109,7 @@ func parseBlockDetails(tx application.TransactionInfo) *pb.BlockDetails { return &pb.BlockDetails{ Hash: tx.BlockHash, Height: tx.BlockHeight, - Timestamp: int64(tx.BlockHeight), + Timestamp: tx.BlockTime, } }