Skip to content

Commit

Permalink
feat: migrate logic in bank_stage to use prio-graph crate
Browse files Browse the repository at this point in the history
  • Loading branch information
flame4 committed Oct 14, 2024
1 parent d554cec commit 4f6d25d
Show file tree
Hide file tree
Showing 31 changed files with 79 additions and 3,069 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ solana-version = { workspace = true }
solana-vote = { workspace = true }
solana-vote-program = { workspace = true }
solana-wen-restart = { workspace = true }
solana-prio-graph-scheduler = { workspace = true }
strum = { workspace = true, features = ["derive"] }
strum_macros = { workspace = true }
sys-info = { workspace = true }
Expand Down
15 changes: 6 additions & 9 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@ use {
},
crate::{
banking_stage::{
consume_worker::ConsumeWorker,
packet_deserializer::PacketDeserializer,
transaction_scheduler::{
prio_graph_scheduler::PrioGraphScheduler,
scheduler_controller::SchedulerController, scheduler_error::SchedulerError,
},
consume_worker::ConsumeWorker, packet_deserializer::PacketDeserializer,
scheduler_controller::SchedulerController,
},
banking_trace::BankingPacketReceiver,
tracer_packet_stats::TracerPacketStats,
Expand All @@ -36,6 +32,9 @@ use {
solana_measure::measure_us,
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
solana_prio_graph_scheduler::{
prio_graph_scheduler::PrioGraphScheduler, scheduler_error::SchedulerError,
},
solana_runtime::{
bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteSender,
Expand Down Expand Up @@ -74,9 +73,7 @@ mod packet_deserializer;
mod packet_filter;
mod packet_receiver;
pub mod read_write_account_set;
#[allow(dead_code)]
mod scheduler_messages;
mod transaction_scheduler;
mod scheduler_controller;

// Fixed thread size seems to be fastest on GCP setup
pub const NUM_THREADS: u32 = 6;
Expand Down
4 changes: 2 additions & 2 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use {
super::{
consumer::{Consumer, ExecuteAndCommitTransactionsOutput, ProcessTransactionBatchOutput},
leader_slot_timing_metrics::LeaderExecuteAndCommitTimings,
scheduler_messages::{ConsumeWork, FinishedConsumeWork},
},
crossbeam_channel::{Receiver, RecvError, SendError, Sender},
solana_measure::measure_us,
solana_poh::leader_bank_notifier::LeaderBankNotifier,
solana_prio_graph_scheduler::scheduler_messages::{ConsumeWork, FinishedConsumeWork},
solana_runtime::bank::Bank,
solana_sdk::timing::AtomicInterval,
solana_svm::transaction_error_metrics::TransactionErrorMetrics,
Expand Down Expand Up @@ -694,7 +694,6 @@ mod tests {
crate::banking_stage::{
committer::Committer,
qos_service::QosService,
scheduler_messages::{TransactionBatchId, TransactionId},
tests::{create_slow_genesis_config, sanitize_transactions, simulate_poh},
},
crossbeam_channel::unbounded,
Expand All @@ -703,6 +702,7 @@ mod tests {
get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache,
},
solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry},
solana_prio_graph_scheduler::scheduler_messages::{TransactionBatchId, TransactionId},
solana_runtime::{
bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteReceiver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use {
},
solana_feature_set::FeatureSet,
solana_perf::packet::Packet,
solana_prio_graph_scheduler::deserializable_packet::DeserializableTxPacket,
solana_sdk::transaction::SanitizedTransaction,
std::sync::Arc,
};
Expand Down
27 changes: 16 additions & 11 deletions core/src/banking_stage/forward_worker.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
use {
super::{
forwarder::Forwarder,
scheduler_messages::{FinishedForwardWork, ForwardWork},
forwarder::Forwarder, immutable_deserialized_packet::ImmutableDeserializedPacket,
ForwardOption,
},
crate::banking_stage::LikeClusterInfo,
crossbeam_channel::{Receiver, RecvError, SendError, Sender},
solana_prio_graph_scheduler::deserializable_packet::DeserializableTxPacket,
solana_prio_graph_scheduler::scheduler_messages::{FinishedForwardWork, ForwardWork},
thiserror::Error,
};

type DefaultForwardWork = ForwardWork<ImmutableDeserializedPacket>;
type DefaultFinishedForwardWork = FinishedForwardWork<ImmutableDeserializedPacket>;

#[derive(Debug, Error)]
pub enum ForwardWorkerError {
#[error("Failed to receive work from scheduler: {0}")]
Recv(#[from] RecvError),
#[error("Failed to send finalized forward work to scheduler: {0}")]
Send(#[from] SendError<FinishedForwardWork>),
Send(#[from] SendError<DefaultFinishedForwardWork>),
}

pub(crate) struct ForwardWorker<T: LikeClusterInfo> {
forward_receiver: Receiver<ForwardWork>,
forward_receiver: Receiver<DefaultForwardWork>,
forward_option: ForwardOption,
forwarder: Forwarder<T>,
forwarded_sender: Sender<FinishedForwardWork>,
forwarded_sender: Sender<DefaultFinishedForwardWork>,
}

#[allow(dead_code)]
impl<T: LikeClusterInfo> ForwardWorker<T> {
pub fn new(
forward_receiver: Receiver<ForwardWork>,
forward_receiver: Receiver<DefaultForwardWork>,
forward_option: ForwardOption,
forwarder: Forwarder<T>,
forwarded_sender: Sender<FinishedForwardWork>,
forwarded_sender: Sender<DefaultFinishedForwardWork>,
) -> Self {
Self {
forward_receiver,
Expand All @@ -47,7 +51,7 @@ impl<T: LikeClusterInfo> ForwardWorker<T> {
}
}

fn forward_loop(&self, work: ForwardWork) -> Result<(), ForwardWorkerError> {
fn forward_loop(&self, work: DefaultForwardWork) -> Result<(), ForwardWorkerError> {
for work in try_drain_iter(work, &self.forward_receiver) {
let (res, _num_packets, _forward_us, _leader_pubkey) = self.forwarder.forward_packets(
&self.forward_option,
Expand All @@ -64,7 +68,7 @@ impl<T: LikeClusterInfo> ForwardWorker<T> {
Ok(())
}

fn failed_forward_drain(&self, work: ForwardWork) -> Result<(), ForwardWorkerError> {
fn failed_forward_drain(&self, work: DefaultForwardWork) -> Result<(), ForwardWorkerError> {
for work in try_drain_iter(work, &self.forward_receiver) {
self.forwarded_sender.send(FinishedForwardWork {
work,
Expand Down Expand Up @@ -98,6 +102,7 @@ mod tests {
},
solana_perf::packet::to_packet_batches,
solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry},
solana_prio_graph_scheduler::deserializable_packet::DeserializableTxPacket,
solana_runtime::bank::Bank,
solana_sdk::{
genesis_config::GenesisConfig, poh_config::PohConfig, pubkey::Pubkey,
Expand All @@ -119,8 +124,8 @@ mod tests {
_entry_receiver: Receiver<WorkingBankEntry>,
_poh_simulator: JoinHandle<()>,

forward_sender: Sender<ForwardWork>,
forwarded_receiver: Receiver<FinishedForwardWork>,
forward_sender: Sender<DefaultForwardWork>,
forwarded_receiver: Receiver<DefaultFinishedForwardWork>,
}

fn setup_test_frame() -> (TestFrame, ForwardWorker<Arc<ClusterInfo>>) {
Expand Down
20 changes: 11 additions & 9 deletions core/src/banking_stage/immutable_deserialized_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use {
super::packet_filter::PacketFilterFailure,
solana_compute_budget::compute_budget_limits::ComputeBudgetLimits,
solana_perf::packet::Packet,
solana_prio_graph_scheduler::deserializable_packet::DeserializableTxPacket,
solana_runtime_transaction::instructions_processor::process_compute_budget_instructions,
solana_sanitize::SanitizeError,
solana_sdk::{
Expand Down Expand Up @@ -49,8 +50,9 @@ pub struct ImmutableDeserializedPacket {
pub compute_unit_limit: u32,
}

impl ImmutableDeserializedPacket {
pub fn new(packet: Packet) -> Result<Self, DeserializedPacketError> {
impl DeserializableTxPacket for ImmutableDeserializedPacket {
type DeserializeError = DeserializedPacketError;
fn new(packet: Packet) -> Result<Self, Self::DeserializeError> {
let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?;
let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?;
let message_bytes = packet_message(&packet)?;
Expand Down Expand Up @@ -85,33 +87,33 @@ impl ImmutableDeserializedPacket {
})
}

pub fn original_packet(&self) -> &Packet {
fn original_packet(&self) -> &Packet {
&self.original_packet
}

pub fn transaction(&self) -> &SanitizedVersionedTransaction {
fn transaction(&self) -> &SanitizedVersionedTransaction {
&self.transaction
}

pub fn message_hash(&self) -> &Hash {
fn message_hash(&self) -> &Hash {
&self.message_hash
}

pub fn is_simple_vote(&self) -> bool {
fn is_simple_vote(&self) -> bool {
self.is_simple_vote
}

pub fn compute_unit_price(&self) -> u64 {
fn compute_unit_price(&self) -> u64 {
self.compute_unit_price
}

pub fn compute_unit_limit(&self) -> u64 {
fn compute_unit_limit(&self) -> u64 {
u64::from(self.compute_unit_limit)
}

// This function deserializes packets into transactions, computes the blake3 hash of transaction
// messages.
pub fn build_sanitized_transaction(
fn build_sanitized_transaction(
&self,
votes_only: bool,
address_loader: impl AddressLoader,
Expand Down
13 changes: 3 additions & 10 deletions core/src/banking_stage/latest_unprocessed_votes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,7 @@ use {
super::{
forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket},
},
itertools::Itertools,
rand::{thread_rng, Rng},
solana_perf::packet::Packet,
solana_runtime::{bank::Bank, epoch_stakes::EpochStakes},
solana_sdk::{
}, itertools::Itertools, rand::{thread_rng, Rng}, solana_perf::packet::Packet, solana_prio_graph_scheduler::deserializable_packet::DeserializableTxPacket, solana_runtime::{bank::Bank, epoch_stakes::EpochStakes}, solana_sdk::{
account::from_account,
clock::{Slot, UnixTimestamp},
feature_set::{self},
Expand All @@ -16,17 +11,15 @@ use {
pubkey::Pubkey,
slot_hashes::SlotHashes,
sysvar,
},
solana_vote_program::vote_instruction::VoteInstruction,
std::{
}, solana_vote_program::vote_instruction::VoteInstruction, std::{
cmp,
collections::HashMap,
ops::DerefMut,
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, RwLock,
},
},
}
};

#[derive(PartialEq, Eq, Debug, Copy, Clone)]
Expand Down
9 changes: 2 additions & 7 deletions core/src/banking_stage/packet_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,10 @@ use {
super::{
immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket},
packet_filter::PacketFilterFailure,
},
crate::{
}, crate::{
banking_trace::{BankingPacketBatch, BankingPacketReceiver},
sigverify::SigverifyTracerPacketStats,
},
crossbeam_channel::RecvTimeoutError,
solana_perf::packet::PacketBatch,
solana_sdk::saturating_add_assign,
std::time::{Duration, Instant},
}, crossbeam_channel::RecvTimeoutError, solana_perf::packet::PacketBatch, solana_prio_graph_scheduler::deserializable_packet::DeserializableTxPacket, solana_sdk::saturating_add_assign, std::time::{Duration, Instant}
};

/// Results from deserializing packet batches.
Expand Down
1 change: 1 addition & 0 deletions core/src/banking_stage/packet_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use {
solana_builtins_default_costs::BUILTIN_INSTRUCTION_COSTS,
solana_sdk::{ed25519_program, saturating_add_assign, secp256k1_program},
thiserror::Error,
solana_prio_graph_scheduler::deserializable_packet::DeserializableTxPacket,
};

#[derive(Debug, Error, PartialEq)]
Expand Down
1 change: 1 addition & 0 deletions core/src/banking_stage/packet_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use {
crate::{banking_trace::BankingPacketReceiver, tracer_packet_stats::TracerPacketStats},
crossbeam_channel::RecvTimeoutError,
solana_measure::{measure::Measure, measure_us},
solana_prio_graph_scheduler::deserializable_packet::DeserializableTxPacket,
solana_sdk::{saturating_add_assign, timing::timestamp},
std::{sync::atomic::Ordering, time::Duration},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,6 @@
//!
use {
super::{
prio_graph_scheduler::PrioGraphScheduler,
scheduler_error::SchedulerError,
scheduler_metrics::{
SchedulerCountMetrics, SchedulerLeaderDetectionMetrics, SchedulerTimingMetrics,
},
transaction_id_generator::TransactionIdGenerator,
transaction_state::SanitizedTransactionTTL,
transaction_state_container::TransactionStateContainer,
},
crate::banking_stage::{
consume_worker::ConsumeWorkerMetrics,
consumer::Consumer,
Expand All @@ -26,6 +16,17 @@ use {
solana_accounts_db::account_locks::validate_account_locks,
solana_cost_model::cost_model::CostModel,
solana_measure::measure_us,
solana_prio_graph_scheduler::deserializable_packet::DeserializableTxPacket,
solana_prio_graph_scheduler::{
id_generator::IdGenerator,
prio_graph_scheduler::PrioGraphScheduler,
scheduler_error::SchedulerError,
scheduler_metrics::{
SchedulerCountMetrics, SchedulerLeaderDetectionMetrics, SchedulerTimingMetrics,
},
transaction_state::SanitizedTransactionTTL,
transaction_state_container::TransactionStateContainer,
},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_runtime_transaction::instructions_processor::process_compute_budget_instructions,
solana_sdk::{
Expand All @@ -51,12 +52,12 @@ pub(crate) struct SchedulerController<T: LikeClusterInfo> {
packet_receiver: PacketDeserializer,
bank_forks: Arc<RwLock<BankForks>>,
/// Generates unique IDs for incoming transactions.
transaction_id_generator: TransactionIdGenerator,
transaction_id_generator: IdGenerator,
/// Container for transaction state.
/// Shared resource between `packet_receiver` and `scheduler`.
container: TransactionStateContainer,
container: TransactionStateContainer<ImmutableDeserializedPacket>,
/// State for scheduling and communicating with worker threads.
scheduler: PrioGraphScheduler,
scheduler: PrioGraphScheduler<ImmutableDeserializedPacket>,
/// Metrics tracking time for leader bank detection.
leader_detection_metrics: SchedulerLeaderDetectionMetrics,
/// Metrics tracking counts on transactions in different states
Expand All @@ -76,16 +77,18 @@ impl<T: LikeClusterInfo> SchedulerController<T> {
decision_maker: DecisionMaker,
packet_deserializer: PacketDeserializer,
bank_forks: Arc<RwLock<BankForks>>,
scheduler: PrioGraphScheduler,
scheduler: PrioGraphScheduler<ImmutableDeserializedPacket>,
worker_metrics: Vec<Arc<ConsumeWorkerMetrics>>,
forwarder: Option<Forwarder<T>>,
) -> Self {
Self {
decision_maker,
packet_receiver: packet_deserializer,
bank_forks,
transaction_id_generator: TransactionIdGenerator::default(),
container: TransactionStateContainer::with_capacity(TOTAL_BUFFERED_PACKETS),
transaction_id_generator: IdGenerator::default(),
container: TransactionStateContainer::<ImmutableDeserializedPacket>::with_capacity(
TOTAL_BUFFERED_PACKETS,
),
scheduler,
leader_detection_metrics: SchedulerLeaderDetectionMetrics::default(),
count_metrics: SchedulerCountMetrics::default(),
Expand Down Expand Up @@ -661,9 +664,7 @@ mod tests {
super::*,
crate::{
banking_stage::{
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
tests::create_slow_genesis_config,
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, tests::create_slow_genesis_config,
},
banking_trace::BankingPacketBatch,
sigverify::SigverifyTracerPacketStats,
Expand All @@ -677,6 +678,9 @@ mod tests {
},
solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS},
solana_poh::poh_recorder::{PohRecorder, Record, WorkingBankEntry},
solana_prio_graph_scheduler::scheduler_messages::{
ConsumeWork, FinishedConsumeWork, TransactionBatchId,
},
solana_runtime::bank::Bank,
solana_sdk::{
compute_budget::ComputeBudgetInstruction, fee_calculator::FeeRateGovernor, hash::Hash,
Expand Down
Loading

0 comments on commit 4f6d25d

Please sign in to comment.