From 0d909e5c50b98ab31e528a1ee78b209100322005 Mon Sep 17 00:00:00 2001 From: Peter Kieltyka Date: Wed, 30 Oct 2024 11:40:54 -0400 Subject: [PATCH] ethmonitor: add IsStreamingMode() method --- ethmonitor/ethmonitor.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ethmonitor/ethmonitor.go b/ethmonitor/ethmonitor.go index 5d595d8..c32c8af 100644 --- a/ethmonitor/ethmonitor.go +++ b/ethmonitor/ethmonitor.go @@ -131,6 +131,7 @@ type Monitor struct { nextBlockNumber *big.Int nextBlockNumberMu sync.Mutex pollInterval atomic.Int64 + isStreamingMode atomic.Bool cache cachestore.Store[[]byte] @@ -306,6 +307,9 @@ func (m *Monitor) IsStreamingEnabled() bool { return !m.options.StreamingDisabled && m.provider.IsStreamingEnabled() } +func (m *Monitor) IsStreamingMode() bool { + return m.isStreamingMode.Load() +} func (m *Monitor) listenNewHead() <-chan uint64 { ch := make(chan uint64) @@ -347,6 +351,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 { // Streaming mode if available, where we listen for new heads // and push the new block number to the nextBlock channel. m.log.Info("ethmonitor: starting stream head listener") + m.isStreamingMode.Store(true) newHeads := make(chan *types.Header) sub, err := m.provider.SubscribeNewHeads(m.ctx, newHeads) @@ -387,6 +392,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 { } else { // We default to polling if streaming is not enabled m.log.Info("ethmonitor: starting poll head listener") + m.isStreamingMode.Store(false) retryStreamingTimer := time.NewTimer(m.options.StreamingRetryAfter) for {