Skip to content

Commit

Permalink
feat: refactor scheduler module
Browse files Browse the repository at this point in the history
  • Loading branch information
flame4 committed Oct 21, 2024
1 parent 52e6577 commit 3aadd4b
Show file tree
Hide file tree
Showing 16 changed files with 203 additions and 1,135 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ edition.workspace = true
[dependencies]
igloo-storage = { workspace = true }
igloo-verifier = { workspace = true }
solana-prio-graph-scheduler = { workspace = true }
tempfile = { workspace = true }

thiserror = { workspace = true }
Expand Down
35 changes: 15 additions & 20 deletions executor/bin/scheduling_simulation.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use crossbeam_channel::{unbounded, Receiver, Sender};
use igloo_executor::processor::TransactionProcessor;
use igloo_executor::scheduling::prio_graph_scheduler::PrioGraphScheduler;
use igloo_executor::scheduling::scheduler_messages::{
ConsumeWork, FinishedConsumeWork, TransactionBatchId,
};
use igloo_executor::scheduling::status_slicing::{
calculate_thread_load_summary, SvmWorkerSlicingStatus, WorkerStatusUpdate,
};
use igloo_executor::scheduling::stopwatch::StopWatch;
use igloo_executor::scheduling::ScheduledTransaction;
use igloo_storage::{config::GlobalConfig, RollupStorage};
use igloo_verifier::settings::{Settings, Switchs};
use itertools::Itertools;
use solana_prio_graph_scheduler::prio_graph_scheduler::PrioGraphScheduler;
use solana_prio_graph_scheduler::scheduler_messages::{
ConsumeWork, FinishedConsumeWork, TransactionBatchId,
};
use solana_program::hash::Hash;
use solana_program::instruction::{AccountMeta, Instruction};
use solana_sdk::account::AccountSharedData;
use solana_sdk::transaction::{SanitizedTransaction, Transaction};
use solana_sdk::transaction::{SanitizedTransaction, SanitizedVersionedTransaction, VersionedTransaction};
use solana_sdk::{
pubkey::Pubkey, signature::Keypair, signer::Signer, system_program, system_transaction,
};
Expand All @@ -41,11 +39,12 @@ fn mocking_transfer_tx(
to: &Pubkey,
amount: u64,
recent_blockhash: Hash,
) -> Result<SanitizedTransaction, E> {
) -> Result<SanitizedVersionedTransaction, E> {
let transaction = system_transaction::transfer(from, to, amount, recent_blockhash);
Ok(SanitizedTransaction::from_transaction_for_tests(
transaction,
))
let versioned_transaction = VersionedTransaction::from(transaction);
Ok(SanitizedVersionedTransaction::try_new(
versioned_transaction,
)?)
}

const TOTAL_TX_NUM: usize = 1024 * 16;
Expand All @@ -68,8 +67,7 @@ const SCHEDULER_BATCH_SIZE: usize = 2048;
/// A Result containing the number of successfully processed transactions, or an error
fn worker_process(
thread_id: usize,
// TODO Arc.
receiver: Receiver<Vec<ScheduledTransaction>>,
receiver: Receiver<ConsumeWork>,
store: Arc<RollupStorage>,
settings: Settings,
status_sender: Sender<WorkerStatusUpdate>,
Expand All @@ -94,8 +92,7 @@ fn worker_process(
status: idle_status,
})?;

let sanitized_txs: Vec<SanitizedTransaction> =
scheduled_txs.into_iter().map(|st| st.transaction).collect();
let sanitized_txs: Vec<SanitizedTransaction> = scheduled_txs.transactions;

