Skip to content

Commit

Permalink
Implement header Store
Browse files Browse the repository at this point in the history
  • Loading branch information
fl0rek committed Sep 18, 2023
1 parent 8df7715 commit ffa6055
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 15 deletions.
3 changes: 3 additions & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ license = "Apache-2.0"
[dependencies]
celestia-proto = { workspace = true }
celestia-types = { workspace = true }
tendermint = { workspace = true }
tendermint-proto = { workspace = true }

async-trait = "0.1.73"
Expand All @@ -20,12 +21,14 @@ libp2p = { version = "0.52.3", features = [
"request-response",
] }
prost = "0.12.0"
rand = "0.8.5"
thiserror = "1.0.48"
tokio = { version = "1.32.0", features = ["macros", "sync"] }
tracing = "0.1.37"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.32.0", features = ["rt-multi-thread"] }
dashmap = "5.5.3"

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2.10", features = ["js"] }
Expand Down
5 changes: 4 additions & 1 deletion node/src/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ use tracing::instrument;
mod client;
mod server;
mod utils;
pub use utils::ExtendedHeaderExt;

use crate::exchange::client::ExchangeClientHandler;
use crate::exchange::server::ExchangeServerHandler;
use crate::p2p::P2pError;
use crate::peer_tracker::PeerTracker;
use crate::store::Store;
use crate::utils::{stream_protocol_id, OneshotResultSender};

/// Max request size in bytes
Expand All @@ -48,6 +50,7 @@ pub(crate) struct ExchangeBehaviour {
pub(crate) struct ExchangeConfig<'a> {
pub network_id: &'a str,
pub peer_tracker: Arc<PeerTracker>,
pub header_store: Arc<Store>,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -79,7 +82,7 @@ impl ExchangeBehaviour {
request_response::Config::default(),
),
client_handler: ExchangeClientHandler::new(config.peer_tracker),
server_handler: ExchangeServerHandler::new(),
server_handler: ExchangeServerHandler::new(config.header_store),
}
}

Expand Down
2 changes: 1 addition & 1 deletion node/src/exchange/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::exchange::utils::ExtendedHeaderExt;
use crate::exchange::utils::ToHeaderResponse;
use celestia_proto::p2p::pb::header_request::Data;
use celestia_proto::p2p::pb::StatusCode;
use celestia_types::consts::HASH_SIZE;
Expand Down
9 changes: 6 additions & 3 deletions node/src/exchange/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ use libp2p::{
request_response::{InboundFailure, RequestId, ResponseChannel},
PeerId,
};
use std::sync::Arc;
use tracing::instrument;

use crate::store::Store;

pub(super) struct ExchangeServerHandler {
// TODO
_store: Arc<Store>,
}

impl ExchangeServerHandler {
pub(super) fn new() -> Self {
ExchangeServerHandler {}
pub(super) fn new(store: Arc<Store>) -> Self {
ExchangeServerHandler { _store: store }
}

#[instrument(level = "trace", skip(self, _respond_to))]
Expand Down
62 changes: 60 additions & 2 deletions node/src/exchange/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use celestia_proto::header::pb::ExtendedHeader as RawExtendedHeader;
use celestia_proto::p2p::pb::header_request::Data;
use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse, StatusCode};
use celestia_types::consts::HASH_SIZE;
use celestia_types::{DataAvailabilityHeader, ValidatorSet};
use celestia_types::{ExtendedHeader, Hash};
use tendermint::block::header::Header;
use tendermint::block::Commit;
use tendermint::Time;
use tendermint::{block::header::Version, AppHash};
use tendermint_proto::Protobuf;

use crate::exchange::ExchangeError;
Expand Down Expand Up @@ -58,11 +64,63 @@ impl HeaderResponseExt for HeaderResponse {
}
}

