From c30de4b06a0703fa73cba11d7005d54a5d8cef13 Mon Sep 17 00:00:00 2001 From: lewis Date: Mon, 21 Oct 2024 17:27:40 +0800 Subject: [PATCH 1/3] feat: refactor a unified scheduler crate --- Cargo.lock | 4 + Cargo.toml | 2 + .../src/deserializable_packet.rs | 3 +- prio-graph-scheduler/src/in_flight_tracker.rs | 214 ++- prio-graph-scheduler/src/lib.rs | 4 +- .../src/prio_graph_scheduler.rs | 4 +- .../src/read_write_account_set.rs | 494 +++--- prio-graph-scheduler/src/scheduler_metrics.rs | 680 ++++----- .../src/thread_aware_account_locks.rs | 1360 ++++++++--------- .../src/transaction_priority_id.rs | 92 +- .../src/transaction_state_container.rs | 6 +- scheduler/Cargo.toml | 49 + scheduler/src/lib.rs | 14 + 13 files changed, 1497 insertions(+), 1429 deletions(-) create mode 100644 scheduler/Cargo.toml create mode 100644 scheduler/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index d7dc9c1..8688e0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2383,6 +2383,10 @@ dependencies = [ "tokio-util 0.7.12", ] +[[package]] +name = "igloo-scheduler" +version = "0.1.0" + [[package]] name = "igloo-storage" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 2db0608..062027b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "verifier", "prio-graph-scheduler", "rpc", + "scheduler", ] resolver = "2" @@ -115,6 +116,7 @@ igloo-verifier = { path = "verifier" } igloo-rpc = { path = "rpc" } svm-executor = { path = "svm/executor" } solana-prio-graph-scheduler = { path = "prio-graph-scheduler" } +igloo-schduler = { path = "scheduler" } [patch.crates-io] #crossbeam-epoch = { git = "https://github.com/anza-xyz/crossbeam", rev = "fd279d707025f0e60951e429bf778b4813d1b6bf" } // Patch `crossbeam-epoch v0.9.5 was not used in the crate graph diff --git a/prio-graph-scheduler/src/deserializable_packet.rs b/prio-graph-scheduler/src/deserializable_packet.rs index 0db7828..71a04e3 100644 --- a/prio-graph-scheduler/src/deserializable_packet.rs +++ b/prio-graph-scheduler/src/deserializable_packet.rs @@ -1,7 +1,6 @@ - use solana_sdk::hash::Hash; use solana_sdk::packet::Packet; -use solana_sdk::transaction::{SanitizedVersionedTransaction}; +use solana_sdk::transaction::SanitizedVersionedTransaction; use std::error::Error; /// DeserializablePacket can be deserialized from a Packet. diff --git a/prio-graph-scheduler/src/in_flight_tracker.rs b/prio-graph-scheduler/src/in_flight_tracker.rs index f23a746..4e650a3 100644 --- a/prio-graph-scheduler/src/in_flight_tracker.rs +++ b/prio-graph-scheduler/src/in_flight_tracker.rs @@ -1,124 +1,122 @@ use { - crate::id_generator::IdGenerator, - crate::thread_aware_account_locks::ThreadId, - crate::scheduler_messages::TransactionBatchId, - std::collections::HashMap, + crate::id_generator::IdGenerator, crate::scheduler_messages::TransactionBatchId, + crate::thread_aware_account_locks::ThreadId, std::collections::HashMap, }; /// Tracks the number of transactions that are in flight for each thread. pub struct InFlightTracker { - num_in_flight_per_thread: Vec, - cus_in_flight_per_thread: Vec, - batches: HashMap, - batch_id_generator: IdGenerator, + num_in_flight_per_thread: Vec, + cus_in_flight_per_thread: Vec, + batches: HashMap, + batch_id_generator: IdGenerator, } struct BatchEntry { - thread_id: ThreadId, - num_transactions: usize, - total_cus: u64, + thread_id: ThreadId, + num_transactions: usize, + total_cus: u64, } impl InFlightTracker { - pub fn new(num_threads: usize) -> Self { - Self { - num_in_flight_per_thread: vec![0; num_threads], - cus_in_flight_per_thread: vec![0; num_threads], - batches: HashMap::new(), - batch_id_generator: IdGenerator::default(), - } - } - - /// Returns the number of transactions that are in flight for each thread. - pub fn num_in_flight_per_thread(&self) -> &[usize] { - &self.num_in_flight_per_thread - } - - /// Returns the number of cus that are in flight for each thread. - pub fn cus_in_flight_per_thread(&self) -> &[u64] { - &self.cus_in_flight_per_thread - } - - /// Tracks number of transactions and CUs in-flight for the `thread_id`. - /// Returns a `TransactionBatchId` that can be used to stop tracking the batch - /// when it is complete. - pub fn track_batch( - &mut self, - num_transactions: usize, - total_cus: u64, - thread_id: ThreadId, - ) -> TransactionBatchId { - let batch_id = self.batch_id_generator.next(); - self.num_in_flight_per_thread[thread_id] += num_transactions; - self.cus_in_flight_per_thread[thread_id] += total_cus; - self.batches.insert( - batch_id, - BatchEntry { - thread_id, - num_transactions, - total_cus, - }, - ); - - batch_id - } - - /// Stop tracking the batch with given `batch_id`. - /// Removes the number of transactions for the scheduled thread. - /// Returns the thread id that the batch was scheduled on. - /// - /// # Panics - /// Panics if the batch id does not exist in the tracker. - pub fn complete_batch(&mut self, batch_id: TransactionBatchId) -> ThreadId { - let Some(BatchEntry { - thread_id, - num_transactions, - total_cus, - }) = self.batches.remove(&batch_id) - else { - panic!("batch id {batch_id} is not being tracked"); - }; - self.num_in_flight_per_thread[thread_id] -= num_transactions; - self.cus_in_flight_per_thread[thread_id] -= total_cus; - - thread_id - } + pub fn new(num_threads: usize) -> Self { + Self { + num_in_flight_per_thread: vec![0; num_threads], + cus_in_flight_per_thread: vec![0; num_threads], + batches: HashMap::new(), + batch_id_generator: IdGenerator::default(), + } + } + + /// Returns the number of transactions that are in flight for each thread. + pub fn num_in_flight_per_thread(&self) -> &[usize] { + &self.num_in_flight_per_thread + } + + /// Returns the number of cus that are in flight for each thread. + pub fn cus_in_flight_per_thread(&self) -> &[u64] { + &self.cus_in_flight_per_thread + } + + /// Tracks number of transactions and CUs in-flight for the `thread_id`. + /// Returns a `TransactionBatchId` that can be used to stop tracking the batch + /// when it is complete. + pub fn track_batch( + &mut self, + num_transactions: usize, + total_cus: u64, + thread_id: ThreadId, + ) -> TransactionBatchId { + let batch_id = self.batch_id_generator.next(); + self.num_in_flight_per_thread[thread_id] += num_transactions; + self.cus_in_flight_per_thread[thread_id] += total_cus; + self.batches.insert( + batch_id, + BatchEntry { + thread_id, + num_transactions, + total_cus, + }, + ); + + batch_id + } + + /// Stop tracking the batch with given `batch_id`. + /// Removes the number of transactions for the scheduled thread. + /// Returns the thread id that the batch was scheduled on. + /// + /// # Panics + /// Panics if the batch id does not exist in the tracker. + pub fn complete_batch(&mut self, batch_id: TransactionBatchId) -> ThreadId { + let Some(BatchEntry { + thread_id, + num_transactions, + total_cus, + }) = self.batches.remove(&batch_id) + else { + panic!("batch id {batch_id} is not being tracked"); + }; + self.num_in_flight_per_thread[thread_id] -= num_transactions; + self.cus_in_flight_per_thread[thread_id] -= total_cus; + + thread_id + } } #[cfg(test)] mod tests { - use super::*; - - #[test] - #[should_panic(expected = "is not being tracked")] - fn test_in_flight_tracker_untracked_batch() { - let mut in_flight_tracker = InFlightTracker::new(2); - in_flight_tracker.complete_batch(TransactionBatchId::new(5)); - } - - #[test] - fn test_in_flight_tracker() { - let mut in_flight_tracker = InFlightTracker::new(2); - - // Add a batch with 2 transactions, 10 kCUs to thread 0. - let batch_id_0 = in_flight_tracker.track_batch(2, 10_000, 0); - assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[2, 0]); - assert_eq!(in_flight_tracker.cus_in_flight_per_thread(), &[10_000, 0]); - - // Add a batch with 1 transaction, 15 kCUs to thread 1. - let batch_id_1 = in_flight_tracker.track_batch(1, 15_000, 1); - assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[2, 1]); - assert_eq!( - in_flight_tracker.cus_in_flight_per_thread(), - &[10_000, 15_000] - ); - - in_flight_tracker.complete_batch(batch_id_0); - assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[0, 1]); - assert_eq!(in_flight_tracker.cus_in_flight_per_thread(), &[0, 15_000]); - - in_flight_tracker.complete_batch(batch_id_1); - assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[0, 0]); - assert_eq!(in_flight_tracker.cus_in_flight_per_thread(), &[0, 0]); - } + use super::*; + + #[test] + #[should_panic(expected = "is not being tracked")] + fn test_in_flight_tracker_untracked_batch() { + let mut in_flight_tracker = InFlightTracker::new(2); + in_flight_tracker.complete_batch(TransactionBatchId::new(5)); + } + + #[test] + fn test_in_flight_tracker() { + let mut in_flight_tracker = InFlightTracker::new(2); + + // Add a batch with 2 transactions, 10 kCUs to thread 0. + let batch_id_0 = in_flight_tracker.track_batch(2, 10_000, 0); + assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[2, 0]); + assert_eq!(in_flight_tracker.cus_in_flight_per_thread(), &[10_000, 0]); + + // Add a batch with 1 transaction, 15 kCUs to thread 1. + let batch_id_1 = in_flight_tracker.track_batch(1, 15_000, 1); + assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[2, 1]); + assert_eq!( + in_flight_tracker.cus_in_flight_per_thread(), + &[10_000, 15_000] + ); + + in_flight_tracker.complete_batch(batch_id_0); + assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[0, 1]); + assert_eq!(in_flight_tracker.cus_in_flight_per_thread(), &[0, 15_000]); + + in_flight_tracker.complete_batch(batch_id_1); + assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[0, 0]); + assert_eq!(in_flight_tracker.cus_in_flight_per_thread(), &[0, 0]); + } } diff --git a/prio-graph-scheduler/src/lib.rs b/prio-graph-scheduler/src/lib.rs index 2b91c3c..6c28df5 100644 --- a/prio-graph-scheduler/src/lib.rs +++ b/prio-graph-scheduler/src/lib.rs @@ -34,9 +34,7 @@ mod tests { message::Message, sanitize::SanitizeError, signature::Signature, - transaction::{ - SanitizedVersionedTransaction, VersionedTransaction, - }, + transaction::{SanitizedVersionedTransaction, VersionedTransaction}, }, solana_short_vec::decode_shortu16_len, std::{cmp::Ordering, mem::size_of}, diff --git a/prio-graph-scheduler/src/prio_graph_scheduler.rs b/prio-graph-scheduler/src/prio_graph_scheduler.rs index 4e2699f..bca3252 100644 --- a/prio-graph-scheduler/src/prio_graph_scheduler.rs +++ b/prio-graph-scheduler/src/prio_graph_scheduler.rs @@ -4,7 +4,9 @@ use { in_flight_tracker::InFlightTracker, read_write_account_set::ReadWriteAccountSet, scheduler_error::SchedulerError, - scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId, TransactionId, MaxAge}, + scheduler_messages::{ + ConsumeWork, FinishedConsumeWork, MaxAge, TransactionBatchId, TransactionId, + }, thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet}, transaction_priority_id::TransactionPriorityId, transaction_state::{SanitizedTransactionTTL, TransactionState}, diff --git a/prio-graph-scheduler/src/read_write_account_set.rs b/prio-graph-scheduler/src/read_write_account_set.rs index ea5fc0f..0e70837 100644 --- a/prio-graph-scheduler/src/read_write_account_set.rs +++ b/prio-graph-scheduler/src/read_write_account_set.rs @@ -1,287 +1,287 @@ use { - ahash::AHashSet, - solana_sdk::{message::SanitizedMessage, pubkey::Pubkey}, + ahash::AHashSet, + solana_sdk::{message::SanitizedMessage, pubkey::Pubkey}, }; /// Wrapper struct to accumulate locks for a batch of transactions. #[derive(Debug, Default)] pub struct ReadWriteAccountSet { - /// Set of accounts that are locked for read - read_set: AHashSet, - /// Set of accounts that are locked for write - write_set: AHashSet, + /// Set of accounts that are locked for read + read_set: AHashSet, + /// Set of accounts that are locked for write + write_set: AHashSet, } impl ReadWriteAccountSet { - /// Returns true if all account locks were available and false otherwise. - pub fn check_locks(&self, message: &SanitizedMessage) -> bool { - message - .account_keys() - .iter() - .enumerate() - .all(|(index, pubkey)| { - if message.is_writable(index) { - self.can_write(pubkey) - } else { - self.can_read(pubkey) - } - }) - } + /// Returns true if all account locks were available and false otherwise. + pub fn check_locks(&self, message: &SanitizedMessage) -> bool { + message + .account_keys() + .iter() + .enumerate() + .all(|(index, pubkey)| { + if message.is_writable(index) { + self.can_write(pubkey) + } else { + self.can_read(pubkey) + } + }) + } - /// Add all account locks. - /// Returns true if all account locks were available and false otherwise. - pub fn take_locks(&mut self, message: &SanitizedMessage) -> bool { - message - .account_keys() - .iter() - .enumerate() - .fold(true, |all_available, (index, pubkey)| { - if message.is_writable(index) { - all_available & self.add_write(pubkey) - } else { - all_available & self.add_read(pubkey) - } - }) - } + /// Add all account locks. + /// Returns true if all account locks were available and false otherwise. + pub fn take_locks(&mut self, message: &SanitizedMessage) -> bool { + message + .account_keys() + .iter() + .enumerate() + .fold(true, |all_available, (index, pubkey)| { + if message.is_writable(index) { + all_available & self.add_write(pubkey) + } else { + all_available & self.add_read(pubkey) + } + }) + } - /// Clears the read and write sets - #[allow(dead_code)] - pub fn clear(&mut self) { - self.read_set.clear(); - self.write_set.clear(); - } + /// Clears the read and write sets + #[allow(dead_code)] + pub fn clear(&mut self) { + self.read_set.clear(); + self.write_set.clear(); + } - /// Check if an account can be read-locked - fn can_read(&self, pubkey: &Pubkey) -> bool { - !self.write_set.contains(pubkey) - } + /// Check if an account can be read-locked + fn can_read(&self, pubkey: &Pubkey) -> bool { + !self.write_set.contains(pubkey) + } - /// Check if an account can be write-locked - fn can_write(&self, pubkey: &Pubkey) -> bool { - !self.write_set.contains(pubkey) && !self.read_set.contains(pubkey) - } + /// Check if an account can be write-locked + fn can_write(&self, pubkey: &Pubkey) -> bool { + !self.write_set.contains(pubkey) && !self.read_set.contains(pubkey) + } - /// Add an account to the read-set. - /// Returns true if the lock was available. - fn add_read(&mut self, pubkey: &Pubkey) -> bool { - let can_read = self.can_read(pubkey); - self.read_set.insert(*pubkey); + /// Add an account to the read-set. + /// Returns true if the lock was available. + fn add_read(&mut self, pubkey: &Pubkey) -> bool { + let can_read = self.can_read(pubkey); + self.read_set.insert(*pubkey); - can_read - } + can_read + } - /// Add an account to the write-set. - /// Returns true if the lock was available. - fn add_write(&mut self, pubkey: &Pubkey) -> bool { - let can_write = self.can_write(pubkey); - self.write_set.insert(*pubkey); + /// Add an account to the write-set. + /// Returns true if the lock was available. + fn add_write(&mut self, pubkey: &Pubkey) -> bool { + let can_write = self.can_write(pubkey); + self.write_set.insert(*pubkey); - can_write - } + can_write + } } #[cfg(test)] mod tests { - use { - super::ReadWriteAccountSet, - solana_ledger::genesis_utils::GenesisConfigInfo, - solana_runtime::{bank::Bank, bank_forks::BankForks, genesis_utils::create_genesis_config}, - solana_sdk::{ - account::AccountSharedData, - address_lookup_table::{ - self, - state::{AddressLookupTable, LookupTableMeta}, - }, - hash::Hash, - message::{ - v0::{self, MessageAddressTableLookup}, - MessageHeader, VersionedMessage, - }, - pubkey::Pubkey, - signature::Keypair, - signer::Signer, - transaction::{MessageHash, SanitizedTransaction, VersionedTransaction}, - }, - std::{ - borrow::Cow, - sync::{Arc, RwLock}, - }, - }; + use { + super::ReadWriteAccountSet, + solana_ledger::genesis_utils::GenesisConfigInfo, + solana_runtime::{bank::Bank, bank_forks::BankForks, genesis_utils::create_genesis_config}, + solana_sdk::{ + account::AccountSharedData, + address_lookup_table::{ + self, + state::{AddressLookupTable, LookupTableMeta}, + }, + hash::Hash, + message::{ + v0::{self, MessageAddressTableLookup}, + MessageHeader, VersionedMessage, + }, + pubkey::Pubkey, + signature::Keypair, + signer::Signer, + transaction::{MessageHash, SanitizedTransaction, VersionedTransaction}, + }, + std::{ + borrow::Cow, + sync::{Arc, RwLock}, + }, + }; - fn create_test_versioned_message( - write_keys: &[Pubkey], - read_keys: &[Pubkey], - address_table_lookups: Vec, - ) -> VersionedMessage { - VersionedMessage::V0(v0::Message { - header: MessageHeader { - num_required_signatures: write_keys.len() as u8, - num_readonly_signed_accounts: 0, - num_readonly_unsigned_accounts: read_keys.len() as u8, - }, - recent_blockhash: Hash::default(), - account_keys: write_keys.iter().chain(read_keys.iter()).copied().collect(), - address_table_lookups, - instructions: vec![], - }) - } + fn create_test_versioned_message( + write_keys: &[Pubkey], + read_keys: &[Pubkey], + address_table_lookups: Vec, + ) -> VersionedMessage { + VersionedMessage::V0(v0::Message { + header: MessageHeader { + num_required_signatures: write_keys.len() as u8, + num_readonly_signed_accounts: 0, + num_readonly_unsigned_accounts: read_keys.len() as u8, + }, + recent_blockhash: Hash::default(), + account_keys: write_keys.iter().chain(read_keys.iter()).copied().collect(), + address_table_lookups, + instructions: vec![], + }) + } - fn create_test_sanitized_transaction( - write_keypair: &Keypair, - read_keys: &[Pubkey], - address_table_lookups: Vec, - bank: &Bank, - ) -> SanitizedTransaction { - let message = create_test_versioned_message( - &[write_keypair.pubkey()], - read_keys, - address_table_lookups, - ); - SanitizedTransaction::try_create( - VersionedTransaction::try_new(message, &[write_keypair]).unwrap(), - MessageHash::Compute, - Some(false), - bank, - bank.get_reserved_account_keys(), - ) - .unwrap() - } + fn create_test_sanitized_transaction( + write_keypair: &Keypair, + read_keys: &[Pubkey], + address_table_lookups: Vec, + bank: &Bank, + ) -> SanitizedTransaction { + let message = create_test_versioned_message( + &[write_keypair.pubkey()], + read_keys, + address_table_lookups, + ); + SanitizedTransaction::try_create( + VersionedTransaction::try_new(message, &[write_keypair]).unwrap(), + MessageHash::Compute, + Some(false), + bank, + bank.get_reserved_account_keys(), + ) + .unwrap() + } - fn create_test_address_lookup_table( - bank: Arc, - num_addresses: usize, - ) -> (Arc, Pubkey) { - let mut addresses = Vec::with_capacity(num_addresses); - addresses.resize_with(num_addresses, Pubkey::new_unique); - let address_lookup_table = AddressLookupTable { - meta: LookupTableMeta { - authority: None, - ..LookupTableMeta::default() - }, - addresses: Cow::Owned(addresses), - }; + fn create_test_address_lookup_table( + bank: Arc, + num_addresses: usize, + ) -> (Arc, Pubkey) { + let mut addresses = Vec::with_capacity(num_addresses); + addresses.resize_with(num_addresses, Pubkey::new_unique); + let address_lookup_table = AddressLookupTable { + meta: LookupTableMeta { + authority: None, + ..LookupTableMeta::default() + }, + addresses: Cow::Owned(addresses), + }; - let address_table_key = Pubkey::new_unique(); - let data = address_lookup_table.serialize_for_tests().unwrap(); - let mut account = - AccountSharedData::new(1, data.len(), &address_lookup_table::program::id()); - account.set_data(data); - bank.store_account(&address_table_key, &account); + let address_table_key = Pubkey::new_unique(); + let data = address_lookup_table.serialize_for_tests().unwrap(); + let mut account = + AccountSharedData::new(1, data.len(), &address_lookup_table::program::id()); + account.set_data(data); + bank.store_account(&address_table_key, &account); - let slot = bank.slot() + 1; - ( - Arc::new(Bank::new_from_parent(bank, &Pubkey::new_unique(), slot)), - address_table_key, - ) - } + let slot = bank.slot() + 1; + ( + Arc::new(Bank::new_from_parent(bank, &Pubkey::new_unique(), slot)), + address_table_key, + ) + } - fn create_test_bank() -> (Arc, Arc>) { - let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); - Bank::new_no_wallclock_throttle_for_tests(&genesis_config) - } + fn create_test_bank() -> (Arc, Arc>) { + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + Bank::new_no_wallclock_throttle_for_tests(&genesis_config) + } - // Helper function (could potentially use test_case in future). - // conflict_index = 0 means write lock conflict with static key - // conflict_index = 1 means read lock conflict with static key - // conflict_index = 2 means write lock conflict with address table key - // conflict_index = 3 means read lock conflict with address table key - fn test_check_and_take_locks(conflict_index: usize, add_write: bool, expectation: bool) { - let (bank, _bank_forks) = create_test_bank(); - let (bank, table_address) = create_test_address_lookup_table(bank, 2); - let tx = create_test_sanitized_transaction( - &Keypair::new(), - &[Pubkey::new_unique()], - vec![MessageAddressTableLookup { - account_key: table_address, - writable_indexes: vec![0], - readonly_indexes: vec![1], - }], - &bank, - ); - let message = tx.message(); + // Helper function (could potentially use test_case in future). + // conflict_index = 0 means write lock conflict with static key + // conflict_index = 1 means read lock conflict with static key + // conflict_index = 2 means write lock conflict with address table key + // conflict_index = 3 means read lock conflict with address table key + fn test_check_and_take_locks(conflict_index: usize, add_write: bool, expectation: bool) { + let (bank, _bank_forks) = create_test_bank(); + let (bank, table_address) = create_test_address_lookup_table(bank, 2); + let tx = create_test_sanitized_transaction( + &Keypair::new(), + &[Pubkey::new_unique()], + vec![MessageAddressTableLookup { + account_key: table_address, + writable_indexes: vec![0], + readonly_indexes: vec![1], + }], + &bank, + ); + let message = tx.message(); - let mut account_locks = ReadWriteAccountSet::default(); + let mut account_locks = ReadWriteAccountSet::default(); - let conflict_key = message.account_keys().get(conflict_index).unwrap(); - if add_write { - account_locks.add_write(conflict_key); - } else { - account_locks.add_read(conflict_key); - } - assert_eq!(expectation, account_locks.check_locks(message)); - assert_eq!(expectation, account_locks.take_locks(message)); - } + let conflict_key = message.account_keys().get(conflict_index).unwrap(); + if add_write { + account_locks.add_write(conflict_key); + } else { + account_locks.add_read(conflict_key); + } + assert_eq!(expectation, account_locks.check_locks(message)); + assert_eq!(expectation, account_locks.take_locks(message)); + } - #[test] - fn test_check_and_take_locks_write_write_conflict() { - test_check_and_take_locks(0, true, false); // static key conflict - test_check_and_take_locks(2, true, false); // lookup key conflict - } + #[test] + fn test_check_and_take_locks_write_write_conflict() { + test_check_and_take_locks(0, true, false); // static key conflict + test_check_and_take_locks(2, true, false); // lookup key conflict + } - #[test] - fn test_check_and_take_locks_read_write_conflict() { - test_check_and_take_locks(0, false, false); // static key conflict - test_check_and_take_locks(2, false, false); // lookup key conflict - } + #[test] + fn test_check_and_take_locks_read_write_conflict() { + test_check_and_take_locks(0, false, false); // static key conflict + test_check_and_take_locks(2, false, false); // lookup key conflict + } - #[test] - fn test_check_and_take_locks_write_read_conflict() { - test_check_and_take_locks(1, true, false); // static key conflict - test_check_and_take_locks(3, true, false); // lookup key conflict - } + #[test] + fn test_check_and_take_locks_write_read_conflict() { + test_check_and_take_locks(1, true, false); // static key conflict + test_check_and_take_locks(3, true, false); // lookup key conflict + } - #[test] - fn test_check_and_take_locks_read_read_non_conflict() { - test_check_and_take_locks(1, false, true); // static key conflict - test_check_and_take_locks(3, false, true); // lookup key conflict - } + #[test] + fn test_check_and_take_locks_read_read_non_conflict() { + test_check_and_take_locks(1, false, true); // static key conflict + test_check_and_take_locks(3, false, true); // lookup key conflict + } - #[test] - pub fn test_write_write_conflict() { - let mut account_locks = ReadWriteAccountSet::default(); - let account = Pubkey::new_unique(); - assert!(account_locks.can_write(&account)); - account_locks.add_write(&account); - assert!(!account_locks.can_write(&account)); - } + #[test] + pub fn test_write_write_conflict() { + let mut account_locks = ReadWriteAccountSet::default(); + let account = Pubkey::new_unique(); + assert!(account_locks.can_write(&account)); + account_locks.add_write(&account); + assert!(!account_locks.can_write(&account)); + } - #[test] - pub fn test_read_write_conflict() { - let mut account_locks = ReadWriteAccountSet::default(); - let account = Pubkey::new_unique(); - assert!(account_locks.can_read(&account)); - account_locks.add_read(&account); - assert!(!account_locks.can_write(&account)); - assert!(account_locks.can_read(&account)); - } + #[test] + pub fn test_read_write_conflict() { + let mut account_locks = ReadWriteAccountSet::default(); + let account = Pubkey::new_unique(); + assert!(account_locks.can_read(&account)); + account_locks.add_read(&account); + assert!(!account_locks.can_write(&account)); + assert!(account_locks.can_read(&account)); + } - #[test] - pub fn test_write_read_conflict() { - let mut account_locks = ReadWriteAccountSet::default(); - let account = Pubkey::new_unique(); - assert!(account_locks.can_write(&account)); - account_locks.add_write(&account); - assert!(!account_locks.can_write(&account)); - assert!(!account_locks.can_read(&account)); - } + #[test] + pub fn test_write_read_conflict() { + let mut account_locks = ReadWriteAccountSet::default(); + let account = Pubkey::new_unique(); + assert!(account_locks.can_write(&account)); + account_locks.add_write(&account); + assert!(!account_locks.can_write(&account)); + assert!(!account_locks.can_read(&account)); + } - #[test] - pub fn test_read_read_non_conflict() { - let mut account_locks = ReadWriteAccountSet::default(); - let account = Pubkey::new_unique(); - assert!(account_locks.can_read(&account)); - account_locks.add_read(&account); - assert!(account_locks.can_read(&account)); - } + #[test] + pub fn test_read_read_non_conflict() { + let mut account_locks = ReadWriteAccountSet::default(); + let account = Pubkey::new_unique(); + assert!(account_locks.can_read(&account)); + account_locks.add_read(&account); + assert!(account_locks.can_read(&account)); + } - #[test] - pub fn test_write_write_different_keys() { - let mut account_locks = ReadWriteAccountSet::default(); - let account1 = Pubkey::new_unique(); - let account2 = Pubkey::new_unique(); - assert!(account_locks.can_write(&account1)); - account_locks.add_write(&account1); - assert!(account_locks.can_write(&account2)); - assert!(account_locks.can_read(&account2)); - } + #[test] + pub fn test_write_write_different_keys() { + let mut account_locks = ReadWriteAccountSet::default(); + let account1 = Pubkey::new_unique(); + let account2 = Pubkey::new_unique(); + assert!(account_locks.can_write(&account1)); + account_locks.add_write(&account1); + assert!(account_locks.can_write(&account2)); + assert!(account_locks.can_read(&account2)); + } } diff --git a/prio-graph-scheduler/src/scheduler_metrics.rs b/prio-graph-scheduler/src/scheduler_metrics.rs index 922c105..bb8cbbe 100644 --- a/prio-graph-scheduler/src/scheduler_metrics.rs +++ b/prio-graph-scheduler/src/scheduler_metrics.rs @@ -1,408 +1,408 @@ use { - itertools::MinMaxResult, - solana_poh::poh_recorder::BankStart, - solana_sdk::{clock::Slot, timing::AtomicInterval}, - std::time::Instant, + itertools::MinMaxResult, + solana_poh::poh_recorder::BankStart, + solana_sdk::{clock::Slot, timing::AtomicInterval}, + std::time::Instant, }; #[derive(Default)] pub struct SchedulerCountMetrics { - interval: IntervalSchedulerCountMetrics, - slot: SlotSchedulerCountMetrics, + interval: IntervalSchedulerCountMetrics, + slot: SlotSchedulerCountMetrics, } impl SchedulerCountMetrics { - pub fn update(&mut self, update: impl Fn(&mut SchedulerCountMetricsInner)) { - update(&mut self.interval.metrics); - update(&mut self.slot.metrics); - } - - pub fn maybe_report_and_reset_slot(&mut self, slot: Option) { - self.slot.maybe_report_and_reset(slot); - } - - pub fn maybe_report_and_reset_interval(&mut self, should_report: bool) { - self.interval.maybe_report_and_reset(should_report); - } - - pub fn interval_has_data(&self) -> bool { - self.interval.metrics.has_data() - } + pub fn update(&mut self, update: impl Fn(&mut SchedulerCountMetricsInner)) { + update(&mut self.interval.metrics); + update(&mut self.slot.metrics); + } + + pub fn maybe_report_and_reset_slot(&mut self, slot: Option) { + self.slot.maybe_report_and_reset(slot); + } + + pub fn maybe_report_and_reset_interval(&mut self, should_report: bool) { + self.interval.maybe_report_and_reset(should_report); + } + + pub fn interval_has_data(&self) -> bool { + self.interval.metrics.has_data() + } } #[derive(Default)] struct IntervalSchedulerCountMetrics { - interval: AtomicInterval, - metrics: SchedulerCountMetricsInner, + interval: AtomicInterval, + metrics: SchedulerCountMetricsInner, } #[derive(Default)] struct SlotSchedulerCountMetrics { - slot: Option, - metrics: SchedulerCountMetricsInner, + slot: Option, + metrics: SchedulerCountMetricsInner, } #[derive(Default)] pub struct SchedulerCountMetricsInner { - /// Number of packets received. - pub num_received: usize, - /// Number of packets buffered. - pub num_buffered: usize, - - /// Number of transactions scheduled. - pub num_scheduled: usize, - /// Number of transactions that were unschedulable. - pub num_unschedulable: usize, - /// Number of transactions that were filtered out during scheduling. - pub num_schedule_filtered_out: usize, - /// Number of completed transactions received from workers. - pub num_finished: usize, - /// Number of transactions that were retryable. - pub num_retryable: usize, - /// Number of transactions that were scheduled to be forwarded. - pub num_forwarded: usize, - - /// Number of transactions that were immediately dropped on receive. - pub num_dropped_on_receive: usize, - /// Number of transactions that were dropped due to sanitization failure. - pub num_dropped_on_sanitization: usize, - /// Number of transactions that were dropped due to failed lock validation. - pub num_dropped_on_validate_locks: usize, - /// Number of transactions that were dropped due to failed transaction - /// checks during receive. - pub num_dropped_on_receive_transaction_checks: usize, - /// Number of transactions that were dropped due to clearing. - pub num_dropped_on_clear: usize, - /// Number of transactions that were dropped due to age and status checks. - pub num_dropped_on_age_and_status: usize, - /// Number of transactions that were dropped due to exceeded capacity. - pub num_dropped_on_capacity: usize, - /// Min prioritization fees in the transaction container - pub min_prioritization_fees: u64, - /// Max prioritization fees in the transaction container - pub max_prioritization_fees: u64, + /// Number of packets received. + pub num_received: usize, + /// Number of packets buffered. + pub num_buffered: usize, + + /// Number of transactions scheduled. + pub num_scheduled: usize, + /// Number of transactions that were unschedulable. + pub num_unschedulable: usize, + /// Number of transactions that were filtered out during scheduling. + pub num_schedule_filtered_out: usize, + /// Number of completed transactions received from workers. + pub num_finished: usize, + /// Number of transactions that were retryable. + pub num_retryable: usize, + /// Number of transactions that were scheduled to be forwarded. + pub num_forwarded: usize, + + /// Number of transactions that were immediately dropped on receive. + pub num_dropped_on_receive: usize, + /// Number of transactions that were dropped due to sanitization failure. + pub num_dropped_on_sanitization: usize, + /// Number of transactions that were dropped due to failed lock validation. + pub num_dropped_on_validate_locks: usize, + /// Number of transactions that were dropped due to failed transaction + /// checks during receive. + pub num_dropped_on_receive_transaction_checks: usize, + /// Number of transactions that were dropped due to clearing. + pub num_dropped_on_clear: usize, + /// Number of transactions that were dropped due to age and status checks. + pub num_dropped_on_age_and_status: usize, + /// Number of transactions that were dropped due to exceeded capacity. + pub num_dropped_on_capacity: usize, + /// Min prioritization fees in the transaction container + pub min_prioritization_fees: u64, + /// Max prioritization fees in the transaction container + pub max_prioritization_fees: u64, } impl IntervalSchedulerCountMetrics { - fn maybe_report_and_reset(&mut self, should_report: bool) { - const REPORT_INTERVAL_MS: u64 = 1000; - if self.interval.should_update(REPORT_INTERVAL_MS) { - if should_report { - self.metrics.report("banking_stage_scheduler_counts", None); - } - self.metrics.reset(); - } - } + fn maybe_report_and_reset(&mut self, should_report: bool) { + const REPORT_INTERVAL_MS: u64 = 1000; + if self.interval.should_update(REPORT_INTERVAL_MS) { + if should_report { + self.metrics.report("banking_stage_scheduler_counts", None); + } + self.metrics.reset(); + } + } } impl SlotSchedulerCountMetrics { - fn maybe_report_and_reset(&mut self, slot: Option) { - if self.slot != slot { - // Only report if there was an assigned slot. - if self.slot.is_some() { - self.metrics - .report("banking_stage_scheduler_slot_counts", self.slot); - } - self.metrics.reset(); - self.slot = slot; - } - } + fn maybe_report_and_reset(&mut self, slot: Option) { + if self.slot != slot { + // Only report if there was an assigned slot. + if self.slot.is_some() { + self.metrics + .report("banking_stage_scheduler_slot_counts", self.slot); + } + self.metrics.reset(); + self.slot = slot; + } + } } impl SchedulerCountMetricsInner { - fn report(&self, name: &'static str, slot: Option) { - let mut datapoint = create_datapoint!( - @point name, - ("num_received", self.num_received, i64), - ("num_buffered", self.num_buffered, i64), - ("num_scheduled", self.num_scheduled, i64), - ("num_unschedulable", self.num_unschedulable, i64), - ( - "num_schedule_filtered_out", - self.num_schedule_filtered_out, - i64 - ), - ("num_finished", self.num_finished, i64), - ("num_retryable", self.num_retryable, i64), - ("num_forwarded", self.num_forwarded, i64), - ("num_dropped_on_receive", self.num_dropped_on_receive, i64), - ( - "num_dropped_on_sanitization", - self.num_dropped_on_sanitization, - i64 - ), - ( - "num_dropped_on_validate_locks", - self.num_dropped_on_validate_locks, - i64 - ), - ( - "num_dropped_on_receive_transaction_checks", - self.num_dropped_on_receive_transaction_checks, - i64 - ), - ("num_dropped_on_clear", self.num_dropped_on_clear, i64), - ( - "num_dropped_on_age_and_status", - self.num_dropped_on_age_and_status, - i64 - ), - ("num_dropped_on_capacity", self.num_dropped_on_capacity, i64), - ("min_priority", self.get_min_priority(), i64), - ("max_priority", self.get_max_priority(), i64) - ); - if let Some(slot) = slot { - datapoint.add_field_i64("slot", slot as i64); - } - solana_metrics::submit(datapoint, log::Level::Info); - } - - pub fn has_data(&self) -> bool { - self.num_received != 0 - || self.num_buffered != 0 - || self.num_scheduled != 0 - || self.num_unschedulable != 0 - || self.num_schedule_filtered_out != 0 - || self.num_finished != 0 - || self.num_retryable != 0 - || self.num_forwarded != 0 - || self.num_dropped_on_receive != 0 - || self.num_dropped_on_sanitization != 0 - || self.num_dropped_on_validate_locks != 0 - || self.num_dropped_on_receive_transaction_checks != 0 - || self.num_dropped_on_clear != 0 - || self.num_dropped_on_age_and_status != 0 - || self.num_dropped_on_capacity != 0 - } - - fn reset(&mut self) { - self.num_received = 0; - self.num_buffered = 0; - self.num_scheduled = 0; - self.num_unschedulable = 0; - self.num_schedule_filtered_out = 0; - self.num_finished = 0; - self.num_retryable = 0; - self.num_forwarded = 0; - self.num_dropped_on_receive = 0; - self.num_dropped_on_sanitization = 0; - self.num_dropped_on_validate_locks = 0; - self.num_dropped_on_receive_transaction_checks = 0; - self.num_dropped_on_clear = 0; - self.num_dropped_on_age_and_status = 0; - self.num_dropped_on_capacity = 0; - self.min_prioritization_fees = u64::MAX; - self.max_prioritization_fees = 0; - } - - pub fn update_priority_stats(&mut self, min_max_fees: MinMaxResult) { - // update min/max priority - match min_max_fees { - itertools::MinMaxResult::NoElements => { - // do nothing - } - itertools::MinMaxResult::OneElement(e) => { - self.min_prioritization_fees = e; - self.max_prioritization_fees = e; - } - itertools::MinMaxResult::MinMax(min, max) => { - self.min_prioritization_fees = min; - self.max_prioritization_fees = max; - } - } - } - - pub fn get_min_priority(&self) -> u64 { - // to avoid getting u64::max recorded by metrics / in case of edge cases - if self.min_prioritization_fees != u64::MAX { - self.min_prioritization_fees - } else { - 0 - } - } - - pub fn get_max_priority(&self) -> u64 { - self.max_prioritization_fees - } + fn report(&self, name: &'static str, slot: Option) { + let mut datapoint = create_datapoint!( + @point name, + ("num_received", self.num_received, i64), + ("num_buffered", self.num_buffered, i64), + ("num_scheduled", self.num_scheduled, i64), + ("num_unschedulable", self.num_unschedulable, i64), + ( + "num_schedule_filtered_out", + self.num_schedule_filtered_out, + i64 + ), + ("num_finished", self.num_finished, i64), + ("num_retryable", self.num_retryable, i64), + ("num_forwarded", self.num_forwarded, i64), + ("num_dropped_on_receive", self.num_dropped_on_receive, i64), + ( + "num_dropped_on_sanitization", + self.num_dropped_on_sanitization, + i64 + ), + ( + "num_dropped_on_validate_locks", + self.num_dropped_on_validate_locks, + i64 + ), + ( + "num_dropped_on_receive_transaction_checks", + self.num_dropped_on_receive_transaction_checks, + i64 + ), + ("num_dropped_on_clear", self.num_dropped_on_clear, i64), + ( + "num_dropped_on_age_and_status", + self.num_dropped_on_age_and_status, + i64 + ), + ("num_dropped_on_capacity", self.num_dropped_on_capacity, i64), + ("min_priority", self.get_min_priority(), i64), + ("max_priority", self.get_max_priority(), i64) + ); + if let Some(slot) = slot { + datapoint.add_field_i64("slot", slot as i64); + } + solana_metrics::submit(datapoint, log::Level::Info); + } + + pub fn has_data(&self) -> bool { + self.num_received != 0 + || self.num_buffered != 0 + || self.num_scheduled != 0 + || self.num_unschedulable != 0 + || self.num_schedule_filtered_out != 0 + || self.num_finished != 0 + || self.num_retryable != 0 + || self.num_forwarded != 0 + || self.num_dropped_on_receive != 0 + || self.num_dropped_on_sanitization != 0 + || self.num_dropped_on_validate_locks != 0 + || self.num_dropped_on_receive_transaction_checks != 0 + || self.num_dropped_on_clear != 0 + || self.num_dropped_on_age_and_status != 0 + || self.num_dropped_on_capacity != 0 + } + + fn reset(&mut self) { + self.num_received = 0; + self.num_buffered = 0; + self.num_scheduled = 0; + self.num_unschedulable = 0; + self.num_schedule_filtered_out = 0; + self.num_finished = 0; + self.num_retryable = 0; + self.num_forwarded = 0; + self.num_dropped_on_receive = 0; + self.num_dropped_on_sanitization = 0; + self.num_dropped_on_validate_locks = 0; + self.num_dropped_on_receive_transaction_checks = 0; + self.num_dropped_on_clear = 0; + self.num_dropped_on_age_and_status = 0; + self.num_dropped_on_capacity = 0; + self.min_prioritization_fees = u64::MAX; + self.max_prioritization_fees = 0; + } + + pub fn update_priority_stats(&mut self, min_max_fees: MinMaxResult) { + // update min/max priority + match min_max_fees { + itertools::MinMaxResult::NoElements => { + // do nothing + } + itertools::MinMaxResult::OneElement(e) => { + self.min_prioritization_fees = e; + self.max_prioritization_fees = e; + } + itertools::MinMaxResult::MinMax(min, max) => { + self.min_prioritization_fees = min; + self.max_prioritization_fees = max; + } + } + } + + pub fn get_min_priority(&self) -> u64 { + // to avoid getting u64::max recorded by metrics / in case of edge cases + if self.min_prioritization_fees != u64::MAX { + self.min_prioritization_fees + } else { + 0 + } + } + + pub fn get_max_priority(&self) -> u64 { + self.max_prioritization_fees + } } #[derive(Default)] pub struct SchedulerTimingMetrics { - interval: IntervalSchedulerTimingMetrics, - slot: SlotSchedulerTimingMetrics, + interval: IntervalSchedulerTimingMetrics, + slot: SlotSchedulerTimingMetrics, } impl SchedulerTimingMetrics { - pub fn update(&mut self, update: impl Fn(&mut SchedulerTimingMetricsInner)) { - update(&mut self.interval.metrics); - update(&mut self.slot.metrics); - } - - pub fn maybe_report_and_reset_slot(&mut self, slot: Option) { - self.slot.maybe_report_and_reset(slot); - } - - pub fn maybe_report_and_reset_interval(&mut self, should_report: bool) { - self.interval.maybe_report_and_reset(should_report); - } + pub fn update(&mut self, update: impl Fn(&mut SchedulerTimingMetricsInner)) { + update(&mut self.interval.metrics); + update(&mut self.slot.metrics); + } + + pub fn maybe_report_and_reset_slot(&mut self, slot: Option) { + self.slot.maybe_report_and_reset(slot); + } + + pub fn maybe_report_and_reset_interval(&mut self, should_report: bool) { + self.interval.maybe_report_and_reset(should_report); + } } #[derive(Default)] struct IntervalSchedulerTimingMetrics { - interval: AtomicInterval, - metrics: SchedulerTimingMetricsInner, + interval: AtomicInterval, + metrics: SchedulerTimingMetricsInner, } #[derive(Default)] struct SlotSchedulerTimingMetrics { - slot: Option, - metrics: SchedulerTimingMetricsInner, + slot: Option, + metrics: SchedulerTimingMetricsInner, } #[derive(Default)] pub struct SchedulerTimingMetricsInner { - /// Time spent making processing decisions. - pub decision_time_us: u64, - /// Time spent receiving packets. - pub receive_time_us: u64, - /// Time spent buffering packets. - pub buffer_time_us: u64, - /// Time spent filtering transactions during scheduling. - pub schedule_filter_time_us: u64, - /// Time spent scheduling transactions. - pub schedule_time_us: u64, - /// Time spent clearing transactions from the container. - pub clear_time_us: u64, - /// Time spent cleaning expired or processed transactions from the container. - pub clean_time_us: u64, - /// Time spent forwarding transactions. - pub forward_time_us: u64, - /// Time spent receiving completed transactions. - pub receive_completed_time_us: u64, + /// Time spent making processing decisions. + pub decision_time_us: u64, + /// Time spent receiving packets. + pub receive_time_us: u64, + /// Time spent buffering packets. + pub buffer_time_us: u64, + /// Time spent filtering transactions during scheduling. + pub schedule_filter_time_us: u64, + /// Time spent scheduling transactions. + pub schedule_time_us: u64, + /// Time spent clearing transactions from the container. + pub clear_time_us: u64, + /// Time spent cleaning expired or processed transactions from the container. + pub clean_time_us: u64, + /// Time spent forwarding transactions. + pub forward_time_us: u64, + /// Time spent receiving completed transactions. + pub receive_completed_time_us: u64, } impl IntervalSchedulerTimingMetrics { - fn maybe_report_and_reset(&mut self, should_report: bool) { - const REPORT_INTERVAL_MS: u64 = 1000; - if self.interval.should_update(REPORT_INTERVAL_MS) { - if should_report { - self.metrics.report("banking_stage_scheduler_timing", None); - } - self.metrics.reset(); - } - } + fn maybe_report_and_reset(&mut self, should_report: bool) { + const REPORT_INTERVAL_MS: u64 = 1000; + if self.interval.should_update(REPORT_INTERVAL_MS) { + if should_report { + self.metrics.report("banking_stage_scheduler_timing", None); + } + self.metrics.reset(); + } + } } impl SlotSchedulerTimingMetrics { - fn maybe_report_and_reset(&mut self, slot: Option) { - if self.slot != slot { - // Only report if there was an assigned slot. - if self.slot.is_some() { - self.metrics - .report("banking_stage_scheduler_slot_timing", self.slot); - } - self.metrics.reset(); - self.slot = slot; - } - } + fn maybe_report_and_reset(&mut self, slot: Option) { + if self.slot != slot { + // Only report if there was an assigned slot. + if self.slot.is_some() { + self.metrics + .report("banking_stage_scheduler_slot_timing", self.slot); + } + self.metrics.reset(); + self.slot = slot; + } + } } impl SchedulerTimingMetricsInner { - fn report(&self, name: &'static str, slot: Option) { - let mut datapoint = create_datapoint!( - @point name, - ("decision_time_us", self.decision_time_us, i64), - ("receive_time_us", self.receive_time_us, i64), - ("buffer_time_us", self.buffer_time_us, i64), - ("schedule_filter_time_us", self.schedule_filter_time_us, i64), - ("schedule_time_us", self.schedule_time_us, i64), - ("clear_time_us", self.clear_time_us, i64), - ("clean_time_us", self.clean_time_us, i64), - ("forward_time_us", self.forward_time_us, i64), - ( - "receive_completed_time_us", - self.receive_completed_time_us, - i64 - ) - ); - if let Some(slot) = slot { - datapoint.add_field_i64("slot", slot as i64); - } - solana_metrics::submit(datapoint, log::Level::Info); - } - - fn reset(&mut self) { - self.decision_time_us = 0; - self.receive_time_us = 0; - self.buffer_time_us = 0; - self.schedule_filter_time_us = 0; - self.schedule_time_us = 0; - self.clear_time_us = 0; - self.clean_time_us = 0; - self.forward_time_us = 0; - self.receive_completed_time_us = 0; - } + fn report(&self, name: &'static str, slot: Option) { + let mut datapoint = create_datapoint!( + @point name, + ("decision_time_us", self.decision_time_us, i64), + ("receive_time_us", self.receive_time_us, i64), + ("buffer_time_us", self.buffer_time_us, i64), + ("schedule_filter_time_us", self.schedule_filter_time_us, i64), + ("schedule_time_us", self.schedule_time_us, i64), + ("clear_time_us", self.clear_time_us, i64), + ("clean_time_us", self.clean_time_us, i64), + ("forward_time_us", self.forward_time_us, i64), + ( + "receive_completed_time_us", + self.receive_completed_time_us, + i64 + ) + ); + if let Some(slot) = slot { + datapoint.add_field_i64("slot", slot as i64); + } + solana_metrics::submit(datapoint, log::Level::Info); + } + + fn reset(&mut self) { + self.decision_time_us = 0; + self.receive_time_us = 0; + self.buffer_time_us = 0; + self.schedule_filter_time_us = 0; + self.schedule_time_us = 0; + self.clear_time_us = 0; + self.clean_time_us = 0; + self.forward_time_us = 0; + self.receive_completed_time_us = 0; + } } #[derive(Default)] pub struct SchedulerLeaderDetectionMetrics { - inner: Option, + inner: Option, } struct SchedulerLeaderDetectionMetricsInner { - slot: Slot, - bank_creation_time: Instant, - bank_detected_time: Instant, + slot: Slot, + bank_creation_time: Instant, + bank_detected_time: Instant, } impl SchedulerLeaderDetectionMetrics { - pub fn update_and_maybe_report(&mut self, bank_start: Option<&BankStart>) { - match (&self.inner, bank_start) { - (None, Some(bank_start)) => self.initialize_inner(bank_start), - (Some(_inner), None) => self.report_and_reset(), - (Some(inner), Some(bank_start)) if inner.slot != bank_start.working_bank.slot() => { - self.report_and_reset(); - self.initialize_inner(bank_start); - } - _ => {} - } - } - - fn initialize_inner(&mut self, bank_start: &BankStart) { - let bank_detected_time = Instant::now(); - self.inner = Some(SchedulerLeaderDetectionMetricsInner { - slot: bank_start.working_bank.slot(), - bank_creation_time: *bank_start.bank_creation_time, - bank_detected_time, - }); - } - - fn report_and_reset(&mut self) { - let SchedulerLeaderDetectionMetricsInner { - slot, - bank_creation_time, - bank_detected_time, - } = self.inner.take().expect("inner must be present"); - - let bank_detected_delay_us = bank_detected_time - .duration_since(bank_creation_time) - .as_micros() - .try_into() - .unwrap_or(i64::MAX); - let bank_detected_to_slot_end_detected_us = bank_detected_time - .elapsed() - .as_micros() - .try_into() - .unwrap_or(i64::MAX); - datapoint_info!( - "banking_stage_scheduler_leader_detection", - ("slot", slot, i64), - ("bank_detected_delay_us", bank_detected_delay_us, i64), - ( - "bank_detected_to_slot_end_detected_us", - bank_detected_to_slot_end_detected_us, - i64 - ), - ); - } + pub fn update_and_maybe_report(&mut self, bank_start: Option<&BankStart>) { + match (&self.inner, bank_start) { + (None, Some(bank_start)) => self.initialize_inner(bank_start), + (Some(_inner), None) => self.report_and_reset(), + (Some(inner), Some(bank_start)) if inner.slot != bank_start.working_bank.slot() => { + self.report_and_reset(); + self.initialize_inner(bank_start); + } + _ => {} + } + } + + fn initialize_inner(&mut self, bank_start: &BankStart) { + let bank_detected_time = Instant::now(); + self.inner = Some(SchedulerLeaderDetectionMetricsInner { + slot: bank_start.working_bank.slot(), + bank_creation_time: *bank_start.bank_creation_time, + bank_detected_time, + }); + } + + fn report_and_reset(&mut self) { + let SchedulerLeaderDetectionMetricsInner { + slot, + bank_creation_time, + bank_detected_time, + } = self.inner.take().expect("inner must be present"); + + let bank_detected_delay_us = bank_detected_time + .duration_since(bank_creation_time) + .as_micros() + .try_into() + .unwrap_or(i64::MAX); + let bank_detected_to_slot_end_detected_us = bank_detected_time + .elapsed() + .as_micros() + .try_into() + .unwrap_or(i64::MAX); + datapoint_info!( + "banking_stage_scheduler_leader_detection", + ("slot", slot, i64), + ("bank_detected_delay_us", bank_detected_delay_us, i64), + ( + "bank_detected_to_slot_end_detected_us", + bank_detected_to_slot_end_detected_us, + i64 + ), + ); + } } diff --git a/prio-graph-scheduler/src/thread_aware_account_locks.rs b/prio-graph-scheduler/src/thread_aware_account_locks.rs index f5b134d..6d9d4c2 100644 --- a/prio-graph-scheduler/src/thread_aware_account_locks.rs +++ b/prio-graph-scheduler/src/thread_aware_account_locks.rs @@ -1,11 +1,11 @@ use { - ahash::AHashMap, - solana_sdk::pubkey::Pubkey, - std::{ - collections::hash_map::Entry, - fmt::{Debug, Display}, - ops::{BitAnd, BitAndAssign, Sub}, - }, + ahash::AHashMap, + solana_sdk::pubkey::Pubkey, + std::{ + collections::hash_map::Entry, + fmt::{Debug, Display}, + ops::{BitAnd, BitAndAssign, Sub}, + }, }; pub const MAX_THREADS: usize = u64::BITS as usize; @@ -20,13 +20,13 @@ type LockCount = u32; pub struct ThreadSet(u64); struct AccountWriteLocks { - thread_id: ThreadId, - lock_count: LockCount, + thread_id: ThreadId, + lock_count: LockCount, } struct AccountReadLocks { - thread_set: ThreadSet, - lock_counts: [LockCount; MAX_THREADS], + thread_set: ThreadSet, + lock_counts: [LockCount; MAX_THREADS], } /// Account locks. @@ -36,8 +36,8 @@ struct AccountReadLocks { /// Contains thread-set for easily checking which threads are scheduled. #[derive(Default)] struct AccountLocks { - pub write_locks: Option, - pub read_locks: Option, + pub write_locks: Option, + pub read_locks: Option, } /// Thread-aware account locks which allows for scheduling on threads @@ -45,698 +45,698 @@ struct AccountLocks { /// queued transactions to be scheduled on a thread while the transaction /// is still being executed on the thread. pub struct ThreadAwareAccountLocks { - /// Number of threads. - num_threads: usize, // 0..MAX_THREADS - /// Locks for each account. An account should only have an entry if there - /// is at least one lock. - locks: AHashMap, + /// Number of threads. + num_threads: usize, // 0..MAX_THREADS + /// Locks for each account. An account should only have an entry if there + /// is at least one lock. + locks: AHashMap, } impl ThreadAwareAccountLocks { - /// Creates a new `ThreadAwareAccountLocks` with the given number of threads. - pub fn new(num_threads: usize) -> Self { - assert!(num_threads > 0, "num threads must be > 0"); - assert!( - num_threads <= MAX_THREADS, - "num threads must be <= {MAX_THREADS}" - ); - - Self { - num_threads, - locks: AHashMap::new(), - } - } - - /// Returns the `ThreadId` if the accounts are able to be locked - /// for the given thread, otherwise `None` is returned. - /// `allowed_threads` is a set of threads that the caller restricts locking to. - /// If accounts are schedulable, then they are locked for the thread - /// selected by the `thread_selector` function. - /// `thread_selector` is only called if all accounts are schdulable, meaning - /// that the `thread_set` passed to `thread_selector` is non-empty. - pub fn try_lock_accounts<'a>( - &mut self, - write_account_locks: impl Iterator + Clone, - read_account_locks: impl Iterator + Clone, - allowed_threads: ThreadSet, - thread_selector: impl FnOnce(ThreadSet) -> ThreadId, - ) -> Option { - let schedulable_threads = self.accounts_schedulable_threads( - write_account_locks.clone(), - read_account_locks.clone(), - )? & allowed_threads; - (!schedulable_threads.is_empty()).then(|| { - let thread_id = thread_selector(schedulable_threads); - self.lock_accounts(write_account_locks, read_account_locks, thread_id); - thread_id - }) - } - - /// Unlocks the accounts for the given thread. - pub fn unlock_accounts<'a>( - &mut self, - write_account_locks: impl Iterator, - read_account_locks: impl Iterator, - thread_id: ThreadId, - ) { - for account in write_account_locks { - self.write_unlock_account(account, thread_id); - } - - for account in read_account_locks { - self.read_unlock_account(account, thread_id); - } - } - - /// Returns `ThreadSet` that the given accounts can be scheduled on. - fn accounts_schedulable_threads<'a>( - &self, - write_account_locks: impl Iterator, - read_account_locks: impl Iterator, - ) -> Option { - let mut schedulable_threads = ThreadSet::any(self.num_threads); - - for account in write_account_locks { - schedulable_threads &= self.write_schedulable_threads(account); - if schedulable_threads.is_empty() { - return None; - } - } - - for account in read_account_locks { - schedulable_threads &= self.read_schedulable_threads(account); - if schedulable_threads.is_empty() { - return None; - } - } - - Some(schedulable_threads) - } - - /// Returns `ThreadSet` of schedulable threads for the given readable account. - fn read_schedulable_threads(&self, account: &Pubkey) -> ThreadSet { - self.schedulable_threads::(account) - } - - /// Returns `ThreadSet` of schedulable threads for the given writable account. - fn write_schedulable_threads(&self, account: &Pubkey) -> ThreadSet { - self.schedulable_threads::(account) - } - - /// Returns `ThreadSet` of schedulable threads. - /// If there are no locks, then all threads are schedulable. - /// If only write-locked, then only the thread holding the write lock is schedulable. - /// If a mix of locks, then only the write thread is schedulable. - /// If only read-locked, the only write-schedulable thread is if a single thread - /// holds all read locks. Otherwise, no threads are write-schedulable. - /// If only read-locked, all threads are read-schedulable. - fn schedulable_threads(&self, account: &Pubkey) -> ThreadSet { - match self.locks.get(account) { - None => ThreadSet::any(self.num_threads), - Some(AccountLocks { - write_locks: None, - read_locks: Some(read_locks), - }) => { - if WRITE { - read_locks - .thread_set - .only_one_contained() - .map(ThreadSet::only) - .unwrap_or_else(ThreadSet::none) - } else { - ThreadSet::any(self.num_threads) - } - } - Some(AccountLocks { - write_locks: Some(write_locks), - read_locks: None, - }) => ThreadSet::only(write_locks.thread_id), - Some(AccountLocks { - write_locks: Some(write_locks), - read_locks: Some(read_locks), - }) => { - assert_eq!( - read_locks.thread_set.only_one_contained(), - Some(write_locks.thread_id) - ); - read_locks.thread_set - } - Some(AccountLocks { - write_locks: None, - read_locks: None, - }) => unreachable!(), - } - } - - /// Add locks for all writable and readable accounts on `thread_id`. - fn lock_accounts<'a>( - &mut self, - write_account_locks: impl Iterator, - read_account_locks: impl Iterator, - thread_id: ThreadId, - ) { - assert!( - thread_id < self.num_threads, - "thread_id must be < num_threads" - ); - for account in write_account_locks { - self.write_lock_account(account, thread_id); - } - - for account in read_account_locks { - self.read_lock_account(account, thread_id); - } - } - - /// Locks the given `account` for writing on `thread_id`. - /// Panics if the account is already locked for writing on another thread. - fn write_lock_account(&mut self, account: &Pubkey, thread_id: ThreadId) { - let entry = self.locks.entry(*account).or_default(); - - let AccountLocks { - write_locks, - read_locks, - } = entry; - - if let Some(read_locks) = read_locks { - assert_eq!( - read_locks.thread_set.only_one_contained(), - Some(thread_id), - "outstanding read lock must be on same thread" - ); - } - - if let Some(write_locks) = write_locks { - assert_eq!( - write_locks.thread_id, thread_id, - "outstanding write lock must be on same thread" - ); - write_locks.lock_count += 1; - } else { - *write_locks = Some(AccountWriteLocks { - thread_id, - lock_count: 1, - }); - } - } - - /// Unlocks the given `account` for writing on `thread_id`. - /// Panics if the account is not locked for writing on `thread_id`. - fn write_unlock_account(&mut self, account: &Pubkey, thread_id: ThreadId) { - let Entry::Occupied(mut entry) = self.locks.entry(*account) else { - panic!("write lock must exist for account: {account}"); - }; - - let AccountLocks { - write_locks: maybe_write_locks, - read_locks, - } = entry.get_mut(); - - let Some(write_locks) = maybe_write_locks else { - panic!("write lock must exist for account: {account}"); - }; - - assert_eq!( - write_locks.thread_id, thread_id, - "outstanding write lock must be on same thread" - ); - - write_locks.lock_count -= 1; - if write_locks.lock_count == 0 { - *maybe_write_locks = None; - if read_locks.is_none() { - entry.remove(); - } - } - } - - /// Locks the given `account` for reading on `thread_id`. - /// Panics if the account is already locked for writing on another thread. - fn read_lock_account(&mut self, account: &Pubkey, thread_id: ThreadId) { - let AccountLocks { - write_locks, - read_locks, - } = self.locks.entry(*account).or_default(); - - if let Some(write_locks) = write_locks { - assert_eq!( - write_locks.thread_id, thread_id, - "outstanding write lock must be on same thread" - ); - } - - match read_locks { - Some(read_locks) => { - read_locks.thread_set.insert(thread_id); - read_locks.lock_counts[thread_id] += 1; - } - None => { - let mut lock_counts = [0; MAX_THREADS]; - lock_counts[thread_id] = 1; - *read_locks = Some(AccountReadLocks { - thread_set: ThreadSet::only(thread_id), - lock_counts, - }); - } - } - } - - /// Unlocks the given `account` for reading on `thread_id`. - /// Panics if the account is not locked for reading on `thread_id`. - fn read_unlock_account(&mut self, account: &Pubkey, thread_id: ThreadId) { - let Entry::Occupied(mut entry) = self.locks.entry(*account) else { - panic!("read lock must exist for account: {account}"); - }; - - let AccountLocks { - write_locks, - read_locks: maybe_read_locks, - } = entry.get_mut(); - - let Some(read_locks) = maybe_read_locks else { - panic!("read lock must exist for account: {account}"); - }; - - assert!( - read_locks.thread_set.contains(thread_id), - "outstanding read lock must be on same thread" - ); - - read_locks.lock_counts[thread_id] -= 1; - if read_locks.lock_counts[thread_id] == 0 { - read_locks.thread_set.remove(thread_id); - if read_locks.thread_set.is_empty() { - *maybe_read_locks = None; - if write_locks.is_none() { - entry.remove(); - } - } - } - } + /// Creates a new `ThreadAwareAccountLocks` with the given number of threads. + pub fn new(num_threads: usize) -> Self { + assert!(num_threads > 0, "num threads must be > 0"); + assert!( + num_threads <= MAX_THREADS, + "num threads must be <= {MAX_THREADS}" + ); + + Self { + num_threads, + locks: AHashMap::new(), + } + } + + /// Returns the `ThreadId` if the accounts are able to be locked + /// for the given thread, otherwise `None` is returned. + /// `allowed_threads` is a set of threads that the caller restricts locking to. + /// If accounts are schedulable, then they are locked for the thread + /// selected by the `thread_selector` function. + /// `thread_selector` is only called if all accounts are schdulable, meaning + /// that the `thread_set` passed to `thread_selector` is non-empty. + pub fn try_lock_accounts<'a>( + &mut self, + write_account_locks: impl Iterator + Clone, + read_account_locks: impl Iterator + Clone, + allowed_threads: ThreadSet, + thread_selector: impl FnOnce(ThreadSet) -> ThreadId, + ) -> Option { + let schedulable_threads = self.accounts_schedulable_threads( + write_account_locks.clone(), + read_account_locks.clone(), + )? & allowed_threads; + (!schedulable_threads.is_empty()).then(|| { + let thread_id = thread_selector(schedulable_threads); + self.lock_accounts(write_account_locks, read_account_locks, thread_id); + thread_id + }) + } + + /// Unlocks the accounts for the given thread. + pub fn unlock_accounts<'a>( + &mut self, + write_account_locks: impl Iterator, + read_account_locks: impl Iterator, + thread_id: ThreadId, + ) { + for account in write_account_locks { + self.write_unlock_account(account, thread_id); + } + + for account in read_account_locks { + self.read_unlock_account(account, thread_id); + } + } + + /// Returns `ThreadSet` that the given accounts can be scheduled on. + fn accounts_schedulable_threads<'a>( + &self, + write_account_locks: impl Iterator, + read_account_locks: impl Iterator, + ) -> Option { + let mut schedulable_threads = ThreadSet::any(self.num_threads); + + for account in write_account_locks { + schedulable_threads &= self.write_schedulable_threads(account); + if schedulable_threads.is_empty() { + return None; + } + } + + for account in read_account_locks { + schedulable_threads &= self.read_schedulable_threads(account); + if schedulable_threads.is_empty() { + return None; + } + } + + Some(schedulable_threads) + } + + /// Returns `ThreadSet` of schedulable threads for the given readable account. + fn read_schedulable_threads(&self, account: &Pubkey) -> ThreadSet { + self.schedulable_threads::(account) + } + + /// Returns `ThreadSet` of schedulable threads for the given writable account. + fn write_schedulable_threads(&self, account: &Pubkey) -> ThreadSet { + self.schedulable_threads::(account) + } + + /// Returns `ThreadSet` of schedulable threads. + /// If there are no locks, then all threads are schedulable. + /// If only write-locked, then only the thread holding the write lock is schedulable. + /// If a mix of locks, then only the write thread is schedulable. + /// If only read-locked, the only write-schedulable thread is if a single thread + /// holds all read locks. Otherwise, no threads are write-schedulable. + /// If only read-locked, all threads are read-schedulable. + fn schedulable_threads(&self, account: &Pubkey) -> ThreadSet { + match self.locks.get(account) { + None => ThreadSet::any(self.num_threads), + Some(AccountLocks { + write_locks: None, + read_locks: Some(read_locks), + }) => { + if WRITE { + read_locks + .thread_set + .only_one_contained() + .map(ThreadSet::only) + .unwrap_or_else(ThreadSet::none) + } else { + ThreadSet::any(self.num_threads) + } + } + Some(AccountLocks { + write_locks: Some(write_locks), + read_locks: None, + }) => ThreadSet::only(write_locks.thread_id), + Some(AccountLocks { + write_locks: Some(write_locks), + read_locks: Some(read_locks), + }) => { + assert_eq!( + read_locks.thread_set.only_one_contained(), + Some(write_locks.thread_id) + ); + read_locks.thread_set + } + Some(AccountLocks { + write_locks: None, + read_locks: None, + }) => unreachable!(), + } + } + + /// Add locks for all writable and readable accounts on `thread_id`. + fn lock_accounts<'a>( + &mut self, + write_account_locks: impl Iterator, + read_account_locks: impl Iterator, + thread_id: ThreadId, + ) { + assert!( + thread_id < self.num_threads, + "thread_id must be < num_threads" + ); + for account in write_account_locks { + self.write_lock_account(account, thread_id); + } + + for account in read_account_locks { + self.read_lock_account(account, thread_id); + } + } + + /// Locks the given `account` for writing on `thread_id`. + /// Panics if the account is already locked for writing on another thread. + fn write_lock_account(&mut self, account: &Pubkey, thread_id: ThreadId) { + let entry = self.locks.entry(*account).or_default(); + + let AccountLocks { + write_locks, + read_locks, + } = entry; + + if let Some(read_locks) = read_locks { + assert_eq!( + read_locks.thread_set.only_one_contained(), + Some(thread_id), + "outstanding read lock must be on same thread" + ); + } + + if let Some(write_locks) = write_locks { + assert_eq!( + write_locks.thread_id, thread_id, + "outstanding write lock must be on same thread" + ); + write_locks.lock_count += 1; + } else { + *write_locks = Some(AccountWriteLocks { + thread_id, + lock_count: 1, + }); + } + } + + /// Unlocks the given `account` for writing on `thread_id`. + /// Panics if the account is not locked for writing on `thread_id`. + fn write_unlock_account(&mut self, account: &Pubkey, thread_id: ThreadId) { + let Entry::Occupied(mut entry) = self.locks.entry(*account) else { + panic!("write lock must exist for account: {account}"); + }; + + let AccountLocks { + write_locks: maybe_write_locks, + read_locks, + } = entry.get_mut(); + + let Some(write_locks) = maybe_write_locks else { + panic!("write lock must exist for account: {account}"); + }; + + assert_eq!( + write_locks.thread_id, thread_id, + "outstanding write lock must be on same thread" + ); + + write_locks.lock_count -= 1; + if write_locks.lock_count == 0 { + *maybe_write_locks = None; + if read_locks.is_none() { + entry.remove(); + } + } + } + + /// Locks the given `account` for reading on `thread_id`. + /// Panics if the account is already locked for writing on another thread. + fn read_lock_account(&mut self, account: &Pubkey, thread_id: ThreadId) { + let AccountLocks { + write_locks, + read_locks, + } = self.locks.entry(*account).or_default(); + + if let Some(write_locks) = write_locks { + assert_eq!( + write_locks.thread_id, thread_id, + "outstanding write lock must be on same thread" + ); + } + + match read_locks { + Some(read_locks) => { + read_locks.thread_set.insert(thread_id); + read_locks.lock_counts[thread_id] += 1; + } + None => { + let mut lock_counts = [0; MAX_THREADS]; + lock_counts[thread_id] = 1; + *read_locks = Some(AccountReadLocks { + thread_set: ThreadSet::only(thread_id), + lock_counts, + }); + } + } + } + + /// Unlocks the given `account` for reading on `thread_id`. + /// Panics if the account is not locked for reading on `thread_id`. + fn read_unlock_account(&mut self, account: &Pubkey, thread_id: ThreadId) { + let Entry::Occupied(mut entry) = self.locks.entry(*account) else { + panic!("read lock must exist for account: {account}"); + }; + + let AccountLocks { + write_locks, + read_locks: maybe_read_locks, + } = entry.get_mut(); + + let Some(read_locks) = maybe_read_locks else { + panic!("read lock must exist for account: {account}"); + }; + + assert!( + read_locks.thread_set.contains(thread_id), + "outstanding read lock must be on same thread" + ); + + read_locks.lock_counts[thread_id] -= 1; + if read_locks.lock_counts[thread_id] == 0 { + read_locks.thread_set.remove(thread_id); + if read_locks.thread_set.is_empty() { + *maybe_read_locks = None; + if write_locks.is_none() { + entry.remove(); + } + } + } + } } impl BitAnd for ThreadSet { - type Output = Self; + type Output = Self; - fn bitand(self, rhs: Self) -> Self::Output { - Self(self.0 & rhs.0) - } + fn bitand(self, rhs: Self) -> Self::Output { + Self(self.0 & rhs.0) + } } impl BitAndAssign for ThreadSet { - fn bitand_assign(&mut self, rhs: Self) { - self.0 &= rhs.0; - } + fn bitand_assign(&mut self, rhs: Self) { + self.0 &= rhs.0; + } } impl Sub for ThreadSet { - type Output = Self; + type Output = Self; - fn sub(self, rhs: Self) -> Self::Output { - Self(self.0 & !rhs.0) - } + fn sub(self, rhs: Self) -> Self::Output { + Self(self.0 & !rhs.0) + } } impl Display for ThreadSet { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "ThreadSet({:#0width$b})", self.0, width = MAX_THREADS) - } + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ThreadSet({:#0width$b})", self.0, width = MAX_THREADS) + } } impl Debug for ThreadSet { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - Display::fmt(self, f) - } + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(self, f) + } } impl ThreadSet { - #[inline(always)] - pub const fn none() -> Self { - Self(0b0) - } - - #[inline(always)] - pub const fn any(num_threads: usize) -> Self { - if num_threads == MAX_THREADS { - Self(u64::MAX) - } else { - Self(Self::as_flag(num_threads) - 1) - } - } - - #[inline(always)] - pub const fn only(thread_id: ThreadId) -> Self { - Self(Self::as_flag(thread_id)) - } - - #[inline(always)] - pub fn num_threads(&self) -> u32 { - self.0.count_ones() - } - - #[inline(always)] - pub fn only_one_contained(&self) -> Option { - (self.num_threads() == 1).then_some(self.0.trailing_zeros() as ThreadId) - } - - #[inline(always)] - pub fn is_empty(&self) -> bool { - self == &Self::none() - } - - #[inline(always)] - pub fn contains(&self, thread_id: ThreadId) -> bool { - self.0 & Self::as_flag(thread_id) != 0 - } - - #[inline(always)] - pub fn insert(&mut self, thread_id: ThreadId) { - self.0 |= Self::as_flag(thread_id); - } - - #[inline(always)] - pub fn remove(&mut self, thread_id: ThreadId) { - self.0 &= !Self::as_flag(thread_id); - } - - #[inline(always)] - pub fn contained_threads_iter(self) -> impl Iterator { - (0..MAX_THREADS).filter(move |thread_id| self.contains(*thread_id)) - } - - #[inline(always)] - const fn as_flag(thread_id: ThreadId) -> u64 { - 0b1 << thread_id - } + #[inline(always)] + pub const fn none() -> Self { + Self(0b0) + } + + #[inline(always)] + pub const fn any(num_threads: usize) -> Self { + if num_threads == MAX_THREADS { + Self(u64::MAX) + } else { + Self(Self::as_flag(num_threads) - 1) + } + } + + #[inline(always)] + pub const fn only(thread_id: ThreadId) -> Self { + Self(Self::as_flag(thread_id)) + } + + #[inline(always)] + pub fn num_threads(&self) -> u32 { + self.0.count_ones() + } + + #[inline(always)] + pub fn only_one_contained(&self) -> Option { + (self.num_threads() == 1).then_some(self.0.trailing_zeros() as ThreadId) + } + + #[inline(always)] + pub fn is_empty(&self) -> bool { + self == &Self::none() + } + + #[inline(always)] + pub fn contains(&self, thread_id: ThreadId) -> bool { + self.0 & Self::as_flag(thread_id) != 0 + } + + #[inline(always)] + pub fn insert(&mut self, thread_id: ThreadId) { + self.0 |= Self::as_flag(thread_id); + } + + #[inline(always)] + pub fn remove(&mut self, thread_id: ThreadId) { + self.0 &= !Self::as_flag(thread_id); + } + + #[inline(always)] + pub fn contained_threads_iter(self) -> impl Iterator { + (0..MAX_THREADS).filter(move |thread_id| self.contains(*thread_id)) + } + + #[inline(always)] + const fn as_flag(thread_id: ThreadId) -> u64 { + 0b1 << thread_id + } } #[cfg(test)] mod tests { - use super::*; - - const TEST_NUM_THREADS: usize = 4; - const TEST_ANY_THREADS: ThreadSet = ThreadSet::any(TEST_NUM_THREADS); - - // Simple thread selector to select the first schedulable thread - fn test_thread_selector(thread_set: ThreadSet) -> ThreadId { - thread_set.contained_threads_iter().next().unwrap() - } - - #[test] - #[should_panic(expected = "num threads must be > 0")] - fn test_too_few_num_threads() { - ThreadAwareAccountLocks::new(0); - } - - #[test] - #[should_panic(expected = "num threads must be <=")] - fn test_too_many_num_threads() { - ThreadAwareAccountLocks::new(MAX_THREADS + 1); - } - - #[test] - fn test_try_lock_accounts_none() { - let pk1 = Pubkey::new_unique(); - let pk2 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - locks.read_lock_account(&pk1, 2); - locks.read_lock_account(&pk1, 3); - assert_eq!( - locks.try_lock_accounts( - [&pk1].into_iter(), - [&pk2].into_iter(), - TEST_ANY_THREADS, - test_thread_selector - ), - None - ); - } - - #[test] - fn test_try_lock_accounts_one() { - let pk1 = Pubkey::new_unique(); - let pk2 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - locks.write_lock_account(&pk2, 3); - - assert_eq!( - locks.try_lock_accounts( - [&pk1].into_iter(), - [&pk2].into_iter(), - TEST_ANY_THREADS, - test_thread_selector - ), - Some(3) - ); - } - - #[test] - fn test_try_lock_accounts_multiple() { - let pk1 = Pubkey::new_unique(); - let pk2 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - locks.read_lock_account(&pk2, 0); - locks.read_lock_account(&pk2, 0); - - assert_eq!( - locks.try_lock_accounts( - [&pk1].into_iter(), - [&pk2].into_iter(), - TEST_ANY_THREADS - ThreadSet::only(0), // exclude 0 - test_thread_selector - ), - Some(1) - ); - } - - #[test] - fn test_try_lock_accounts_any() { - let pk1 = Pubkey::new_unique(); - let pk2 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - assert_eq!( - locks.try_lock_accounts( - [&pk1].into_iter(), - [&pk2].into_iter(), - TEST_ANY_THREADS, - test_thread_selector - ), - Some(0) - ); - } - - #[test] - fn test_accounts_schedulable_threads_no_outstanding_locks() { - let pk1 = Pubkey::new_unique(); - let locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - - assert_eq!( - locks.accounts_schedulable_threads([&pk1].into_iter(), std::iter::empty()), - Some(TEST_ANY_THREADS) - ); - assert_eq!( - locks.accounts_schedulable_threads(std::iter::empty(), [&pk1].into_iter()), - Some(TEST_ANY_THREADS) - ); - } - - #[test] - fn test_accounts_schedulable_threads_outstanding_write_only() { - let pk1 = Pubkey::new_unique(); - let pk2 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - - locks.write_lock_account(&pk1, 2); - assert_eq!( - locks.accounts_schedulable_threads([&pk1, &pk2].into_iter(), std::iter::empty()), - Some(ThreadSet::only(2)) - ); - assert_eq!( - locks.accounts_schedulable_threads(std::iter::empty(), [&pk1, &pk2].into_iter()), - Some(ThreadSet::only(2)) - ); - } - - #[test] - fn test_accounts_schedulable_threads_outstanding_read_only() { - let pk1 = Pubkey::new_unique(); - let pk2 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - - locks.read_lock_account(&pk1, 2); - assert_eq!( - locks.accounts_schedulable_threads([&pk1, &pk2].into_iter(), std::iter::empty()), - Some(ThreadSet::only(2)) - ); - assert_eq!( - locks.accounts_schedulable_threads(std::iter::empty(), [&pk1, &pk2].into_iter()), - Some(TEST_ANY_THREADS) - ); - - locks.read_lock_account(&pk1, 0); - assert_eq!( - locks.accounts_schedulable_threads([&pk1, &pk2].into_iter(), std::iter::empty()), - None - ); - assert_eq!( - locks.accounts_schedulable_threads(std::iter::empty(), [&pk1, &pk2].into_iter()), - Some(TEST_ANY_THREADS) - ); - } - - #[test] - fn test_accounts_schedulable_threads_outstanding_mixed() { - let pk1 = Pubkey::new_unique(); - let pk2 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - - locks.read_lock_account(&pk1, 2); - locks.write_lock_account(&pk1, 2); - assert_eq!( - locks.accounts_schedulable_threads([&pk1, &pk2].into_iter(), std::iter::empty()), - Some(ThreadSet::only(2)) - ); - assert_eq!( - locks.accounts_schedulable_threads(std::iter::empty(), [&pk1, &pk2].into_iter()), - Some(ThreadSet::only(2)) - ); - } - - #[test] - #[should_panic(expected = "outstanding write lock must be on same thread")] - fn test_write_lock_account_write_conflict_panic() { - let pk1 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - locks.write_lock_account(&pk1, 0); - locks.write_lock_account(&pk1, 1); - } - - #[test] - #[should_panic(expected = "outstanding read lock must be on same thread")] - fn test_write_lock_account_read_conflict_panic() { - let pk1 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - locks.read_lock_account(&pk1, 0); - locks.write_lock_account(&pk1, 1); - } - - #[test] - #[should_panic(expected = "write lock must exist")] - fn test_write_unlock_account_not_locked() { - let pk1 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - locks.write_unlock_account(&pk1, 0); - } - - #[test] - #[should_panic(expected = "outstanding write lock must be on same thread")] - fn test_write_unlock_account_thread_mismatch() { - let pk1 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - locks.write_lock_account(&pk1, 1); - locks.write_unlock_account(&pk1, 0); - } - - #[test] - #[should_panic(expected = "outstanding write lock must be on same thread")] - fn test_read_lock_account_write_conflict_panic() { - let pk1 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - locks.write_lock_account(&pk1, 0); - locks.read_lock_account(&pk1, 1); - } - - #[test] - #[should_panic(expected = "read lock must exist")] - fn test_read_unlock_account_not_locked() { - let pk1 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - locks.read_unlock_account(&pk1, 1); - } - - #[test] - #[should_panic(expected = "outstanding read lock must be on same thread")] - fn test_read_unlock_account_thread_mismatch() { - let pk1 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - locks.read_lock_account(&pk1, 0); - locks.read_unlock_account(&pk1, 1); - } - - #[test] - fn test_write_locking() { - let pk1 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - locks.write_lock_account(&pk1, 1); - locks.write_lock_account(&pk1, 1); - locks.write_unlock_account(&pk1, 1); - locks.write_unlock_account(&pk1, 1); - assert!(locks.locks.is_empty()); - } - - #[test] - fn test_read_locking() { - let pk1 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - locks.read_lock_account(&pk1, 1); - locks.read_lock_account(&pk1, 1); - locks.read_unlock_account(&pk1, 1); - locks.read_unlock_account(&pk1, 1); - assert!(locks.locks.is_empty()); - } - - #[test] - #[should_panic(expected = "thread_id must be < num_threads")] - fn test_lock_accounts_invalid_thread() { - let pk1 = Pubkey::new_unique(); - let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); - locks.lock_accounts([&pk1].into_iter(), std::iter::empty(), TEST_NUM_THREADS); - } - - #[test] - fn test_thread_set() { - let mut thread_set = ThreadSet::none(); - assert!(thread_set.is_empty()); - assert_eq!(thread_set.num_threads(), 0); - assert_eq!(thread_set.only_one_contained(), None); - for idx in 0..MAX_THREADS { - assert!(!thread_set.contains(idx)); - } - - thread_set.insert(4); - assert!(!thread_set.is_empty()); - assert_eq!(thread_set.num_threads(), 1); - assert_eq!(thread_set.only_one_contained(), Some(4)); - for idx in 0..MAX_THREADS { - assert_eq!(thread_set.contains(idx), idx == 4); - } - - thread_set.insert(2); - assert!(!thread_set.is_empty()); - assert_eq!(thread_set.num_threads(), 2); - assert_eq!(thread_set.only_one_contained(), None); - for idx in 0..MAX_THREADS { - assert_eq!(thread_set.contains(idx), idx == 2 || idx == 4); - } - - thread_set.remove(4); - assert!(!thread_set.is_empty()); - assert_eq!(thread_set.num_threads(), 1); - assert_eq!(thread_set.only_one_contained(), Some(2)); - for idx in 0..MAX_THREADS { - assert_eq!(thread_set.contains(idx), idx == 2); - } - } - - #[test] - fn test_thread_set_any_zero() { - let any_threads = ThreadSet::any(0); - assert_eq!(any_threads.num_threads(), 0); - } - - #[test] - fn test_thread_set_any_max() { - let any_threads = ThreadSet::any(MAX_THREADS); - assert_eq!(any_threads.num_threads(), MAX_THREADS as u32); - } + use super::*; + + const TEST_NUM_THREADS: usize = 4; + const TEST_ANY_THREADS: ThreadSet = ThreadSet::any(TEST_NUM_THREADS); + + // Simple thread selector to select the first schedulable thread + fn test_thread_selector(thread_set: ThreadSet) -> ThreadId { + thread_set.contained_threads_iter().next().unwrap() + } + + #[test] + #[should_panic(expected = "num threads must be > 0")] + fn test_too_few_num_threads() { + ThreadAwareAccountLocks::new(0); + } + + #[test] + #[should_panic(expected = "num threads must be <=")] + fn test_too_many_num_threads() { + ThreadAwareAccountLocks::new(MAX_THREADS + 1); + } + + #[test] + fn test_try_lock_accounts_none() { + let pk1 = Pubkey::new_unique(); + let pk2 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + locks.read_lock_account(&pk1, 2); + locks.read_lock_account(&pk1, 3); + assert_eq!( + locks.try_lock_accounts( + [&pk1].into_iter(), + [&pk2].into_iter(), + TEST_ANY_THREADS, + test_thread_selector + ), + None + ); + } + + #[test] + fn test_try_lock_accounts_one() { + let pk1 = Pubkey::new_unique(); + let pk2 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + locks.write_lock_account(&pk2, 3); + + assert_eq!( + locks.try_lock_accounts( + [&pk1].into_iter(), + [&pk2].into_iter(), + TEST_ANY_THREADS, + test_thread_selector + ), + Some(3) + ); + } + + #[test] + fn test_try_lock_accounts_multiple() { + let pk1 = Pubkey::new_unique(); + let pk2 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + locks.read_lock_account(&pk2, 0); + locks.read_lock_account(&pk2, 0); + + assert_eq!( + locks.try_lock_accounts( + [&pk1].into_iter(), + [&pk2].into_iter(), + TEST_ANY_THREADS - ThreadSet::only(0), // exclude 0 + test_thread_selector + ), + Some(1) + ); + } + + #[test] + fn test_try_lock_accounts_any() { + let pk1 = Pubkey::new_unique(); + let pk2 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + assert_eq!( + locks.try_lock_accounts( + [&pk1].into_iter(), + [&pk2].into_iter(), + TEST_ANY_THREADS, + test_thread_selector + ), + Some(0) + ); + } + + #[test] + fn test_accounts_schedulable_threads_no_outstanding_locks() { + let pk1 = Pubkey::new_unique(); + let locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + + assert_eq!( + locks.accounts_schedulable_threads([&pk1].into_iter(), std::iter::empty()), + Some(TEST_ANY_THREADS) + ); + assert_eq!( + locks.accounts_schedulable_threads(std::iter::empty(), [&pk1].into_iter()), + Some(TEST_ANY_THREADS) + ); + } + + #[test] + fn test_accounts_schedulable_threads_outstanding_write_only() { + let pk1 = Pubkey::new_unique(); + let pk2 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + + locks.write_lock_account(&pk1, 2); + assert_eq!( + locks.accounts_schedulable_threads([&pk1, &pk2].into_iter(), std::iter::empty()), + Some(ThreadSet::only(2)) + ); + assert_eq!( + locks.accounts_schedulable_threads(std::iter::empty(), [&pk1, &pk2].into_iter()), + Some(ThreadSet::only(2)) + ); + } + + #[test] + fn test_accounts_schedulable_threads_outstanding_read_only() { + let pk1 = Pubkey::new_unique(); + let pk2 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + + locks.read_lock_account(&pk1, 2); + assert_eq!( + locks.accounts_schedulable_threads([&pk1, &pk2].into_iter(), std::iter::empty()), + Some(ThreadSet::only(2)) + ); + assert_eq!( + locks.accounts_schedulable_threads(std::iter::empty(), [&pk1, &pk2].into_iter()), + Some(TEST_ANY_THREADS) + ); + + locks.read_lock_account(&pk1, 0); + assert_eq!( + locks.accounts_schedulable_threads([&pk1, &pk2].into_iter(), std::iter::empty()), + None + ); + assert_eq!( + locks.accounts_schedulable_threads(std::iter::empty(), [&pk1, &pk2].into_iter()), + Some(TEST_ANY_THREADS) + ); + } + + #[test] + fn test_accounts_schedulable_threads_outstanding_mixed() { + let pk1 = Pubkey::new_unique(); + let pk2 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + + locks.read_lock_account(&pk1, 2); + locks.write_lock_account(&pk1, 2); + assert_eq!( + locks.accounts_schedulable_threads([&pk1, &pk2].into_iter(), std::iter::empty()), + Some(ThreadSet::only(2)) + ); + assert_eq!( + locks.accounts_schedulable_threads(std::iter::empty(), [&pk1, &pk2].into_iter()), + Some(ThreadSet::only(2)) + ); + } + + #[test] + #[should_panic(expected = "outstanding write lock must be on same thread")] + fn test_write_lock_account_write_conflict_panic() { + let pk1 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + locks.write_lock_account(&pk1, 0); + locks.write_lock_account(&pk1, 1); + } + + #[test] + #[should_panic(expected = "outstanding read lock must be on same thread")] + fn test_write_lock_account_read_conflict_panic() { + let pk1 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + locks.read_lock_account(&pk1, 0); + locks.write_lock_account(&pk1, 1); + } + + #[test] + #[should_panic(expected = "write lock must exist")] + fn test_write_unlock_account_not_locked() { + let pk1 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + locks.write_unlock_account(&pk1, 0); + } + + #[test] + #[should_panic(expected = "outstanding write lock must be on same thread")] + fn test_write_unlock_account_thread_mismatch() { + let pk1 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + locks.write_lock_account(&pk1, 1); + locks.write_unlock_account(&pk1, 0); + } + + #[test] + #[should_panic(expected = "outstanding write lock must be on same thread")] + fn test_read_lock_account_write_conflict_panic() { + let pk1 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + locks.write_lock_account(&pk1, 0); + locks.read_lock_account(&pk1, 1); + } + + #[test] + #[should_panic(expected = "read lock must exist")] + fn test_read_unlock_account_not_locked() { + let pk1 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + locks.read_unlock_account(&pk1, 1); + } + + #[test] + #[should_panic(expected = "outstanding read lock must be on same thread")] + fn test_read_unlock_account_thread_mismatch() { + let pk1 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + locks.read_lock_account(&pk1, 0); + locks.read_unlock_account(&pk1, 1); + } + + #[test] + fn test_write_locking() { + let pk1 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + locks.write_lock_account(&pk1, 1); + locks.write_lock_account(&pk1, 1); + locks.write_unlock_account(&pk1, 1); + locks.write_unlock_account(&pk1, 1); + assert!(locks.locks.is_empty()); + } + + #[test] + fn test_read_locking() { + let pk1 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + locks.read_lock_account(&pk1, 1); + locks.read_lock_account(&pk1, 1); + locks.read_unlock_account(&pk1, 1); + locks.read_unlock_account(&pk1, 1); + assert!(locks.locks.is_empty()); + } + + #[test] + #[should_panic(expected = "thread_id must be < num_threads")] + fn test_lock_accounts_invalid_thread() { + let pk1 = Pubkey::new_unique(); + let mut locks = ThreadAwareAccountLocks::new(TEST_NUM_THREADS); + locks.lock_accounts([&pk1].into_iter(), std::iter::empty(), TEST_NUM_THREADS); + } + + #[test] + fn test_thread_set() { + let mut thread_set = ThreadSet::none(); + assert!(thread_set.is_empty()); + assert_eq!(thread_set.num_threads(), 0); + assert_eq!(thread_set.only_one_contained(), None); + for idx in 0..MAX_THREADS { + assert!(!thread_set.contains(idx)); + } + + thread_set.insert(4); + assert!(!thread_set.is_empty()); + assert_eq!(thread_set.num_threads(), 1); + assert_eq!(thread_set.only_one_contained(), Some(4)); + for idx in 0..MAX_THREADS { + assert_eq!(thread_set.contains(idx), idx == 4); + } + + thread_set.insert(2); + assert!(!thread_set.is_empty()); + assert_eq!(thread_set.num_threads(), 2); + assert_eq!(thread_set.only_one_contained(), None); + for idx in 0..MAX_THREADS { + assert_eq!(thread_set.contains(idx), idx == 2 || idx == 4); + } + + thread_set.remove(4); + assert!(!thread_set.is_empty()); + assert_eq!(thread_set.num_threads(), 1); + assert_eq!(thread_set.only_one_contained(), Some(2)); + for idx in 0..MAX_THREADS { + assert_eq!(thread_set.contains(idx), idx == 2); + } + } + + #[test] + fn test_thread_set_any_zero() { + let any_threads = ThreadSet::any(0); + assert_eq!(any_threads.num_threads(), 0); + } + + #[test] + fn test_thread_set_any_max() { + let any_threads = ThreadSet::any(MAX_THREADS); + assert_eq!(any_threads.num_threads(), MAX_THREADS as u32); + } } diff --git a/prio-graph-scheduler/src/transaction_priority_id.rs b/prio-graph-scheduler/src/transaction_priority_id.rs index f399273..05720c8 100644 --- a/prio-graph-scheduler/src/transaction_priority_id.rs +++ b/prio-graph-scheduler/src/transaction_priority_id.rs @@ -1,69 +1,69 @@ use { - crate::scheduler_messages::TransactionId, - prio_graph::TopLevelId, - std::hash::{Hash, Hasher}, + crate::scheduler_messages::TransactionId, + prio_graph::TopLevelId, + std::hash::{Hash, Hasher}, }; /// A unique identifier tied with priority ordering for a transaction/packet: #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct TransactionPriorityId { - pub priority: u64, - pub id: TransactionId, + pub priority: u64, + pub id: TransactionId, } impl TransactionPriorityId { - pub fn new(priority: u64, id: TransactionId) -> Self { - Self { priority, id } - } + pub fn new(priority: u64, id: TransactionId) -> Self { + Self { priority, id } + } } impl Hash for TransactionPriorityId { - fn hash(&self, state: &mut H) { - self.id.hash(state) - } + fn hash(&self, state: &mut H) { + self.id.hash(state) + } } impl TopLevelId for TransactionPriorityId { - fn id(&self) -> Self { - *self - } + fn id(&self) -> Self { + *self + } } #[cfg(test)] mod tests { - use super::*; + use super::*; - #[test] - fn test_transaction_priority_id_ordering() { - // Higher priority first - { - let id1 = TransactionPriorityId::new(1, TransactionId::new(1)); - let id2 = TransactionPriorityId::new(2, TransactionId::new(1)); - assert!(id1 < id2); - assert!(id1 <= id2); - assert!(id2 > id1); - assert!(id2 >= id1); - } + #[test] + fn test_transaction_priority_id_ordering() { + // Higher priority first + { + let id1 = TransactionPriorityId::new(1, TransactionId::new(1)); + let id2 = TransactionPriorityId::new(2, TransactionId::new(1)); + assert!(id1 < id2); + assert!(id1 <= id2); + assert!(id2 > id1); + assert!(id2 >= id1); + } - // Equal priority then compare by id - { - let id1 = TransactionPriorityId::new(1, TransactionId::new(1)); - let id2 = TransactionPriorityId::new(1, TransactionId::new(2)); - assert!(id1 < id2); - assert!(id1 <= id2); - assert!(id2 > id1); - assert!(id2 >= id1); - } + // Equal priority then compare by id + { + let id1 = TransactionPriorityId::new(1, TransactionId::new(1)); + let id2 = TransactionPriorityId::new(1, TransactionId::new(2)); + assert!(id1 < id2); + assert!(id1 <= id2); + assert!(id2 > id1); + assert!(id2 >= id1); + } - // Equal priority and id - { - let id1 = TransactionPriorityId::new(1, TransactionId::new(1)); - let id2 = TransactionPriorityId::new(1, TransactionId::new(1)); - assert_eq!(id1, id2); - assert!(id1 >= id2); - assert!(id1 <= id2); - assert!(id2 >= id1); - assert!(id2 <= id1); - } - } + // Equal priority and id + { + let id1 = TransactionPriorityId::new(1, TransactionId::new(1)); + let id2 = TransactionPriorityId::new(1, TransactionId::new(1)); + assert_eq!(id1, id2); + assert!(id1 >= id2); + assert!(id1 <= id2); + assert!(id2 >= id1); + assert!(id2 <= id1); + } + } } diff --git a/prio-graph-scheduler/src/transaction_state_container.rs b/prio-graph-scheduler/src/transaction_state_container.rs index 9afcd73..2f11629 100644 --- a/prio-graph-scheduler/src/transaction_state_container.rs +++ b/prio-graph-scheduler/src/transaction_state_container.rs @@ -146,7 +146,10 @@ impl TransactionStateContainer

