Skip to content

Commit

Permalink
Backport: Granular slot updates v2.0 (#52) (#58)
Browse files Browse the repository at this point in the history
Add implementation for on_entry to get more granular slot updates
  • Loading branch information
ebin-mathews authored Nov 14, 2024
1 parent 88ebc51 commit 84dce0e
Show file tree
Hide file tree
Showing 12 changed files with 581 additions and 17 deletions.
96 changes: 94 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ rand = "0.8"
serde = "1.0.160"
serde_derive = "1.0.160"
serde_json = "1.0.96"
serde_with = "=3.9.0"
solana-account-decoder = "=2.0.15"
solana-logger = "=2.0.15"
solana-metrics = "=2.0.15"
Expand Down
36 changes: 34 additions & 2 deletions client/src/geyser_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use std::{
use jito_geyser_protos::solana::geyser::{
geyser_client::GeyserClient, maybe_partial_account_update, EmptyRequest,
MaybePartialAccountUpdate, SubscribeAccountUpdatesRequest,
SubscribePartialAccountUpdatesRequest, SubscribeSlotUpdateRequest, TimestampedAccountUpdate,
SubscribePartialAccountUpdatesRequest, SubscribeSlotEntryUpdateRequest,
SubscribeSlotUpdateRequest, TimestampedAccountUpdate,
};
use log::*;
use lru::LruCache;
Expand All @@ -32,7 +33,9 @@ use tonic::{codegen::InterceptedService, transport::Channel, Response, Status};

use crate::{
geyser_consumer::GeyserConsumerError::{MissedHeartbeat, StreamClosed},
types::{AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotUpdate},
types::{
AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotEntryUpdate, SlotUpdate,
},
GrpcInterceptor, Pubkey, Slot,
};

Expand Down Expand Up @@ -242,6 +245,35 @@ impl GeyserConsumer {
Ok(())
}

pub async fn consume_slot_entry_updates(
&self,
slot_updates_tx: UnboundedSender<SlotEntryUpdate>,
) -> Result<()> {
let mut c = self.client.clone();

let resp = c
.subscribe_slot_entry_updates(SubscribeSlotEntryUpdateRequest {})
.await?;
let mut stream = resp.into_inner();

while !self.exit.load(Ordering::Relaxed) {
match stream.message().await {
Ok(Some(slot_update)) => {
if slot_updates_tx
.send(SlotEntryUpdate::from(slot_update.entry_update.unwrap()))
.is_err()
{
return Err(GeyserConsumerError::ConsumerChannelDisconnected);
};
}
Ok(None) => return Err(StreamClosed),
Err(e) => return Err(GeyserConsumerError::from(e)),
}
}

Ok(())
}

#[allow(clippy::too_many_arguments)]
fn process_account_update(
maybe_message: std::result::Result<Option<TimestampedAccountUpdate>, Status>,
Expand Down
14 changes: 14 additions & 0 deletions client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,17 @@ impl From<geyser::SlotUpdate> for SlotUpdate {
}
}
}

pub struct SlotEntryUpdate {
pub slot: Slot,
pub index: u64,
}

impl From<geyser::SlotEntryUpdate> for SlotEntryUpdate {
fn from(proto: geyser::SlotEntryUpdate) -> Self {
Self {
slot: proto.slot,
index: proto.index,
}
}
}
27 changes: 27 additions & 0 deletions proto/proto/geyser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,29 @@ message GetHeartbeatIntervalResponse {
uint64 heartbeat_interval_ms = 1;
}

/// Modelled based off of https://github.com/solana-labs/solana/blob/v2.0/geyser-plugin-interface/src/geyser_plugin_interface.rs#L210
/// If more details are needed can extend this structure.
message SlotEntryUpdate {
// The slot number of the block containing this Entry
uint64 slot = 1;
// The Entry's index in the block
uint64 index = 2;
// The number of executed transactions in the Entry
// If this number is zero, we can assume its a tick entry
uint64 executed_transaction_count = 3;
}

message TimestampedSlotEntryUpdate {
// Time at which the message was generated
// Send relative timestamp in micros using u32 to reduce overhead. Provides ~71 mins of accuracy between sender and receiver
// See [compact_timestamp::to_system_time]
uint32 ts = 1;
// SlotEntryUpdate update
SlotEntryUpdate entry_update = 2;
}

message SubscribeSlotEntryUpdateRequest {}

// The following __must__ be assumed:
// - Clients may receive data for slots out of order.
// - Clients may receive account updates for a given slot out of order.
Expand Down Expand Up @@ -186,4 +209,8 @@ service Geyser {

// Subscribes to block updates.
rpc SubscribeBlockUpdates(SubscribeBlockUpdatesRequest) returns (stream TimestampedBlockUpdate) {}

// Subscribes to entry updates.
// Returns the highest slot seen thus far and the entry index corresponding to the tick
rpc SubscribeSlotEntryUpdates(SubscribeSlotEntryUpdateRequest) returns (stream TimestampedSlotEntryUpdate) {}
}
6 changes: 4 additions & 2 deletions proto/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ use solana_transaction_status::{
TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedTransactionWithStatusMeta,
};

use crate::solana::{entries, tx_by_addr};
use crate::{solana::storage::confirmed_block, StoredExtendedRewards, StoredTransactionStatusMeta};
use crate::{
solana::{entries, storage::confirmed_block, tx_by_addr},
StoredExtendedRewards, StoredTransactionStatusMeta,
};

impl From<Vec<Reward>> for confirmed_block::Rewards {
fn from(rewards: Vec<Reward>) -> Self {
Expand Down
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ prost-types = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
solana-logger = { workspace = true }
solana-metrics = { workspace = true }
solana-program = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion server/example-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
"bind_address": "0.0.0.0:10000",
"account_update_buffer_size": 100000,
"slot_update_buffer_size": 100000,
"slot_entry_update_buffer_size": 1000000,
"block_update_buffer_size": 100000,
"transaction_update_buffer_size": 100000,
"geyser_service_config": {
"heartbeat_interval_ms": 1000,
"subscriber_buffer_size": 1000000
}
}
}
Loading

0 comments on commit 84dce0e

Please sign in to comment.