pub(super) trait ExtendedHeaderExt {
fn to_header_response(&self) -> HeaderResponse;
pub trait ExtendedHeaderExt {
fn with_height(height: u64) -> ExtendedHeader;
}

impl ExtendedHeaderExt for ExtendedHeader {
fn with_height(height: u64) -> ExtendedHeader {
RawExtendedHeader {
header: Some(
Header {
version: Version { block: 11, app: 1 },
chain_id: "private".to_string().try_into().unwrap(),
height: height.try_into().unwrap(),
time: Time::now(),
last_block_id: None,
last_commit_hash: Hash::default(),
data_hash: Hash::default(),
validators_hash: Hash::default(),
next_validators_hash: Hash::default(),
consensus_hash: Hash::default(),
app_hash: AppHash::default(),
last_results_hash: Hash::default(),
evidence_hash: Hash::default(),
proposer_address: tendermint::account::Id::new([0; 20]),
}
.into(),
),
commit: Some(
Commit {
height: height.try_into().unwrap(),
block_id: tendermint::block::Id {
hash: Hash::Sha256(rand::random()),
..Default::default()
},
..Default::default()
}
.into(),
),
validator_set: Some(ValidatorSet::new(Vec::new(), None).into()),
dah: Some(
DataAvailabilityHeader {
row_roots: Vec::new(),
column_roots: Vec::new(),
hash: [0; 32],
}
.into(),
),
}
.try_into()
.unwrap()
}
}

pub(super) trait ToHeaderResponse {
fn to_header_response(&self) -> HeaderResponse;
}

impl ToHeaderResponse for ExtendedHeader {
fn to_header_response(&self) -> HeaderResponse {
HeaderResponse {
body: self.encode_vec().unwrap(),
Expand Down
4 changes: 2 additions & 2 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::Boxed;
use libp2p::identity::Keypair;
use libp2p::{Multiaddr, PeerId};
use tokio::sync::RwLock;

use crate::p2p::{P2p, P2pArgs, P2pService};
use crate::store::Store;
Expand Down Expand Up @@ -54,7 +53,7 @@ where
SyncerSrv: SyncerService<P2pSrv>,
{
pub async fn new(config: NodeConfig) -> Result<Self, NodeError<P2pSrv, SyncerSrv>> {
let store = Arc::new(RwLock::new(Store::new()));
let store = Arc::new(Store::new());

let p2p = Arc::new(
P2pSrv::start(P2pArgs {
Expand All @@ -63,6 +62,7 @@ where
local_keypair: config.p2p_local_keypair,
bootstrap_peers: config.p2p_bootstrap_peers,
listen_on: config.p2p_listen_on,
store: store.clone(),
})
.await
.map_err(NodeError::P2pService)?,
Expand Down
9 changes: 8 additions & 1 deletion node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ use libp2p::{identify, Multiaddr, PeerId, TransportError};
use tendermint_proto::Protobuf;
use tokio::select;
use tokio::sync::oneshot;
use tracing::{debug, instrument, warn};
use tracing::{debug, info, instrument, warn};

use crate::exchange::{ExchangeBehaviour, ExchangeConfig};
use crate::executor::{spawn, Executor};
use crate::peer_tracker::PeerTracker;
use crate::store::Store;
use crate::utils::{gossipsub_ident_topic, OneshotResultSender, OneshotSenderExt};
use crate::Service;

Expand Down Expand Up @@ -73,6 +74,7 @@ pub struct P2pArgs {
pub local_keypair: Keypair,
pub bootstrap_peers: Vec<Multiaddr>,
pub listen_on: Vec<Multiaddr>,
pub store: Arc<Store>,
}

#[doc(hidden)]
Expand Down Expand Up @@ -269,6 +271,7 @@ impl Worker {
let header_ex = ExchangeBehaviour::new(ExchangeConfig {
network_id: &args.network_id,
peer_tracker: peer_tracker.clone(),
header_store: args.store,
});

let behaviour = Behaviour {
Expand Down Expand Up @@ -338,6 +341,10 @@ impl Worker {
SwarmEvent::ConnectionClosed { peer_id, .. } => {
self.peer_tracker.remove(peer_id);
}
#[cfg(debug_assertions)]
SwarmEvent::NewListenAddr { address, .. } => {
info!("listening on: {address}");
}
_ => {}
}

Expand Down
Loading

0 comments on commit ffa6055

Please sign in to comment.