Skip to content

Commit

Permalink
WIP Smart worker check for open window + persistent error mgmt (#66)
Browse files Browse the repository at this point in the history
<!-- < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < ☺
v           ✰  Thanks for creating a PR! You're awesome! ✰
v Please note that maintainers will only review those PRs with a
completed PR template.
☺ > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >  -->

## Purpose of Changes and their Description

Smart window detection, reducing the need queries for open nonces
* Refactored mechanism to use same for workers and reputers via
interface.
* Fair offset calculation within window
* Configurable and generic `QueryDataWithRetry`, to further reduce the
chance of missing an epoch
* Added parameters to control the mechanism, explained in README

Other fixes
* Account Sequence and Fees error mgmt is now robust - there's
persistence of errors.
* Loglevels - adding Trace.
* Use of retries in all queries for robustness.
* More robust registration and staking, using confirmation of
registration and stake before going forward (otherwise the actor will
just get its tx continuously rejected)
* Fix panic when account on the keyring suddenly vanishes (eg cases of
key removal or rebuilding keyring)
* Added GetTopic query and others 

## Link(s) to Ticket(s) or Issue(s) resolved by this PR

## Are these changes tested and documented?

- [X] If tested, please describe how. If not, why tests are not needed.
-- Tested against testnet, also added further unit tests
- [X] If documented, please describe where. If not, describe why docs
are not needed. -- On README. Added explanation and text-based
visualization.
- [x] Added to `Unreleased` section of `CHANGELOG.md`?
  • Loading branch information
xmariachi authored Nov 14, 2024
2 parents 21e4c19 + f0c7116 commit 24729f9
Show file tree
Hide file tree
Showing 21 changed files with 982 additions and 146 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,18 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) for all versions `v1.0.0` and beyond (still considered experimental prior to v1.0.0).

## [Unreleased]
## v0.6.0

