From b6dfcd2efb570f5a5b5aecc033d34dbbf47b497b Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Mon, 7 Oct 2024 22:56:18 +0200 Subject: [PATCH 01/24] WIP Smart worker check for open window Interactive rebase --- lib/repo_query_nonce.go | 2 +- lib/repo_query_topic.go | 25 ++++ lib/repo_query_utils.go | 39 ++++++ usecase/spawn_actor_processes.go | 195 ++++++++++++++++++++++++-- usecase/spawn_actor_processes_test.go | 186 ++++++++++++++++++++++++ 5 files changed, 436 insertions(+), 11 deletions(-) create mode 100644 lib/repo_query_topic.go create mode 100644 lib/repo_query_utils.go create mode 100644 usecase/spawn_actor_processes_test.go diff --git a/lib/repo_query_nonce.go b/lib/repo_query_nonce.go index 0cc738c..92fa972 100644 --- a/lib/repo_query_nonce.go +++ b/lib/repo_query_nonce.go @@ -18,7 +18,7 @@ func (node *NodeConfig) GetLatestOpenWorkerNonceByTopicId(topicId emissionstypes } if len(res.Nonces.Nonces) == 0 { - return &emissionstypes.Nonce{}, err + return &emissionstypes.Nonce{}, nil } // Per `AddWorkerNonce()` in `allora-chain/x/emissions/keeper.go`, the latest nonce is first return res.Nonces.Nonces[0], nil diff --git a/lib/repo_query_topic.go b/lib/repo_query_topic.go new file mode 100644 index 0000000..a7dbff2 --- /dev/null +++ b/lib/repo_query_topic.go @@ -0,0 +1,25 @@ +package lib + +import ( + "context" + "errors" + + emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" +) + +func (node *NodeConfig) GetTopicInfo(topicId emissionstypes.TopicId) (*emissionstypes.Topic, error) { + ctx := context.Background() + + res, err := node.Chain.EmissionsQueryClient.GetTopic( + ctx, + &emissionstypes.GetTopicRequest{TopicId: topicId}, + ) + if err != nil { + return nil, err + } + + if res.Topic == nil { + return nil, errors.New("Topic not found") + } + return res.Topic, nil +} diff --git a/lib/repo_query_utils.go b/lib/repo_query_utils.go new file mode 100644 index 0000000..9562646 --- /dev/null +++ b/lib/repo_query_utils.go @@ -0,0 +1,39 @@ +package lib + +import ( + "context" + "fmt" + "time" + + "github.com/rs/zerolog/log" + + "github.com/cosmos/cosmos-sdk/types/query" +) + +// QueryDataWithRetry attempts to query data with a uniform backoff strategy for retries. +func QueryDataWithRetry[T any]( + ctx context.Context, + maxRetries int64, + delay time.Duration, + queryFunc func(context.Context, query.PageRequest) (T, error), + req query.PageRequest, +) (T, error) { + var result T + var err error + + for retryCount := int64(0); retryCount <= maxRetries; retryCount++ { + result, err = queryFunc(ctx, req) + if err == nil { + return result, nil + } + + // Log the error for each retry. + log.Error().Err(err).Msgf("Query failed, retrying... 
(Retry %d/%d)", retryCount, maxRetries) + + // Wait for the uniform delay before retrying + time.Sleep(delay) + } + + // All retries failed, return the last error + return result, fmt.Errorf("query failed after %d retries: %w", maxRetries, err) +} diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index bae41e5..476caee 100644 --- a/usecase/spawn_actor_processes.go +++ b/usecase/spawn_actor_processes.go @@ -2,10 +2,24 @@ package usecase import ( "allora_offchain_node/lib" + "context" + "errors" + "math" "sync" + "time" + errorsmod "cosmossdk.io/errors" emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" + "github.com/cosmos/cosmos-sdk/types/query" "github.com/rs/zerolog/log" + "golang.org/x/exp/rand" +) + +// TODO move these to Config +const ( + blockDurationAvg float64 = 5.0 // Avg block duration in seconds + correctionFactor float64 = 0.75 // Correction factor for the time estimation + SUBMISSION_WINDOWS_TO_BE_CLOSE int64 = 2 ) func (suite *UseCaseSuite) Spawn() { @@ -47,6 +61,64 @@ func (suite *UseCaseSuite) Spawn() { wg.Wait() } +// Attempts to build and commit a worker payload for a given nonce +func (suite *UseCaseSuite) processWorkerPayload(worker lib.WorkerConfig, latestNonceHeightActedUpon int64) (int64, error) { + // latestOpenWorkerNonce, err := suite.Node.GetLatestOpenWorkerNonceByTopicId(worker.TopicId) + latestOpenWorkerNonce, err := lib.QueryDataWithRetry( + context.Background(), + suite.Node.Wallet.MaxRetries, + time.Duration(suite.Node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*emissionstypes.Nonce, error) { + return suite.Node.GetLatestOpenWorkerNonceByTopicId(worker.TopicId) + }, + query.PageRequest{}, // Empty page request as GetLatestOpenWorkerNonceByTopicId doesn't use pagination + ) + + if err != nil { + log.Warn().Err(err).Uint64("topicId", worker.TopicId).Msg("Error getting latest open worker nonce on topic - node availability issue?") + return latestNonceHeightActedUpon, err + } + + if latestOpenWorkerNonce.BlockHeight > latestNonceHeightActedUpon { + log.Debug().Uint64("topicId", worker.TopicId).Int64("BlockHeight", latestOpenWorkerNonce.BlockHeight). + Msg("Building and committing worker payload for topic") + + err := suite.BuildCommitWorkerPayload(worker, latestOpenWorkerNonce) + if err != nil { + return latestNonceHeightActedUpon, errorsmod.Wrapf(err, "error building and committing worker payload for topic") + } + return latestOpenWorkerNonce.BlockHeight, nil + } else { + log.Debug().Uint64("topicId", worker.TopicId). + Int64("BlockHeight", latestOpenWorkerNonce.BlockHeight). 
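// Illustrative sketch only, not part of this patch: the generic QueryDataWithRetry helper
// introduced above makes up to maxRetries+1 attempts with a fixed (uniform) delay between
// them, and any non-paginated query can be wrapped by simply ignoring the query.PageRequest
// argument. The function name below is hypothetical; `node` is assumed to be a *lib.NodeConfig.
func getTopicInfoWithRetrySketch(node *lib.NodeConfig, topicId emissionstypes.TopicId) (*emissionstypes.Topic, error) {
	return lib.QueryDataWithRetry(
		context.Background(),
		node.Wallet.MaxRetries,                            // e.g. 5 -> at most 6 attempts
		time.Duration(node.Wallet.RetryDelay)*time.Second, // uniform delay between attempts
		func(ctx context.Context, _ query.PageRequest) (*emissionstypes.Topic, error) {
			return node.GetTopicInfo(topicId)
		},
		query.PageRequest{}, // unused by the closure
	)
}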
+ Int64("latestNonceHeightActedUpon", latestNonceHeightActedUpon).Msg("No new worker nonce found") + return latestNonceHeightActedUpon, nil + } +} + +// Calculate the time distance based on the distance until the next epoch +func calculateTimeDistanceInSeconds(distanceUntilNextEpoch int64, blockDurationAvg, correctionFactor float64) (int64, error) { + if distanceUntilNextEpoch < 0 || correctionFactor < 0 { + return 0, errors.New("distanceUntilNextEpoch and correctionFactor must be positive") + } + correctedTimeDistance := float64(distanceUntilNextEpoch) * blockDurationAvg * correctionFactor + return int64(math.Round(correctedTimeDistance)), nil +} + +func generateFairOffset(workerSubmissionWindow int64) int64 { + // Ensure the random number generator is seeded + source := rand.NewSource(uint64(time.Now().UnixNano())) + rng := rand.New(source) + + // Calculate the center of the window + center := workerSubmissionWindow / 2 + + // Generate a random number between -maxOffset and +maxOffset + offset := rng.Int63n(center + 1) + + return offset +} + func (suite *UseCaseSuite) runWorkerProcess(worker lib.WorkerConfig) { log.Info().Uint64("topicId", worker.TopicId).Msg("Running worker process for topic") @@ -55,26 +127,129 @@ func (suite *UseCaseSuite) runWorkerProcess(worker lib.WorkerConfig) { log.Error().Uint64("topicId", worker.TopicId).Msg("Failed to register worker for topic") return } + log.Debug().Uint64("topicId", worker.TopicId).Msg("Worker registered") + + topicInfo, err := lib.QueryDataWithRetry( + context.Background(), + suite.Node.Wallet.MaxRetries, + time.Duration(suite.Node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*emissionstypes.Topic, error) { + return suite.Node.GetTopicInfo(worker.TopicId) + }, + query.PageRequest{}, // Empty page request as GetTopicInfo doesn't use pagination + ) + if err != nil { + log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Failed to get topic info after retries") + return + } + + // Get epoch length and worker submission window static info + epochLength := topicInfo.EpochLength + workerSubmissionWindow := topicInfo.WorkerSubmissionWindow + minBlocksToCheck := workerSubmissionWindow * SUBMISSION_WINDOWS_TO_BE_CLOSE + + // Last nonce successfully sent tx for + latestNonceHeightSentTxFor := int64(0) + + // Keep this to estimate block duration + var currentBlockHeight int64 - latestNonceHeightActedUpon := int64(0) for { - latestOpenWorkerNonce, err := suite.Node.GetLatestOpenWorkerNonceByTopicId(worker.TopicId) + // Query the latest block + status, err := suite.Node.Chain.Client.Status(context.Background()) if err != nil { - log.Warn().Err(err).Uint64("topicId", worker.TopicId).Msg("Error getting latest open worker nonce on topic - node availability issue?") + log.Error().Err(err).Msg("Failed to get status") + suite.Wait(1) + continue + } + currentBlockHeight = status.SyncInfo.LatestBlockHeight + + topicInfo, err := lib.QueryDataWithRetry( + context.Background(), + suite.Node.Wallet.MaxRetries, + time.Duration(suite.Node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*emissionstypes.Topic, error) { + return suite.Node.GetTopicInfo(worker.TopicId) + }, + query.PageRequest{}, // Empty page request as GetTopicInfo doesn't use pagination + ) + if err != nil { + log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Error getting topic info") + return + } + log.Debug().Int64("currentBlockHeight", currentBlockHeight). 
+ Int64("EpochLastEnded", topicInfo.EpochLastEnded). + Int64("EpochLength", epochLength). + Msg("Info from topic") + epochLastEnded := topicInfo.EpochLastEnded + epochEnd := epochLastEnded + epochLength + + // Check if block is within the epoch submission window + if currentBlockHeight-epochLastEnded <= workerSubmissionWindow { + // Attempt to submit worker payload + latestNonceHeightSentTxFor, err = suite.processWorkerPayload(worker, latestNonceHeightSentTxFor) + if err != nil { + log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Error processing worker payload - could not complete transaction") + } else { + log.Debug().Uint64("topicId", worker.TopicId).Msg("Successfully sent worker payload") + } + // Wait until the epoch submission window opens + distanceUntilNextEpoch := epochEnd - currentBlockHeight + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(distanceUntilNextEpoch, blockDurationAvg, correctionFactor) + if err != nil { + log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Error calculating time distance to next epoch after sending tx") + return + } + log.Debug().Uint64("topicId", worker.TopicId). + Int64("currentBlockHeight", currentBlockHeight). + Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). + Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). + Msg("Waiting until the epoch submission window opens") + suite.Wait(correctedTimeDistanceInSeconds) + } else if currentBlockHeight > epochEnd { + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(epochLength, blockDurationAvg, 1.0) + if err != nil { + log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("epochLength and correctionFactor must be positive") + return + } + log.Warn().Uint64("topicId", worker.TopicId).Msg("Current block height is greater than next epoch length, inactive topic? Waiting one epoch length") + suite.Wait(correctedTimeDistanceInSeconds) } else { - if latestOpenWorkerNonce.BlockHeight > latestNonceHeightActedUpon { - log.Debug().Uint64("topicId", worker.TopicId).Int64("BlockHeight", latestOpenWorkerNonce.BlockHeight).Msg("Building and committing worker payload for topic") + // Check distance until next epoch + distanceUntilNextEpoch := epochEnd - currentBlockHeight - err := suite.BuildCommitWorkerPayload(worker, latestOpenWorkerNonce) + if distanceUntilNextEpoch <= minBlocksToCheck { + // Wait until the center of the epoch submission window + offset := generateFairOffset(workerSubmissionWindow) + closeBlockDistance := distanceUntilNextEpoch + offset + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(closeBlockDistance, blockDurationAvg, 1.0) if err != nil { - log.Error().Err(err).Uint64("topicId", worker.TopicId).Int64("BlockHeight", latestOpenWorkerNonce.BlockHeight).Msg("Error building and committing worker payload for topic") + log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Error calculating close distance to epochLength") + return } - latestNonceHeightActedUpon = latestOpenWorkerNonce.BlockHeight + log.Debug().Uint64("topicId", worker.TopicId). + Int64("offset", offset). + Int64("currentBlockHeight", currentBlockHeight). + Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). + Int64("closeBlockDistance", closeBlockDistance). + Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). 
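// Worked example (illustrative only) of the timing math used in this loop, with the
// constants defined above in this patch (blockDurationAvg = 5.0s, correctionFactor = 0.75).
func exampleWindowTimingSketch() {
	// 120 blocks until the next epoch -> round(120 * 5.0 * 0.75) = 450 seconds of sleep,
	// deliberately waking up early to absorb block-time variance.
	sleepSeconds, _ := calculateTimeDistanceInSeconds(120, 5.0, 0.75) // sleepSeconds == 450
	// The offset is uniform over [0, window/2] blocks (mean roughly window/4), spreading
	// submitters across the window instead of stampeding at its opening block.
	offset := generateFairOffset(10) // offset in [0, 5]
	_ = sleepSeconds
	_ = offset
}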
+ Msg("Close to the window, waiting until next epoch submission window") + suite.Wait(correctedTimeDistanceInSeconds) } else { - log.Debug().Uint64("topicId", worker.TopicId).Msg("No new worker nonce found") + // Wait until the epoch submission window opens + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(distanceUntilNextEpoch, blockDurationAvg, correctionFactor) + if err != nil { + log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Error calculating far distance to epochLength") + return + } + log.Debug().Uint64("topicId", worker.TopicId). + Int64("currentBlockHeight", currentBlockHeight). + Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). + Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). + Msg("Waiting until the epoch submission window opens") + suite.Wait(correctedTimeDistanceInSeconds) } } - suite.Wait(worker.LoopSeconds) } } diff --git a/usecase/spawn_actor_processes_test.go b/usecase/spawn_actor_processes_test.go new file mode 100644 index 0000000..7c16d57 --- /dev/null +++ b/usecase/spawn_actor_processes_test.go @@ -0,0 +1,186 @@ +package usecase + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCalculateTimeDistanceInSeconds(t *testing.T) { + tests := []struct { + name string + distanceUntilNextEpoch int64 + correctionFactor float64 + blockAvgSeconds float64 + expectedTimeDistance int64 + expectedError bool + }{ + { + name: "Basic calculation", + distanceUntilNextEpoch: 100, + correctionFactor: 1.0, + blockAvgSeconds: 4.6, + expectedTimeDistance: 460, // 100 * 4.6 * 1.0 + expectedError: false, + }, + { + name: "With correction factor", + distanceUntilNextEpoch: 100, + correctionFactor: 0.75, + blockAvgSeconds: 4.6, + expectedTimeDistance: 345, // 100 * 4.6 * 0.75 + expectedError: false, + }, + { + name: "Zero distance", + distanceUntilNextEpoch: 0, + correctionFactor: 1.0, + blockAvgSeconds: 4.6, + expectedTimeDistance: 0, + expectedError: false, + }, + { + name: "Large distance", + distanceUntilNextEpoch: 1000000, + correctionFactor: 1.0, + blockAvgSeconds: 4.6, + expectedTimeDistance: 4600000, // 1000000 * 4.6 * 1.0 + expectedError: false, + }, + { + name: "Small correction factor", + distanceUntilNextEpoch: 100, + correctionFactor: 0.1, + blockAvgSeconds: 4.6, + expectedTimeDistance: 46, // 100 * 4.6 * 0.1 + expectedError: false, + }, + { + name: "Negative distance", + distanceUntilNextEpoch: -100, + correctionFactor: 1.0, + blockAvgSeconds: 4.6, + expectedTimeDistance: 0, + expectedError: true, + }, + { + name: "Negative correction factor", + distanceUntilNextEpoch: 100, + correctionFactor: -0.5, + blockAvgSeconds: 4.6, + expectedTimeDistance: 0, + expectedError: true, + }, + { + name: "Both negative", + distanceUntilNextEpoch: -100, + correctionFactor: -0.5, + blockAvgSeconds: 4.6, + expectedTimeDistance: 0, + expectedError: true, + }, + // tests with different blockAvgSeconds + { + name: "Basic calculation with 6s blocks", + distanceUntilNextEpoch: 100, + correctionFactor: 1.0, + blockAvgSeconds: 6.0, + expectedTimeDistance: 600, // 100 * 6.0 * 1.0 + expectedError: false, + }, + { + name: "With correction factor and 3s blocks", + distanceUntilNextEpoch: 100, + correctionFactor: 0.75, + blockAvgSeconds: 3.0, + expectedTimeDistance: 225, // 100 * 3.0 * 0.75 + expectedError: false, + }, + { + name: "Large distance with 5.5s blocks", + distanceUntilNextEpoch: 1000000, + correctionFactor: 1.0, + blockAvgSeconds: 5.5, + expectedTimeDistance: 5500000, // 1000000 * 5.5 * 1.0 + 
expectedError: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + result, err := calculateTimeDistanceInSeconds(test.distanceUntilNextEpoch, test.blockAvgSeconds, test.correctionFactor) + if test.expectedError { + assert.Error(t, err, "Expected an error for negative input") + assert.Equal(t, int64(0), result, "Expected 0 result when error occurs") + } else { + assert.NoError(t, err, "Did not expect an error") + assert.Equal(t, test.expectedTimeDistance, result, "Calculated time distance should match expected value") + } + }) + } +} + +func TestGenerateFairRandomOffset(t *testing.T) { + tests := []struct { + name string + workerSubmissionWindow int64 + expectedMin int64 + expectedMax int64 + expectedCenter int64 + iterations int + allowedDeltaFromCenter float64 + }{ + { + name: "Standard window", + workerSubmissionWindow: 100, + expectedMin: 0, + expectedMax: 50, + expectedCenter: 25, + iterations: 10000, + allowedDeltaFromCenter: 1.0, + }, + { + name: "Large window", + workerSubmissionWindow: 1000, + expectedMin: 0, + expectedMax: 500, + expectedCenter: 250, + iterations: 10000, + allowedDeltaFromCenter: 5.0, + }, + { + name: "Small window", + workerSubmissionWindow: 20, + expectedMin: 0, + expectedMax: 10, + expectedCenter: 5, + iterations: 10000, + allowedDeltaFromCenter: 0.5, + }, + { + name: "Odd window size", + workerSubmissionWindow: 101, + expectedMin: 0, + expectedMax: 50, + expectedCenter: 25, + iterations: 10000, + allowedDeltaFromCenter: 1.0, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sum := int64(0) + for i := 0; i < test.iterations; i++ { + result := generateFairOffset(test.workerSubmissionWindow) + assert.GreaterOrEqual(t, result, test.expectedMin, "Result should be greater than or equal to the minimum value") + assert.LessOrEqual(t, result, test.expectedMax, "Result should be less than or equal to the maximum value") + sum += result + } + + // Check that the average is close to the center (allowing for some randomness) + average := float64(sum) / float64(test.iterations) + assert.InDelta(t, float64(test.expectedCenter), average, test.allowedDeltaFromCenter, "Average should be close to the center") + }) + } +} From bcb625f77dc5ee1b30b859dfdaa4e05b9d4cbec3 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Mon, 7 Oct 2024 23:00:19 +0200 Subject: [PATCH 02/24] rename var --- usecase/spawn_actor_processes.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index 476caee..4d47dbd 100644 --- a/usecase/spawn_actor_processes.go +++ b/usecase/spawn_actor_processes.go @@ -17,9 +17,9 @@ import ( // TODO move these to Config const ( - blockDurationAvg float64 = 5.0 // Avg block duration in seconds - correctionFactor float64 = 0.75 // Correction factor for the time estimation - SUBMISSION_WINDOWS_TO_BE_CLOSE int64 = 2 + blockDurationAvg float64 = 5.0 // Avg block duration in seconds + correctionFactor float64 = 0.75 // Correction factor for the time estimation + SUBMISSION_WINDOWS_TO_BE_NEAR_NEW_WINDOW int64 = 2 ) func (suite *UseCaseSuite) Spawn() { @@ -146,7 +146,7 @@ func (suite *UseCaseSuite) runWorkerProcess(worker lib.WorkerConfig) { // Get epoch length and worker submission window static info epochLength := topicInfo.EpochLength workerSubmissionWindow := topicInfo.WorkerSubmissionWindow - minBlocksToCheck := workerSubmissionWindow * SUBMISSION_WINDOWS_TO_BE_CLOSE + minBlocksToCheck := workerSubmissionWindow * 
SUBMISSION_WINDOWS_TO_BE_NEAR_NEW_WINDOW // Last nonce successfully sent tx for latestNonceHeightSentTxFor := int64(0) From 8fcd65903c83a6a1a57ed10d1178a72d8fa407f9 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Mon, 7 Oct 2024 23:03:30 +0200 Subject: [PATCH 03/24] Add changelog --- CHANGELOG.md | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ef0cb88..e82eb81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,22 +41,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) for all versions `v1.0.0` and beyond (still considered experimental prior to v1.0.0). -## [Unreleased] - -### Added - -### Removed - -### Fixed - -### Security - ## v0.5.1 ### Added * [#75](https://github.com/allora-network/allora-offchain-node/pull/75) Configurable fee awareness +* [#66](https://github.com/allora-network/allora-offchain-node/pull/66) Smart worker detection of open window + randomness ### Removed From 7c9814f8f826ddd6e3787fbc7b5dda90eb781997 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Wed, 6 Nov 2024 17:23:39 +0100 Subject: [PATCH 04/24] add unreleased back to CHANGELOG --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e82eb81..7b31315 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) for all versions `v1.0.0` and beyond (still considered experimental prior to v1.0.0). 
+## [Unreleased] + +### Added + +### Removed + +### Fixed + +### Security ## v0.5.1 From 9caff483dbe1d865023de3f744669086ed0c86c9 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Wed, 6 Nov 2024 17:24:56 +0100 Subject: [PATCH 05/24] reputer ok log for uniformity --- usecase/spawn_actor_processes.go | 1 + 1 file changed, 1 insertion(+) diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index 4d47dbd..2d23b2d 100644 --- a/usecase/spawn_actor_processes.go +++ b/usecase/spawn_actor_processes.go @@ -261,6 +261,7 @@ func (suite *UseCaseSuite) runReputerProcess(reputer lib.ReputerConfig) { log.Error().Uint64("topicId", reputer.TopicId).Msg("Failed to register or sufficiently stake reputer for topic") return } + log.Debug().Uint64("topicId", reputer.TopicId).Msg("Reputer registered and staked") latestNonceHeightActedUpon := int64(0) for { From 4d704eb467dea3a343cb529df27b765e805be25f Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Wed, 6 Nov 2024 20:30:29 +0100 Subject: [PATCH 06/24] Make values configurable, enforce config validation --- config.example.json | 4 +- lib/domain_config.go | 71 +++++++++++++++++++++++++++----- usecase/spawn_actor_processes.go | 19 ++++----- usecase/usecase_suite.go | 5 ++- 4 files changed, 77 insertions(+), 22 deletions(-) diff --git a/config.example.json b/config.example.json index 568a2fd..a9fdf37 100644 --- a/config.example.json +++ b/config.example.json @@ -11,7 +11,9 @@ "maxRetries": 5, "retryDelay": 3, "accountSequenceRetryDelay": 5, - "submitTx": true + "submitTx": true, + "blockDurationEstimated": 5, + "windowCorrectionFactor": 0.8 }, "worker": [ { diff --git a/lib/domain_config.go b/lib/domain_config.go index b520767..37ef624 100644 --- a/lib/domain_config.go +++ b/lib/domain_config.go @@ -1,11 +1,20 @@ package lib import ( + "errors" + "fmt" + emissions "github.com/allora-network/allora-chain/x/emissions/types" bank "github.com/cosmos/cosmos-sdk/x/bank/types" "github.com/ignite/cli/v28/ignite/pkg/cosmosaccount" "github.com/ignite/cli/v28/ignite/pkg/cosmosclient" - "github.com/rs/zerolog/log" +) + +const ( + WindowCorrectionFactorSuggestedMin = 0.5 + BlockDurationEstimatedMin = 1.0 + RetryDelayMin = 1 + AccountSequenceRetryDelayMin = 1 ) // Properties manually provided by the user as part of UserConfig @@ -23,6 +32,8 @@ type WalletConfig struct { RetryDelay int64 // number of seconds to wait between retries (general case) AccountSequenceRetryDelay int64 // number of seconds to wait between retries in case of account sequence error SubmitTx bool // useful for dev/testing. set to false to run in dry-run processes without committing to the chain + BlockDurationEstimated float64 // estimated average block duration in seconds + WindowCorrectionFactor float64 // correction factor for the time estimation, suggested range 0.7-0.9. 
} // Properties auto-generated based on what the user has provided in WalletConfig fields of UserConfig @@ -105,19 +116,59 @@ type ValueBundle struct { // Check that each assigned entrypoint in the user config actually can be used // for the intended purpose, else throw error -func (c *UserConfig) ValidateConfigAdapters() { +func (c *UserConfig) ValidateConfigAdapters() error { + // Validate wallet config + err := c.ValidateWalletConfig() + if err != nil { + return err + } + // Validate worker configs for _, workerConfig := range c.Worker { - if workerConfig.InferenceEntrypoint != nil && !workerConfig.InferenceEntrypoint.CanInfer() { - log.Fatal().Interface("entrypoint", workerConfig.InferenceEntrypoint).Msg("Invalid inference entrypoint") - } - if workerConfig.ForecastEntrypoint != nil && !workerConfig.ForecastEntrypoint.CanForecast() { - log.Fatal().Interface("entrypoint", workerConfig.ForecastEntrypoint).Msg("Invalid forecast entrypoint") + err := workerConfig.ValidateWorkerConfig() + if err != nil { + return err } } - + // Validate reputer configs for _, reputerConfig := range c.Reputer { - if reputerConfig.GroundTruthEntrypoint != nil && !reputerConfig.GroundTruthEntrypoint.CanSourceGroundTruthAndComputeLoss() { - log.Fatal().Interface("entrypoint", reputerConfig.GroundTruthEntrypoint).Msg("Invalid loss entrypoint") + err := reputerConfig.ValidateReputerConfig() + if err != nil { + return err } } + return nil +} + +func (c *UserConfig) ValidateWalletConfig() error { + if c.Wallet.WindowCorrectionFactor < WindowCorrectionFactorSuggestedMin { + return errors.New(fmt.Sprintf("window correction factor lower than suggested minimum: %f < %f", c.Wallet.WindowCorrectionFactor, WindowCorrectionFactorSuggestedMin)) + } + if c.Wallet.BlockDurationEstimated < BlockDurationEstimatedMin { + return errors.New(fmt.Sprintf("block duration estimated lower than the minimum: %f < %f", c.Wallet.BlockDurationEstimated, BlockDurationEstimatedMin)) + } + if c.Wallet.RetryDelay < RetryDelayMin { + return errors.New(fmt.Sprintf("retry delay lower than the minimum: %f < %f", c.Wallet.RetryDelay, RetryDelayMin)) + } + if c.Wallet.AccountSequenceRetryDelay < AccountSequenceRetryDelayMin { + return errors.New(fmt.Sprintf("account sequence retry delay lower than the minimum: %f < %f", c.Wallet.AccountSequenceRetryDelay, AccountSequenceRetryDelayMin)) + } + + return nil +} + +func (reputerConfig *ReputerConfig) ValidateReputerConfig() error { + if reputerConfig.GroundTruthEntrypoint != nil && !reputerConfig.GroundTruthEntrypoint.CanSourceGroundTruthAndComputeLoss() { + return errors.New("invalid loss entrypoint") + } + return nil +} + +func (workerConfig *WorkerConfig) ValidateWorkerConfig() error { + if workerConfig.InferenceEntrypoint != nil && !workerConfig.InferenceEntrypoint.CanInfer() { + return errors.New("invalid inference entrypoint") + } + if workerConfig.ForecastEntrypoint != nil && !workerConfig.ForecastEntrypoint.CanForecast() { + return errors.New("invalid forecast entrypoint") + } + return nil } diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index 2d23b2d..31a6e8a 100644 --- a/usecase/spawn_actor_processes.go +++ b/usecase/spawn_actor_processes.go @@ -15,12 +15,9 @@ import ( "golang.org/x/exp/rand" ) -// TODO move these to Config -const ( - blockDurationAvg float64 = 5.0 // Avg block duration in seconds - correctionFactor float64 = 0.75 // Correction factor for the time estimation - SUBMISSION_WINDOWS_TO_BE_NEAR_NEW_WINDOW int64 = 2 -) +// Number of submission windows 
considered to be "near" the new window +// When it is near, the checks are more frequent +const SUBMISSION_WINDOWS_TO_BE_NEAR_NEW_WINDOW int64 = 2 func (suite *UseCaseSuite) Spawn() { var wg sync.WaitGroup @@ -195,7 +192,8 @@ func (suite *UseCaseSuite) runWorkerProcess(worker lib.WorkerConfig) { } // Wait until the epoch submission window opens distanceUntilNextEpoch := epochEnd - currentBlockHeight - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(distanceUntilNextEpoch, blockDurationAvg, correctionFactor) + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(distanceUntilNextEpoch, + suite.Node.Wallet.BlockDurationEstimated, suite.Node.Wallet.WindowCorrectionFactor) if err != nil { log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Error calculating time distance to next epoch after sending tx") return @@ -207,7 +205,7 @@ func (suite *UseCaseSuite) runWorkerProcess(worker lib.WorkerConfig) { Msg("Waiting until the epoch submission window opens") suite.Wait(correctedTimeDistanceInSeconds) } else if currentBlockHeight > epochEnd { - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(epochLength, blockDurationAvg, 1.0) + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(epochLength, suite.Node.Wallet.BlockDurationEstimated, 1.0) if err != nil { log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("epochLength and correctionFactor must be positive") return @@ -222,7 +220,7 @@ func (suite *UseCaseSuite) runWorkerProcess(worker lib.WorkerConfig) { // Wait until the center of the epoch submission window offset := generateFairOffset(workerSubmissionWindow) closeBlockDistance := distanceUntilNextEpoch + offset - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(closeBlockDistance, blockDurationAvg, 1.0) + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(closeBlockDistance, suite.Node.Wallet.BlockDurationEstimated, 1.0) if err != nil { log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Error calculating close distance to epochLength") return @@ -237,7 +235,8 @@ func (suite *UseCaseSuite) runWorkerProcess(worker lib.WorkerConfig) { suite.Wait(correctedTimeDistanceInSeconds) } else { // Wait until the epoch submission window opens - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(distanceUntilNextEpoch, blockDurationAvg, correctionFactor) + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(distanceUntilNextEpoch, + suite.Node.Wallet.BlockDurationEstimated, suite.Node.Wallet.WindowCorrectionFactor) if err != nil { log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Error calculating far distance to epochLength") return diff --git a/usecase/usecase_suite.go b/usecase/usecase_suite.go index b125d26..b350ae5 100644 --- a/usecase/usecase_suite.go +++ b/usecase/usecase_suite.go @@ -11,7 +11,10 @@ type UseCaseSuite struct { // Static method to create a new UseCaseSuite func NewUseCaseSuite(userConfig lib.UserConfig) (*UseCaseSuite, error) { - userConfig.ValidateConfigAdapters() + err := userConfig.ValidateConfigAdapters() + if err != nil { + return nil, err + } nodeConfig, err := userConfig.GenerateNodeConfig() if err != nil { return nil, err From b0e2b1d2f35454e4d6ce921c68243b013cdd9f09 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Fri, 8 Nov 2024 14:24:39 +0100 Subject: [PATCH 07/24] initial impl for smart win detection on reputer + fees fix + refactor nonces --- lib/repo_query_nonce.go | 8 +- 
lib/repo_tx_utils.go | 17 +++- usecase/spawn_actor_processes.go | 160 ++++++++++++++++++++++++++++--- 3 files changed, 168 insertions(+), 17 deletions(-) diff --git a/lib/repo_query_nonce.go b/lib/repo_query_nonce.go index 92fa972..46b0d03 100644 --- a/lib/repo_query_nonce.go +++ b/lib/repo_query_nonce.go @@ -24,7 +24,7 @@ func (node *NodeConfig) GetLatestOpenWorkerNonceByTopicId(topicId emissionstypes return res.Nonces.Nonces[0], nil } -func (node *NodeConfig) GetOldestReputerNonceByTopicId(topicId emissionstypes.TopicId) (BlockHeight, error) { +func (node *NodeConfig) GetOldestReputerNonceByTopicId(topicId emissionstypes.TopicId) (*emissionstypes.Nonce, error) { ctx := context.Background() res, err := node.Chain.EmissionsQueryClient.GetUnfulfilledReputerNonces( @@ -32,12 +32,12 @@ func (node *NodeConfig) GetOldestReputerNonceByTopicId(topicId emissionstypes.To &emissionstypes.GetUnfulfilledReputerNoncesRequest{TopicId: topicId}, ) if err != nil { - return 0, err + return &emissionstypes.Nonce{}, err } if len(res.Nonces.Nonces) == 0 { - return 0, nil + return &emissionstypes.Nonce{}, nil } // Per `AddWorkerNonce()` in `allora-chain/x/emissions/keeper.go`, the oldest nonce is last - return res.Nonces.Nonces[len(res.Nonces.Nonces)-1].ReputerNonce.BlockHeight, nil + return res.Nonces.Nonces[len(res.Nonces.Nonces)-1].ReputerNonce, nil } diff --git a/lib/repo_tx_utils.go b/lib/repo_tx_utils.go index 6396904..46389d8 100644 --- a/lib/repo_tx_utils.go +++ b/lib/repo_tx_utils.go @@ -27,6 +27,7 @@ const EXCESS_CORRECTION_IN_GAS = 20000 const ERROR_PROCESSING_CONTINUE = "continue" const ERROR_PROCESSING_OK = "ok" +const ERROR_PROCESSING_FEES = "fees" const ERROR_PROCESSING_ERROR = "error" // calculateExponentialBackoffDelay returns a duration based on retry count and base delay @@ -68,7 +69,7 @@ func processError(err error, infoMsg string, retryCount int64, node *NodeConfig) return ERROR_PROCESSING_CONTINUE, nil case int(sdkerrors.ErrInsufficientFee.ABCICode()): log.Warn().Str("msg", infoMsg).Msg("Insufficient fee") - return ERROR_PROCESSING_CONTINUE, nil + return ERROR_PROCESSING_FEES, nil case int(sdkerrors.ErrTxTooLarge.ABCICode()): return ERROR_PROCESSING_ERROR, errorsmod.Wrapf(err, "tx too large") case int(sdkerrors.ErrTxInMempoolCache.ABCICode()): @@ -110,9 +111,12 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, var txResp *cosmosclient.Response // Excess fees correction factor translated to fees using configured gas prices excessFactorFees := float64(EXCESS_CORRECTION_IN_GAS) * node.Wallet.GasPrices + // Flag to only recalculate fees on first attempt or on fees error + recalculateFees := true for retryCount := int64(0); retryCount <= node.Wallet.MaxRetries; retryCount++ { log.Debug().Msgf("SendDataWithRetry iteration started (%d/%d)", retryCount, node.Wallet.MaxRetries) + // Create tx without fees txOptions := cosmosclient.TxOptions{} txService, err := node.Chain.Client.CreateTxWithOptions(ctx, node.Chain.Account, txOptions, req) if err != nil { @@ -148,6 +152,9 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, case ERROR_PROCESSING_CONTINUE: // Error has not been handled, just continue next iteration continue + case ERROR_PROCESSING_FEES: + // Error has not been handled, just mark as recalculate fees on this iteration + recalculateFees = true default: return nil, errorsmod.Wrapf(err, "failed to process error") } @@ -157,7 +164,7 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, } // 
Handle fees if necessary - if node.Wallet.GasPrices > 0 { + if node.Wallet.GasPrices > 0 && recalculateFees { // Precalculate fees fees := uint64(float64(txService.Gas()+EXCESS_CORRECTION_IN_GAS) * node.Wallet.GasPrices) // Add excess fees correction factor to increase with each retry @@ -176,6 +183,8 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, return nil, err } } + // set to false after recalculating fees + recalculateFees = false // Broadcast tx txResponse, err := txService.Broadcast(ctx) @@ -199,6 +208,10 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, case ERROR_PROCESSING_CONTINUE: // Error has not been handled, just continue next iteration continue + case ERROR_PROCESSING_FEES: + // Error has not been handled, just mark as recalculate fees on this iteration + recalculateFees = true + continue default: return nil, errorsmod.Wrapf(err, "failed to process error") } diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index 31a6e8a..ab1800a 100644 --- a/usecase/spawn_actor_processes.go +++ b/usecase/spawn_actor_processes.go @@ -60,7 +60,6 @@ func (suite *UseCaseSuite) Spawn() { // Attempts to build and commit a worker payload for a given nonce func (suite *UseCaseSuite) processWorkerPayload(worker lib.WorkerConfig, latestNonceHeightActedUpon int64) (int64, error) { - // latestOpenWorkerNonce, err := suite.Node.GetLatestOpenWorkerNonceByTopicId(worker.TopicId) latestOpenWorkerNonce, err := lib.QueryDataWithRetry( context.Background(), suite.Node.Wallet.MaxRetries, @@ -93,6 +92,39 @@ func (suite *UseCaseSuite) processWorkerPayload(worker lib.WorkerConfig, latestN } } +func (suite *UseCaseSuite) processReputerPayload(reputer lib.ReputerConfig, latestNonceHeightActedUpon int64) (int64, error) { + nonce, err := lib.QueryDataWithRetry( + context.Background(), + suite.Node.Wallet.MaxRetries, + time.Duration(suite.Node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*emissionstypes.Nonce, error) { + return suite.Node.GetOldestReputerNonceByTopicId(reputer.TopicId) + }, + query.PageRequest{}, // Empty page request as GetLatestOpenWorkerNonceByTopicId doesn't use pagination + ) + + if err != nil { + log.Warn().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error getting latest open worker nonce on topic - node availability issue?") + return latestNonceHeightActedUpon, err + } + + if nonce.BlockHeight > latestNonceHeightActedUpon { + log.Debug().Uint64("topicId", reputer.TopicId).Int64("BlockHeight", nonce.BlockHeight). + Msg("Building and committing worker payload for topic") + + err := suite.BuildCommitReputerPayload(reputer, nonce.BlockHeight) + if err != nil { + return latestNonceHeightActedUpon, errorsmod.Wrapf(err, "error building and committing worker payload for topic") + } + return nonce.BlockHeight, nil + } else { + log.Debug().Uint64("topicId", reputer.TopicId). + Int64("BlockHeight", nonce.BlockHeight). 
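// Condensed, illustrative restatement, not part of the patch, of the fee handling this
// commit adjusts in SendDataWithRetry (lib/repo_tx_utils.go above): fees are estimated from
// simulated gas plus a fixed excess-gas correction, bumped further as fee-related retries
// accumulate, and capped at the configured maximum. All parameters are assumed inputs;
// 20000 mirrors EXCESS_CORRECTION_IN_GAS.
func estimateFeesSketch(gas uint64, gasPrices float64, maxFees uint64, feeRetries int64) uint64 {
	const excessCorrectionInGas = 20000
	fees := uint64(float64(gas+excessCorrectionInGas) * gasPrices)
	fees += uint64(float64(feeRetries) * float64(excessCorrectionInGas) * gasPrices)
	if fees > maxFees {
		fees = maxFees // never exceed the wallet's configured MaxFees
	}
	return fees
}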
+ Int64("latestNonceHeightActedUpon", latestNonceHeightActedUpon).Msg("No new worker nonce found") + return latestNonceHeightActedUpon, nil + } +} + // Calculate the time distance based on the distance until the next epoch func calculateTimeDistanceInSeconds(distanceUntilNextEpoch int64, blockDurationAvg, correctionFactor float64) (int64, error) { if distanceUntilNextEpoch < 0 || correctionFactor < 0 { @@ -262,24 +294,130 @@ func (suite *UseCaseSuite) runReputerProcess(reputer lib.ReputerConfig) { } log.Debug().Uint64("topicId", reputer.TopicId).Msg("Reputer registered and staked") - latestNonceHeightActedUpon := int64(0) + topicInfo, err := lib.QueryDataWithRetry( + context.Background(), + suite.Node.Wallet.MaxRetries, + time.Duration(suite.Node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*emissionstypes.Topic, error) { + return suite.Node.GetTopicInfo(reputer.TopicId) + }, + query.PageRequest{}, // Empty page request as GetTopicInfo doesn't use pagination + ) + if err != nil { + log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Failed to get topic info after retries") + return + } + + // Get epoch length and worker submission window static info + epochLength := topicInfo.EpochLength + workerSubmissionWindow := topicInfo.WorkerSubmissionWindow + minBlocksToCheck := workerSubmissionWindow * SUBMISSION_WINDOWS_TO_BE_NEAR_NEW_WINDOW + + // Last nonce successfully sent tx for + latestNonceHeightSentTxFor := int64(0) + + // Keep this to estimate block duration + var currentBlockHeight int64 + for { - latestOpenReputerNonce, err := suite.Node.GetOldestReputerNonceByTopicId(reputer.TopicId) + log.Trace().Msg("Starting reputer process loop") + // Query the latest block + status, err := suite.Node.Chain.Client.Status(context.Background()) if err != nil { - log.Warn().Err(err).Uint64("topicId", reputer.TopicId).Int64("BlockHeight", latestOpenReputerNonce).Msg("Error getting latest open reputer nonce on topic - node availability issue?") + log.Error().Err(err).Msg("Failed to get status") + suite.Wait(1) + continue + } + currentBlockHeight = status.SyncInfo.LatestBlockHeight + + topicInfo, err := lib.QueryDataWithRetry( + context.Background(), + suite.Node.Wallet.MaxRetries, + time.Duration(suite.Node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*emissionstypes.Topic, error) { + return suite.Node.GetTopicInfo(reputer.TopicId) + }, + query.PageRequest{}, // Empty page request as GetTopicInfo doesn't use pagination + ) + if err != nil { + log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error getting topic info") + return + } + log.Debug().Int64("currentBlockHeight", currentBlockHeight). + Int64("EpochLastEnded", topicInfo.EpochLastEnded). + Int64("EpochLength", epochLength). 
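// Condensed, illustrative restatement, not part of the patch, of the wait/submit decision
// implemented by the branches below and by the analogous worker loop earlier in this file.
// The returned value is a block distance that the loop converts to seconds with
// calculateTimeDistanceInSeconds; `nearThreshold` corresponds to minBlocksToCheck above.
func blocksUntilNextCheckSketch(current, epochLastEnded, epochLength, window, nearThreshold int64) int64 {
	epochEnd := epochLastEnded + epochLength
	distance := epochEnd - current
	switch {
	case current-epochLastEnded <= window:
		// Submission window is open: submit a payload now, then sleep toward the next epoch.
		return distance
	case current > epochEnd:
		// Past the expected epoch end: the topic may be inactive, back off one full epoch.
		return epochLength
	case distance <= nearThreshold:
		// Close to the next window: add a fair random offset so submitters spread out.
		return distance + generateFairOffset(window)
	default:
		// Far from the next window: sleep most of the remaining distance (the correction
		// factor applied during the seconds conversion makes the loop wake up early).
		return distance
	}
}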
+ Msg("Info from topic") + epochLastEnded := topicInfo.EpochLastEnded + epochEnd := epochLastEnded + epochLength + + // Check if block is within the epoch submission window + if currentBlockHeight-epochLastEnded <= epochLength { + // Attempt to submit worker payload + latestNonceHeightSentTxFor, err = suite.processReputerPayload(reputer, latestNonceHeightSentTxFor) + if err != nil { + log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error processing reputer payload - could not complete transaction") + } else { + log.Debug().Uint64("topicId", reputer.TopicId).Msg("Successfully sent reputer payload") + } + // Wait until the epoch submission window opens + distanceUntilNextEpoch := epochEnd - currentBlockHeight + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(distanceUntilNextEpoch, + suite.Node.Wallet.BlockDurationEstimated, suite.Node.Wallet.WindowCorrectionFactor) + if err != nil { + log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error calculating time distance to next epoch after sending tx") + return + } + log.Debug().Uint64("topicId", reputer.TopicId). + Int64("currentBlockHeight", currentBlockHeight). + Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). + Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). + Msg("Waiting until the epoch submission window opens") + suite.Wait(correctedTimeDistanceInSeconds) + } else if currentBlockHeight > epochEnd { + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(epochLength, suite.Node.Wallet.BlockDurationEstimated, 1.0) + if err != nil { + log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("epochLength and correctionFactor must be positive") + return + } + log.Warn().Uint64("topicId", reputer.TopicId).Msg("Current block height is greater than next epoch length, inactive topic? Waiting one epoch length") + suite.Wait(correctedTimeDistanceInSeconds) } else { - if latestOpenReputerNonce > latestNonceHeightActedUpon { - log.Debug().Uint64("topicId", reputer.TopicId).Int64("BlockHeight", latestOpenReputerNonce).Msg("Building and committing reputer payload for topic") + // Check distance until next epoch + distanceUntilNextEpoch := epochEnd - currentBlockHeight - err := suite.BuildCommitReputerPayload(reputer, latestOpenReputerNonce) + if distanceUntilNextEpoch <= minBlocksToCheck { + // Wait until the center of the epoch submission window + offset := generateFairOffset(epochLength) + closeBlockDistance := distanceUntilNextEpoch + offset + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(closeBlockDistance, suite.Node.Wallet.BlockDurationEstimated, 1.0) if err != nil { - log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error building and committing reputer payload for topic") + log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error calculating close distance to epochLength") + return } - latestNonceHeightActedUpon = latestOpenReputerNonce + log.Debug().Uint64("topicId", reputer.TopicId). + Int64("offset", offset). + Int64("currentBlockHeight", currentBlockHeight). + Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). + Int64("closeBlockDistance", closeBlockDistance). + Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). 
+ Msg("Close to the window, waiting until next epoch submission window") + suite.Wait(correctedTimeDistanceInSeconds) } else { - log.Debug().Uint64("topicId", reputer.TopicId).Msg("No new reputer nonce found") + // Wait until the epoch submission window opens + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(distanceUntilNextEpoch, + suite.Node.Wallet.BlockDurationEstimated, suite.Node.Wallet.WindowCorrectionFactor) + if err != nil { + log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error calculating far distance to epochLength") + return + } + log.Debug().Uint64("topicId", reputer.TopicId). + Int64("currentBlockHeight", currentBlockHeight). + Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). + Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). + Msg("Waiting until the epoch submission window opens") + suite.Wait(correctedTimeDistanceInSeconds) } } - suite.Wait(reputer.LoopSeconds) } + } From 4951bed6d38bf0177c7096b410e06edbaa1f4eea Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Fri, 8 Nov 2024 19:29:51 +0100 Subject: [PATCH 08/24] Unified smart window detection for reputer/worker --- config.example.json | 2 +- lib/domain_config.go | 14 ++ lib/repo_tx_utils.go | 28 ++- usecase/spawn_actor_processes.go | 370 +++++++++++++++---------------- 4 files changed, 219 insertions(+), 195 deletions(-) diff --git a/config.example.json b/config.example.json index a9fdf37..a076502 100644 --- a/config.example.json +++ b/config.example.json @@ -12,7 +12,7 @@ "retryDelay": 3, "accountSequenceRetryDelay": 5, "submitTx": true, - "blockDurationEstimated": 5, + "blockDurationEstimated": 10, "windowCorrectionFactor": 0.8 }, "worker": [ diff --git a/lib/domain_config.go b/lib/domain_config.go index 37ef624..5677fef 100644 --- a/lib/domain_config.go +++ b/lib/domain_config.go @@ -47,6 +47,10 @@ type ChainConfig struct { AddressPrefix string // prefix for the allora addresses } +type TopicActor interface { + GetTopicId() emissions.TopicId +} + type WorkerConfig struct { TopicId emissions.TopicId InferenceEntrypointName string @@ -57,6 +61,11 @@ type WorkerConfig struct { Parameters map[string]string // Map for variable configuration values } +// Implement TopicActor interface for WorkerConfig +func (w WorkerConfig) GetTopicId() emissions.TopicId { + return w.TopicId +} + type ReputerConfig struct { TopicId emissions.TopicId GroundTruthEntrypointName string @@ -73,6 +82,11 @@ type ReputerConfig struct { LossFunctionParameters LossFunctionParameters // Map for variable configuration values } +// Implement TopicActor interface for ReputerConfig +func (r ReputerConfig) GetTopicId() emissions.TopicId { + return r.TopicId +} + type LossFunctionParameters struct { LossFunctionService string LossMethodOptions map[string]string diff --git a/lib/repo_tx_utils.go b/lib/repo_tx_utils.go index 46389d8..228c68f 100644 --- a/lib/repo_tx_utils.go +++ b/lib/repo_tx_utils.go @@ -111,13 +111,22 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, var txResp *cosmosclient.Response // Excess fees correction factor translated to fees using configured gas prices excessFactorFees := float64(EXCESS_CORRECTION_IN_GAS) * node.Wallet.GasPrices - // Flag to only recalculate fees on first attempt or on fees error - recalculateFees := true + // Keep track of how many times fees need to be recalculated to avoid missing fee info between errors + recalculateFees := 0 + // Use to keep track of expected sequence number between errors + 
globalExpectedSeqNum := uint64(0) for retryCount := int64(0); retryCount <= node.Wallet.MaxRetries; retryCount++ { log.Debug().Msgf("SendDataWithRetry iteration started (%d/%d)", retryCount, node.Wallet.MaxRetries) // Create tx without fees txOptions := cosmosclient.TxOptions{} + if globalExpectedSeqNum > 0 { + log.Info(). + Uint64("expected", globalExpectedSeqNum). + Uint64("current", node.Chain.Client.TxFactory.Sequence()). + Msg("Resetting sequence to expected from previous sequence errors") + node.Chain.Client.TxFactory = node.Chain.Client.TxFactory.WithSequence(globalExpectedSeqNum) + } txService, err := node.Chain.Client.CreateTxWithOptions(ctx, node.Chain.Account, txOptions, req) if err != nil { // Handle error on creation of tx, before broadcasting @@ -136,6 +145,7 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, if err != nil { return nil, errorsmod.Wrapf(err, "failed to reset sequence second time, exiting") } + globalExpectedSeqNum = expectedSeqNum } else { errorResponse, err := processError(err, infoMsg, retryCount, node) switch errorResponse { @@ -154,7 +164,8 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, continue case ERROR_PROCESSING_FEES: // Error has not been handled, just mark as recalculate fees on this iteration - recalculateFees = true + log.Debug().Msg("Marking fee recalculation on tx creation") + recalculateFees += 1 default: return nil, errorsmod.Wrapf(err, "failed to process error") } @@ -164,11 +175,11 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, } // Handle fees if necessary - if node.Wallet.GasPrices > 0 && recalculateFees { + if node.Wallet.GasPrices > 0 && recalculateFees > 0 { // Precalculate fees fees := uint64(float64(txService.Gas()+EXCESS_CORRECTION_IN_GAS) * node.Wallet.GasPrices) - // Add excess fees correction factor to increase with each retry - fees = fees + uint64(float64(retryCount+1)*excessFactorFees) + // Add excess fees correction factor to increase with each fee-problematic retry + fees = fees + uint64(float64(recalculateFees)*excessFactorFees) // Limit fees to maxFees if fees > node.Wallet.MaxFees { log.Warn().Uint64("gas", txService.Gas()).Uint64("limit", node.Wallet.MaxFees).Msg("Gas limit exceeded, using maxFees instead") @@ -183,8 +194,6 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, return nil, err } } - // set to false after recalculating fees - recalculateFees = false // Broadcast tx txResponse, err := txService.Broadcast(ctx) @@ -210,7 +219,8 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, continue case ERROR_PROCESSING_FEES: // Error has not been handled, just mark as recalculate fees on this iteration - recalculateFees = true + log.Debug().Msg("Marking fee recalculation on tx broadcasting") + recalculateFees += 1 continue default: return nil, errorsmod.Wrapf(err, "failed to process error") diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index ab1800a..98f3f8b 100644 --- a/usecase/spawn_actor_processes.go +++ b/usecase/spawn_actor_processes.go @@ -19,6 +19,28 @@ import ( // When it is near, the checks are more frequent const SUBMISSION_WINDOWS_TO_BE_NEAR_NEW_WINDOW int64 = 2 +// Correction factor used when calculating time distances near window +const NEARNESS_CORRECTION_FACTOR float64 = 1.0 + +// Minimum wait time between status checks +const WAIT_TIME_STATUS_CHECKS int64 = 1 + +// ActorProcessParams encapsulates the 
configuration needed for running actor processes +type ActorProcessParams[T lib.TopicActor] struct { + // Configuration for the actor (Worker or Reputer) + Config T + // Function to process payloads (processWorkerPayload or processReputerPayload) + ProcessPayload func(T, int64) (int64, error) + // Function to get nonces (GetLatestOpenWorkerNonceByTopicId or GetOldestReputerNonceByTopicId) + GetNonce func(emissionstypes.TopicId) (*emissionstypes.Nonce, error) + // Window length used to determine when we're near submission time + NearWindowLength int64 + // Actual submission window length + SubmissionWindowLength int64 + // Actor type for logging ("worker" or "reputer") + ActorType string +} + func (suite *UseCaseSuite) Spawn() { var wg sync.WaitGroup @@ -83,10 +105,13 @@ func (suite *UseCaseSuite) processWorkerPayload(worker lib.WorkerConfig, latestN if err != nil { return latestNonceHeightActedUpon, errorsmod.Wrapf(err, "error building and committing worker payload for topic") } + log.Debug().Uint64("topicId", uint64(worker.TopicId)). + Str("actorType", "worker"). + Msg("Successfully finished processing payload") return latestOpenWorkerNonce.BlockHeight, nil } else { log.Debug().Uint64("topicId", worker.TopicId). - Int64("BlockHeight", latestOpenWorkerNonce.BlockHeight). + Int64("LastOpenNonceBlockHeight", latestOpenWorkerNonce.BlockHeight). Int64("latestNonceHeightActedUpon", latestNonceHeightActedUpon).Msg("No new worker nonce found") return latestNonceHeightActedUpon, nil } @@ -100,27 +125,30 @@ func (suite *UseCaseSuite) processReputerPayload(reputer lib.ReputerConfig, late func(ctx context.Context, req query.PageRequest) (*emissionstypes.Nonce, error) { return suite.Node.GetOldestReputerNonceByTopicId(reputer.TopicId) }, - query.PageRequest{}, // Empty page request as GetLatestOpenWorkerNonceByTopicId doesn't use pagination + query.PageRequest{}, // Empty page request as GetOldestReputerNonceByTopicId doesn't use pagination ) if err != nil { - log.Warn().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error getting latest open worker nonce on topic - node availability issue?") + log.Warn().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error getting latest open reputer nonce on topic - node availability issue?") return latestNonceHeightActedUpon, err } if nonce.BlockHeight > latestNonceHeightActedUpon { log.Debug().Uint64("topicId", reputer.TopicId).Int64("BlockHeight", nonce.BlockHeight). - Msg("Building and committing worker payload for topic") + Msg("Building and committing reputer payload for topic") err := suite.BuildCommitReputerPayload(reputer, nonce.BlockHeight) if err != nil { - return latestNonceHeightActedUpon, errorsmod.Wrapf(err, "error building and committing worker payload for topic") + return latestNonceHeightActedUpon, errorsmod.Wrapf(err, "error building and committing reputer payload for topic") } + log.Debug().Uint64("topicId", reputer.TopicId). + Str("actorType", "reputer"). + Msg("Successfully finished processing payload") return nonce.BlockHeight, nil } else { log.Debug().Uint64("topicId", reputer.TopicId). - Int64("BlockHeight", nonce.BlockHeight). - Int64("latestNonceHeightActedUpon", latestNonceHeightActedUpon).Msg("No new worker nonce found") + Int64("LastOpenNonceBlockHeight", nonce.BlockHeight). 
+ Int64("latestNonceHeightActedUpon", latestNonceHeightActedUpon).Msg("No new reputer nonce found") return latestNonceHeightActedUpon, nil } } @@ -151,6 +179,7 @@ func generateFairOffset(workerSubmissionWindow int64) int64 { func (suite *UseCaseSuite) runWorkerProcess(worker lib.WorkerConfig) { log.Info().Uint64("topicId", worker.TopicId).Msg("Running worker process for topic") + // Handle registration registered := suite.Node.RegisterWorkerIdempotently(worker) if !registered { log.Error().Uint64("topicId", worker.TopicId).Msg("Failed to register worker for topic") @@ -158,135 +187,29 @@ func (suite *UseCaseSuite) runWorkerProcess(worker lib.WorkerConfig) { } log.Debug().Uint64("topicId", worker.TopicId).Msg("Worker registered") - topicInfo, err := lib.QueryDataWithRetry( - context.Background(), - suite.Node.Wallet.MaxRetries, - time.Duration(suite.Node.Wallet.RetryDelay)*time.Second, - func(ctx context.Context, req query.PageRequest) (*emissionstypes.Topic, error) { - return suite.Node.GetTopicInfo(worker.TopicId) - }, - query.PageRequest{}, // Empty page request as GetTopicInfo doesn't use pagination - ) + // Using the helper function + topicInfo, err := queryTopicInfo(suite, worker, "worker") if err != nil { - log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Failed to get topic info after retries") + log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Failed to get topic info for worker") return } - // Get epoch length and worker submission window static info - epochLength := topicInfo.EpochLength - workerSubmissionWindow := topicInfo.WorkerSubmissionWindow - minBlocksToCheck := workerSubmissionWindow * SUBMISSION_WINDOWS_TO_BE_NEAR_NEW_WINDOW - - // Last nonce successfully sent tx for - latestNonceHeightSentTxFor := int64(0) - - // Keep this to estimate block duration - var currentBlockHeight int64 - - for { - // Query the latest block - status, err := suite.Node.Chain.Client.Status(context.Background()) - if err != nil { - log.Error().Err(err).Msg("Failed to get status") - suite.Wait(1) - continue - } - currentBlockHeight = status.SyncInfo.LatestBlockHeight - - topicInfo, err := lib.QueryDataWithRetry( - context.Background(), - suite.Node.Wallet.MaxRetries, - time.Duration(suite.Node.Wallet.RetryDelay)*time.Second, - func(ctx context.Context, req query.PageRequest) (*emissionstypes.Topic, error) { - return suite.Node.GetTopicInfo(worker.TopicId) - }, - query.PageRequest{}, // Empty page request as GetTopicInfo doesn't use pagination - ) - if err != nil { - log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Error getting topic info") - return - } - log.Debug().Int64("currentBlockHeight", currentBlockHeight). - Int64("EpochLastEnded", topicInfo.EpochLastEnded). - Int64("EpochLength", epochLength). 
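// The refactor in this commit routes topic queries through a queryTopicInfo helper whose
// definition falls outside this excerpt. Based on how it is called by the worker and reputer
// processes in this commit, one plausible shape (an illustrative sketch, not necessarily the
// actual implementation) is a generic retry wrapper over GetTopicInfo for any lib.TopicActor:
func queryTopicInfoSketch[T lib.TopicActor](suite *UseCaseSuite, config T, actorType string) (*emissionstypes.Topic, error) {
	topicInfo, err := lib.QueryDataWithRetry(
		context.Background(),
		suite.Node.Wallet.MaxRetries,
		time.Duration(suite.Node.Wallet.RetryDelay)*time.Second,
		func(ctx context.Context, _ query.PageRequest) (*emissionstypes.Topic, error) {
			return suite.Node.GetTopicInfo(config.GetTopicId())
		},
		query.PageRequest{}, // GetTopicInfo does not paginate
	)
	if err != nil {
		log.Error().Err(err).
			Uint64("topicId", uint64(config.GetTopicId())).
			Str("actorType", actorType).
			Msg("Failed to get topic info after retries")
	}
	return topicInfo, err
}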
- Msg("Info from topic") - epochLastEnded := topicInfo.EpochLastEnded - epochEnd := epochLastEnded + epochLength - - // Check if block is within the epoch submission window - if currentBlockHeight-epochLastEnded <= workerSubmissionWindow { - // Attempt to submit worker payload - latestNonceHeightSentTxFor, err = suite.processWorkerPayload(worker, latestNonceHeightSentTxFor) - if err != nil { - log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Error processing worker payload - could not complete transaction") - } else { - log.Debug().Uint64("topicId", worker.TopicId).Msg("Successfully sent worker payload") - } - // Wait until the epoch submission window opens - distanceUntilNextEpoch := epochEnd - currentBlockHeight - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(distanceUntilNextEpoch, - suite.Node.Wallet.BlockDurationEstimated, suite.Node.Wallet.WindowCorrectionFactor) - if err != nil { - log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Error calculating time distance to next epoch after sending tx") - return - } - log.Debug().Uint64("topicId", worker.TopicId). - Int64("currentBlockHeight", currentBlockHeight). - Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). - Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). - Msg("Waiting until the epoch submission window opens") - suite.Wait(correctedTimeDistanceInSeconds) - } else if currentBlockHeight > epochEnd { - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(epochLength, suite.Node.Wallet.BlockDurationEstimated, 1.0) - if err != nil { - log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("epochLength and correctionFactor must be positive") - return - } - log.Warn().Uint64("topicId", worker.TopicId).Msg("Current block height is greater than next epoch length, inactive topic? Waiting one epoch length") - suite.Wait(correctedTimeDistanceInSeconds) - } else { - // Check distance until next epoch - distanceUntilNextEpoch := epochEnd - currentBlockHeight - - if distanceUntilNextEpoch <= minBlocksToCheck { - // Wait until the center of the epoch submission window - offset := generateFairOffset(workerSubmissionWindow) - closeBlockDistance := distanceUntilNextEpoch + offset - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(closeBlockDistance, suite.Node.Wallet.BlockDurationEstimated, 1.0) - if err != nil { - log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Error calculating close distance to epochLength") - return - } - log.Debug().Uint64("topicId", worker.TopicId). - Int64("offset", offset). - Int64("currentBlockHeight", currentBlockHeight). - Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). - Int64("closeBlockDistance", closeBlockDistance). - Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). - Msg("Close to the window, waiting until next epoch submission window") - suite.Wait(correctedTimeDistanceInSeconds) - } else { - // Wait until the epoch submission window opens - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(distanceUntilNextEpoch, - suite.Node.Wallet.BlockDurationEstimated, suite.Node.Wallet.WindowCorrectionFactor) - if err != nil { - log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Error calculating far distance to epochLength") - return - } - log.Debug().Uint64("topicId", worker.TopicId). - Int64("currentBlockHeight", currentBlockHeight). - Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). 
- Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). - Msg("Waiting until the epoch submission window opens") - suite.Wait(correctedTimeDistanceInSeconds) - } - } + params := ActorProcessParams[lib.WorkerConfig]{ + Config: worker, + ProcessPayload: suite.processWorkerPayload, + GetNonce: suite.Node.GetLatestOpenWorkerNonceByTopicId, + NearWindowLength: topicInfo.WorkerSubmissionWindow, // Use worker window to determine "nearness" + SubmissionWindowLength: topicInfo.WorkerSubmissionWindow, // Use worker window for actual submission window + ActorType: "worker", } + + runActorProcess(suite, params) } func (suite *UseCaseSuite) runReputerProcess(reputer lib.ReputerConfig) { log.Debug().Uint64("topicId", reputer.TopicId).Msg("Running reputer process for topic") + // Handle registration and staking registeredAndStaked := suite.Node.RegisterAndStakeReputerIdempotently(reputer) if !registeredAndStaked { log.Error().Uint64("topicId", reputer.TopicId).Msg("Failed to register or sufficiently stake reputer for topic") @@ -294,130 +217,207 @@ func (suite *UseCaseSuite) runReputerProcess(reputer lib.ReputerConfig) { } log.Debug().Uint64("topicId", reputer.TopicId).Msg("Reputer registered and staked") - topicInfo, err := lib.QueryDataWithRetry( - context.Background(), - suite.Node.Wallet.MaxRetries, - time.Duration(suite.Node.Wallet.RetryDelay)*time.Second, - func(ctx context.Context, req query.PageRequest) (*emissionstypes.Topic, error) { - return suite.Node.GetTopicInfo(reputer.TopicId) - }, - query.PageRequest{}, // Empty page request as GetTopicInfo doesn't use pagination - ) + // Using the helper function + topicInfo, err := queryTopicInfo(suite, reputer, "reputer") if err != nil { - log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Failed to get topic info after retries") + log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Failed to get topic info for reputer") return } - // Get epoch length and worker submission window static info - epochLength := topicInfo.EpochLength - workerSubmissionWindow := topicInfo.WorkerSubmissionWindow - minBlocksToCheck := workerSubmissionWindow * SUBMISSION_WINDOWS_TO_BE_NEAR_NEW_WINDOW + params := ActorProcessParams[lib.ReputerConfig]{ + Config: reputer, + ProcessPayload: suite.processReputerPayload, + GetNonce: suite.Node.GetOldestReputerNonceByTopicId, + NearWindowLength: topicInfo.WorkerSubmissionWindow, // Use worker window to determine "nearness" + SubmissionWindowLength: topicInfo.EpochLength, // Use epoch length for actual submission window + ActorType: "reputer", + } - // Last nonce successfully sent tx for - latestNonceHeightSentTxFor := int64(0) + runActorProcess(suite, params) +} + +// Function that runs the actor process for a given topic and actor type +func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessParams[T]) { + log.Debug(). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). + Msg("Running actor process for topic") - // Keep this to estimate block duration + topicInfo, err := queryTopicInfo(suite, params.Config, params.ActorType) + if err != nil { + log.Error(). + Err(err). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). 
+ Msg("Failed to get topic info after retries") + return + } + + epochLength := topicInfo.EpochLength + minBlocksToCheck := params.NearWindowLength * SUBMISSION_WINDOWS_TO_BE_NEAR_NEW_WINDOW + latestNonceHeightSentTxFor := int64(0) var currentBlockHeight int64 for { - log.Trace().Msg("Starting reputer process loop") + log.Debug().Msg("Start iteration, querying latest block") // Query the latest block status, err := suite.Node.Chain.Client.Status(context.Background()) if err != nil { log.Error().Err(err).Msg("Failed to get status") - suite.Wait(1) + suite.Wait(WAIT_TIME_STATUS_CHECKS) continue } currentBlockHeight = status.SyncInfo.LatestBlockHeight - topicInfo, err := lib.QueryDataWithRetry( - context.Background(), - suite.Node.Wallet.MaxRetries, - time.Duration(suite.Node.Wallet.RetryDelay)*time.Second, - func(ctx context.Context, req query.PageRequest) (*emissionstypes.Topic, error) { - return suite.Node.GetTopicInfo(reputer.TopicId) - }, - query.PageRequest{}, // Empty page request as GetTopicInfo doesn't use pagination - ) + topicInfo, err := queryTopicInfo(suite, params.Config, params.ActorType) if err != nil { - log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error getting topic info") + log.Error(). + Err(err). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). + Msg("Error getting topic info") return } - log.Debug().Int64("currentBlockHeight", currentBlockHeight). + log.Trace(). + Int64("currentBlockHeight", currentBlockHeight). Int64("EpochLastEnded", topicInfo.EpochLastEnded). Int64("EpochLength", epochLength). Msg("Info from topic") + epochLastEnded := topicInfo.EpochLastEnded epochEnd := epochLastEnded + epochLength - // Check if block is within the epoch submission window - if currentBlockHeight-epochLastEnded <= epochLength { - // Attempt to submit worker payload - latestNonceHeightSentTxFor, err = suite.processReputerPayload(reputer, latestNonceHeightSentTxFor) + // Check if block is within the submission window + if currentBlockHeight-epochLastEnded <= params.SubmissionWindowLength { + // Within the submission window, attempt to process payload + latestNonceHeightSentTxFor, err = params.ProcessPayload(params.Config, latestNonceHeightSentTxFor) if err != nil { - log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error processing reputer payload - could not complete transaction") - } else { - log.Debug().Uint64("topicId", reputer.TopicId).Msg("Successfully sent reputer payload") + log.Error(). + Err(err). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). + Msg("Error processing payload - could not complete transaction") } - // Wait until the epoch submission window opens + distanceUntilNextEpoch := epochEnd - currentBlockHeight - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(distanceUntilNextEpoch, - suite.Node.Wallet.BlockDurationEstimated, suite.Node.Wallet.WindowCorrectionFactor) + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds( + distanceUntilNextEpoch, + suite.Node.Wallet.BlockDurationEstimated, + suite.Node.Wallet.WindowCorrectionFactor, + ) if err != nil { - log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error calculating time distance to next epoch after sending tx") + log.Error(). + Err(err). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). 
+ Msg("Error calculating time distance to next epoch after sending tx") return } - log.Debug().Uint64("topicId", reputer.TopicId). + + log.Debug(). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). Int64("currentBlockHeight", currentBlockHeight). Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). - Msg("Waiting until the epoch submission window opens") + Msg("Waiting until the submission window opens after sending") suite.Wait(correctedTimeDistanceInSeconds) } else if currentBlockHeight > epochEnd { - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(epochLength, suite.Node.Wallet.BlockDurationEstimated, 1.0) + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds( + epochLength, + suite.Node.Wallet.BlockDurationEstimated, + NEARNESS_CORRECTION_FACTOR, + ) if err != nil { - log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("epochLength and correctionFactor must be positive") + log.Error(). + Err(err). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). + Msg("Error calculating time distance to next epoch after sending tx") return } - log.Warn().Uint64("topicId", reputer.TopicId).Msg("Current block height is greater than next epoch length, inactive topic? Waiting one epoch length") + log.Warn(). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). + Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). + Msg("Current block height is greater than next epoch length, inactive topic? Waiting seconds...") suite.Wait(correctedTimeDistanceInSeconds) } else { - // Check distance until next epoch distanceUntilNextEpoch := epochEnd - currentBlockHeight if distanceUntilNextEpoch <= minBlocksToCheck { - // Wait until the center of the epoch submission window - offset := generateFairOffset(epochLength) + // Close distance, check more closely until the submission window opens + offset := generateFairOffset(params.SubmissionWindowLength) closeBlockDistance := distanceUntilNextEpoch + offset - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(closeBlockDistance, suite.Node.Wallet.BlockDurationEstimated, 1.0) + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds( + closeBlockDistance, + suite.Node.Wallet.BlockDurationEstimated, + NEARNESS_CORRECTION_FACTOR, + ) if err != nil { - log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error calculating close distance to epochLength") + log.Error(). + Err(err). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). + Msg("Error calculating close distance to epochLength") return } - log.Debug().Uint64("topicId", reputer.TopicId). + log.Debug(). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). + Int64("SubmissionWindowLength", params.SubmissionWindowLength). Int64("offset", offset). Int64("currentBlockHeight", currentBlockHeight). Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). Int64("closeBlockDistance", closeBlockDistance). Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). 
- Msg("Close to the window, waiting until next epoch submission window") + Msg("Close to the window, waiting until next submission window") suite.Wait(correctedTimeDistanceInSeconds) } else { - // Wait until the epoch submission window opens - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds(distanceUntilNextEpoch, - suite.Node.Wallet.BlockDurationEstimated, suite.Node.Wallet.WindowCorrectionFactor) + // Far distance, bigger waits until the submission window opens + correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds( + distanceUntilNextEpoch, + suite.Node.Wallet.BlockDurationEstimated, + suite.Node.Wallet.WindowCorrectionFactor, + ) if err != nil { - log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error calculating far distance to epochLength") + log.Error(). + Err(err). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). + Msg("Error calculating far distance to epochLength") return } - log.Debug().Uint64("topicId", reputer.TopicId). + log.Debug(). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). Int64("currentBlockHeight", currentBlockHeight). Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). - Msg("Waiting until the epoch submission window opens") + Msg("Waiting until the submission window opens - far distance") suite.Wait(correctedTimeDistanceInSeconds) } } } +} +// Queries the topic info for a given actor type and wallet params from suite +func queryTopicInfo[T lib.TopicActor]( + suite *UseCaseSuite, + config T, + actorType string, +) (*emissionstypes.Topic, error) { + topicInfo, err := lib.QueryDataWithRetry( + context.Background(), + suite.Node.Wallet.MaxRetries, + time.Duration(suite.Node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*emissionstypes.Topic, error) { + return suite.Node.GetTopicInfo(config.GetTopicId()) + }, + query.PageRequest{}, + ) + if err != nil { + return nil, errorsmod.Wrapf(err, "failed to get topic info") + } + return topicInfo, nil } From 9cf4bf806628d6bc95aa243617cc7aafbc83eabe Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Sun, 10 Nov 2024 13:14:53 +0100 Subject: [PATCH 09/24] README and changelog --- CHANGELOG.md | 4 ++-- README.md | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b31315..275ce6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,9 +41,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) for all versions `v1.0.0` and beyond (still considered experimental prior to v1.0.0). 
-## [Unreleased] +## v0.6.0 ### Added +* [#66](https://github.com/allora-network/allora-offchain-node/pull/66) Smart worker detection of submission windows ### Removed @@ -56,7 +57,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added * [#75](https://github.com/allora-network/allora-offchain-node/pull/75) Configurable fee awareness -* [#66](https://github.com/allora-network/allora-offchain-node/pull/66) Smart worker detection of open window + randomness ### Removed diff --git a/README.md b/README.md index cfbdab3..3b24ae9 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,13 @@ Note: when an account sequence mismatch is detected, the node will attempt to se - `retryDelay`: For all other errors that need retry delays. +### Smart Window Detection + +The node will automatically detect the submission window length for each topic on each actor type. +This can be configured by the following settings in the config.json: +* `blockDurationEstimated`: Estimated network block time in seconds. Minimum is 1. +* `windowCorrectionFactor`: Correction factor to fine-tune the submission window length. Higher values optimize the number of calls for window checking. Minimum is 0.5. + ## Configuration examples A complete example is provided in `config.example.json`. From 22213bbc1e2b099bc2e1c8dca9e5070314fc40de Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Sun, 10 Nov 2024 13:50:04 +0100 Subject: [PATCH 10/24] clarify comments on consts --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 275ce6e..15f0297 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,7 +44,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## v0.6.0 ### Added -* [#66](https://github.com/allora-network/allora-offchain-node/pull/66) Smart worker detection of submission windows +* [#66](https://github.com/allora-network/allora-offchain-node/pull/66) Smart worker detection of submission windows + persistent error management ### Removed From 1bdad461c6759705943225e7dc3de8d63ce3a133 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Sun, 10 Nov 2024 14:06:51 +0100 Subject: [PATCH 11/24] better names and comments --- usecase/spawn_actor_processes.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index 98f3f8b..72ac79e 100644 --- a/usecase/spawn_actor_processes.go +++ b/usecase/spawn_actor_processes.go @@ -15,11 +15,12 @@ import ( "golang.org/x/exp/rand" ) -// Number of submission windows considered to be "near" the new window -// When it is near, the checks are more frequent -const SUBMISSION_WINDOWS_TO_BE_NEAR_NEW_WINDOW int64 = 2 +// Number of submission windows considered to be "near" the next window +// When time is near, the control is more accurate +const NUM_SUBMISSION_WINDOWS_FOR_SUBMISSION_NEARNESS int64 = 2 // Correction factor used when calculating time distances near window +// Waiting times under nearness circumstances are adjusted by this factor const NEARNESS_CORRECTION_FACTOR float64 = 1.0 // Minimum wait time between status checks @@ -254,7 +255,7 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP } epochLength := topicInfo.EpochLength - minBlocksToCheck := params.NearWindowLength * SUBMISSION_WINDOWS_TO_BE_NEAR_NEW_WINDOW + minBlocksToCheck := params.NearWindowLength * NUM_SUBMISSION_WINDOWS_FOR_SUBMISSION_NEARNESS latestNonceHeightSentTxFor := int64(0) var 
currentBlockHeight int64 From 72871692900988e6643d23fd782aa8d14968ee69 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Mon, 11 Nov 2024 16:01:58 +0100 Subject: [PATCH 12/24] Retry acct seq mismatch, remove unused loopSeconds, fix printouts and tests --- config.cdk.json.template | 11 +++++++---- config.example.json | 2 -- lib/domain_config.go | 8 +++----- lib/repo_tx_utils.go | 5 ++++- usecase/build_commit_worker_payload_test.go | 3 --- 5 files changed, 14 insertions(+), 15 deletions(-) diff --git a/config.cdk.json.template b/config.cdk.json.template index 7b33f84..790389a 100644 --- a/config.cdk.json.template +++ b/config.cdk.json.template @@ -5,16 +5,20 @@ "alloraHomeDir": "_ALLORA_WALLET_HOME_DIR_", "gas": "_ALLORA_WALLET_GAS_", "gasAdjustment": _ALLORA_WALLET_GAS_ADJUSTMENT_, + "gasPrices": _ALLORA_WALLET_GAS_PRICES_, + "maxFees": _ALLORA_WALLET_MAX_FEES_, "nodeRpc": "_ALLORA_WALLET_NODE_RPC_", "maxRetries": _ALLORA_WALLET_MAX_RETRIES_, - "delay": _ALLORA_WALLET_DELAY_, - "submitTx": _ALLORA_WALLET_SUBMIT_TX_ + "retryDelay": _ALLORA_WALLET_RETRY_DELAY_, + "accountSequenceRetryDelay": _ALLORA_WALLET_ACCOUNT_SEQUENCE_RETRY_DELAY_, + "submitTx": _ALLORA_WALLET_SUBMIT_TX_, + "blockDurationEstimated": _ALLORA_WALLET_BLOCK_DURATION_ESTIMATED_, + "windowCorrectionFactor": _ALLORA_WALLET_WINDOW_CORRECTION_FACTOR_ }, "worker": [ { "topicId": _ALLORA_WORKER_TOPIC_ID_, "inferenceEntrypointName": "_ALLORA_WORKER_INFERENCE_ENTRYPOINT_NAME_", - "loopSeconds": _ALLORA_WORKER_LOOP_SECONDS_, "parameters": { "InferenceEndpoint": "_ALLORA_WORKER_INFERENCE_ENDPOINT_", "Token": "_ALLORA_WORKER_TOKEN_" @@ -26,7 +30,6 @@ "topicId": _ALLORA_REPUTER_TOPIC_ID_, "groundTruthEntrypointName": "_ALLORA_REPUTER_ENTRYPOINT_NAME_", "lossFunctionEntrypointName": "_ALLORA_REPUTER_ENTRYPOINT_NAME_", - "loopSeconds": _ALLORA_REPUTER_LOOP_SECONDS_, "minStake": _ALLORA_REPUTER_MIN_STAKE_, "groundTruthParameters": { "GroundTruthEndpoint": "_ALLORA_REPUTER_SOURCE_OF_TRUTH_ENDPOINT_", diff --git a/config.example.json b/config.example.json index a076502..901ed9e 100644 --- a/config.example.json +++ b/config.example.json @@ -19,7 +19,6 @@ { "topicId": 1, "inferenceEntrypointName": "api-worker-reputer", - "loopSeconds": 10, "parameters": { "InferenceEndpoint": "http://source:8000/inference/{Token}", "Token": "ETH" @@ -31,7 +30,6 @@ "topicId": 1, "groundTruthEntrypointName": "api-worker-reputer", "lossFunctionEntrypointName": "api-worker-reputer", - "loopSeconds": 30, "minStake": 100000, "groundTruthParameters": { "GroundTruthEndpoint": "http://localhost:8888/gt/{Token}/{BlockHeight}", diff --git a/lib/domain_config.go b/lib/domain_config.go index 5677fef..c709766 100644 --- a/lib/domain_config.go +++ b/lib/domain_config.go @@ -56,8 +56,7 @@ type WorkerConfig struct { InferenceEntrypointName string InferenceEntrypoint AlloraAdapter ForecastEntrypointName string - ForecastEntrypoint AlloraAdapter - LoopSeconds int64 // seconds to wait between attempts to get next worker nonce + ForecastEntrypoint AlloraAdapter // seconds to wait between attempts to get next worker nonce Parameters map[string]string // Map for variable configuration values } @@ -77,7 +76,6 @@ type ReputerConfig struct { // This is idempotent in that it will not add more stake than specified here. // Set to 0 to effectively disable this feature and use whatever stake has already been added. 
MinStake int64 - LoopSeconds int64 // seconds to wait between attempts to get next reptuer nonces GroundTruthParameters map[string]string // Map for variable configuration values LossFunctionParameters LossFunctionParameters // Map for variable configuration values } @@ -161,10 +159,10 @@ func (c *UserConfig) ValidateWalletConfig() error { return errors.New(fmt.Sprintf("block duration estimated lower than the minimum: %f < %f", c.Wallet.BlockDurationEstimated, BlockDurationEstimatedMin)) } if c.Wallet.RetryDelay < RetryDelayMin { - return errors.New(fmt.Sprintf("retry delay lower than the minimum: %f < %f", c.Wallet.RetryDelay, RetryDelayMin)) + return errors.New(fmt.Sprintf("retry delay lower than the minimum: %d < %d", c.Wallet.RetryDelay, RetryDelayMin)) } if c.Wallet.AccountSequenceRetryDelay < AccountSequenceRetryDelayMin { - return errors.New(fmt.Sprintf("account sequence retry delay lower than the minimum: %f < %f", c.Wallet.AccountSequenceRetryDelay, AccountSequenceRetryDelayMin)) + return errors.New(fmt.Sprintf("account sequence retry delay lower than the minimum: %d < %d", c.Wallet.AccountSequenceRetryDelay, AccountSequenceRetryDelayMin)) } return nil diff --git a/lib/repo_tx_utils.go b/lib/repo_tx_utils.go index 228c68f..9402518 100644 --- a/lib/repo_tx_utils.go +++ b/lib/repo_tx_utils.go @@ -143,8 +143,11 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, log.Info().Uint64("expected", expectedSeqNum).Uint64("current", currentSeqNum).Msg("Retrying resetting sequence from current to expected") txService, err = node.Chain.Client.CreateTxWithOptions(ctx, node.Chain.Account, txOptions, req) if err != nil { - return nil, errorsmod.Wrapf(err, "failed to reset sequence second time, exiting") + log.Error().Err(err).Str("msg", infoMsg).Msg("Failed to reset sequence second time, retrying with regular delay") + time.Sleep(time.Duration(node.Wallet.RetryDelay) * time.Second) + continue } + // if creation is successful, make the expected sequence number persistent globalExpectedSeqNum = expectedSeqNum } else { errorResponse, err := processError(err, infoMsg, retryCount, node) diff --git a/usecase/build_commit_worker_payload_test.go b/usecase/build_commit_worker_payload_test.go index b3b75c7..ae00bfd 100644 --- a/usecase/build_commit_worker_payload_test.go +++ b/usecase/build_commit_worker_payload_test.go @@ -38,7 +38,6 @@ func TestComputeWorkerBundle(t *testing.T) { InferenceEntrypoint: nil, // Will be set in the test ForecastEntrypoint: nil, // Will be set in the test Parameters: workerOptions, - LoopSeconds: 10, }, InfererValue: "9.5", ForecasterValues: []lib.NodeValue{ @@ -80,7 +79,6 @@ func TestComputeWorkerBundle(t *testing.T) { InferenceEntrypoint: nil, ForecastEntrypoint: nil, Parameters: workerOptions, - LoopSeconds: 10, }, InfererValue: "invalid", ForecasterValues: []lib.NodeValue{ @@ -102,7 +100,6 @@ func TestComputeWorkerBundle(t *testing.T) { InferenceEntrypoint: nil, ForecastEntrypoint: nil, Parameters: workerOptions, - LoopSeconds: 10, }, InfererValue: "9.5", ForecasterValues: []lib.NodeValue{ From 3f13f4440ad9cdf9a0210f4c5bd48f3adeb05e71 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Tue, 12 Nov 2024 19:15:29 +0100 Subject: [PATCH 13/24] namings and comments --- usecase/spawn_actor_processes.go | 33 ++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index 72ac79e..0c69a9c 100644 --- a/usecase/spawn_actor_processes.go +++ 
b/usecase/spawn_actor_processes.go @@ -163,15 +163,16 @@ func calculateTimeDistanceInSeconds(distanceUntilNextEpoch int64, blockDurationA return int64(math.Round(correctedTimeDistance)), nil } -func generateFairOffset(workerSubmissionWindow int64) int64 { +// Generates a random offset within the submission window +func generateFairOffset(submissionWindow int64) int64 { // Ensure the random number generator is seeded source := rand.NewSource(uint64(time.Now().UnixNano())) rng := rand.New(source) // Calculate the center of the window - center := workerSubmissionWindow / 2 + center := submissionWindow / 2 - // Generate a random number between -maxOffset and +maxOffset + // Generate a random number between start and window center offset := rng.Int63n(center + 1) return offset @@ -301,7 +302,7 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP } distanceUntilNextEpoch := epochEnd - currentBlockHeight - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds( + waitingTimeInSeconds, err := calculateTimeDistanceInSeconds( distanceUntilNextEpoch, suite.Node.Wallet.BlockDurationEstimated, suite.Node.Wallet.WindowCorrectionFactor, @@ -320,11 +321,12 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP Str("actorType", params.ActorType). Int64("currentBlockHeight", currentBlockHeight). Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). - Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). + Int64("waitingTimeInSeconds", waitingTimeInSeconds). Msg("Waiting until the submission window opens after sending") - suite.Wait(correctedTimeDistanceInSeconds) + suite.Wait(waitingTimeInSeconds) } else if currentBlockHeight > epochEnd { - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds( + // Inconsistent topic data, wait until the next epoch + waitingTimeInSeconds, err := calculateTimeDistanceInSeconds( epochLength, suite.Node.Wallet.BlockDurationEstimated, NEARNESS_CORRECTION_FACTOR, @@ -340,17 +342,16 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP log.Warn(). Uint64("topicId", uint64(params.Config.GetTopicId())). Str("actorType", params.ActorType). - Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). + Int64("waitingTimeInSeconds", waitingTimeInSeconds). Msg("Current block height is greater than next epoch length, inactive topic? Waiting seconds...") - suite.Wait(correctedTimeDistanceInSeconds) + suite.Wait(waitingTimeInSeconds) } else { distanceUntilNextEpoch := epochEnd - currentBlockHeight - if distanceUntilNextEpoch <= minBlocksToCheck { // Close distance, check more closely until the submission window opens offset := generateFairOffset(params.SubmissionWindowLength) closeBlockDistance := distanceUntilNextEpoch + offset - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds( + waitingTimeInSeconds, err := calculateTimeDistanceInSeconds( closeBlockDistance, suite.Node.Wallet.BlockDurationEstimated, NEARNESS_CORRECTION_FACTOR, @@ -371,12 +372,12 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP Int64("currentBlockHeight", currentBlockHeight). Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). Int64("closeBlockDistance", closeBlockDistance). - Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). + Int64("waitingTimeInSeconds", waitingTimeInSeconds). 
Msg("Close to the window, waiting until next submission window") - suite.Wait(correctedTimeDistanceInSeconds) + suite.Wait(waitingTimeInSeconds) } else { // Far distance, bigger waits until the submission window opens - correctedTimeDistanceInSeconds, err := calculateTimeDistanceInSeconds( + waitingTimeInSeconds, err := calculateTimeDistanceInSeconds( distanceUntilNextEpoch, suite.Node.Wallet.BlockDurationEstimated, suite.Node.Wallet.WindowCorrectionFactor, @@ -394,9 +395,9 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP Str("actorType", params.ActorType). Int64("currentBlockHeight", currentBlockHeight). Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). - Int64("correctedTimeDistanceInSeconds", correctedTimeDistanceInSeconds). + Int64("waitingTimeInSeconds", waitingTimeInSeconds). Msg("Waiting until the submission window opens - far distance") - suite.Wait(correctedTimeDistanceInSeconds) + suite.Wait(waitingTimeInSeconds) } } } From 479f9259097dd0a9d9e7bc44c1e844f1e08a6f6b Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Wed, 13 Nov 2024 13:47:10 +0100 Subject: [PATCH 14/24] Add README example with visualization --- README.md | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 3b24ae9..bf7e7ae 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ from the root directory. This will: 5. Run `docker compose up --build`. This will: - Run the both the offchain node and the source services, communicating through endpoints attached to the internal dns -Please note that the environment variable will be created as bumdle of your config.json and allora account secrets, please make sure to remove every sectrets before commiting to remote git repository +Please note that the environment variable will be created as bumdle of your config.json and allora account secrets, please make sure to remove every secrets before commiting to remote git repository ## How to run without docker @@ -69,6 +69,62 @@ Some metrics has been provided for in the node. You can access them with port `: > Please note that we will keep updating the list as more metrics are being added +## Cycle Example + +Visualization of timeline and submission windows for an actor, in this case, a worker. +Reputers have submission windows too, but they are fixed to be 1 full topic's epoch length. + +Example configuration: +* Topic epoch length is 100 blocks +* Topic submission window is 10 blocks +* Near zone is 2 submission windows (20 blocks) + +``` + Epoch N Epoch N+1 +|---------|----------------------------------|-----------------------------------|--------→ +Block: 1000 1100 1200 + ↑ ↑ ↑ + Epoch Start Epoch End Next Epoch End + (& Submission & Next Epoch Start + Window Start) (& Next Submission + Window Start) + +Detailed View of Zones (assuming epoch length = 100 blocks): + +Block 1000 Block 1010 Block 1080 Block 1100 + |-----------|--------------------------|--------------| + |← Far Zone →|← Near Zone →| + |← SW →| +(SW = Submission Window) + +Zone Breakdown (example numbers): +• Epoch Length: 100 blocks +• Submission Window: 100 blocks (coincides with epoch) +• Near Zone: Last 20 blocks (NUM_SUBMISSION_WINDOWS_FOR_SUBMISSION_NEARNESS * WorkerSubmissionWindow) + +------------------------------- +- Full cycle transition points + - Block 1000: Epoch N starts & Submission Window opens + - Block 1010: Submission Window closes. Waiting for next window, typically from far zone. 
+ - Block 1080: Enters Near Zone (more frequent checks) + - Block 1100: Epoch N ends & Epoch N+1 starts + +``` + +### Notes + +- Submissions + - Submissions are accepted within the submission window + - Submission window opens at epoch start +- Waiting Zone Behavior + - The behaviour of the node when waiting for the submission window depends on its nearness to the submission window to reduce likelihood of missing a window. + - Far Zone: Longer intervals between checks, optimized for efficiency + - This is controlled by `blockDurationEstimated` and `windowCorrectionFactor` + - Near Zone: More frequent checks with randomization for fair participation + - Submissions are separated - they must happen within the submission window + + + ## How to configure There are several ways to configure the node. In order of preference, you can do any of these: From 63fe60ae28daa6585f7d9981da86871bb885cb4d Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Wed, 13 Nov 2024 14:08:05 +0100 Subject: [PATCH 15/24] fix err checking prior to op --- usecase/build_commit_reputer_payload.go | 3 +-- usecase/build_commit_worker_payload.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/usecase/build_commit_reputer_payload.go b/usecase/build_commit_reputer_payload.go index d7df5c6..eef4b9a 100644 --- a/usecase/build_commit_reputer_payload.go +++ b/usecase/build_commit_reputer_payload.go @@ -210,11 +210,10 @@ func (suite *UseCaseSuite) SignReputerValueBundle(valueBundle *emissionstypes.Va return &emissionstypes.ReputerValueBundle{}, errorsmod.Wrapf(err, "error marshalling valueBundle") } sig, pk, err := suite.Node.Chain.Client.Context().Keyring.Sign(suite.Node.Chain.Account.Name, protoBytesIn, signing.SignMode_SIGN_MODE_DIRECT) - pkStr := hex.EncodeToString(pk.Bytes()) if err != nil { return &emissionstypes.ReputerValueBundle{}, errorsmod.Wrapf(err, "error signing valueBundle") } - + pkStr := hex.EncodeToString(pk.Bytes()) reputerValueBundle := &emissionstypes.ReputerValueBundle{ ValueBundle: valueBundle, Signature: sig, diff --git a/usecase/build_commit_worker_payload.go b/usecase/build_commit_worker_payload.go index 5eb0da5..64efb07 100644 --- a/usecase/build_commit_worker_payload.go +++ b/usecase/build_commit_worker_payload.go @@ -137,10 +137,10 @@ func (suite *UseCaseSuite) SignWorkerPayload(workerPayload *emissionstypes.Infer return &emissionstypes.WorkerDataBundle{}, errorsmod.Wrapf(err, "error marshalling workerPayload") } sig, pk, err := suite.Node.Chain.Client.Context().Keyring.Sign(suite.Node.Chain.Account.Name, protoBytesIn, signing.SignMode_SIGN_MODE_DIRECT) - pkStr := hex.EncodeToString(pk.Bytes()) if err != nil { return &emissionstypes.WorkerDataBundle{}, errorsmod.Wrapf(err, "error signing the InferenceForecastsBundle message") } + pkStr := hex.EncodeToString(pk.Bytes()) // Create workerDataBundle with signature workerDataBundle := &emissionstypes.WorkerDataBundle{ Worker: suite.Node.Wallet.Address, From c4e585c18f822df2f5e32809cad8f6b9a212ce41 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Wed, 13 Nov 2024 18:05:57 +0100 Subject: [PATCH 16/24] add trace to the loglevels stack --- logger.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/logger.go b/logger.go index 2532eaa..ee3cb75 100644 --- a/logger.go +++ b/logger.go @@ -30,6 +30,8 @@ func initLogger() { logLevel := strings.ToLower(os.Getenv("LOG_LEVEL")) switch logLevel { + case "trace": + zerolog.SetGlobalLevel(zerolog.TraceLevel) case "debug": zerolog.SetGlobalLevel(zerolog.DebugLevel) case "info": From 
946a452766d6956277610083885e15834b00b3c4 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Wed, 13 Nov 2024 18:10:35 +0100 Subject: [PATCH 17/24] Use retrials in all queries, fault-tol reg/stake process, minor fixes, logging --- lib/repo_query_balance.go | 21 ++++++++-- lib/repo_query_block.go | 31 +++++++-------- lib/repo_query_nonce.go | 36 +++++++++++++---- lib/repo_query_registration.go | 66 ++++++++++++++++++-------------- lib/repo_query_stake.go | 21 ++++++++-- lib/repo_query_topic.go | 19 +++++++-- lib/repo_query_utils.go | 4 +- lib/repo_tx_registration.go | 36 ++++++++++++++++- lib/repo_tx_utils.go | 10 ++--- usecase/spawn_actor_processes.go | 55 ++++++++------------------ 10 files changed, 189 insertions(+), 110 deletions(-) diff --git a/lib/repo_query_balance.go b/lib/repo_query_balance.go index eddd5e5..c660e78 100644 --- a/lib/repo_query_balance.go +++ b/lib/repo_query_balance.go @@ -2,19 +2,32 @@ package lib import ( "context" + "time" cosmossdk_io_math "cosmossdk.io/math" + "github.com/cosmos/cosmos-sdk/types/query" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" ) func (node *NodeConfig) GetBalance() (cosmossdk_io_math.Int, error) { ctx := context.Background() - resp, err := node.Chain.BankQueryClient.Balance(ctx, &banktypes.QueryBalanceRequest{ - Address: node.Chain.Address, - Denom: node.Chain.DefaultBondDenom, - }) + + resp, err := QueryDataWithRetry( + ctx, + node.Wallet.MaxRetries, + time.Duration(node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*banktypes.QueryBalanceResponse, error) { + return node.Chain.BankQueryClient.Balance(ctx, &banktypes.QueryBalanceRequest{ + Address: node.Chain.Address, + Denom: node.Chain.DefaultBondDenom, + }) + }, + query.PageRequest{}, + "get balance", + ) if err != nil { return cosmossdk_io_math.Int{}, err } + return resp.Balance.Amount, nil } diff --git a/lib/repo_query_block.go b/lib/repo_query_block.go index b20c856..5a2de01 100644 --- a/lib/repo_query_block.go +++ b/lib/repo_query_block.go @@ -2,30 +2,31 @@ package lib import ( "context" - "encoding/json" + "time" emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" - "github.com/rs/zerolog/log" + "github.com/cosmos/cosmos-sdk/types/query" ) func (node *NodeConfig) GetReputerValuesAtBlock(topicId emissionstypes.TopicId, nonce BlockHeight) (*emissionstypes.ValueBundle, error) { ctx := context.Background() - req := &emissionstypes.GetNetworkInferencesAtBlockRequest{ - TopicId: topicId, - BlockHeightLastInference: nonce, - } - reqJSON, err := json.Marshal(req) - if err != nil { - log.Error().Err(err).Msg("Error marshaling GetNetworkInferencesAtBlockRequest to print Msg as JSON") - } else { - log.Info().Str("req", string(reqJSON)).Msg("Getting GetNetworkInferencesAtBlockRequest from chain") - } - - res, err := node.Chain.EmissionsQueryClient.GetNetworkInferencesAtBlock(ctx, req) + resp, err := QueryDataWithRetry( + ctx, + node.Wallet.MaxRetries, + time.Duration(node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*emissionstypes.GetNetworkInferencesAtBlockResponse, error) { + return node.Chain.EmissionsQueryClient.GetNetworkInferencesAtBlock(ctx, &emissionstypes.GetNetworkInferencesAtBlockRequest{ + TopicId: topicId, + BlockHeightLastInference: nonce, + }) + }, + query.PageRequest{}, + "get reputer values at block", + ) if err != nil { return &emissionstypes.ValueBundle{}, err } - return res.NetworkInferences, nil + return resp.NetworkInferences, nil } diff --git a/lib/repo_query_nonce.go 
b/lib/repo_query_nonce.go index 46b0d03..0be011b 100644 --- a/lib/repo_query_nonce.go +++ b/lib/repo_query_nonce.go @@ -2,42 +2,62 @@ package lib import ( "context" + "time" emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" + "github.com/cosmos/cosmos-sdk/types/query" ) +// Gets the latest open worker nonce for a given topic, with retries func (node *NodeConfig) GetLatestOpenWorkerNonceByTopicId(topicId emissionstypes.TopicId) (*emissionstypes.Nonce, error) { ctx := context.Background() - res, err := node.Chain.EmissionsQueryClient.GetUnfulfilledWorkerNonces( + resp, err := QueryDataWithRetry( ctx, - &emissionstypes.GetUnfulfilledWorkerNoncesRequest{TopicId: topicId}, + node.Wallet.MaxRetries, + time.Duration(node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*emissionstypes.GetUnfulfilledWorkerNoncesResponse, error) { + return node.Chain.EmissionsQueryClient.GetUnfulfilledWorkerNonces(ctx, &emissionstypes.GetUnfulfilledWorkerNoncesRequest{ + TopicId: topicId, + }) + }, + query.PageRequest{}, + "get open worker nonce", ) if err != nil { return &emissionstypes.Nonce{}, err } - if len(res.Nonces.Nonces) == 0 { + if len(resp.Nonces.Nonces) == 0 { return &emissionstypes.Nonce{}, nil } // Per `AddWorkerNonce()` in `allora-chain/x/emissions/keeper.go`, the latest nonce is first - return res.Nonces.Nonces[0], nil + return resp.Nonces.Nonces[0], nil } +// Gets the oldest open reputer nonce for a given topic, with retries func (node *NodeConfig) GetOldestReputerNonceByTopicId(topicId emissionstypes.TopicId) (*emissionstypes.Nonce, error) { ctx := context.Background() - res, err := node.Chain.EmissionsQueryClient.GetUnfulfilledReputerNonces( + resp, err := QueryDataWithRetry( ctx, - &emissionstypes.GetUnfulfilledReputerNoncesRequest{TopicId: topicId}, + node.Wallet.MaxRetries, + time.Duration(node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*emissionstypes.GetUnfulfilledReputerNoncesResponse, error) { + return node.Chain.EmissionsQueryClient.GetUnfulfilledReputerNonces(ctx, &emissionstypes.GetUnfulfilledReputerNoncesRequest{ + TopicId: topicId, + }) + }, + query.PageRequest{}, + "get open reputer nonce", ) if err != nil { return &emissionstypes.Nonce{}, err } - if len(res.Nonces.Nonces) == 0 { + if len(resp.Nonces.Nonces) == 0 { return &emissionstypes.Nonce{}, nil } // Per `AddWorkerNonce()` in `allora-chain/x/emissions/keeper.go`, the oldest nonce is last - return res.Nonces.Nonces[len(res.Nonces.Nonces)-1].ReputerNonce, nil + return resp.Nonces.Nonces[len(resp.Nonces.Nonces)-1].ReputerNonce, nil } diff --git a/lib/repo_query_registration.go b/lib/repo_query_registration.go index fb5ed45..a9e2998 100644 --- a/lib/repo_query_registration.go +++ b/lib/repo_query_registration.go @@ -3,54 +3,64 @@ package lib import ( "context" "errors" + "time" emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" + "github.com/cosmos/cosmos-sdk/types/query" ) +// Checks if the worker is registered in a topic, with retries func (node *NodeConfig) IsWorkerRegistered(topicId uint64) (bool, error) { - ctx := context.Background() - - var ( - res *emissionstypes.IsWorkerRegisteredInTopicIdResponse - err error - ) - - if node.Worker != nil { - res, err = node.Chain.EmissionsQueryClient.IsWorkerRegisteredInTopicId(ctx, &emissionstypes.IsWorkerRegisteredInTopicIdRequest{ - TopicId: topicId, - Address: node.Wallet.Address, - }) - } else { + if node.Worker == nil { return false, errors.New("no worker to 
register") } + ctx := context.Background() + + resp, err := QueryDataWithRetry( + ctx, + node.Wallet.MaxRetries, + time.Duration(node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*emissionstypes.IsWorkerRegisteredInTopicIdResponse, error) { + return node.Chain.EmissionsQueryClient.IsWorkerRegisteredInTopicId(ctx, &emissionstypes.IsWorkerRegisteredInTopicIdRequest{ + TopicId: topicId, + Address: node.Wallet.Address, + }) + }, + query.PageRequest{}, + "is worker registered in topic", + ) if err != nil { return false, err } - return res.IsRegistered, nil + return resp.IsRegistered, nil } +// Checks if the reputer is registered in a topic, with retries func (node *NodeConfig) IsReputerRegistered(topicId uint64) (bool, error) { - ctx := context.Background() - - var ( - res *emissionstypes.IsReputerRegisteredInTopicIdResponse - err error - ) - - if node.Reputer != nil { - res, err = node.Chain.EmissionsQueryClient.IsReputerRegisteredInTopicId(ctx, &emissionstypes.IsReputerRegisteredInTopicIdRequest{ - TopicId: topicId, - Address: node.Wallet.Address, - }) - } else { + if node.Reputer == nil { return false, errors.New("no reputer to register") } + ctx := context.Background() + + resp, err := QueryDataWithRetry( + ctx, + node.Wallet.MaxRetries, + time.Duration(node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*emissionstypes.IsReputerRegisteredInTopicIdResponse, error) { + return node.Chain.EmissionsQueryClient.IsReputerRegisteredInTopicId(ctx, &emissionstypes.IsReputerRegisteredInTopicIdRequest{ + TopicId: topicId, + Address: node.Wallet.Address, + }) + }, + query.PageRequest{}, + "is reputer registered in topic", + ) if err != nil { return false, err } - return res.IsRegistered, nil + return resp.IsRegistered, nil } diff --git a/lib/repo_query_stake.go b/lib/repo_query_stake.go index 712253d..b6857cf 100644 --- a/lib/repo_query_stake.go +++ b/lib/repo_query_stake.go @@ -2,20 +2,33 @@ package lib import ( "context" + "time" cosmossdk_io_math "cosmossdk.io/math" emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" + "github.com/cosmos/cosmos-sdk/types/query" ) +// Gets the stake from a reputer in a given topic, with retries func (node *NodeConfig) GetReputerStakeInTopic( topicId emissionstypes.TopicId, reputer Address, ) (cosmossdk_io_math.Int, error) { ctx := context.Background() - resp, err := node.Chain.EmissionsQueryClient.GetStakeFromReputerInTopicInSelf(ctx, &emissionstypes.GetStakeFromReputerInTopicInSelfRequest{ - ReputerAddress: reputer, - TopicId: topicId, - }) + + resp, err := QueryDataWithRetry( + ctx, + node.Wallet.MaxRetries, + time.Duration(node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*emissionstypes.GetStakeFromReputerInTopicInSelfResponse, error) { + return node.Chain.EmissionsQueryClient.GetStakeFromReputerInTopicInSelf(ctx, &emissionstypes.GetStakeFromReputerInTopicInSelfRequest{ + ReputerAddress: reputer, + TopicId: topicId, + }) + }, + query.PageRequest{}, + "get reputer stake in topic", + ) if err != nil { return cosmossdk_io_math.Int{}, err } diff --git a/lib/repo_query_topic.go b/lib/repo_query_topic.go index a7dbff2..434a6a1 100644 --- a/lib/repo_query_topic.go +++ b/lib/repo_query_topic.go @@ -3,23 +3,34 @@ package lib import ( "context" "errors" + "time" emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" + "github.com/cosmos/cosmos-sdk/types/query" ) +// Gets topic info for a given topic ID, with retries 
func (node *NodeConfig) GetTopicInfo(topicId emissionstypes.TopicId) (*emissionstypes.Topic, error) { ctx := context.Background() - res, err := node.Chain.EmissionsQueryClient.GetTopic( + resp, err := QueryDataWithRetry( ctx, - &emissionstypes.GetTopicRequest{TopicId: topicId}, + node.Wallet.MaxRetries, + time.Duration(node.Wallet.RetryDelay)*time.Second, + func(ctx context.Context, req query.PageRequest) (*emissionstypes.GetTopicResponse, error) { + return node.Chain.EmissionsQueryClient.GetTopic(ctx, &emissionstypes.GetTopicRequest{ + TopicId: topicId, + }) + }, + query.PageRequest{}, + "get topic info", ) if err != nil { return nil, err } - if res.Topic == nil { + if resp.Topic == nil { return nil, errors.New("Topic not found") } - return res.Topic, nil + return resp.Topic, nil } diff --git a/lib/repo_query_utils.go b/lib/repo_query_utils.go index 9562646..4527c14 100644 --- a/lib/repo_query_utils.go +++ b/lib/repo_query_utils.go @@ -17,18 +17,20 @@ func QueryDataWithRetry[T any]( delay time.Duration, queryFunc func(context.Context, query.PageRequest) (T, error), req query.PageRequest, + infoMsg string, ) (T, error) { var result T var err error for retryCount := int64(0); retryCount <= maxRetries; retryCount++ { + log.Trace().Msgf("QueryDataWithRetry iteration started (%d/%d): %s", retryCount, maxRetries, infoMsg) result, err = queryFunc(ctx, req) if err == nil { return result, nil } // Log the error for each retry. - log.Error().Err(err).Msgf("Query failed, retrying... (Retry %d/%d)", retryCount, maxRetries) + log.Error().Err(err).Msgf("Query failed, retrying... (Retry %d/%d): %s", retryCount, maxRetries, infoMsg) // Wait for the uniform delay before retrying time.Sleep(delay) diff --git a/lib/repo_tx_registration.go b/lib/repo_tx_registration.go index ef7daa4..9941995 100644 --- a/lib/repo_tx_registration.go +++ b/lib/repo_tx_registration.go @@ -17,10 +17,13 @@ func (node *NodeConfig) RegisterWorkerIdempotently(config WorkerConfig) bool { isRegistered, err := node.IsWorkerRegistered(config.TopicId) if err != nil { log.Error().Err(err).Msg("Could not check if the node is already registered for topic as worker, skipping") + return false } if isRegistered { log.Info().Uint64("topicId", config.TopicId).Msg("Worker node already registered for topic") return true + } else { + log.Info().Uint64("topicId", config.TopicId).Msg("Worker node not yet registered for topic. 
Attempting registration...") } moduleParams, err := node.Chain.EmissionsQueryClient.GetParams(ctx, &emissionstypes.GetParamsRequest{}) @@ -55,7 +58,13 @@ func (node *NodeConfig) RegisterWorkerIdempotently(config WorkerConfig) bool { return false } - return true + isRegistered, err = node.IsWorkerRegistered(config.TopicId) + if err != nil { + log.Error().Err(err).Msg("Could not check if the node is already registered for topic as worker, skipping") + return false + } + + return isRegistered } // True if the actor is ultimately, definitively registered for the specified topic with at least config.MinStake placed on topic, else False @@ -67,6 +76,7 @@ func (node *NodeConfig) RegisterAndStakeReputerIdempotently(config ReputerConfig isRegistered, err := node.IsReputerRegistered(config.TopicId) if err != nil { log.Error().Err(err).Msg("Could not check if the node is already registered for topic as reputer, skipping") + return false } if isRegistered { @@ -105,7 +115,15 @@ func (node *NodeConfig) RegisterAndStakeReputerIdempotently(config ReputerConfig return false } - log.Info().Uint64("topicId", config.TopicId).Msg("Reputer node registered") + isRegistered, err = node.IsReputerRegistered(config.TopicId) + if err != nil { + log.Error().Err(err).Msg("Could not check if the node is already registered for topic as reputer, skipping") + return false + } + if !isRegistered { + log.Error().Uint64("topicId", config.TopicId).Msg("Reputer node not registered after all retries") + return false + } } stake, err := node.GetReputerStakeInTopic(config.TopicId, node.Chain.Address) @@ -113,10 +131,13 @@ func (node *NodeConfig) RegisterAndStakeReputerIdempotently(config ReputerConfig log.Error().Err(err).Msg("Could not check if the reputer node has enough balance to stake, skipping") return false } + minStake := cosmossdk_io_math.NewInt(config.MinStake) if minStake.LTE(stake) { log.Info().Msg("Reputer stake above minimum requested stake, skipping adding stake.") return true + } else { + log.Info().Interface("stake", stake).Interface("minStake", minStake).Interface("stakeToAdd", minStake.Sub(stake)).Msg("Reputer stake below minimum requested stake, adding stake.") } msgAddStake := &emissionstypes.AddStakeRequest{ @@ -133,5 +154,16 @@ func (node *NodeConfig) RegisterAndStakeReputerIdempotently(config ReputerConfig log.Error().Err(err).Uint64("topic", config.TopicId).Str("txHash", txHash).Msg("Could not stake the reputer node with the Allora blockchain in specified topic") return false } + + stake, err = node.GetReputerStakeInTopic(config.TopicId, node.Chain.Address) + if err != nil { + log.Error().Err(err).Msg("Could not check if the reputer node has enough balance to stake, skipping") + return false + } + if stake.LT(minStake) { + log.Error().Interface("stake", stake).Interface("minStake", minStake).Msg("Reputer stake below minimum requested stake, skipping.") + return false + } + return true } diff --git a/lib/repo_tx_utils.go b/lib/repo_tx_utils.go index 9402518..001ac23 100644 --- a/lib/repo_tx_utils.go +++ b/lib/repo_tx_utils.go @@ -68,7 +68,6 @@ func processError(err error, infoMsg string, retryCount int64, node *NodeConfig) time.Sleep(time.Duration(node.Wallet.AccountSequenceRetryDelay) * time.Second) return ERROR_PROCESSING_CONTINUE, nil case int(sdkerrors.ErrInsufficientFee.ABCICode()): - log.Warn().Str("msg", infoMsg).Msg("Insufficient fee") return ERROR_PROCESSING_FEES, nil case int(sdkerrors.ErrTxTooLarge.ABCICode()): return ERROR_PROCESSING_ERROR, errorsmod.Wrapf(err, "tx too large") @@ -81,7 +80,7 @@ 
func processError(err error, infoMsg string, retryCount int64, node *NodeConfig) } } } else { - log.Error().Str("msg", infoMsg).Msg("Unmatched error format, cannot classify as ABCI error") + log.Warn().Str("msg", infoMsg).Msg("Unmatched error format, cannot classify as ABCI error") } } @@ -120,8 +119,8 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, log.Debug().Msgf("SendDataWithRetry iteration started (%d/%d)", retryCount, node.Wallet.MaxRetries) // Create tx without fees txOptions := cosmosclient.TxOptions{} - if globalExpectedSeqNum > 0 { - log.Info(). + if globalExpectedSeqNum > 0 && node.Chain.Client.TxFactory.Sequence() != globalExpectedSeqNum { + log.Debug(). Uint64("expected", globalExpectedSeqNum). Uint64("current", node.Chain.Client.TxFactory.Sequence()). Msg("Resetting sequence to expected from previous sequence errors") @@ -198,6 +197,7 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, } } + log.Trace().Msg("Creation of tx successful, broadcasting tx") // Broadcast tx txResponse, err := txService.Broadcast(ctx) if err == nil { @@ -222,7 +222,7 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, continue case ERROR_PROCESSING_FEES: // Error has not been handled, just mark as recalculate fees on this iteration - log.Debug().Msg("Marking fee recalculation on tx broadcasting") + log.Info().Msg("Insufficient fees, marking fee recalculation on tx broadcasting for retrial") recalculateFees += 1 continue default: diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index 0c69a9c..3d55198 100644 --- a/usecase/spawn_actor_processes.go +++ b/usecase/spawn_actor_processes.go @@ -10,7 +10,6 @@ import ( errorsmod "cosmossdk.io/errors" emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" - "github.com/cosmos/cosmos-sdk/types/query" "github.com/rs/zerolog/log" "golang.org/x/exp/rand" ) @@ -83,15 +82,7 @@ func (suite *UseCaseSuite) Spawn() { // Attempts to build and commit a worker payload for a given nonce func (suite *UseCaseSuite) processWorkerPayload(worker lib.WorkerConfig, latestNonceHeightActedUpon int64) (int64, error) { - latestOpenWorkerNonce, err := lib.QueryDataWithRetry( - context.Background(), - suite.Node.Wallet.MaxRetries, - time.Duration(suite.Node.Wallet.RetryDelay)*time.Second, - func(ctx context.Context, req query.PageRequest) (*emissionstypes.Nonce, error) { - return suite.Node.GetLatestOpenWorkerNonceByTopicId(worker.TopicId) - }, - query.PageRequest{}, // Empty page request as GetLatestOpenWorkerNonceByTopicId doesn't use pagination - ) + latestOpenWorkerNonce, err := suite.Node.GetLatestOpenWorkerNonceByTopicId(worker.TopicId) if err != nil { log.Warn().Err(err).Uint64("topicId", worker.TopicId).Msg("Error getting latest open worker nonce on topic - node availability issue?") @@ -119,15 +110,7 @@ func (suite *UseCaseSuite) processWorkerPayload(worker lib.WorkerConfig, latestN } func (suite *UseCaseSuite) processReputerPayload(reputer lib.ReputerConfig, latestNonceHeightActedUpon int64) (int64, error) { - nonce, err := lib.QueryDataWithRetry( - context.Background(), - suite.Node.Wallet.MaxRetries, - time.Duration(suite.Node.Wallet.RetryDelay)*time.Second, - func(ctx context.Context, req query.PageRequest) (*emissionstypes.Nonce, error) { - return suite.Node.GetOldestReputerNonceByTopicId(reputer.TopicId) - }, - query.PageRequest{}, // Empty page request as GetOldestReputerNonceByTopicId doesn't use pagination - ) + nonce, 
err := suite.Node.GetOldestReputerNonceByTopicId(reputer.TopicId) if err != nil { log.Warn().Err(err).Uint64("topicId", reputer.TopicId).Msg("Error getting latest open reputer nonce on topic - node availability issue?") @@ -163,7 +146,7 @@ func calculateTimeDistanceInSeconds(distanceUntilNextEpoch int64, blockDurationA return int64(math.Round(correctedTimeDistance)), nil } -// Generates a random offset within the submission window +// Generates a conservative random offset within the submission window func generateFairOffset(submissionWindow int64) int64 { // Ensure the random number generator is seeded source := rand.NewSource(uint64(time.Now().UnixNano())) @@ -184,13 +167,13 @@ func (suite *UseCaseSuite) runWorkerProcess(worker lib.WorkerConfig) { // Handle registration registered := suite.Node.RegisterWorkerIdempotently(worker) if !registered { - log.Error().Uint64("topicId", worker.TopicId).Msg("Failed to register worker for topic") + log.Fatal().Uint64("topicId", worker.TopicId).Msg("Failed to register worker for topic, exiting") return } log.Debug().Uint64("topicId", worker.TopicId).Msg("Worker registered") // Using the helper function - topicInfo, err := queryTopicInfo(suite, worker, "worker") + topicInfo, err := queryTopicInfo(suite, worker, "worker", "topic info: worker") if err != nil { log.Error().Err(err).Uint64("topicId", worker.TopicId).Msg("Failed to get topic info for worker") return @@ -214,13 +197,13 @@ func (suite *UseCaseSuite) runReputerProcess(reputer lib.ReputerConfig) { // Handle registration and staking registeredAndStaked := suite.Node.RegisterAndStakeReputerIdempotently(reputer) if !registeredAndStaked { - log.Error().Uint64("topicId", reputer.TopicId).Msg("Failed to register or sufficiently stake reputer for topic") + log.Fatal().Uint64("topicId", reputer.TopicId).Msg("Failed to register or sufficiently stake reputer for topic") return } log.Debug().Uint64("topicId", reputer.TopicId).Msg("Reputer registered and staked") // Using the helper function - topicInfo, err := queryTopicInfo(suite, reputer, "reputer") + topicInfo, err := queryTopicInfo(suite, reputer, "reputer", "topic info: reputer") if err != nil { log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Failed to get topic info for reputer") return @@ -245,7 +228,7 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP Str("actorType", params.ActorType). Msg("Running actor process for topic") - topicInfo, err := queryTopicInfo(suite, params.Config, params.ActorType) + topicInfo, err := queryTopicInfo(suite, params.Config, params.ActorType, "topic info: actor process") if err != nil { log.Error(). Err(err). @@ -261,7 +244,7 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP var currentBlockHeight int64 for { - log.Debug().Msg("Start iteration, querying latest block") + log.Trace().Msg("Start iteration, querying latest block") // Query the latest block status, err := suite.Node.Chain.Client.Status(context.Background()) if err != nil { @@ -271,7 +254,7 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP } currentBlockHeight = status.SyncInfo.LatestBlockHeight - topicInfo, err := queryTopicInfo(suite, params.Config, params.ActorType) + topicInfo, err := queryTopicInfo(suite, params.Config, params.ActorType, "topic info: actor process") if err != nil { log.Error(). Err(err). @@ -316,7 +299,7 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP return } - log.Debug(). 
+ log.Info(). Uint64("topicId", uint64(params.Config.GetTopicId())). Str("actorType", params.ActorType). Int64("currentBlockHeight", currentBlockHeight). @@ -364,7 +347,7 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP Msg("Error calculating close distance to epochLength") return } - log.Debug(). + log.Info(). Uint64("topicId", uint64(params.Config.GetTopicId())). Str("actorType", params.ActorType). Int64("SubmissionWindowLength", params.SubmissionWindowLength). @@ -390,7 +373,7 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP Msg("Error calculating far distance to epochLength") return } - log.Debug(). + log.Info(). Uint64("topicId", uint64(params.Config.GetTopicId())). Str("actorType", params.ActorType). Int64("currentBlockHeight", currentBlockHeight). @@ -404,20 +387,14 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP } // Queries the topic info for a given actor type and wallet params from suite +// Wrapper over NodeConfig.GetTopicInfo() with generic config type func queryTopicInfo[T lib.TopicActor]( suite *UseCaseSuite, config T, actorType string, + infoMsg string, ) (*emissionstypes.Topic, error) { - topicInfo, err := lib.QueryDataWithRetry( - context.Background(), - suite.Node.Wallet.MaxRetries, - time.Duration(suite.Node.Wallet.RetryDelay)*time.Second, - func(ctx context.Context, req query.PageRequest) (*emissionstypes.Topic, error) { - return suite.Node.GetTopicInfo(config.GetTopicId()) - }, - query.PageRequest{}, - ) + topicInfo, err := suite.Node.GetTopicInfo(config.GetTopicId()) if err != nil { return nil, errorsmod.Wrapf(err, "failed to get topic info") } From 1de9f35b648dd11471214bba241cd19096db0798 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Wed, 13 Nov 2024 18:20:21 +0100 Subject: [PATCH 18/24] Changelog update --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 15f0297..8969521 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,7 +44,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## v0.6.0 ### Added -* [#66](https://github.com/allora-network/allora-offchain-node/pull/66) Smart worker detection of submission windows + persistent error management +* [#66](https://github.com/allora-network/allora-offchain-node/pull/66) Smart worker detection of submission windows + persistent error management + query retrials + reg/stake robustness + improved logging ### Removed From 02c447b51b4ae85e5599a8179969f44c4decd446 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Wed, 13 Nov 2024 20:37:42 +0100 Subject: [PATCH 19/24] Set TimeoutHeight on worker/reputer tx --- lib/repo_tx_registration.go | 6 ++--- lib/repo_tx_utils.go | 31 ++++++++++++++++++++----- usecase/build_commit_reputer_payload.go | 4 ++-- usecase/build_commit_worker_payload.go | 4 ++-- usecase/spawn_actor_processes.go | 13 ++++++----- 5 files changed, 39 insertions(+), 19 deletions(-) diff --git a/lib/repo_tx_registration.go b/lib/repo_tx_registration.go index 9941995..c64af81 100644 --- a/lib/repo_tx_registration.go +++ b/lib/repo_tx_registration.go @@ -48,7 +48,7 @@ func (node *NodeConfig) RegisterWorkerIdempotently(config WorkerConfig) bool { Owner: node.Chain.Address, IsReputer: false, } - res, err := node.SendDataWithRetry(ctx, msg, "Register worker node") + res, err := node.SendDataWithRetry(ctx, msg, "Register worker node", 0) if err != nil { txHash := "" if res != nil { @@ -105,7 +105,7 @@ func (node 
*NodeConfig) RegisterAndStakeReputerIdempotently(config ReputerConfig Owner: node.Chain.Address, IsReputer: true, } - res, err := node.SendDataWithRetry(ctx, msgRegister, "Register reputer node") + res, err := node.SendDataWithRetry(ctx, msgRegister, "Register reputer node", 0) if err != nil { txHash := "" if res != nil { @@ -145,7 +145,7 @@ func (node *NodeConfig) RegisterAndStakeReputerIdempotently(config ReputerConfig Amount: minStake.Sub(stake), TopicId: config.TopicId, } - res, err := node.SendDataWithRetry(ctx, msgAddStake, "Add reputer stake") + res, err := node.SendDataWithRetry(ctx, msgAddStake, "Add reputer stake", 0) if err != nil { txHash := "" if res != nil { diff --git a/lib/repo_tx_utils.go b/lib/repo_tx_utils.go index 001ac23..8742cb6 100644 --- a/lib/repo_tx_utils.go +++ b/lib/repo_tx_utils.go @@ -22,13 +22,21 @@ const ERROR_MESSAGE_DATA_ALREADY_SUBMITTED = "already submitted" const ERROR_MESSAGE_CANNOT_UPDATE_EMA = "cannot update EMA" const ERROR_MESSAGE_WAITING_FOR_NEXT_BLOCK = "waiting for next block" // This means tx is accepted in mempool but not yet included in a block const ERROR_MESSAGE_ACCOUNT_SEQUENCE_MISMATCH = "account sequence mismatch" +const ERROR_MESSAGE_TIMEOUT_HEIGHT = "timeout height" const ERROR_MESSAGE_ABCI_ERROR_CODE_MARKER = "error code:" const EXCESS_CORRECTION_IN_GAS = 20000 +// Error processing types +// - "continue", nil: tx was not successful, but special error type. Handled, ready for retry +// - "ok", nil: tx was successful, error handled and not re-raised +// - "error", error: tx failed, with regular error type +// - "fees": tx failed, because of insufficient fees +// - "failure": tx failed, and should not be retried anymore const ERROR_PROCESSING_CONTINUE = "continue" const ERROR_PROCESSING_OK = "ok" const ERROR_PROCESSING_FEES = "fees" const ERROR_PROCESSING_ERROR = "error" +const ERROR_PROCESSING_FAILURE = "failure" // calculateExponentialBackoffDelay returns a duration based on retry count and base delay func calculateExponentialBackoffDelay(baseDelay int64, retryCount int64) time.Duration { @@ -36,10 +44,6 @@ func calculateExponentialBackoffDelay(baseDelay int64, retryCount int64) time.Du } // processError handles the error messages. -// Returns: -// - "continue", nil: tx was not successful, but special error type. 
Handled, ready for retry -// - "ok", nil: tx was successful -// - "error", error: tx failed, with regular error type func processError(err error, infoMsg string, retryCount int64, node *NodeConfig) (string, error) { if strings.Contains(err.Error(), ERROR_MESSAGE_ABCI_ERROR_CODE_MARKER) { re := regexp.MustCompile(`error code: '(\d+)'`) @@ -75,6 +79,8 @@ func processError(err error, infoMsg string, retryCount int64, node *NodeConfig) return ERROR_PROCESSING_ERROR, errorsmod.Wrapf(err, "tx already in mempool cache") case int(sdkerrors.ErrInvalidChainID.ABCICode()): return ERROR_PROCESSING_ERROR, errorsmod.Wrapf(err, "invalid chain-id") + case int(sdkerrors.ErrTxTimeoutHeight.ABCICode()): + return ERROR_PROCESSING_FAILURE, errorsmod.Wrapf(err, "tx timeout height") default: log.Info().Int("errorCode", errorCode).Str("msg", infoMsg).Msg("ABCI error, but not special case - regular retry") } @@ -94,11 +100,14 @@ func processError(err error, infoMsg string, retryCount int64, node *NodeConfig) time.Sleep(time.Duration(node.Wallet.AccountSequenceRetryDelay) * time.Second) return ERROR_PROCESSING_CONTINUE, nil } else if strings.Contains(err.Error(), ERROR_MESSAGE_WAITING_FOR_NEXT_BLOCK) { - log.Warn().Str("msg", infoMsg).Msg("Tx accepted in mempool, it will be included in the following block(s) - not retrying") + log.Warn().Err(err).Str("msg", infoMsg).Msg("Tx accepted in mempool, it will be included in the following block(s) - not retrying") return ERROR_PROCESSING_OK, nil } else if strings.Contains(err.Error(), ERROR_MESSAGE_DATA_ALREADY_SUBMITTED) || strings.Contains(err.Error(), ERROR_MESSAGE_CANNOT_UPDATE_EMA) { log.Warn().Err(err).Str("msg", infoMsg).Msg("Already submitted data for this epoch.") return ERROR_PROCESSING_OK, nil + } else if strings.Contains(err.Error(), ERROR_MESSAGE_TIMEOUT_HEIGHT) { + log.Warn().Err(err).Str("msg", infoMsg).Msg("Tx failed because of timeout height") + return ERROR_PROCESSING_FAILURE, err } return ERROR_PROCESSING_ERROR, errorsmod.Wrapf(err, "failed to process error") @@ -106,7 +115,7 @@ func processError(err error, infoMsg string, retryCount int64, node *NodeConfig) // SendDataWithRetry attempts to send data, handling retries, with fee awareness. // Custom handling for different errors. 
-func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, infoMsg string) (*cosmosclient.Response, error) { +func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, infoMsg string, timeoutHeight uint64) (*cosmosclient.Response, error) { var txResp *cosmosclient.Response // Excess fees correction factor translated to fees using configured gas prices excessFactorFees := float64(EXCESS_CORRECTION_IN_GAS) * node.Wallet.GasPrices @@ -115,6 +124,12 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, // Use to keep track of expected sequence number between errors globalExpectedSeqNum := uint64(0) + // Create tx options with timeout height if specified + if timeoutHeight > 0 { + log.Debug().Uint64("timeoutHeight", timeoutHeight).Msg("Setting timeout height for tx") + node.Chain.Client.TxFactory = node.Chain.Client.TxFactory.WithTimeoutHeight(timeoutHeight) + } + for retryCount := int64(0); retryCount <= node.Wallet.MaxRetries; retryCount++ { log.Debug().Msgf("SendDataWithRetry iteration started (%d/%d)", retryCount, node.Wallet.MaxRetries) // Create tx without fees @@ -168,6 +183,8 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, // Error has not been handled, just mark as recalculate fees on this iteration log.Debug().Msg("Marking fee recalculation on tx creation") recalculateFees += 1 + case ERROR_PROCESSING_FAILURE: + return nil, errorsmod.Wrapf(err, "tx failed and not retried") default: return nil, errorsmod.Wrapf(err, "failed to process error") } @@ -225,6 +242,8 @@ func (node *NodeConfig) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, log.Info().Msg("Insufficient fees, marking fee recalculation on tx broadcasting for retrial") recalculateFees += 1 continue + case ERROR_PROCESSING_FAILURE: + return nil, errorsmod.Wrapf(err, "tx failed and not retried") default: return nil, errorsmod.Wrapf(err, "failed to process error") } diff --git a/usecase/build_commit_reputer_payload.go b/usecase/build_commit_reputer_payload.go index eef4b9a..19108a1 100644 --- a/usecase/build_commit_reputer_payload.go +++ b/usecase/build_commit_reputer_payload.go @@ -18,7 +18,7 @@ import ( // Get the reputer's values at the block from the chain // Compute loss bundle with the reputer provided Loss function and ground truth // sign and commit to chain -func (suite *UseCaseSuite) BuildCommitReputerPayload(reputer lib.ReputerConfig, nonce lib.BlockHeight) error { +func (suite *UseCaseSuite) BuildCommitReputerPayload(reputer lib.ReputerConfig, nonce lib.BlockHeight, timeoutHeight uint64) error { ctx := context.Background() valueBundle, err := suite.Node.GetReputerValuesAtBlock(reputer.TopicId, nonce) @@ -62,7 +62,7 @@ func (suite *UseCaseSuite) BuildCommitReputerPayload(reputer lib.ReputerConfig, log.Debug().Uint64("topicId", reputer.TopicId).Msgf("Sending InsertReputerPayload to chain %s", string(reqJSON)) } if suite.Node.Wallet.SubmitTx { - _, err = suite.Node.SendDataWithRetry(ctx, req, "Send Reputer Data to chain") + _, err = suite.Node.SendDataWithRetry(ctx, req, "Send Reputer Data to chain", timeoutHeight) if err != nil { return errorsmod.Wrapf(err, "error sending Reputer Data to chain, topic: %d, blockHeight: %d", reputer.TopicId, nonce) } diff --git a/usecase/build_commit_worker_payload.go b/usecase/build_commit_worker_payload.go index 64efb07..dc95510 100644 --- a/usecase/build_commit_worker_payload.go +++ b/usecase/build_commit_worker_payload.go @@ -15,7 +15,7 @@ import ( 
"github.com/cosmos/cosmos-sdk/types/tx/signing" ) -func (suite *UseCaseSuite) BuildCommitWorkerPayload(worker lib.WorkerConfig, nonce *emissionstypes.Nonce) error { +func (suite *UseCaseSuite) BuildCommitWorkerPayload(worker lib.WorkerConfig, nonce *emissionstypes.Nonce, timeoutHeight uint64) error { ctx := context.Background() if worker.InferenceEntrypoint == nil && worker.ForecastEntrypoint == nil { @@ -74,7 +74,7 @@ func (suite *UseCaseSuite) BuildCommitWorkerPayload(worker lib.WorkerConfig, non } if suite.Node.Wallet.SubmitTx { - _, err = suite.Node.SendDataWithRetry(ctx, req, "Send Worker Data to chain") + _, err = suite.Node.SendDataWithRetry(ctx, req, "Send Worker Data to chain", timeoutHeight) if err != nil { return errorsmod.Wrapf(err, "Error sending Worker Data to chain, topicId: %d, blockHeight: %d", worker.TopicId, nonce.BlockHeight) } diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index 3d55198..3aa5465 100644 --- a/usecase/spawn_actor_processes.go +++ b/usecase/spawn_actor_processes.go @@ -30,7 +30,7 @@ type ActorProcessParams[T lib.TopicActor] struct { // Configuration for the actor (Worker or Reputer) Config T // Function to process payloads (processWorkerPayload or processReputerPayload) - ProcessPayload func(T, int64) (int64, error) + ProcessPayload func(T, int64, uint64) (int64, error) // Function to get nonces (GetLatestOpenWorkerNonceByTopicId or GetOldestReputerNonceByTopicId) GetNonce func(emissionstypes.TopicId) (*emissionstypes.Nonce, error) // Window length used to determine when we're near submission time @@ -81,7 +81,7 @@ func (suite *UseCaseSuite) Spawn() { } // Attempts to build and commit a worker payload for a given nonce -func (suite *UseCaseSuite) processWorkerPayload(worker lib.WorkerConfig, latestNonceHeightActedUpon int64) (int64, error) { +func (suite *UseCaseSuite) processWorkerPayload(worker lib.WorkerConfig, latestNonceHeightActedUpon int64, timeoutHeight uint64) (int64, error) { latestOpenWorkerNonce, err := suite.Node.GetLatestOpenWorkerNonceByTopicId(worker.TopicId) if err != nil { @@ -93,7 +93,7 @@ func (suite *UseCaseSuite) processWorkerPayload(worker lib.WorkerConfig, latestN log.Debug().Uint64("topicId", worker.TopicId).Int64("BlockHeight", latestOpenWorkerNonce.BlockHeight). Msg("Building and committing worker payload for topic") - err := suite.BuildCommitWorkerPayload(worker, latestOpenWorkerNonce) + err := suite.BuildCommitWorkerPayload(worker, latestOpenWorkerNonce, timeoutHeight) if err != nil { return latestNonceHeightActedUpon, errorsmod.Wrapf(err, "error building and committing worker payload for topic") } @@ -109,7 +109,7 @@ func (suite *UseCaseSuite) processWorkerPayload(worker lib.WorkerConfig, latestN } } -func (suite *UseCaseSuite) processReputerPayload(reputer lib.ReputerConfig, latestNonceHeightActedUpon int64) (int64, error) { +func (suite *UseCaseSuite) processReputerPayload(reputer lib.ReputerConfig, latestNonceHeightActedUpon int64, timeoutHeight uint64) (int64, error) { nonce, err := suite.Node.GetOldestReputerNonceByTopicId(reputer.TopicId) if err != nil { @@ -121,7 +121,7 @@ func (suite *UseCaseSuite) processReputerPayload(reputer lib.ReputerConfig, late log.Debug().Uint64("topicId", reputer.TopicId).Int64("BlockHeight", nonce.BlockHeight). 
Msg("Building and committing reputer payload for topic") - err := suite.BuildCommitReputerPayload(reputer, nonce.BlockHeight) + err := suite.BuildCommitReputerPayload(reputer, nonce.BlockHeight, timeoutHeight) if err != nil { return latestNonceHeightActedUpon, errorsmod.Wrapf(err, "error building and committing reputer payload for topic") } @@ -271,11 +271,12 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP epochLastEnded := topicInfo.EpochLastEnded epochEnd := epochLastEnded + epochLength + timeoutHeight := epochLastEnded + params.SubmissionWindowLength // Check if block is within the submission window if currentBlockHeight-epochLastEnded <= params.SubmissionWindowLength { // Within the submission window, attempt to process payload - latestNonceHeightSentTxFor, err = params.ProcessPayload(params.Config, latestNonceHeightSentTxFor) + latestNonceHeightSentTxFor, err = params.ProcessPayload(params.Config, latestNonceHeightSentTxFor, uint64(timeoutHeight)) if err != nil { log.Error(). Err(err). From 2fb8e92baaef70555fd01c695fe8c998fed30184 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Wed, 13 Nov 2024 20:41:42 +0100 Subject: [PATCH 20/24] Set TimeoutHeight on txs --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8969521..37b8403 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added * [#66](https://github.com/allora-network/allora-offchain-node/pull/66) Smart worker detection of submission windows + persistent error management + query retrials + reg/stake robustness + improved logging +* [#81](https://github.com/allora-network/allora-offchain-node/pull/81) Timeout height handling on tx submission ### Removed From 465412cacee41c80b80283562944c32763a326b8 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Wed, 13 Nov 2024 23:25:13 +0100 Subject: [PATCH 21/24] add sleep after tx before checking to save one tx --- lib/repo_tx_registration.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lib/repo_tx_registration.go b/lib/repo_tx_registration.go index c64af81..b46ad47 100644 --- a/lib/repo_tx_registration.go +++ b/lib/repo_tx_registration.go @@ -2,6 +2,7 @@ package lib import ( "context" + "time" "github.com/rs/zerolog/log" @@ -58,6 +59,9 @@ func (node *NodeConfig) RegisterWorkerIdempotently(config WorkerConfig) bool { return false } + // Give time for the tx to be included in a block + log.Debug().Int64("delay", node.Wallet.RetryDelay).Msg("Waiting to check registration status to be included in a block...") + time.Sleep(time.Duration(node.Wallet.RetryDelay) * time.Second) isRegistered, err = node.IsWorkerRegistered(config.TopicId) if err != nil { log.Error().Err(err).Msg("Could not check if the node is already registered for topic as worker, skipping") @@ -115,6 +119,9 @@ func (node *NodeConfig) RegisterAndStakeReputerIdempotently(config ReputerConfig return false } + // Give time for the tx to be included in a block + log.Debug().Int64("delay", node.Wallet.RetryDelay).Msg("Waiting to check registration status to be included in a block...") + time.Sleep(time.Duration(node.Wallet.RetryDelay) * time.Second) isRegistered, err = node.IsReputerRegistered(config.TopicId) if err != nil { log.Error().Err(err).Msg("Could not check if the node is already registered for topic as reputer, skipping") @@ -155,6 +162,9 @@ func (node *NodeConfig) RegisterAndStakeReputerIdempotently(config ReputerConfig return false 
} + // Give time for the tx to be included in a block + log.Debug().Int64("delay", node.Wallet.RetryDelay).Msg("Waiting to check stake status to be included in a block...") + time.Sleep(time.Duration(node.Wallet.RetryDelay) * time.Second) stake, err = node.GetReputerStakeInTopic(config.TopicId, node.Chain.Address) if err != nil { log.Error().Err(err).Msg("Could not check if the reputer node has enough balance to stake, skipping") From c64b312eeceec124c9cf981fb6e54577599c7208 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Wed, 13 Nov 2024 23:36:05 +0100 Subject: [PATCH 22/24] Added comments to underline usage for both wk and rp --- usecase/spawn_actor_processes.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index 3aa5465..85d65cb 100644 --- a/usecase/spawn_actor_processes.go +++ b/usecase/spawn_actor_processes.go @@ -222,6 +222,8 @@ func (suite *UseCaseSuite) runReputerProcess(reputer lib.ReputerConfig) { } // Function that runs the actor process for a given topic and actor type +// This mechanism is used to handle the submission of payloads for both workers and reputers, +// using ActorProcessParams to handle the different configurations and functions needed for each actor type func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessParams[T]) { log.Debug(). Uint64("topicId", uint64(params.Config.GetTopicId())). From 020069a97d6dc8eeffb2a00cf64baf4d8af77d40 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Wed, 13 Nov 2024 23:49:26 +0100 Subject: [PATCH 23/24] clearer docs on random offset --- README.md | 2 ++ usecase/spawn_actor_processes.go | 5 +++-- usecase/spawn_actor_processes_test.go | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index bf7e7ae..4aef2ed 100644 --- a/README.md +++ b/README.md @@ -123,7 +123,9 @@ Zone Breakdown (example numbers): - Near Zone: More frequent checks with randomization for fair participation - Submissions are separated - they must happen within the submission window +### Random offset +The node introduces a random offset to the submission time to avoid the thundering herd problem alleviating mempool congestion. 
## How to configure diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index 85d65cb..ecd49ab 100644 --- a/usecase/spawn_actor_processes.go +++ b/usecase/spawn_actor_processes.go @@ -147,7 +147,7 @@ func calculateTimeDistanceInSeconds(distanceUntilNextEpoch int64, blockDurationA } // Generates a conservative random offset within the submission window -func generateFairOffset(submissionWindow int64) int64 { +func generateRandomOffset(submissionWindow int64) int64 { // Ensure the random number generator is seeded source := rand.NewSource(uint64(time.Now().UnixNano())) rng := rand.New(source) @@ -335,7 +335,8 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP distanceUntilNextEpoch := epochEnd - currentBlockHeight if distanceUntilNextEpoch <= minBlocksToCheck { // Close distance, check more closely until the submission window opens - offset := generateFairOffset(params.SubmissionWindowLength) + // Introduces a random offset to avoid thundering herd problem + offset := generateRandomOffset(params.SubmissionWindowLength) closeBlockDistance := distanceUntilNextEpoch + offset waitingTimeInSeconds, err := calculateTimeDistanceInSeconds( closeBlockDistance, diff --git a/usecase/spawn_actor_processes_test.go b/usecase/spawn_actor_processes_test.go index 7c16d57..7bdd224 100644 --- a/usecase/spawn_actor_processes_test.go +++ b/usecase/spawn_actor_processes_test.go @@ -172,7 +172,7 @@ func TestGenerateFairRandomOffset(t *testing.T) { t.Run(test.name, func(t *testing.T) { sum := int64(0) for i := 0; i < test.iterations; i++ { - result := generateFairOffset(test.workerSubmissionWindow) + result := generateRandomOffset(test.workerSubmissionWindow) assert.GreaterOrEqual(t, result, test.expectedMin, "Result should be greater than or equal to the minimum value") assert.LessOrEqual(t, result, test.expectedMax, "Result should be less than or equal to the maximum value") sum += result From f0c7116779c83a15ff694922e2344d7685458c61 Mon Sep 17 00:00:00 2001 From: Diego Campo Date: Thu, 14 Nov 2024 11:10:24 +0100 Subject: [PATCH 24/24] Increase inner status check delay --- usecase/spawn_actor_processes.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index ecd49ab..8bce23a 100644 --- a/usecase/spawn_actor_processes.go +++ b/usecase/spawn_actor_processes.go @@ -23,7 +23,7 @@ const NUM_SUBMISSION_WINDOWS_FOR_SUBMISSION_NEARNESS int64 = 2 const NEARNESS_CORRECTION_FACTOR float64 = 1.0 // Minimum wait time between status checks -const WAIT_TIME_STATUS_CHECKS int64 = 1 +const WAIT_TIME_STATUS_CHECKS int64 = 2 // ActorProcessParams encapsulates the configuration needed for running actor processes type ActorProcessParams[T lib.TopicActor] struct {
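Note on the retry delay used around SendDataWithRetry in lib/repo_tx_utils.go: calculateExponentialBackoffDelay(baseDelay, retryCount) is only named in these hunks, so the sketch below is a hypothetical stand-in. The baseDelay * 2^retryCount growth and the seconds unit are assumptions for illustration, not the repository's confirmed implementation.

package main

import (
	"fmt"
	"math"
	"time"
)

// backoffDelay returns baseDelay * 2^retryCount seconds, the usual shape of
// an exponential backoff between transaction retries.
func backoffDelay(baseDelaySeconds int64, retryCount int64) time.Duration {
	factor := math.Pow(2, float64(retryCount))
	return time.Duration(float64(baseDelaySeconds)*factor) * time.Second
}

func main() {
	// With a 2s base delay, successive retries wait 2s, 4s, 8s, 16s.
	for retry := int64(0); retry < 4; retry++ {
		fmt.Printf("retry %d -> wait %s\n", retry, backoffDelay(2, retry))
	}
}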
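On the README's "Random offset" note above: a minimal, hypothetical sketch of drawing a conservative offset inside the submission window is shown below. The uniform draw over the first half of the window and the use of math/rand are assumptions for illustration; they are not taken from the body of generateRandomOffset, whose implementation is not shown in these hunks.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomSubmissionOffset spreads submissions across the first half of the
// submission window so that many nodes do not broadcast in the same block.
func randomSubmissionOffset(submissionWindow int64) int64 {
	if submissionWindow <= 1 {
		return 0 // window too small to stagger
	}
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	return rng.Int63n(submissionWindow / 2) // assumption: stay well inside the window
}

func main() {
	window := int64(12) // e.g. a 12-block submission window
	offset := randomSubmissionOffset(window)
	fmt.Printf("waiting %d extra blocks before submitting (window=%d)\n", offset, window)
}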