Skip to content

Commit

Permalink
ethmonitor: add IsStreamingMode() method
Browse files Browse the repository at this point in the history
  • Loading branch information
pkieltyka committed Oct 30, 2024
1 parent 0bbb842 commit 0d909e5
Showing 1 changed file with 6 additions and 0 deletions.
6 changes: 6 additions & 0 deletions ethmonitor/ethmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type Monitor struct {
nextBlockNumber *big.Int
nextBlockNumberMu sync.Mutex
pollInterval atomic.Int64
isStreamingMode atomic.Bool

cache cachestore.Store[[]byte]

Expand Down Expand Up @@ -306,6 +307,9 @@ func (m *Monitor) IsStreamingEnabled() bool {
return !m.options.StreamingDisabled && m.provider.IsStreamingEnabled()
}

func (m *Monitor) IsStreamingMode() bool {
return m.isStreamingMode.Load()
}
func (m *Monitor) listenNewHead() <-chan uint64 {
ch := make(chan uint64)

Expand Down Expand Up @@ -347,6 +351,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
// Streaming mode if available, where we listen for new heads
// and push the new block number to the nextBlock channel.
m.log.Info("ethmonitor: starting stream head listener")
m.isStreamingMode.Store(true)

newHeads := make(chan *types.Header)
sub, err := m.provider.SubscribeNewHeads(m.ctx, newHeads)
Expand Down Expand Up @@ -387,6 +392,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
} else {
// We default to polling if streaming is not enabled
m.log.Info("ethmonitor: starting poll head listener")
m.isStreamingMode.Store(false)

retryStreamingTimer := time.NewTimer(m.options.StreamingRetryAfter)
for {
Expand Down

0 comments on commit 0d909e5

Please sign in to comment.