diff --git a/core/types/block.go b/core/types/block.go index e732e0a25f..26f8e34f67 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -249,6 +249,7 @@ type Block struct { // inter-peer block relay. ReceivedAt time.Time ReceivedFrom interface{} + AnnouncedAt *time.Time } // "external" block encoding. used for eth protocol, etc. diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index 589fc10fd9..3db3a5d87c 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -114,18 +114,20 @@ type blockAnnounce struct { // headerFilterTask represents a batch of headers needing fetcher filtering. type headerFilterTask struct { - peer string // The source peer of block headers - headers []*types.Header // Collection of headers to filter - time time.Time // Arrival time of the headers + peer string // The source peer of block headers + headers []*types.Header // Collection of headers to filter + time time.Time // Arrival time of the headers + announcedTime time.Time } // bodyFilterTask represents a batch of block bodies (transactions and uncles) // needing fetcher filtering. type bodyFilterTask struct { - peer string // The source peer of block bodies - transactions [][]*types.Transaction // Collection of transactions per block bodies - uncles [][]*types.Header // Collection of uncles per block bodies - time time.Time // Arrival time of the blocks' contents + peer string // The source peer of block bodies + transactions [][]*types.Transaction // Collection of transactions per block bodies + uncles [][]*types.Header // Collection of uncles per block bodies + time time.Time // Arrival time of the blocks' contents + announcedTime time.Time } // blockOrHeaderInject represents a schedules import operation. @@ -276,7 +278,7 @@ func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error { // FilterHeaders extracts all the headers that were explicitly requested by the fetcher, // returning those that should be handled differently. -func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header { +func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time, announcedAt time.Time) []*types.Header { log.Trace("Filtering headers", "peer", peer, "headers", len(headers)) // Send the filter channel to the fetcher @@ -289,7 +291,7 @@ func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time } // Request the filtering of the header list select { - case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}: + case filter <- &headerFilterTask{peer: peer, headers: headers, time: time, announcedTime: announcedAt}: case <-f.quit: return nil } @@ -304,7 +306,7 @@ func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time // FilterBodies extracts all the block bodies that were explicitly requested by // the fetcher, returning those that should be handled differently. -func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) { +func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time, announcedAt time.Time) ([][]*types.Transaction, [][]*types.Header) { log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles)) // Send the filter channel to the fetcher @@ -317,7 +319,7 @@ func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transac } // Request the filtering of the body list select { - case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}: + case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time, announcedTime: announcedAt}: case <-f.quit: return nil, nil } @@ -480,7 +482,7 @@ func (f *BlockFetcher) loop() { log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes) // Create a closure of the fetch and schedule in on a new thread - fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes + fetchHeader, hashes, announcedAt := f.fetching[hashes[0]].fetchHeader, hashes, f.fetching[hashes[0]].time go func(peer string) { if f.fetchingHook != nil { f.fetchingHook(hashes) @@ -504,7 +506,7 @@ func (f *BlockFetcher) loop() { select { case res := <-resCh: res.Done <- nil - f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersPacket), time.Now().Add(res.Time)) + f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersPacket), time.Now().Add(res.Time), announcedAt) case <-timeout.C: // The peer didn't respond in time. The request @@ -547,6 +549,7 @@ func (f *BlockFetcher) loop() { fetchBodies := f.completing[hashes[0]].fetchBodies bodyFetchMeter.Mark(int64(len(hashes))) + announcedAt := f.completing[hashes[0]].time go func(peer string, hashes []common.Hash) { resCh := make(chan *eth.Response) @@ -565,7 +568,7 @@ func (f *BlockFetcher) loop() { res.Done <- nil // Ignoring withdrawals here, since the block fetcher is not used post-merge. txs, uncles, _ := res.Res.(*eth.BlockBodiesPacket).Unpack() - f.FilterBodies(peer, txs, uncles, time.Now()) + f.FilterBodies(peer, txs, uncles, time.Now(), announcedAt) case <-timeout.C: // The peer didn't respond in time. The request @@ -631,6 +634,7 @@ func (f *BlockFetcher) loop() { block := types.NewBlockWithHeader(header) block.ReceivedAt = task.time + block.AnnouncedAt = &task.announcedTime complete = append(complete, block) f.completing[hash] = announce @@ -725,6 +729,7 @@ func (f *BlockFetcher) loop() { if f.getBlock(hash) == nil { block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i]) block.ReceivedAt = task.time + block.AnnouncedAt = &task.announcedTime blocks = append(blocks, block) } else { f.forgetHash(hash) @@ -923,6 +928,26 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) { log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) return } + + // Log the insertion event + var ( + msg string + delayInMs uint64 + prettyDelay common.PrettyDuration + ) + + if block.AnnouncedAt != nil { + msg = "Inserted new block with announcement" + delayInMs = uint64(time.Since(*block.AnnouncedAt).Milliseconds()) + prettyDelay = common.PrettyDuration(time.Since(*block.AnnouncedAt)) + } else { + msg = "Inserted new block without announcement" + delayInMs = uint64(time.Since(block.ReceivedAt).Milliseconds()) + prettyDelay = common.PrettyDuration(time.Since(block.ReceivedAt)) + } + + log.Info(msg, "number", block.Number().Uint64(), "hash", hash, "delay", delayInMs, "prettyDelay", prettyDelay) + // If import succeeded, broadcast the block blockAnnounceOutTimer.UpdateSince(block.ReceivedAt) diff --git a/eth/handler.go b/eth/handler.go index d6e6487b32..a64a40de69 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -725,6 +725,8 @@ func (h *handler) minedBroadcastLoop() { for obj := range h.minedBlockSub.Chan() { if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { + delayInMs := uint64(time.Now().UnixMilli()) - uint64(ev.Block.Time()*1000) + log.Info("Broadcasting mined block", "number", ev.Block.NumberU64(), "hash", ev.Block.Hash(), "delay", delayInMs) h.BroadcastBlock(ev.Block, true) // First propagate block to peers h.BroadcastBlock(ev.Block, false) // Only then announce to the rest } diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 0ded928328..0b6cad2d79 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -382,8 +382,10 @@ func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error { return nil // TODO(karalabe): return error eventually, but wait a few releases } + msgTime := msg.Time() ann.Block.ReceivedAt = msg.Time() ann.Block.ReceivedFrom = peer + ann.Block.AnnouncedAt = &msgTime // Mark the peer as owning the block peer.markBlock(ann.Block.Hash())