From 2c1e0e5bbbec868e711d3748d97eee46fb77ebbd Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Fri, 1 Nov 2024 20:40:27 +0530 Subject: [PATCH 01/10] Bulk sync missing blockMetadata --- arbnode/blockmetadata.go | 159 ++++++++++++++++++++++++++++++++ arbnode/node.go | 3 + arbnode/schema.go | 19 ++-- arbnode/transaction_streamer.go | 16 ++++ execution/gethexec/node.go | 3 + execution/interface.go | 2 + 6 files changed, 193 insertions(+), 9 deletions(-) create mode 100644 arbnode/blockmetadata.go diff --git a/arbnode/blockmetadata.go b/arbnode/blockmetadata.go new file mode 100644 index 0000000000..e2b0ad6b3c --- /dev/null +++ b/arbnode/blockmetadata.go @@ -0,0 +1,159 @@ +package arbnode + +import ( + "bytes" + "context" + "encoding/binary" + "time" + + "github.com/spf13/pflag" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/rpc" + "github.com/offchainlabs/nitro/arbutil" + "github.com/offchainlabs/nitro/execution" + "github.com/offchainlabs/nitro/execution/gethexec" + "github.com/offchainlabs/nitro/util/signature" + "github.com/offchainlabs/nitro/util/stopwaiter" +) + +type BlockMetadataRebuilderConfig struct { + Enable bool `koanf:"enable"` + Url string `koanf:"url"` + JWTSecret string `koanf:"jwt-secret"` + RebuildInterval time.Duration `koanf:"rebuild-interval"` + APIBlocksLimit int `koanf:"api-blocks-limit"` +} + +var DefaultBlockMetadataRebuilderConfig = BlockMetadataRebuilderConfig{ + Enable: false, + RebuildInterval: time.Minute * 5, + APIBlocksLimit: 100, +} + +func BlockMetadataRebuilderConfigAddOptions(prefix string, f *pflag.FlagSet) { + f.Bool(prefix+".enable", DefaultBlockMetadataRebuilderConfig.Enable, "enable syncing blockMetadata using a bulk metadata api") + f.String(prefix+".url", DefaultBlockMetadataRebuilderConfig.Url, "url for bulk blockMetadata api") + f.String(prefix+".jwt-secret", DefaultBlockMetadataRebuilderConfig.JWTSecret, "filepath of jwt secret") + f.Duration(prefix+".rebuild-interval", DefaultBlockMetadataRebuilderConfig.RebuildInterval, "interval at which blockMetadata is synced regularly") + f.Int(prefix+".api-blocks-limit", DefaultBlockMetadataRebuilderConfig.APIBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query.\n"+ + "This should be set lesser than or equal to the value set on the api provider side") +} + +type BlockMetadataRebuilder struct { + stopwaiter.StopWaiter + config *BlockMetadataRebuilderConfig + db ethdb.Database + client *rpc.Client + exec execution.ExecutionClient +} + +func NewBlockMetadataRebuilder(ctx context.Context, c *BlockMetadataRebuilderConfig, db ethdb.Database, exec execution.ExecutionClient) (*BlockMetadataRebuilder, error) { + var err error + var jwt *common.Hash + if c.JWTSecret != "" { + jwt, err = signature.LoadSigningKey(c.JWTSecret) + if err != nil { + return nil, err + } + } + var client *rpc.Client + if jwt == nil { + client, err = rpc.DialOptions(ctx, c.Url) + } else { + client, err = rpc.DialOptions(ctx, c.Url, rpc.WithHTTPAuth(node.NewJWTAuth([32]byte(*jwt)))) + } + if err != nil { + return nil, err + } + return &BlockMetadataRebuilder{ + config: c, + db: db, + client: client, + exec: exec, + }, nil +} + +func (b *BlockMetadataRebuilder) Fetch(ctx context.Context, fromBlock, toBlock uint64) ([]gethexec.NumberAndBlockMetadata, error) { + var result []gethexec.NumberAndBlockMetadata + err := b.client.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(fromBlock), rpc.BlockNumber(toBlock)) + if err != nil { + return nil, err + } + return result, nil +} + +func ArrayToMap[T comparable](arr []T) map[T]struct{} { + ret := make(map[T]struct{}) + for _, elem := range arr { + ret[elem] = struct{}{} + } + return ret +} + +func (b *BlockMetadataRebuilder) PushBlockMetadataToDB(query []uint64, result []gethexec.NumberAndBlockMetadata) error { + batch := b.db.NewBatch() + queryMap := ArrayToMap(query) + for _, elem := range result { + pos, err := b.exec.BlockNumberToMessageIndex(elem.BlockNumber) + if err != nil { + return err + } + if _, ok := queryMap[uint64(pos)]; ok { + if err := batch.Put(dbKey(blockMetadataInputFeedPrefix, uint64(pos)), elem.RawMetadata); err != nil { + return err + } + if err := batch.Delete(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos))); err != nil { + return err + } + } + } + return batch.Write() +} + +func (b *BlockMetadataRebuilder) Update(ctx context.Context) time.Duration { + iter := b.db.NewIterator(missingBlockMetadataInputFeedPrefix, nil) + defer iter.Release() + var query []uint64 + for iter.Next() { + keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix) + query = append(query, binary.BigEndian.Uint64(keyBytes)) + end := len(query) - 1 + if query[end]-query[0] >= uint64(b.config.APIBlocksLimit) { + if query[end]-query[0] > uint64(b.config.APIBlocksLimit) { + if len(query) >= 2 { + end -= 1 + } else { + end = 0 + } + } + result, err := b.Fetch( + ctx, + b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[0])), + b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[end])), + ) + if err != nil { + log.Error("Error getting result from bulk blockMetadata API", "err", err) + return b.config.RebuildInterval // backoff + } + if err = b.PushBlockMetadataToDB(query[:end+1], result); err != nil { + log.Error("Error committing result from bulk blockMetadata API to ArbDB", "err", err) + return b.config.RebuildInterval // backoff + } + query = query[end+1:] + } + } + return b.config.RebuildInterval +} + +func (b *BlockMetadataRebuilder) Start(ctx context.Context) { + b.StopWaiter.Start(ctx, b) + b.CallIteratively(b.Update) +} + +func (b *BlockMetadataRebuilder) StopAndWait() { + b.StopWaiter.StopAndWait() +} diff --git a/arbnode/node.go b/arbnode/node.go index 705a48da08..0961a7ed43 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -129,6 +129,9 @@ func (c *Config) Validate() error { if err := c.Staker.Validate(); err != nil { return err } + if c.Sequencer && c.TransactionStreamer.TrackBlockMetadataFrom == 0 { + return errors.New("when sequencer is enabled track-missing-block-metadata should be enabled as well") + } return nil } diff --git a/arbnode/schema.go b/arbnode/schema.go index 486afb20ae..09554d6161 100644 --- a/arbnode/schema.go +++ b/arbnode/schema.go @@ -4,15 +4,16 @@ package arbnode var ( - messagePrefix []byte = []byte("m") // maps a message sequence number to a message - blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed - blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed - messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result - legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1 - rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message - parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number - sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata - delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count + messagePrefix []byte = []byte("m") // maps a message sequence number to a message + blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed + blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed + missingBlockMetadataInputFeedPrefix []byte = []byte("mt") // maps a message sequence number whose blockMetaData byte array is missing to nil + messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result + legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1 + rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message + parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number + sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata + delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count messageCountKey []byte = []byte("_messageCount") // contains the current message count delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 1636e06bd3..f1747e879e 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -68,12 +68,15 @@ type TransactionStreamer struct { broadcastServer *broadcaster.Broadcaster inboxReader *InboxReader delayedBridge *DelayedBridge + + trackBlockMetadataFrom arbutil.MessageIndex } type TransactionStreamerConfig struct { MaxBroadcasterQueueSize int `koanf:"max-broadcaster-queue-size"` MaxReorgResequenceDepth int64 `koanf:"max-reorg-resequence-depth" reload:"hot"` ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"` + TrackBlockMetadataFrom uint64 `koanf:"track-block-metadata-from"` } type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig @@ -82,18 +85,21 @@ var DefaultTransactionStreamerConfig = TransactionStreamerConfig{ MaxBroadcasterQueueSize: 50_000, MaxReorgResequenceDepth: 1024, ExecuteMessageLoopDelay: time.Millisecond * 100, + TrackBlockMetadataFrom: 0, } var TestTransactionStreamerConfig = TransactionStreamerConfig{ MaxBroadcasterQueueSize: 10_000, MaxReorgResequenceDepth: 128 * 1024, ExecuteMessageLoopDelay: time.Millisecond, + TrackBlockMetadataFrom: 0, } func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".max-broadcaster-queue-size", DefaultTransactionStreamerConfig.MaxBroadcasterQueueSize, "maximum cache of pending broadcaster messages") f.Int64(prefix+".max-reorg-resequence-depth", DefaultTransactionStreamerConfig.MaxReorgResequenceDepth, "maximum number of messages to attempt to resequence on reorg (0 = never resequence, -1 = always resequence)") f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages") + f.Uint64(prefix+".track-block-metadata-from", DefaultTransactionStreamerConfig.TrackBlockMetadataFrom, "block number starting from which the missing of blockmetadata is being tracked in the local disk. Disabled by default") } func NewTransactionStreamer( @@ -119,6 +125,13 @@ func NewTransactionStreamer( if err != nil { return nil, err } + if config().TrackBlockMetadataFrom != 0 { + trackBlockMetadataFrom, err := exec.BlockNumberToMessageIndex(config().TrackBlockMetadataFrom) + if err != nil { + return nil, err + } + streamer.trackBlockMetadataFrom = trackBlockMetadataFrom + } return streamer, nil } @@ -1045,6 +1058,9 @@ func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbosty // This also allows update of BatchGasCost in message without mistakenly erasing BlockMetadata key = dbKey(blockMetadataInputFeedPrefix, uint64(pos)) return batch.Put(key, msg.BlockMetadata) + } else if s.trackBlockMetadataFrom != 0 && pos >= s.trackBlockMetadataFrom { + key = dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos)) + return batch.Put(key, nil) } return nil } diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 32e43874f2..54fbd0d429 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -439,6 +439,9 @@ func (n *ExecutionNode) SetConsensusClient(consensus execution.FullConsensusClie func (n *ExecutionNode) MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) uint64 { return n.ExecEngine.MessageIndexToBlockNumber(messageNum) } +func (n *ExecutionNode) BlockNumberToMessageIndex(blockNum uint64) (arbutil.MessageIndex, error) { + return n.ExecEngine.BlockNumberToMessageIndex(blockNum) +} func (n *ExecutionNode) Maintenance() error { return n.ChainDB.Compact(nil, nil) diff --git a/execution/interface.go b/execution/interface.go index 01f71d4422..700ae61ecd 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -33,6 +33,8 @@ type ExecutionClient interface { HeadMessageNumber() (arbutil.MessageIndex, error) HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error) ResultAtPos(pos arbutil.MessageIndex) (*MessageResult, error) + MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) uint64 + BlockNumberToMessageIndex(blockNum uint64) (arbutil.MessageIndex, error) } // needed for validators / stakers From c4f43162f2c65ea32ab6c167d5b4988cc507675b Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Wed, 6 Nov 2024 12:43:32 +0530 Subject: [PATCH 02/10] delete affected missing blockMetadata trackers from ArbDB in case of a Reorg --- arbnode/transaction_streamer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index f1747e879e..2d2192d4b9 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -399,6 +399,10 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde if err != nil { return err } + err = deleteStartingAt(s.db, batch, missingBlockMetadataInputFeedPrefix, uint64ToKey(uint64(count))) + if err != nil { + return err + } err = deleteStartingAt(s.db, batch, messagePrefix, uint64ToKey(uint64(count))) if err != nil { return err From 4722102980f35da0bb1b530724d7a6ab4ca14012 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Wed, 6 Nov 2024 14:59:56 +0530 Subject: [PATCH 03/10] add plumbing to allow starting BlockMetadataRebuilder --- arbnode/blockmetadata.go | 17 ++++++-- arbnode/node.go | 86 +++++++++++++++++++++++++--------------- 2 files changed, 66 insertions(+), 37 deletions(-) diff --git a/arbnode/blockmetadata.go b/arbnode/blockmetadata.go index e2b0ad6b3c..35691fa064 100644 --- a/arbnode/blockmetadata.go +++ b/arbnode/blockmetadata.go @@ -4,6 +4,8 @@ import ( "bytes" "context" "encoding/binary" + "errors" + "fmt" "time" "github.com/spf13/pflag" @@ -28,6 +30,13 @@ type BlockMetadataRebuilderConfig struct { APIBlocksLimit int `koanf:"api-blocks-limit"` } +func (c *BlockMetadataRebuilderConfig) Validate() error { + if c.APIBlocksLimit < 0 { + return errors.New("api-blocks-limit cannot be negative") + } + return nil +} + var DefaultBlockMetadataRebuilderConfig = BlockMetadataRebuilderConfig{ Enable: false, RebuildInterval: time.Minute * 5, @@ -45,19 +54,19 @@ func BlockMetadataRebuilderConfigAddOptions(prefix string, f *pflag.FlagSet) { type BlockMetadataRebuilder struct { stopwaiter.StopWaiter - config *BlockMetadataRebuilderConfig + config BlockMetadataRebuilderConfig db ethdb.Database client *rpc.Client exec execution.ExecutionClient } -func NewBlockMetadataRebuilder(ctx context.Context, c *BlockMetadataRebuilderConfig, db ethdb.Database, exec execution.ExecutionClient) (*BlockMetadataRebuilder, error) { +func NewBlockMetadataRebuilder(ctx context.Context, c BlockMetadataRebuilderConfig, db ethdb.Database, exec execution.ExecutionClient) (*BlockMetadataRebuilder, error) { var err error var jwt *common.Hash if c.JWTSecret != "" { jwt, err = signature.LoadSigningKey(c.JWTSecret) if err != nil { - return nil, err + return nil, fmt.Errorf("BlockMetadataRebuilder: error loading jwt secret: %w", err) } } var client *rpc.Client @@ -67,7 +76,7 @@ func NewBlockMetadataRebuilder(ctx context.Context, c *BlockMetadataRebuilderCon client, err = rpc.DialOptions(ctx, c.Url, rpc.WithHTTPAuth(node.NewJWTAuth([32]byte(*jwt)))) } if err != nil { - return nil, err + return nil, fmt.Errorf("BlockMetadataRebuilder: error connecting to bulk blockMetadata API: %w", err) } return &BlockMetadataRebuilder{ config: c, diff --git a/arbnode/node.go b/arbnode/node.go index 0961a7ed43..80603cd8d1 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -77,22 +77,23 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com } type Config struct { - Sequencer bool `koanf:"sequencer"` - ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` - InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"` - DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"` - BatchPoster BatchPosterConfig `koanf:"batch-poster" reload:"hot"` - MessagePruner MessagePrunerConfig `koanf:"message-pruner" reload:"hot"` - BlockValidator staker.BlockValidatorConfig `koanf:"block-validator" reload:"hot"` - Feed broadcastclient.FeedConfig `koanf:"feed" reload:"hot"` - Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"` - SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"` - DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"` - SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` - Dangerous DangerousConfig `koanf:"dangerous"` - TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"` - Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"` - ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"` + Sequencer bool `koanf:"sequencer"` + ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` + InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"` + DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"` + BatchPoster BatchPosterConfig `koanf:"batch-poster" reload:"hot"` + MessagePruner MessagePrunerConfig `koanf:"message-pruner" reload:"hot"` + BlockValidator staker.BlockValidatorConfig `koanf:"block-validator" reload:"hot"` + Feed broadcastclient.FeedConfig `koanf:"feed" reload:"hot"` + Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"` + SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"` + DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"` + SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` + Dangerous DangerousConfig `koanf:"dangerous"` + TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"` + Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"` + ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"` + BlockMetadataRebuilder BlockMetadataRebuilderConfig `koanf:"block-metadata-rebuilder" reload:"hot"` // SnapSyncConfig is only used for testing purposes, these should not be configured in production. SnapSyncTest SnapSyncConfig } @@ -129,6 +130,9 @@ func (c *Config) Validate() error { if err := c.Staker.Validate(); err != nil { return err } + if err := c.BlockMetadataRebuilder.Validate(); err != nil { + return err + } if c.Sequencer && c.TransactionStreamer.TrackBlockMetadataFrom == 0 { return errors.New("when sequencer is enabled track-missing-block-metadata should be enabled as well") } @@ -161,26 +165,28 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed DangerousConfigAddOptions(prefix+".dangerous", f) TransactionStreamerConfigAddOptions(prefix+".transaction-streamer", f) MaintenanceConfigAddOptions(prefix+".maintenance", f) + BlockMetadataRebuilderConfigAddOptions(prefix+"block-metadata-rebuilder", f) } var ConfigDefault = Config{ - Sequencer: false, - ParentChainReader: headerreader.DefaultConfig, - InboxReader: DefaultInboxReaderConfig, - DelayedSequencer: DefaultDelayedSequencerConfig, - BatchPoster: DefaultBatchPosterConfig, - MessagePruner: DefaultMessagePrunerConfig, - BlockValidator: staker.DefaultBlockValidatorConfig, - Feed: broadcastclient.FeedConfigDefault, - Staker: staker.DefaultL1ValidatorConfig, - SeqCoordinator: DefaultSeqCoordinatorConfig, - DataAvailability: das.DefaultDataAvailabilityConfig, - SyncMonitor: DefaultSyncMonitorConfig, - Dangerous: DefaultDangerousConfig, - TransactionStreamer: DefaultTransactionStreamerConfig, - ResourceMgmt: resourcemanager.DefaultConfig, - Maintenance: DefaultMaintenanceConfig, - SnapSyncTest: DefaultSnapSyncConfig, + Sequencer: false, + ParentChainReader: headerreader.DefaultConfig, + InboxReader: DefaultInboxReaderConfig, + DelayedSequencer: DefaultDelayedSequencerConfig, + BatchPoster: DefaultBatchPosterConfig, + MessagePruner: DefaultMessagePrunerConfig, + BlockValidator: staker.DefaultBlockValidatorConfig, + Feed: broadcastclient.FeedConfigDefault, + Staker: staker.DefaultL1ValidatorConfig, + SeqCoordinator: DefaultSeqCoordinatorConfig, + DataAvailability: das.DefaultDataAvailabilityConfig, + SyncMonitor: DefaultSyncMonitorConfig, + Dangerous: DefaultDangerousConfig, + TransactionStreamer: DefaultTransactionStreamerConfig, + ResourceMgmt: resourcemanager.DefaultConfig, + Maintenance: DefaultMaintenanceConfig, + BlockMetadataRebuilder: DefaultBlockMetadataRebuilderConfig, + SnapSyncTest: DefaultSnapSyncConfig, } func ConfigDefaultL1Test() *Config { @@ -275,6 +281,7 @@ type Node struct { MaintenanceRunner *MaintenanceRunner DASLifecycleManager *das.LifecycleManager SyncMonitor *SyncMonitor + blockMetadataRebuilder *BlockMetadataRebuilder configFetcher ConfigFetcher ctx context.Context } @@ -483,6 +490,14 @@ func createNodeImpl( } } + var blockMetadataRebuilder *BlockMetadataRebuilder + if config.BlockMetadataRebuilder.Enable { + blockMetadataRebuilder, err = NewBlockMetadataRebuilder(ctx, config.BlockMetadataRebuilder, arbDb, exec) + if err != nil { + return nil, err + } + } + if !config.ParentChainReader.Enable { return &Node{ ArbDB: arbDb, @@ -506,6 +521,7 @@ func createNodeImpl( MaintenanceRunner: maintenanceRunner, DASLifecycleManager: nil, SyncMonitor: syncMonitor, + blockMetadataRebuilder: blockMetadataRebuilder, configFetcher: configFetcher, ctx: ctx, }, nil @@ -742,6 +758,7 @@ func createNodeImpl( MaintenanceRunner: maintenanceRunner, DASLifecycleManager: dasLifecycleManager, SyncMonitor: syncMonitor, + blockMetadataRebuilder: blockMetadataRebuilder, configFetcher: configFetcher, ctx: ctx, }, nil @@ -925,6 +942,9 @@ func (n *Node) Start(ctx context.Context) error { n.BroadcastClients.Start(ctx) }() } + if n.blockMetadataRebuilder != nil { + n.blockMetadataRebuilder.Start(ctx) + } if n.configFetcher != nil { n.configFetcher.Start(ctx) } From df96876ef09e26ef476b396684f27f6ae506a0e4 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Wed, 6 Nov 2024 17:17:02 +0530 Subject: [PATCH 04/10] test bulk syncing of missing blockMetadata --- system_tests/timeboost_test.go | 130 ++++++++++++++++++++++++++++++--- 1 file changed, 121 insertions(+), 9 deletions(-) diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index 49d9d5fb16..276ff4c08a 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rpc" "github.com/offchainlabs/nitro/arbnode" + "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbos/util" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/broadcastclient" @@ -42,9 +43,129 @@ import ( "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/stopwaiter" + "github.com/offchainlabs/nitro/util/testhelpers" "github.com/stretchr/testify/require" ) +var blockMetadataInputFeedKey = func(pos uint64) []byte { + var key []byte + prefix := []byte("t") + key = append(key, prefix...) + data := make([]byte, 8) + binary.BigEndian.PutUint64(data, pos) + key = append(key, data...) + return key +} + +func TestTimeboostBulkBlockMetadataRebuilder(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) + httpConfig := genericconf.HTTPConfigDefault + httpConfig.Addr = "127.0.0.1" + httpConfig.Apply(builder.l2StackConfig) + builder.execConfig.BlockMetadataApiCacheSize = 0 // Caching is disabled + builder.nodeConfig.TransactionStreamer.TrackBlockMetadataFrom = 1 + cleanupSeq := builder.Build(t) + defer cleanupSeq() + + // Generate blocks until current block is > 20 + arbDb := builder.L2.ConsensusNode.ArbDB + builder.L2Info.GenerateAccount("User") + user := builder.L2Info.GetDefaultTransactOpts("User", ctx) + var latestL2 uint64 + var err error + for i := 0; ; i++ { + builder.L2.TransferBalanceTo(t, "Owner", util.RemapL1Address(user.From), big.NewInt(1e18), builder.L2Info) + latestL2, err = builder.L2.Client.BlockNumber(ctx) + Require(t, err) + // Clean BlockMetadata from arbDB so that we can modify it at will + Require(t, arbDb.Delete(blockMetadataInputFeedKey(latestL2))) + if latestL2 > uint64(20) { + break + } + } + var sampleBulkData []arbostypes.BlockMetadata + for i := 1; i <= int(latestL2); i++ { + blockMetadata := []byte{0, uint8(i)} + sampleBulkData = append(sampleBulkData, blockMetadata) + arbDb.Put(blockMetadataInputFeedKey(uint64(i)), blockMetadata) + } + + ndcfg := arbnode.ConfigDefaultL1NonSequencerTest() + ndcfg.TransactionStreamer.TrackBlockMetadataFrom = 1 + newNode, cleanupNewNode := builder.Build2ndNode(t, &SecondNodeParams{ + nodeConfig: ndcfg, + stackConfig: testhelpers.CreateStackConfigForTest(t.TempDir()), + }) + defer cleanupNewNode() + + // Wait for second node to catchup via L1, since L1 doesn't have the blockMetadata, we ensure that messages are tracked with missingBlockMetadataInputFeedPrefix prefix + for { + current, err := newNode.Client.BlockNumber(ctx) + Require(t, err) + if current == latestL2 { + break + } + time.Sleep(time.Second) + } + + blockMetadataInputFeedPrefix := []byte("t") + missingBlockMetadataInputFeedPrefix := []byte("mt") + arbDb = newNode.ConsensusNode.ArbDB + + // Check if all block numbers with missingBlockMetadataInputFeedPrefix are present as keys in arbDB and that no keys with blockMetadataInputFeedPrefix + iter := arbDb.NewIterator(blockMetadataInputFeedPrefix, nil) + for iter.Next() { + keyBytes := bytes.TrimPrefix(iter.Key(), blockMetadataInputFeedPrefix) + t.Fatalf("unexpected presence of blockMetadata when blocks are synced via L1. msgSeqNum: %d", binary.BigEndian.Uint64(keyBytes)) + } + iter.Release() + iter = arbDb.NewIterator(missingBlockMetadataInputFeedPrefix, nil) + pos := uint64(1) + for iter.Next() { + keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix) + if pos != binary.BigEndian.Uint64(keyBytes) { + t.Fatalf("unexpected msgSeqNum with missingBlockMetadataInputFeedPrefix for blockMetadata. Want: %d, Got: %d", pos, binary.BigEndian.Uint64(keyBytes)) + } + pos++ + } + if pos-1 != latestL2 { + t.Fatalf("number of keys with missingBlockMetadataInputFeedPrefix doesn't match expected value. Want: %d, Got: %d", latestL2, pos-1) + } + iter.Release() + + // Rebuild blockMetadata and cleanup trackers from ArbDB + blockMetadataRebuilder, err := arbnode.NewBlockMetadataRebuilder(ctx, arbnode.BlockMetadataRebuilderConfig{Url: "http://127.0.0.1:8547"}, arbDb, newNode.ExecNode) + Require(t, err) + blockMetadataRebuilder.Update(ctx) + + // Check if all blockMetadata was synced from bulk BlockMetadata API via the blockMetadataRebuilder and that trackers for missing blockMetadata were cleared + iter = arbDb.NewIterator(blockMetadataInputFeedPrefix, nil) + pos = uint64(1) + for iter.Next() { + keyBytes := bytes.TrimPrefix(iter.Key(), blockMetadataInputFeedPrefix) + if binary.BigEndian.Uint64(keyBytes) != pos { + t.Fatalf("unexpected msgSeqNum with blockMetadataInputFeedPrefix for blockMetadata. Want: %d, Got: %d", pos, binary.BigEndian.Uint64(keyBytes)) + } + if !bytes.Equal(sampleBulkData[pos-1], iter.Value()) { + t.Fatalf("blockMetadata mismatch. Want: %v, Got: %v", sampleBulkData[pos-1], iter.Value()) + } + pos++ + } + if pos-1 != latestL2 { + t.Fatalf("number of keys with blockMetadataInputFeedPrefix doesn't match expected value. Want: %d, Got: %d", latestL2, pos-1) + } + iter.Release() + iter = arbDb.NewIterator(missingBlockMetadataInputFeedPrefix, nil) + for iter.Next() { + keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix) + t.Fatalf("unexpected presence of msgSeqNum with missingBlockMetadataInputFeedPrefix, indicating missing of some blockMetadata after rebuilding. msgSeqNum: %d", binary.BigEndian.Uint64(keyBytes)) + } + iter.Release() +} + func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -55,15 +176,6 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { defer cleanup() arbDb := builder.L2.ConsensusNode.ArbDB - blockMetadataInputFeedKey := func(pos uint64) []byte { - var key []byte - prefix := []byte("t") - key = append(key, prefix...) - data := make([]byte, 8) - binary.BigEndian.PutUint64(data, pos) - key = append(key, data...) - return key - } // Generate blocks until current block is end start := 1 From d8137ec50f3647870a444b512f408ea380a1434d Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Fri, 8 Nov 2024 16:06:46 +0530 Subject: [PATCH 05/10] minor bug fixes and improvements --- arbnode/blockmetadata.go | 57 ++++++++++++++++++++-------------------- arbnode/node.go | 3 --- 2 files changed, 28 insertions(+), 32 deletions(-) diff --git a/arbnode/blockmetadata.go b/arbnode/blockmetadata.go index 35691fa064..bbcbb3c045 100644 --- a/arbnode/blockmetadata.go +++ b/arbnode/blockmetadata.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/binary" - "errors" "fmt" "time" @@ -27,14 +26,7 @@ type BlockMetadataRebuilderConfig struct { Url string `koanf:"url"` JWTSecret string `koanf:"jwt-secret"` RebuildInterval time.Duration `koanf:"rebuild-interval"` - APIBlocksLimit int `koanf:"api-blocks-limit"` -} - -func (c *BlockMetadataRebuilderConfig) Validate() error { - if c.APIBlocksLimit < 0 { - return errors.New("api-blocks-limit cannot be negative") - } - return nil + APIBlocksLimit uint64 `koanf:"api-blocks-limit"` } var DefaultBlockMetadataRebuilderConfig = BlockMetadataRebuilderConfig{ @@ -48,7 +40,7 @@ func BlockMetadataRebuilderConfigAddOptions(prefix string, f *pflag.FlagSet) { f.String(prefix+".url", DefaultBlockMetadataRebuilderConfig.Url, "url for bulk blockMetadata api") f.String(prefix+".jwt-secret", DefaultBlockMetadataRebuilderConfig.JWTSecret, "filepath of jwt secret") f.Duration(prefix+".rebuild-interval", DefaultBlockMetadataRebuilderConfig.RebuildInterval, "interval at which blockMetadata is synced regularly") - f.Int(prefix+".api-blocks-limit", DefaultBlockMetadataRebuilderConfig.APIBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query.\n"+ + f.Uint64(prefix+".api-blocks-limit", DefaultBlockMetadataRebuilderConfig.APIBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query.\n"+ "This should be set lesser than or equal to the value set on the api provider side") } @@ -124,6 +116,22 @@ func (b *BlockMetadataRebuilder) PushBlockMetadataToDB(query []uint64, result [] } func (b *BlockMetadataRebuilder) Update(ctx context.Context) time.Duration { + handleQuery := func(query []uint64) bool { + result, err := b.Fetch( + ctx, + b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[0])), + b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[len(query)-1])), + ) + if err != nil { + log.Error("Error getting result from bulk blockMetadata API", "err", err) + return false + } + if err = b.PushBlockMetadataToDB(query, result); err != nil { + log.Error("Error committing result from bulk blockMetadata API to ArbDB", "err", err) + return false + } + return true + } iter := b.db.NewIterator(missingBlockMetadataInputFeedPrefix, nil) defer iter.Release() var query []uint64 @@ -131,30 +139,21 @@ func (b *BlockMetadataRebuilder) Update(ctx context.Context) time.Duration { keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix) query = append(query, binary.BigEndian.Uint64(keyBytes)) end := len(query) - 1 - if query[end]-query[0] >= uint64(b.config.APIBlocksLimit) { - if query[end]-query[0] > uint64(b.config.APIBlocksLimit) { - if len(query) >= 2 { - end -= 1 - } else { - end = 0 - } - } - result, err := b.Fetch( - ctx, - b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[0])), - b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[end])), - ) - if err != nil { - log.Error("Error getting result from bulk blockMetadata API", "err", err) - return b.config.RebuildInterval // backoff + if query[end]-query[0]+1 >= uint64(b.config.APIBlocksLimit) { + if query[end]-query[0]+1 > uint64(b.config.APIBlocksLimit) && len(query) >= 2 { + end -= 1 } - if err = b.PushBlockMetadataToDB(query[:end+1], result); err != nil { - log.Error("Error committing result from bulk blockMetadata API to ArbDB", "err", err) - return b.config.RebuildInterval // backoff + if success := handleQuery(query[:end+1]); !success { + return b.config.RebuildInterval } query = query[end+1:] } } + if len(query) > 0 { + if success := handleQuery(query); !success { + return b.config.RebuildInterval + } + } return b.config.RebuildInterval } diff --git a/arbnode/node.go b/arbnode/node.go index 80603cd8d1..5e27e3141e 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -130,9 +130,6 @@ func (c *Config) Validate() error { if err := c.Staker.Validate(); err != nil { return err } - if err := c.BlockMetadataRebuilder.Validate(); err != nil { - return err - } if c.Sequencer && c.TransactionStreamer.TrackBlockMetadataFrom == 0 { return errors.New("when sequencer is enabled track-missing-block-metadata should be enabled as well") } From dfe0c51b6865de5f18db82eb733d868cf7dcf0f9 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Tue, 12 Nov 2024 13:21:44 +0530 Subject: [PATCH 06/10] address PR comments --- arbnode/blockmetadata.go | 69 ++++++++++++++------------------- arbnode/node.go | 3 ++ arbnode/schema.go | 2 +- arbnode/transaction_streamer.go | 2 +- system_tests/timeboost_test.go | 14 ++++--- 5 files changed, 43 insertions(+), 47 deletions(-) diff --git a/arbnode/blockmetadata.go b/arbnode/blockmetadata.go index bbcbb3c045..d5d8565f06 100644 --- a/arbnode/blockmetadata.go +++ b/arbnode/blockmetadata.go @@ -4,71 +4,54 @@ import ( "bytes" "context" "encoding/binary" - "fmt" "time" "github.com/spf13/pflag" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rpc" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/execution" "github.com/offchainlabs/nitro/execution/gethexec" - "github.com/offchainlabs/nitro/util/signature" + "github.com/offchainlabs/nitro/util/rpcclient" "github.com/offchainlabs/nitro/util/stopwaiter" ) type BlockMetadataRebuilderConfig struct { - Enable bool `koanf:"enable"` - Url string `koanf:"url"` - JWTSecret string `koanf:"jwt-secret"` - RebuildInterval time.Duration `koanf:"rebuild-interval"` - APIBlocksLimit uint64 `koanf:"api-blocks-limit"` + Enable bool `koanf:"enable"` + Source rpcclient.ClientConfig `koanf:"source"` + SyncInterval time.Duration `koanf:"sync-interval"` + APIBlocksLimit uint64 `koanf:"api-blocks-limit"` } var DefaultBlockMetadataRebuilderConfig = BlockMetadataRebuilderConfig{ - Enable: false, - RebuildInterval: time.Minute * 5, - APIBlocksLimit: 100, + Enable: false, + Source: rpcclient.DefaultClientConfig, + SyncInterval: time.Minute * 5, + APIBlocksLimit: 100, } func BlockMetadataRebuilderConfigAddOptions(prefix string, f *pflag.FlagSet) { - f.Bool(prefix+".enable", DefaultBlockMetadataRebuilderConfig.Enable, "enable syncing blockMetadata using a bulk metadata api") - f.String(prefix+".url", DefaultBlockMetadataRebuilderConfig.Url, "url for bulk blockMetadata api") - f.String(prefix+".jwt-secret", DefaultBlockMetadataRebuilderConfig.JWTSecret, "filepath of jwt secret") - f.Duration(prefix+".rebuild-interval", DefaultBlockMetadataRebuilderConfig.RebuildInterval, "interval at which blockMetadata is synced regularly") + f.Bool(prefix+".enable", DefaultBlockMetadataRebuilderConfig.Enable, "enable syncing blockMetadata using a bulk blockMetadata api") + rpcclient.RPCClientAddOptions(prefix+".source", f, &DefaultBlockMetadataRebuilderConfig.Source) + f.Duration(prefix+".rebuild-interval", DefaultBlockMetadataRebuilderConfig.SyncInterval, "interval at which blockMetadata are synced regularly") f.Uint64(prefix+".api-blocks-limit", DefaultBlockMetadataRebuilderConfig.APIBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query.\n"+ - "This should be set lesser than or equal to the value set on the api provider side") + "This should be set lesser than or equal to the limit on the api provider side") } type BlockMetadataRebuilder struct { stopwaiter.StopWaiter config BlockMetadataRebuilderConfig db ethdb.Database - client *rpc.Client + client *rpcclient.RpcClient exec execution.ExecutionClient } func NewBlockMetadataRebuilder(ctx context.Context, c BlockMetadataRebuilderConfig, db ethdb.Database, exec execution.ExecutionClient) (*BlockMetadataRebuilder, error) { - var err error - var jwt *common.Hash - if c.JWTSecret != "" { - jwt, err = signature.LoadSigningKey(c.JWTSecret) - if err != nil { - return nil, fmt.Errorf("BlockMetadataRebuilder: error loading jwt secret: %w", err) - } - } - var client *rpc.Client - if jwt == nil { - client, err = rpc.DialOptions(ctx, c.Url) - } else { - client, err = rpc.DialOptions(ctx, c.Url, rpc.WithHTTPAuth(node.NewJWTAuth([32]byte(*jwt)))) - } - if err != nil { - return nil, fmt.Errorf("BlockMetadataRebuilder: error connecting to bulk blockMetadata API: %w", err) + client := rpcclient.NewRpcClient(func() *rpcclient.ClientConfig { return &c.Source }, nil) + if err := client.Start(ctx); err != nil { + return nil, err } return &BlockMetadataRebuilder{ config: c, @@ -95,7 +78,7 @@ func ArrayToMap[T comparable](arr []T) map[T]struct{} { return ret } -func (b *BlockMetadataRebuilder) PushBlockMetadataToDB(query []uint64, result []gethexec.NumberAndBlockMetadata) error { +func (b *BlockMetadataRebuilder) PersistBlockMetadata(query []uint64, result []gethexec.NumberAndBlockMetadata) error { batch := b.db.NewBatch() queryMap := ArrayToMap(query) for _, elem := range result { @@ -110,6 +93,13 @@ func (b *BlockMetadataRebuilder) PushBlockMetadataToDB(query []uint64, result [] if err := batch.Delete(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos))); err != nil { return err } + // If we exceeded the ideal batch size, commit and reset + if batch.ValueSize() >= ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + return err + } + batch.Reset() + } } } return batch.Write() @@ -126,7 +116,7 @@ func (b *BlockMetadataRebuilder) Update(ctx context.Context) time.Duration { log.Error("Error getting result from bulk blockMetadata API", "err", err) return false } - if err = b.PushBlockMetadataToDB(query, result); err != nil { + if err = b.PersistBlockMetadata(query, result); err != nil { log.Error("Error committing result from bulk blockMetadata API to ArbDB", "err", err) return false } @@ -144,17 +134,17 @@ func (b *BlockMetadataRebuilder) Update(ctx context.Context) time.Duration { end -= 1 } if success := handleQuery(query[:end+1]); !success { - return b.config.RebuildInterval + return b.config.SyncInterval } query = query[end+1:] } } if len(query) > 0 { if success := handleQuery(query); !success { - return b.config.RebuildInterval + return b.config.SyncInterval } } - return b.config.RebuildInterval + return b.config.SyncInterval } func (b *BlockMetadataRebuilder) Start(ctx context.Context) { @@ -164,4 +154,5 @@ func (b *BlockMetadataRebuilder) Start(ctx context.Context) { func (b *BlockMetadataRebuilder) StopAndWait() { b.StopWaiter.StopAndWait() + b.client.Close() } diff --git a/arbnode/node.go b/arbnode/node.go index 5e27e3141e..af9cadf7f4 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -133,6 +133,9 @@ func (c *Config) Validate() error { if c.Sequencer && c.TransactionStreamer.TrackBlockMetadataFrom == 0 { return errors.New("when sequencer is enabled track-missing-block-metadata should be enabled as well") } + if c.TransactionStreamer.TrackBlockMetadataFrom != 0 && !c.BlockMetadataRebuilder.Enable { + log.Warn("track-missing-block-metadata is set but blockMetadata rebuilder is not enabled") + } return nil } diff --git a/arbnode/schema.go b/arbnode/schema.go index 09554d6161..88d3139f21 100644 --- a/arbnode/schema.go +++ b/arbnode/schema.go @@ -7,7 +7,7 @@ var ( messagePrefix []byte = []byte("m") // maps a message sequence number to a message blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed - missingBlockMetadataInputFeedPrefix []byte = []byte("mt") // maps a message sequence number whose blockMetaData byte array is missing to nil + missingBlockMetadataInputFeedPrefix []byte = []byte("xt") // maps a message sequence number whose blockMetaData byte array is missing to nil. Leading "x" implies we are tracking the missing of such a data point messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1 rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 2d2192d4b9..d2e238ec7a 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -99,7 +99,7 @@ func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".max-broadcaster-queue-size", DefaultTransactionStreamerConfig.MaxBroadcasterQueueSize, "maximum cache of pending broadcaster messages") f.Int64(prefix+".max-reorg-resequence-depth", DefaultTransactionStreamerConfig.MaxReorgResequenceDepth, "maximum number of messages to attempt to resequence on reorg (0 = never resequence, -1 = always resequence)") f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages") - f.Uint64(prefix+".track-block-metadata-from", DefaultTransactionStreamerConfig.TrackBlockMetadataFrom, "block number starting from which the missing of blockmetadata is being tracked in the local disk. Disabled by default") + f.Uint64(prefix+".track-block-metadata-from", DefaultTransactionStreamerConfig.TrackBlockMetadataFrom, "block number starting from which the missing of blockmetadata is being tracked in the local disk. Setting to zero (default value) disables this") } func NewTransactionStreamer( diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index 276ff4c08a..7288256d74 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -42,12 +42,13 @@ import ( "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/redisutil" + "github.com/offchainlabs/nitro/util/rpcclient" "github.com/offchainlabs/nitro/util/stopwaiter" "github.com/offchainlabs/nitro/util/testhelpers" "github.com/stretchr/testify/require" ) -var blockMetadataInputFeedKey = func(pos uint64) []byte { +func blockMetadataInputFeedKey(pos uint64) []byte { var key []byte prefix := []byte("t") key = append(key, prefix...) @@ -94,7 +95,8 @@ func TestTimeboostBulkBlockMetadataRebuilder(t *testing.T) { } ndcfg := arbnode.ConfigDefaultL1NonSequencerTest() - ndcfg.TransactionStreamer.TrackBlockMetadataFrom = 1 + trackBlockMetadataFrom := uint64(5) + ndcfg.TransactionStreamer.TrackBlockMetadataFrom = trackBlockMetadataFrom newNode, cleanupNewNode := builder.Build2ndNode(t, &SecondNodeParams{ nodeConfig: ndcfg, stackConfig: testhelpers.CreateStackConfigForTest(t.TempDir()), @@ -112,7 +114,7 @@ func TestTimeboostBulkBlockMetadataRebuilder(t *testing.T) { } blockMetadataInputFeedPrefix := []byte("t") - missingBlockMetadataInputFeedPrefix := []byte("mt") + missingBlockMetadataInputFeedPrefix := []byte("xt") arbDb = newNode.ConsensusNode.ArbDB // Check if all block numbers with missingBlockMetadataInputFeedPrefix are present as keys in arbDB and that no keys with blockMetadataInputFeedPrefix @@ -123,7 +125,7 @@ func TestTimeboostBulkBlockMetadataRebuilder(t *testing.T) { } iter.Release() iter = arbDb.NewIterator(missingBlockMetadataInputFeedPrefix, nil) - pos := uint64(1) + pos := trackBlockMetadataFrom for iter.Next() { keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix) if pos != binary.BigEndian.Uint64(keyBytes) { @@ -137,13 +139,13 @@ func TestTimeboostBulkBlockMetadataRebuilder(t *testing.T) { iter.Release() // Rebuild blockMetadata and cleanup trackers from ArbDB - blockMetadataRebuilder, err := arbnode.NewBlockMetadataRebuilder(ctx, arbnode.BlockMetadataRebuilderConfig{Url: "http://127.0.0.1:8547"}, arbDb, newNode.ExecNode) + blockMetadataRebuilder, err := arbnode.NewBlockMetadataRebuilder(ctx, arbnode.BlockMetadataRebuilderConfig{Source: rpcclient.ClientConfig{URL: builder.L2.Stack.HTTPEndpoint()}}, arbDb, newNode.ExecNode) Require(t, err) blockMetadataRebuilder.Update(ctx) // Check if all blockMetadata was synced from bulk BlockMetadata API via the blockMetadataRebuilder and that trackers for missing blockMetadata were cleared iter = arbDb.NewIterator(blockMetadataInputFeedPrefix, nil) - pos = uint64(1) + pos = trackBlockMetadataFrom for iter.Next() { keyBytes := bytes.TrimPrefix(iter.Key(), blockMetadataInputFeedPrefix) if binary.BigEndian.Uint64(keyBytes) != pos { From bab01573d3985d716a139d75dbd3776b7308bc3d Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Wed, 13 Nov 2024 11:06:49 +0530 Subject: [PATCH 07/10] address PR comments --- arbnode/blockmetadata.go | 53 ++++++++----------- arbnode/node.go | 94 +++++++++++++++++----------------- arbnode/schema.go | 20 ++++---- system_tests/timeboost_test.go | 71 ++++++++++++++----------- util/common.go | 9 ++++ 5 files changed, 130 insertions(+), 117 deletions(-) create mode 100644 util/common.go diff --git a/arbnode/blockmetadata.go b/arbnode/blockmetadata.go index d5d8565f06..82928d9fa7 100644 --- a/arbnode/blockmetadata.go +++ b/arbnode/blockmetadata.go @@ -14,46 +14,47 @@ import ( "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/execution" "github.com/offchainlabs/nitro/execution/gethexec" + "github.com/offchainlabs/nitro/util" "github.com/offchainlabs/nitro/util/rpcclient" "github.com/offchainlabs/nitro/util/stopwaiter" ) -type BlockMetadataRebuilderConfig struct { +type BlockMetadataFetcherConfig struct { Enable bool `koanf:"enable"` Source rpcclient.ClientConfig `koanf:"source"` SyncInterval time.Duration `koanf:"sync-interval"` APIBlocksLimit uint64 `koanf:"api-blocks-limit"` } -var DefaultBlockMetadataRebuilderConfig = BlockMetadataRebuilderConfig{ +var DefaultBlockMetadataFetcherConfig = BlockMetadataFetcherConfig{ Enable: false, Source: rpcclient.DefaultClientConfig, SyncInterval: time.Minute * 5, APIBlocksLimit: 100, } -func BlockMetadataRebuilderConfigAddOptions(prefix string, f *pflag.FlagSet) { - f.Bool(prefix+".enable", DefaultBlockMetadataRebuilderConfig.Enable, "enable syncing blockMetadata using a bulk blockMetadata api") - rpcclient.RPCClientAddOptions(prefix+".source", f, &DefaultBlockMetadataRebuilderConfig.Source) - f.Duration(prefix+".rebuild-interval", DefaultBlockMetadataRebuilderConfig.SyncInterval, "interval at which blockMetadata are synced regularly") - f.Uint64(prefix+".api-blocks-limit", DefaultBlockMetadataRebuilderConfig.APIBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query.\n"+ +func BlockMetadataFetcherConfigAddOptions(prefix string, f *pflag.FlagSet) { + f.Bool(prefix+".enable", DefaultBlockMetadataFetcherConfig.Enable, "enable syncing blockMetadata using a bulk blockMetadata api") + rpcclient.RPCClientAddOptions(prefix+".source", f, &DefaultBlockMetadataFetcherConfig.Source) + f.Duration(prefix+".sync-interval", DefaultBlockMetadataFetcherConfig.SyncInterval, "interval at which blockMetadata are synced regularly") + f.Uint64(prefix+".api-blocks-limit", DefaultBlockMetadataFetcherConfig.APIBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query.\n"+ "This should be set lesser than or equal to the limit on the api provider side") } -type BlockMetadataRebuilder struct { +type BlockMetadataFetcher struct { stopwaiter.StopWaiter - config BlockMetadataRebuilderConfig + config BlockMetadataFetcherConfig db ethdb.Database client *rpcclient.RpcClient exec execution.ExecutionClient } -func NewBlockMetadataRebuilder(ctx context.Context, c BlockMetadataRebuilderConfig, db ethdb.Database, exec execution.ExecutionClient) (*BlockMetadataRebuilder, error) { +func NewBlockMetadataFetcher(ctx context.Context, c BlockMetadataFetcherConfig, db ethdb.Database, exec execution.ExecutionClient) (*BlockMetadataFetcher, error) { client := rpcclient.NewRpcClient(func() *rpcclient.ClientConfig { return &c.Source }, nil) if err := client.Start(ctx); err != nil { return nil, err } - return &BlockMetadataRebuilder{ + return &BlockMetadataFetcher{ config: c, db: db, client: client, @@ -61,7 +62,7 @@ func NewBlockMetadataRebuilder(ctx context.Context, c BlockMetadataRebuilderConf }, nil } -func (b *BlockMetadataRebuilder) Fetch(ctx context.Context, fromBlock, toBlock uint64) ([]gethexec.NumberAndBlockMetadata, error) { +func (b *BlockMetadataFetcher) fetch(ctx context.Context, fromBlock, toBlock uint64) ([]gethexec.NumberAndBlockMetadata, error) { var result []gethexec.NumberAndBlockMetadata err := b.client.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(fromBlock), rpc.BlockNumber(toBlock)) if err != nil { @@ -70,17 +71,9 @@ func (b *BlockMetadataRebuilder) Fetch(ctx context.Context, fromBlock, toBlock u return result, nil } -func ArrayToMap[T comparable](arr []T) map[T]struct{} { - ret := make(map[T]struct{}) - for _, elem := range arr { - ret[elem] = struct{}{} - } - return ret -} - -func (b *BlockMetadataRebuilder) PersistBlockMetadata(query []uint64, result []gethexec.NumberAndBlockMetadata) error { +func (b *BlockMetadataFetcher) persistBlockMetadata(query []uint64, result []gethexec.NumberAndBlockMetadata) error { batch := b.db.NewBatch() - queryMap := ArrayToMap(query) + queryMap := util.ArrayToMap(query) for _, elem := range result { pos, err := b.exec.BlockNumberToMessageIndex(elem.BlockNumber) if err != nil { @@ -93,7 +86,7 @@ func (b *BlockMetadataRebuilder) PersistBlockMetadata(query []uint64, result []g if err := batch.Delete(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos))); err != nil { return err } - // If we exceeded the ideal batch size, commit and reset + // If we reached the ideal batch size, commit and reset if batch.ValueSize() >= ethdb.IdealBatchSize { if err := batch.Write(); err != nil { return err @@ -105,9 +98,9 @@ func (b *BlockMetadataRebuilder) PersistBlockMetadata(query []uint64, result []g return batch.Write() } -func (b *BlockMetadataRebuilder) Update(ctx context.Context) time.Duration { +func (b *BlockMetadataFetcher) Update(ctx context.Context) time.Duration { handleQuery := func(query []uint64) bool { - result, err := b.Fetch( + result, err := b.fetch( ctx, b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[0])), b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[len(query)-1])), @@ -116,7 +109,7 @@ func (b *BlockMetadataRebuilder) Update(ctx context.Context) time.Duration { log.Error("Error getting result from bulk blockMetadata API", "err", err) return false } - if err = b.PersistBlockMetadata(query, result); err != nil { + if err = b.persistBlockMetadata(query, result); err != nil { log.Error("Error committing result from bulk blockMetadata API to ArbDB", "err", err) return false } @@ -140,19 +133,17 @@ func (b *BlockMetadataRebuilder) Update(ctx context.Context) time.Duration { } } if len(query) > 0 { - if success := handleQuery(query); !success { - return b.config.SyncInterval - } + _ = handleQuery(query) } return b.config.SyncInterval } -func (b *BlockMetadataRebuilder) Start(ctx context.Context) { +func (b *BlockMetadataFetcher) Start(ctx context.Context) { b.StopWaiter.Start(ctx, b) b.CallIteratively(b.Update) } -func (b *BlockMetadataRebuilder) StopAndWait() { +func (b *BlockMetadataFetcher) StopAndWait() { b.StopWaiter.StopAndWait() b.client.Close() } diff --git a/arbnode/node.go b/arbnode/node.go index af9cadf7f4..729775a772 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -77,23 +77,23 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com } type Config struct { - Sequencer bool `koanf:"sequencer"` - ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` - InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"` - DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"` - BatchPoster BatchPosterConfig `koanf:"batch-poster" reload:"hot"` - MessagePruner MessagePrunerConfig `koanf:"message-pruner" reload:"hot"` - BlockValidator staker.BlockValidatorConfig `koanf:"block-validator" reload:"hot"` - Feed broadcastclient.FeedConfig `koanf:"feed" reload:"hot"` - Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"` - SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"` - DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"` - SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` - Dangerous DangerousConfig `koanf:"dangerous"` - TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"` - Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"` - ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"` - BlockMetadataRebuilder BlockMetadataRebuilderConfig `koanf:"block-metadata-rebuilder" reload:"hot"` + Sequencer bool `koanf:"sequencer"` + ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` + InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"` + DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"` + BatchPoster BatchPosterConfig `koanf:"batch-poster" reload:"hot"` + MessagePruner MessagePrunerConfig `koanf:"message-pruner" reload:"hot"` + BlockValidator staker.BlockValidatorConfig `koanf:"block-validator" reload:"hot"` + Feed broadcastclient.FeedConfig `koanf:"feed" reload:"hot"` + Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"` + SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"` + DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"` + SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` + Dangerous DangerousConfig `koanf:"dangerous"` + TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"` + Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"` + ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"` + BlockMetadataFetcher BlockMetadataFetcherConfig `koanf:"block-metadata-fetcher" reload:"hot"` // SnapSyncConfig is only used for testing purposes, these should not be configured in production. SnapSyncTest SnapSyncConfig } @@ -131,10 +131,10 @@ func (c *Config) Validate() error { return err } if c.Sequencer && c.TransactionStreamer.TrackBlockMetadataFrom == 0 { - return errors.New("when sequencer is enabled track-missing-block-metadata should be enabled as well") + return errors.New("when sequencer is enabled track-missing-block-metadata-from should be set as well") } - if c.TransactionStreamer.TrackBlockMetadataFrom != 0 && !c.BlockMetadataRebuilder.Enable { - log.Warn("track-missing-block-metadata is set but blockMetadata rebuilder is not enabled") + if c.TransactionStreamer.TrackBlockMetadataFrom != 0 && !c.BlockMetadataFetcher.Enable { + log.Warn("track-missing-block-metadata-from is set but blockMetadata fetcher is not enabled") } return nil } @@ -165,28 +165,28 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed DangerousConfigAddOptions(prefix+".dangerous", f) TransactionStreamerConfigAddOptions(prefix+".transaction-streamer", f) MaintenanceConfigAddOptions(prefix+".maintenance", f) - BlockMetadataRebuilderConfigAddOptions(prefix+"block-metadata-rebuilder", f) + BlockMetadataFetcherConfigAddOptions(prefix+"block-metadata-fetcher", f) } var ConfigDefault = Config{ - Sequencer: false, - ParentChainReader: headerreader.DefaultConfig, - InboxReader: DefaultInboxReaderConfig, - DelayedSequencer: DefaultDelayedSequencerConfig, - BatchPoster: DefaultBatchPosterConfig, - MessagePruner: DefaultMessagePrunerConfig, - BlockValidator: staker.DefaultBlockValidatorConfig, - Feed: broadcastclient.FeedConfigDefault, - Staker: staker.DefaultL1ValidatorConfig, - SeqCoordinator: DefaultSeqCoordinatorConfig, - DataAvailability: das.DefaultDataAvailabilityConfig, - SyncMonitor: DefaultSyncMonitorConfig, - Dangerous: DefaultDangerousConfig, - TransactionStreamer: DefaultTransactionStreamerConfig, - ResourceMgmt: resourcemanager.DefaultConfig, - Maintenance: DefaultMaintenanceConfig, - BlockMetadataRebuilder: DefaultBlockMetadataRebuilderConfig, - SnapSyncTest: DefaultSnapSyncConfig, + Sequencer: false, + ParentChainReader: headerreader.DefaultConfig, + InboxReader: DefaultInboxReaderConfig, + DelayedSequencer: DefaultDelayedSequencerConfig, + BatchPoster: DefaultBatchPosterConfig, + MessagePruner: DefaultMessagePrunerConfig, + BlockValidator: staker.DefaultBlockValidatorConfig, + Feed: broadcastclient.FeedConfigDefault, + Staker: staker.DefaultL1ValidatorConfig, + SeqCoordinator: DefaultSeqCoordinatorConfig, + DataAvailability: das.DefaultDataAvailabilityConfig, + SyncMonitor: DefaultSyncMonitorConfig, + Dangerous: DefaultDangerousConfig, + TransactionStreamer: DefaultTransactionStreamerConfig, + ResourceMgmt: resourcemanager.DefaultConfig, + Maintenance: DefaultMaintenanceConfig, + BlockMetadataFetcher: DefaultBlockMetadataFetcherConfig, + SnapSyncTest: DefaultSnapSyncConfig, } func ConfigDefaultL1Test() *Config { @@ -281,7 +281,7 @@ type Node struct { MaintenanceRunner *MaintenanceRunner DASLifecycleManager *das.LifecycleManager SyncMonitor *SyncMonitor - blockMetadataRebuilder *BlockMetadataRebuilder + blockMetadataFetcher *BlockMetadataFetcher configFetcher ConfigFetcher ctx context.Context } @@ -490,9 +490,9 @@ func createNodeImpl( } } - var blockMetadataRebuilder *BlockMetadataRebuilder - if config.BlockMetadataRebuilder.Enable { - blockMetadataRebuilder, err = NewBlockMetadataRebuilder(ctx, config.BlockMetadataRebuilder, arbDb, exec) + var blockMetadataFetcher *BlockMetadataFetcher + if config.BlockMetadataFetcher.Enable { + blockMetadataFetcher, err = NewBlockMetadataFetcher(ctx, config.BlockMetadataFetcher, arbDb, exec) if err != nil { return nil, err } @@ -521,7 +521,7 @@ func createNodeImpl( MaintenanceRunner: maintenanceRunner, DASLifecycleManager: nil, SyncMonitor: syncMonitor, - blockMetadataRebuilder: blockMetadataRebuilder, + blockMetadataFetcher: blockMetadataFetcher, configFetcher: configFetcher, ctx: ctx, }, nil @@ -758,7 +758,7 @@ func createNodeImpl( MaintenanceRunner: maintenanceRunner, DASLifecycleManager: dasLifecycleManager, SyncMonitor: syncMonitor, - blockMetadataRebuilder: blockMetadataRebuilder, + blockMetadataFetcher: blockMetadataFetcher, configFetcher: configFetcher, ctx: ctx, }, nil @@ -942,8 +942,8 @@ func (n *Node) Start(ctx context.Context) error { n.BroadcastClients.Start(ctx) }() } - if n.blockMetadataRebuilder != nil { - n.blockMetadataRebuilder.Start(ctx) + if n.blockMetadataFetcher != nil { + n.blockMetadataFetcher.Start(ctx) } if n.configFetcher != nil { n.configFetcher.Start(ctx) diff --git a/arbnode/schema.go b/arbnode/schema.go index 88d3139f21..acf54c9203 100644 --- a/arbnode/schema.go +++ b/arbnode/schema.go @@ -4,16 +4,16 @@ package arbnode var ( - messagePrefix []byte = []byte("m") // maps a message sequence number to a message - blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed - blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed - missingBlockMetadataInputFeedPrefix []byte = []byte("xt") // maps a message sequence number whose blockMetaData byte array is missing to nil. Leading "x" implies we are tracking the missing of such a data point - messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result - legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1 - rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message - parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number - sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata - delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count + messagePrefix []byte = []byte("m") // maps a message sequence number to a message + blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed + blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed + missingBlockMetadataInputFeedPrefix []byte = []byte("x") // maps a message sequence number whose blockMetaData byte array is missing to nil + messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result + legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1 + rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message + parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number + sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata + delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count messageCountKey []byte = []byte("_messageCount") // contains the current message count delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index fcf12f1e34..e63a6187da 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -48,9 +48,8 @@ import ( "github.com/stretchr/testify/require" ) -func blockMetadataInputFeedKey(pos uint64) []byte { +func dbKey(prefix []byte, pos uint64) []byte { var key []byte - prefix := []byte("t") key = append(key, prefix...) data := make([]byte, 8) binary.BigEndian.PutUint64(data, pos) @@ -58,7 +57,9 @@ func blockMetadataInputFeedKey(pos uint64) []byte { return key } -func TestTimeboostBulkBlockMetadataRebuilder(t *testing.T) { +func TestTimeboostBulkBlockMetadataFetcher(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -77,12 +78,13 @@ func TestTimeboostBulkBlockMetadataRebuilder(t *testing.T) { user := builder.L2Info.GetDefaultTransactOpts("User", ctx) var latestL2 uint64 var err error + var lastTx *types.Transaction for i := 0; ; i++ { - builder.L2.TransferBalanceTo(t, "Owner", util.RemapL1Address(user.From), big.NewInt(1e18), builder.L2Info) + lastTx, _ = builder.L2.TransferBalanceTo(t, "Owner", util.RemapL1Address(user.From), big.NewInt(1e18), builder.L2Info) latestL2, err = builder.L2.Client.BlockNumber(ctx) Require(t, err) // Clean BlockMetadata from arbDB so that we can modify it at will - Require(t, arbDb.Delete(blockMetadataInputFeedKey(latestL2))) + Require(t, arbDb.Delete(dbKey([]byte("t"), latestL2))) if latestL2 > uint64(20) { break } @@ -91,44 +93,55 @@ func TestTimeboostBulkBlockMetadataRebuilder(t *testing.T) { for i := 1; i <= int(latestL2); i++ { blockMetadata := []byte{0, uint8(i)} sampleBulkData = append(sampleBulkData, blockMetadata) - arbDb.Put(blockMetadataInputFeedKey(uint64(i)), blockMetadata) + Require(t, arbDb.Put(dbKey([]byte("t"), uint64(i)), blockMetadata)) } - ndcfg := arbnode.ConfigDefaultL1NonSequencerTest() + nodecfg := arbnode.ConfigDefaultL1NonSequencerTest() trackBlockMetadataFrom := uint64(5) - ndcfg.TransactionStreamer.TrackBlockMetadataFrom = trackBlockMetadataFrom + nodecfg.TransactionStreamer.TrackBlockMetadataFrom = trackBlockMetadataFrom newNode, cleanupNewNode := builder.Build2ndNode(t, &SecondNodeParams{ - nodeConfig: ndcfg, + nodeConfig: nodecfg, stackConfig: testhelpers.CreateStackConfigForTest(t.TempDir()), }) defer cleanupNewNode() // Wait for second node to catchup via L1, since L1 doesn't have the blockMetadata, we ensure that messages are tracked with missingBlockMetadataInputFeedPrefix prefix - for { - current, err := newNode.Client.BlockNumber(ctx) - Require(t, err) - if current == latestL2 { - break - } - time.Sleep(time.Second) - } + _, err = WaitForTx(ctx, newNode.Client, lastTx.Hash(), time.Second*5) + Require(t, err) blockMetadataInputFeedPrefix := []byte("t") - missingBlockMetadataInputFeedPrefix := []byte("xt") + missingBlockMetadataInputFeedPrefix := []byte("x") arbDb = newNode.ConsensusNode.ArbDB + // Introduce fragmentation + blocksWithBlockMetadata := []uint64{8, 9, 10, 14, 16} + for _, key := range blocksWithBlockMetadata { + Require(t, arbDb.Put(dbKey([]byte("t"), key), sampleBulkData[key-1])) + Require(t, arbDb.Delete(dbKey([]byte("x"), key))) + } + // Check if all block numbers with missingBlockMetadataInputFeedPrefix are present as keys in arbDB and that no keys with blockMetadataInputFeedPrefix iter := arbDb.NewIterator(blockMetadataInputFeedPrefix, nil) + pos := uint64(0) for iter.Next() { keyBytes := bytes.TrimPrefix(iter.Key(), blockMetadataInputFeedPrefix) - t.Fatalf("unexpected presence of blockMetadata when blocks are synced via L1. msgSeqNum: %d", binary.BigEndian.Uint64(keyBytes)) + if binary.BigEndian.Uint64(keyBytes) != blocksWithBlockMetadata[pos] { + t.Fatalf("unexpected presence of blockMetadata, when blocks are synced via L1. msgSeqNum: %d, expectedMsgSeqNum: %d", binary.BigEndian.Uint64(keyBytes), blocksWithBlockMetadata[pos]) + } + pos++ } iter.Release() iter = arbDb.NewIterator(missingBlockMetadataInputFeedPrefix, nil) - pos := trackBlockMetadataFrom + pos = trackBlockMetadataFrom + i := 0 for iter.Next() { + // Blocks with blockMetadata present shouldn't have the missingBlockMetadataInputFeedPrefix keys present in arbDB + for i < len(blocksWithBlockMetadata) && blocksWithBlockMetadata[i] == pos { + i++ + pos++ + } keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix) - if pos != binary.BigEndian.Uint64(keyBytes) { + if binary.BigEndian.Uint64(keyBytes) != pos { t.Fatalf("unexpected msgSeqNum with missingBlockMetadataInputFeedPrefix for blockMetadata. Want: %d, Got: %d", pos, binary.BigEndian.Uint64(keyBytes)) } pos++ @@ -139,11 +152,11 @@ func TestTimeboostBulkBlockMetadataRebuilder(t *testing.T) { iter.Release() // Rebuild blockMetadata and cleanup trackers from ArbDB - blockMetadataRebuilder, err := arbnode.NewBlockMetadataRebuilder(ctx, arbnode.BlockMetadataRebuilderConfig{Source: rpcclient.ClientConfig{URL: builder.L2.Stack.HTTPEndpoint()}}, arbDb, newNode.ExecNode) + blockMetadataFetcher, err := arbnode.NewBlockMetadataFetcher(ctx, arbnode.BlockMetadataFetcherConfig{Source: rpcclient.ClientConfig{URL: builder.L2.Stack.HTTPEndpoint()}}, arbDb, newNode.ExecNode) Require(t, err) - blockMetadataRebuilder.Update(ctx) + blockMetadataFetcher.Update(ctx) - // Check if all blockMetadata was synced from bulk BlockMetadata API via the blockMetadataRebuilder and that trackers for missing blockMetadata were cleared + // Check if all blockMetadata was synced from bulk BlockMetadata API via the blockMetadataFetcher and that trackers for missing blockMetadata were cleared iter = arbDb.NewIterator(blockMetadataInputFeedPrefix, nil) pos = trackBlockMetadataFrom for iter.Next() { @@ -152,7 +165,7 @@ func TestTimeboostBulkBlockMetadataRebuilder(t *testing.T) { t.Fatalf("unexpected msgSeqNum with blockMetadataInputFeedPrefix for blockMetadata. Want: %d, Got: %d", pos, binary.BigEndian.Uint64(keyBytes)) } if !bytes.Equal(sampleBulkData[pos-1], iter.Value()) { - t.Fatalf("blockMetadata mismatch. Want: %v, Got: %v", sampleBulkData[pos-1], iter.Value()) + t.Fatalf("blockMetadata mismatch for blockNumber: %d. Want: %v, Got: %v", pos, sampleBulkData[pos-1], iter.Value()) } pos++ } @@ -189,7 +202,7 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { latestL2, err := builder.L2.Client.BlockNumber(ctx) Require(t, err) // Clean BlockMetadata from arbDB so that we can modify it at will - Require(t, arbDb.Delete(blockMetadataInputFeedKey(latestL2))) + Require(t, arbDb.Delete(dbKey([]byte("t"), latestL2))) if latestL2 > uint64(end)+10 { break } @@ -201,7 +214,7 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { RawMetadata: []byte{0, uint8(i)}, } sampleBulkData = append(sampleBulkData, sampleData) - arbDb.Put(blockMetadataInputFeedKey(sampleData.BlockNumber), sampleData.RawMetadata) + Require(t, arbDb.Put(dbKey([]byte("t"), sampleData.BlockNumber), sampleData.RawMetadata)) } l2rpc := builder.L2.Stack.Attach() @@ -223,7 +236,7 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { // Test that without cache the result returned is always in sync with ArbDB sampleBulkData[0].RawMetadata = []byte{1, 11} - arbDb.Put(blockMetadataInputFeedKey(1), sampleBulkData[0].RawMetadata) + Require(t, arbDb.Put(dbKey([]byte("t"), 1), sampleBulkData[0].RawMetadata)) err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(1), rpc.BlockNumber(1)) Require(t, err) @@ -244,7 +257,7 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { arbDb = builder.L2.ConsensusNode.ArbDB updatedBlockMetadata := []byte{2, 12} - arbDb.Put(blockMetadataInputFeedKey(1), updatedBlockMetadata) + Require(t, arbDb.Put(dbKey([]byte("t"), 1), updatedBlockMetadata)) err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(1), rpc.BlockNumber(1)) Require(t, err) diff --git a/util/common.go b/util/common.go new file mode 100644 index 0000000000..ad6f6a875b --- /dev/null +++ b/util/common.go @@ -0,0 +1,9 @@ +package util + +func ArrayToMap[T comparable](arr []T) map[T]struct{} { + ret := make(map[T]struct{}) + for _, elem := range arr { + ret[elem] = struct{}{} + } + return ret +} From a5f248db7ae67a7d16637aa379380966d3d02792 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Wed, 13 Nov 2024 11:11:19 +0530 Subject: [PATCH 08/10] make flag description clearer --- arbnode/transaction_streamer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index d2e238ec7a..4a06223d7a 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -99,7 +99,7 @@ func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".max-broadcaster-queue-size", DefaultTransactionStreamerConfig.MaxBroadcasterQueueSize, "maximum cache of pending broadcaster messages") f.Int64(prefix+".max-reorg-resequence-depth", DefaultTransactionStreamerConfig.MaxReorgResequenceDepth, "maximum number of messages to attempt to resequence on reorg (0 = never resequence, -1 = always resequence)") f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages") - f.Uint64(prefix+".track-block-metadata-from", DefaultTransactionStreamerConfig.TrackBlockMetadataFrom, "block number starting from which the missing of blockmetadata is being tracked in the local disk. Setting to zero (default value) disables this") + f.Uint64(prefix+".track-block-metadata-from", DefaultTransactionStreamerConfig.TrackBlockMetadataFrom, "this is the block number starting from which missing of blockmetadata is being tracked in the local disk. Setting to zero (default value) disables this") } func NewTransactionStreamer( From 71f9c5abaa09a489899e6252861c4f5c8f3fd889 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Mon, 18 Nov 2024 12:50:50 +0530 Subject: [PATCH 09/10] address PR comments --- arbnode/blockmetadata.go | 4 ++-- util/common.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/arbnode/blockmetadata.go b/arbnode/blockmetadata.go index 82928d9fa7..bd2bd1ad4b 100644 --- a/arbnode/blockmetadata.go +++ b/arbnode/blockmetadata.go @@ -34,7 +34,7 @@ var DefaultBlockMetadataFetcherConfig = BlockMetadataFetcherConfig{ } func BlockMetadataFetcherConfigAddOptions(prefix string, f *pflag.FlagSet) { - f.Bool(prefix+".enable", DefaultBlockMetadataFetcherConfig.Enable, "enable syncing blockMetadata using a bulk blockMetadata api") + f.Bool(prefix+".enable", DefaultBlockMetadataFetcherConfig.Enable, "enable syncing blockMetadata using a bulk blockMetadata api. If the source doesn't have the missing blockMetadata, we keep retyring in every sync-interval (default=5mins) duration") rpcclient.RPCClientAddOptions(prefix+".source", f, &DefaultBlockMetadataFetcherConfig.Source) f.Duration(prefix+".sync-interval", DefaultBlockMetadataFetcherConfig.SyncInterval, "interval at which blockMetadata are synced regularly") f.Uint64(prefix+".api-blocks-limit", DefaultBlockMetadataFetcherConfig.APIBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query.\n"+ @@ -73,7 +73,7 @@ func (b *BlockMetadataFetcher) fetch(ctx context.Context, fromBlock, toBlock uin func (b *BlockMetadataFetcher) persistBlockMetadata(query []uint64, result []gethexec.NumberAndBlockMetadata) error { batch := b.db.NewBatch() - queryMap := util.ArrayToMap(query) + queryMap := util.ArrayToSet(query) for _, elem := range result { pos, err := b.exec.BlockNumberToMessageIndex(elem.BlockNumber) if err != nil { diff --git a/util/common.go b/util/common.go index ad6f6a875b..fc71e4a704 100644 --- a/util/common.go +++ b/util/common.go @@ -1,6 +1,6 @@ package util -func ArrayToMap[T comparable](arr []T) map[T]struct{} { +func ArrayToSet[T comparable](arr []T) map[T]struct{} { ret := make(map[T]struct{}) for _, elem := range arr { ret[elem] = struct{}{} From 0178eb7c7291ac87704395c8a1e3f836a7b565d3 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 21 Nov 2024 11:00:10 +0530 Subject: [PATCH 10/10] clear blockMetadata if there's a mismatch between feed's blockhash and locally computed hash --- arbnode/transaction_streamer.go | 28 ++++++++++++++++++++++------ system_tests/timeboost_test.go | 1 + 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 4a06223d7a..53f1910e92 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -1164,17 +1164,33 @@ func (s *TransactionStreamer) ResultAtCount(count arbutil.MessageIndex) (*execut return msgResult, nil } -func (s *TransactionStreamer) checkResult(msgResult *execution.MessageResult, expectedBlockHash *common.Hash) { - if expectedBlockHash == nil { +func (s *TransactionStreamer) checkResult(pos arbutil.MessageIndex, msgResult *execution.MessageResult, msgAndBlockInfo *arbostypes.MessageWithMetadataAndBlockInfo) { + if msgAndBlockInfo.BlockHash == nil { return } - if msgResult.BlockHash != *expectedBlockHash { + if msgResult.BlockHash != *msgAndBlockInfo.BlockHash { log.Error( BlockHashMismatchLogMsg, - "expected", expectedBlockHash, + "expected", msgAndBlockInfo.BlockHash, "actual", msgResult.BlockHash, ) - return + // Try deleting the existing blockMetadata for this block in arbDB and set it as missing + if msgAndBlockInfo.BlockMetadata != nil { + batch := s.db.NewBatch() + if err := batch.Delete(dbKey(blockMetadataInputFeedPrefix, uint64(pos))); err != nil { + log.Error("error deleting blockMetadata of block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err) + return + } + if s.trackBlockMetadataFrom != 0 && pos >= s.trackBlockMetadataFrom { + if err := batch.Put(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos)), nil); err != nil { + log.Error("error marking deleted blockMetadata as missing in arbDB for a block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err) + return + } + } + if err := batch.Write(); err != nil { + log.Error("error writing batch that deletes blockMetadata of the block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err) + } + } } } @@ -1242,7 +1258,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution } // we just log the error but not update the value in db itself with msgResult.BlockHash? and instead forward the new block hash - s.checkResult(msgResult, msgAndBlockInfo.BlockHash) + s.checkResult(pos, msgResult, msgAndBlockInfo) batch := s.db.NewBatch() err = s.storeResult(pos, *msgResult, batch) diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index e63a6187da..a3bec57b6a 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -563,6 +563,7 @@ func setupExpressLaneAuction( ExpressLaneAdvantage: time.Second * 5, SequencerHTTPEndpoint: fmt.Sprintf("http://localhost:%d", seqPort), } + builderSeq.nodeConfig.TransactionStreamer.TrackBlockMetadataFrom = 1 cleanupSeq := builderSeq.Build(t) seqInfo, seqNode, seqClient := builderSeq.L2Info, builderSeq.L2.ConsensusNode, builderSeq.L2.Client