Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk syncing of missing BlockMetadata #2765

Open
wants to merge 11 commits into
base: bulk-blockmetadata-api
Choose a base branch
from
149 changes: 149 additions & 0 deletions arbnode/blockmetadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package arbnode

import (
"bytes"
"context"
"encoding/binary"
"time"

"github.com/spf13/pflag"

"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/util"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

type BlockMetadataFetcherConfig struct {
Enable bool `koanf:"enable"`
Source rpcclient.ClientConfig `koanf:"source"`
SyncInterval time.Duration `koanf:"sync-interval"`
APIBlocksLimit uint64 `koanf:"api-blocks-limit"`
}

var DefaultBlockMetadataFetcherConfig = BlockMetadataFetcherConfig{
Enable: false,
Source: rpcclient.DefaultClientConfig,
SyncInterval: time.Minute * 5,
APIBlocksLimit: 100,
}

func BlockMetadataFetcherConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBlockMetadataFetcherConfig.Enable, "enable syncing blockMetadata using a bulk blockMetadata api. If the source doesn't have the missing blockMetadata, we keep retyring in every sync-interval (default=5mins) duration")
rpcclient.RPCClientAddOptions(prefix+".source", f, &DefaultBlockMetadataFetcherConfig.Source)
f.Duration(prefix+".sync-interval", DefaultBlockMetadataFetcherConfig.SyncInterval, "interval at which blockMetadata are synced regularly")
f.Uint64(prefix+".api-blocks-limit", DefaultBlockMetadataFetcherConfig.APIBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query.\n"+
"This should be set lesser than or equal to the limit on the api provider side")
}

type BlockMetadataFetcher struct {
stopwaiter.StopWaiter
config BlockMetadataFetcherConfig
db ethdb.Database
client *rpcclient.RpcClient
exec execution.ExecutionClient
}

func NewBlockMetadataFetcher(ctx context.Context, c BlockMetadataFetcherConfig, db ethdb.Database, exec execution.ExecutionClient) (*BlockMetadataFetcher, error) {
client := rpcclient.NewRpcClient(func() *rpcclient.ClientConfig { return &c.Source }, nil)
if err := client.Start(ctx); err != nil {
return nil, err
}
return &BlockMetadataFetcher{
config: c,
db: db,
client: client,
exec: exec,
}, nil
}

func (b *BlockMetadataFetcher) fetch(ctx context.Context, fromBlock, toBlock uint64) ([]gethexec.NumberAndBlockMetadata, error) {
var result []gethexec.NumberAndBlockMetadata
err := b.client.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(fromBlock), rpc.BlockNumber(toBlock))
if err != nil {
return nil, err
}
return result, nil
}

func (b *BlockMetadataFetcher) persistBlockMetadata(query []uint64, result []gethexec.NumberAndBlockMetadata) error {
batch := b.db.NewBatch()
queryMap := util.ArrayToSet(query)
for _, elem := range result {
pos, err := b.exec.BlockNumberToMessageIndex(elem.BlockNumber)
if err != nil {
return err
}
if _, ok := queryMap[uint64(pos)]; ok {
if err := batch.Put(dbKey(blockMetadataInputFeedPrefix, uint64(pos)), elem.RawMetadata); err != nil {
return err
}
if err := batch.Delete(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos))); err != nil {
return err
}
// If we reached the ideal batch size, commit and reset
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return err
}
batch.Reset()
}
}
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}
return batch.Write()
}