{ #[cfg(test)] mod tests { use { - super::*, crate::tests::MockImmutableDeserializedPacket, solana_sdk::{ + super::*, + crate::scheduler_messages::MaxAge, + crate::tests::MockImmutableDeserializedPacket, + solana_sdk::{ compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, @@ -157,7 +160,6 @@ mod tests { system_instruction, transaction::{SanitizedTransaction, Transaction}, }, - crate::scheduler_messages::MaxAge, }; /// Returns (transaction_ttl, priority, cost) diff --git a/scheduler/Cargo.toml b/scheduler/Cargo.toml new file mode 100644 index 0000000..4e24ac7 --- /dev/null +++ b/scheduler/Cargo.toml @@ -0,0 +1,49 @@ +[package] +name = "igloo-scheduler" +version.workspace = true +authors.workspace = true +repository.workspace = true +homepage.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +solana-sdk = { workspace = true } +solana-poh = { workspace = true } +solana-metrics = { workspace = true } +solana-ledger = { workspace = true } +solana-runtime = { workspace = true } +solana-gossip = { workspace = true } +solana-cost-model = { workspace = true } +solana-measure = { workspace = true } + +ahash = { workspace = true } +prio-graph = { workspace = true } +thiserror = { workspace = true } +itertools = { workspace = true } +log = { workspace = true } +crossbeam-channel = { workspace = true } +arrayvec = { workspace = true } +min-max-heap = { workspace = true } + +[dev-dependencies] +assert_matches = { workspace = true } +solana-compute-budget = { workspace = true } +solana-perf = { workspace = true } +solana-runtime-transaction = { workspace = true } +solana-sanitize = { workspace = true } +solana-short-vec = { workspace = true } +# let dev-context-only-utils works when running this crate. +solana-prio-graph-scheduler = { path = ".", features = [ + "dev-context-only-utils", +] } +solana-sdk = { workspace = true, features = ["dev-context-only-utils"] } + +bincode = { workspace = true } + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[features] +dev-context-only-utils = ["solana-runtime/dev-context-only-utils"] + diff --git a/scheduler/src/lib.rs b/scheduler/src/lib.rs new file mode 100644 index 0000000..b93cf3f --- /dev/null +++ b/scheduler/src/lib.rs @@ -0,0 +1,14 @@ +pub fn add(left: u64, right: u64) -> u64 { + left + right +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +} From 9d71762d57fd8e9b5562f87903a7d5d23fb3431d Mon Sep 17 00:00:00 2001 From: lewis Date: Mon, 28 Oct 2024 16:02:23 +0800 Subject: [PATCH 2/3] feat: unified scheduler frame --- Cargo.lock | 25 +++ scheduler/Cargo.toml | 4 - scheduler/src/impls/mod.rs | 2 + scheduler/src/impls/no_lock_scheduler.rs | 2 + scheduler/src/impls/prio_graph_scheduler.rs | 1 + scheduler/src/lib.rs | 19 +- scheduler/src/scheduler.rs | 22 +++ scheduler/src/scheduler_messages.rs | 67 +++++++ scheduler/src/status_slicing.rs | 190 ++++++++++++++++++++ scheduler/src/stopwatch.rs | 51 ++++++ 10 files changed, 365 insertions(+), 18 deletions(-) create mode 100644 scheduler/src/impls/mod.rs create mode 100644 scheduler/src/impls/no_lock_scheduler.rs create mode 100644 scheduler/src/impls/prio_graph_scheduler.rs create mode 100644 scheduler/src/scheduler.rs create mode 100644 scheduler/src/scheduler_messages.rs create mode 100644 scheduler/src/status_slicing.rs create mode 100644 scheduler/src/stopwatch.rs diff --git a/Cargo.lock b/Cargo.lock index 8688e0e..7539634 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2386,6 +2386,31 @@ dependencies = [ [[package]] name = "igloo-scheduler" version = "0.1.0" +dependencies = [ + "ahash 0.8.11", + "arrayvec", + "assert_matches", + "bincode", + "crossbeam-channel", + "itertools 0.13.0", + "log", + "min-max-heap", + "prio-graph", + "solana-compute-budget", + "solana-cost-model", + "solana-gossip", + "solana-ledger", + "solana-measure", + "solana-metrics", + "solana-perf", + "solana-poh", + "solana-runtime", + "solana-runtime-transaction", + "solana-sanitize", + "solana-sdk", + "solana-short-vec", + "thiserror", +] [[package]] name = "igloo-storage" diff --git a/scheduler/Cargo.toml b/scheduler/Cargo.toml index 4e24ac7..cb12eac 100644 --- a/scheduler/Cargo.toml +++ b/scheduler/Cargo.toml @@ -33,10 +33,6 @@ solana-perf = { workspace = true } solana-runtime-transaction = { workspace = true } solana-sanitize = { workspace = true } solana-short-vec = { workspace = true } -# let dev-context-only-utils works when running this crate. -solana-prio-graph-scheduler = { path = ".", features = [ - "dev-context-only-utils", -] } solana-sdk = { workspace = true, features = ["dev-context-only-utils"] } bincode = { workspace = true } diff --git a/scheduler/src/impls/mod.rs b/scheduler/src/impls/mod.rs new file mode 100644 index 0000000..f10c131 --- /dev/null +++ b/scheduler/src/impls/mod.rs @@ -0,0 +1,2 @@ +pub mod no_lock_scheduler; +pub mod prio_graph_scheduler; diff --git a/scheduler/src/impls/no_lock_scheduler.rs b/scheduler/src/impls/no_lock_scheduler.rs new file mode 100644 index 0000000..a60d33e --- /dev/null +++ b/scheduler/src/impls/no_lock_scheduler.rs @@ -0,0 +1,2 @@ +/// NoLockScheduler is a dummy scheduler that does not lock any resources. +pub struct NoLockScheduler {} diff --git a/scheduler/src/impls/prio_graph_scheduler.rs b/scheduler/src/impls/prio_graph_scheduler.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/scheduler/src/impls/prio_graph_scheduler.rs @@ -0,0 +1 @@ + diff --git a/scheduler/src/lib.rs b/scheduler/src/lib.rs index b93cf3f..a108e32 100644 --- a/scheduler/src/lib.rs +++ b/scheduler/src/lib.rs @@ -1,14 +1,5 @@ -pub fn add(left: u64, right: u64) -> u64 { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} +pub mod impls; +pub mod scheduler; +mod scheduler_messages; +pub mod status_slicing; +pub mod stopwatch; diff --git a/scheduler/src/scheduler.rs b/scheduler/src/scheduler.rs new file mode 100644 index 0000000..7525ec5 --- /dev/null +++ b/scheduler/src/scheduler.rs @@ -0,0 +1,22 @@ +use crate::scheduler_messages::{SchedulingBatch, SchedulingBatchResult}; +use crossbeam_channel::Receiver; +use std::sync::mpsc::Sender; + +/// A Scheduler is a single-thread centralized scheduling thread. +/// +/// It will be initialized with N task sending channels and a task callback channel, +/// with normally an inner scheduling status machine. +/// Workflow just like below: +/// -> Task channel1 -> [worker1] -> Task finish callback -> +/// | ... | +/// Scheduler -- Task channelK -> [workerK] -> Task finish callback -> Scheduler +/// | ... | +/// -> Task channelN -> [workerN] -> Task finish callback -> +pub trait Scheduler { + fn new( + schedule_task_receivers: Vec>, + task_finish_receiver: Receiver, + ) -> Self; + + fn schedule_batch(&mut self, txs: SchedulingBatch); +} diff --git a/scheduler/src/scheduler_messages.rs b/scheduler/src/scheduler_messages.rs new file mode 100644 index 0000000..9966b06 --- /dev/null +++ b/scheduler/src/scheduler_messages.rs @@ -0,0 +1,67 @@ +use crate::status_slicing::SvmWorkerSlicingStatus; +use { + solana_sdk::transaction::SanitizedTransaction, + std::fmt::Display, +}; + +/// A unique identifier for a transaction batch. +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] +pub struct TransactionBatchId(u64); + +impl TransactionBatchId { + pub fn new(index: u64) -> Self { + Self(index) + } +} + +impl Display for TransactionBatchId { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl From for TransactionBatchId { + fn from(id: u64) -> Self { + Self(id) + } +} + +/// A unique identifier for a transaction. +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct TransactionId(u64); + +impl TransactionId { + pub fn new(index: u64) -> Self { + Self(index) + } +} + +impl Display for TransactionId { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl From for TransactionId { + fn from(id: u64) -> Self { + Self(id) + } +} + +/// Scheduling unit. +pub struct SchedulingBatch { + pub batch_id: TransactionBatchId, + pub ids: Vec, + pub transactions: Vec, +} + +/// The scheduling result from worker one time. +/// Since the `SchedulingBatch` will be dispute to different subset to multi workers, +/// the `SchedulingBatchResult` is not 1-1 with SchedulingBatch. +/// One `batch_id` may occur mostly `num_of_worker` times. +pub struct SchedulingBatchResult { + // workload. + pub batch: SchedulingBatch, + // time slice status for this batch job. + pub status_summary: Vec, +} diff --git a/scheduler/src/status_slicing.rs b/scheduler/src/status_slicing.rs new file mode 100644 index 0000000..916ba10 --- /dev/null +++ b/scheduler/src/status_slicing.rs @@ -0,0 +1,190 @@ +use ahash::{HashMap, HashMapExt}; + +/// Represents the current status of an SVM worker, including its duration. +#[derive(Debug, Clone)] +pub enum SvmWorkerSlicingStatus { + /// Worker is actively processing transactions. + Active { start: u64, end: u64 }, + /// Worker is idle, waiting for new transactions. + Idle { start: u64, end: u64 }, +} + +impl SvmWorkerSlicingStatus { + /// Period start time (Unix timestamp in milliseconds). + pub fn start(&self) -> u64 { + match self { + SvmWorkerSlicingStatus::Active { start, .. } + | SvmWorkerSlicingStatus::Idle { start, .. } => *start, + } + } + + /// Period end time (Unix timestamp in milliseconds). + pub fn end(&self) -> u64 { + match self { + SvmWorkerSlicingStatus::Active { end, .. } + | SvmWorkerSlicingStatus::Idle { end, .. } => *end, + } + } + + /// Returns true if the status is Active. + pub fn is_active(&self) -> bool { + matches!(self, SvmWorkerSlicingStatus::Active { .. }) + } + + /// Returns true if the status is Idle. + pub fn is_idle(&self) -> bool { + matches!(self, SvmWorkerSlicingStatus::Idle { .. }) + } +} + +impl SvmWorkerSlicingStatus { + /// Creates a new Active status with the given start and end times. + pub fn new_active(start: u64, end: u64) -> Self { + SvmWorkerSlicingStatus::Active { start, end } + } + + /// Creates a new Idle status with the given start and end times. + pub fn new_idle(start: u64, end: u64) -> Self { + SvmWorkerSlicingStatus::Idle { start, end } + } + + /// Returns the duration of the status in milliseconds. + pub fn duration_ms(&self) -> u64 { + let (start, end) = match self { + SvmWorkerSlicingStatus::Active { start, end } => (start, end), + SvmWorkerSlicingStatus::Idle { start, end } => (start, end), + }; + end.saturating_sub(*start) + } +} + +/// Represents a status update from a worker thread. +#[derive(Debug)] +pub struct WorkerStatusUpdate { + /// The ID of the worker thread. + pub thread_id: usize, + /// The current status of the worker. + pub status: SvmWorkerSlicingStatus, +} + +/// Calculates and summarizes the thread load and coverage for a given set of worker status updates. +pub fn calculate_thread_load_summary(updates: &[WorkerStatusUpdate]) -> ThreadLoadSummary { + let mut summary = ThreadLoadSummary::default(); + let mut time_windows: Vec<(u64, u64)> = Vec::new(); + let mut thread_statuses: HashMap> = HashMap::new(); + + // Collect all time windows and group statuses by thread + for update in updates { + let (start, end) = match update.status { + SvmWorkerSlicingStatus::Active { start, end } => (start, end), + SvmWorkerSlicingStatus::Idle { start, end } => (start, end), + }; + time_windows.push((start, end)); + thread_statuses + .entry(update.thread_id) + .or_default() + .push(update.status.clone()); + } + + // Sort and merge overlapping time windows + time_windows.sort_by_key(|&(start, _)| start); + let merged_windows = merge_time_windows(time_windows); + summary.merged_windows = merged_windows.len() as u64; + + let total_threads = thread_statuses.len() as f64; + + for (window_start, window_end) in merged_windows { + let window_duration = window_end - window_start; + let mut active_thread_time = 0; + + for statuses in thread_statuses.values() { + for status in statuses { + match status { + SvmWorkerSlicingStatus::Active { start, end } => { + if *start < window_end && *end > window_start { + let overlap_start = (*start).max(window_start); + let overlap_end = (*end).min(window_end); + active_thread_time += overlap_end - overlap_start; + } + } + SvmWorkerSlicingStatus::Idle { .. } => {} + } + } + } + + let window_load = active_thread_time as f64 / (window_duration as f64 * total_threads); + summary.total_duration += window_duration; + summary.weighted_load += window_load * window_duration as f64; + } + + summary.average_load = summary.weighted_load / summary.total_duration as f64; + summary +} + +/// Merges overlapping time windows. +/// +/// This function takes a vector of time windows (start, end) and merges +/// overlapping or adjacent windows into single, continuous windows. +/// +/// # Examples: +/// +/// 1. Non-overlapping windows: +/// Input: [(1, 3), (5, 7), (9, 11)] +/// Output: [(1, 3), (5, 7), (9, 11)] +/// +/// 2. Overlapping windows: +/// Input: [(1, 5), (2, 6), (3, 7)] +/// Output: [(1, 7)] +/// +/// 3. Adjacent windows: +/// Input: [(1, 3), (3, 5), (7, 9)] +/// Output: [(1, 5), (7, 9)] +/// +/// 4. Mixed case: +/// Input: [(1, 3), (2, 4), (5, 7), (6, 8), (10, 12)] +/// Output: [(1, 4), (5, 8), (10, 12)] +fn merge_time_windows(windows: Vec<(u64, u64)>) -> Vec<(u64, u64)> { + if windows.is_empty() { + return vec![]; + } + + let mut merged = vec![windows[0]]; + + for window in windows.into_iter().skip(1) { + let last = merged.last_mut().unwrap(); + if window.0 <= last.1 { + // If the start of the current window is less than or equal to + // the end of the last merged window, we have an overlap or adjacent windows. + // Extend the last merged window to cover both. + last.1 = last.1.max(window.1); + } else { + // If there's no overlap, add the current window as a new entry. + merged.push(window); + } + } + merged +} + +/// Represents a summary of thread load and coverage. +#[derive(Debug, Default)] +pub struct ThreadLoadSummary { + /// The total duration of all merged time windows. + pub total_duration: u64, + /// The weighted sum of load across all time windows. + pub weighted_load: f64, + /// The average load across all time windows. + pub average_load: f64, + /// total merged windows + pub merged_windows: u64, +} + +impl ThreadLoadSummary { + /// Prints a human-readable summary of the thread load and coverage. + pub fn print_summary(&self) { + println!("Thread Load Summary:"); + println!("Total Merged Windows: {}", self.merged_windows); + println!("Total Duration: {} ms", self.total_duration); + println!("Average Load: {:.2}%", self.average_load * 100.0); + println!("Overall Thread Coverage: {:.2}%", self.average_load * 100.0); + } +} diff --git a/scheduler/src/stopwatch.rs b/scheduler/src/stopwatch.rs new file mode 100644 index 0000000..8b4a8c3 --- /dev/null +++ b/scheduler/src/stopwatch.rs @@ -0,0 +1,51 @@ +use std::time::{Duration, Instant}; + +/// StopWatch is a utility struct for measuring and recording elapsed time at various points. +pub struct StopWatch { + /// The name of this StopWatch instance + name: String, + /// The time when this StopWatch was created or reset + start_time: Instant, + /// A vector of tuples containing the elapsed time and a note for each click + clicks: Vec<(Duration, String)>, +} + +impl StopWatch { + /// Creates a new StopWatch with a given name + pub fn new(name: impl Into) -> Self { + StopWatch { + name: name.into(), + start_time: Instant::now(), + clicks: Vec::new(), + } + } + + /// Records a new click with the given note and the current elapsed time + pub fn click(&mut self, note: impl Into) { + let elapsed = self.start_time.elapsed(); + self.clicks.push((elapsed, note.into())); + } + + /// Generates a summary of all recorded clicks + pub fn summary(&self) -> String { + let mut result = String::new(); + result.push_str(&format!("{} Summary:\n", self.name)); + + if self.clicks.is_empty() { + result.push_str("No clicks recorded.\n"); + } else { + let mut last_time = Duration::from_secs(0); + for (time, note) in self.clicks.iter() { + let duration = *time - last_time; + result.push_str(&format!("[{}] - duration: {:?}\n", note, duration)); + last_time = *time; + } + result.push_str(&format!( + "Total time: {:?}\n", + self.clicks.last().unwrap().0 + )); + } + + result + } +} From bf18708c298b3763b58c6ff67fa83cf456ff2647 Mon Sep 17 00:00:00 2001 From: lewis Date: Mon, 28 Oct 2024 17:16:49 +0800 Subject: [PATCH 3/3] feat: a new framework and simulation --- Cargo.lock | 5 + Cargo.toml | 1 + scheduler/Cargo.toml | 10 + scheduler/bin/scheduling_simulation.rs | 278 +++++++++++++++++++++++ scheduler/src/id_generator.rs | 19 ++ scheduler/src/impls/no_lock_scheduler.rs | 38 +++- scheduler/src/lib.rs | 3 +- scheduler/src/scheduler.rs | 7 +- scheduler/src/scheduler_messages.rs | 16 +- 9 files changed, 365 insertions(+), 12 deletions(-) create mode 100644 scheduler/bin/scheduling_simulation.rs create mode 100644 scheduler/src/id_generator.rs diff --git a/Cargo.lock b/Cargo.lock index 7539634..71142bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2392,6 +2392,9 @@ dependencies = [ "assert_matches", "bincode", "crossbeam-channel", + "igloo-executor", + "igloo-storage", + "igloo-verifier", "itertools 0.13.0", "log", "min-max-heap", @@ -2404,11 +2407,13 @@ dependencies = [ "solana-metrics", "solana-perf", "solana-poh", + "solana-program", "solana-runtime", "solana-runtime-transaction", "solana-sanitize", "solana-sdk", "solana-short-vec", + "tempfile", "thiserror", ] diff --git a/Cargo.toml b/Cargo.toml index 062027b..b3e2b9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,6 +113,7 @@ spl-pod = "=0.3.0" igloo-interface = { path = "interface" } igloo-storage = { path = "storage" } igloo-verifier = { path = "verifier" } +igloo-executor = { path = "executor" } igloo-rpc = { path = "rpc" } svm-executor = { path = "svm/executor" } solana-prio-graph-scheduler = { path = "prio-graph-scheduler" } diff --git a/scheduler/Cargo.toml b/scheduler/Cargo.toml index cb12eac..e95a1b3 100644 --- a/scheduler/Cargo.toml +++ b/scheduler/Cargo.toml @@ -16,6 +16,7 @@ solana-runtime = { workspace = true } solana-gossip = { workspace = true } solana-cost-model = { workspace = true } solana-measure = { workspace = true } +solana-program = { workspace = true } ahash = { workspace = true } prio-graph = { workspace = true } @@ -25,6 +26,11 @@ log = { workspace = true } crossbeam-channel = { workspace = true } arrayvec = { workspace = true } min-max-heap = { workspace = true } +tempfile = { workspace = true } + +igloo-executor = { workspace = true } +igloo-storage = { workspace = true } +igloo-verifier = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } @@ -43,3 +49,7 @@ targets = ["x86_64-unknown-linux-gnu"] [features] dev-context-only-utils = ["solana-runtime/dev-context-only-utils"] + +[[bin]] +name = "scheduling-simulation" +path = "bin/scheduling_simulation.rs" \ No newline at end of file diff --git a/scheduler/bin/scheduling_simulation.rs b/scheduler/bin/scheduling_simulation.rs new file mode 100644 index 0000000..4ba71ac --- /dev/null +++ b/scheduler/bin/scheduling_simulation.rs @@ -0,0 +1,278 @@ +use crossbeam_channel::{unbounded, Receiver, Sender}; +use igloo_executor::processor::TransactionProcessor; +use igloo_scheduler::id_generator::IdGenerator; +use igloo_scheduler::impls::no_lock_scheduler::NoLockScheduler; +use igloo_scheduler::scheduler::Scheduler; +use igloo_scheduler::scheduler_messages::{ + SchedulingBatch, SchedulingBatchResult, +}; +use igloo_scheduler::status_slicing::{ + calculate_thread_load_summary, SvmWorkerSlicingStatus, WorkerStatusUpdate, +}; +use igloo_scheduler::stopwatch::StopWatch; +use igloo_storage::{config::GlobalConfig, RollupStorage}; +use igloo_verifier::settings::{Settings, Switchs}; +use itertools::Itertools; +use solana_program::hash::Hash; +use solana_sdk::account::AccountSharedData; +use solana_sdk::transaction::SanitizedTransaction; +use solana_sdk::{ + pubkey::Pubkey, signature::Keypair, signer::Signer, system_program, system_transaction, +}; +use std::borrow::Cow; +use std::error::Error; +use std::sync::Arc; +use std::thread; +use std::time::{SystemTime, UNIX_EPOCH}; + +type E = Box; + +/// Generate a mocked transfer transaction from one account to another +/// +/// # Parameters +/// * `from` - The `Keypair` of the sender account +/// * `to` - The `Pubkey` of the recipient account +/// * `amount` - The amount of lamports to transfer +/// +/// # Returns +/// A `Result` containing a `SanitizedTransaction` representing the transfer, or an error +fn mocking_transfer_tx( + from: &Keypair, + to: &Pubkey, + amount: u64, + recent_blockhash: Hash, +) -> Result { + let transaction = system_transaction::transfer(from, to, amount, recent_blockhash); + Ok(SanitizedTransaction::from_transaction_for_tests( + transaction, + )) +} + +const TOTAL_TX_NUM: usize = 1024 * 16; +const TOTAL_WORKER_NUM: usize = 4; +// each tx need 2 unique accounts. +const NUM_ACCOUNTS: usize = TOTAL_TX_NUM * 2; +// initial account balance: 100 SOL. +const ACCOUNT_BALANCE: u64 = 100_000_000_000; +// batch size of each call of scheduler. +const SCHEDULER_BATCH_SIZE: usize = 2048; + +/// Worker process function that receives ScheduledTransactions and processes them +/// +/// # Parameters +/// * `receiver` - The channel receiver for incoming ScheduledTransactions +/// * `store` - The RollupStorage instance +/// * `settings` - The Settings for the TransactionProcessor +/// +/// # Returns +/// A Result containing the number of successfully processed transactions, or an error +fn worker_process( + thread_id: usize, + receiver: Receiver, + store: Arc, + settings: Settings, + status_sender: Sender, + completed_sender: Sender, +) -> Result { + let bank_processor = TransactionProcessor::new(store.current_bank(), settings); + let mut success_count = 0; + let mut idle_start = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + while let Ok(scheduled_txs) = receiver.recv() { + // Calculate and send idle status before processing + let active_start = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + let idle_status = SvmWorkerSlicingStatus::new_idle(idle_start, active_start); + status_sender.send(WorkerStatusUpdate { + thread_id, + status: idle_status, + })?; + // Process transactions + let execute_result = bank_processor.process(Cow::Borrowed(&scheduled_txs.transactions))?; + success_count += execute_result + .execution_results + .iter() + .filter(|x| x.was_executed_successfully()) + .count(); + + let active_end = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + let active_status = SvmWorkerSlicingStatus::new_active(active_start, active_end); + if let Err(e) = status_sender.send(WorkerStatusUpdate { + thread_id, + status: active_status, + }) { + eprintln!("send status error: {:?}", e); + } + + // TODO retryable_indexes logic + let result = SchedulingBatchResult { + batch: scheduled_txs, + retryable_indexes: vec![], + }; + + // it's ok to ignore send error. + // because error is handled by the scheduler. + // if scheduler exits, means all task is scheduled. + // no need to maintain lock now. + let _ = completed_sender.send(result); + + // Update idle_start for next iteration + idle_start = active_end; + } + + Ok(success_count) +} + +fn main() -> Result<(), E> { + let mut stopwatch = StopWatch::new("scheduling_simulation"); + + // workload channel. + let (senders, receivers): (Vec>, Vec>) = + (0..TOTAL_WORKER_NUM).map(|_| unbounded()).unzip(); + + // thread slice_status update channel + let (status_sender, status_receiver) = unbounded(); + + // transaction completed channel + let (completed_sender, completed_receiver) = unbounded(); + + let accounts: Vec<_> = (0..NUM_ACCOUNTS) + .map(|_| (Keypair::new(), ACCOUNT_BALANCE)) + .collect(); + stopwatch.click("account initialization"); + + let ledger_path = tempfile::tempdir()?.into_path(); + let mut config = GlobalConfig::new_temp(&ledger_path)?; + config.allow_default_genesis = true; + + // Insert accounts into genesis + for (keypair, balance) in &accounts { + config.genesis.add_account( + keypair.pubkey(), + AccountSharedData::new(*balance, 0, &system_program::id()), + ); + } + + let mut store = RollupStorage::new(config)?; + store.init()?; + store.bump()?; + let store = Arc::new(store); + stopwatch.click("storage initialization"); + + let recent_hash = store.current_bank().last_blockhash(); + let transfer_txs = accounts + .chunks(2) + .enumerate() + .map(|(_, chunk)| { + mocking_transfer_tx(&chunk[0].0, &chunk[1].0.pubkey(), 1e9 as u64, recent_hash) + }) + .collect::, _>>()?; + stopwatch.click("tx generation"); + + // Start worker threads + let settings = Settings { + max_age: Default::default(), + switchs: Switchs { + tx_sanity_check: false, + txs_conflict_check: true, + }, + fee_structure: Default::default(), + }; + let worker_handles: Vec<_> = receivers + .into_iter() + .enumerate() + .map(|(i, receiver)| { + let store_clone = Arc::clone(&store); + let settings_clone = settings.clone(); + thread::spawn({ + let ss = status_sender.clone(); + let cs = completed_sender.clone(); + move || worker_process(i, receiver, store_clone, settings_clone, ss, cs) + }) + }) + .collect(); + + let mut batch_id_gen = IdGenerator::default(); + let mut tx_id_gen = IdGenerator::default(); + + let mut scheduler = NoLockScheduler::new(senders.clone(), completed_receiver); + for chunk in transfer_txs + .into_iter() + .chunks(SCHEDULER_BATCH_SIZE) + .into_iter() + .map(|chunk| chunk.collect()) + .map(|transactions: Vec<_>| { + let ids = transactions + .iter() + .map(|_| tx_id_gen.next()) + .collect::>(); + SchedulingBatch { + batch_id: batch_id_gen.next(), + ids, + transactions, + } + }) + .collect::>() + { + scheduler.schedule_batch(chunk); + } + + // Close senders to signal workers to finish + drop(senders); + drop(scheduler); + drop(status_sender); + + // Wait for workers to finish and collect results + let total_success_count: usize = worker_handles + .into_iter() + .map(|handle| handle.join().unwrap().unwrap()) + .sum(); + stopwatch.click(format!("tx execution(done: {})", total_success_count)); + + // let commit_start = Instant::now(); + // let result = store.commit_block( + // vec![TransactionsResultWrapper { + // output: Default::default(), // Note: This needs to be updated with actual results + // }], + // vec![CommitBatch::new(transfer_txs.into())], + // )?; + // let commit_duration = commit_start.elapsed(); + // println!("Block commit time: {:?}", commit_duration); + let mut all_status_point = Vec::with_capacity(1024 * 256); + while let Ok(point) = status_receiver.recv() { + all_status_point.push(point); + } + + println!("time stat: {}", stopwatch.summary()); + + // Sort all_status_point by thread_id and then by start time + all_status_point.sort_by(|a, b| { + a.thread_id.cmp(&b.thread_id).then_with(|| { + let a_start = match &a.status { + SvmWorkerSlicingStatus::Active { start, .. } => *start, + SvmWorkerSlicingStatus::Idle { start, .. } => *start, + }; + let b_start = match &b.status { + SvmWorkerSlicingStatus::Active { start, .. } => *start, + SvmWorkerSlicingStatus::Idle { start, .. } => *start, + }; + a_start.cmp(&b_start) + }) + }); + + // Print sorted worker status updates + println!( + "thread load: {:?}", + calculate_thread_load_summary(all_status_point.as_slice()) + ); + + Ok(()) +} diff --git a/scheduler/src/id_generator.rs b/scheduler/src/id_generator.rs new file mode 100644 index 0000000..3090e4e --- /dev/null +++ b/scheduler/src/id_generator.rs @@ -0,0 +1,19 @@ +/// Simple reverse-sequential ID generator for `TransactionId`s. +/// These IDs uniquely identify transactions during the scheduling process. +pub struct IdGenerator { + next_id: u64, +} + +impl Default for IdGenerator { + fn default() -> Self { + Self { next_id: u64::MAX } + } +} + +impl IdGenerator { + pub fn next>(&mut self) -> T { + let id = self.next_id; + self.next_id = self.next_id.wrapping_sub(1); + T::from(id) + } +} diff --git a/scheduler/src/impls/no_lock_scheduler.rs b/scheduler/src/impls/no_lock_scheduler.rs index a60d33e..808221f 100644 --- a/scheduler/src/impls/no_lock_scheduler.rs +++ b/scheduler/src/impls/no_lock_scheduler.rs @@ -1,2 +1,38 @@ +use crate::scheduler::Scheduler; +use crate::scheduler_messages::{SchedulingBatch, SchedulingBatchResult}; +use crossbeam_channel::{Receiver, Sender}; + /// NoLockScheduler is a dummy scheduler that does not lock any resources. -pub struct NoLockScheduler {} +pub struct NoLockScheduler { + thread_num: usize, + task_senders: Vec>, +} + +impl Scheduler for NoLockScheduler { + fn new( + schedule_task_senders: Vec>, + _: Receiver, + ) -> Self { + Self { + thread_num: schedule_task_senders.len(), + task_senders: schedule_task_senders, + } + } + + fn schedule_batch(&mut self, txs: SchedulingBatch) { + let exec_batch = 64; + txs.transactions + .chunks(exec_batch) + .enumerate() + .for_each(|(i, chunk)| { + let worker_id = + (txs.batch_id.value() as usize % self.thread_num + i) % self.thread_num; + let batch = SchedulingBatch { + batch_id: txs.batch_id, + ids: txs.ids[i * exec_batch..(i + 1) * exec_batch].to_vec(), + transactions: chunk.to_vec(), + }; + self.task_senders[worker_id].send(batch).unwrap(); + }); + } +} diff --git a/scheduler/src/lib.rs b/scheduler/src/lib.rs index a108e32..0739c71 100644 --- a/scheduler/src/lib.rs +++ b/scheduler/src/lib.rs @@ -1,5 +1,6 @@ pub mod impls; pub mod scheduler; -mod scheduler_messages; +pub mod scheduler_messages; pub mod status_slicing; pub mod stopwatch; +pub mod id_generator; diff --git a/scheduler/src/scheduler.rs b/scheduler/src/scheduler.rs index 7525ec5..b375d1a 100644 --- a/scheduler/src/scheduler.rs +++ b/scheduler/src/scheduler.rs @@ -1,6 +1,5 @@ use crate::scheduler_messages::{SchedulingBatch, SchedulingBatchResult}; -use crossbeam_channel::Receiver; -use std::sync::mpsc::Sender; +use crossbeam_channel::{Receiver, Sender}; /// A Scheduler is a single-thread centralized scheduling thread. /// @@ -14,8 +13,8 @@ use std::sync::mpsc::Sender; /// -> Task channelN -> [workerN] -> Task finish callback -> pub trait Scheduler { fn new( - schedule_task_receivers: Vec>, - task_finish_receiver: Receiver, + schedule_task_senders: Vec>, + task_finished_receivers: Receiver, ) -> Self; fn schedule_batch(&mut self, txs: SchedulingBatch); diff --git a/scheduler/src/scheduler_messages.rs b/scheduler/src/scheduler_messages.rs index 9966b06..c7870ed 100644 --- a/scheduler/src/scheduler_messages.rs +++ b/scheduler/src/scheduler_messages.rs @@ -1,8 +1,4 @@ -use crate::status_slicing::SvmWorkerSlicingStatus; -use { - solana_sdk::transaction::SanitizedTransaction, - std::fmt::Display, -}; +use {solana_sdk::transaction::SanitizedTransaction, std::fmt::Display}; /// A unique identifier for a transaction batch. #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] @@ -12,6 +8,10 @@ impl TransactionBatchId { pub fn new(index: u64) -> Self { Self(index) } + + pub fn value(&self) -> u64 { + self.0 + } } impl Display for TransactionBatchId { @@ -34,6 +34,10 @@ impl TransactionId { pub fn new(index: u64) -> Self { Self(index) } + + pub fn value(&self) -> u64 { + self.0 + } } impl Display for TransactionId { @@ -63,5 +67,5 @@ pub struct SchedulingBatchResult { // workload. pub batch: SchedulingBatch, // time slice status for this batch job. - pub status_summary: Vec, + pub retryable_indexes: Vec, }