Skip to content

Commit

Permalink
feat: add namespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
0x416e746f6e committed Mar 15, 2024
1 parent 06e96ec commit bca9c33
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 46 deletions.
8 changes: 4 additions & 4 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ func CommandServe(cfg *config.Config) *cli.Command {
Category: categoryEth,
Destination: executionEndpoints,
EnvVars: []string{"NODE_MONITOR_ETH_EL_ENDPOINTS"},
Name: "eth-el-endpoints",
Usage: "eth execution endpoints (websocket) in the format of `id=hostname:port`",
Name: "eth-el-endpoint",
Usage: "eth execution endpoints (websocket) in the format of `[namespace:]id=hostname:port`",
},

&cli.StringSliceFlag{
Category: categoryEth,
Destination: externalExecutionEndpoints,
EnvVars: []string{"NODE_MONITOR_ETH_EXT_EL_ENDPOINTS"},
Name: "eth-ext-el-endpoints",
Usage: "external eth execution endpoints (websocket) in the format of `id=hostname:port`",
Name: "eth-ext-el-endpoint",
Usage: "external eth execution endpoints (websocket) in the format of `[namespace:]id=hostname:port`",
},

&cli.DurationFlag{
Expand Down
74 changes: 49 additions & 25 deletions server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"context"
"math/big"
"net/http"
"time"

Expand All @@ -18,20 +19,31 @@ func (s *Server) handleEventEthNewHeader(
) {
l := logutils.LoggerFromContext(ctx)

s.state.UpdateHighestBlockIfNeeded(header.Number, ts)
s.state.ExecutionEndpoint(id).UpdateHighestBlockIfNeeded(header.Number, ts)
ee := s.state.ExecutionEndpoint(id)
name, namespace := ee.Name()

latency := ts.Sub(s.state.HighestBlockTime())
s.state.UpdateHighestBlockIfNeeded(namespace, header.Number, ts)
ee.UpdateHighestBlockIfNeeded(header.Number, ts)

latency := ts.Sub(s.state.HighestBlockTime(namespace))
var attrs []attribute.KeyValue
if namespace != "" {
attrs = []attribute.KeyValue{
{Key: "name", Value: attribute.StringValue(name)},
{Key: "namespace", Value: attribute.StringValue(namespace)},
}
} else {
attrs = []attribute.KeyValue{
{Key: "name", Value: attribute.StringValue(name)},
}
}

s.metrics.newBlockLatency.Record(ctx,
latency.Seconds(),
metric.WithAttributes(attribute.KeyValue{
Key: "instance_name",
Value: attribute.StringValue(id),
}),
metric.WithAttributes(attrs...),
)

l.Info("Received new header",
l.Debug("Received new header",
zap.String("block", header.Number.String()),
zap.String("id", id),
zap.Duration("latency", latency),
Expand All @@ -43,29 +55,41 @@ func (s *Server) handleHealthcheck(w http.ResponseWriter, r *http.Request) {
}

func (s *Server) handleEventPrometheusObserve(_ context.Context, o metric.Observer) error {
o.ObserveFloat64(
s.metrics.secondsSinceLastBlock,
time.Since(s.state.HighestBlockTime()).Seconds(),
s.state.IterateNamespaces(func(namespace string, highestBlock *big.Int, highestBlockTime time.Time) {
var attrs []attribute.KeyValue
if namespace != "" {
attrs = []attribute.KeyValue{
{Key: "namespace", Value: attribute.StringValue(namespace)},
}
} else {
attrs = []attribute.KeyValue{}
}

metric.WithAttributes(
attribute.KeyValue{
Key: "instance_name",
Value: attribute.StringValue("__global"),
},
),
)
o.ObserveFloat64(
s.metrics.secondsSinceLastBlock,
time.Since(highestBlockTime).Seconds(),
metric.WithAttributes(attrs...),
)
})

s.state.IterateExecutionEndpoints(func(id string, ee *state.ExecutionEndpoint) {
name, namespace := ee.Name()
var attrs []attribute.KeyValue
if namespace != "" {
attrs = []attribute.KeyValue{
{Key: "name", Value: attribute.StringValue(name)},
{Key: "namespace", Value: attribute.StringValue(namespace)},
}
} else {
attrs = []attribute.KeyValue{
{Key: "name", Value: attribute.StringValue(name)},
}
}

o.ObserveFloat64(
s.metrics.secondsSinceLastBlock,
time.Since(ee.HighestBlockTime()).Seconds(),

metric.WithAttributes(
attribute.KeyValue{
Key: "instance_name",
Value: attribute.StringValue(id),
},
),
metric.WithAttributes(attrs...),
)
})

Expand Down
17 changes: 14 additions & 3 deletions state/execution_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,34 @@ import (
)

type ExecutionEndpoint struct {
id string
id string
name string
namespace string

highestBlock *big.Int
highestBlockTime time.Time

mx sync.RWMutex
}

func newExecutionEndpoint(id string) *ExecutionEndpoint {
func newExecutionEndpoint(id, name, namespace string) *ExecutionEndpoint {
return &ExecutionEndpoint{
id: id,
id: id,
name: name,
namespace: namespace,

highestBlock: big.NewInt(0),
highestBlockTime: time.Time{},
}
}

func (e *ExecutionEndpoint) Name() (name, namespace string) {
e.mx.RLock()
defer e.mx.RUnlock()

return e.name, e.namespace
}

func (e *ExecutionEndpoint) HighestBlock() *big.Int {
e.mx.RLock()
defer e.mx.RUnlock()
Expand Down
55 changes: 41 additions & 14 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
type State struct {
executionEndpoints map[string]*ExecutionEndpoint

highestBlock *big.Int
highestBlockTime time.Time
highestBlock map[string]*big.Int
highestBlockTime map[string]time.Time

mx sync.RWMutex
}
Expand All @@ -25,22 +25,34 @@ func New() *State {
return &State{
executionEndpoints: map[string]*ExecutionEndpoint{},

highestBlock: big.NewInt(0),
highestBlockTime: time.Time{},
highestBlock: make(map[string]*big.Int),
highestBlockTime: make(map[string]time.Time),
}
}

func (s *State) RegisterExecutionEndpoint(id string) error {
s.mx.Lock()
defer s.mx.Unlock()

name, namespace, err := parseExecutionPointID(id)
if err != nil {
return err
}

if _, exists := s.executionEndpoints[id]; exists {
return fmt.Errorf("%w: %s",
ErrExecutionEndpointDuplicateID, id,
)
}

s.executionEndpoints[id] = newExecutionEndpoint(id)
if _, exists := s.highestBlock[namespace]; !exists {
s.highestBlock[namespace] = big.NewInt(0)
}
if _, exists := s.highestBlockTime[namespace]; !exists {
s.highestBlockTime[namespace] = time.Time{}
}

s.executionEndpoints[id] = newExecutionEndpoint(id, name, namespace)

return nil
}
Expand All @@ -52,6 +64,17 @@ func (s *State) ExecutionEndpoint(id string) *ExecutionEndpoint {
return s.executionEndpoints[id]
}

func (s *State) IterateNamespaces(do func(namespace string, highestBlock *big.Int, highestBlockTime time.Time)) {
s.mx.RLock()
defer s.mx.RUnlock()

for namespace, highestBlock := range s.highestBlock {
highestBlock = new(big.Int).Set(highestBlock)
highestBlockTime := s.highestBlockTime[namespace]
do(namespace, highestBlock, highestBlockTime)
}
}

func (s *State) IterateExecutionEndpoints(do func(id string, ee *ExecutionEndpoint)) {
s.mx.RLock()
defer s.mx.RUnlock()
Expand All @@ -61,32 +84,36 @@ func (s *State) IterateExecutionEndpoints(do func(id string, ee *ExecutionEndpoi
}
}

func (s *State) HighestBlock() *big.Int {
func (s *State) HighestBlock(namespace string) *big.Int {
s.mx.RLock()
defer s.mx.RUnlock()

res := new(big.Int).Set(s.highestBlock)
res := new(big.Int).Set(s.highestBlock[namespace])
return res
}

func (s *State) HighestBlockTime() time.Time {
func (s *State) HighestBlockTime(namespace string) time.Time {
s.mx.RLock()
defer s.mx.RUnlock()

return s.highestBlockTime
return s.highestBlockTime[namespace]
}

func (s *State) UpdateHighestBlockIfNeeded(block *big.Int, blockTime time.Time) {
func (s *State) UpdateHighestBlockIfNeeded(
namespace string,
block *big.Int,
blockTime time.Time,
) {
s.mx.RLock()
defer s.mx.RUnlock()

// update the highest block
if cmp := s.highestBlock.Cmp(block); cmp == -1 {
if cmp := s.highestBlock[namespace].Cmp(block); cmp == -1 {
s.mx.RUnlock()
s.mx.Lock()
if cmp := s.highestBlock.Cmp(block); cmp == -1 {
s.highestBlock = new(big.Int).Set(block)
s.highestBlockTime = blockTime
if cmp := s.highestBlock[namespace].Cmp(block); cmp == -1 {
s.highestBlock[namespace] = new(big.Int).Set(block)
s.highestBlockTime[namespace] = blockTime
}
s.mx.Unlock()
s.mx.RLock()
Expand Down
30 changes: 30 additions & 0 deletions state/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package state

import (
"errors"
"fmt"
"strings"
)

var (
ErrExecutionEndpointToManyPartsInID = errors.New("too many parts in the id of execution endpoint")
)

func parseExecutionPointID(id string) (
name, namespace string, err error,
) {
name = id

if strings.Contains(id, ":") {
parts := strings.Split(id, ":")
if len(parts) > 2 {
return "", "", fmt.Errorf("%w: %s",
ErrExecutionEndpointToManyPartsInID, id,
)
}
namespace = parts[0]
name = parts[1]
}

return name, namespace, nil
}

0 comments on commit bca9c33

Please sign in to comment.