Skip to content

Commit

Permalink
[reconfiguration] Time based reconfiguration trigger (MystenLabs#8006)
Browse files Browse the repository at this point in the history
Instead of using the number of checkpoints as a trigger for reconfiguration,
we use a time-based trigger: when more than the configured duration has
passed since the epoch started, we trigger reconfiguration.
  • Loading branch information
andll authored Feb 2, 2023
1 parent 7514161 commit 9e10407
Show file tree
Hide file tree
Showing 15 changed files with 143 additions and 68 deletions.
18 changes: 10 additions & 8 deletions crates/sui-benchmark/tests/simtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ mod test {

#[sim_test(config = "test_config()")]
async fn test_simulated_load_with_reconfig() {
let test_cluster = build_test_cluster(4, 10).await;
let test_cluster = build_test_cluster(4, 1000).await;
test_simulated_load(test_cluster, 60).await;
}

Expand All @@ -79,7 +79,7 @@ mod test {

#[sim_test(config = "test_config()")]
async fn test_simulated_load_reconfig_restarts() {
let test_cluster = build_test_cluster(4, 10).await;
let test_cluster = build_test_cluster(4, 1000).await;
let node_restarter = test_cluster
.random_node_restarter()
.with_kill_interval_secs(5, 15)
Expand All @@ -90,7 +90,7 @@ mod test {

#[sim_test(config = "test_config()")]
async fn test_simulated_load_reconfig_crashes() {
let test_cluster = build_test_cluster(4, 10).await;
let test_cluster = build_test_cluster(4, 1000).await;

struct DeadValidator {
node_id: sui_simulator::task::NodeId,
Expand Down Expand Up @@ -141,16 +141,18 @@ mod test {

async fn build_test_cluster(
default_num_validators: usize,
default_checkpoints_per_epoch: u64,
default_epoch_duration_ms: u64,
) -> Arc<TestCluster> {
let mut builder = TestClusterBuilder::new().with_num_validators(get_var(
"SIM_STRESS_TEST_NUM_VALIDATORS",
default_num_validators,
));

let checkpoints_per_epoch = get_var("CHECKPOINTS_PER_EPOCH", default_checkpoints_per_epoch);
if checkpoints_per_epoch > 0 {
builder = builder.with_checkpoints_per_epoch(checkpoints_per_epoch);
if std::env::var("CHECKPOINTS_PER_EPOCH").is_ok() {
eprintln!("CHECKPOINTS_PER_EPOCH env var is deprecated, use EPOCH_DURATION_MS");
}
let epoch_duration_ms = get_var("EPOCH_DURATION_MS", default_epoch_duration_ms);
if epoch_duration_ms > 0 {
builder = builder.with_epoch_duration_ms(epoch_duration_ms);
}

Arc::new(builder.build().await.unwrap())
Expand Down
14 changes: 7 additions & 7 deletions crates/sui-config/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::node::{
default_checkpoints_per_epoch, default_end_of_epoch_broadcast_channel_capacity,
default_end_of_epoch_broadcast_channel_capacity, default_epoch_duration_ms,
AuthorityKeyPairWithPath, KeyPairWithPath,
};
use crate::{
Expand Down Expand Up @@ -47,7 +47,7 @@ pub struct ConfigBuilder<R = OsRng> {
additional_objects: Vec<Object>,
with_swarm: bool,
validator_ip_sel: ValidatorIpSelection,
checkpoints_per_epoch: Option<u64>,
epoch_duration_ms: u64,
}

impl ConfigBuilder {
Expand All @@ -67,7 +67,7 @@ impl ConfigBuilder {
} else {
ValidatorIpSelection::Localhost
},
checkpoints_per_epoch: default_checkpoints_per_epoch(),
epoch_duration_ms: default_epoch_duration_ms(),
}
}
}
Expand Down Expand Up @@ -108,8 +108,8 @@ impl<R> ConfigBuilder<R> {
self
}

pub fn with_checkpoints_per_epoch(mut self, ckpts: u64) -> Self {
self.checkpoints_per_epoch = Some(ckpts);
pub fn with_epoch_duration(mut self, epoch_duration_ms: u64) -> Self {
self.epoch_duration_ms = epoch_duration_ms;
self
}

Expand All @@ -123,7 +123,7 @@ impl<R> ConfigBuilder<R> {
additional_objects: self.additional_objects,
with_swarm: self.with_swarm,
validator_ip_sel: self.validator_ip_sel,
checkpoints_per_epoch: self.checkpoints_per_epoch,
epoch_duration_ms: self.epoch_duration_ms,
}
}
}
Expand Down Expand Up @@ -367,7 +367,7 @@ impl<R: rand::RngCore + rand::CryptoRng> ConfigBuilder<R> {
json_rpc_address: utils::available_local_socket_address(),
consensus_config: Some(consensus_config),
enable_event_processing: false,
checkpoints_per_epoch: self.checkpoints_per_epoch,
epoch_duration_ms: self.epoch_duration_ms,
genesis: crate::node::Genesis::new(genesis.clone()),
grpc_load_shed: initial_accounts_config.grpc_load_shed,
grpc_concurrency_limit: initial_accounts_config.grpc_concurrency_limit,
Expand Down
17 changes: 8 additions & 9 deletions crates/sui-config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,12 @@ pub struct NodeConfig {
#[serde(default)]
pub enable_event_processing: bool,

/// Number of checkpoints per epoch.
/// Some means reconfiguration is enabled.
/// None means reconfiguration is disabled.
// TODO: It will be removed down the road.
/// Epoch duration in ms.
/// u64::MAX means reconfiguration is disabled
/// Exposing this in config to allow easier testing with shorter epoch.
/// TODO: It will be removed down the road.
#[serde(default = "default_checkpoints_per_epoch")]
pub checkpoints_per_epoch: Option<u64>,
#[serde(default = "default_epoch_duration_ms")]
pub epoch_duration_ms: u64,

#[serde(default)]
pub grpc_load_shed: Option<bool>,
Expand Down Expand Up @@ -135,9 +134,9 @@ pub fn default_concurrency_limit() -> Option<usize> {
Some(DEFAULT_GRPC_CONCURRENCY_LIMIT)
}

pub fn default_checkpoints_per_epoch() -> Option<u64> {
// Currently a checkpoint is ~3 seconds, 3000 checkpoints is 9000s, which is about 2.5 hours.
Some(3000)
pub fn default_epoch_duration_ms() -> u64 {
// 24 Hrs
24 * 60 * 60 * 1000
}

pub fn default_end_of_epoch_broadcast_channel_capacity() -> usize {
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-config/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::node::AuthorityStorePruningConfig;
use crate::node::{
default_checkpoints_per_epoch, default_end_of_epoch_broadcast_channel_capacity,
default_end_of_epoch_broadcast_channel_capacity, default_epoch_duration_ms,
AuthorityKeyPairWithPath, KeyPairWithPath,
};
use crate::p2p::{P2pConfig, SeedPeer};
Expand Down Expand Up @@ -232,7 +232,7 @@ impl<'a> FullnodeConfigBuilder<'a> {
json_rpc_address,
consensus_config: None,
enable_event_processing: self.enable_event_store,
checkpoints_per_epoch: default_checkpoints_per_epoch(),
epoch_duration_ms: default_epoch_duration_ms(),
genesis: validator_config.genesis.clone(),
grpc_load_shed: None,
grpc_concurrency_limit: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ validator_configs:
report_batch_rate_limit: ~
request_batch_rate_limit: ~
enable-event-processing: false
checkpoints-per-epoch: 3000
epoch-duration-ms: 86400000
grpc-load-shed: ~
grpc-concurrency-limit: 20000000000
p2p-config:
Expand Down Expand Up @@ -122,7 +122,7 @@ validator_configs:
report_batch_rate_limit: ~
request_batch_rate_limit: ~
enable-event-processing: false
checkpoints-per-epoch: 3000
epoch-duration-ms: 86400000
grpc-load-shed: ~
grpc-concurrency-limit: 20000000000
p2p-config:
Expand Down Expand Up @@ -190,7 +190,7 @@ validator_configs:
report_batch_rate_limit: ~
request_batch_rate_limit: ~
enable-event-processing: false
checkpoints-per-epoch: 3000
epoch-duration-ms: 86400000
grpc-load-shed: ~
grpc-concurrency-limit: 20000000000
p2p-config:
Expand Down Expand Up @@ -258,7 +258,7 @@ validator_configs:
report_batch_rate_limit: ~
request_batch_rate_limit: ~
enable-event-processing: false
checkpoints-per-epoch: 3000
epoch-duration-ms: 86400000
grpc-load-shed: ~
grpc-concurrency-limit: 20000000000
p2p-config:
Expand Down Expand Up @@ -326,7 +326,7 @@ validator_configs:
report_batch_rate_limit: ~
request_batch_rate_limit: ~
enable-event-processing: false
checkpoints-per-epoch: 3000
epoch-duration-ms: 86400000
grpc-load-shed: ~
grpc-concurrency-limit: 20000000000
p2p-config:
Expand Down Expand Up @@ -394,7 +394,7 @@ validator_configs:
report_batch_rate_limit: ~
request_batch_rate_limit: ~
enable-event-processing: false
checkpoints-per-epoch: 3000
epoch-duration-ms: 86400000
grpc-load-shed: ~
grpc-concurrency-limit: 20000000000
p2p-config:
Expand Down Expand Up @@ -462,7 +462,7 @@ validator_configs:
report_batch_rate_limit: ~
request_batch_rate_limit: ~
enable-event-processing: false
checkpoints-per-epoch: 3000
epoch-duration-ms: 86400000
grpc-load-shed: ~
grpc-concurrency-limit: 20000000000
p2p-config:
Expand Down
32 changes: 25 additions & 7 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ use sui_types::{
};

use crate::authority::authority_notify_read::NotifyRead;
use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority::authority_per_epoch_store::{
AuthorityPerEpochStore, EpochStartConfiguration,
};
use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
use crate::authority_aggregator::TransactionCertifier;
Expand Down Expand Up @@ -1674,6 +1676,7 @@ impl AuthorityState {
&path.join("store"),
None,
EpochMetrics::new(&registry),
Some(Default::default()),
);

let epochs = Arc::new(CommitteeStore::new(
Expand Down Expand Up @@ -1799,14 +1802,19 @@ impl AuthorityState {
})
}

pub async fn reconfigure(&self, new_committee: Committee) -> SuiResult {
pub async fn reconfigure(
&self,
new_committee: Committee,
epoch_start_timestamp_ms: u64,
) -> SuiResult {
self.committee_store.insert_new_committee(&new_committee)?;
let db = self.db();
let mut execution_lock = db.execution_lock_for_reconfiguration().await;
self.revert_uncommitted_epoch_transactions().await?;
let new_epoch = new_committee.epoch;
db.perpetual_tables.set_recovery_epoch(new_epoch)?;
self.reopen_epoch_db(new_committee).await;
self.reopen_epoch_db(new_committee, epoch_start_timestamp_ms)
.await?;
self.transaction_manager.reconfigure(new_epoch);
*execution_lock = new_epoch;
// drop execution_lock after epoch store was updated
Expand Down Expand Up @@ -2615,14 +2623,24 @@ impl AuthorityState {
Ok(())
}

async fn reopen_epoch_db(&self, new_committee: Committee) {
async fn reopen_epoch_db(
&self,
new_committee: Committee,
epoch_start_timestamp_ms: u64,
) -> SuiResult<()> {
info!(new_epoch = ?new_committee.epoch, "re-opening AuthorityEpochTables for new epoch");

let epoch_tables = self
.epoch_store()
.new_at_next_epoch(self.name, new_committee);
let epoch_start_configuration = EpochStartConfiguration {
epoch_start_timestamp_ms,
};
let epoch_tables = self.epoch_store().new_at_next_epoch(
self.name,
new_committee,
epoch_start_configuration,
);
let previous_store = self.epoch_store.swap(epoch_tables);
previous_store.epoch_terminated().await;
Ok(())
}

#[cfg(test)]
Expand Down
47 changes: 45 additions & 2 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use std::cmp::Ordering as CmpOrdering;
use sui_types::message_envelope::TrustedEnvelope;
use sui_types::messages_checkpoint::{
CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
CheckpointSignatureMessage, CheckpointSummary,
CheckpointSignatureMessage, CheckpointSummary, CheckpointTimestamp,
};
use sui_types::storage::{transaction_input_object_keys, ObjectKey, ParentSync};
use sui_types::temporary_store::InnerTemporaryStore;
Expand Down Expand Up @@ -223,6 +223,16 @@ pub struct AuthorityEpochTables {
/// epoch, used for tallying rule scores.
num_certified_checkpoint_signatures:
DBMap<AuthorityName, (Option<CheckpointSequenceNumber>, u64)>,

/// Parameters of the system fixed at the epoch start
epoch_start_configuration: DBMap<(), EpochStartConfiguration>,
}

/// Parameters of the epoch fixed at epoch start.
#[derive(Default, Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct EpochStartConfiguration {
pub epoch_start_timestamp_ms: CheckpointTimestamp,
// Current epoch committee can eventually move here too, though right now it is served from a different table
}

impl AuthorityEpochTables {
Expand Down Expand Up @@ -271,6 +281,7 @@ impl AuthorityPerEpochStore {
parent_path: &Path,
db_options: Option<Options>,
metrics: Arc<EpochMetrics>,
epoch_start_configuration: Option<EpochStartConfiguration>,
) -> Arc<Self> {
let current_time = Instant::now();
let epoch_id = committee.epoch;
Expand All @@ -294,6 +305,18 @@ impl AuthorityPerEpochStore {
}
})
.collect();
// Insert epoch_start_configuration in the DB. This is used by unit tests
//
// Production code goes different path:
// (1) For the first epoch, this is inserted in the DB along with genesis checkpoint
// (2) For other epochs, this is updated when AuthorityPerEpochStore
// is initialized during epoch change
if let Some(epoch_start_configuration) = epoch_start_configuration {
tables
.epoch_start_configuration
.insert(&(), &epoch_start_configuration)
.expect("Failed to store epoch_start_configuration");
}
metrics.current_epoch.set(epoch_id as i64);
metrics
.current_voting_right
Expand Down Expand Up @@ -321,7 +344,20 @@ impl AuthorityPerEpochStore {
self.parent_path.clone()
}

pub fn new_at_next_epoch(&self, name: AuthorityName, new_committee: Committee) -> Arc<Self> {
pub fn epoch_start_configuration(&self) -> SuiResult<EpochStartConfiguration> {
Ok(self
.tables
.epoch_start_configuration
.get(&())?
.expect("epoch_start_configuration was not initialized properly"))
}

pub fn new_at_next_epoch(
&self,
name: AuthorityName,
new_committee: Committee,
epoch_start_configuration: EpochStartConfiguration,
) -> Arc<Self> {
assert_eq!(self.epoch() + 1, new_committee.epoch);
self.record_reconfig_halt_duration_metric();
self.record_epoch_total_duration_metric();
Expand All @@ -331,6 +367,7 @@ impl AuthorityPerEpochStore {
&self.parent_path,
self.db_options.clone(),
self.metrics.clone(),
Some(epoch_start_configuration),
)
}

Expand Down Expand Up @@ -1481,6 +1518,12 @@ impl AuthorityPerEpochStore {
.builder_digest_to_checkpoint
.insert(&digest, &sequence)?;
}
let epoch_start_configuration = EpochStartConfiguration {
epoch_start_timestamp_ms: summary.timestamp_ms,
};
self.tables
.epoch_start_configuration
.insert(&(), &epoch_start_configuration)?;
self.tables
.builder_checkpoint_summary
.insert(summary.sequence_number(), summary)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub async fn test_checkpoint_executor_cross_epoch() {
);

authority_state
.reconfigure(second_committee.committee().clone())
.reconfigure(second_committee.committee().clone(), 0)
.await
.unwrap();

Expand Down
Loading

0 comments on commit 9e10407

Please sign in to comment.