### Added
* [#66](https://github.com/allora-network/allora-offchain-node/pull/66) Smart worker detection of submission windows + persistent error management + query retrials + reg/stake robustness + improved logging
* [#81](https://github.com/allora-network/allora-offchain-node/pull/81) Timeout height handling on tx submission

### Removed

### Fixed

### Security


## v0.5.1

### Added
Expand Down
67 changes: 66 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ from the root directory. This will:
5. Run `docker compose up --build`. This will:
- Run the both the offchain node and the source services, communicating through endpoints attached to the internal dns

Please note that the environment variable will be created as bumdle of your config.json and allora account secrets, please make sure to remove every sectrets before commiting to remote git repository
Please note that the environment variable will be created as bumdle of your config.json and allora account secrets, please make sure to remove every secrets before commiting to remote git repository


## How to run without docker
Expand Down Expand Up @@ -69,6 +69,64 @@ Some metrics has been provided for in the node. You can access them with port `:

> Please note that we will keep updating the list as more metrics are being added
## Cycle Example

Visualization of timeline and submission windows for an actor, in this case, a worker.
Reputers have submission windows too, but they are fixed to be 1 full topic's epoch length.

Example configuration:
* Topic epoch length is 100 blocks
* Topic submission window is 10 blocks
* Near zone is 2 submission windows (20 blocks)

```
Epoch N Epoch N+1
|---------|----------------------------------|-----------------------------------|--------→
Block: 1000 1100 1200
↑ ↑ ↑
Epoch Start Epoch End Next Epoch End
(& Submission & Next Epoch Start
Window Start) (& Next Submission
Window Start)
Detailed View of Zones (assuming epoch length = 100 blocks):
Block 1000 Block 1010 Block 1080 Block 1100
|-----------|--------------------------|--------------|
|← Far Zone →|← Near Zone →|
|← SW →|
(SW = Submission Window)
Zone Breakdown (example numbers):
• Epoch Length: 100 blocks
• Submission Window: 100 blocks (coincides with epoch)
• Near Zone: Last 20 blocks (NUM_SUBMISSION_WINDOWS_FOR_SUBMISSION_NEARNESS * WorkerSubmissionWindow)
-------------------------------
- Full cycle transition points
- Block 1000: Epoch N starts & Submission Window opens
- Block 1010: Submission Window closes. Waiting for next window, typically from far zone.
- Block 1080: Enters Near Zone (more frequent checks)
- Block 1100: Epoch N ends & Epoch N+1 starts
```

### Notes

- Submissions
- Submissions are accepted within the submission window
- Submission window opens at epoch start
- Waiting Zone Behavior
- The behaviour of the node when waiting for the submission window depends on its nearness to the submission window to reduce likelihood of missing a window.
- Far Zone: Longer intervals between checks, optimized for efficiency
- This is controlled by `blockDurationEstimated` and `windowCorrectionFactor`
- Near Zone: More frequent checks with randomization for fair participation
- Submissions are separated - they must happen within the submission window

### Random offset

The node introduces a random offset to the submission time to avoid the thundering herd problem alleviating mempool congestion.

## How to configure

There are several ways to configure the node. In order of preference, you can do any of these:
Expand Down Expand Up @@ -106,6 +164,13 @@ Note: when an account sequence mismatch is detected, the node will attempt to se
- `retryDelay`: For all other errors that need retry delays.


### Smart Window Detection

The node will automatically detect the submission window length for each topic on each actor type.
This can be configured by the following settings in the config.json:
* `blockDurationEstimated`: Estimated network block time in seconds. Minimum is 1.
* `windowCorrectionFactor`: Correction factor to fine-tune the submission window length. Higher values optimize the number of calls for window checking. Minimum is 0.5.

## Configuration examples

A complete example is provided in `config.example.json`.
Expand Down
11 changes: 7 additions & 4 deletions config.cdk.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
"alloraHomeDir": "_ALLORA_WALLET_HOME_DIR_",
"gas": "_ALLORA_WALLET_GAS_",
"gasAdjustment": _ALLORA_WALLET_GAS_ADJUSTMENT_,
"gasPrices": _ALLORA_WALLET_GAS_PRICES_,
"maxFees": _ALLORA_WALLET_MAX_FEES_,
"nodeRpc": "_ALLORA_WALLET_NODE_RPC_",
"maxRetries": _ALLORA_WALLET_MAX_RETRIES_,
"delay": _ALLORA_WALLET_DELAY_,
"submitTx": _ALLORA_WALLET_SUBMIT_TX_
"retryDelay": _ALLORA_WALLET_RETRY_DELAY_,
"accountSequenceRetryDelay": _ALLORA_WALLET_ACCOUNT_SEQUENCE_RETRY_DELAY_,
"submitTx": _ALLORA_WALLET_SUBMIT_TX_,
"blockDurationEstimated": _ALLORA_WALLET_BLOCK_DURATION_ESTIMATED_,
"windowCorrectionFactor": _ALLORA_WALLET_WINDOW_CORRECTION_FACTOR_
},
"worker": [
{
"topicId": _ALLORA_WORKER_TOPIC_ID_,
"inferenceEntrypointName": "_ALLORA_WORKER_INFERENCE_ENTRYPOINT_NAME_",
"loopSeconds": _ALLORA_WORKER_LOOP_SECONDS_,
"parameters": {
"InferenceEndpoint": "_ALLORA_WORKER_INFERENCE_ENDPOINT_",
"Token": "_ALLORA_WORKER_TOKEN_"
Expand All @@ -26,7 +30,6 @@
"topicId": _ALLORA_REPUTER_TOPIC_ID_,
"groundTruthEntrypointName": "_ALLORA_REPUTER_ENTRYPOINT_NAME_",
"lossFunctionEntrypointName": "_ALLORA_REPUTER_ENTRYPOINT_NAME_",
"loopSeconds": _ALLORA_REPUTER_LOOP_SECONDS_,
"minStake": _ALLORA_REPUTER_MIN_STAKE_,
"groundTruthParameters": {
"GroundTruthEndpoint": "_ALLORA_REPUTER_SOURCE_OF_TRUTH_ENDPOINT_",
Expand Down
6 changes: 3 additions & 3 deletions config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
"maxRetries": 5,
"retryDelay": 3,
"accountSequenceRetryDelay": 5,
"submitTx": true
"submitTx": true,
"blockDurationEstimated": 10,
"windowCorrectionFactor": 0.8
},
"worker": [
{
"topicId": 1,
"inferenceEntrypointName": "api-worker-reputer",
"loopSeconds": 10,
"parameters": {
"InferenceEndpoint": "http://source:8000/inference/{Token}",
"Token": "ETH"
Expand All @@ -29,7 +30,6 @@
"topicId": 1,
"groundTruthEntrypointName": "api-worker-reputer",
"lossFunctionEntrypointName": "api-worker-reputer",
"loopSeconds": 30,
"minStake": 100000,
"groundTruthParameters": {
"GroundTruthEndpoint": "http://localhost:8888/gt/{Token}/{BlockHeight}",
Expand Down
89 changes: 76 additions & 13 deletions lib/domain_config.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
package lib

import (
"errors"
"fmt"

emissions "github.com/allora-network/allora-chain/x/emissions/types"
bank "github.com/cosmos/cosmos-sdk/x/bank/types"
"github.com/ignite/cli/v28/ignite/pkg/cosmosaccount"
"github.com/ignite/cli/v28/ignite/pkg/cosmosclient"
"github.com/rs/zerolog/log"
)

const (
WindowCorrectionFactorSuggestedMin = 0.5
BlockDurationEstimatedMin = 1.0
RetryDelayMin = 1
AccountSequenceRetryDelayMin = 1
)

// Properties manually provided by the user as part of UserConfig
Expand All @@ -23,6 +32,8 @@ type WalletConfig struct {
RetryDelay int64 // number of seconds to wait between retries (general case)
AccountSequenceRetryDelay int64 // number of seconds to wait between retries in case of account sequence error
SubmitTx bool // useful for dev/testing. set to false to run in dry-run processes without committing to the chain
BlockDurationEstimated float64 // estimated average block duration in seconds
WindowCorrectionFactor float64 // correction factor for the time estimation, suggested range 0.7-0.9.
}

// Properties auto-generated based on what the user has provided in WalletConfig fields of UserConfig
Expand All @@ -36,16 +47,24 @@ type ChainConfig struct {
AddressPrefix string // prefix for the allora addresses
}

type TopicActor interface {
GetTopicId() emissions.TopicId
}

type WorkerConfig struct {
TopicId emissions.TopicId
InferenceEntrypointName string
InferenceEntrypoint AlloraAdapter
ForecastEntrypointName string
ForecastEntrypoint AlloraAdapter
LoopSeconds int64 // seconds to wait between attempts to get next worker nonce
ForecastEntrypoint AlloraAdapter // seconds to wait between attempts to get next worker nonce
Parameters map[string]string // Map for variable configuration values
}

// Implement TopicActor interface for WorkerConfig
func (w WorkerConfig) GetTopicId() emissions.TopicId {
return w.TopicId
}

type ReputerConfig struct {
TopicId emissions.TopicId
GroundTruthEntrypointName string
Expand All @@ -57,11 +76,15 @@ type ReputerConfig struct {
// This is idempotent in that it will not add more stake than specified here.
// Set to 0 to effectively disable this feature and use whatever stake has already been added.
MinStake int64
LoopSeconds int64 // seconds to wait between attempts to get next reptuer nonces
GroundTruthParameters map[string]string // Map for variable configuration values
LossFunctionParameters LossFunctionParameters // Map for variable configuration values
}

// Implement TopicActor interface for ReputerConfig
func (r ReputerConfig) GetTopicId() emissions.TopicId {
return r.TopicId
}

type LossFunctionParameters struct {
LossFunctionService string
LossMethodOptions map[string]string
Expand Down Expand Up @@ -105,19 +128,59 @@ type ValueBundle struct {

// Check that each assigned entrypoint in the user config actually can be used
// for the intended purpose, else throw error
func (c *UserConfig) ValidateConfigAdapters() {
func (c *UserConfig) ValidateConfigAdapters() error {
// Validate wallet config
err := c.ValidateWalletConfig()
if err != nil {
return err
}
// Validate worker configs
for _, workerConfig := range c.Worker {
if workerConfig.InferenceEntrypoint != nil && !workerConfig.InferenceEntrypoint.CanInfer() {
log.Fatal().Interface("entrypoint", workerConfig.InferenceEntrypoint).Msg("Invalid inference entrypoint")
}
if workerConfig.ForecastEntrypoint != nil && !workerConfig.ForecastEntrypoint.CanForecast() {
log.Fatal().Interface("entrypoint", workerConfig.ForecastEntrypoint).Msg("Invalid forecast entrypoint")
err := workerConfig.ValidateWorkerConfig()
if err != nil {
return err
}
}

// Validate reputer configs
for _, reputerConfig := range c.Reputer {
if reputerConfig.GroundTruthEntrypoint != nil && !reputerConfig.GroundTruthEntrypoint.CanSourceGroundTruthAndComputeLoss() {
log.Fatal().Interface("entrypoint", reputerConfig.GroundTruthEntrypoint).Msg("Invalid loss entrypoint")
err := reputerConfig.ValidateReputerConfig()
if err != nil {
return err
}
}
return nil
}

func (c *UserConfig) ValidateWalletConfig() error {
if c.Wallet.WindowCorrectionFactor < WindowCorrectionFactorSuggestedMin {
return errors.New(fmt.Sprintf("window correction factor lower than suggested minimum: %f < %f", c.Wallet.WindowCorrectionFactor, WindowCorrectionFactorSuggestedMin))
}
if c.Wallet.BlockDurationEstimated < BlockDurationEstimatedMin {
return errors.New(fmt.Sprintf("block duration estimated lower than the minimum: %f < %f", c.Wallet.BlockDurationEstimated, BlockDurationEstimatedMin))
}
if c.Wallet.RetryDelay < RetryDelayMin {
return errors.New(fmt.Sprintf("retry delay lower than the minimum: %d < %d", c.Wallet.RetryDelay, RetryDelayMin))
}
if c.Wallet.AccountSequenceRetryDelay < AccountSequenceRetryDelayMin {
return errors.New(fmt.Sprintf("account sequence retry delay lower than the minimum: %d < %d", c.Wallet.AccountSequenceRetryDelay, AccountSequenceRetryDelayMin))
}

return nil
}

func (reputerConfig *ReputerConfig) ValidateReputerConfig() error {
if reputerConfig.GroundTruthEntrypoint != nil && !reputerConfig.GroundTruthEntrypoint.CanSourceGroundTruthAndComputeLoss() {
return errors.New("invalid loss entrypoint")
}
return nil
}

func (workerConfig *WorkerConfig) ValidateWorkerConfig() error {
if workerConfig.InferenceEntrypoint != nil && !workerConfig.InferenceEntrypoint.CanInfer() {
return errors.New("invalid inference entrypoint")
}
if workerConfig.ForecastEntrypoint != nil && !workerConfig.ForecastEntrypoint.CanForecast() {
return errors.New("invalid forecast entrypoint")
}
return nil
}
21 changes: 17 additions & 4 deletions lib/repo_query_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,32 @@ package lib

import (
"context"
"time"

cosmossdk_io_math "cosmossdk.io/math"
"github.com/cosmos/cosmos-sdk/types/query"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
)

func (node *NodeConfig) GetBalance() (cosmossdk_io_math.Int, error) {
ctx := context.Background()
resp, err := node.Chain.BankQueryClient.Balance(ctx, &banktypes.QueryBalanceRequest{
Address: node.Chain.Address,
Denom: node.Chain.DefaultBondDenom,
})

resp, err := QueryDataWithRetry(
ctx,
node.Wallet.MaxRetries,
time.Duration(node.Wallet.RetryDelay)*time.Second,
func(ctx context.Context, req query.PageRequest) (*banktypes.QueryBalanceResponse, error) {
return node.Chain.BankQueryClient.Balance(ctx, &banktypes.QueryBalanceRequest{
Address: node.Chain.Address,
Denom: node.Chain.DefaultBondDenom,
})
},
query.PageRequest{},
"get balance",
)
if err != nil {
return cosmossdk_io_math.Int{}, err
}

return resp.Balance.Amount, nil
}
31 changes: 16 additions & 15 deletions lib/repo_query_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,31 @@ package lib

import (
"context"
"encoding/json"
"time"

emissionstypes "github.com/allora-network/allora-chain/x/emissions/types"
"github.com/rs/zerolog/log"
"github.com/cosmos/cosmos-sdk/types/query"
)

func (node *NodeConfig) GetReputerValuesAtBlock(topicId emissionstypes.TopicId, nonce BlockHeight) (*emissionstypes.ValueBundle, error) {
ctx := context.Background()

req := &emissionstypes.GetNetworkInferencesAtBlockRequest{
TopicId: topicId,
BlockHeightLastInference: nonce,
}
reqJSON, err := json.Marshal(req)
if err != nil {
log.Error().Err(err).Msg("Error marshaling GetNetworkInferencesAtBlockRequest to print Msg as JSON")
} else {
log.Info().Str("req", string(reqJSON)).Msg("Getting GetNetworkInferencesAtBlockRequest from chain")
}

res, err := node.Chain.EmissionsQueryClient.GetNetworkInferencesAtBlock(ctx, req)
resp, err := QueryDataWithRetry(
ctx,
node.Wallet.MaxRetries,
time.Duration(node.Wallet.RetryDelay)*time.Second,
func(ctx context.Context, req query.PageRequest) (*emissionstypes.GetNetworkInferencesAtBlockResponse, error) {
return node.Chain.EmissionsQueryClient.GetNetworkInferencesAtBlock(ctx, &emissionstypes.GetNetworkInferencesAtBlockRequest{
TopicId: topicId,
BlockHeightLastInference: nonce,
})
},
query.PageRequest{},
"get reputer values at block",
)
if err != nil {
return &emissionstypes.ValueBundle{}, err
}

return res.NetworkInferences, nil
return resp.NetworkInferences, nil
}
Loading

0 comments on commit 24729f9

Please sign in to comment.