func (b *BlockMetadataFetcher) Update(ctx context.Context) time.Duration {
handleQuery := func(query []uint64) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: why not defining handleQuery outside of the Update function?
I mean, why handleQuery is created as a variable while Fetch and PersistBlockMetadata are not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both fetch and persistBlockMetadata each have their own goals to achieve, where as handleQuery is just a way to reduce code-duplication. So I preferred declaring it as a variable inside the main Update function

result, err := b.fetch(
ctx,
b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[0])),
b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[len(query)-1])),
)
if err != nil {
log.Error("Error getting result from bulk blockMetadata API", "err", err)
return false
}
if err = b.persistBlockMetadata(query, result); err != nil {
log.Error("Error committing result from bulk blockMetadata API to ArbDB", "err", err)
return false
}
return true
}
iter := b.db.NewIterator(missingBlockMetadataInputFeedPrefix, nil)
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
defer iter.Release()
var query []uint64
for iter.Next() {
keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix)
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
query = append(query, binary.BigEndian.Uint64(keyBytes))
end := len(query) - 1
if query[end]-query[0]+1 >= uint64(b.config.APIBlocksLimit) {
if query[end]-query[0]+1 > uint64(b.config.APIBlocksLimit) && len(query) >= 2 {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
end -= 1
}
if success := handleQuery(query[:end+1]); !success {
return b.config.SyncInterval
}
query = query[end+1:]
}
}
if len(query) > 0 {
_ = handleQuery(query)
}
return b.config.SyncInterval
}

func (b *BlockMetadataFetcher) Start(ctx context.Context) {
b.StopWaiter.Start(ctx, b)
b.CallIteratively(b.Update)
}

func (b *BlockMetadataFetcher) StopAndWait() {
b.StopWaiter.StopAndWait()
b.client.Close()
}
89 changes: 56 additions & 33 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,23 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com
}

type Config struct {
Sequencer bool `koanf:"sequencer"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"`
DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"`
BatchPoster BatchPosterConfig `koanf:"batch-poster" reload:"hot"`
MessagePruner MessagePrunerConfig `koanf:"message-pruner" reload:"hot"`
BlockValidator staker.BlockValidatorConfig `koanf:"block-validator" reload:"hot"`
Feed broadcastclient.FeedConfig `koanf:"feed" reload:"hot"`
Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"`
SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"`
DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"`
SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
Dangerous DangerousConfig `koanf:"dangerous"`
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"`
ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"`
Sequencer bool `koanf:"sequencer"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"`
DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"`
BatchPoster BatchPosterConfig `koanf:"batch-poster" reload:"hot"`
MessagePruner MessagePrunerConfig `koanf:"message-pruner" reload:"hot"`
BlockValidator staker.BlockValidatorConfig `koanf:"block-validator" reload:"hot"`
Feed broadcastclient.FeedConfig `koanf:"feed" reload:"hot"`
Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"`
SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"`
DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"`
SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
Dangerous DangerousConfig `koanf:"dangerous"`
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"`
ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"`
BlockMetadataFetcher BlockMetadataFetcherConfig `koanf:"block-metadata-fetcher" reload:"hot"`
// SnapSyncConfig is only used for testing purposes, these should not be configured in production.
SnapSyncTest SnapSyncConfig
}
Expand Down Expand Up @@ -129,6 +130,12 @@ func (c *Config) Validate() error {
if err := c.Staker.Validate(); err != nil {
return err
}
if c.Sequencer && c.TransactionStreamer.TrackBlockMetadataFrom == 0 {
return errors.New("when sequencer is enabled track-missing-block-metadata-from should be set as well")
}
if c.TransactionStreamer.TrackBlockMetadataFrom != 0 && !c.BlockMetadataFetcher.Enable {
log.Warn("track-missing-block-metadata-from is set but blockMetadata fetcher is not enabled")
}
return nil
}

Expand Down Expand Up @@ -158,26 +165,28 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed
DangerousConfigAddOptions(prefix+".dangerous", f)
TransactionStreamerConfigAddOptions(prefix+".transaction-streamer", f)
MaintenanceConfigAddOptions(prefix+".maintenance", f)
BlockMetadataFetcherConfigAddOptions(prefix+"block-metadata-fetcher", f)
}

