From a8b442494364badb0dea84a57975169e71b325e9 Mon Sep 17 00:00:00 2001 From: Maureen Ononiwu Date: Mon, 12 Feb 2024 16:41:26 +0100 Subject: [PATCH] neutrino: Added sideload functionality This commit introduces a new subsystem, `sideloader` which is used to sideload filter headers and block headers in the chainservice. Signed-off-by: Maureen Ononiwu --- neutrino.go | 124 +++++++++++++++- sideload.go | 420 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 542 insertions(+), 2 deletions(-) create mode 100644 sideload.go diff --git a/neutrino.go b/neutrino.go index e7246260..a8357347 100644 --- a/neutrino.go +++ b/neutrino.go @@ -6,7 +6,9 @@ package neutrino import ( "errors" "fmt" + "github.com/lightninglabs/neutrino/chaindataloader" "github.com/lightninglabs/neutrino/headerlist" + "io" "net" "strconv" "strings" @@ -87,6 +89,19 @@ var ( DefaultBlockCacheSize uint64 = 4096 * 10 * 1000 // 40 MB ) +// SideLoadOpt defines the config required to sideload headers into the DB. +type SideLoadOpt struct { + // SkipVerify indicates if we are to run verification on the headers from the + // blkHdrSideLoad source. + SkipVerify bool + + // Reader is the sideload source. + Reader io.ReadSeeker + + // SourceType indicates the format of a sideload source. + SourceType chaindataloader.SourceType +} + // isDevNetwork indicates if the chain is a private development network, namely // simnet or regtest/regnet. func isDevNetwork(net wire.BitcoinNet) bool { @@ -619,6 +634,14 @@ type Config struct { // not, replies with a getdata message. // 3. Neutrino sends the raw transaction. BroadcastTimeout time.Duration + + // BlkHdrSideloader is the config required to sideload block header in the + // chainservice. + BlkHdrSideloader *SideLoadOpt + + // CfHdrSideloader is the config required to sideload filter header in the + // chainservice. + CfHdrSideloader *SideLoadOpt } // peerSubscription holds a peer subscription which we'll notify about any @@ -679,6 +702,12 @@ type ChainService struct { // nolint:maligned dialer func(net.Addr) (net.Conn, error) broadcastTimeout time.Duration + + // blkHdrSideloader is used to sideload block headers. + blkHdrSideloader *sideloader + + // cfHdrSideloader is used to sideload cfHeaders. + cfHdrSideloader *sideloader } // blkHdrProcessor contains the dependencies required to verify and store @@ -711,7 +740,7 @@ type blkHdrProcessor struct { // the contextual check and blockchain.CheckBlockHeaderSanity for context-less // checks. func (h *blkHdrProcessor) checkHeaderSanity(blockHeader *wire. - BlockHeader, prevNodeHeight int32, +BlockHeader, prevNodeHeight int32, prevNodeHeader *wire.BlockHeader, hList headerlist.Chain) error { parentHeaderCtx := newLightHeaderCtx( @@ -762,7 +791,7 @@ func (h *blkHdrProcessor) verifyBlockHeader(blockHeader *wire.BlockHeader, // It returns nil when there is not one either because the height is already // later than the final checkpoint or there are none for the current network. func (h *blkHdrProcessor) findNextHeaderCheckpoint(height int32) *chaincfg. - Checkpoint { +Checkpoint { // There is no next checkpoint if there are none for this current // network. checkpoints := h.chainParams.Checkpoints @@ -935,6 +964,33 @@ func NewChainService(cfg Config) (*ChainService, error) { if err != nil { return nil, err } + + sideload := &sideloader{} + if cfg.BlkHdrSideloader != nil { + sd, err := prepBlkHdrSideload(cfg.BlkHdrSideloader, hdrProcessor, + sideload) + + if err != nil { + return nil, err + } + + if sd != nil { + s.blkHdrSideloader = sideload + } + } + + if cfg.CfHdrSideloader != nil { + sideload.fHdrStore = s.RegFilterHeaders + sd, err := prepFilterHdrSideload(cfg.CfHdrSideloader, sideload) + + if err != nil { + return nil, err + } + + if sd != nil { + s.cfHdrSideloader = sideload + } + } s.blockManager = bm s.blockSubscriptionMgr = blockntfns.NewSubscriptionManager(s.blockManager) @@ -1727,6 +1783,70 @@ func (s *ChainService) Start() error { return nil } + // Preload headers before starting any subsystem. Block headers should be + // sideloaded first as the amount of filter headers to be sideloaded + //// depends on it. + if s.blkHdrSideloader != nil { + log.Debugf("Sideloading block headers") + err := s.blkHdrSideloader.sideLoadHeaders(s.blkHdrSideloader. + blkHdrCurHeight, + s.blkHdrSideloader.blkHdrProcessor.nextCheckpt.Height, true) + if err != nil { + return err + } + + } + + if s.cfHdrSideloader != nil { + _, blkTip, err := s.BlockHeaders.ChainTip() + if err != nil { + return err + } + + s.cfHdrSideloader.fHdrLastHeight = blkTip + var doNotSideloadCfheaders bool + if s.cfHdrSideloader.fHdrSkipVerify { + var finalNextCheckptHeight uint32 + for i := 0; i < len(s.cfHdrSideloader.filterCheckptHeights); i++ { + + if s.cfHdrSideloader.filterCheckptHeights[i] > blkTip { + break + } + finalNextCheckptHeight = s.cfHdrSideloader. + filterCheckptHeights[i] + + } + + s.cfHdrSideloader.fHdrLastHeight = finalNextCheckptHeight + + if finalNextCheckptHeight == 0 { + log.Debug("block header tip less than the least filter header" + + "check point tip, cannot sideload filter header") + doNotSideloadCfheaders = true + } + + if s.cfHdrSideloader.fHdrCurHeight >= int32(s.cfHdrSideloader. + fHdrLastHeight) { + log.Debug("filter tip is greater than or equal to the last header " + + "tip for fetch no need to update filter header store -- skipping" + + " sideload") + doNotSideloadCfheaders = true + } + + } + + if !doNotSideloadCfheaders { + log.Debugf("Sideloading filter headers") + err := s.cfHdrSideloader.sideLoadHeaders(s.cfHdrSideloader. + fHdrCurHeight, s.cfHdrSideloader.fHdrNextCheckptHeight, + false) + if err != nil { + return err + } + } + + } + // Start the address manager and block manager, both of which are // needed by peers. s.addrManager.Start() diff --git a/sideload.go b/sideload.go new file mode 100644 index 00000000..4ba5dbad --- /dev/null +++ b/sideload.go @@ -0,0 +1,420 @@ +package neutrino + +import ( + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/neutrino/chaindataloader" + "github.com/lightninglabs/neutrino/chainsync" + "github.com/lightninglabs/neutrino/headerfs" + "github.com/lightninglabs/neutrino/headerlist" +) + +const ( + sideloadDataChunkSize int32 = 2000 +) + +// sideloader a struct that contains all dependencies required to fetch and +// process sideloaded headers. +type sideloader struct { + // reader is the sideload source. + blkHdrReader chaindataloader.Reader + + cfHdrReader chaindataloader.Reader + // blkHdrProcessor contains dependies required to verify and store block + // headers. + blkHdrProcessor *blkHdrProcessor + + // blkHdrSkipVerify indicates if processing the headers require verification. + blkHdrSkipVerify bool + + fHdrSkipVerify bool + + // fHdrNextCheckpointIdx indicates the index of the next filter header + // checkpoint height relative to the current filter tip. + fHdrNextCheckpointIdx int + + // fHdrStore is the store for filter headers. + fHdrStore *headerfs.FilterHeaderStore + + // filterCheckpoints is the map of hardcoded filter checkpoints to their + //hash. + filterCheckpoints map[uint32]*chainhash.Hash + + // filterCheckptHeights is the slice of hardcoded filter checkpoint heights. + filterCheckptHeights []uint32 + + // blkHdrCurHeight is the height of the block header's store tip. + blkHdrCurHeight int32 + + // fHdrCurHeight is the height of the filter header's store tip. + fHdrCurHeight int32 + + // fHdrCurHash is the filter hash of the filter header's store tip. + fHdrCurHash *chainhash.Hash + + // fHdrNextCheckptHeight is the height of the next filter header next + // checkpoint's height relative to the current filter header tip. + fHdrNextCheckptHeight int32 + + // fHdrLastHeight is the last height to which we can sideload the filter + // headers to. This restriction occurs because the filter header tip + // cannot surpass the block header tip as it depends on it. + fHdrLastHeight uint32 +} + +// processSideloadedCfHeader verifies and stores the passed cfheaders. +// It returns the current height and next checkpoint after this process to be +// used by the called. It returns an error where necessary. +func (h *sideloader) processSideloadedCfHeader( + headers []*chainhash.Hash) (*chaindataloader.ProcessHdrResp, error) { + + msgCfHeader := &wire.MsgCFHeaders{ + FilterType: fType, + StopHash: *headers[len(headers)-1], + PrevFilterHeader: *h.fHdrCurHash, + FilterHashes: headers, + } + + // We only verify, if this is set to false. + if !h.fHdrSkipVerify { + + if !verifyCheckpoint(h.fHdrCurHash, headers[len(headers)-1], + msgCfHeader) { + + log.Warnf("Filter headers failed verification at height - %v", + h.fHdrCurHeight) + return nil, nil + } + h.fHdrNextCheckpointIdx = h.fHdrNextCheckpointIdx + 1 + h.fHdrNextCheckptHeight = int32(h.filterCheckptHeights[h. + fHdrNextCheckpointIdx]) + + } + + _, _, _, err := writeCfHeaders(msgCfHeader, h.fHdrStore, + h.blkHdrProcessor.store) + + if err != nil { + return nil, err + } + + h.fHdrCurHash = headers[len(headers)-1] + h.fHdrCurHeight = h.fHdrCurHeight + int32(len(headers)) + + // Check if we have reached the height that we are not to surpass. + if h.fHdrCurHeight > int32(h.fHdrLastHeight) { + log.Debugf("Completed filter header sideloading at height %v", + h.fHdrLastHeight) + return nil, nil + } + + return &chaindataloader.ProcessHdrResp{ + CurHeight: h.fHdrCurHeight, + NextCheckptHeight: h.fHdrNextCheckptHeight, + }, nil +} + +// processSideLoadBlockHeader verifies and stores the passed block headers. +// It returns the current height and next checkpoint after this process to be +// used by the called. It returns an error where necessary. +func (h *sideloader) processSideLoadBlockHeader(headers []*wire. + BlockHeader) (*chaindataloader.ProcessHdrResp, error) { + + if !areHeadersConnected(headers) { + log.Warnf("Headers received don't connect") + return nil, nil + } + + headersArray := make([]headerfs.BlockHeader, 0, len(headers)) + + var ( + finalHeight int32 + nextCheckpointHeight int32 + ) + + for _, header := range headers { + var ( + node *headerlist.Node + prevNode *headerlist.Node + ) + // Ensure there is a previous header to compare against. + prevNodeEl := h.blkHdrProcessor.headerList.Back() + if prevNodeEl == nil { + log.Warnf("side load - Header list does not contain a " + + "previous element as expected -- exiting side load") + + return nil, nil + } + + node = &headerlist.Node{Header: *header} + prevNode = prevNodeEl + node.Height = prevNode.Height + 1 + + if !h.blkHdrSkipVerify { + + valid, err := h.blkHdrProcessor.verifyBlockHeader(header, *prevNode) + if err != nil || !valid { + log.Debugf("Side Load- Did not pass verification at "+ + "height %v-- ", node.Height) + + return nil, nil + } + + // Verify checkpoint only if verification is enabled. + if h.blkHdrProcessor.nextCheckpt != nil && + node.Height == h.blkHdrProcessor.nextCheckpt.Height { + + nodeHash := node.Header.BlockHash() + if nodeHash.IsEqual(h.blkHdrProcessor.nextCheckpt.Hash) { + // Update nextCheckpoint to give more accurate info + // about tip of DB. + h.blkHdrProcessor.nextCheckpt = h.blkHdrProcessor. + findNextHeaderCheckpoint(node.Height) + + log.Infof("Verified downloaded block "+ + "header against checkpoint at height "+ + "%d/hash %s", node.Height, nodeHash) + } else { + log.Warnf("Error at checkpoint while side loading "+ + "headers, exiting at height %d, hash %s", + node.Height, nodeHash) + return nil, nil + } + } + } + + // convert header to headerfs.Blockheader and add to an array. + headersArray = append(headersArray, headerfs.BlockHeader{ + BlockHeader: header, + Height: uint32(node.Height), + }) + h.blkHdrProcessor.headerList.PushBack(*node) + finalHeight = node.Height + nextCheckpointHeight = h.blkHdrProcessor.nextCheckpt.Height + } + + // handle error + err := h.blkHdrProcessor.store.WriteHeaders(headersArray...) + + if err != nil { + return nil, err + } + + return &chaindataloader.ProcessHdrResp{ + CurHeight: finalHeight, + NextCheckptHeight: nextCheckpointHeight, + }, nil +} + +func prepBlkHdrSideload(c *SideLoadOpt, + hdrProcessor *blkHdrProcessor, s *sideloader) (*sideloader, error) { + + s.blkHdrProcessor = hdrProcessor + + reader, err := chaindataloader.NewBlockHeaderReader(&chaindataloader. + BlkHdrReaderConfig{ + ReaderConfig: chaindataloader.ReaderConfig{ + SourceType: c.SourceType, + Reader: c.Reader, + }, + ProcessBlkHeader: s.processSideLoadBlockHeader, + }) + + if err != nil { + return nil, err + } + + // If headers contained in the side load source are for a different chain + // network return immediately. + if reader.HeadersChain() != hdrProcessor.chainParams.Net { + log.Errorf("headers from side load file are of network %v "+ + "and not %v as expected"+ + "-- skipping side loading", reader.HeadersChain(), hdrProcessor. + chainParams.Net) + + return nil, nil + } + + // Initialize the next checkpoint based on the current tipHeight. + tipHeader, tipHeight, err := hdrProcessor.store.ChainTip() + if err != nil { + return nil, err + } + s.blkHdrSkipVerify = c.SkipVerify + if !s.blkHdrSkipVerify { + nextCheckpt := hdrProcessor.findNextHeaderCheckpoint(int32(tipHeight)) + + if nextCheckpt == nil { + log.Debugf("block header tip already past checkpoint cannot" + + " verify sideload, set blkHdrSkipVerify to true if you want to " + + "sideload anyway ---exiting sideload") + + return nil, nil + } + + if reader.EndHeight() < uint32(nextCheckpt.Height) { + log.Debugf("sideload endHeight, %v less than next checkpoint "+ + "tipHeight, %v cannot verify sideload, "+ + "set blkHdrSkipVerify to true if you "+ + "want to sideload anyway ---exiting sideload", + reader.EndHeight(), nextCheckpt.Height) + + return nil, nil + } + s.blkHdrProcessor.nextCheckpt = nextCheckpt + s.blkHdrCurHeight = int32(tipHeight) + } + + if reader.EndHeight() <= tipHeight || reader.StartHeight() > tipHeight { + + // Log error!!!!!!!!!!!! + return nil, nil + + } + + s.blkHdrProcessor.headerList.ResetHeaderState(headerlist.Node{ + Header: *tipHeader, + Height: int32(tipHeight), + }) + s.blkHdrReader = reader + return s, err +} + +func prepFilterHdrSideload(c *SideLoadOpt, s *sideloader) (*sideloader, error) { + + fTipHash, fTip, err := s.fHdrStore.ChainTip() + + if err != nil { + return nil, err + } + + // Filter headers should not be fetched beyond the block header's tip. + s.fHdrCurHeight = int32(fTip) + s.fHdrCurHash = fTipHash + s.fHdrSkipVerify = c.SkipVerify + + var nextCheckptHeight uint32 + if !s.fHdrSkipVerify { + s.filterCheckptHeights = chainsync.FetchHardCodedFilterHdrCheckptHeight( + s.blkHdrProcessor.chainParams.Net) + + s.filterCheckpoints = chainsync.FetchHardCodedFilterHeaderCheckpts( + s.blkHdrProcessor.chainParams.Net) + + var nextCheckptIdx int + for i := 0; i < len(s.filterCheckptHeights); i++ { + + if s.filterCheckptHeights[i] > fTip { + nextCheckptIdx = i + nextCheckptHeight = s.filterCheckptHeights[i] + break + } + } + + if nextCheckptHeight == 0 { + log.Debug("filter tip is already past checkpoint, " + + "cannot verify and sideload") + return nil, nil + } + s.fHdrNextCheckpointIdx = nextCheckptIdx + s.fHdrNextCheckptHeight = int32(nextCheckptHeight) + + } + + reader, err := chaindataloader.NewFilterHeaderReader(&chaindataloader. + FilterHdrReaderConfig{ + ReaderConfig: chaindataloader.ReaderConfig{ + SourceType: c.SourceType, + Reader: c.Reader, + }, + ProcessCfHeader: s.processSideloadedCfHeader, + FilterType: fType, + }) + + if err != nil { + return nil, err + } + + // If headers contained in the side load source are for a different chain + // network return immediately. + if reader.HeadersChain() != s.blkHdrProcessor.chainParams.Net { + log.Error("headers from side load file are of network %v "+ + "and so incompatible with neutrino's current bitcoin network "+ + "-- skipping side loading", reader.HeadersChain()) + + return nil, nil + } + + if reader.EndHeight() <= fTip || (!s.blkHdrSkipVerify && + reader.EndHeight() < nextCheckptHeight) || reader.StartHeight() > fTip { + + log.Warnf("Unable to fetch cfheaders") + return nil, nil + + } + + s.cfHdrReader = reader + + return s, nil +} + +// sideLoadHeaders is the general ingestion loop for side loading headers. +func (s *sideloader) sideLoadHeaders(curHeight, nextCheckptHeight int32, + blkhdr bool) error { + + var ( + reader chaindataloader.Reader + verify bool + ) + + if blkhdr { + reader = s.blkHdrReader + verify = s.blkHdrSkipVerify + + } else { + reader = s.cfHdrReader + verify = s.fHdrSkipVerify + } + + err := reader.SetHeight(uint32(curHeight)) + if err != nil { + return err + } + + if !verify { + log.Debugf("sideloading from height to %v to last checkpoint", + curHeight) + } else { + log.Debugf("sideloading from height to %v, chunk size=%v, "+ + "to height=%v", curHeight, sideloadDataChunkSize, reader. + EndHeight()) + } + for int32(reader.EndHeight()) >= nextCheckptHeight { + + n := sideloadDataChunkSize + + if !verify { + n = nextCheckptHeight - curHeight + } + + resp, err := reader.Load(uint32(n)) + + if err != nil { + return err + } + + // If the current height is unchanged it means no header was written in + // the store and so we are done. `resp` should not be nil if we are + // verifying headers as we need it for the next call, if it is nil we + // stop sideloading. + if (!verify && resp == nil) || resp.CurHeight == curHeight { + log.Infof("Halted sideloading at curHeight - %v", curHeight) + return nil + } + curHeight = resp.CurHeight + nextCheckptHeight = resp.NextCheckptHeight + + } + + return nil +}