Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close subscribers #6

Merged
merged 1 commit into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions synchronizer/batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ type BatchSynchronizer struct {
db *db.DB
committee map[common.Address]etherman.DataCommitteeMember
lock sync.Mutex
reorgs <-chan ReorgBlock
reorgs <-chan BlockReorg
}

const dbTimeout = 2 * time.Second
const rpcTimeout = 3 * time.Second

// NewBatchSynchronizer creates the BatchSynchronizer
func NewBatchSynchronizer(cfg config.L1Config, self common.Address, db *db.DB, reorgs <-chan ReorgBlock) (*BatchSynchronizer, error) {
func NewBatchSynchronizer(cfg config.L1Config, self common.Address, db *db.DB, reorgs <-chan BlockReorg) (*BatchSynchronizer, error) {
watcher, err := newWatcher(cfg)
if err != nil {
return nil, err
Expand Down
24 changes: 15 additions & 9 deletions synchronizer/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/0xPolygon/supernets2-node/log"
"github.com/umbracle/ethgo"
"github.com/umbracle/ethgo/blocktracker"
"github.com/umbracle/ethgo/jsonrpc"
Expand All @@ -14,13 +15,14 @@ import (
type ReorgDetector struct {
rpcUrl string
pollingPeriod time.Duration
subscribers []chan ReorgBlock
subscribers []chan BlockReorg
cancel context.CancelFunc
}

// ReorgBlock is emitted to subscribers when a reorg is detected. Number is the block to which the chain rewound.
type ReorgBlock struct {
// BlockReorg is emitted to subscribers when a reorg is detected. Number is the block to which the chain rewound.
type BlockReorg struct {
Number uint64
Hash ethgo.Hash
}

// NewReorgDetector creates a new ReorgDetector
Expand All @@ -32,14 +34,16 @@ func NewReorgDetector(rpcUrl string, pollingPeriod time.Duration) (*ReorgDetecto
}

// Subscribe returns a channel on which the caller can receive reorg messages
func (rd *ReorgDetector) Subscribe() <-chan ReorgBlock {
ch := make(chan ReorgBlock)
func (rd *ReorgDetector) Subscribe() <-chan BlockReorg {
ch := make(chan BlockReorg)
rd.subscribers = append(rd.subscribers, ch)
return ch
}

// Start starts the ReorgDetector tracking for reorg events
func (rd *ReorgDetector) Start() error {
log.Info("starting block reorganization detector")

ctx, cancel := context.WithCancel(context.Background())
rd.cancel = cancel

Expand All @@ -56,7 +60,7 @@ func (rd *ReorgDetector) Start() error {
case block := <-blocks:
if lastBlock != nil {
if lastBlock.Number+1 >= block.Number {
lca := ReorgBlock{Number: block.Number}
lca := BlockReorg{Number: block.Number, Hash: block.Hash}
for _, ch := range rd.subscribers {
ch <- lca
}
Expand All @@ -75,10 +79,12 @@ func (rd *ReorgDetector) Start() error {

// Stop stops the chain reorganization detector loop
func (rd *ReorgDetector) Stop() {
if rd.cancel == nil {
return
if rd.cancel != nil {
rd.cancel()
}
for _, ch := range rd.subscribers {
close(ch)
}
rd.cancel()
}

func (rd *ReorgDetector) trackBlocks(ctx context.Context, ch chan *ethgo.Block) error {
Expand Down
4 changes: 4 additions & 0 deletions test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ run-local: ## Runs a full data node
docker compose up -d supernets2-mock-l1-network
go run ../cmd run --cfg config/test.local.toml

.PHONY: run-dev
run-dev: run-db
go run ../cmd run --cfg config/test.dev.toml

.PHONY: run-db
run-db: ## Runs the data store
$(RUN-DB)
Expand Down