Skip to content

Commit

Permalink
adaptive poll interval
Browse files Browse the repository at this point in the history
  • Loading branch information
pkieltyka committed Aug 11, 2023
1 parent de0a3cf commit 826470c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
2 changes: 1 addition & 1 deletion cmd/chain-watch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func main() {
// Monitor options
cachestore.MaxKeyLength = 180
monitorOptions := ethmonitor.DefaultOptions
monitorOptions.PollingInterval = time.Duration(1000 * time.Millisecond)
monitorOptions.PollingInterval = time.Duration(2000 * time.Millisecond)
monitorOptions.DebugLogging = true
monitorOptions.WithLogs = true
monitorOptions.BlockRetentionLimit = 64
Expand Down
41 changes: 33 additions & 8 deletions ethmonitor/ethmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

var DefaultOptions = Options{
Logger: logger.NewLogger(logger.LogLevel_WARN),
PollingInterval: 1000 * time.Millisecond,
PollingInterval: 1500 * time.Millisecond,
Timeout: 20 * time.Second,
StartBlockNumber: nil, // latest
TrailNumBlocksBehindHead: 0, // latest
Expand Down Expand Up @@ -257,10 +257,13 @@ func (m *Monitor) monitor() error {
ctx := m.ctx
events := Blocks{}

// loopInterval is time we monitor between cycles. It's a fast
// minLoopInterval is time we monitor between cycles. It's a fast
// and fixed amount of time, as the internal method `fetchNextBlock`
// will actually use the poll interval while searching for the next block.
loopInterval := 100 * time.Millisecond
minLoopInterval := 100 * time.Millisecond

// adaptive poll interval
pollInterval := m.options.PollingInterval

// monitor run loop
for {
Expand All @@ -269,15 +272,15 @@ func (m *Monitor) monitor() error {
case <-m.ctx.Done():
return nil

case <-time.After(loopInterval):
case <-time.After(pollInterval):
// ...
headBlock := m.chain.Head()
if headBlock != nil {
m.nextBlockNumber = big.NewInt(0).Add(headBlock.Number(), big.NewInt(1))
}

// ..
nextBlock, err := m.fetchNextBlock(ctx)
nextBlock, miss, err := m.fetchNextBlock(ctx)
if err != nil {
m.log.Warnf("ethmonitor: fetchNextBlock error reported '%v', for blockNum:%d, retrying..", err, m.nextBlockNumber.Uint64())

Expand All @@ -286,6 +289,14 @@ func (m *Monitor) monitor() error {
continue
}

// if we hit a miss between calls, then we reset the pollInterval, otherwise
// we speed up the polling interval
if miss {
pollInterval = m.options.PollingInterval
} else {
pollInterval = clampDuration(minLoopInterval, pollInterval/4)
}

// build deterministic set of add/remove events which construct the canonical chain
events, err = m.buildCanonicalChain(ctx, nextBlock, events)
if err != nil {
Expand Down Expand Up @@ -491,7 +502,9 @@ func (m *Monitor) backfillChainLogs(ctx context.Context) {
}
}

func (m *Monitor) fetchNextBlock(ctx context.Context) (*types.Block, error) {
func (m *Monitor) fetchNextBlock(ctx context.Context) (*types.Block, bool, error) {
miss := false

getter := func(ctx context.Context, _ string) (*types.Block, error) {
m.log.Debugf("ethmonitor: fetchNextBlock is calling origin for number %s", m.nextBlockNumber)
for {
Expand All @@ -503,11 +516,13 @@ func (m *Monitor) fetchNextBlock(ctx context.Context) (*types.Block, error) {

nextBlock, err := m.fetchBlockByNumber(ctx, m.nextBlockNumber)
if errors.Is(err, ethereum.NotFound) {
miss = true
time.Sleep(m.options.PollingInterval)
continue
}
if err != nil {
m.log.Warnf("ethmonitor: [retrying] failed to fetch next block # %d, due to: %v", m.nextBlockNumber, err)
miss = true
time.Sleep(m.options.PollingInterval)
continue
}
Expand All @@ -518,9 +533,11 @@ func (m *Monitor) fetchNextBlock(ctx context.Context) (*types.Block, error) {

if m.blockCache != nil {
key := fmt.Sprintf("ethmonitor:%s:BlockNum:%s", m.chainID.String(), m.nextBlockNumber.String())
return m.blockCache.GetOrSetWithLockEx(ctx, key, getter, m.options.CacheExpiry)
nextBlock, err := m.blockCache.GetOrSetWithLockEx(ctx, key, getter, m.options.CacheExpiry)
return nextBlock, miss, err
}
return getter(ctx, "")
nextBlock, err := getter(ctx, "")
return nextBlock, miss, err
}

func (m *Monitor) fetchBlockByNumber(ctx context.Context, num *big.Int) (*types.Block, error) {
Expand Down Expand Up @@ -769,3 +786,11 @@ func getChainID(provider ethrpc.Interface) (*big.Int, error) {
}
return chainID, nil
}

func clampDuration(x, y time.Duration) time.Duration {
if x > y {
return x
} else {
return y
}
}

0 comments on commit 826470c

Please sign in to comment.