From d98a596dc4fb1cae4e7138d95189811a30ebf941 Mon Sep 17 00:00:00 2001
From: Roman Behma <13855864+begmaroman@users.noreply.github.com>
Date: Fri, 21 Jun 2024 14:27:13 +0100
Subject: [PATCH] Fixed unhandled Errors in consumeEvents loop (#93)

---
 .github/workflows/regression-tests.yml | 45 +++++++-----
 cmd/main.go                            |  3 +-
 synchronizer/batches.go                | 95 +++++++++++++-------
 synchronizer/batches_test.go           |  7 +-
 synchronizer/init.go                   |  8 +--
 synchronizer/init_test.go              |  2 +
 synchronizer/store.go                  | 28 ++++----
 synchronizer/store_test.go             | 15 ++--
 8 files changed, 110 insertions(+), 93 deletions(-)

diff --git a/.github/workflows/regression-tests.yml b/.github/workflows/regression-tests.yml
index d996d8d2..cf3b5ca9 100644
--- a/.github/workflows/regression-tests.yml
+++ b/.github/workflows/regression-tests.yml
@@ -9,26 +9,37 @@ on:
 jobs:
   deploy_devnet:
     runs-on: ubuntu-latest
-
     steps:
-      - name: Checkout
+      - name: Checkout cdk-data-availability
         uses: actions/checkout@v4
+        with:
+          path: cdk-data-availability
+
+      - name: Checkout kurtosis-cdk
+        uses: actions/checkout@v4
+        with:
+          repository: 0xPolygon/kurtosis-cdk
+          ref: feat/cdk-erigon-zkevm
+          path: kurtosis-cdk
 
-      - name: Build image (with proposed code changes) locally
-        id: build
+      - name: Install Kurtosis CDK tools
+        uses: ./kurtosis-cdk/.github/actions/setup-kurtosis-cdk
+
+      - name: Build docker image
+        working-directory: ./cdk-data-availability
+        run: docker build -t cdk-data-availability:local --file Dockerfile .
+
+      - name: Configure Kurtosis CDK
+        working-directory: ./kurtosis-cdk
         run: |
-          if [ "$GITHUB_EVENT_NAME" == "pull_request" ]; then
-            GITHUB_SHA_SHORT=$(jq -r .pull_request.head.sha "$GITHUB_EVENT_PATH" | cut -c 1-7)
-            echo $GITHUB_SHA_SHORT
-            echo "::set-output name=GITHUB_SHA_SHORT::$GITHUB_SHA_SHORT"
-          fi
+          yq -Y --in-place '.args.data_availability_mode = "cdk-validium"' params.yml
+          yq -Y --in-place '.args.zkevm_da_image = "cdk-data-availability:local"' params.yml
 
-      - name: Set up Docker
-        uses: docker/setup-buildx-action@v1
+      - name: Deploy Kurtosis CDK package
+        working-directory: ./kurtosis-cdk
+        run: kurtosis run --enclave cdk-v1 --args-file params.yml --image-download always .
 
-      - name: Run regression tests against JIT container image
-        uses: 0xPolygon/kurtosis-cdk@v0.1.9
-        with:
-          zkevm_dac: ${{ steps.build.outputs.GITHUB_SHA_SHORT }}
-          kurtosis_cli: 0.89.3
-          kurtosis_cdk: v0.2.0
+      - name: Monitor verified batches
+        working-directory: ./kurtosis-cdk
+        shell: bash
+        run: .github/actions/monitor-cdk-verified-batches/batch_verification_monitor.sh 19 600
\ No newline at end of file
diff --git a/cmd/main.go b/cmd/main.go
index 2ce2c053..bfc6032b 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -103,6 +103,7 @@ func start(cliCtx *cli.Context) error {
 
 	// ensure synchro/reorg start block is set
 	err = synchronizer.InitStartBlock(
+		cliCtx.Context,
 		storage,
 		etm,
 		c.L1.GenesisBlock,
@@ -141,7 +142,7 @@ func start(cliCtx *cli.Context) error {
 	if err != nil {
 		log.Fatal(err)
 	}
-	go batchSynchronizer.Start()
+	go batchSynchronizer.Start(cliCtx.Context)
 	cancelFuncs = append(cancelFuncs, batchSynchronizer.Stop)
 
 	// Register services
diff --git a/synchronizer/batches.go b/synchronizer/batches.go
index 04f79f7b..7f94213a 100644
--- a/synchronizer/batches.go
+++ b/synchronizer/batches.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"sort"
 	"sync"
 	"time"
 
@@ -40,7 +41,6 @@ type BatchSynchronizer struct {
 	committee        map[common.Address]etherman.DataCommitteeMember
 	lock             sync.Mutex
 	reorgs           <-chan BlockReorg
-	events           chan *polygonvalidium.PolygonvalidiumSequenceBatches
 	sequencer        SequencerTracker
 	rpcClientFactory client.Factory
 }
@@ -68,7 +68,6 @@ func NewBatchSynchronizer(
 		self:             self,
 		db:               db,
 		reorgs:           reorgs,
-		events:           make(chan *polygonvalidium.PolygonvalidiumSequenceBatches),
 		sequencer:        sequencer,
 		rpcClientFactory: rpcClientFactory,
 	}
@@ -94,36 +93,35 @@ func (bs *BatchSynchronizer) resolveCommittee() error {
 }
 
 // Start starts the synchronizer
-func (bs *BatchSynchronizer) Start() {
+func (bs *BatchSynchronizer) Start(ctx context.Context) {
 	log.Infof("starting batch synchronizer, DAC addr: %v", bs.self)
-	go bs.startUnresolvedBatchesProcessor()
-	go bs.consumeEvents()
-	go bs.produceEvents()
-	go bs.handleReorgs()
+	go bs.startUnresolvedBatchesProcessor(ctx)
+	go bs.produceEvents(ctx)
+	go bs.handleReorgs(ctx)
 }
 
 // Stop stops the synchronizer
 func (bs *BatchSynchronizer) Stop() {
-	close(bs.events)
 	close(bs.stop)
 }
 
-func (bs *BatchSynchronizer) handleReorgs() {
+func (bs *BatchSynchronizer) handleReorgs(ctx context.Context) {
 	log.Info("starting reorgs handler")
 	for {
 		select {
 		case r := <-bs.reorgs:
-			latest, err := getStartBlock(bs.db)
+			latest, err := getStartBlock(ctx, bs.db)
 			if err != nil {
 				log.Errorf("could not determine latest processed block: %v", err)
 				continue
 			}
+
 			if latest < r.Number {
 				// only reset start block if necessary
 				continue
 			}
-			err = setStartBlock(bs.db, r.Number)
-			if err != nil {
+
+			if err = setStartBlock(ctx, bs.db, r.Number); err != nil {
 				log.Errorf("failed to store new start block to %d: %v", r.Number, err)
 			}
 		case <-bs.stop:
@@ -132,13 +130,13 @@ func (bs *BatchSynchronizer) handleReorgs() {
 	}
 }
 
-func (bs *BatchSynchronizer) produceEvents() {
+func (bs *BatchSynchronizer) produceEvents(ctx context.Context) {
 	log.Info("starting event producer")
 	for {
 		delay := time.NewTimer(bs.retry)
 		select {
 		case <-delay.C:
-			if err := bs.filterEvents(); err != nil {
+			if err := bs.filterEvents(ctx); err != nil {
 				log.Errorf("error filtering events: %v", err)
 			}
 		case <-bs.stop:
@@ -148,8 +146,8 @@ func (bs *BatchSynchronizer) produceEvents() {
 	}
 }
 
 // Start an iterator from last block processed, picking off SequenceBatches events
-func (bs *BatchSynchronizer) filterEvents() error {
-	start, err := getStartBlock(bs.db)
+func (bs *BatchSynchronizer) filterEvents(ctx context.Context) error {
+	start, err := getStartBlock(ctx, bs.db)
 	if err != nil {
 		return err
 	}
@@ -157,11 +155,12 @@ func (bs *BatchSynchronizer) filterEvents() error {
 	end := start + uint64(bs.blockBatchSize)
 
 	// get the latest block number
-	header, err := bs.client.HeaderByNumber(context.TODO(), nil)
+	header, err := bs.client.HeaderByNumber(ctx, nil)
 	if err != nil {
 		log.Errorf("failed to determine latest block number: %v", err)
 		return err
 	}
+
 	// we don't want to scan beyond latest block
 	if end > header.Number.Uint64() {
 		end = header.Number.Uint64()
@@ -169,44 +168,47 @@ func (bs *BatchSynchronizer) filterEvents() error {
 
 	iter, err := bs.client.FilterSequenceBatches(
 		&bind.FilterOpts{
+			Context: ctx,
 			Start:   start,
 			End:     &end,
-			Context: context.TODO(),
 		}, nil)
 	if err != nil {
+		log.Errorf("failed to create SequenceBatches event iterator: %v", err)
 		return err
 	}
+
+	// Collect events into the slice
+	var events []*polygonvalidium.PolygonvalidiumSequenceBatches
 	for iter.Next() {
 		if iter.Error() != nil {
 			return iter.Error()
 		}
-		bs.events <- iter.Event
+
+		events = append(events, iter.Event)
 	}
-	// advance start block
-	err = setStartBlock(bs.db, end)
-	if err != nil {
-		return err
+	if err = iter.Close(); err != nil {
+		log.Errorf("failed to close SequenceBatches event iterator: %v", err)
 	}
-	return nil
-}
 
-func (bs *BatchSynchronizer) consumeEvents() {
-	log.Info("starting event consumer")
-	for {
-		select {
-		case sb := <-bs.events:
-			if err := bs.handleEvent(sb); err != nil {
-				log.Errorf("failed to handle event: %v", err)
-			}
-		case <-bs.stop:
-			return
+	// Sort events by block number ascending
+	sort.Slice(events, func(i, j int) bool {
+		return events[i].Raw.BlockNumber < events[j].Raw.BlockNumber
+	})
+
+	// Handle events
+	for _, event := range events {
+		if err = bs.handleEvent(ctx, event); err != nil {
+			log.Errorf("failed to handle event: %v", err)
+			return setStartBlock(ctx, bs.db, event.Raw.BlockNumber-1)
 		}
 	}
+
+	return setStartBlock(ctx, bs.db, end)
 }
 
-func (bs *BatchSynchronizer) handleEvent(event *polygonvalidium.PolygonvalidiumSequenceBatches) error {
-	ctx, cancel := context.WithTimeout(context.Background(), bs.rpcTimeout)
+func (bs *BatchSynchronizer) handleEvent(parentCtx context.Context, event *polygonvalidium.PolygonvalidiumSequenceBatches) error {
+	ctx, cancel := context.WithTimeout(parentCtx, bs.rpcTimeout)
 	defer cancel()
 
 	tx, _, err := bs.client.GetTx(ctx, event.Raw.TxHash)
@@ -214,8 +216,7 @@ func (bs *BatchSynchronizer) handleEvent(event *polygonvalidium.PolygonvalidiumS
 		return err
 	}
 
-	txData := tx.Data()
-	keys, err := UnpackTxData(txData)
+	keys, err := UnpackTxData(tx.Data())
 	if err != nil {
 		return err
 	}
@@ -231,16 +232,16 @@ func (bs *BatchSynchronizer) handleEvent(event *polygonvalidium.PolygonvalidiumS
 	}
 
 	// Store batch keys. Already handled batch keys are going to be ignored based on the DB logic.
-	return storeUnresolvedBatchKeys(bs.db, batchKeys)
+	return storeUnresolvedBatchKeys(ctx, bs.db, batchKeys)
 }
 
-func (bs *BatchSynchronizer) startUnresolvedBatchesProcessor() {
+func (bs *BatchSynchronizer) startUnresolvedBatchesProcessor(ctx context.Context) {
 	log.Info("starting handling unresolved batches")
 	for {
 		delay := time.NewTimer(bs.retry)
 		select {
 		case <-delay.C:
-			if err := bs.handleUnresolvedBatches(); err != nil {
+			if err := bs.handleUnresolvedBatches(ctx); err != nil {
 				log.Error(err)
 			}
 		case <-bs.stop:
@@ -250,9 +251,9 @@ func (bs *BatchSynchronizer) startUnresolvedBatchesProcessor() {
 	}
 }
 
 // handleUnresolvedBatches handles unresolved batches that were collected by the event consumer
-func (bs *BatchSynchronizer) handleUnresolvedBatches() error {
+func (bs *BatchSynchronizer) handleUnresolvedBatches(ctx context.Context) error {
 	// Get unresolved batches
-	batchKeys, err := getUnresolvedBatchKeys(bs.db)
+	batchKeys, err := getUnresolvedBatchKeys(ctx, bs.db)
 	if err != nil {
 		return fmt.Errorf("failed to get unresolved batch keys: %v", err)
 	}
@@ -265,7 +266,7 @@ func (bs *BatchSynchronizer) handleUnresolvedBatches() error {
 	var data []types.OffChainData
 	var resolved []types.BatchKey
 	for _, key := range batchKeys {
-		if exists(bs.db, key.Hash) {
+		if exists(ctx, bs.db, key.Hash) {
 			resolved = append(resolved, key)
 		} else {
 			var value *types.OffChainData
@@ -281,14 +282,14 @@ func (bs *BatchSynchronizer) handleUnresolvedBatches() error {
 
 	// Store data of the batches to the DB
 	if len(data) > 0 {
-		if err = storeOffchainData(bs.db, data); err != nil {
+		if err = storeOffchainData(ctx, bs.db, data); err != nil {
 			return fmt.Errorf("failed to store offchain data: %v", err)
 		}
 	}
 
 	// Mark batches as resolved
 	if len(resolved) > 0 {
-		if err = deleteUnresolvedBatchKeys(bs.db, resolved); err != nil {
+		if err = deleteUnresolvedBatchKeys(ctx, bs.db, resolved); err != nil {
 			return fmt.Errorf("failed to delete successfully resolved batch keys: %v", err)
 		}
 	}
diff --git a/synchronizer/batches_test.go b/synchronizer/batches_test.go
index 4ab4716c..f48d6ba4 100644
--- a/synchronizer/batches_test.go
+++ b/synchronizer/batches_test.go
@@ -1,6 +1,7 @@
 package synchronizer
 
 import (
+	"context"
 	"errors"
 	"math/big"
 	"strings"
@@ -356,7 +357,7 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) {
 				client: ethermanMock,
 			}
 
-			err := batchSynronizer.handleEvent(event)
+			err := batchSynronizer.handleEvent(context.Background(), event)
 			if config.isErrorExpected {
 				require.Error(t, err)
 			} else {
@@ -624,7 +625,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) {
 				sequencer: sequencerMock,
 			}
 
-			err := batchSynronizer.handleUnresolvedBatches()
+			err := batchSynronizer.handleUnresolvedBatches(context.Background())
 			if config.isErrorExpected {
 				require.Error(t, err)
 			} else {
@@ -868,7 +869,7 @@ func TestBatchSyncronizer_HandleReorgs(t *testing.T) {
 				reorgs: reorgChan,
 			}
 
-			go batchSynchronizer.handleReorgs()
+			go batchSynchronizer.handleReorgs(context.Background())
 
 			reorgChan <- config.reorg
 
diff --git a/synchronizer/init.go b/synchronizer/init.go
index 71df83c4..8d5b18f0 100644
--- a/synchronizer/init.go
+++ b/synchronizer/init.go
@@ -17,11 +17,11 @@
 )
 
 // InitStartBlock initializes the L1 sync task by finding the inception block for the CDKValidium contract
-func InitStartBlock(db db.DB, em etherman.Etherman, genesisBlock uint64, validiumAddr common.Address) error {
-	ctx, cancel := context.WithTimeout(context.Background(), initBlockTimeout)
+func InitStartBlock(parentCtx context.Context, db db.DB, em etherman.Etherman, genesisBlock uint64, validiumAddr common.Address) error {
+	ctx, cancel := context.WithTimeout(parentCtx, initBlockTimeout)
 	defer cancel()
 
-	current, err := getStartBlock(db)
+	current, err := getStartBlock(ctx, db)
 	if err != nil {
 		return err
 	}
@@ -41,7 +41,7 @@ func InitStartBlock(db db.DB, em etherman.Etherman, genesisBlock uint64, validiu
 		}
 	}
 
-	return setStartBlock(db, startBlock.Uint64())
+	return setStartBlock(ctx, db, startBlock.Uint64())
 }
 
 func findContractDeploymentBlock(ctx context.Context, em etherman.Etherman, contract common.Address) (*big.Int, error) {
diff --git a/synchronizer/init_test.go b/synchronizer/init_test.go
index 3b71c063..266f65a3 100644
--- a/synchronizer/init_test.go
+++ b/synchronizer/init_test.go
@@ -1,6 +1,7 @@
 package synchronizer
 
 import (
+	"context"
 	"errors"
 	"math/big"
 	"testing"
@@ -90,6 +91,7 @@ func Test_InitStartBlock(t *testing.T) {
 			}
 
 			err := InitStartBlock(
+				context.Background(),
 				dbMock,
 				emMock,
 				l1Config.GenesisBlock,
diff --git a/synchronizer/store.go b/synchronizer/store.go
index ea10b701..1568da9e 100644
--- a/synchronizer/store.go
+++ b/synchronizer/store.go
@@ -15,8 +15,8 @@ const dbTimeout = 2 * time.Second
 // L1SyncTask is the name of the L1 sync task
 const L1SyncTask = "L1"
 
-func getStartBlock(db dbTypes.DB) (uint64, error) {
-	ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
+func getStartBlock(parentCtx context.Context, db dbTypes.DB) (uint64, error) {
+	ctx, cancel := context.WithTimeout(parentCtx, dbTimeout)
 	defer cancel()
 
 	start, err := db.GetLastProcessedBlock(ctx, L1SyncTask)
@@ -29,8 +29,8 @@
 	return start, err
 }
 
-func setStartBlock(db dbTypes.DB, block uint64) error {
-	ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
+func setStartBlock(parentCtx context.Context, db dbTypes.DB, block uint64) error {
+	ctx, cancel := context.WithTimeout(parentCtx, dbTimeout)
 	defer cancel()
 
 	var (
@@ -53,15 +53,15 @@
 	return nil
 }
 
-func exists(db dbTypes.DB, key common.Hash) bool {
-	ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
+func exists(parentCtx context.Context, db dbTypes.DB, key common.Hash) bool {
+	ctx, cancel := context.WithTimeout(parentCtx, dbTimeout)
 	defer cancel()
 
 	return db.Exists(ctx, key)
 }
 
-func storeUnresolvedBatchKeys(db dbTypes.DB, keys []types.BatchKey) error {
-	ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
+func storeUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []types.BatchKey) error {
+	ctx, cancel := context.WithTimeout(parentCtx, dbTimeout)
 	defer cancel()
 
 	var (
@@ -85,15 +85,15 @@
 	return nil
 }
 
-func getUnresolvedBatchKeys(db dbTypes.DB) ([]types.BatchKey, error) {
-	ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
+func getUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB) ([]types.BatchKey, error) {
+	ctx, cancel := context.WithTimeout(parentCtx, dbTimeout)
 	defer cancel()
 
 	return db.GetUnresolvedBatchKeys(ctx)
 }
 
-func deleteUnresolvedBatchKeys(db dbTypes.DB, keys []types.BatchKey) error {
-	ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
+func deleteUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []types.BatchKey) error {
+	ctx, cancel := context.WithTimeout(parentCtx, dbTimeout)
 	defer cancel()
 
 	var (
@@ -117,8 +117,8 @@ func deleteUnresolvedBatchKeys(db dbTypes.DB, keys []types.BatchKey) error {
 	return nil
 }
 
-func storeOffchainData(db dbTypes.DB, data []types.OffChainData) error {
-	ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
+func storeOffchainData(parentCtx context.Context, db dbTypes.DB, data []types.OffChainData) error {
+	ctx, cancel := context.WithTimeout(parentCtx, dbTimeout)
 	defer cancel()
 
 	var (
diff --git a/synchronizer/store_test.go b/synchronizer/store_test.go
index 10abbbbd..4ccf804e 100644
--- a/synchronizer/store_test.go
+++ b/synchronizer/store_test.go
@@ -1,6 +1,7 @@
 package synchronizer
 
 import (
+	"context"
 	"errors"
 	"testing"
 
@@ -50,7 +51,7 @@ func Test_getStartBlock(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			testDB := tt.db(t)
 
-			if block, err := getStartBlock(testDB); tt.wantErr {
+			if block, err := getStartBlock(context.Background(), testDB); tt.wantErr {
 				require.ErrorIs(t, err, testError)
 			} else {
 				require.NoError(t, err)
@@ -143,7 +144,7 @@ func Test_setStartBlock(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			testDB := tt.db(t)
 
-			if err := setStartBlock(testDB, tt.block); tt.wantErr {
+			if err := setStartBlock(context.Background(), testDB, tt.block); tt.wantErr {
 				require.ErrorIs(t, err, testError)
 			} else {
 				require.NoError(t, err)
@@ -190,7 +191,7 @@ func Test_exists(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			testDB := tt.db(t)
 
-			got := exists(testDB, tt.key)
+			got := exists(context.Background(), testDB, tt.key)
 			require.Equal(t, tt.want, got)
 		})
 	}
@@ -281,7 +282,7 @@ func Test_storeUnresolvedBatchKeys(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			testDB := tt.db(t)
 
-			if err := storeUnresolvedBatchKeys(testDB, tt.keys); tt.wantErr {
+			if err := storeUnresolvedBatchKeys(context.Background(), testDB, tt.keys); tt.wantErr {
 				require.ErrorIs(t, err, testError)
 			} else {
 				require.NoError(t, err)
@@ -333,7 +334,7 @@ func Test_getUnresolvedBatchKeys(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			testDB := tt.db(t)
 
-			if keys, err := getUnresolvedBatchKeys(testDB); tt.wantErr {
+			if keys, err := getUnresolvedBatchKeys(context.Background(), testDB); tt.wantErr {
 				require.ErrorIs(t, err, testError)
 			} else {
 				require.NoError(t, err)
@@ -429,7 +430,7 @@ func Test_deleteUnresolvedBatchKeys(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			testDB := tt.db(t)
 
-			if err := deleteUnresolvedBatchKeys(testDB, testData); tt.wantErr {
+			if err := deleteUnresolvedBatchKeys(context.Background(), testDB, testData); tt.wantErr {
 				require.ErrorIs(t, err, testError)
 			} else {
 				require.NoError(t, err)
@@ -523,7 +524,7 @@ func Test_storeOffchainData(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			testDB := tt.db(t)
 
-			if err := storeOffchainData(testDB, tt.data); tt.wantErr {
+			if err := storeOffchainData(context.Background(), testDB, tt.data); tt.wantErr {
 				require.ErrorIs(t, err, testError)
 			} else {
 				require.NoError(t, err)
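
The substantive change in this patch replaces the events channel and the consumeEvents goroutine with a single pass inside filterEvents: collect the SequenceBatches events, sort them by block number, handle them in order, and persist the start block only as far as processing actually succeeded. Below is a minimal, self-contained Go sketch of that collect-sort-handle-persist pattern; Event, store, handleEvent and drainEvents are simplified stand-ins invented for illustration and are not the repository's real types or helpers.

package main

import (
	"context"
	"fmt"
	"sort"
)

// Event is a stand-in for polygonvalidium.PolygonvalidiumSequenceBatches;
// only the raw block number and tx hash matter for this sketch.
type Event struct {
	BlockNumber uint64
	TxHash      string
}

// store is a stand-in for the DB-backed start-block helpers in store.go.
type store struct{ startBlock uint64 }

func (s *store) setStartBlock(ctx context.Context, block uint64) error {
	s.startBlock = block
	return nil
}

// handleEvent is a stand-in for BatchSynchronizer.handleEvent.
func handleEvent(ctx context.Context, ev Event) error {
	fmt.Printf("handled event from block %d (tx %s)\n", ev.BlockNumber, ev.TxHash)
	return nil
}

// drainEvents mirrors the reworked filterEvents tail: sort the collected
// events by block number, handle them in order, and on failure rewind the
// start block so the failing event is retried on the next pass.
func drainEvents(ctx context.Context, db *store, events []Event, end uint64) error {
	sort.Slice(events, func(i, j int) bool {
		return events[i].BlockNumber < events[j].BlockNumber
	})

	for _, ev := range events {
		if err := handleEvent(ctx, ev); err != nil {
			// Log and rewind, as in the patch: persist progress up to the
			// block before the failing event, then stop this pass.
			fmt.Println("failed to handle event:", err)
			return db.setStartBlock(ctx, ev.BlockNumber-1)
		}
	}

	// Every event was handled: advance the start block past the scanned range.
	return db.setStartBlock(ctx, end)
}

func main() {
	db := &store{}
	events := []Event{
		{BlockNumber: 102, TxHash: "0xbeef"},
		{BlockNumber: 100, TxHash: "0xdead"},
	}
	if err := drainEvents(context.Background(), db, events, 110); err != nil {
		fmt.Println("drain failed:", err)
	}
	fmt.Println("next start block:", db.startBlock)
}

Because progress is only advanced after a successful pass, and rewound to just before a failing event otherwise, a handler error no longer disappears inside a separate consumer goroutine; the same block range is simply re-scanned on the next retry tick.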