let execute_result = bank_processor.process(Cow::Borrowed(&sanitized_txs))?;
success_count += execute_result
Expand All @@ -116,11 +113,11 @@ fn worker_process(
eprintln!("send status error: {:?}", e);
}
let finish_work = FinishedConsumeWork {
thread_id,
work: ConsumeWork {
batch_id: TransactionBatchId::new(0),
ids: vec![],
transactions: sanitized_txs,
max_ages: vec![],
},
retryable_indexes: vec![],
};
Expand All @@ -142,10 +139,8 @@ fn main() -> Result<(), E> {
let mut stopwatch = StopWatch::new("scheduling_simulation");

// workload channel.
let (senders, receivers): (
Vec<Sender<Vec<ScheduledTransaction>>>,
Vec<Receiver<Vec<ScheduledTransaction>>>,
) = (0..TOTAL_WORKER_NUM).map(|_| unbounded()).unzip();
let (senders, receivers): (Vec<Sender<ConsumeWork>>, Vec<Receiver<ConsumeWork>>) =
(0..TOTAL_WORKER_NUM).map(|_| unbounded()).unzip();

// thread slice_status update channel
let (status_sender, status_receiver) = unbounded();
Expand Down
72 changes: 72 additions & 0 deletions executor/src/scheduling/lazy_channel_wrapper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use crossbeam_channel as channel;
use std::marker::PhantomData;

pub struct LazyReceiver<A, B, F>
where
F: Fn(A) -> B,
{
inner: channel::Receiver<A>,
transform: F,
_phantom: PhantomData<B>,
}

impl<A, B, F> LazyReceiver<A, B, F>
where
F: Fn(A) -> B,
{
pub fn new(rx: channel::Receiver<A>, transform: F) -> Self {
LazyReceiver {
inner: rx,
transform,
_phantom: PhantomData,
}
}

pub fn recv(&self) -> Result<B, channel::RecvError> {
self.inner.recv().map(|a| (self.transform)(a))
}

pub fn try_recv(&self) -> Result<B, channel::TryRecvError> {
self.inner.try_recv().map(|a| (self.transform)(a))
}
}

pub fn lazy_wrapper<A, B, F>(rx: channel::Receiver<A>, transform: F) -> LazyReceiver<A, B, F>
where
F: Fn(A) -> B,
{
LazyReceiver::new(rx, transform)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_lazy_receiver_recv() {
let (tx, rx) = channel::unbounded();
let lazy_rx = lazy_wrapper(rx, |x: i32| x * 2);

tx.send(5).unwrap();
tx.send(10).unwrap();

assert_eq!(lazy_rx.recv().unwrap(), 10);
assert_eq!(lazy_rx.recv().unwrap(), 20);
}

#[test]
fn test_lazy_receiver_try_recv() {
let (tx, rx) = channel::unbounded();
let lazy_rx = lazy_wrapper(rx, |s: String| s.len());

assert!(lazy_rx.try_recv().is_err());

tx.send("hello".to_string()).unwrap();
assert_eq!(lazy_rx.try_recv().unwrap(), 5);

tx.send("world".to_string()).unwrap();
assert_eq!(lazy_rx.try_recv().unwrap(), 5);

assert!(lazy_rx.try_recv().is_err());
}
}
52 changes: 43 additions & 9 deletions executor/src/scheduling/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use solana_sdk::transaction::SanitizedTransaction;
use solana_prio_graph_scheduler::deserializable_packet::DeserializableTxPacket;
use solana_program::hash::Hash;
use solana_sdk::transaction::{SanitizedTransaction, SanitizedVersionedTransaction};
use std::cmp::Ordering;
use solana_sdk::packet::Packet;

pub mod prio_graph_scheduler;
pub mod read_write_account_set;
pub mod seq_id_generator;
pub mod thread_aware_account_locks;
pub mod transaction_state_container;
pub mod stopwatch;

mod scheduler;
pub mod status_slicing;
pub mod scheduler_messages;
pub mod stopwatch;
mod lazy_channel_wrapper;

/// Represents a scheduled transaction with additional metadata
///
Expand All @@ -20,5 +21,38 @@ pub struct ScheduledTransaction {
/// The priority of this transaction in the scheduling queue
pub priority: u64,
/// The actual transaction data
pub transaction: SanitizedTransaction,
pub transaction: SanitizedVersionedTransaction,
}

impl PartialEq for ScheduledTransaction {
fn eq(&self, other: &Self) -> bool {
self.id.eq(&other.id)
}
}

impl PartialOrd for ScheduledTransaction {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.id.partial_cmp(&other.id)
}
}

impl Eq for ScheduledTransaction {}

impl DeserializableTxPacket for ScheduledTransaction {
type DeserializeError = ();

fn new(packet: Packet) -> Result<Self, Self::DeserializeError> {
todo!()
}
fn transaction(&self) -> &SanitizedVersionedTransaction {
&self.transaction
}

fn message_hash(&self) -> &Hash {
todo!()
}

fn is_simple_vote(&self) -> bool {
todo!()
}
}
Loading

0 comments on commit 3aadd4b

Please sign in to comment.