Skip to content

Commit

Permalink
Add p2p crate and use in sequencer
Browse files Browse the repository at this point in the history
  • Loading branch information
sergerad committed Oct 22, 2024
1 parent 05e4a66 commit e755625
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[workspace]

resolver = "2"
members = ["rollup", "script", "sequencer"]
members = ["rollup", "script", "sequencer", "p2p"]

[workspace.dependencies]
alloy-primitives = { version = "0.8.0", features = ["rand", "serde"] }
Expand Down
19 changes: 19 additions & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "p2p"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { workspace = true, features = ["full"] }
async-trait = "0.1"
futures = "0.3.30"
libp2p = { version = "^0.53", features = [
"tokio",
"gossipsub",
"mdns",
"noise",
"macros",
"tcp",
"yamux",
"quic",
] }
113 changes: 113 additions & 0 deletions p2p/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use futures::stream::StreamExt;
use libp2p::{gossipsub, mdns, noise, swarm::NetworkBehaviour, swarm::SwarmEvent, tcp, yamux};
use std::collections::hash_map::DefaultHasher;
use std::error::Error;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use tokio::{io, select};

pub use gossipsub::Message as GossipMessage;

// We create a custom network behaviour that combines Gossipsub and Mdns.
#[derive(NetworkBehaviour)]
struct MyBehaviour {
gossipsub: gossipsub::Behaviour,
mdns: mdns::tokio::Behaviour,
}

pub struct Network {
swarm: libp2p::Swarm<MyBehaviour>,
}

impl Network {
pub fn new() -> Result<Self, Box<dyn Error>> {
let mut swarm = libp2p::SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_quic()
.with_behaviour(|key| {
// To content-address message, we can take the hash of message and use it as an ID.
let message_id_fn = |message: &gossipsub::Message| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
gossipsub::MessageId::from(s.finish().to_string())
};

// Set a custom gossipsub configuration
let gossipsub_config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the log space
.validation_mode(gossipsub::ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
.message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated.
.build()
.map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?; // Temporary hack because `build` does not return a proper `std::error::Error`.

// build a gossipsub network behaviour
let gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(key.clone()),
gossipsub_config,
)?;

let mdns = mdns::tokio::Behaviour::new(
mdns::Config::default(),
key.public().to_peer_id(),
)?;
Ok(MyBehaviour { gossipsub, mdns })
})?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();

// Create a Gossipsub topic
let topic = gossipsub::IdentTopic::new("blocks");
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
let topic = gossipsub::IdentTopic::new("transactions");
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;

// Listen on all interfaces and whatever port the OS assigns
swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
Ok(Self { swarm })
}

pub async fn poll(&mut self) -> Result<Option<GossipMessage>, Box<dyn Error>> {
select! {
event = self.swarm.select_next_some() => match event {
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
for (peer_id, _multiaddr) in list {
println!("mDNS discovered a new peer: {peer_id}");
self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
Ok(None)
},
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Expired(list))) => {
for (peer_id, _multiaddr) in list {
println!("mDNS discover peer has expired: {peer_id}");
self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
}
Ok(None)
},
SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: peer_id,
message_id: id,
message,
})) => {
println!(
"poll Got message: '{}' with id: {id} from peer: {peer_id}",
String::from_utf8_lossy(&message.data),
);
Ok(Some(message))
},
SwarmEvent::NewListenAddr { address, .. } => {
println!("Local node is listening on {address}");
Ok(None)
}
_ => {
Ok(None)
}
}
}
}
}
1 change: 1 addition & 0 deletions rollup/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ log = { workspace = true }
env_logger = { workspace = true }
sha2 = "0.10"
tokio = { version = "1", features = ["full"] }
p2p = { path = "../p2p" }
30 changes: 0 additions & 30 deletions rollup/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,33 +116,3 @@ impl Sequencer {
self.blockchain.lock().await.head()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_sequencer() {
// Create a sequencer.
let signer = Signer::random();
let pool = Arc::new(Mutex::new(vec![]));
let chain = Arc::new(Mutex::new(Blockchain::default()));
let mut sequencer = Sequencer::new(signer, pool, chain);

// Add a transaction to the sequencer.
let transaction = SignedTransaction::new(
Transaction::dynamic(sequencer.signer.address, 100, 1),
&sequencer.signer,
);
sequencer.add_transaction(transaction.clone()).await;

// Seal the block.
let block = sequencer.seal().await;

// Validate the block.
assert_eq!(block.transactions.len(), 1);
assert_eq!(block.transactions[0], transaction);
assert_eq!(sequencer.head().await, Some(block.clone()));
assert!(block.verify());
}
}

0 comments on commit e755625

Please sign in to comment.