From 26ecaacb1ac0d7fcdcd14f5173f2430515b83062 Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Fri, 1 Nov 2024 16:35:00 +0200 Subject: [PATCH] feat: implement head block age monitoring --- .gitignore | 1 + cmd/serve.go | 21 ++- config/config.go | 22 +++ config/errors.go | 23 +++ config/healthcheck.go | 9 +- config/healthcheck_geth.go | 20 ++- config/healthcheck_lighthouse.go | 20 ++- config/healthcheck_op_node.go | 22 ++- config/healthcheck_reth.go | 20 ++- config/http_status.go | 4 + config/log.go | 4 + config/server.go | 4 + healthcheck/geth.go | 178 +++++++++++++++++------ healthcheck/lighthouse.go | 233 +++++++++++++++++++++---------- healthcheck/op_node.go | 182 +++++++++++++----------- healthcheck/reth.go | 185 +++++++++++++++++------- server/healthcheck.go | 40 +++--- server/server.go | 2 +- 18 files changed, 716 insertions(+), 274 deletions(-) create mode 100644 config/errors.go diff --git a/.gitignore b/.gitignore index ace404b..e740f32 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ /bin/* !/bin/.gitkeep +/cmd/__debug_* dist # ide diff --git a/cmd/serve.go b/cmd/serve.go index ea85077..c695636 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -33,14 +33,23 @@ func CommandServe(cfg *config.Config) *cli.Command { healthcheckFlags := []cli.Flag{ &cli.DurationFlag{ Category: strings.ToUpper(categoryHealthcheck), - Destination: &cfg.Healthcheck.CacheTimeout, + Destination: &cfg.Healthcheck.BlockAgeThreshold, DefaultText: "disabled", - EnvVars: []string{envPrefix + strings.ToUpper(categoryHealthcheck) + "_CACHE_TIMEOUT"}, - Name: categoryHealthcheck + "-cache-timeout", - Usage: "re-use healthcheck results for the specified `duration`", + EnvVars: []string{envPrefix + strings.ToUpper(categoryHealthcheck) + "_BLOCK_AGE_THRESHOLD"}, + Name: categoryHealthcheck + "-block-age-threshold", + Usage: "monitor the age of latest block and report unhealthy if it's over specified `duration`", Value: 0, }, + &cli.DurationFlag{ + Category: strings.ToUpper(categoryHealthcheck), + Destination: &cfg.Healthcheck.CacheCoolOff, + EnvVars: []string{envPrefix + strings.ToUpper(categoryHealthcheck) + "_CACHE_COOL_OFF"}, + Name: categoryHealthcheck + "-cache-cool-off", + Usage: "re-use healthcheck results for the specified `duration`", + Value: 750 * time.Millisecond, + }, + &cli.DurationFlag{ Category: strings.ToUpper(categoryHealthcheck), Destination: &cfg.Healthcheck.Timeout, @@ -167,7 +176,9 @@ func CommandServe(cfg *config.Config) *cli.Command { ), Before: func(ctx *cli.Context) error { - // TODO: validate inputs + if err := cfg.Preprocess(); err != nil { + return err + } return nil }, diff --git a/config/config.go b/config/config.go index 1495364..4cab2e3 100644 --- a/config/config.go +++ b/config/config.go @@ -13,3 +13,25 @@ type Config struct { HealthcheckOpNode HealthcheckOpNode `yaml:"healthcheck_op_node"` HealthcheckReth HealthcheckReth `yaml:"healthcheck_reth"` } + +func (c *Config) Preprocess() error { + errs := make([]error, 0) + + if c.Healthcheck.BlockAgeThreshold != 0 { + c.HealthcheckGeth.BlockAgeThreshold = c.Healthcheck.BlockAgeThreshold + c.HealthcheckLighthouse.BlockAgeThreshold = c.Healthcheck.BlockAgeThreshold + c.HealthcheckOpNode.BlockAgeThreshold = c.Healthcheck.BlockAgeThreshold + c.HealthcheckReth.BlockAgeThreshold = c.Healthcheck.BlockAgeThreshold + } + + errs = append(errs, c.Log.Preprocess()) + errs = append(errs, c.Server.Preprocess()) + errs = append(errs, c.HttpStatus.Preprocess()) + errs = append(errs, c.Healthcheck.Preprocess()) + errs = append(errs, c.HealthcheckGeth.Preprocess()) + errs = append(errs, c.HealthcheckLighthouse.Preprocess()) + errs = append(errs, c.HealthcheckOpNode.Preprocess()) + errs = append(errs, c.HealthcheckReth.Preprocess()) + + return flatten(errs) +} diff --git a/config/errors.go b/config/errors.go new file mode 100644 index 0000000..d4c4a8e --- /dev/null +++ b/config/errors.go @@ -0,0 +1,23 @@ +package config + +import "errors" + +func flatten(errs []error) error { + next := 0 + for _, err := range errs { + if err != nil { + errs[next] = err + next++ + } + } + errs = errs[:next] + + switch len(errs) { + default: + return errors.Join(errs...) + case 1: + return errs[0] + case 0: + return nil + } +} diff --git a/config/healthcheck.go b/config/healthcheck.go index e16e032..ed28f7c 100644 --- a/config/healthcheck.go +++ b/config/healthcheck.go @@ -3,6 +3,11 @@ package config import "time" type Healthcheck struct { - CacheTimeout time.Duration `yaml:"cache_timeout"` - Timeout time.Duration `yaml:"timeout"` + BlockAgeThreshold time.Duration `yaml:"block_age_threshold"` + CacheCoolOff time.Duration `yaml:"cache_cool_off"` + Timeout time.Duration `yaml:"timeout"` +} + +func (c *Healthcheck) Preprocess() error { + return nil } diff --git a/config/healthcheck_geth.go b/config/healthcheck_geth.go index 62fbad6..c01f334 100644 --- a/config/healthcheck_geth.go +++ b/config/healthcheck_geth.go @@ -1,5 +1,23 @@ package config +import ( + "fmt" + "net/url" + "time" +) + type HealthcheckGeth struct { - BaseURL string `yaml:"base_url"` + BaseURL string `yaml:"base_url"` + BlockAgeThreshold time.Duration `yaml:"-"` +} + +func (c *HealthcheckGeth) Preprocess() error { + if c.BaseURL != "" { + if _, err := url.Parse(c.BaseURL); err != nil { + return fmt.Errorf("invalid geth base url: %w", + err, + ) + } + } + return nil } diff --git a/config/healthcheck_lighthouse.go b/config/healthcheck_lighthouse.go index 0203a79..a5881e6 100644 --- a/config/healthcheck_lighthouse.go +++ b/config/healthcheck_lighthouse.go @@ -1,5 +1,23 @@ package config +import ( + "fmt" + "net/url" + "time" +) + type HealthcheckLighthouse struct { - BaseURL string `yaml:"base_url"` + BaseURL string `yaml:"base_url"` + BlockAgeThreshold time.Duration `yaml:"-"` +} + +func (c *HealthcheckLighthouse) Preprocess() error { + if c.BaseURL != "" { + if _, err := url.Parse(c.BaseURL); err != nil { + return fmt.Errorf("invalid lighthouse base url: %w", + err, + ) + } + } + return nil } diff --git a/config/healthcheck_op_node.go b/config/healthcheck_op_node.go index 26b9b25..410178e 100644 --- a/config/healthcheck_op_node.go +++ b/config/healthcheck_op_node.go @@ -1,6 +1,24 @@ package config +import ( + "fmt" + "net/url" + "time" +) + type HealthcheckOpNode struct { - BaseURL string `yaml:"base_url"` - ConfirmationDistance uint64 `yaml:"confirmation_distance"` + BaseURL string `yaml:"base_url"` + BlockAgeThreshold time.Duration `yaml:"-"` + ConfirmationDistance uint64 `yaml:"confirmation_distance"` +} + +func (c *HealthcheckOpNode) Preprocess() error { + if c.BaseURL != "" { + if _, err := url.Parse(c.BaseURL); err != nil { + return fmt.Errorf("invalid op-node base url: %w", + err, + ) + } + } + return nil } diff --git a/config/healthcheck_reth.go b/config/healthcheck_reth.go index 47d5c34..6807096 100644 --- a/config/healthcheck_reth.go +++ b/config/healthcheck_reth.go @@ -1,5 +1,23 @@ package config +import ( + "fmt" + "net/url" + "time" +) + type HealthcheckReth struct { - BaseURL string `yaml:"base_url"` + BaseURL string `yaml:"base_url"` + BlockAgeThreshold time.Duration `yaml:"-"` +} + +func (c *HealthcheckReth) Preprocess() error { + if c.BaseURL != "" { + if _, err := url.Parse(c.BaseURL); err != nil { + return fmt.Errorf("invalid reth base url: %w", + err, + ) + } + } + return nil } diff --git a/config/http_status.go b/config/http_status.go index 5ce8975..651a4a3 100644 --- a/config/http_status.go +++ b/config/http_status.go @@ -5,3 +5,7 @@ type HttpStatus struct { Warning int `yaml:"warning"` Error int `yaml:"error"` } + +func (c *HttpStatus) Preprocess() error { + return nil +} diff --git a/config/log.go b/config/log.go index c57b2db..2b40e78 100644 --- a/config/log.go +++ b/config/log.go @@ -4,3 +4,7 @@ type Log struct { Level string `yaml:"level"` Mode string `yaml:"mode"` } + +func (c *Log) Preprocess() error { + return nil +} diff --git a/config/server.go b/config/server.go index 64c242b..66c75f2 100644 --- a/config/server.go +++ b/config/server.go @@ -3,3 +3,7 @@ package config type Server struct { ListenAddress string `yaml:"listen_address"` } + +func (c *Server) Preprocess() error { + return nil +} diff --git a/healthcheck/geth.go b/healthcheck/geth.go index 9775809..1538c18 100644 --- a/healthcheck/geth.go +++ b/healthcheck/geth.go @@ -8,6 +8,9 @@ import ( "fmt" "io" "net/http" + "strconv" + "strings" + "time" "github.com/flashbots/node-healthchecker/config" ) @@ -60,64 +63,149 @@ type gethIsSyncing struct { } `json:"result"` } +// gethLatestBlock is the latest block as reported by geth +type gethLatestBlock struct { + Result struct { + Timestamp string `json:"timestamp"` + } `json:"result"` +} + func Geth(ctx context.Context, cfg *config.HealthcheckGeth) *Result { - // https://ethereum.org/en/developers/docs/apis/json-rpc/#eth_syncing - // https://github.com/ethereum/go-ethereum/blob/v1.14.8/interfaces.go#L98-L127 - - const query = `{"jsonrpc":"2.0","method":"eth_syncing","params":[],"id":0}` - - req, err := http.NewRequestWithContext( - ctx, - http.MethodGet, - cfg.BaseURL, - bytes.NewReader([]byte(query)), - ) - if err != nil { - return &Result{Err: err} - } - req.Header.Set("accept", "application/json") - req.Header.Set("content-type", "application/json") + { // eth_syncing - res, err := http.DefaultClient.Do(req) - if err != nil { - return &Result{Err: err} - } - defer res.Body.Close() + // https://ethereum.org/en/developers/docs/apis/json-rpc/#eth_syncing + // https://github.com/ethereum/go-ethereum/blob/v1.14.8/interfaces.go#L98-L127 - body, err := io.ReadAll(res.Body) - if err != nil { - return &Result{Err: err} - } + const ethSyncing = `{"jsonrpc":"2.0","method":"eth_syncing","params":[],"id":0}` - if res.StatusCode != http.StatusOK { - return &Result{ - Err: fmt.Errorf("unexpected HTTP status '%d': %s", - res.StatusCode, - string(body), - ), + req, err := http.NewRequestWithContext( + ctx, + http.MethodGet, + cfg.BaseURL, + bytes.NewReader([]byte(ethSyncing)), + ) + if err != nil { + return &Result{Err: fmt.Errorf("geth: %w", err)} } - } + req.Header.Set("accept", "application/json") + req.Header.Set("content-type", "application/json") - var status gethIsNotSyncing - if err := json.Unmarshal(body, &status); err != nil { - var status gethIsSyncing - if err2 := json.Unmarshal(body, &status); err2 != nil { + res, err := http.DefaultClient.Do(req) + if err != nil { + return &Result{Err: fmt.Errorf("geth: %w", err)} + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return &Result{Err: fmt.Errorf("geth: %w", err)} + } + + if res.StatusCode != http.StatusOK { return &Result{ - Err: fmt.Errorf("failed to parse JSON body '%s': %w", + Err: fmt.Errorf("geth: unexpected HTTP status '%d': %s", + res.StatusCode, string(body), - errors.Join(err, err2), ), } } - return &Result{ - Err: fmt.Errorf("geth is still syncing: Current:=%s, Highest=%s", - status.Result.CurrentBlock, - status.Result.HighestBlock, - ), + + var status gethIsNotSyncing + if err := json.Unmarshal(body, &status); err != nil { + var status gethIsSyncing + if err2 := json.Unmarshal(body, &status); err2 != nil { + return &Result{ + Err: fmt.Errorf("geth: failed to parse JSON body '%s': %w", + string(body), + errors.Join(err, err2), + ), + } + } + return &Result{ + Err: fmt.Errorf("geth: still syncing (current: '%s', highest: '%s')", + status.Result.CurrentBlock, + status.Result.HighestBlock, + ), + } + } + if status.Result { // i.e. it's syncing + return &Result{Err: errors.New("geth: still syncing")} } } - if status.Result { // i.e. it's syncing - return &Result{Err: errors.New("geth is (still) syncing")} + + { // eth_getBlockByNumber + const ethGetBlockByNumber = `{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest",false],"id":0}` + + if cfg.BlockAgeThreshold != 0 { + req, err := http.NewRequestWithContext( + ctx, + http.MethodGet, + cfg.BaseURL, + bytes.NewReader([]byte(ethGetBlockByNumber)), + ) + if err != nil { + return &Result{Err: err} + } + req.Header.Set("accept", "application/json") + req.Header.Set("content-type", "application/json") + + now := time.Now() + res, err := http.DefaultClient.Do(req) + if err != nil { + return &Result{Err: err} + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return &Result{Err: err} + } + + if res.StatusCode != http.StatusOK { + return &Result{ + Err: fmt.Errorf("geth: unexpected HTTP status '%d': %s", + res.StatusCode, + string(body), + ), + } + } + + var latestBlock gethLatestBlock + if err := json.Unmarshal(body, &latestBlock); err != nil { + return &Result{ + Err: fmt.Errorf("geth: failed to parse JSON body '%s': %w", + string(body), + err, + ), + } + } + + epoch, err := strconv.ParseInt( + strings.TrimPrefix(latestBlock.Result.Timestamp, "0x"), + 16, 64, + ) + if err != nil { + return &Result{ + Err: fmt.Errorf("geth: failed to parse hex timestamp '%s': %w", + latestBlock.Result.Timestamp, + err, + ), + } + } + + timestamp := time.Unix(epoch, 0) + age := now.Sub(timestamp) + + if age > cfg.BlockAgeThreshold { + return &Result{ + Err: fmt.Errorf("geth: latest block's timestamp '%s' is too old: %s > %s", + latestBlock.Result.Timestamp, + age, + cfg.BlockAgeThreshold, + ), + } + } + } } return &Result{Ok: true} diff --git a/healthcheck/lighthouse.go b/healthcheck/lighthouse.go index f4665e4..4477135 100644 --- a/healthcheck/lighthouse.go +++ b/healthcheck/lighthouse.go @@ -8,6 +8,8 @@ import ( "io" "net/http" "net/url" + "strconv" + "time" "github.com/flashbots/node-healthchecker/config" ) @@ -66,95 +68,182 @@ type lighthouseStateAsStruct struct { } `json:"data"` } +type lighthouseBeaconBlocksHead struct { + Data struct { + Message struct { + Slot string `json:"slot"` + + Body struct { + ExecutionPayload struct { + Timestamp string `json:"timestamp"` + } `json:"execution_payload"` + } `json:"body"` + } `json:"message"` + } `json:"data"` +} + func Lighthouse(ctx context.Context, cfg *config.HealthcheckLighthouse) *Result { - // https://lighthouse-book.sigmaprime.io/api-lighthouse.html#lighthousesyncing - // https://github.com/sigp/lighthouse/blob/v4.5.0/beacon_node/lighthouse_network/src/types/sync_state.rs#L6-L27 + { // lighthouse/syncing - _url, err := url.JoinPath(cfg.BaseURL, "lighthouse/syncing") - if err != nil { - return &Result{Err: err} - } + // https://lighthouse-book.sigmaprime.io/api-lighthouse.html#lighthousesyncing + // https://github.com/sigp/lighthouse/blob/v4.5.0/beacon_node/lighthouse_network/src/types/sync_state.rs#L6-L27 - req, err := http.NewRequestWithContext( - ctx, - http.MethodGet, - _url, - nil, - ) - if err != nil { - return &Result{Err: err} - } - req.Header.Set("accept", "application/json") + _url, err := url.JoinPath(cfg.BaseURL, "lighthouse/syncing") + if err != nil { + return &Result{Err: fmt.Errorf("lighthouse: %w", err)} + } - res, err := http.DefaultClient.Do(req) - if err != nil { - return &Result{Err: err} - } - defer res.Body.Close() + req, err := http.NewRequestWithContext( + ctx, + http.MethodGet, + _url, + nil, + ) + if err != nil { + return &Result{Err: fmt.Errorf("lighthouse: %w", err)} + } + req.Header.Set("accept", "application/json") - body, err := io.ReadAll(res.Body) - if err != nil { - return &Result{Err: err} - } + res, err := http.DefaultClient.Do(req) + if err != nil { + return &Result{Err: fmt.Errorf("lighthouse: %w", err)} + } + defer res.Body.Close() - if res.StatusCode != http.StatusOK { - return &Result{Err: fmt.Errorf("unexpected HTTP status '%d': %s", - res.StatusCode, - string(body), - )} - } + body, err := io.ReadAll(res.Body) + if err != nil { + return &Result{Err: fmt.Errorf("lighthouse: %w", err)} + } - var state lighthouseStateAsString - if err := json.Unmarshal(body, &state); err != nil { - var state lighthouseStateAsStruct - if err2 := json.Unmarshal(body, &state); err2 != nil { - return &Result{Err: fmt.Errorf("failed to parse JSON body '%s': %w", + if res.StatusCode != http.StatusOK { + return &Result{Err: fmt.Errorf("lighthouse: unexpected HTTP status '%d': %s", + res.StatusCode, string(body), - errors.Join(err, err2), )} } - switch { - case state.Data.BackFillSyncing != nil: - // - // BackBillSyncing is "ok" because that's the state lighthouse - // switches to after checkpoint sync is complete. - // - // See: https://lighthouse-book.sigmaprime.io/checkpoint-sync.html#backfilling-blocks - // - return &Result{ - Ok: true, - Err: fmt.Errorf("lighthouse is in 'BackFillSyncing' state (completed: %d, remaining: %d)", - state.Data.BackFillSyncing.Completed, - state.Data.BackFillSyncing.Remaining, - ), - } - case state.Data.SyncingFinalized != nil: - return &Result{ - Err: fmt.Errorf("lighthouse is in 'SyncingFinalized' state (start_slot: '%s', target_slot: '%s')", - state.Data.SyncingFinalized.StartSlot, - state.Data.SyncingFinalized.TargetSlot, - ), + + var state lighthouseStateAsString + if err := json.Unmarshal(body, &state); err != nil { + var state lighthouseStateAsStruct + if err2 := json.Unmarshal(body, &state); err2 != nil { + return &Result{Err: fmt.Errorf("lighthouse: failed to parse JSON body '%s': %w", + string(body), + errors.Join(err, err2), + )} } - case state.Data.SyncingHead != nil: - return &Result{ - Err: fmt.Errorf("lighthouse is in 'SyncingHead' state (start_slot: '%s', target_slot: '%s')", - state.Data.SyncingHead.StartSlot, - state.Data.SyncingHead.TargetSlot, - ), + switch { + case state.Data.BackFillSyncing != nil: + // + // BackBillSyncing is "ok" because that's the state lighthouse + // switches to after checkpoint sync is complete. + // + // See: https://lighthouse-book.sigmaprime.io/checkpoint-sync.html#backfilling-blocks + // + return &Result{ + Ok: true, + Err: fmt.Errorf("lighthouse: is in 'BackFillSyncing' state (completed: %d, remaining: %d)", + state.Data.BackFillSyncing.Completed, + state.Data.BackFillSyncing.Remaining, + ), + } + case state.Data.SyncingFinalized != nil: + return &Result{ + Err: fmt.Errorf("lighthouse: is in 'SyncingFinalized' state (start_slot: '%s', target_slot: '%s')", + state.Data.SyncingFinalized.StartSlot, + state.Data.SyncingFinalized.TargetSlot, + ), + } + case state.Data.SyncingHead != nil: + return &Result{ + Err: fmt.Errorf("lighthouse: is in 'SyncingHead' state (start_slot: '%s', target_slot: '%s')", + state.Data.SyncingHead.StartSlot, + state.Data.SyncingHead.TargetSlot, + ), + } + default: + return &Result{ + Err: fmt.Errorf("lighthouse: is in unrecognised state: %s", + string(body), + ), + } } - default: + } + if state.Data != "Synced" { return &Result{ - Err: fmt.Errorf("lighthouse is in unrecognised state: %s", - string(body), + Err: fmt.Errorf("lighthouse: is not in synced state: %s", + state.Data, ), } } } - if state.Data != "Synced" { - return &Result{ - Err: fmt.Errorf("lighthouse is not in synced state: %s", - state.Data, - ), + + { // eth/v2/beacon/blocks/head + if cfg.BlockAgeThreshold != 0 { + // https://github.com/sigp/lighthouse/blob/v4.5.0/consensus/types/src/execution_payload.rs#L50-L86 + + _url, err := url.JoinPath(cfg.BaseURL, "eth/v2/beacon/blocks/head") + if err != nil { + return &Result{Err: fmt.Errorf("lighthouse: %w", err)} + } + + req, err := http.NewRequestWithContext( + ctx, + http.MethodGet, + _url, + nil, + ) + if err != nil { + return &Result{Err: fmt.Errorf("lighthouse: %w", err)} + } + req.Header.Set("accept", "application/json") + + now := time.Now() + res, err := http.DefaultClient.Do(req) + if err != nil { + return &Result{Err: fmt.Errorf("lighthouse: %w", err)} + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return &Result{Err: fmt.Errorf("lighthouse: %w", err)} + } + + if res.StatusCode != http.StatusOK { + return &Result{Err: fmt.Errorf("lighthouse: unexpected HTTP status '%d': %s", + res.StatusCode, + string(body), + )} + } + + var head lighthouseBeaconBlocksHead + if err := json.Unmarshal(body, &head); err != nil { + return &Result{Err: fmt.Errorf("lighthouse: failed to parse JSON body '%s': %w", + string(body), + err, + )} + } + + epoch, err := strconv.Atoi(head.Data.Message.Body.ExecutionPayload.Timestamp) + if err != nil { + return &Result{Err: fmt.Errorf("lighthouse: failed to parse timestamp '%s': %w", + head.Data.Message.Body.ExecutionPayload.Timestamp, + err, + )} + } + timestamp := time.Unix(int64(epoch), 0) + age := now.Sub(timestamp) + + if age > cfg.BlockAgeThreshold { + return &Result{ + Err: fmt.Errorf("lighthouse: beacon head timestamp '%s' (slot '%s') is too old: %s > %s", + head.Data.Message.Body.ExecutionPayload.Timestamp, + head.Data.Message.Slot, + age, + cfg.BlockAgeThreshold, + ), + } + } } } diff --git a/healthcheck/op_node.go b/healthcheck/op_node.go index 30199a4..bda7257 100644 --- a/healthcheck/op_node.go +++ b/healthcheck/op_node.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "time" "github.com/flashbots/node-healthchecker/config" ) @@ -16,51 +17,55 @@ import ( // Values may be zeroed if not yet initialized. type opNodeSyncStatus struct { Result struct { - // CurrentL1 is the L1 block that the derivation process is last idled at. + // CurrentL1 is the L1 block that the derivation process is last idled + // at. // // This may not be fully derived into L2 data yet. // - // The safe L2 blocks were produced/included fully from the L1 chain up to - // and including this L1 block. + // The safe L2 blocks were produced/included fully from the L1 chain up + // to and including this L1 block. // // If the node is synced, this matches the HeadL1, minus the verifier // confirmation distance. CurrentL1 opNodeL1BlockRef `json:"current_l1"` - // HeadL1 is the perceived head of the L1 chain, no confirmation distance. + // HeadL1 is the perceived head of the L1 chain, no confirmation + // distance. // - // The head is not guaranteed to build on the other L1 sync status fields, - // as the node may be in progress of resetting to adapt to a L1 reorg. + // The head is not guaranteed to build on the other L1 sync status + // fields, as the node may be in progress of resetting to adapt to a L1 + // reorg. HeadL1 opNodeL1BlockRef `json:"head_l1"` SafeL1 opNodeL1BlockRef `json:"safe_l1"` FinalizedL1 opNodeL1BlockRef `json:"finalized_l1"` - // UnsafeL2 is the absolute tip of the L2 chain, pointing to block data that - // has not been submitted to L1 yet. + // UnsafeL2 is the absolute tip of the L2 chain, pointing to block data + // that has not been submitted to L1 yet. // - // The sequencer is building this, and verifiers may also be ahead of the - // SafeL2 block if they sync blocks via p2p or other offchain sources. + // The sequencer is building this, and verifiers may also be ahead of + // the SafeL2 block if they sync blocks via p2p or other offchain + // sources. // - // This is considered to only be local-unsafe post-interop, see CrossUnsafe - // for cross-L2 guarantees. + // This is considered to only be local-unsafe post-interop, see + // CrossUnsafe for cross-L2 guarantees. UnsafeL2 opNodeL2BlockRef `json:"unsafe_l2"` // SafeL2 points to the L2 block that was derived from the L1 chain. // // This point may still reorg if the L1 chain reorgs. // - // This is considered to be cross-safe post-interop, see LocalSafe to ignore - // cross-L2 guarantees. + // This is considered to be cross-safe post-interop, see LocalSafe to + // ignore cross-L2 guarantees. SafeL2 opNodeL2BlockRef `json:"safe_l2"` - // FinalizedL2 points to the L2 block that was derived fully from finalized - // L1 information, thus irreversible. + // FinalizedL2 points to the L2 block that was derived fully from + // finalized L1 information, thus irreversible. FinalizedL2 opNodeL2BlockRef `json:"finalized_l2"` - // PendingSafeL2 points to the L2 block processed from the batch, but not - // consolidated to the safe block yet. + // PendingSafeL2 points to the L2 block processed from the batch, but + // not consolidated to the safe block yet. PendingSafeL2 opNodeL2BlockRef `json:"pending_safe_l2"` // CrossUnsafeL2 is an unsafe L2 block, that has been verified to match @@ -96,77 +101,96 @@ type opNodeL2BlockRef struct { } func OpNode(ctx context.Context, cfg *config.HealthcheckOpNode) *Result { - // https://docs.optimism.io/builders/node-operators/json-rpc#optimism_syncstatus - // https://github.com/ethereum-optimism/optimism/blob/v1.9.1/op-service/eth/sync_status.go#L5-L34 - - const query = `{"jsonrpc":"2.0","method":"optimism_syncStatus","params":[],"id":0}` - - req, err := http.NewRequestWithContext( - ctx, - http.MethodPost, - cfg.BaseURL, - bytes.NewReader([]byte(query)), - ) - if err != nil { - return &Result{Err: err} - } - req.Header.Set("accept", "application/json") - req.Header.Set("content-type", "application/json") + { // optimism_syncStatus - res, err := http.DefaultClient.Do(req) - if err != nil { - return &Result{Err: err} - } - defer res.Body.Close() + // https://docs.optimism.io/builders/node-operators/json-rpc#optimism_syncstatus + // https://github.com/ethereum-optimism/optimism/blob/v1.9.1/op-service/eth/sync_status.go#L5-L34 - body, err := io.ReadAll(res.Body) - if err != nil { - return &Result{Err: err} - } + const optimismSyncStatus = `{"jsonrpc":"2.0","method":"optimism_syncStatus","params":[],"id":0}` - if res.StatusCode != http.StatusOK { - return &Result{ - Err: fmt.Errorf("unexpected HTTP status '%d': %s", - res.StatusCode, - string(body), - ), + req, err := http.NewRequestWithContext( + ctx, + http.MethodPost, + cfg.BaseURL, + bytes.NewReader([]byte(optimismSyncStatus)), + ) + if err != nil { + return &Result{Err: fmt.Errorf("op-node: %w", err)} } - } + req.Header.Set("accept", "application/json") + req.Header.Set("content-type", "application/json") - var status opNodeSyncStatus - err = json.Unmarshal(body, &status) - if err != nil { - return &Result{ - Err: fmt.Errorf("failed to parse JSON body '%s': %w", - string(body), - err, - ), + now := time.Now() + res, err := http.DefaultClient.Do(req) + if err != nil { + return &Result{Err: fmt.Errorf("op-node: %w", err)} } - } + defer res.Body.Close() - if status.Result.CurrentL1.Number > status.Result.HeadL1.Number { - dist := status.Result.CurrentL1.Number - status.Result.HeadL1.Number - if dist == 1 { - return &Result{Ok: true} + body, err := io.ReadAll(res.Body) + if err != nil { + return &Result{Err: fmt.Errorf("op-node: %w", err)} } - return &Result{ - Ok: true, - Err: fmt.Errorf("op-node's current l1 block (number: %d, hash: %s) is greater than head (number: %d, hash %s): %d - %d = %d", - status.Result.CurrentL1.Number, status.Result.CurrentL1.Hash, - status.Result.HeadL1.Number, status.Result.HeadL1.Hash, - status.Result.CurrentL1.Number, status.Result.HeadL1.Number, dist, - ), + + if res.StatusCode != http.StatusOK { + return &Result{ + Err: fmt.Errorf("op-node: unexpected HTTP status '%d': %s", + res.StatusCode, + string(body), + ), + } + } + + var status opNodeSyncStatus + err = json.Unmarshal(body, &status) + if err != nil { + return &Result{ + Err: fmt.Errorf("op-node: failed to parse JSON body '%s': %w", + string(body), + err, + ), + } + } + + if status.Result.CurrentL1.Number > status.Result.HeadL1.Number { + dist := status.Result.CurrentL1.Number - status.Result.HeadL1.Number + if dist == 1 { + return &Result{Ok: true} + } + return &Result{ + Ok: true, + Err: fmt.Errorf("op-node: current l1 block (number: %d, hash: %s) is greater than head (number: %d, hash %s): %d - %d = %d", + status.Result.CurrentL1.Number, status.Result.CurrentL1.Hash, + status.Result.HeadL1.Number, status.Result.HeadL1.Hash, + status.Result.CurrentL1.Number, status.Result.HeadL1.Number, dist, + ), + } + } + + dist := status.Result.HeadL1.Number - status.Result.CurrentL1.Number + if dist > cfg.ConfirmationDistance { + return &Result{ + Err: fmt.Errorf("op-node: current l1 block (number: %d, hash: %s) is behind the l1 head (number: %d, hash: %s) for more than confirmation distance: %d > %d", + status.Result.CurrentL1.Number, status.Result.CurrentL1.Hash, + status.Result.HeadL1.Number, status.Result.HeadL1.Hash, + dist, cfg.ConfirmationDistance, + ), + } } - } - dist := status.Result.HeadL1.Number - status.Result.CurrentL1.Number - if dist > cfg.ConfirmationDistance { - return &Result{ - Err: fmt.Errorf("op-node's current l1 block (number: %d, hash: %s) is behind the l1 head (number: %d, hash: %s) for more than confirmation distance: %d > %d", - status.Result.CurrentL1.Number, status.Result.CurrentL1.Hash, - status.Result.HeadL1.Number, status.Result.HeadL1.Hash, - dist, cfg.ConfirmationDistance, - ), + if cfg.BlockAgeThreshold != 0 { + timestamp := time.Unix(int64(status.Result.UnsafeL2.Time), 0) + age := now.Sub(timestamp) + + if age > cfg.BlockAgeThreshold { + return &Result{ + Err: fmt.Errorf("op-node: latest l2 unsafe timestamp %d is too old: %s > %s", + status.Result.UnsafeL2.Time, + age, + cfg.BlockAgeThreshold, + ), + } + } } } diff --git a/healthcheck/reth.go b/healthcheck/reth.go index fe5fe2f..341ea3f 100644 --- a/healthcheck/reth.go +++ b/healthcheck/reth.go @@ -8,7 +8,9 @@ import ( "fmt" "io" "net/http" + "strconv" "strings" + "time" "github.com/flashbots/node-healthchecker/config" ) @@ -47,69 +49,154 @@ type rethIsSyncing struct { } `json:"result"` } +// rethLatestBlock is the latest block as reported by reth +type rethLatestBlock struct { + Result struct { + Timestamp string `json:"timestamp"` + } `json:"result"` +} + func Reth(ctx context.Context, cfg *config.HealthcheckReth) *Result { - // https://ethereum.org/en/developers/docs/apis/json-rpc/#eth_syncing - // https://github.com/alloy-rs/alloy/blob/v0.3.5/crates/rpc-types-eth/src/syncing.rs#L8-L36 - - const query = `{"jsonrpc":"2.0","method":"eth_syncing","params":[],"id":0}` - - req, err := http.NewRequestWithContext( - ctx, - http.MethodPost, - cfg.BaseURL, - bytes.NewReader([]byte(query)), - ) - if err != nil { - return &Result{Err: err} - } - req.Header.Set("accept", "application/json; charset=utf-8") - req.Header.Set("content-type", "application/json; charset=utf-8") + { // eth_syncing - res, err := http.DefaultClient.Do(req) - if err != nil { - return &Result{Err: err} - } - defer res.Body.Close() + // https://ethereum.org/en/developers/docs/apis/json-rpc/#eth_syncing + // https://github.com/alloy-rs/alloy/blob/v0.3.5/crates/rpc-types-eth/src/syncing.rs#L8-L36 - body, err := io.ReadAll(res.Body) - if err != nil { - return &Result{Err: err} - } + const ethSyncing = `{"jsonrpc":"2.0","method":"eth_syncing","params":[],"id":0}` - if res.StatusCode != http.StatusOK { - return &Result{ - Err: fmt.Errorf("unexpected HTTP status '%d': %s", - res.StatusCode, - string(body), - ), + req, err := http.NewRequestWithContext( + ctx, + http.MethodPost, + cfg.BaseURL, + bytes.NewReader([]byte(ethSyncing)), + ) + if err != nil { + return &Result{Err: fmt.Errorf("reth: %w", err)} } - } + req.Header.Set("accept", "application/json; charset=utf-8") + req.Header.Set("content-type", "application/json; charset=utf-8") - var status rethIsNotSyncing - if err := json.Unmarshal(body, &status); err != nil { - var status rethIsSyncing - if err2 := json.Unmarshal(body, &status); err2 != nil { + res, err := http.DefaultClient.Do(req) + if err != nil { + return &Result{Err: fmt.Errorf("reth: %w", err)} + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return &Result{Err: fmt.Errorf("reth: %w", err)} + } + + if res.StatusCode != http.StatusOK { return &Result{ - Err: fmt.Errorf("failed to parse JSON body '%s': %w", + Err: fmt.Errorf("reth: unexpected HTTP status '%d': %s", + res.StatusCode, string(body), - errors.Join(err, err2), ), } } - stages := make([]string, 0, len(status.Result.Stages)) - for idx, stage := range status.Result.Stages { - stages = append(stages, fmt.Sprintf("%s(%d)=%s", stage.Name, idx, stage.Block)) + + var status rethIsNotSyncing + if err := json.Unmarshal(body, &status); err != nil { + var status rethIsSyncing + if err2 := json.Unmarshal(body, &status); err2 != nil { + return &Result{ + Err: fmt.Errorf("reth: failed to parse JSON body '%s': %w", + string(body), + errors.Join(err, err2), + ), + } + } + stages := make([]string, 0, len(status.Result.Stages)) + for idx, stage := range status.Result.Stages { + stages = append(stages, fmt.Sprintf("%s(%d)=%s", stage.Name, idx, stage.Block)) + } + return &Result{ + Err: fmt.Errorf("reth: still syncing (current: %s, highest: %s): %s", + status.Result.CurrentBlock, + status.Result.HighestBlock, + strings.Join(stages, ", "), + ), + } } - return &Result{ - Err: fmt.Errorf("reth is still syncing: Current=%s, Highest=%s, %s", - status.Result.CurrentBlock, - status.Result.HighestBlock, - strings.Join(stages, ", "), - ), + if status.Result { // i.e. it's syncing + return &Result{Err: errors.New("reth: still syncing")} } } - if status.Result { // i.e. it's syncing - return &Result{Err: errors.New("reth is (still) syncing")} + + { // eth_getBlockByNumber + const ethGetBlockByNumber = `{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest",false],"id":0}` + + if cfg.BlockAgeThreshold != 0 { + req, err := http.NewRequestWithContext( + ctx, + http.MethodPost, + cfg.BaseURL, + bytes.NewReader([]byte(ethGetBlockByNumber)), + ) + if err != nil { + return &Result{Err: err} + } + req.Header.Set("accept", "application/json") + req.Header.Set("content-type", "application/json") + + now := time.Now() + res, err := http.DefaultClient.Do(req) + if err != nil { + return &Result{Err: err} + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return &Result{Err: err} + } + + if res.StatusCode != http.StatusOK { + return &Result{ + Err: fmt.Errorf("reth: unexpected HTTP status '%d': %s", + res.StatusCode, + string(body), + ), + } + } + + var latestBlock rethLatestBlock + if err := json.Unmarshal(body, &latestBlock); err != nil { + return &Result{ + Err: fmt.Errorf("reth: failed to parse JSON body '%s': %w", + string(body), + err, + ), + } + } + + epoch, err := strconv.ParseInt( + strings.TrimPrefix(latestBlock.Result.Timestamp, "0x"), + 16, 64, + ) + if err != nil { + return &Result{ + Err: fmt.Errorf("reth: failed to parse hex timestamp '%s': %w", + latestBlock.Result.Timestamp, + err, + ), + } + } + + timestamp := time.Unix(epoch, 0) + age := now.Sub(timestamp) + + if age > cfg.BlockAgeThreshold { + return &Result{ + Err: fmt.Errorf("reth: latest block's timestamp '%s' is too old: %s > %s", + latestBlock.Result.Timestamp, + age, + cfg.BlockAgeThreshold, + ), + } + } + } } return &Result{Ok: true} diff --git a/server/healthcheck.go b/server/healthcheck.go index b6f2c66..bb97abe 100644 --- a/server/healthcheck.go +++ b/server/healthcheck.go @@ -13,18 +13,18 @@ import ( ) func (s *Server) healthcheck(w http.ResponseWriter, r *http.Request) { - if s.cfg.Healthcheck.CacheTimeout != 0 { + if s.cfg.Healthcheck.CacheCoolOff != 0 { s.cache.mx.Lock() defer s.cache.mx.Unlock() now := time.Now() if s.cache.expiry.After(now) { - s.report(w, r, s.cache.errs, s.cache.wrns) + s.report(w, r, true, s.cache.errs, s.cache.wrns) return } - s.cache.expiry = now.Add(s.cfg.Healthcheck.CacheTimeout) + s.cache.expiry = now.Add(s.cfg.Healthcheck.CacheCoolOff) } count := len(s.monitors) @@ -53,17 +53,21 @@ func (s *Server) healthcheck(w http.ResponseWriter, r *http.Request) { } close(results) - s.report(w, r, errs, wrns) + s.report(w, r, false, errs, wrns) - if s.cfg.Healthcheck.CacheTimeout != 0 { + if s.cfg.Healthcheck.CacheCoolOff != 0 { s.cache.errs = errs - s.cache.wrns = wrns + s.cache.wrns = append(wrns, errors.New("cached healthcheck")) } } -func (s *Server) report(w http.ResponseWriter, r *http.Request, errs, wrns []error) { +func (s *Server) report(w http.ResponseWriter, r *http.Request, cached bool, errs, wrns []error) { l := logutils.LoggerFromRequest(r) + if cached { + l.Debug("Sending cached healthcheck") + } + switch { case len(errs) == 0 && len(wrns) == 0: w.WriteHeader(s.cfg.HttpStatus.Ok) @@ -93,17 +97,19 @@ func (s *Server) report(w http.ResponseWriter, r *http.Request, errs, wrns []err } } - l.Warn("Healthcheck encountered upstream error(s)", - zap.Error(errors.Join(errs...)), - zap.Int("http_status", s.cfg.HttpStatus.Error), - ) + if !cached { + l.Warn("Healthcheck encountered upstream error(s)", + zap.Error(errors.Join(errs...)), + zap.Int("http_status", s.cfg.HttpStatus.Error), + ) + } case len(errs) == 0 && len(wrns) > 0: w.WriteHeader(s.cfg.HttpStatus.Warning) w.Header().Set("Content-Type", "application/text") for idx, warn := range wrns { - line := fmt.Sprintf("%d: %s\n", idx, warn) + line := fmt.Sprintf("%d: warning: %s\n", idx, warn) _, err := w.Write([]byte(line)) if err != nil { l.Error("Failed to write the response body", @@ -112,9 +118,11 @@ func (s *Server) report(w http.ResponseWriter, r *http.Request, errs, wrns []err } } - l.Warn("Healthcheck encountered upstream error(s)", - zap.Error(errors.Join(errs...)), - zap.Int("http_status", s.cfg.HttpStatus.Warning), - ) + if !cached { + l.Warn("Healthcheck encountered upstream error(s)", + zap.Error(errors.Join(errs...)), + zap.Int("http_status", s.cfg.HttpStatus.Warning), + ) + } } } diff --git a/server/server.go b/server/server.go index 397b495..e451eb2 100644 --- a/server/server.go +++ b/server/server.go @@ -64,7 +64,7 @@ func New(cfg *config.Config) (*Server, error) { monitors: monitors, } - if cfg.Healthcheck.CacheTimeout != 0 { + if cfg.Healthcheck.CacheCoolOff != 0 { s.cache = &cache{} }