Skip to content

Commit

Permalink
Implementing prioritiztion heap, and using it to sort transactions by…
Browse files Browse the repository at this point in the history
… priority
  • Loading branch information
godmodegalactus committed Mar 16, 2024
1 parent 374777c commit f0442ce
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 136 deletions.
7 changes: 4 additions & 3 deletions core/src/stores/data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ impl DataCache {
sent_transaction_info: &SentTransactionInfo,
) -> bool {
let last_block_height = self.block_information_store.get_last_blockheight();
self.txs
.is_transaction_confirmed(&sent_transaction_info.signature)
|| last_block_height > sent_transaction_info.last_valid_block_height
last_block_height > sent_transaction_info.last_valid_block_height
|| self
.txs
.is_transaction_confirmed(&sent_transaction_info.signature)
}

pub async fn get_current_epoch(&self, commitment: CommitmentConfig) -> Epoch {
Expand Down
1 change: 1 addition & 0 deletions core/src/structures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod identity_stakes;
pub mod leader_data;
pub mod leaderschedule;
pub mod notifications;
pub mod prioritization_fee_heap;
pub mod produced_block;
pub mod proxy_request_format;
pub mod rotating_queue;
Expand Down
156 changes: 156 additions & 0 deletions core/src/structures/prioritization_fee_heap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
use std::{
collections::{BTreeMap, VecDeque},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};

use dashmap::DashSet;
use tokio::sync::Mutex;

use super::transaction_sent_info::SentTransactionInfo;

#[derive(Default, Clone)]
pub struct PrioritizationFeesHeap {
signatures: DashSet<String>,
map: Arc<Mutex<BTreeMap<u64, VecDeque<SentTransactionInfo>>>>,
min_prioritization_fees: Arc<AtomicU64>,
max_number_of_transactions: usize,
}

impl PrioritizationFeesHeap {
pub fn new(max_number_of_transactions: usize) -> Self {
Self {
signatures: DashSet::new(),
map: Arc::new(Mutex::new(BTreeMap::new())),
min_prioritization_fees: Arc::new(AtomicU64::new(0)),
max_number_of_transactions,
}
}

pub async fn pop(&self) -> Option<SentTransactionInfo> {
let mut write_lock = self.map.lock().await;
if let Some(mut entry) = write_lock.last_entry() {
if let Some(element) = entry.get_mut().pop_front() {
if entry.get().is_empty() {
entry.remove();
}
self.signatures.remove(&element.signature);
return Some(element);
}
}
None
}

pub async fn insert(&self, tx: SentTransactionInfo) {
if self.signatures.len() >= self.max_number_of_transactions {
// check if prioritization is more than prioritization in the map
if tx.prioritization_fee <= self.min_prioritization_fees.load(Ordering::Relaxed) {
return;
}
}
if self.signatures.contains(&tx.signature) {
// signature already in the list
return;
}

self.signatures.insert(tx.signature.clone());
let mut write_lock = self.map.lock().await;
match write_lock.get_mut(&tx.prioritization_fee) {
Some(value) => {
value.push_back(tx);
}
None => {
let mut vec_d = VecDeque::new();
let prioritization_fee = tx.prioritization_fee;
vec_d.push_back(tx);
write_lock.insert(prioritization_fee, vec_d);
}
}
if self.signatures.len() > self.max_number_of_transactions {
match write_lock.first_entry() {
Some(mut first_entry) => {
let tx_info = first_entry.get_mut().pop_front();
if let Some(tx_info) = tx_info {
self.signatures.remove(&tx_info.signature);
}
if first_entry.get().is_empty() {
first_entry.remove_entry();
}
}
None => {
panic!("Should not happen");
}
}
}
}

pub async fn remove_expired_transactions(&self, current_blockheight: u64) {
let mut write_lock = self.map.lock().await;
for (_, entry) in write_lock.iter_mut() {
entry.retain(|x| {
let retain = x.last_valid_block_height > current_blockheight;
if !retain {
self.signatures.remove(&x.signature);
}
retain
});
}
}
}

#[tokio::test]
pub async fn test_prioritization_heap() {
let p_heap = PrioritizationFeesHeap::new(4);
let tx_creator = |signature, prioritization_fee| SentTransactionInfo {
signature,
slot: 0,
transaction: vec![],
last_valid_block_height: 0,
prioritization_fee,
};

let tx_0 = tx_creator("0".to_string(), 0);
let tx_1 = tx_creator("1".to_string(), 10);
let tx_2 = tx_creator("2".to_string(), 100);
let tx_3 = tx_creator("3".to_string(), 0);
p_heap.insert(tx_0.clone()).await;
p_heap.insert(tx_0.clone()).await;
p_heap.insert(tx_0.clone()).await;
p_heap.insert(tx_0.clone()).await;
p_heap.insert(tx_0.clone()).await;
p_heap.insert(tx_1.clone()).await;
p_heap.insert(tx_2.clone()).await;
p_heap.insert(tx_2.clone()).await;
p_heap.insert(tx_2.clone()).await;
p_heap.insert(tx_2.clone()).await;
p_heap.insert(tx_3.clone()).await;

assert_eq!(p_heap.pop().await, Some(tx_2));
assert_eq!(p_heap.pop().await, Some(tx_1));
assert_eq!(p_heap.pop().await, Some(tx_0));
assert_eq!(p_heap.pop().await, Some(tx_3));
assert_eq!(p_heap.pop().await, None);

let tx_0 = tx_creator("0".to_string(), 0);
let tx_1 = tx_creator("1".to_string(), 10);
let tx_2 = tx_creator("2".to_string(), 100);
let tx_3 = tx_creator("3".to_string(), 0);
let tx_4 = tx_creator("4".to_string(), 0);
let tx_5 = tx_creator("5".to_string(), 1000);
let tx_6 = tx_creator("6".to_string(), 10);
p_heap.insert(tx_0.clone()).await;
p_heap.insert(tx_1.clone()).await;
p_heap.insert(tx_2.clone()).await;
p_heap.insert(tx_3.clone()).await;
p_heap.insert(tx_4.clone()).await;
p_heap.insert(tx_5.clone()).await;
p_heap.insert(tx_6.clone()).await;

assert_eq!(p_heap.pop().await, Some(tx_5));
assert_eq!(p_heap.pop().await, Some(tx_2));
assert_eq!(p_heap.pop().await, Some(tx_1));
assert_eq!(p_heap.pop().await, Some(tx_6));
assert_eq!(p_heap.pop().await, None);
}
3 changes: 2 additions & 1 deletion core/src/structures/transaction_sent_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use solana_sdk::slot_history::Slot;

pub type WireTransaction = Vec<u8>;

#[derive(Clone, Debug, PartialEq, PartialOrd)]
#[derive(Clone, Debug, PartialEq, PartialOrd, Eq)]
pub struct SentTransactionInfo {
pub signature: String,
pub slot: Slot,
pub transaction: WireTransaction,
pub last_valid_block_height: u64,
pub prioritization_fee: u64,
}
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ pub fn build_raw_sample_tx(i: u32) -> SentTransactionInfo {
slot: 1,
transaction,
last_valid_block_height: 300,
prioritization_fee: 0,
}
}

Expand Down
Loading

0 comments on commit f0442ce

Please sign in to comment.