var ConfigDefault = Config{
Sequencer: false,
ParentChainReader: headerreader.DefaultConfig,
InboxReader: DefaultInboxReaderConfig,
DelayedSequencer: DefaultDelayedSequencerConfig,
BatchPoster: DefaultBatchPosterConfig,
MessagePruner: DefaultMessagePrunerConfig,
BlockValidator: staker.DefaultBlockValidatorConfig,
Feed: broadcastclient.FeedConfigDefault,
Staker: staker.DefaultL1ValidatorConfig,
SeqCoordinator: DefaultSeqCoordinatorConfig,
DataAvailability: das.DefaultDataAvailabilityConfig,
SyncMonitor: DefaultSyncMonitorConfig,
Dangerous: DefaultDangerousConfig,
TransactionStreamer: DefaultTransactionStreamerConfig,
ResourceMgmt: resourcemanager.DefaultConfig,
Maintenance: DefaultMaintenanceConfig,
SnapSyncTest: DefaultSnapSyncConfig,
Sequencer: false,
ParentChainReader: headerreader.DefaultConfig,
InboxReader: DefaultInboxReaderConfig,
DelayedSequencer: DefaultDelayedSequencerConfig,
BatchPoster: DefaultBatchPosterConfig,
MessagePruner: DefaultMessagePrunerConfig,
BlockValidator: staker.DefaultBlockValidatorConfig,
Feed: broadcastclient.FeedConfigDefault,
Staker: staker.DefaultL1ValidatorConfig,
SeqCoordinator: DefaultSeqCoordinatorConfig,
DataAvailability: das.DefaultDataAvailabilityConfig,
SyncMonitor: DefaultSyncMonitorConfig,
Dangerous: DefaultDangerousConfig,
TransactionStreamer: DefaultTransactionStreamerConfig,
ResourceMgmt: resourcemanager.DefaultConfig,
Maintenance: DefaultMaintenanceConfig,
BlockMetadataFetcher: DefaultBlockMetadataFetcherConfig,
SnapSyncTest: DefaultSnapSyncConfig,
}

func ConfigDefaultL1Test() *Config {
Expand Down Expand Up @@ -272,6 +281,7 @@ type Node struct {
MaintenanceRunner *MaintenanceRunner
DASLifecycleManager *das.LifecycleManager
SyncMonitor *SyncMonitor
blockMetadataFetcher *BlockMetadataFetcher
configFetcher ConfigFetcher
ctx context.Context
}
Expand Down Expand Up @@ -480,6 +490,14 @@ func createNodeImpl(
}
}

var blockMetadataFetcher *BlockMetadataFetcher
if config.BlockMetadataFetcher.Enable {
blockMetadataFetcher, err = NewBlockMetadataFetcher(ctx, config.BlockMetadataFetcher, arbDb, exec)
if err != nil {
return nil, err
}
}

if !config.ParentChainReader.Enable {
return &Node{
ArbDB: arbDb,
Expand All @@ -503,6 +521,7 @@ func createNodeImpl(
MaintenanceRunner: maintenanceRunner,
DASLifecycleManager: nil,
SyncMonitor: syncMonitor,
blockMetadataFetcher: blockMetadataFetcher,
configFetcher: configFetcher,
ctx: ctx,
}, nil
Expand Down Expand Up @@ -739,6 +758,7 @@ func createNodeImpl(
MaintenanceRunner: maintenanceRunner,
DASLifecycleManager: dasLifecycleManager,
SyncMonitor: syncMonitor,
blockMetadataFetcher: blockMetadataFetcher,
configFetcher: configFetcher,
ctx: ctx,
}, nil
Expand Down Expand Up @@ -922,6 +942,9 @@ func (n *Node) Start(ctx context.Context) error {
n.BroadcastClients.Start(ctx)
}()
}
if n.blockMetadataFetcher != nil {
n.blockMetadataFetcher.Start(ctx)
}
if n.configFetcher != nil {
n.configFetcher.Start(ctx)
}
Expand Down
19 changes: 10 additions & 9 deletions arbnode/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
package arbnode

var (
messagePrefix []byte = []byte("m") // maps a message sequence number to a message
blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed
blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed
messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result
legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1
rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message
parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number
sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count
messagePrefix []byte = []byte("m") // maps a message sequence number to a message
blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed
blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed
missingBlockMetadataInputFeedPrefix []byte = []byte("x") // maps a message sequence number whose blockMetaData byte array is missing to nil
messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result
legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1
rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message
parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number
sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count

messageCountKey []byte = []byte("_messageCount") // contains the current message count
delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count
Expand Down
Loading
Loading