Skip to content

Commit

Permalink
Merge pull request #6 from 0xPolygon/Bug/close-subscribers
Browse files Browse the repository at this point in the history
Close subscribers
  • Loading branch information
christophercampbell authored Aug 9, 2023
2 parents e1f21ce + b0bc6fc commit afdd6b7
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 11 deletions.
4 changes: 2 additions & 2 deletions synchronizer/batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ type BatchSynchronizer struct {
db *db.DB
committee map[common.Address]etherman.DataCommitteeMember
lock sync.Mutex
reorgs <-chan ReorgBlock
reorgs <-chan BlockReorg
}

const dbTimeout = 2 * time.Second
const rpcTimeout = 3 * time.Second

// NewBatchSynchronizer creates the BatchSynchronizer
func NewBatchSynchronizer(cfg config.L1Config, self common.Address, db *db.DB, reorgs <-chan ReorgBlock) (*BatchSynchronizer, error) {
func NewBatchSynchronizer(cfg config.L1Config, self common.Address, db *db.DB, reorgs <-chan BlockReorg) (*BatchSynchronizer, error) {
watcher, err := newWatcher(cfg)
if err != nil {
return nil, err
Expand Down
24 changes: 15 additions & 9 deletions synchronizer/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/0xPolygon/supernets2-node/log"
"github.com/umbracle/ethgo"
"github.com/umbracle/ethgo/blocktracker"
"github.com/umbracle/ethgo/jsonrpc"
Expand All @@ -14,13 +15,14 @@ import (
type ReorgDetector struct {
rpcUrl string
pollingPeriod time.Duration
subscribers []chan ReorgBlock
subscribers []chan BlockReorg
cancel context.CancelFunc
}

// ReorgBlock is emitted to subscribers when a reorg is detected. Number is the block to which the chain rewound.
type ReorgBlock struct {
// BlockReorg is emitted to subscribers when a reorg is detected. Number is the block to which the chain rewound.
type BlockReorg struct {
Number uint64
Hash ethgo.Hash
}

// NewReorgDetector creates a new ReorgDetector
Expand All @@ -32,14 +34,16 @@ func NewReorgDetector(rpcUrl string, pollingPeriod time.Duration) (*ReorgDetecto
}

// Subscribe returns a channel on which the caller can receive reorg messages
func (rd *ReorgDetector) Subscribe() <-chan ReorgBlock {
ch := make(chan ReorgBlock)
func (rd *ReorgDetector) Subscribe() <-chan BlockReorg {
ch := make(chan BlockReorg)
rd.subscribers = append(rd.subscribers, ch)
return ch
}

// Start starts the ReorgDetector tracking for reorg events
func (rd *ReorgDetector) Start() error {
log.Info("starting block reorganization detector")

ctx, cancel := context.WithCancel(context.Background())
rd.cancel = cancel

Expand All @@ -56,7 +60,7 @@ func (rd *ReorgDetector) Start() error {
case block := <-blocks:
if lastBlock != nil {
if lastBlock.Number+1 >= block.Number {
lca := ReorgBlock{Number: block.Number}
lca := BlockReorg{Number: block.Number, Hash: block.Hash}
for _, ch := range rd.subscribers {
ch <- lca
}
Expand All @@ -75,10 +79,12 @@ func (rd *ReorgDetector) Start() error {

// Stop stops the chain reorganization detector loop
func (rd *ReorgDetector) Stop() {
if rd.cancel == nil {
return
if rd.cancel != nil {
rd.cancel()
}
for _, ch := range rd.subscribers {
close(ch)
}
rd.cancel()
}

func (rd *ReorgDetector) trackBlocks(ctx context.Context, ch chan *ethgo.Block) error {
Expand Down
4 changes: 4 additions & 0 deletions test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ run-local: ## Runs a full data node
docker compose up -d supernets2-mock-l1-network
go run ../cmd run --cfg config/test.local.toml

.PHONY: run-dev
run-dev: run-db
go run ../cmd run --cfg config/test.dev.toml

.PHONY: run-db
run-db: ## Runs the data store
$(RUN-DB)
Expand Down

0 comments on commit afdd6b7

Please sign in to comment.