Skip to content

Commit

Permalink
grpc multiplexing integrate (#255)
Browse files Browse the repository at this point in the history
allow multiple GRPC sources
  • Loading branch information
grooviegermanikus authored Dec 22, 2023
1 parent b91f054 commit 5cfb6de
Show file tree
Hide file tree
Showing 9 changed files with 484 additions and 131 deletions.
93 changes: 64 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions cluster-endpoints/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ repository = "https://github.com/blockworks-foundation/lite-rpc"
license = "AGPL"

[dependencies]
#geyser-grpc-connector = { path = "../../geyser-grpc-connector" }
#geyser-grpc-connector = { tag = "v0.5.0+yellowstone.1.11+solana.1.16.17", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
geyser-grpc-connector = "0.7.0+yellowstone.1.11"

solana-sdk = { workspace = true }
solana-rpc-client-api = { workspace = true }
solana-transaction-status = { workspace = true }
Expand All @@ -24,6 +28,7 @@ bs58 = { workspace = true }
base64 = { workspace = true }
thiserror = { workspace = true }
futures = { workspace = true }
merge-streams = "0.1.2"
bytes = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
Expand Down
66 changes: 66 additions & 0 deletions cluster-endpoints/src/grpc_inspect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use log::{debug, warn};
use solana_lite_rpc_core::types::BlockStream;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::sync::broadcast::error::RecvError;
use tokio::task::JoinHandle;

pub fn block_debug_listen(
mut block_notifier: BlockStream,
commitment_config: CommitmentConfig,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut last_highest_slot_number = 0;

loop {
match block_notifier.recv().await {
Ok(block) => {
if block.commitment_config != commitment_config {
continue;
}

debug!(
"Saw block: {} @ {} with {} txs",
block.slot,
block.commitment_config.commitment,
block.transactions.len()
);

if last_highest_slot_number != 0 {
if block.parent_slot == last_highest_slot_number {
debug!(
"parent slot is correct ({} -> {})",
block.slot, block.parent_slot
);
} else {
warn!(
"parent slot not correct ({} -> {})",
block.slot, block.parent_slot
);
}
}

if block.slot > last_highest_slot_number {
last_highest_slot_number = block.slot;
} else {
// note: ATM this fails very often (using the RPC poller)
warn!(
"Monotonic check failed - block {} is out of order, last highest was {}",
block.slot, last_highest_slot_number
);
}
} // -- Ok
Err(RecvError::Lagged(missed_blocks)) => {
warn!(
"Could not keep up with producer - missed {} blocks",
missed_blocks
);
}
Err(other_err) => {
panic!("Error receiving block: {:?}", other_err);
}
}

// ...
}
})
}
Loading

0 comments on commit 5cfb6de

Please sign in to comment.