Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A Simple Scheduling Simulation #31

Merged
merged 3 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ members = [
"verifier",
"prio-graph-scheduler",
"rpc",
"scheduler",
]
resolver = "2"

Expand Down Expand Up @@ -112,9 +113,11 @@ spl-pod = "=0.3.0"
igloo-interface = { path = "interface" }
igloo-storage = { path = "storage" }
igloo-verifier = { path = "verifier" }
igloo-executor = { path = "executor" }
igloo-rpc = { path = "rpc" }
svm-executor = { path = "svm/executor" }
solana-prio-graph-scheduler = { path = "prio-graph-scheduler" }
igloo-schduler = { path = "scheduler" }

[patch.crates-io]
#crossbeam-epoch = { git = "https://github.com/anza-xyz/crossbeam", rev = "fd279d707025f0e60951e429bf778b4813d1b6bf" } // Patch `crossbeam-epoch v0.9.5 was not used in the crate graph
Expand Down
3 changes: 1 addition & 2 deletions prio-graph-scheduler/src/deserializable_packet.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@

use solana_sdk::hash::Hash;
use solana_sdk::packet::Packet;
use solana_sdk::transaction::{SanitizedVersionedTransaction};
use solana_sdk::transaction::SanitizedVersionedTransaction;
use std::error::Error;

/// DeserializablePacket can be deserialized from a Packet.
Expand Down
214 changes: 106 additions & 108 deletions prio-graph-scheduler/src/in_flight_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,124 +1,122 @@
use {
crate::id_generator::IdGenerator,
crate::thread_aware_account_locks::ThreadId,
crate::scheduler_messages::TransactionBatchId,
std::collections::HashMap,
crate::id_generator::IdGenerator, crate::scheduler_messages::TransactionBatchId,
crate::thread_aware_account_locks::ThreadId, std::collections::HashMap,
};

/// Tracks the number of transactions that are in flight for each thread.
pub struct InFlightTracker {
num_in_flight_per_thread: Vec<usize>,
cus_in_flight_per_thread: Vec<u64>,
batches: HashMap<TransactionBatchId, BatchEntry>,
batch_id_generator: IdGenerator,
num_in_flight_per_thread: Vec<usize>,
cus_in_flight_per_thread: Vec<u64>,
batches: HashMap<TransactionBatchId, BatchEntry>,
batch_id_generator: IdGenerator,
}

struct BatchEntry {
thread_id: ThreadId,
num_transactions: usize,
total_cus: u64,
thread_id: ThreadId,
num_transactions: usize,
total_cus: u64,
}

impl InFlightTracker {
pub fn new(num_threads: usize) -> Self {
Self {
num_in_flight_per_thread: vec![0; num_threads],
cus_in_flight_per_thread: vec![0; num_threads],
batches: HashMap::new(),
batch_id_generator: IdGenerator::default(),
}
}

/// Returns the number of transactions that are in flight for each thread.
pub fn num_in_flight_per_thread(&self) -> &[usize] {
&self.num_in_flight_per_thread
}

/// Returns the number of cus that are in flight for each thread.
pub fn cus_in_flight_per_thread(&self) -> &[u64] {
&self.cus_in_flight_per_thread
}

/// Tracks number of transactions and CUs in-flight for the `thread_id`.
/// Returns a `TransactionBatchId` that can be used to stop tracking the batch
/// when it is complete.
pub fn track_batch(
&mut self,
num_transactions: usize,
total_cus: u64,
thread_id: ThreadId,
) -> TransactionBatchId {
let batch_id = self.batch_id_generator.next();
self.num_in_flight_per_thread[thread_id] += num_transactions;
self.cus_in_flight_per_thread[thread_id] += total_cus;
self.batches.insert(
batch_id,
BatchEntry {
thread_id,
num_transactions,
total_cus,
},
);

batch_id
}

/// Stop tracking the batch with given `batch_id`.
/// Removes the number of transactions for the scheduled thread.
/// Returns the thread id that the batch was scheduled on.
///
/// # Panics
/// Panics if the batch id does not exist in the tracker.
pub fn complete_batch(&mut self, batch_id: TransactionBatchId) -> ThreadId {
let Some(BatchEntry {
thread_id,
num_transactions,
total_cus,
}) = self.batches.remove(&batch_id)
else {
panic!("batch id {batch_id} is not being tracked");
};
self.num_in_flight_per_thread[thread_id] -= num_transactions;
self.cus_in_flight_per_thread[thread_id] -= total_cus;

thread_id
}
pub fn new(num_threads: usize) -> Self {
Self {
num_in_flight_per_thread: vec![0; num_threads],
cus_in_flight_per_thread: vec![0; num_threads],
batches: HashMap::new(),
batch_id_generator: IdGenerator::default(),
}
}

/// Returns the number of transactions that are in flight for each thread.
pub fn num_in_flight_per_thread(&self) -> &[usize] {
&self.num_in_flight_per_thread
}

/// Returns the number of cus that are in flight for each thread.
pub fn cus_in_flight_per_thread(&self) -> &[u64] {
&self.cus_in_flight_per_thread
}

/// Tracks number of transactions and CUs in-flight for the `thread_id`.
/// Returns a `TransactionBatchId` that can be used to stop tracking the batch
/// when it is complete.
pub fn track_batch(
&mut self,
num_transactions: usize,
total_cus: u64,
thread_id: ThreadId,
) -> TransactionBatchId {
let batch_id = self.batch_id_generator.next();
self.num_in_flight_per_thread[thread_id] += num_transactions;
self.cus_in_flight_per_thread[thread_id] += total_cus;
self.batches.insert(
batch_id,
BatchEntry {
thread_id,
num_transactions,
total_cus,
},
);

batch_id
}

/// Stop tracking the batch with given `batch_id`.
/// Removes the number of transactions for the scheduled thread.
/// Returns the thread id that the batch was scheduled on.
///
/// # Panics
/// Panics if the batch id does not exist in the tracker.
pub fn complete_batch(&mut self, batch_id: TransactionBatchId) -> ThreadId {
let Some(BatchEntry {
thread_id,
num_transactions,
total_cus,
}) = self.batches.remove(&batch_id)
else {
panic!("batch id {batch_id} is not being tracked");
};
self.num_in_flight_per_thread[thread_id] -= num_transactions;
self.cus_in_flight_per_thread[thread_id] -= total_cus;

thread_id
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
#[should_panic(expected = "is not being tracked")]
fn test_in_flight_tracker_untracked_batch() {
let mut in_flight_tracker = InFlightTracker::new(2);
in_flight_tracker.complete_batch(TransactionBatchId::new(5));
}

#[test]
fn test_in_flight_tracker() {
let mut in_flight_tracker = InFlightTracker::new(2);

// Add a batch with 2 transactions, 10 kCUs to thread 0.
let batch_id_0 = in_flight_tracker.track_batch(2, 10_000, 0);
assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[2, 0]);
assert_eq!(in_flight_tracker.cus_in_flight_per_thread(), &[10_000, 0]);

// Add a batch with 1 transaction, 15 kCUs to thread 1.
let batch_id_1 = in_flight_tracker.track_batch(1, 15_000, 1);
assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[2, 1]);
assert_eq!(
in_flight_tracker.cus_in_flight_per_thread(),
&[10_000, 15_000]
);

in_flight_tracker.complete_batch(batch_id_0);
assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[0, 1]);
assert_eq!(in_flight_tracker.cus_in_flight_per_thread(), &[0, 15_000]);

