Skip to content

Commit

Permalink
chain: PrunedBlockDispatcher bugfix.
Browse files Browse the repository at this point in the history
In case we fail the request via the neutriono block peer fetcher
we make sure we fail all dependant `GetBlock` calls and remove
this block request completely so that a new request for this same
block hash can be made.
  • Loading branch information
ziggie1984 committed Jan 24, 2024
1 parent 5df09dd commit f13e5df
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 7 deletions.
28 changes: 27 additions & 1 deletion chain/bitcoind_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,10 +469,16 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {
return nil, err
}

// cancelChan is needed in case a block request with the same hash is
// already registered and fails via the returned errChan. Because we
// don't register a new request in this case this cancelChan is used
// to signal the failure of the dependant block request.
cancelChan := make(chan error, 1)

// Now that we know the block has been pruned for sure, request it from
// our backend peers.
blockChan, errChan := c.prunedBlockDispatcher.Query(
[]*chainhash.Hash{hash},
[]*chainhash.Hash{hash}, cancelChan,
)

for {
Expand All @@ -482,12 +488,32 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {

case err := <-errChan:
if err != nil {
// An error was returned for this block request.
// We have to make sure we remove the blockhash
// from the list of queried blocks.
// Moreover because in case a block is requested
// more than onced no redundant block requests
// are registered but rather a reply channel is
// added to the pending block request. This
// means we need to cancel all dependent
// `GetBlock` calls via the cancel channel when
// the fetching of the block was NOT successful.
c.prunedBlockDispatcher.CancelRequest(
*hash, err,
)

return nil, err
}

// errChan fired before blockChan with a nil error, wait
// for the block now.

// The cancelChan is only used when there is already a pending
// block request for this hash and that block request fails via
// the error channel above.
case err := <-cancelChan:
return nil, err

case <-c.quit:
return nil, ErrBitcoindClientShuttingDown
}
Expand Down
65 changes: 60 additions & 5 deletions chain/pruned_block_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,14 @@ type PrunedBlockDispatcher struct {
// NOTE: The blockMtx lock must always be held when accessing this
// field.
blocksQueried map[chainhash.Hash][]chan *wire.MsgBlock
blockMtx sync.Mutex

// blockQueryCancel signals the cancelation of a `GetBlock` request.
//
// NOTE: The blockMtx lock must always be held when accessing this
// field.
blockQueryCancel map[chainhash.Hash][]chan<- error

blockMtx sync.Mutex

// currentPeers represents the set of peers we're currently connected
// to. Each peer found here will have a worker spawned within the
Expand Down Expand Up @@ -501,10 +508,10 @@ func (d *PrunedBlockDispatcher) banPeer(peer string) {
}

// Query submits a request to query the information of the given blocks.
func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash,
func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash, chanelChan chan<- error,
opts ...query.QueryOption) (<-chan *wire.MsgBlock, <-chan error) {

reqs, blockChan, err := d.newRequest(blocks)
reqs, blockChan, err := d.newRequest(blocks, chanelChan)
if err != nil {
errChan := make(chan error, 1)
errChan <- err
Expand All @@ -515,17 +522,22 @@ func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash,
if len(reqs) > 0 {
errChan = d.workManager.Query(reqs, opts...)
}

// Either we read here and remove the request from memory or
// we do it somewhere else.
return blockChan, errChan
}

// newRequest construct a new query request for the given blocks to submit to
// the internal workManager. A channel is also returned through which the
// requested blocks are sent through.
func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) (
[]*query.Request, <-chan *wire.MsgBlock, error) {
func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash,
chanelChan chan<- error) ([]*query.Request, <-chan *wire.MsgBlock,
error) {

// Make sure the channel is buffered enough to handle all blocks.
blockChan := make(chan *wire.MsgBlock, len(blocks))
cancelChan := make(chan error)

d.blockMtx.Lock()
defer d.blockMtx.Unlock()
Expand All @@ -550,6 +562,10 @@ func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) (
} else {
log.Debugf("Received new request for pending query of "+
"block %v", *block)

d.blockQueryCancel[*block] = append(
d.blockQueryCancel[*block], cancelChan,
)
}

d.blocksQueried[*block] = append(
Expand Down Expand Up @@ -678,3 +694,42 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message,

return progress
}

// CancelRequest removes all information regarding a failed block request.
// When for example the Peer disconnects or runs in a timeout we make sure
// that all related information is deleted and a new request for this block
// can be registered. Moreover will also cancel all depending goroutines.
func (d *PrunedBlockDispatcher) CancelRequest(blockHash chainhash.Hash,
err error) {

d.blockMtx.Lock()

// Before removing the block hash we get the cancelChans which were
// registered for block requests which already had an ongoing pending
// request registered.
cancelChans, ok := d.blockQueryCancel[blockHash]

// Remove all data related to this block request to make sure the same
// block can be registered again in the future.
delete(d.blocksQueried, blockHash)
delete(d.blockQueryCancel, blockHash)

d.blockMtx.Unlock()

// In case there are dependant goroutines depending on this block
// request we make sure we cancel them.
if ok {
d.wg.Add(1)
go func() {
defer d.wg.Done()

for _, cancel := range cancelChans {
select {
case cancel <- err:
case <-d.quit:
return
}
}
}()
}
}
4 changes: 3 additions & 1 deletion chain/pruned_block_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ func (h *prunedBlockDispatcherHarness) query(blocks []*chainhash.Hash) (

h.t.Helper()

blockChan, errChan := h.dispatcher.Query(blocks)
cancelChan := make(chan error, 1)

blockChan, errChan := h.dispatcher.Query(blocks, cancelChan)
select {
case err := <-errChan:
require.NoError(h.t, err)
Expand Down

0 comments on commit f13e5df

Please sign in to comment.