txpool: more tx to txn renames (#12801)
Last part of the "tx" to "txn" renames, sticking to our convention of using
"txn" for blockchain transactions and "tx" for db transactions.

There are a few more renames pending inside the core/types package, but the plan
is to do those at some point in the future (those transaction types might also
move out of core/types and into a sub-package in txnprovider).
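For illustration, the convention reads like this in a function that touches both kinds of transaction. This is a hypothetical sketch, not code from this commit; the table name and helper are only stand-ins:

```go
package example

import "github.com/erigontech/erigon-lib/kv"

// Hypothetical sketch of the naming convention: "tx" stays reserved for a
// database transaction (kv.Tx), while "txn" means a blockchain transaction
// (here the txpool's TxnSlot). Table and helper names are illustrative only.
func txnKnown(tx kv.Tx, txn *TxnSlot) (bool, error) {
	// look the blockchain transaction (txn) up inside the db transaction (tx)
	v, err := tx.GetOne(kv.PoolTransaction, txn.IDHash[:])
	if err != nil {
		return false, err
	}
	return v != nil, nil
}
```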
taratorio authored Nov 20, 2024
1 parent 46f44d4 commit e7a2937
Showing 24 changed files with 702 additions and 701 deletions.
cmd/devnet/transactions/tx.go (2 changes: 1 addition & 1 deletion)
@@ -160,7 +160,7 @@ func SendTxLoad(ctx context.Context, to, from string, amount uint64, txPerSec ui
tx, err := CreateManyEIP1559TransactionsHigherThanBaseFee(ctx, to, from, int(batchCount))

if err != nil {
logger.Error("failed Create Txs", "error", err)
logger.Error("failed Create Txns", "error", err)
return err
}

cmd/integration/commands/state_stages.go (2 changes: 1 addition & 1 deletion)
@@ -326,7 +326,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
return err
})
//miningStages.MockExecFunc(stages.MiningFinish, func(s *stagedsync.StageState, u stagedsync.Unwinder) error {
- //debugprint.Transactions(nextBlock.Transactions(), miningWorld.Block.Txs)
+ //debugprint.Transactions(nextBlock.Transactions(), miningWorld.Block.Txns)
//debugprint.Receipts(miningWorld.Block.Receipts, receiptsInDB)
//return stagedsync.SpawnMiningFinishStage(s, tx, miningWorld.Block, cc.Engine(), chainConfig, quit)
//})
core/types/transaction_test.go (4 changes: 2 additions & 2 deletions)
@@ -735,7 +735,7 @@ func TestBlobTxEncodeDecode(t *testing.T) {
}

func TestShortUnwrap(t *testing.T) {
- blobTxRlp, _ := typestest.MakeBlobTxRlp()
+ blobTxRlp, _ := typestest.MakeBlobTxnRlp()
shortRlp, err := UnwrapTxPlayloadRlp(blobTxRlp)
if err != nil {
t.Errorf("short rlp stripping failed: %v", err)
@@ -747,7 +747,7 @@ func TestShortUnwrap(t *testing.T) {
t.Errorf("short rlp decoding failed : %v", err)
}
wrappedBlobTx := BlobTxWrapper{}
- blockTxRlp2, _ := typestest.MakeBlobTxRlp()
+ blockTxRlp2, _ := typestest.MakeBlobTxnRlp()
err = wrappedBlobTx.DecodeRLP(rlp.NewStream(bytes.NewReader(blockTxRlp2[1:]), 0))
if err != nil {
t.Errorf("long rlp decoding failed: %v", err)
core/types/typestest/test_data.go (2 changes: 1 addition & 1 deletion)
@@ -25,7 +25,7 @@ import (
"github.com/erigontech/erigon-lib/crypto/kzg"
)

- func MakeBlobTxRlp() ([]byte, []gokzg4844.KZGCommitment) {
+ func MakeBlobTxnRlp() ([]byte, []gokzg4844.KZGCommitment) {
bodyRlp := hexutility.MustDecodeHex(BodyRlpHex)

blobsRlpPrefix := hexutility.MustDecodeHex("fa040008")
txnprovider/txpool/README.md (2 changes: 1 addition & 1 deletion)
@@ -77,7 +77,7 @@ tx_hashes, pending_pool, queued_pool, local_transactions => pending_pool`, queue

where modified pending pool and queued pool, and local transactions do not contain given transaction hashes anymore.

- In code can find production of such event by `p.newPendingTxs <-`, and handling of this event by `announcements := <-newTxs` [here](pool.go#L1746)
+ In code can find production of such event by `p.newPendingTxns <-`, and handling of this event by `announcements := <-newTxns` [here](pool.go#L1746)
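As an editorial aside (not part of the README diff): a minimal, hypothetical sketch of that producer/consumer shape. The real pool batches hashes into its own Announcements type; the names below are illustrative only.

```go
package main

import "fmt"

func main() {
	// newPendingTxns carries hashes of txns newly promoted to the pending pool;
	// a buffered channel stands in for the pool's announcement plumbing.
	newPendingTxns := make(chan []string, 1024)

	// producer side: after a block is applied, announce the promoted txn hashes
	go func() {
		promoted := []string{"0xaaaa...", "0xbbbb..."} // hypothetical txn hashes
		newPendingTxns <- promoted
		close(newPendingTxns)
	}()

	// consumer side: the broadcast loop drains announcements and propagates them to peers
	for announcements := range newPendingTxns {
		fmt.Println("propagate to peers:", announcements)
	}
}
```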

## Reinject transactions into the transaction pool on unwinding a block
This can be thought of a reverse operation from the one described before. When a block that was deemed "the best" of its height, is no longer deemed "the best", the transactions contained in it, are now viable for inclusion in other blocks, and therefore should be returned into the transaction pool. We can describe this function as:
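The README's own formal description of this reverse operation continues beyond what is shown in this diff. As a loose, hypothetical illustration only (types and method names are invented, not the pool's actual API):

```go
// Hypothetical sketch of reinjection on unwind: every transaction from the
// unwound blocks becomes a candidate for inclusion again, unless it was also
// mined in the new canonical chain. All names here are invented for illustration.
func reinjectOnUnwind(
	add func(txn []byte), // stand-in for the pool's "add back" path
	unwoundBlockTxns [][]byte,
	minedTxnHashes map[string]struct{},
	hashOf func([]byte) string,
) {
	for _, txn := range unwoundBlockTxns {
		if _, stillMined := minedTxnHashes[hashOf(txn)]; stillMined {
			continue // already included in the new canonical chain
		}
		add(txn) // back into the pending/queued pools
	}
}
```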
txnprovider/txpool/fetch.go (46 changes: 23 additions & 23 deletions)
@@ -48,10 +48,10 @@ type Fetch struct {
stateChangesClient StateChangesClient
wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests)
stateChangesParseCtx *TxnParseContext
- pooledTxsParseCtx *TxnParseContext
+ pooledTxnsParseCtx *TxnParseContext
sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network
stateChangesParseCtxLock sync.Mutex
- pooledTxsParseCtxLock sync.Mutex
+ pooledTxnsParseCtxLock sync.Mutex
logger log.Logger
}

@@ -72,10 +72,10 @@ func NewFetch(ctx context.Context, sentryClients []sentry.SentryClient, pool Poo
db: db,
stateChangesClient: stateChangesClient,
stateChangesParseCtx: NewTxnParseContext(chainID).ChainIDRequired(), //TODO: change ctx if rules changed
- pooledTxsParseCtx: NewTxnParseContext(chainID).ChainIDRequired(),
+ pooledTxnsParseCtx: NewTxnParseContext(chainID).ChainIDRequired(),
logger: logger,
}
- f.pooledTxsParseCtx.ValidateRLP(f.pool.ValidateSerializedTxn)
+ f.pooledTxnsParseCtx.ValidateRLP(f.pool.ValidateSerializedTxn)
f.stateChangesParseCtx.ValidateRLP(f.pool.ValidateSerializedTxn)

return f
@@ -86,9 +86,9 @@ func (f *Fetch) SetWaitGroup(wg *sync.WaitGroup) {
}

func (f *Fetch) threadSafeParsePooledTxn(cb func(*TxnParseContext) error) error {
- f.pooledTxsParseCtxLock.Lock()
- defer f.pooledTxsParseCtxLock.Unlock()
- return cb(f.pooledTxsParseCtx)
+ f.pooledTxnsParseCtxLock.Lock()
+ defer f.pooledTxnsParseCtxLock.Unlock()
+ return cb(f.pooledTxnsParseCtx)
}

func (f *Fetch) threadSafeParseStateChangeTxn(cb func(*TxnParseContext) error) error {
@@ -282,7 +282,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
const hashSize = 32
hashes = hashes[:min(len(hashes), 256*hashSize)]

- var txs [][]byte
+ var txns [][]byte
responseSize := 0
processed := len(hashes)

@@ -302,11 +302,11 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
continue
}

- txs = append(txs, txn)
+ txns = append(txns, txn)
responseSize += len(txn)
}

- encodedRequest = EncodePooledTransactions66(txs, requestID, nil)
+ encodedRequest = EncodePooledTransactions66(txns, requestID, nil)
if len(encodedRequest) > p2pTxPacketLimit {
log.Trace("txpool.Fetch.handleInboundMessage PooledTransactions reply exceeds p2pTxPacketLimit", "requested", len(hashes), "processed", processed)
}
@@ -318,7 +318,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
return err
}
case sentry.MessageId_POOLED_TRANSACTIONS_66, sentry.MessageId_TRANSACTIONS_66:
- txs := TxnSlots{}
+ txns := TxnSlots{}
if err := f.threadSafeParsePooledTxn(func(parseContext *TxnParseContext) error {
return nil
}); err != nil {
@@ -328,7 +328,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
switch req.Id {
case sentry.MessageId_TRANSACTIONS_66:
if err := f.threadSafeParsePooledTxn(func(parseContext *TxnParseContext) error {
- if _, err := ParseTransactions(req.Data, 0, parseContext, &txs, func(hash []byte) error {
+ if _, err := ParseTransactions(req.Data, 0, parseContext, &txns, func(hash []byte) error {
known, err := f.pool.IdHashKnown(tx, hash)
if err != nil {
return err
@@ -346,7 +346,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
}
case sentry.MessageId_POOLED_TRANSACTIONS_66:
if err := f.threadSafeParsePooledTxn(func(parseContext *TxnParseContext) error {
- if _, _, err := ParsePooledTransactions66(req.Data, 0, parseContext, &txs, func(hash []byte) error {
+ if _, _, err := ParsePooledTransactions66(req.Data, 0, parseContext, &txns, func(hash []byte) error {
known, err := f.pool.IdHashKnown(tx, hash)
if err != nil {
return err
@@ -365,10 +365,10 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
default:
return fmt.Errorf("unexpected message: %s", req.Id.String())
}
- if len(txs.Txs) == 0 {
+ if len(txns.Txns) == 0 {
return nil
}
- f.pool.AddRemoteTxns(ctx, txs)
+ f.pool.AddRemoteTxns(ctx, txns)
default:
defer f.logger.Trace("[txpool] dropped p2p message", "id", req.Id)
}
@@ -472,14 +472,14 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
}

func (f *Fetch) handleStateChangesRequest(ctx context.Context, req *remote.StateChangeBatch) error {
- var unwindTxs, unwindBlobTxs, minedTxs TxnSlots
+ var unwindTxns, unwindBlobTxns, minedTxns TxnSlots
for _, change := range req.ChangeBatch {
if change.Direction == remote.Direction_FORWARD {
- minedTxs.Resize(uint(len(change.Txs)))
+ minedTxns.Resize(uint(len(change.Txs)))
for i := range change.Txs {
- minedTxs.Txs[i] = &TxnSlot{}
+ minedTxns.Txns[i] = &TxnSlot{}
if err := f.threadSafeParseStateChangeTxn(func(parseContext *TxnParseContext) error {
- _, err := parseContext.ParseTransaction(change.Txs[i], 0, minedTxs.Txs[i], minedTxs.Senders.At(i), false /* hasEnvelope */, false /* wrappedWithBlobs */, nil)
+ _, err := parseContext.ParseTransaction(change.Txs[i], 0, minedTxns.Txns[i], minedTxns.Senders.At(i), false /* hasEnvelope */, false /* wrappedWithBlobs */, nil)
return err
}); err != nil && !errors.Is(err, context.Canceled) {
f.logger.Warn("[txpool.fetch] stream.Recv", "err", err)
@@ -495,10 +495,10 @@ func (f *Fetch) handleStateChangesRequest(ctx context.Context, req *remote.State
if err != nil {
return err
}
- if utx.Type == BlobTxType {
- unwindBlobTxs.Append(utx, sender, false)
+ if utx.Type == BlobTxnType {
+ unwindBlobTxns.Append(utx, sender, false)
} else {
- unwindTxs.Append(utx, sender, false)
+ unwindTxns.Append(utx, sender, false)
}
return nil
}); err != nil && !errors.Is(err, context.Canceled) {
@@ -509,7 +509,7 @@ func (f *Fetch) handleStateChangesRequest(ctx context.Context, req *remote.State
}
}

- if err := f.pool.OnNewBlock(ctx, req, unwindTxs, unwindBlobTxs, minedTxs); err != nil && !errors.Is(err, context.Canceled) {
+ if err := f.pool.OnNewBlock(ctx, req, unwindTxns, unwindBlobTxns, minedTxns); err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil
txnprovider/txpool/fetch_test.go (47 changes: 24 additions & 23 deletions)
@@ -102,14 +102,14 @@ func TestSendTxPropagate(t *testing.T) {

m := NewMockSentry(ctx, sentryServer)
send := NewSend(ctx, []sentryproto.SentryClient{direct.NewSentryClientDirect(direct.ETH68, m)}, nil, log.New())
- send.BroadcastPooledTxs(testRlps(2), 100)
- send.AnnouncePooledTxs([]byte{0, 1}, []uint32{10, 15}, toHashes(1, 42), 100)
+ send.BroadcastPooledTxns(testRlps(2), 100)
+ send.AnnouncePooledTxns([]byte{0, 1}, []uint32{10, 15}, toHashes(1, 42), 100)

require.Equal(t, 2, len(requests))

- txsMessage := requests[0].Data
- assert.Equal(t, sentryproto.MessageId_TRANSACTIONS_66, txsMessage.Id)
- assert.Equal(t, 3, len(txsMessage.Data))
+ txnsMessage := requests[0].Data
+ assert.Equal(t, sentryproto.MessageId_TRANSACTIONS_66, txnsMessage.Id)
+ assert.Equal(t, 3, len(txnsMessage.Data))

txnHashesMessage := requests[1].Data
assert.Equal(t, sentryproto.MessageId_NEW_POOLED_TRANSACTION_HASHES_68, txnHashesMessage.Id)
@@ -137,14 +137,14 @@ func TestSendTxPropagate(t *testing.T) {
b := []byte(fmt.Sprintf("%x", i))
copy(list[i:i+32], b)
}
- send.BroadcastPooledTxs(testRlps(len(list)/32), 100)
- send.AnnouncePooledTxs([]byte{0, 1, 2}, []uint32{10, 12, 14}, list, 100)
+ send.BroadcastPooledTxns(testRlps(len(list)/32), 100)
+ send.AnnouncePooledTxns([]byte{0, 1, 2}, []uint32{10, 12, 14}, list, 100)

require.Equal(t, 2, len(requests))

- txsMessage := requests[0].Data
- require.Equal(t, sentryproto.MessageId_TRANSACTIONS_66, txsMessage.Id)
- require.True(t, len(txsMessage.Data) > 0)
+ txnsMessage := requests[0].Data
+ require.Equal(t, sentryproto.MessageId_TRANSACTIONS_66, txnsMessage.Id)
+ require.True(t, len(txnsMessage.Data) > 0)

txnHashesMessage := requests[1].Data
require.Equal(t, sentryproto.MessageId_NEW_POOLED_TRANSACTION_HASHES_68, txnHashesMessage.Id)
@@ -167,14 +167,14 @@ func TestSendTxPropagate(t *testing.T) {

m := NewMockSentry(ctx, sentryServer)
send := NewSend(ctx, []sentryproto.SentryClient{direct.NewSentryClientDirect(direct.ETH68, m)}, nil, log.New())
- send.BroadcastPooledTxs(testRlps(2), 100)
- send.AnnouncePooledTxs([]byte{0, 1}, []uint32{10, 15}, toHashes(1, 42), 100)
+ send.BroadcastPooledTxns(testRlps(2), 100)
+ send.AnnouncePooledTxns([]byte{0, 1}, []uint32{10, 15}, toHashes(1, 42), 100)

require.Equal(t, 2, len(requests))

- txsMessage := requests[0].Data
- assert.Equal(t, sentryproto.MessageId_TRANSACTIONS_66, txsMessage.Id)
- assert.True(t, len(txsMessage.Data) > 0)
+ txnsMessage := requests[0].Data
+ assert.Equal(t, sentryproto.MessageId_TRANSACTIONS_66, txnsMessage.Id)
+ assert.True(t, len(txnsMessage.Data) > 0)

txnHashesMessage := requests[1].Data
assert.Equal(t, sentryproto.MessageId_NEW_POOLED_TRANSACTION_HASHES_68, txnHashesMessage.Id)
@@ -208,7 +208,7 @@ func TestSendTxPropagate(t *testing.T) {
m := NewMockSentry(ctx, sentryServer)
send := NewSend(ctx, []sentryproto.SentryClient{direct.NewSentryClientDirect(direct.ETH68, m)}, nil, log.New())
expectPeers := toPeerIDs(1, 2, 42)
- send.PropagatePooledTxsToPeersList(expectPeers, []byte{0, 1}, []uint32{10, 15}, toHashes(1, 42))
+ send.PropagatePooledTxnsToPeersList(expectPeers, []byte{0, 1}, []uint32{10, 15}, toHashes(1, 42))

require.Equal(t, 3, len(requests))
for i, req := range requests {
@@ -226,6 +226,7 @@ func decodeHex(in string) []byte {
}
return payload
}

func TestOnNewBlock(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -246,9 +246,9 @@ func TestOnNewBlock(t *testing.T) {
ChangeBatch: []*remote.StateChange{
{
Txs: [][]byte{
- decodeHex(TxParseMainnetTests[0].PayloadStr),
- decodeHex(TxParseMainnetTests[1].PayloadStr),
- decodeHex(TxParseMainnetTests[2].PayloadStr),
+ decodeHex(TxnParseMainnetTests[0].PayloadStr),
+ decodeHex(TxnParseMainnetTests[1].PayloadStr),
+ decodeHex(TxnParseMainnetTests[2].PayloadStr),
},
BlockHeight: 1,
BlockHash: gointerfaces.ConvertHashToH256([32]byte{}),
@@ -275,19 +275,19 @@ func TestOnNewBlock(t *testing.T) {
}).
Times(3)

- var minedTxs TxnSlots
+ var minedTxns TxnSlots
pool.EXPECT().
OnNewBlock(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
- DoAndReturn(func(_ context.Context, _ *remote.StateChangeBatch, _ TxnSlots, _ TxnSlots, minedTxsArg TxnSlots) error {
- minedTxs = minedTxsArg
+ DoAndReturn(func(_ context.Context, _ *remote.StateChangeBatch, _ TxnSlots, _ TxnSlots, minedTxnsArg TxnSlots) error {
+ minedTxns = minedTxnsArg
return nil
}).
Times(1)

fetch := NewFetch(ctx, nil, pool, stateChanges, coreDB, db, *u256.N1, log.New())
err := fetch.handleStateChanges(ctx, stateChanges)
assert.ErrorIs(t, io.EOF, err)
- assert.Equal(t, 3, len(minedTxs.Txs))
+ assert.Equal(t, 3, len(minedTxns.Txns))
}

type MockSentry struct {
txnprovider/txpool/metrics.go (6 changes: 3 additions & 3 deletions)
@@ -19,12 +19,12 @@ package txpool
import "github.com/erigontech/erigon-lib/metrics"

var (
- processBatchTxsTimer = metrics.NewSummary(`pool_process_remote_txs`)
- addRemoteTxsTimer = metrics.NewSummary(`pool_add_remote_txs`)
+ processBatchTxnsTimer = metrics.NewSummary(`pool_process_remote_txs`)
+ addRemoteTxnsTimer = metrics.NewSummary(`pool_add_remote_txs`)
newBlockTimer = metrics.NewSummary(`pool_new_block`)
writeToDBTimer = metrics.NewSummary(`pool_write_to_db`)
propagateToNewPeerTimer = metrics.NewSummary(`pool_propagate_to_new_peer`)
- propagateNewTxsTimer = metrics.NewSummary(`pool_propagate_new_txs`)
+ propagateNewTxnsTimer = metrics.NewSummary(`pool_propagate_new_txs`)
writeToDBBytesCounter = metrics.GetOrCreateGauge(`pool_write_to_db_bytes`)
pendingSubCounter = metrics.GetOrCreateGauge(`txpool_pending`)
queuedSubCounter = metrics.GetOrCreateGauge(`txpool_queued`)
txnprovider/txpool/pending_pool.go (2 changes: 1 addition & 1 deletion)
@@ -26,7 +26,7 @@ import (

// PendingPool - is different from other pools - it's best is Slice instead of Heap
// It's more expensive to maintain "slice sort" invariant, but it allow do cheap copy of
- // pending.best slice for mining (because we consider txs and metaTxn are immutable)
+ // pending.best slice for mining (because we consider txns and metaTxn are immutable)
type PendingPool struct {
best *bestSlice
worst *WorstQueue
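The PendingPool comment above motivates the slice-vs-heap choice: keeping the slice sorted costs more on insert, but handing a snapshot to the miner becomes a plain copy. A small, hypothetical sketch of that payoff (stand-in names, not the pool's actual types):

```go
package example

// metaTxn is a stand-in for the pool's immutable per-transaction metadata.
type metaTxn struct{ priority uint64 }

// bestSlice keeps its elements sorted by priority (highest first).
type bestSlice struct {
	ms []*metaTxn
}

// snapshotForMining hands the miner an independent, already-sorted view:
// because the elements are treated as immutable, an O(n) copy of the pointer
// slice is enough, with no re-sorting and no heap rebuild.
func (b *bestSlice) snapshotForMining() []*metaTxn {
	out := make([]*metaTxn, len(b.ms))
	copy(out, b.ms)
	return out
}
```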