From bd59e96ce7b0d0376b5543edd4c6707b96871e67 Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Fri, 31 Dec 2021 14:24:54 -0600 Subject: [PATCH] query: end completed queries in sendTransaction (*ChainService).sendTransaction always timed out even if each peer did respond. This is because after receiving a getdata message in response to the outgoing inv, the checkResponse closure provided to queryAllPeers neglected to close the peerQuit channel. As a result, sendTransaction always blocks for the full configured BroadcastTimeout. This change adds the missing close(peerQuit) when a matching response (either a getdata or reject) is received. Note that as before this change, a non-nil error from sendTransaction does not guarantee that a remote peer actually received the MsgTx that was queued since the outgoing tx message is handled asynchronously. To account for the removed delay, the TestNeutrinoSync unit test is updated with a waitTx helper to ensure the mining node has the sent transaction prior to requesting a new block. --- query.go | 15 ++++++++++++--- sync_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/query.go b/query.go index 1d64f738..a05f23b0 100644 --- a/query.go +++ b/query.go @@ -1078,6 +1078,8 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e ) numReplied++ + + close(peerQuit) } } @@ -1096,6 +1098,13 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e response, sp.Addr(), ) rejections[*broadcastErr]++ + + log.Debugf("Transaction %v rejected by peer "+ + "%v: code = %v, reason = %q", txHash, + sp.Addr(), broadcastErr.Code, + broadcastErr.Reason) + + close(peerQuit) } }, append( @@ -1109,7 +1118,7 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e // transaction upon every block connected/disconnected. if numReplied == 0 { log.Debugf("No peers replied to inv message for transaction %v", - tx.TxHash()) + txHash) return nil } @@ -1118,7 +1127,7 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e // it so we'll return the most rejected error between all of our peers. // // TODO(wilmer): This might be too naive, some rejections are more - // critical than others. + // critical than others. e.g. pushtx.Mempool and pushtx.Confirmed are OK. // // TODO(wilmer): This does not cover the case where a peer also rejected // our transaction but didn't send the response within our given timeout @@ -1126,7 +1135,7 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e // threshold of rejections instead. if numReplied == len(rejections) { log.Warnf("All peers rejected transaction %v checking errors", - tx.TxHash()) + txHash) mostRejectedCount := 0 var mostRejectedErr pushtx.BroadcastError diff --git a/sync_test.go b/sync_test.go index af6d5f87..10142d23 100644 --- a/sync_test.go +++ b/sync_test.go @@ -474,6 +474,29 @@ func testStartRescan(harness *neutrinoHarness, t *testing.T) { } } + // waitTx will poll for a transaction to appear on given node for up to + // 5 seconds. + waitTx := func(node *rpcclient.Client, hash chainhash.Hash) { + t.Helper() + exitTimer := time.NewTimer(5 * time.Second) + defer exitTimer.Stop() + for { + <-time.After(200 * time.Millisecond) + + select { + case <-exitTimer.C: + t.Fatalf("Timeout waiting to see transaction.") + default: + } + + if _, err := node.GetRawTransaction(&hash); err != nil { + continue + } + + return + } + } + // Create another address to send to so we don't trip the rescan with // the old address and we can test monitoring both OutPoint usage and // receipt by addresses. @@ -520,6 +543,9 @@ func testStartRescan(harness *neutrinoHarness, t *testing.T) { if err != nil && !strings.Contains(err.Error(), "already have") { t.Fatalf("Unable to send transaction to network: %s", err) } + // SendTransaction does not know when the MsgTx was actually sent, only + // that a getdata request was received and a MsgTx queued to send. + waitTx(harness.h1.Client, authTx1.Tx.TxHash()) _, err = harness.h1.Client.Generate(1) if err != nil { t.Fatalf("Couldn't generate/submit block: %s", err) @@ -565,6 +591,7 @@ func testStartRescan(harness *neutrinoHarness, t *testing.T) { if err != nil && !strings.Contains(err.Error(), "already have") { t.Fatalf("Unable to send transaction to network: %s", err) } + waitTx(harness.h1.Client, authTx2.Tx.TxHash()) _, err = harness.h1.Client.Generate(1) if err != nil { t.Fatalf("Couldn't generate/submit block: %s", err)