diff --git a/Cargo.lock b/Cargo.lock index 988d11f7f5631b..f0103dd448109d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5792,6 +5792,7 @@ dependencies = [ "solana-version", "solana-vote", "solana-vote-program", + "solana-wen-restart", "static_assertions", "strum", "strum_macros", @@ -7482,6 +7483,28 @@ dependencies = [ "solana-version", ] +[[package]] +name = "solana-wen-restart" +version = "1.18.0" +dependencies = [ + "log", + "prost", + "prost-build", + "prost-types", + "protobuf-src", + "rustc_version 0.4.0", + "serial_test", + "solana-entry", + "solana-gossip", + "solana-ledger", + "solana-logger", + "solana-program", + "solana-runtime", + "solana-sdk", + "solana-streamer", + "solana-vote-program", +] + [[package]] name = "solana-zk-keygen" version = "1.18.0" diff --git a/Cargo.toml b/Cargo.toml index ed39154543f8e2..f5849604d0e3b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -111,6 +111,7 @@ members = [ "version", "vote", "watchtower", + "wen-restart", "zk-keygen", "zk-token-sdk", ] @@ -261,6 +262,7 @@ pretty-hex = "0.3.0" proc-macro2 = "1.0.67" proptest = "1.2" prost = "0.11.9" +prost-build = "0.11.9" prost-types = "0.11.9" protobuf-src = "1.1.0" qstring = "0.7.2" @@ -371,6 +373,7 @@ solana-udp-client = { path = "udp-client", version = "=1.18.0" } solana-version = { path = "version", version = "=1.18.0" } solana-vote = { path = "vote", version = "=1.18.0" } solana-vote-program = { path = "programs/vote", version = "=1.18.0" } +solana-wen-restart = { path = "wen-restart", version = "=1.18.0" } solana-zk-keygen = { path = "zk-keygen", version = "=1.18.0" } solana-zk-token-proof-program = { path = "programs/zk-token-proof", version = "=1.18.0" } solana-zk-token-sdk = { path = "zk-token-sdk", version = "=1.18.0" } diff --git a/core/Cargo.toml b/core/Cargo.toml index fcab8ff8775912..c3923613b768a2 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -71,6 +71,7 @@ solana-turbine = { workspace = true } solana-version = { workspace = true } solana-vote = { workspace 
= true } solana-vote-program = { workspace = true } +solana-wen-restart = { workspace = true } strum = { workspace = true, features = ["derive"] } strum_macros = { workspace = true } sys-info = { workspace = true } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 37067ce38f556d..59036a997039c1 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -29,7 +29,6 @@ use { }, rewards_recorder_service::{RewardsMessage, RewardsRecorderSender}, unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, - validator::ProcessBlockStore, voting_service::VoteOp, window_service::DuplicateSlotReceiver, }, @@ -483,7 +482,7 @@ impl ReplayStage { ledger_signal_receiver: Receiver, duplicate_slots_receiver: DuplicateSlotReceiver, poh_recorder: Arc>, - maybe_process_blockstore: Option, + mut tower: Tower, vote_tracker: Arc, cluster_slots: Arc, retransmit_slots_sender: Sender, @@ -502,15 +501,6 @@ impl ReplayStage { banking_tracer: Arc, popular_pruned_forks_receiver: PopularPrunedForksReceiver, ) -> Result { - let mut tower = if let Some(process_blockstore) = maybe_process_blockstore { - let tower = process_blockstore.process_to_create_tower()?; - info!("Tower state: {:?}", tower); - tower - } else { - warn!("creating default tower...."); - Tower::default() - }; - let ReplayStageConfig { vote_account, authorized_voter_keypairs, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 0b8358863fbceb..ec444ae4403d7e 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -11,7 +11,7 @@ use { }, cluster_slots_service::{cluster_slots::ClusterSlots, ClusterSlotsService}, completed_data_sets_service::CompletedDataSetsSender, - consensus::tower_storage::TowerStorage, + consensus::{tower_storage::TowerStorage, Tower}, cost_update_service::CostUpdateService, drop_bank_service::DropBankService, ledger_cleanup_service::LedgerCleanupService, @@ -19,7 +19,6 @@ use { replay_stage::{ReplayStage, ReplayStageConfig}, 
rewards_recorder_service::RewardsRecorderSender, shred_fetch_stage::ShredFetchStage, - validator::ProcessBlockStore, voting_service::VotingService, warm_quic_cache_service::WarmQuicCacheService, window_service::WindowService, @@ -109,7 +108,7 @@ impl Tvu { ledger_signal_receiver: Receiver, rpc_subscriptions: &Arc, poh_recorder: &Arc>, - maybe_process_block_store: Option, + tower: Tower, tower_storage: Arc, leader_schedule_cache: &Arc, exit: Arc, @@ -292,7 +291,7 @@ impl Tvu { ledger_signal_receiver, duplicate_slots_receiver, poh_recorder.clone(), - maybe_process_block_store, + tower, vote_tracker, cluster_slots, retransmit_slots_sender, @@ -463,7 +462,7 @@ pub mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), )), &poh_recorder, - None, + Tower::default(), Arc::new(FileTowerStorage::default()), &leader_schedule_cache, exit.clone(), diff --git a/core/src/validator.rs b/core/src/validator.rs index e2b763f202f89b..e5eb3544ab468f 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -119,6 +119,7 @@ use { solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes}, solana_turbine::{self, broadcast_stage::BroadcastStageType}, solana_vote_program::vote_state, + solana_wen_restart::wen_restart::wait_for_wen_restart, std::{ collections::{HashMap, HashSet}, net::SocketAddr, @@ -259,6 +260,7 @@ pub struct ValidatorConfig { pub block_production_method: BlockProductionMethod, pub generator_config: Option, pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup, + pub wen_restart_proto_path: Option, } impl Default for ValidatorConfig { @@ -326,6 +328,7 @@ impl Default for ValidatorConfig { block_production_method: BlockProductionMethod::default(), generator_config: None, use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(), + wen_restart_proto_path: None, } } } @@ -1202,6 +1205,22 @@ impl Validator { ) .unwrap(); + let in_wen_restart = config.wen_restart_proto_path.is_some() && 
!waited_for_supermajority; + let tower = match process_blockstore.process_to_create_tower() { + Ok(tower) => { + info!("Tower state: {:?}", tower); + tower + } + Err(e) => { + warn!( + "Unable to retrieve tower: {:?} creating default tower....", + e + ); + Tower::default() + } + }; + let last_vote = tower.last_vote(); + let (replay_vote_sender, replay_vote_receiver) = unbounded(); let tvu = Tvu::new( vote_account, @@ -1218,7 +1237,7 @@ impl Validator { ledger_signal_receiver, &rpc_subscriptions, &poh_recorder, - Some(process_blockstore), + tower, config.tower_storage.clone(), &leader_schedule_cache, exit.clone(), @@ -1257,6 +1276,21 @@ impl Validator { repair_quic_endpoint_sender, )?; + if in_wen_restart { + info!("Waiting for wen_restart phase one to finish"); + match wait_for_wen_restart( + &config.wen_restart_proto_path.clone().unwrap(), + last_vote, + blockstore.clone(), + cluster_info.clone(), + ) { + Ok(()) => { + return Err("wen_restart phase one completed".to_string()); + } + Err(e) => return Err(format!("wait_for_wen_restart failed: {e:?}")), + }; + } + let tpu = Tpu::new( &cluster_info, &poh_recorder, diff --git a/gossip/src/epoch_slots.rs b/gossip/src/epoch_slots.rs index dc94380b33e5de..186a17aa6ec255 100644 --- a/gossip/src/epoch_slots.rs +++ b/gossip/src/epoch_slots.rs @@ -13,7 +13,7 @@ use { }, }; -const MAX_SLOTS_PER_ENTRY: usize = 2048 * 8; +pub const MAX_SLOTS_PER_ENTRY: usize = 2048 * 8; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample)] pub struct Uncompressed { pub first_slot: Slot, diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 70211b5dac666b..d480dc2653567e 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -68,6 +68,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { block_production_method: config.block_production_method.clone(), generator_config: config.generator_config.clone(), 
use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup, + wen_restart_proto_path: config.wen_restart_proto_path.clone(), } } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 438ecaaaff9e15..167f2e4c4fa8c4 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -4831,6 +4831,7 @@ dependencies = [ "solana-version", "solana-vote", "solana-vote-program", + "solana-wen-restart", "strum", "strum_macros", "sys-info", @@ -6436,6 +6437,25 @@ dependencies = [ "thiserror", ] +[[package]] +name = "solana-wen-restart" +version = "1.18.0" +dependencies = [ + "log", + "prost", + "prost-build", + "prost-types", + "protobuf-src", + "rustc_version", + "solana-gossip", + "solana-ledger", + "solana-logger", + "solana-program", + "solana-runtime", + "solana-sdk", + "solana-vote-program", +] + [[package]] name = "solana-zk-token-proof-program" version = "1.18.0" diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 72e82ca13b56d9..cd3f3323589653 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1382,6 +1382,35 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .possible_values(BlockProductionMethod::cli_names()) .help(BlockProductionMethod::cli_message()) ) + .arg( + Arg::with_name("wen_restart") + .long("wen-restart") + .value_name("DIR") + .takes_value(true) + .required(false) + .default_value(&default_args.wen_restart_path) + .conflicts_with("wait_for_supermajority") + .help( + "When specified, the validator will enter Wen Restart mode which + pauses normal activity. Validators in this mode will gossip their last + vote to reach consensus on a safe restart slot and repair all blocks + on the selected fork. The safe slot will be a descendant of the latest + optimistically confirmed slot to ensure we do not roll back any + optimistically confirmed slots. + + The progress in this mode will be saved in the file location provided. 
+ If consensus is reached, the validator will automatically exit and then + execute wait_for_supermajority logic so the cluster will resume execution. + The progress file will be kept around for future debugging. + + After the cluster resumes normal operation, the validator arguments can + be adjusted to remove --wen-restart and update expected_shred_version to + the new shred_version agreed on in the consensus. + + If wen_restart fails, refer to the progress file (in proto3 format) for + further debugging. + ") + ) .args(&get_deprecated_arguments()) .after_help("The default subcommand is run") .subcommand( @@ -1931,6 +1960,8 @@ pub struct DefaultArgs { pub wait_for_restart_window_max_delinquent_stake: String, pub banking_trace_dir_byte_limit: String, + + pub wen_restart_path: String, } impl DefaultArgs { @@ -2009,6 +2040,7 @@ impl DefaultArgs { wait_for_restart_window_min_idle_time: "10".to_string(), wait_for_restart_window_max_delinquent_stake: "5".to_string(), banking_trace_dir_byte_limit: BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT.to_string(), + wen_restart_path: "wen_restart_progress.proto".to_string(), } } } diff --git a/wen-restart/Cargo.toml b/wen-restart/Cargo.toml new file mode 100644 index 00000000000000..b74871801872af --- /dev/null +++ b/wen-restart/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "solana-wen-restart" +description = "Automatic repair and restart protocol" +documentation = "https://github.com/solana-foundation/solana-improvement-documents/pull/46" +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } +publish = false + +[dependencies] +log = { workspace = true } +prost = { workspace = true } +prost-types = { workspace = true } +solana-gossip = { workspace = true } +solana-ledger = { workspace = true } +solana-logger = { workspace = true } +solana-program = { workspace = true } +solana-runtime = { 
workspace = true }
+solana-sdk = { workspace = true }
+solana-vote-program = { workspace = true }
+
+[dev-dependencies]
+serial_test = { workspace = true }
+solana-entry = { workspace = true }
+solana-streamer = { workspace = true }
+
+[build-dependencies]
+prost-build = { workspace = true }
+rustc_version = { workspace = true }
+
+# windows users should install the protobuf compiler manually and set the PROTOC
+# envar to point to the installed binary
+[target."cfg(not(windows))".build-dependencies]
+protobuf-src = { workspace = true }
+
+[lib]
+name = "solana_wen_restart"
+
+[package.metadata.docs.rs]
+targets = ["x86_64-unknown-linux-gnu"]
diff --git a/wen-restart/build.rs b/wen-restart/build.rs
new file mode 100644
index 00000000000000..4360117bb445d4
--- /dev/null
+++ b/wen-restart/build.rs
@@ -0,0 +1,42 @@
+extern crate rustc_version;
+
+use {
+    rustc_version::{version_meta, Channel},
+    std::io::Result,
+};
+
+/// Build script for `solana-wen-restart`: makes `protoc` discoverable, emits the
+/// rustc-channel cfg flags, and generates Rust code from `proto/wen_restart.proto`.
+fn main() -> Result<()> {
+    const PROTOC_ENVAR: &str = "PROTOC";
+    // Respect a compiler chosen by the user; otherwise point at the vendored one.
+    // `protobuf-src` is not a build dependency on Windows, so nothing is set there.
+    if std::env::var(PROTOC_ENVAR).is_err() {
+        #[cfg(not(windows))]
+        std::env::set_var(PROTOC_ENVAR, protobuf_src::protoc());
+    }
+
+    // Copied and adapted from
+    // https://github.com/Kimundi/rustc-version-rs/blob/1d692a965f4e48a8cb72e82cda953107c0d22f47/README.md#example
+    // Licensed under Apache-2.0 + MIT
+    match version_meta().unwrap().channel {
+        // Stable and beta toolchains get the same cfg flag.
+        Channel::Stable | Channel::Beta => {
+            println!("cargo:rustc-cfg=RUSTC_WITHOUT_SPECIALIZATION");
+        }
+        Channel::Nightly => {
+            println!("cargo:rustc-cfg=RUSTC_WITH_SPECIALIZATION");
+        }
+        Channel::Dev => {
+            println!("cargo:rustc-cfg=RUSTC_WITH_SPECIALIZATION");
+            // See https://github.com/solana-labs/solana/issues/11055
+            // We may be running the custom `rust-bpf-builder` toolchain,
+            // which currently needs `#![feature(proc_macro_hygiene)]` to
+            // be applied.
+            println!("cargo:rustc-cfg=RUSTC_NEEDS_PROC_MACRO_HYGIENE");
+        }
+    }
+
+    // Generate rust files from protos; any I/O failure becomes the script's result.
+    prost_build::compile_protos(&["proto/wen_restart.proto"], &["proto/"])
+}
diff --git a/wen-restart/proto/wen_restart.proto b/wen-restart/proto/wen_restart.proto
new file mode 100644
index 00000000000000..1f6423462b55b0
--- /dev/null
+++ b/wen-restart/proto/wen_restart.proto
@@ -0,0 +1,23 @@
+syntax = "proto3";
+package solana.wen_restart_proto;
+
+enum State {
+    INIT = 0;
+    LAST_VOTED_FORK_SLOTS = 1;
+    HEAVIEST_FORK = 2;
+    GENERATING_SNAPSHOT = 3;
+    FINISHED_SNAPSHOT = 4;
+    WAITING_FOR_SUPERMAJORITY = 5;
+    DONE = 6;
+}
+
+message MyLastVotedForkSlots {
+    uint64 last_vote_slot = 1;
+    string last_vote_bankhash = 2;
+    uint32 shred_version = 3;
+}
+
+message WenRestartProgress {
+    State state = 1;
+    optional MyLastVotedForkSlots my_last_voted_fork_slots = 2;
+}
\ No newline at end of file
diff --git a/wen-restart/src/lib.rs b/wen-restart/src/lib.rs
new file mode 100644
index 00000000000000..e58a6d04bf831f
--- /dev/null
+++ b/wen-restart/src/lib.rs
@@ -0,0 +1,7 @@
+pub(crate) mod solana {
+    pub(crate) mod wen_restart_proto {
+        include!(concat!(env!("OUT_DIR"), "/solana.wen_restart_proto.rs"));
+    }
+}
+
+pub mod wen_restart;
diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs
new file mode 100644
index 00000000000000..75e4e21ce9431a
--- /dev/null
+++ b/wen-restart/src/wen_restart.rs
@@ -0,0 +1,152 @@
+//! 
The `wen-restart` module handles automatic repair during a cluster restart
+
+use {
+    crate::solana::wen_restart_proto::{
+        MyLastVotedForkSlots, State as RestartState, WenRestartProgress,
+    },
+    log::*,
+    prost::Message,
+    solana_gossip::{cluster_info::ClusterInfo, epoch_slots::MAX_SLOTS_PER_ENTRY},
+    solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore},
+    solana_vote_program::vote_state::VoteTransaction,
+    std::{
+        fs::File,
+        io::{Error, Write},
+        path::Path,
+        sync::Arc,
+    },
+};
+
+/// Runs the part of wen_restart implemented so far: collects the fork the local
+/// validator last voted on and records the initial progress in `wen_restart_path`.
+///
+/// * `wen_restart_path` - file the protobuf-encoded [`WenRestartProgress`] is
+///   written to; an existing file is overwritten.
+/// * `last_vote` - the validator's last vote; its last voted slot and hash are
+///   stored in the progress record.
+/// * `blockstore` - ledger used to walk the ancestors of the last voted slot.
+/// * `cluster_info` - source of the local shred version stored in the record.
+///
+/// # Errors
+///
+/// Returns an error if `last_vote` contains no voted slot (e.g. the local tower
+/// was wiped), or if the progress file cannot be created or written.
+pub fn wait_for_wen_restart(
+    wen_restart_path: &Path,
+    last_vote: VoteTransaction,
+    blockstore: Arc<Blockstore>,
+    cluster_info: Arc<ClusterInfo>,
+) -> Result<(), Box<dyn std::error::Error>> {
+    // repair and restart option does not work without last voted slot.
+    // Report this as an error instead of panicking so the caller can surface it.
+    let last_vote_slot = last_vote
+        .last_voted_slot()
+        .ok_or("wen_restart doesn't work if local tower is wiped")?;
+    // The last voted slot plus its ancestors, capped at MAX_SLOTS_PER_ENTRY slots.
+    let mut last_vote_fork: Vec<u64> = AncestorIterator::new_inclusive(last_vote_slot, &blockstore)
+        .take(MAX_SLOTS_PER_ENTRY)
+        .collect();
+    info!(
+        "wen_restart last voted fork {} {:?}",
+        last_vote_slot, last_vote_fork
+    );
+    last_vote_fork.reverse();
+    // Todo(wen): add the following back in after Gossip code is checked in.
+    //    cluster_info.push_last_voted_fork_slots(&last_vote_fork, last_vote.hash());
+    // The rest of the protocol will be in another PR.
+    let current_progress = WenRestartProgress {
+        state: RestartState::Init.into(),
+        my_last_voted_fork_slots: Some(MyLastVotedForkSlots {
+            last_vote_slot,
+            last_vote_bankhash: last_vote.hash().to_string(),
+            // Lossless widening to the proto's uint32 field.
+            shred_version: u32::from(cluster_info.my_shred_version()),
+        }),
+    };
+    write_wen_restart_records(wen_restart_path, current_progress)?;
+    Ok(())
+}
+
+/// Encodes `new_progress` as protobuf and writes it to `records_path`, replacing
+/// any previous contents of that file.
+fn write_wen_restart_records(
+    records_path: &Path,
+    new_progress: WenRestartProgress,
+) -> Result<(), Error> {
+    // overwrite anything if exists
+    let mut file = File::create(records_path)?;
+    info!("writing new record {:?}", new_progress);
+    let mut buf = Vec::with_capacity(new_progress.encoded_len());
+    new_progress.encode(&mut buf)?;
+    file.write_all(&buf)?;
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use {
+        crate::wen_restart::*,
+        solana_entry::entry,
+        solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
+        solana_ledger::{blockstore, get_tmp_ledger_path_auto_delete},
+        solana_program::{hash::Hash, vote::state::Vote},
+        solana_sdk::{
+            signature::{Keypair, Signer},
+            timing::timestamp,
+        },
+        solana_streamer::socket::SocketAddrSpace,
+        std::{fs::read, sync::Arc},
+    };
+
+    #[test]
+    fn test_wen_restart_normal_flow() {
+        solana_logger::setup();
+        let node_keypair = Arc::new(Keypair::new());
+        let cluster_info = Arc::new(ClusterInfo::new(
+            {
+                let mut contact_info =
+                    ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp());
+                contact_info.set_shred_version(2);
+                contact_info
+            },
+            node_keypair,
+            SocketAddrSpace::Unspecified,
+        ));
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let mut wen_restart_proto_path = ledger_path.path().to_path_buf();
+        wen_restart_proto_path.push("wen_restart_status.proto");
+        let blockstore = Arc::new(blockstore::Blockstore::open(ledger_path.path()).unwrap());
+        let expected_slots = 400;
+        let last_vote_slot = (MAX_SLOTS_PER_ENTRY + expected_slots).try_into().unwrap();
+        let last_parent = (MAX_SLOTS_PER_ENTRY >> 1).try_into().unwrap();
+        for i in 0..expected_slots {
+            let entries = entry::create_ticks(1, 0, Hash::default());
+            let parent_slot = if i > 0 {
+                (MAX_SLOTS_PER_ENTRY + i).try_into().unwrap()
+            } else {
+                last_parent
+            };
+            let shreds = blockstore::entries_to_test_shreds(
+                &entries,
+                (MAX_SLOTS_PER_ENTRY + i + 1).try_into().unwrap(),
+                parent_slot,
+                false,
+                0,
+                true, // merkle_variant
+            );
+            blockstore.insert_shreds(shreds, None, false).unwrap();
+        }
+        // link directly to slot 1 whose distance to last_vote > MAX_SLOTS_PER_ENTRY so it will not be included.
+        let entries = entry::create_ticks(1, 0, Hash::default());
+        let shreds = blockstore::entries_to_test_shreds(
+            &entries,
+            last_parent,
+            1,
+            false,
+            0,
+            true, // merkle_variant
+        );
+        blockstore.insert_shreds(shreds, None, false).unwrap();
+        let last_vote_bankhash = Hash::new_unique();
+        assert!(wait_for_wen_restart(
+            &wen_restart_proto_path,
+            VoteTransaction::from(Vote::new(vec![last_vote_slot], last_vote_bankhash)),
+            blockstore,
+            cluster_info
+        )
+        .is_ok());
+        let buffer = read(wen_restart_proto_path).unwrap();
+        let progress = WenRestartProgress::decode(&mut std::io::Cursor::new(buffer)).unwrap();
+        assert_eq!(
+            progress,
+            WenRestartProgress {
+                state: RestartState::Init.into(),
+                my_last_voted_fork_slots: Some(MyLastVotedForkSlots {
+                    last_vote_slot,
+                    last_vote_bankhash: last_vote_bankhash.to_string(),
+                    shred_version: 2,
+                }),
+            }
+        )
+    }
+}