in_flight_tracker.complete_batch(batch_id_1);
assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[0, 0]);
assert_eq!(in_flight_tracker.cus_in_flight_per_thread(), &[0, 0]);
}
use super::*;

#[test]
#[should_panic(expected = "is not being tracked")]
fn test_in_flight_tracker_untracked_batch() {
let mut in_flight_tracker = InFlightTracker::new(2);
in_flight_tracker.complete_batch(TransactionBatchId::new(5));
}

#[test]
fn test_in_flight_tracker() {
let mut in_flight_tracker = InFlightTracker::new(2);

// Add a batch with 2 transactions, 10 kCUs to thread 0.
let batch_id_0 = in_flight_tracker.track_batch(2, 10_000, 0);
assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[2, 0]);
assert_eq!(in_flight_tracker.cus_in_flight_per_thread(), &[10_000, 0]);

// Add a batch with 1 transaction, 15 kCUs to thread 1.
let batch_id_1 = in_flight_tracker.track_batch(1, 15_000, 1);
assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[2, 1]);
assert_eq!(
in_flight_tracker.cus_in_flight_per_thread(),
&[10_000, 15_000]
);

in_flight_tracker.complete_batch(batch_id_0);
assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[0, 1]);
assert_eq!(in_flight_tracker.cus_in_flight_per_thread(), &[0, 15_000]);

in_flight_tracker.complete_batch(batch_id_1);
assert_eq!(in_flight_tracker.num_in_flight_per_thread(), &[0, 0]);
assert_eq!(in_flight_tracker.cus_in_flight_per_thread(), &[0, 0]);
}
}
4 changes: 1 addition & 3 deletions prio-graph-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ mod tests {
message::Message,
sanitize::SanitizeError,
signature::Signature,
transaction::{
SanitizedVersionedTransaction, VersionedTransaction,
},
transaction::{SanitizedVersionedTransaction, VersionedTransaction},
},
solana_short_vec::decode_shortu16_len,
std::{cmp::Ordering, mem::size_of},
Expand Down
4 changes: 3 additions & 1 deletion prio-graph-scheduler/src/prio_graph_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use {
in_flight_tracker::InFlightTracker,
read_write_account_set::ReadWriteAccountSet,
scheduler_error::SchedulerError,
scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId, TransactionId, MaxAge},
scheduler_messages::{
ConsumeWork, FinishedConsumeWork, MaxAge, TransactionBatchId, TransactionId,
},
thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet},
transaction_priority_id::TransactionPriorityId,
transaction_state::{SanitizedTransactionTTL, TransactionState},
Expand Down
Loading
Loading