Skip to content

Commit

Permalink
simple gsfa compressed matrix
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Nov 20, 2023
1 parent 1dedc5d commit f96987b
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 7 deletions.
7 changes: 4 additions & 3 deletions lite-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-services = { workspace = true }
solana-lite-rpc-cluster-endpoints = { workspace = true }
solana-lite-rpc-history = { workspace = true }
fxhash = "0.2.1"
csv = "1.3.0"
countmap = "0.2.0"
itertools = "0.12.0"

[dev-dependencies]
bench = { path = "../bench" }
fxhash = "0.2.1"
csv = "1.3.0"
countmap = "0.2.0"
140 changes: 136 additions & 4 deletions lite-rpc/tests/storage_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use log::{debug, error, info};
use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming;
use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_polling_subscription;
use solana_lite_rpc_core::structures::epoch::EpochCache;
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
use solana_lite_rpc_core::structures::produced_block::{ProducedBlock, TransactionInfo};
use solana_lite_rpc_core::traits::block_storage_interface::BlockStorageInterface;
use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_history::block_stores::postgres_block_store::PostgresBlockStore;
Expand All @@ -21,13 +21,15 @@ use chrono::format::format_item;
use countmap::CountMap;
use csv::WriterBuilder;
use fxhash::FxHasher32;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use solana_sdk::blake3::{hash, Hash, HASH_BYTES};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::message::VersionedMessage;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::shred_version::compute_shred_version;
use solana_sdk::signature::Signature;
use tokio::sync::broadcast::error::RecvError;
use tokio::task::JoinHandle;
use tokio::time::sleep;
Expand Down Expand Up @@ -162,6 +164,7 @@ fn compress_account_ids(block_notifier: BlockStream) -> JoinHandle<()> {

let mut csv_writer = WriterBuilder::new().from_path(format!("collissions.csv")).unwrap();
let mut slots = HashSet::new();
let mut num_accounts: Vec<usize> = Vec::new();

loop {
match block_notifier.recv().await {
Expand All @@ -178,10 +181,15 @@ fn compress_account_ids(block_notifier: BlockStream) -> JoinHandle<()> {
let started = Instant::now();
let mut count = 0;
for tx in block.transactions {
// info!("tx {}", tx.signature);
if is_vote(&tx) {
continue;
}

info!("tx {}", tx.signature);
num_accounts.push(tx.static_account_keys.len());

for acc in tx.static_account_keys {
// info!("- {}", acc);
info!("- {}", acc);
let hash: u32 = hash32(&acc);
let hash2: u32 = hash32_check(&acc);
count_by_key.insert_or_increment(acc.clone());
Expand Down Expand Up @@ -224,8 +232,15 @@ fn compress_account_ids(block_notifier: BlockStream) -> JoinHandle<()> {

count += 1;

if count % 10000 == 0 {
if count % 1000 == 0 {
info!("Distinct account seen so far: {}", count_by_key.len());

let (sum,cnt) = num_accounts.iter().fold((0, 0), |acc, &n| {
(acc.0 + n, acc.1 + 1)
});
// 10.2
info!("Avg size of account set: {}", sum as f64 / cnt as f64);

}

}
Expand All @@ -251,6 +266,123 @@ fn compress_account_ids(block_notifier: BlockStream) -> JoinHandle<()> {
})
}

// n txs x m distinct accounts
struct MatrixCompressionChunk {
txs: Vec<CompressedTransaction>,
// TODO is u32 enough?
global_collision_protection: Vec<u32>,
}

impl MatrixCompressionChunk {
pub fn new() -> Self {
Self {
txs: Vec::new(),
global_collision_protection: Vec::new(),
}
}
pub fn is_in_domain(&self, account_key: &Pubkey) -> bool {
let gh = hash32_check(&account_key);
self.global_collision_protection.binary_search(&gh).is_ok()
}
pub fn insert(&mut self, tx_sig: &Pubkey, account_keys: &[Pubkey]) {

let sorted_hashed_accounts = account_keys.iter().map(|acc| hash32(&acc)).sorted().collect_vec();

info!("sorted_hashed_accounts= {:?}", sorted_hashed_accounts);

self.txs.push(CompressedTransaction {
tx_sig: tx_sig.clone(),
acc_compressed: sorted_hashed_accounts,
});

for account_key in account_keys {
let gh = hash32_check(&account_key);
let matching = self.global_collision_protection.binary_search(&gh);
match matching {
Err(insertion_idx) => {
self.global_collision_protection.insert(insertion_idx, gh);
}
Ok(_) => continue,
}
}

}

pub fn get_signatures_for_address(&self, account_key: &Pubkey) -> Vec<Pubkey> {
if !self.is_in_domain(account_key) {
// account pub key not mapped
return vec![];
}

let hash = hash32(account_key);
let signatures =
self.txs.iter()
.filter(|tx| tx.has_account_by_hash(hash))
.map(|tx| tx.tx_sig.clone())
.collect_vec();

return signatures;
}
}

struct CompressedTransaction {
tx_sig: Pubkey,
// must be sorted!
acc_compressed: Vec<u32>,
}

impl CompressedTransaction {
pub fn has_account(&self, acc: &Pubkey) -> bool {
let hash = hash32(acc);
self.acc_compressed.binary_search(&hash).is_ok()
}
pub fn has_account_by_hash(&self, account_hash: u32) -> bool {
self.acc_compressed.binary_search(&account_hash).is_ok()
}
}

#[test]
fn insert_into_matrix() {
tracing_subscriber::fmt::init();

let mut matrix = MatrixCompressionChunk::new();
let tx_sig = Pubkey::from_str("Bm8rtweCQ19ksNebrLY92H7x4bCaeDJSSmEeWqkdCeop").unwrap();
let account1 = Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ").unwrap();
let account2 = Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu").unwrap();

matrix.insert(&tx_sig, &[account1, account2]);

}


#[test]
fn gsfa() {
tracing_subscriber::fmt::init();

let mut matrix = MatrixCompressionChunk::new();

let account1 = Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ").unwrap();
let account2 = Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu").unwrap();

let tx1_sig = Pubkey::from_str("Bm8rtweCQ19ksNebrLY92H7x4bCaeDJSSmEeWqkdCeop").unwrap();
matrix.insert(&tx1_sig, &[account1, account2]);
let tx2_sig = Pubkey::from_str("Bm8rtweCQ19ksNebrLY92H7x4bCaeDJSSmEeWqkdCeoq").unwrap();
matrix.insert(&tx2_sig, &[account1]);

let found = matrix.get_signatures_for_address(&Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ").unwrap());
assert_eq!(2, found.len());

let found = matrix.get_signatures_for_address(&Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu").unwrap());
assert_eq!(1, found.len());
}


fn is_vote(tx: &TransactionInfo) -> bool {
let vote_account = Pubkey::from_str("Vote111111111111111111111111111111111111111").unwrap();
let maybe_vote = tx.static_account_keys.get(2);
maybe_vote.map(|acc| acc == &vote_account).unwrap_or(false)
}

#[inline]
fn hash16(p0: &Pubkey) -> u16 {
fxhash::hash32(p0.as_ref()) as u16
Expand Down

0 comments on commit f96987b

Please sign in to comment.