Skip to content

Commit

Permalink
feat: handle nodes that lag and catch up
Browse files Browse the repository at this point in the history
  • Loading branch information
0x416e746f6e committed Mar 17, 2024
1 parent 04663c8 commit 2cc0363
Show file tree
Hide file tree
Showing 20 changed files with 653 additions and 459 deletions.
5 changes: 4 additions & 1 deletion cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func CommandServe(cfg *config.Config) *cli.Command {
EnvVars: []string{"NODE_MONITOR_RESUBSCRIBE_INTERVAL"},
Name: "resubscribe-interval",
Usage: "an `interval` at which the monitor will try to (re-)subscribe to node events",
Value: 15 * time.Second,
Value: 5 * time.Second,
},
}

Expand Down Expand Up @@ -98,6 +98,9 @@ func CommandServe(cfg *config.Config) *cli.Command {
parts[idx] = strings.TrimSpace(part)
}
id := parts[0]
if _, _, err := utils.ParseELEndpointID(id); err != nil {
return err
}
uri := parts[1]
parsed, err := utils.ParseRawURI(uri)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/sdk/metric v1.24.0
go.uber.org/zap v1.27.0
gotest.tools v2.2.0+incompatible
)

require (
Expand All @@ -32,10 +33,12 @@ require (
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/mmcloughlin/addchain v0.4.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -206,5 +206,7 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
rsc.io/tmplfunc v0.0.3 h1:53XFQh69AfOa8Tw0Jm7t+GV7KZhOi6jzsCzTtKbMvzU=
rsc.io/tmplfunc v0.0.3/go.mod h1:AG3sTPzElb1Io3Yg4voV9AGZJuleGAwaVRxL9M49PhA=
2 changes: 1 addition & 1 deletion httplogger/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func Middleware(logger *zap.Logger, next http.Handler) http.Handler {

// Passing request stats both in-message (for the human reader)
// as well as inside the structured log (for the machine parser)
logger.Debug(fmt.Sprintf("%s: %s %s %d", r.URL.Scheme, r.Method, r.URL.EscapedPath(), wrapped.Status()),
logger.Debug(fmt.Sprintf("%s %s %d", r.Method, r.URL.EscapedPath(), wrapped.Status()),
zap.Int("durationMs", int(time.Since(start).Milliseconds())),
zap.Int("status", wrapped.Status()),
zap.String("httpRequestID", httpRequestID),
Expand Down
109 changes: 62 additions & 47 deletions server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,55 @@ package server

import (
"context"
"math/big"
"net/http"
"time"

ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/flashbots/node-monitor/logutils"
"github.com/flashbots/node-monitor/state"
"github.com/flashbots/node-monitor/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

const (
defaultNamespace = "__default"
defaultGroup = "__default"
groupEndpoint = "__group"

keyName = "node_monitor_target_name"
keyNamespace = "node_monitor_target_namespace"
keyTargetEndpoint = "node_monitor_target_endpoint"
keyTargetGroup = "node_monitor_target_group"
keyTargetID = "node_monitor_target_id"
)

func (s *Server) handleEventEthNewHeader(
ctx context.Context, id string, ts time.Time, header *ethtypes.Header,
ctx context.Context,
gname, ename string,
ts time.Time,
header *ethtypes.Header,
) {
l := logutils.LoggerFromContext(ctx)

ee := s.state.ExecutionEndpoint(id)
name, namespace := ee.Name()
block := header.Number

s.state.UpdateHighestBlockIfNeeded(namespace, header.Number, ts)
ee.UpdateHighestBlockIfNeeded(header.Number, ts)
g := s.state.ExecutionGroup(gname)
e := g.Endpoint(ename)

e.RegisterBlock(block, ts)
latency := g.RegisterBlockAndGetLatency(block, ts)

latency := ts.Sub(s.state.HighestBlockTime(namespace))
var attrs []attribute.KeyValue
if namespace != "" {
if gname != "" {
attrs = []attribute.KeyValue{
{Key: keyName, Value: attribute.StringValue(name)},
{Key: keyNamespace, Value: attribute.StringValue(namespace)},
{Key: keyTargetEndpoint, Value: attribute.StringValue(ename)},
{Key: keyTargetGroup, Value: attribute.StringValue(gname)},
{Key: keyTargetID, Value: attribute.StringValue(utils.MakeELEndpointID(gname, ename))},
}
} else {
attrs = []attribute.KeyValue{
{Key: keyName, Value: attribute.StringValue(name)},
{Key: keyNamespace, Value: attribute.StringValue(defaultNamespace)},
{Key: keyTargetEndpoint, Value: attribute.StringValue(ename)},
{Key: keyTargetGroup, Value: attribute.StringValue(defaultGroup)},
{Key: keyTargetID, Value: attribute.StringValue(utils.MakeELEndpointID(gname, ename))},
}
}

Expand All @@ -52,9 +60,10 @@ func (s *Server) handleEventEthNewHeader(
)

l.Debug("Received new header",
zap.String("block", header.Number.String()),
zap.String("id", id),
zap.Duration("latency", latency),
zap.String("block", block.String()),
zap.String("endpoint_group", gname),
zap.String("endpoint_name", ename),
zap.Duration("latency_s", latency),
)
}

Expand All @@ -63,45 +72,51 @@ func (s *Server) handleHealthcheck(w http.ResponseWriter, r *http.Request) {
}

func (s *Server) handleEventPrometheusObserve(_ context.Context, o metric.Observer) error {
s.state.IterateNamespaces(func(namespace string, highestBlock *big.Int, highestBlockTime time.Time) {
s.state.IterateELGroupsRO(func(gname string, g *state.ELGroup) {
var attrs []attribute.KeyValue
if namespace != "" {
if gname != "" {
attrs = []attribute.KeyValue{
{Key: keyNamespace, Value: attribute.StringValue(namespace)},
{Key: keyTargetEndpoint, Value: attribute.StringValue(groupEndpoint)},
{Key: keyTargetGroup, Value: attribute.StringValue(gname)},
}
} else {
attrs = []attribute.KeyValue{
{Key: keyNamespace, Value: attribute.StringValue(defaultNamespace)},
{Key: keyTargetGroup, Value: attribute.StringValue(defaultGroup)},
{Key: keyTargetEndpoint, Value: attribute.StringValue(groupEndpoint)},
}
}

o.ObserveFloat64(
s.metrics.secondsSinceLastBlock,
time.Since(highestBlockTime).Seconds(),
metric.WithAttributes(attrs...),
)
})

s.state.IterateExecutionEndpoints(func(id string, ee *state.ExecutionEndpoint) {
name, namespace := ee.Name()
var attrs []attribute.KeyValue
if namespace != "" {
attrs = []attribute.KeyValue{
{Key: keyName, Value: attribute.StringValue(name)},
{Key: keyNamespace, Value: attribute.StringValue(namespace)},
b, t := g.TimeSinceHighestBlock()

// group's highest block
o.ObserveInt64(s.metrics.highestBlock, b, metric.WithAttributes(attrs...))

// group's time since last block
o.ObserveFloat64(s.metrics.timeSinceLastBlock, t.Seconds(), metric.WithAttributes(attrs...))

g.IterateEndpointsRO(func(ename string, e *state.ELEndpoint) {
if gname != "" {
attrs = []attribute.KeyValue{
{Key: keyTargetEndpoint, Value: attribute.StringValue(ename)},
{Key: keyTargetGroup, Value: attribute.StringValue(gname)},
{Key: keyTargetID, Value: attribute.StringValue(utils.MakeELEndpointID(gname, ename))},
}
} else {
attrs = []attribute.KeyValue{
{Key: keyTargetEndpoint, Value: attribute.StringValue(ename)},
{Key: keyTargetGroup, Value: attribute.StringValue(defaultGroup)},
{Key: keyTargetID, Value: attribute.StringValue(utils.MakeELEndpointID(gname, ename))},
}
}
} else {
attrs = []attribute.KeyValue{
{Key: keyName, Value: attribute.StringValue(name)},
{Key: keyNamespace, Value: attribute.StringValue(defaultNamespace)},
}
}

o.ObserveFloat64(
s.metrics.secondsSinceLastBlock,
time.Since(ee.HighestBlockTime()).Seconds(),
metric.WithAttributes(attrs...),
)
b, t := e.TimeSinceHighestBlock()

// endpoint's highest block
o.ObserveInt64(s.metrics.highestBlock, b, metric.WithAttributes(attrs...))

// endpoint's time since last block
o.ObserveFloat64(s.metrics.timeSinceLastBlock, t.Seconds(), metric.WithAttributes(attrs...))
})
})

return nil
Expand Down
44 changes: 31 additions & 13 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
)

const (
metricsNewBlockLatency = "new_block_latency"
metricHighestBlock = "highest_block"
metricNewBlockLatency = "new_block_latency"
metricTimeSinceLastBlock = "time_since_last_block"
)

var (
metricDescriptions = map[string]string{
metricsNewBlockLatency: "Statistics on how late a node receives blocks compared to the earliest observed ones",
metricHighestBlock: "The highest known block",
metricNewBlockLatency: "Statistics on how late a node receives blocks compared to the earliest observed ones",
metricTimeSinceLastBlock: "Time passed since last block was received",
}
)
Expand All @@ -26,36 +28,52 @@ var (
)

type metrics struct {
newBlockLatency otelapi.Float64Histogram
secondsSinceLastBlock otelapi.Float64Observable
highestBlock otelapi.Int64ObservableGauge
newBlockLatency otelapi.Float64Histogram
timeSinceLastBlock otelapi.Float64Observable
}

func (m *metrics) setup(meter otelapi.Meter, observe func(ctx context.Context, o metric.Observer) error) error {
secondsSinceLastBlock, err := meter.Float64ObservableGauge(metricTimeSinceLastBlock,
otelapi.WithDescription(metricDescriptions[metricTimeSinceLastBlock]),
otelapi.WithUnit("s"),
// highest block
highestBlock, err := meter.Int64ObservableGauge(metricHighestBlock,
otelapi.WithDescription(metricDescriptions[metricHighestBlock]),
)
if err != nil {
return fmt.Errorf("%w: %w: %s",
ErrSetupMetricsFailed, err, metricTimeSinceLastBlock,
ErrSetupMetricsFailed, err, metricHighestBlock,
)
}
m.secondsSinceLastBlock = secondsSinceLastBlock
m.highestBlock = highestBlock

newBlockLatency, err := meter.Float64Histogram(metricsNewBlockLatency,
// new block latency
newBlockLatency, err := meter.Float64Histogram(metricNewBlockLatency,
metric.WithExplicitBucketBoundaries(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1, 1.5, 3, 6, 12),
otelapi.WithDescription(metricDescriptions[metricsNewBlockLatency]),
otelapi.WithDescription(metricDescriptions[metricNewBlockLatency]),
otelapi.WithUnit("s"),
)
if err != nil {
return fmt.Errorf("%w: %w: %s",
ErrSetupMetricsFailed, err, metricsNewBlockLatency,
ErrSetupMetricsFailed, err, metricNewBlockLatency,
)
}
m.newBlockLatency = newBlockLatency

// time since last block
timeSinceLastBlock, err := meter.Float64ObservableGauge(metricTimeSinceLastBlock,
otelapi.WithDescription(metricDescriptions[metricTimeSinceLastBlock]),
otelapi.WithUnit("s"),
)
if err != nil {
return fmt.Errorf("%w: %w: %s",
ErrSetupMetricsFailed, err, metricTimeSinceLastBlock,
)
}
m.timeSinceLastBlock = timeSinceLastBlock

// observables
if _, err := meter.RegisterCallback(observe,
m.secondsSinceLastBlock,
m.highestBlock,
m.timeSinceLastBlock,
); err != nil {
return err
}
Expand Down
26 changes: 14 additions & 12 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/flashbots/node-monitor/prometheus"
"github.com/flashbots/node-monitor/state"
"github.com/flashbots/node-monitor/subscriber"
"github.com/flashbots/node-monitor/utils"
otelapi "go.opentelemetry.io/otel/metric"
"go.uber.org/zap"

Expand All @@ -31,7 +32,7 @@ type Server struct {
metrics *metrics
state *state.State

subs map[string]*subscriber.ExecutionEndpoint
subs map[string]*subscriber.ELEndpoint
}

var (
Expand All @@ -54,7 +55,7 @@ func New(cfg *config.Config) (*Server, error) {
}

state := state.New()
subs := make(map[string]*subscriber.ExecutionEndpoint, len(cfg.Eth.ExecutionEndpoints))
subs := make(map[string]*subscriber.ELEndpoint, len(cfg.Eth.ExecutionEndpoints))
for _, rpc := range cfg.Eth.ExecutionEndpoints {
parts := strings.Split(rpc, "=")
id := parts[0]
Expand All @@ -64,14 +65,18 @@ func New(cfg *config.Config) (*Server, error) {
ErrExecutionEndpointDuplicateId, id,
)
}
sub, err := subscriber.NewExecutionEndpoint(cfg, id, uri)
group, name, err := utils.ParseELEndpointID(id)
if err != nil {
return nil, err
}
sub, err := subscriber.NewELEndpoint(cfg, group, name, uri)
if err != nil {
return nil, fmt.Errorf("%w: %w",
ErrExecutionEndpointFailedToSubscribe, err,
)
}
subs[id] = sub
if err := state.RegisterExecutionEndpoint(id); err != nil {
if err := state.RegisterExecutionEndpoint(group, name); err != nil {
return nil, fmt.Errorf("%w: %w",
ErrExecutionEndpointFailedToRegister, err,
)
Expand Down Expand Up @@ -134,17 +139,14 @@ func (s *Server) Run() error {
}
}()

for _, sub := range s.subs {
if err := sub.Subscribe(ctx, s.handleEventEthNewHeader); err != nil {
return fmt.Errorf("%w: %w",
ErrExecutionEndpointFailedToSubscribe, err,
)
}
}

l.Info("Starting up the monitor server...",
zap.String("server_listen_address", s.cfg.Server.ListenAddress),
)

for _, sub := range s.subs {
sub.Subscribe(ctx, s.handleEventEthNewHeader)
}

if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
l.Error("Monitor server failed", zap.Error(err))
}
Expand Down
Loading

0 comments on commit 2cc0363

Please sign in to comment.