From b0bc6fce1199c607a5277ee348f114d875a05c52 Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Wed, 9 Aug 2023 12:23:28 -0400 Subject: [PATCH] close subscriber channels on stop --- synchronizer/batches.go | 4 ++-- synchronizer/reorg.go | 24 +++++++++++++++--------- test/Makefile | 4 ++++ 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/synchronizer/batches.go b/synchronizer/batches.go index f05f02f2..29e3a6e0 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -30,14 +30,14 @@ type BatchSynchronizer struct { db *db.DB committee map[common.Address]etherman.DataCommitteeMember lock sync.Mutex - reorgs <-chan ReorgBlock + reorgs <-chan BlockReorg } const dbTimeout = 2 * time.Second const rpcTimeout = 3 * time.Second // NewBatchSynchronizer creates the BatchSynchronizer -func NewBatchSynchronizer(cfg config.L1Config, self common.Address, db *db.DB, reorgs <-chan ReorgBlock) (*BatchSynchronizer, error) { +func NewBatchSynchronizer(cfg config.L1Config, self common.Address, db *db.DB, reorgs <-chan BlockReorg) (*BatchSynchronizer, error) { watcher, err := newWatcher(cfg) if err != nil { return nil, err diff --git a/synchronizer/reorg.go b/synchronizer/reorg.go index 68c7449a..6f297957 100644 --- a/synchronizer/reorg.go +++ b/synchronizer/reorg.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/0xPolygon/supernets2-node/log" "github.com/umbracle/ethgo" "github.com/umbracle/ethgo/blocktracker" "github.com/umbracle/ethgo/jsonrpc" @@ -14,13 +15,14 @@ import ( type ReorgDetector struct { rpcUrl string pollingPeriod time.Duration - subscribers []chan ReorgBlock + subscribers []chan BlockReorg cancel context.CancelFunc } -// ReorgBlock is emitted to subscribers when a reorg is detected. Number is the block to which the chain rewound. -type ReorgBlock struct { +// BlockReorg is emitted to subscribers when a reorg is detected. Number is the block to which the chain rewound. +type BlockReorg struct { Number uint64 + Hash ethgo.Hash } // NewReorgDetector creates a new ReorgDetector @@ -32,14 +34,16 @@ func NewReorgDetector(rpcUrl string, pollingPeriod time.Duration) (*ReorgDetecto } // Subscribe returns a channel on which the caller can receive reorg messages -func (rd *ReorgDetector) Subscribe() <-chan ReorgBlock { - ch := make(chan ReorgBlock) +func (rd *ReorgDetector) Subscribe() <-chan BlockReorg { + ch := make(chan BlockReorg) rd.subscribers = append(rd.subscribers, ch) return ch } // Start starts the ReorgDetector tracking for reorg events func (rd *ReorgDetector) Start() error { + log.Info("starting block reorganization detector") + ctx, cancel := context.WithCancel(context.Background()) rd.cancel = cancel @@ -56,7 +60,7 @@ func (rd *ReorgDetector) Start() error { case block := <-blocks: if lastBlock != nil { if lastBlock.Number+1 >= block.Number { - lca := ReorgBlock{Number: block.Number} + lca := BlockReorg{Number: block.Number, Hash: block.Hash} for _, ch := range rd.subscribers { ch <- lca } @@ -75,10 +79,12 @@ func (rd *ReorgDetector) Start() error { // Stop stops the chain reorganization detector loop func (rd *ReorgDetector) Stop() { - if rd.cancel == nil { - return + if rd.cancel != nil { + rd.cancel() + } + for _, ch := range rd.subscribers { + close(ch) } - rd.cancel() } func (rd *ReorgDetector) trackBlocks(ctx context.Context, ch chan *ethgo.Block) error { diff --git a/test/Makefile b/test/Makefile index 13d1d073..fefbc171 100644 --- a/test/Makefile +++ b/test/Makefile @@ -11,6 +11,10 @@ run-local: ## Runs a full data node docker compose up -d supernets2-mock-l1-network go run ../cmd run --cfg config/test.local.toml +.PHONY: run-dev +run-dev: run-db + go run ../cmd run --cfg config/test.dev.toml + .PHONY: run-db run-db: ## Runs the data store $(RUN-DB)