diff --git a/Cargo.lock b/Cargo.lock index 71142bd..f15e4da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5375,36 +5375,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "solana-prio-graph-scheduler" -version = "0.1.0" -dependencies = [ - "ahash 0.8.11", - "arrayvec", - "assert_matches", - "bincode", - "crossbeam-channel", - "itertools 0.13.0", - "log", - "min-max-heap", - "prio-graph", - "solana-compute-budget", - "solana-cost-model", - "solana-gossip", - "solana-ledger", - "solana-measure", - "solana-metrics", - "solana-perf", - "solana-poh", - "solana-prio-graph-scheduler", - "solana-runtime", - "solana-runtime-transaction", - "solana-sanitize", - "solana-sdk", - "solana-short-vec", - "thiserror", -] - [[package]] name = "solana-program" version = "2.0.13" diff --git a/Cargo.toml b/Cargo.toml index b3e2b9e..32db3e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,6 @@ members = [ "svm/cli", "svm/executor", "verifier", - "prio-graph-scheduler", "rpc", "scheduler", ] diff --git a/prio-graph-scheduler/Cargo.toml b/prio-graph-scheduler/Cargo.toml deleted file mode 100644 index 8015547..0000000 --- a/prio-graph-scheduler/Cargo.toml +++ /dev/null @@ -1,50 +0,0 @@ -[package] -name = "solana-prio-graph-scheduler" -description = "Solana Priority Graph Scheduler" -documentation = "https://docs.rs/solana-prio-graph-scheduler" -version.workspace = true -authors.workspace = true -repository.workspace = true -homepage.workspace = true -license.workspace = true -edition.workspace = true - -[dependencies] -solana-sdk = { workspace = true } -solana-poh = { workspace = true } -solana-metrics = { workspace = true } -solana-ledger = { workspace = true } -solana-runtime = { workspace = true } -solana-gossip = { workspace = true } -solana-cost-model = { workspace = true } -solana-measure = { workspace = true } - -ahash = { workspace = true } -prio-graph = { workspace = true } -thiserror = { workspace = true } -itertools = { workspace = true } -log = { workspace = true } -crossbeam-channel = { workspace = true } -arrayvec = { workspace = true } -min-max-heap = { workspace = true } - -[dev-dependencies] -assert_matches = { workspace = true } -solana-compute-budget = { workspace = true } -solana-perf = { workspace = true } -solana-runtime-transaction = { workspace = true } -solana-sanitize = { workspace = true } -solana-short-vec = { workspace = true } -# let dev-context-only-utils works when running this crate. -solana-prio-graph-scheduler = { path = ".", features = [ - "dev-context-only-utils", -] } -solana-sdk = { workspace = true, features = ["dev-context-only-utils"] } - -bincode = { workspace = true } - -[package.metadata.docs.rs] -targets = ["x86_64-unknown-linux-gnu"] - -[features] -dev-context-only-utils = ["solana-runtime/dev-context-only-utils"] diff --git a/prio-graph-scheduler/src/deserializable_packet.rs b/prio-graph-scheduler/src/deserializable_packet.rs deleted file mode 100644 index 71a04e3..0000000 --- a/prio-graph-scheduler/src/deserializable_packet.rs +++ /dev/null @@ -1,23 +0,0 @@ -use solana_sdk::hash::Hash; -use solana_sdk::packet::Packet; -use solana_sdk::transaction::SanitizedVersionedTransaction; -use std::error::Error; - -/// DeserializablePacket can be deserialized from a Packet. -/// -/// DeserializablePacket will be deserialized as a SanitizedTransaction -/// to be scheduled in transaction stream and scheduler. 
-pub trait DeserializableTxPacket: PartialEq + PartialOrd + Eq + Sized { - type DeserializeError: Error; - - fn new(packet: Packet) -> Result; - - fn original_packet(&self) -> &Packet; - - /// deserialized into versionedTx, and then to SanitizedTransaction. - fn transaction(&self) -> &SanitizedVersionedTransaction; - - fn message_hash(&self) -> &Hash; - - fn is_simple_vote(&self) -> bool; -} diff --git a/prio-graph-scheduler/src/id_generator.rs b/prio-graph-scheduler/src/id_generator.rs deleted file mode 100644 index 3090e4e..0000000 --- a/prio-graph-scheduler/src/id_generator.rs +++ /dev/null @@ -1,19 +0,0 @@ -/// Simple reverse-sequential ID generator for `TransactionId`s. -/// These IDs uniquely identify transactions during the scheduling process. -pub struct IdGenerator { - next_id: u64, -} - -impl Default for IdGenerator { - fn default() -> Self { - Self { next_id: u64::MAX } - } -} - -impl IdGenerator { - pub fn next>(&mut self) -> T { - let id = self.next_id; - self.next_id = self.next_id.wrapping_sub(1); - T::from(id) - } -} diff --git a/prio-graph-scheduler/src/lib.rs b/prio-graph-scheduler/src/lib.rs deleted file mode 100644 index 6c28df5..0000000 --- a/prio-graph-scheduler/src/lib.rs +++ /dev/null @@ -1,131 +0,0 @@ -//! Solana Priority Graph Scheduler. -pub mod id_generator; -pub mod in_flight_tracker; -pub mod scheduler_error; -pub mod scheduler_messages; -pub mod scheduler_metrics; -pub mod thread_aware_account_locks; -pub mod transaction_priority_id; -pub mod transaction_state; -// pub mod scheduler_controller; -pub mod deserializable_packet; -pub mod prio_graph_scheduler; -pub mod transaction_state_container; - -#[macro_use] -extern crate solana_metrics; - -#[cfg(test)] -#[macro_use] -extern crate assert_matches; - -/// Consumer will create chunks of transactions from buffer with up to this size. 
-pub const TARGET_NUM_TRANSACTIONS_PER_BATCH: usize = 64; - -mod read_write_account_set; - -#[cfg(test)] -mod tests { - use { - crate::deserializable_packet::DeserializableTxPacket, - solana_perf::packet::Packet, - solana_sdk::{ - hash::Hash, - message::Message, - sanitize::SanitizeError, - signature::Signature, - transaction::{SanitizedVersionedTransaction, VersionedTransaction}, - }, - solana_short_vec::decode_shortu16_len, - std::{cmp::Ordering, mem::size_of}, - thiserror::Error, - }; - - #[derive(Debug, Error)] - pub enum MockDeserializedPacketError { - #[error("ShortVec Failed to Deserialize")] - // short_vec::decode_shortu16_len() currently returns () on error - ShortVecError(()), - #[error("Deserialization Error: {0}")] - DeserializationError(#[from] bincode::Error), - #[error("overflowed on signature size {0}")] - SignatureOverflowed(usize), - #[error("packet failed sanitization {0}")] - SanitizeError(#[from] SanitizeError), - } - - #[derive(Debug, Eq)] - pub struct MockImmutableDeserializedPacket { - pub original_packet: Packet, - pub transaction: SanitizedVersionedTransaction, - pub message_hash: Hash, - pub is_simple_vote: bool, - } - - impl DeserializableTxPacket for MockImmutableDeserializedPacket { - type DeserializeError = MockDeserializedPacketError; - fn new(packet: Packet) -> Result { - let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?; - let sanitized_transaction = - SanitizedVersionedTransaction::try_from(versioned_transaction)?; - let message_bytes = packet_message(&packet)?; - let message_hash = Message::hash_raw_message(message_bytes); - let is_simple_vote = packet.meta().is_simple_vote_tx(); - - Ok(Self { - original_packet: packet, - transaction: sanitized_transaction, - message_hash, - is_simple_vote, - }) - } - - fn original_packet(&self) -> &Packet { - &self.original_packet - } - - fn transaction(&self) -> &SanitizedVersionedTransaction { - &self.transaction - } - - fn message_hash(&self) -> &Hash { - &self.message_hash - } - - fn is_simple_vote(&self) -> bool { - self.is_simple_vote - } - } - - // PartialEq MUST be consistent with PartialOrd and Ord - impl PartialEq for MockImmutableDeserializedPacket { - fn eq(&self, other: &Self) -> bool { - self.message_hash == other.message_hash - } - } - - impl PartialOrd for MockImmutableDeserializedPacket { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } - } - - impl Ord for MockImmutableDeserializedPacket { - fn cmp(&self, other: &Self) -> Ordering { - self.message_hash().cmp(other.message_hash()) - } - } - - /// Read the transaction message from packet data - fn packet_message(packet: &Packet) -> Result<&[u8], MockDeserializedPacketError> { - let (sig_len, sig_size) = packet - .data(..) - .and_then(|bytes| decode_shortu16_len(bytes).ok()) - .ok_or(MockDeserializedPacketError::ShortVecError(()))?; - sig_len - .checked_mul(size_of::()) - .and_then(|v| v.checked_add(sig_size)) - .and_then(|msg_start| packet.data(msg_start..)) - .ok_or(MockDeserializedPacketError::SignatureOverflowed(sig_size)) - } -} diff --git a/prio-graph-scheduler/src/scheduler_messages.rs b/prio-graph-scheduler/src/scheduler_messages.rs deleted file mode 100644 index b7ecf19..0000000 --- a/prio-graph-scheduler/src/scheduler_messages.rs +++ /dev/null @@ -1,70 +0,0 @@ -use { - solana_sdk::{clock::Slot, transaction::SanitizedTransaction}, - std::fmt::Display, -}; - -/// A unique identifier for a transaction batch. 
-#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] -pub struct TransactionBatchId(u64); - -impl TransactionBatchId { - pub fn new(index: u64) -> Self { - Self(index) - } -} - -impl Display for TransactionBatchId { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl From for TransactionBatchId { - fn from(id: u64) -> Self { - Self(id) - } -} - -/// A unique identifier for a transaction. -#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct TransactionId(u64); - -impl TransactionId { - pub fn new(index: u64) -> Self { - Self(index) - } -} - -impl Display for TransactionId { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl From for TransactionId { - fn from(id: u64) -> Self { - Self(id) - } -} - -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub struct MaxAge { - pub epoch_invalidation_slot: Slot, - pub alt_invalidation_slot: Slot, -} - -/// Message: [Scheduler -> Worker] -/// Transactions to be consumed (i.e. executed, recorded, and committed) -pub struct ConsumeWork { - pub batch_id: TransactionBatchId, - pub ids: Vec, - pub transactions: Vec, - pub max_ages: Vec, -} - -/// Message: [Worker -> Scheduler] -/// Processed transactions. -pub struct FinishedConsumeWork { - pub work: ConsumeWork, - pub retryable_indexes: Vec, -} diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index e80da9f..bf0051e 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -1,4 +1,5 @@ #![allow(dead_code)] +#![allow(clippy::result_large_err)] #[macro_use] extern crate log; diff --git a/scheduler/bin/scheduling_simulation.rs b/scheduler/bin/scheduling_simulation.rs index 4ba71ac..12cfacb 100644 --- a/scheduler/bin/scheduling_simulation.rs +++ b/scheduler/bin/scheduling_simulation.rs @@ -1,11 +1,9 @@ use crossbeam_channel::{unbounded, Receiver, Sender}; use igloo_executor::processor::TransactionProcessor; use igloo_scheduler::id_generator::IdGenerator; -use igloo_scheduler::impls::no_lock_scheduler::NoLockScheduler; +use igloo_scheduler::impls::prio_graph_scheduler::PrioGraphSchedulerWrapper; use igloo_scheduler::scheduler::Scheduler; -use igloo_scheduler::scheduler_messages::{ - SchedulingBatch, SchedulingBatchResult, -}; +use igloo_scheduler::scheduler_messages::{MaxAge, SchedulingBatch, SchedulingBatchResult}; use igloo_scheduler::status_slicing::{ calculate_thread_load_summary, SvmWorkerSlicingStatus, WorkerStatusUpdate, }; @@ -48,7 +46,7 @@ fn mocking_transfer_tx( )) } -const TOTAL_TX_NUM: usize = 1024 * 16; +const TOTAL_TX_NUM: usize = 1024 * 4; const TOTAL_WORKER_NUM: usize = 4; // each tx need 2 unique accounts. const NUM_ACCOUNTS: usize = TOTAL_TX_NUM * 2; @@ -121,7 +119,7 @@ fn worker_process( // it's ok to ignore send error. // because error is handled by the scheduler. // if scheduler exits, means all task is scheduled. - // no need to maintain lock now. + // no need to maintain channel now. 
let _ = completed_sender.send(result); // Update idle_start for next iteration @@ -170,8 +168,7 @@ fn main() -> Result<(), E> { let recent_hash = store.current_bank().last_blockhash(); let transfer_txs = accounts .chunks(2) - .enumerate() - .map(|(_, chunk)| { + .map(|chunk| { mocking_transfer_tx(&chunk[0].0, &chunk[1].0.pubkey(), 1e9 as u64, recent_hash) }) .collect::, _>>()?; @@ -203,26 +200,29 @@ fn main() -> Result<(), E> { let mut batch_id_gen = IdGenerator::default(); let mut tx_id_gen = IdGenerator::default(); - let mut scheduler = NoLockScheduler::new(senders.clone(), completed_receiver); + let mut scheduler = PrioGraphSchedulerWrapper::new(senders.clone(), completed_receiver); for chunk in transfer_txs .into_iter() .chunks(SCHEDULER_BATCH_SIZE) .into_iter() .map(|chunk| chunk.collect()) .map(|transactions: Vec<_>| { + let len = transactions.len(); let ids = transactions .iter() - .map(|_| tx_id_gen.next()) + .map(|_| tx_id_gen.gen()) .collect::>(); SchedulingBatch { - batch_id: batch_id_gen.next(), + batch_id: batch_id_gen.gen(), ids, transactions, + max_ages: vec![MaxAge::default(); len], } }) .collect::>() { - scheduler.schedule_batch(chunk); + scheduler.schedule_batch(chunk)?; + scheduler.receive_complete()?; } // Close senders to signal workers to finish diff --git a/scheduler/src/id_generator.rs b/scheduler/src/id_generator.rs index 3090e4e..b0429fa 100644 --- a/scheduler/src/id_generator.rs +++ b/scheduler/src/id_generator.rs @@ -11,7 +11,7 @@ impl Default for IdGenerator { } impl IdGenerator { - pub fn next>(&mut self) -> T { + pub fn gen>(&mut self) -> T { let id = self.next_id; self.next_id = self.next_id.wrapping_sub(1); T::from(id) diff --git a/scheduler/src/impls/no_lock_scheduler.rs b/scheduler/src/impls/no_lock_scheduler/mod.rs similarity index 80% rename from scheduler/src/impls/no_lock_scheduler.rs rename to scheduler/src/impls/no_lock_scheduler/mod.rs index 808221f..ff96ea3 100644 --- a/scheduler/src/impls/no_lock_scheduler.rs +++ b/scheduler/src/impls/no_lock_scheduler/mod.rs @@ -1,3 +1,4 @@ +use crate::impls::prio_graph_scheduler::scheduler_error::SchedulerError; use crate::scheduler::Scheduler; use crate::scheduler_messages::{SchedulingBatch, SchedulingBatchResult}; use crossbeam_channel::{Receiver, Sender}; @@ -19,7 +20,7 @@ impl Scheduler for NoLockScheduler { } } - fn schedule_batch(&mut self, txs: SchedulingBatch) { + fn schedule_batch(&mut self, txs: SchedulingBatch) -> Result<(), SchedulerError> { let exec_batch = 64; txs.transactions .chunks(exec_batch) @@ -31,8 +32,14 @@ impl Scheduler for NoLockScheduler { batch_id: txs.batch_id, ids: txs.ids[i * exec_batch..(i + 1) * exec_batch].to_vec(), transactions: chunk.to_vec(), + max_ages: vec![], }; self.task_senders[worker_id].send(batch).unwrap(); }); + Ok(()) + } + + fn receive_complete(&mut self) -> Result<(), SchedulerError> { + Ok(()) } } diff --git a/scheduler/src/impls/prio_graph_scheduler.rs b/scheduler/src/impls/prio_graph_scheduler.rs deleted file mode 100644 index 8b13789..0000000 --- a/scheduler/src/impls/prio_graph_scheduler.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/prio-graph-scheduler/src/in_flight_tracker.rs b/scheduler/src/impls/prio_graph_scheduler/in_flight_tracker.rs similarity index 94% rename from prio-graph-scheduler/src/in_flight_tracker.rs rename to scheduler/src/impls/prio_graph_scheduler/in_flight_tracker.rs index 4e650a3..329ba90 100644 --- a/prio-graph-scheduler/src/in_flight_tracker.rs +++ b/scheduler/src/impls/prio_graph_scheduler/in_flight_tracker.rs @@ -1,6 +1,7 @@ use { - 
crate::id_generator::IdGenerator, crate::scheduler_messages::TransactionBatchId, - crate::thread_aware_account_locks::ThreadId, std::collections::HashMap, + crate::id_generator::IdGenerator, + crate::impls::prio_graph_scheduler::thread_aware_account_locks::ThreadId, + crate::scheduler_messages::TransactionBatchId, std::collections::HashMap, }; /// Tracks the number of transactions that are in flight for each thread. @@ -46,7 +47,7 @@ impl InFlightTracker { total_cus: u64, thread_id: ThreadId, ) -> TransactionBatchId { - let batch_id = self.batch_id_generator.next(); + let batch_id = self.batch_id_generator.gen(); self.num_in_flight_per_thread[thread_id] += num_transactions; self.cus_in_flight_per_thread[thread_id] += total_cus; self.batches.insert( diff --git a/scheduler/src/impls/prio_graph_scheduler/mod.rs b/scheduler/src/impls/prio_graph_scheduler/mod.rs new file mode 100644 index 0000000..dbc9a54 --- /dev/null +++ b/scheduler/src/impls/prio_graph_scheduler/mod.rs @@ -0,0 +1,70 @@ +pub mod in_flight_tracker; +pub mod read_write_account_set; +pub mod scheduler; +pub mod scheduler_error; +pub mod scheduler_metrics; +pub mod thread_aware_account_locks; +pub mod transaction_priority_id; +pub mod transaction_state; +pub mod transaction_state_container; + +use crate::impls::prio_graph_scheduler::scheduler::PrioGraphScheduler; +use crate::impls::prio_graph_scheduler::scheduler_error::SchedulerError; +use crate::impls::prio_graph_scheduler::transaction_state::SanitizedTransactionTTL; +use crate::impls::prio_graph_scheduler::transaction_state_container::TransactionStateContainer; +use crate::scheduler::Scheduler; +use crate::scheduler_messages::{SchedulingBatch, SchedulingBatchResult}; +use crossbeam_channel::{Receiver, Sender}; + +pub const TARGET_NUM_TRANSACTIONS_PER_BATCH: usize = 128; + +pub struct PrioGraphSchedulerWrapper { + inner: PrioGraphScheduler, + container: TransactionStateContainer, +} + +impl Scheduler for PrioGraphSchedulerWrapper { + fn new( + schedule_task_senders: Vec>, + task_finished_receivers: Receiver, + ) -> Self { + Self { + inner: PrioGraphScheduler::new(schedule_task_senders, task_finished_receivers), + container: TransactionStateContainer::with_capacity(10240), + } + } + + fn schedule_batch(&mut self, mut txs: SchedulingBatch) -> Result<(), SchedulerError> { + for ((tx, tx_id), max_age) in txs + .transactions + .drain(..) 
+ .zip(txs.ids.drain(..)) + .zip(txs.max_ages.drain(..)) + { + self.container.insert_new_transaction( + tx_id, + SanitizedTransactionTTL { + transaction: tx, + max_age, + }, + // TODO migrate priority + 0, + 100, + ); + } + + self.inner.schedule( + &mut self.container, + // TODO: migrate pre-filter transactions + |_, result| result.fill(true), + |_| true, + )?; + Ok(()) + } + + fn receive_complete(&mut self) -> Result<(), SchedulerError> { + // TODO metrics logic + self.inner.receive_completed(&mut self.container)?; + Ok(()) + } +} diff --git a/prio-graph-scheduler/src/read_write_account_set.rs b/scheduler/src/impls/prio_graph_scheduler/read_write_account_set.rs similarity index 100% rename from prio-graph-scheduler/src/read_write_account_set.rs rename to scheduler/src/impls/prio_graph_scheduler/read_write_account_set.rs diff --git a/prio-graph-scheduler/src/prio_graph_scheduler.rs b/scheduler/src/impls/prio_graph_scheduler/scheduler.rs similarity index 92% rename from prio-graph-scheduler/src/prio_graph_scheduler.rs rename to scheduler/src/impls/prio_graph_scheduler/scheduler.rs index bca3252..1c2e9d8 100644 --- a/prio-graph-scheduler/src/prio_graph_scheduler.rs +++ b/scheduler/src/impls/prio_graph_scheduler/scheduler.rs @@ -1,17 +1,21 @@ +use crate::scheduler_messages::MaxAge; use { + super::{ + transaction_state::{SanitizedTransactionTTL, TransactionState}, + transaction_state_container::TransactionStateContainer, + }, crate::{ - deserializable_packet::DeserializableTxPacket, - in_flight_tracker::InFlightTracker, - read_write_account_set::ReadWriteAccountSet, - scheduler_error::SchedulerError, + impls::prio_graph_scheduler::in_flight_tracker::InFlightTracker, + impls::prio_graph_scheduler::read_write_account_set::ReadWriteAccountSet, + impls::prio_graph_scheduler::scheduler_error::SchedulerError, + impls::prio_graph_scheduler::thread_aware_account_locks::{ + ThreadAwareAccountLocks, ThreadId, ThreadSet, + }, + impls::prio_graph_scheduler::transaction_priority_id::TransactionPriorityId, + impls::prio_graph_scheduler::TARGET_NUM_TRANSACTIONS_PER_BATCH, scheduler_messages::{ - ConsumeWork, FinishedConsumeWork, MaxAge, TransactionBatchId, TransactionId, + SchedulingBatch, SchedulingBatchResult, TransactionBatchId, TransactionId, }, - thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet}, - transaction_priority_id::TransactionPriorityId, - transaction_state::{SanitizedTransactionTTL, TransactionState}, - transaction_state_container::TransactionStateContainer, - TARGET_NUM_TRANSACTIONS_PER_BATCH, }, crossbeam_channel::{Receiver, Sender, TryRecvError}, itertools::izip, @@ -21,19 +25,18 @@ use { solana_sdk::{pubkey::Pubkey, saturating_add_assign, transaction::SanitizedTransaction}, }; -pub struct PrioGraphScheduler { +pub struct PrioGraphScheduler { in_flight_tracker: InFlightTracker, account_locks: ThreadAwareAccountLocks, - consume_work_senders: Vec>, - finished_consume_work_receiver: Receiver, + consume_work_senders: Vec>, + finished_consume_work_receiver: Receiver, look_ahead_window_size: usize, - phantom: std::marker::PhantomData

<P>, } -impl<P: DeserializableTxPacket> PrioGraphScheduler<P> { +impl PrioGraphScheduler { pub fn new( - consume_work_senders: Vec<Sender<ConsumeWork>>, - finished_consume_work_receiver: Receiver<FinishedConsumeWork>, + consume_work_senders: Vec<Sender<SchedulingBatch>>, + finished_consume_work_receiver: Receiver<SchedulingBatchResult>, ) -> Self { let num_threads = consume_work_senders.len(); Self { @@ -42,7 +45,6 @@ impl<P: DeserializableTxPacket> PrioGraphScheduler<P> { consume_work_senders, finished_consume_work_receiver, look_ahead_window_size: 2048, - phantom: std::marker::PhantomData, } } @@ -64,7 +66,7 @@ impl<P: DeserializableTxPacket> PrioGraphScheduler<P>

{ /// not cause conflicts in the near future. pub fn schedule( &mut self, - container: &mut TransactionStateContainer

<P>, + container: &mut TransactionStateContainer, pre_graph_filter: impl Fn(&[&SanitizedTransaction], &mut [bool]), pre_lock_filter: impl Fn(&SanitizedTransaction) -> bool, ) -> Result<SchedulingSummary, SchedulerError> { @@ -100,7 +102,7 @@ impl<P: DeserializableTxPacket> PrioGraphScheduler<P>

{ let mut total_filter_time_us: u64 = 0; let mut window_budget = self.look_ahead_window_size; - let mut chunked_pops = |container: &mut TransactionStateContainer

<P>, + let mut chunked_pops = |container: &mut TransactionStateContainer, prio_graph: &mut PrioGraph<_, _, _, _>, window_budget: &mut usize| { while *window_budget > 0 { @@ -279,7 +281,7 @@ impl<P: DeserializableTxPacket> PrioGraphScheduler<P>

{ /// Returns (num_transactions, num_retryable_transactions) on success. pub fn receive_completed( &mut self, - container: &mut TransactionStateContainer

<P>, + container: &mut TransactionStateContainer, ) -> Result<(usize, usize), SchedulerError> { let mut total_num_transactions: usize = 0; let mut total_num_retryable: usize = 0; @@ -298,12 +300,12 @@ impl<P: DeserializableTxPacket> PrioGraphScheduler<P>

{ /// Returns `Ok((num_transactions, num_retryable))` if a batch was received, `Ok((0, 0))` if no batch was received. fn try_receive_completed( &mut self, - container: &mut TransactionStateContainer

<P>, + container: &mut TransactionStateContainer, ) -> Result<(usize, usize), SchedulerError> { match self.finished_consume_work_receiver.try_recv() { - Ok(FinishedConsumeWork { - work: - ConsumeWork { + Ok(SchedulingBatchResult { + batch: + SchedulingBatch { batch_id, ids, transactions, @@ -397,7 +399,7 @@ impl<P: DeserializableTxPacket> PrioGraphScheduler<P>

{ .track_batch(ids.len(), total_cus, thread_index); let num_scheduled = ids.len(); - let work = ConsumeWork { + let work = SchedulingBatch { batch_id, ids, transactions, @@ -533,8 +535,8 @@ enum TransactionSchedulingError { UnschedulableConflicts, } -fn try_schedule_transaction<P: DeserializableTxPacket>( - transaction_state: &mut TransactionState<P>

, +fn try_schedule_transaction( + transaction_state: &mut TransactionState, pre_lock_filter: impl Fn(&SanitizedTransaction) -> bool, blocking_locks: &mut ReadWriteAccountSet, account_locks: &mut ThreadAwareAccountLocks, @@ -590,18 +592,17 @@ fn try_schedule_transaction( #[cfg(test)] mod tests { + use assert_matches::assert_matches; use { super::*, - crate::tests::MockImmutableDeserializedPacket, - crate::TARGET_NUM_TRANSACTIONS_PER_BATCH, crossbeam_channel::{unbounded, Receiver}, itertools::Itertools, solana_sdk::{ clock::Slot, compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, - packet::Packet, pubkey::Pubkey, signature::Keypair, signer::Signer, system_instruction, + pubkey::Pubkey, signature::Keypair, signer::Signer, system_instruction, transaction::Transaction, }, - std::{borrow::Borrow, sync::Arc}, + std::borrow::Borrow, }; macro_rules! txid { @@ -619,17 +620,15 @@ mod tests { fn create_test_frame( num_threads: usize, ) -> ( - PrioGraphScheduler, - Vec>, - Sender, + PrioGraphScheduler, + Vec>, + Sender, ) { let (consume_work_senders, consume_work_receivers) = (0..num_threads).map(|_| unbounded()).unzip(); let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded(); - let scheduler = PrioGraphScheduler::::new( - consume_work_senders, - finished_consume_work_receiver, - ); + let scheduler = + PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver); ( scheduler, consume_work_receivers, @@ -666,9 +665,8 @@ mod tests { u64, ), >, - ) -> TransactionStateContainer { - let mut container = - TransactionStateContainer::::with_capacity(10 * 1024); + ) -> TransactionStateContainer { + let mut container = TransactionStateContainer::with_capacity(10 * 1024); for (index, (from_keypair, to_pubkeys, lamports, compute_unit_price)) in tx_infos.into_iter().enumerate() { @@ -679,12 +677,7 @@ mod tests { lamports, compute_unit_price, ); - let packet = Arc::new( - MockImmutableDeserializedPacket::new( - Packet::from_data(None, transaction.to_versioned_transaction()).unwrap(), - ) - .unwrap(), - ); + let transaction_ttl = SanitizedTransactionTTL { transaction, max_age: MaxAge { @@ -696,7 +689,6 @@ mod tests { container.insert_new_transaction( id, transaction_ttl, - packet, compute_unit_price, TEST_TRANSACTION_COST, ); @@ -706,8 +698,8 @@ mod tests { } fn collect_work( - receiver: &Receiver, - ) -> (Vec, Vec>) { + receiver: &Receiver, + ) -> (Vec, Vec>) { receiver .try_iter() .map(|work| { @@ -866,8 +858,8 @@ mod tests { // Complete batch on thread 0. 
Remaining txs can be scheduled onto thread 1 finished_work_sender - .send(FinishedConsumeWork { - work: thread_0_work.into_iter().next().unwrap(), + .send(SchedulingBatchResult { + batch: thread_0_work.into_iter().next().unwrap(), retryable_indexes: vec![], }) .unwrap(); diff --git a/prio-graph-scheduler/src/scheduler_error.rs b/scheduler/src/impls/prio_graph_scheduler/scheduler_error.rs similarity index 100% rename from prio-graph-scheduler/src/scheduler_error.rs rename to scheduler/src/impls/prio_graph_scheduler/scheduler_error.rs diff --git a/prio-graph-scheduler/src/scheduler_metrics.rs b/scheduler/src/impls/prio_graph_scheduler/scheduler_metrics.rs similarity index 99% rename from prio-graph-scheduler/src/scheduler_metrics.rs rename to scheduler/src/impls/prio_graph_scheduler/scheduler_metrics.rs index bb8cbbe..2c0e39d 100644 --- a/prio-graph-scheduler/src/scheduler_metrics.rs +++ b/scheduler/src/impls/prio_graph_scheduler/scheduler_metrics.rs @@ -1,3 +1,4 @@ +use solana_metrics::{create_datapoint, datapoint_info}; use { itertools::MinMaxResult, solana_poh::poh_recorder::BankStart, diff --git a/prio-graph-scheduler/src/thread_aware_account_locks.rs b/scheduler/src/impls/prio_graph_scheduler/thread_aware_account_locks.rs similarity index 100% rename from prio-graph-scheduler/src/thread_aware_account_locks.rs rename to scheduler/src/impls/prio_graph_scheduler/thread_aware_account_locks.rs diff --git a/prio-graph-scheduler/src/transaction_priority_id.rs b/scheduler/src/impls/prio_graph_scheduler/transaction_priority_id.rs similarity index 100% rename from prio-graph-scheduler/src/transaction_priority_id.rs rename to scheduler/src/impls/prio_graph_scheduler/transaction_priority_id.rs diff --git a/prio-graph-scheduler/src/transaction_state.rs b/scheduler/src/impls/prio_graph_scheduler/transaction_state.rs similarity index 78% rename from prio-graph-scheduler/src/transaction_state.rs rename to scheduler/src/impls/prio_graph_scheduler/transaction_state.rs index a4985c3..76b4328 100644 --- a/prio-graph-scheduler/src/transaction_state.rs +++ b/scheduler/src/impls/prio_graph_scheduler/transaction_state.rs @@ -1,9 +1,8 @@ -use { - crate::deserializable_packet::DeserializableTxPacket, crate::scheduler_messages::MaxAge, - solana_sdk::transaction::SanitizedTransaction, std::sync::Arc, -}; +use crate::scheduler_messages::MaxAge; +use solana_sdk::transaction::SanitizedTransaction; /// Simple wrapper type to tie a sanitized transaction to max age slot. +#[derive(Clone, Debug)] pub struct SanitizedTransactionTTL { pub transaction: SanitizedTransaction, pub max_age: MaxAge, @@ -29,42 +28,30 @@ pub struct SanitizedTransactionTTL { /// to the appropriate thread for processing. This is done to avoid cloning the /// `SanitizedTransaction`. #[allow(clippy::large_enum_variant)] -pub enum TransactionState { +pub enum TransactionState { /// The transaction is available for scheduling. Unprocessed { transaction_ttl: SanitizedTransactionTTL, - packet: Arc

<P>, priority: u64, cost: u64, - should_forward: bool, }, /// The transaction is currently scheduled or being processed. Pending { - packet: Arc<P>, + transaction_ttl: SanitizedTransactionTTL, priority: u64, cost: u64, - should_forward: bool, }, /// Only used during transition. Transitioning, } -impl<P: DeserializableTxPacket> TransactionState<P>

{ +impl TransactionState { /// Creates a new `TransactionState` in the `Unprocessed` state. - pub fn new( - transaction_ttl: SanitizedTransactionTTL, - packet: Arc<P>, - priority: u64, - cost: u64, - ) -> Self { - let should_forward = !packet.original_packet().meta().forwarded() - && packet.original_packet().meta().is_from_staked_node(); + pub fn new(transaction_ttl: SanitizedTransactionTTL, priority: u64, cost: u64) -> Self { Self::Unprocessed { transaction_ttl, - packet, priority, cost, - should_forward, } } @@ -88,40 +75,6 @@ impl<P: DeserializableTxPacket> TransactionState<P>

{ } } - /// Return whether packet should be attempted to be forwarded. - pub fn should_forward(&self) -> bool { - match self { - Self::Unprocessed { - should_forward: forwarded, - .. - } => *forwarded, - Self::Pending { - should_forward: forwarded, - .. - } => *forwarded, - Self::Transitioning => unreachable!(), - } - } - - /// Mark the packet as forwarded. - /// This is used to prevent the packet from being forwarded multiple times. - pub fn mark_forwarded(&mut self) { - match self { - Self::Unprocessed { should_forward, .. } => *should_forward = false, - Self::Pending { should_forward, .. } => *should_forward = false, - Self::Transitioning => unreachable!(), - } - } - - /// Return the packet of the transaction. - pub fn packet(&self) -> &Arc

{ - match self { - Self::Unprocessed { packet, .. } => packet, - Self::Pending { packet, .. } => packet, - Self::Transitioning => unreachable!(), - } - } - /// Intended to be called when a transaction is scheduled. This method will /// transition the transaction from `Unprocessed` to `Pending` and return the /// `SanitizedTransactionTTL` for processing. @@ -133,18 +86,17 @@ impl TransactionState

{ match self.take() { TransactionState::Unprocessed { transaction_ttl, - packet, priority, cost, - should_forward: forwarded, } => { + let transaction_ttl_c = transaction_ttl.clone(); *self = TransactionState::Pending { - packet, + transaction_ttl, priority, cost, - should_forward: forwarded, }; - transaction_ttl + // TODO cancel this clone. + transaction_ttl_c } TransactionState::Pending { .. } => { panic!("transaction already pending"); @@ -162,18 +114,11 @@ impl TransactionState

{ pub fn transition_to_unprocessed(&mut self, transaction_ttl: SanitizedTransactionTTL) { match self.take() { TransactionState::Unprocessed { .. } => panic!("already unprocessed"), - TransactionState::Pending { - packet, - priority, - cost, - should_forward: forwarded, - } => { + TransactionState::Pending { priority, cost, .. } => { *self = Self::Unprocessed { transaction_ttl, - packet, priority, cost, - should_forward: forwarded, } } Self::Transitioning => unreachable!(), @@ -205,17 +150,13 @@ impl TransactionState

{ mod tests { use { super::*, - crate::tests::MockImmutableDeserializedPacket, solana_sdk::{ clock::Slot, compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, - packet::Packet, signature::Keypair, signer::Signer, system_instruction, - transaction::Transaction, + signature::Keypair, signer::Signer, system_instruction, transaction::Transaction, }, }; - fn create_transaction_state( - compute_unit_price: u64, - ) -> TransactionState { + fn create_transaction_state(compute_unit_price: u64) -> TransactionState { let from_keypair = Keypair::new(); let ixs = vec![ system_instruction::transfer( @@ -228,10 +169,6 @@ mod tests { let message = Message::new(&ixs, Some(&from_keypair.pubkey())); let tx = Transaction::new(&[&from_keypair], message, Hash::default()); - let packet = Arc::new( - MockImmutableDeserializedPacket::new(Packet::from_data(None, tx.clone()).unwrap()) - .unwrap(), - ); let transaction_ttl = SanitizedTransactionTTL { transaction: SanitizedTransaction::from_transaction_for_tests(tx), max_age: MaxAge { @@ -240,20 +177,18 @@ mod tests { }, }; const TEST_TRANSACTION_COST: u64 = 5000; - TransactionState::new( - transaction_ttl, - packet, - compute_unit_price, - TEST_TRANSACTION_COST, - ) + TransactionState::new(transaction_ttl, compute_unit_price, TEST_TRANSACTION_COST) } #[test] #[should_panic(expected = "already pending")] fn test_transition_to_pending_panic() { let mut transaction_state = create_transaction_state(0); + println!("111"); transaction_state.transition_to_pending(); + println!("222"); transaction_state.transition_to_pending(); // invalid transition + println!("333"); } #[test] diff --git a/prio-graph-scheduler/src/transaction_state_container.rs b/scheduler/src/impls/prio_graph_scheduler/transaction_state_container.rs similarity index 87% rename from prio-graph-scheduler/src/transaction_state_container.rs rename to scheduler/src/impls/prio_graph_scheduler/transaction_state_container.rs index 2f11629..9bbaa1c 100644 --- a/prio-graph-scheduler/src/transaction_state_container.rs +++ b/scheduler/src/impls/prio_graph_scheduler/transaction_state_container.rs @@ -3,10 +3,10 @@ use { transaction_priority_id::TransactionPriorityId, transaction_state::{SanitizedTransactionTTL, TransactionState}, }, - crate::{deserializable_packet::DeserializableTxPacket, scheduler_messages::TransactionId}, + crate::scheduler_messages::TransactionId, itertools::MinMaxResult, min_max_heap::MinMaxHeap, - std::{collections::HashMap, sync::Arc}, + std::collections::HashMap, }; /// This structure will hold `TransactionState` for the entirety of a @@ -34,12 +34,12 @@ use { /// /// The container maintains a fixed capacity. If the queue is full when pushing /// a new transaction, the lowest priority transaction will be dropped. -pub struct TransactionStateContainer { +pub struct TransactionStateContainer { priority_queue: MinMaxHeap, - id_to_transaction_state: HashMap>, + id_to_transaction_state: HashMap, } -impl TransactionStateContainer

<P> { +impl TransactionStateContainer { pub fn with_capacity(capacity: usize) -> Self { Self { priority_queue: MinMaxHeap::with_capacity(capacity), @@ -47,6 +47,10 @@ impl<P: DeserializableTxPacket> TransactionStateContainer<P> { } } + pub fn len(&self) -> usize { + self.priority_queue.len() + } + /// Returns true if the queue is empty. pub fn is_empty(&self) -> bool { self.priority_queue.is_empty() @@ -66,7 +70,7 @@ impl<P: DeserializableTxPacket> TransactionStateContainer<P> { pub fn get_mut_transaction_state( &mut self, id: &TransactionId, - ) -> Option<&mut TransactionState<P>> { + ) -> Option<&mut TransactionState> { self.id_to_transaction_state.get_mut(id) } @@ -84,14 +88,13 @@ impl<P: DeserializableTxPacket> TransactionStateContainer<P> { &mut self, transaction_id: TransactionId, transaction_ttl: SanitizedTransactionTTL, - packet: Arc<P>

, priority: u64, cost: u64, ) -> bool { let priority_id = TransactionPriorityId::new(priority, transaction_id); self.id_to_transaction_state.insert( transaction_id, - TransactionState::new(transaction_ttl, packet, priority, cost), + TransactionState::new(transaction_ttl, priority, cost), ); self.push_id_into_queue(priority_id) } @@ -148,12 +151,10 @@ mod tests { use { super::*, crate::scheduler_messages::MaxAge, - crate::tests::MockImmutableDeserializedPacket, solana_sdk::{ compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, - packet::Packet, signature::Keypair, signer::Signer, slot_history::Slot, @@ -163,14 +164,7 @@ mod tests { }; /// Returns (transaction_ttl, priority, cost) - fn test_transaction( - priority: u64, - ) -> ( - SanitizedTransactionTTL, - Arc, - u64, - u64, - ) { + fn test_transaction(priority: u64) -> (SanitizedTransactionTTL, u64, u64) { let from_keypair = Keypair::new(); let ixs = vec![ system_instruction::transfer( @@ -186,12 +180,6 @@ mod tests { message, Hash::default(), )); - let packet = Arc::new( - MockImmutableDeserializedPacket::new( - Packet::from_data(None, tx.to_versioned_transaction()).unwrap(), - ) - .unwrap(), - ); let transaction_ttl = SanitizedTransactionTTL { transaction: tx, max_age: MaxAge { @@ -200,20 +188,16 @@ mod tests { }, }; const TEST_TRANSACTION_COST: u64 = 5000; - (transaction_ttl, packet, priority, TEST_TRANSACTION_COST) + (transaction_ttl, priority, TEST_TRANSACTION_COST) } - fn push_to_container( - container: &mut TransactionStateContainer, - num: usize, - ) { + fn push_to_container(container: &mut TransactionStateContainer, num: usize) { for id in 0..num as u64 { let priority = id; - let (transaction_ttl, packet, priority, cost) = test_transaction(priority); + let (transaction_ttl, priority, cost) = test_transaction(priority); container.insert_new_transaction( TransactionId::new(id), transaction_ttl, - packet, priority, cost, ); diff --git a/scheduler/src/lib.rs b/scheduler/src/lib.rs index 0739c71..b2faedf 100644 --- a/scheduler/src/lib.rs +++ b/scheduler/src/lib.rs @@ -1,6 +1,6 @@ +pub mod id_generator; pub mod impls; pub mod scheduler; pub mod scheduler_messages; pub mod status_slicing; pub mod stopwatch; -pub mod id_generator; diff --git a/scheduler/src/scheduler.rs b/scheduler/src/scheduler.rs index b375d1a..e5bcf95 100644 --- a/scheduler/src/scheduler.rs +++ b/scheduler/src/scheduler.rs @@ -1,3 +1,4 @@ +use crate::impls::prio_graph_scheduler::scheduler_error::SchedulerError; use crate::scheduler_messages::{SchedulingBatch, SchedulingBatchResult}; use crossbeam_channel::{Receiver, Sender}; @@ -11,11 +12,16 @@ use crossbeam_channel::{Receiver, Sender}; /// Scheduler -- Task channelK -> [workerK] -> Task finish callback -> Scheduler /// | ... | /// -> Task channelN -> [workerN] -> Task finish callback -> +/// +/// so there should be a scheduler thread, accepting upstreaming transaction flow from rpc, +/// going with a loop and calling scheduler repeatedly with `schedule_batch` and `receive_complete`. 
pub trait Scheduler { fn new( schedule_task_senders: Vec<Sender<SchedulingBatch>>, task_finished_receivers: Receiver<SchedulingBatchResult>, ) -> Self; - fn schedule_batch(&mut self, txs: SchedulingBatch); + fn schedule_batch(&mut self, txs: SchedulingBatch) -> Result<(), SchedulerError>; + + fn receive_complete(&mut self) -> Result<(), SchedulerError>; } diff --git a/scheduler/src/scheduler_messages.rs b/scheduler/src/scheduler_messages.rs index c7870ed..2bcace6 100644 --- a/scheduler/src/scheduler_messages.rs +++ b/scheduler/src/scheduler_messages.rs @@ -1,3 +1,4 @@ +use solana_program::clock::Slot; use {solana_sdk::transaction::SanitizedTransaction, std::fmt::Display}; /// A unique identifier for a transaction batch. @@ -57,6 +58,13 @@ pub struct SchedulingBatch { pub batch_id: TransactionBatchId, pub ids: Vec<TransactionId>, pub transactions: Vec<SanitizedTransaction>, + pub max_ages: Vec<MaxAge>, } + +impl SchedulingBatch { + pub fn valid(&self) -> bool { + self.transactions.len() == self.ids.len() && self.ids.len() == self.max_ages.len() + } +} /// The scheduling result from worker one time. @@ -69,3 +77,10 @@ pub struct SchedulingBatchResult { // time slice status for this batch job. pub retryable_indexes: Vec<usize>, } + +/// A TTL flag for a transaction. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)] +pub struct MaxAge { + pub epoch_invalidation_slot: Slot, + pub alt_invalidation_slot: Slot, +}
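The reworked `Scheduler` trait above is meant to be driven in a loop: build a `SchedulingBatch` whose ids, transactions, and max_ages have matching lengths (see `SchedulingBatch::valid`), hand it to `schedule_batch`, then call `receive_complete` so the prio-graph scheduler can drain finished work and release its account locks. Below is a minimal sketch mirroring the scheduling_simulation.rs changes in this patch; the `drive` helper, the worker-less channel setup, the chunk size, and the exact module paths are illustrative assumptions, not part of the patch.

use crossbeam_channel::unbounded;
use igloo_scheduler::id_generator::IdGenerator;
use igloo_scheduler::impls::prio_graph_scheduler::scheduler_error::SchedulerError;
use igloo_scheduler::impls::prio_graph_scheduler::PrioGraphSchedulerWrapper;
use igloo_scheduler::scheduler::Scheduler;
use igloo_scheduler::scheduler_messages::{MaxAge, SchedulingBatch, SchedulingBatchResult};
use solana_sdk::transaction::SanitizedTransaction;

fn drive(transactions: Vec<SanitizedTransaction>) -> Result<(), SchedulerError> {
    // One task channel per worker plus a shared completion channel; real workers would
    // consume `_task_receivers` and send a SchedulingBatchResult back on `_done_sender`.
    let (task_senders, _task_receivers): (Vec<_>, Vec<_>) = (0..4).map(|_| unbounded()).unzip();
    let (_done_sender, done_receiver) = unbounded::<SchedulingBatchResult>();

    let mut batch_ids = IdGenerator::default();
    let mut tx_ids = IdGenerator::default();
    let mut scheduler = PrioGraphSchedulerWrapper::new(task_senders, done_receiver);

    for chunk in transactions.chunks(128) {
        let batch = SchedulingBatch {
            batch_id: batch_ids.gen(),
            ids: chunk.iter().map(|_| tx_ids.gen()).collect(),
            transactions: chunk.to_vec(),
            // One MaxAge entry per transaction, as required by SchedulingBatch::valid().
            max_ages: vec![MaxAge::default(); chunk.len()],
        };
        scheduler.schedule_batch(batch)?; // insert into the container and schedule
        scheduler.receive_complete()?;    // drain finished batches, freeing account locks
    }
    Ok(())
}

Calling receive_complete after every schedule_batch keeps the in-flight tracker and thread-aware account locks in sync, which is what allows later batches that touch the same accounts to be scheduled instead of piling up as unschedulable conflicts.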