From 902ed7a1f815400cb0048390da3c6904cc1c3ae8 Mon Sep 17 00:00:00 2001 From: sergerad Date: Fri, 25 Oct 2024 20:13:43 +1300 Subject: [PATCH] add p2p --- Cargo.toml | 2 +- README.md | 6 ++-- p2p/src/lib.rs | 55 ++++++++++++++++++++++++++++----- rollup/src/sequencer.rs | 49 +++++++++++++++++++++++++++--- rpc/Cargo.toml | 16 ++++++++++ rpc/src/main.rs | 67 +++++++++++++++++++++++++++++++++++++++++ script/src/main.rs | 21 ++++++++++++- sequencer/Cargo.toml | 1 + sequencer/src/main.rs | 6 ++-- 9 files changed, 206 insertions(+), 17 deletions(-) create mode 100644 rpc/Cargo.toml create mode 100644 rpc/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index ae53c96..726de0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["rollup", "script", "sequencer", "p2p"] +members = ["rollup", "script", "sequencer", "p2p", "rpc"] [workspace.dependencies] alloy-primitives = { version = "0.8.0", features = ["rand", "serde"] } diff --git a/README.md b/README.md index 97dce9c..57c0153 100644 --- a/README.md +++ b/README.md @@ -10,9 +10,11 @@ Because of this design, Nolemma is capable of achieving near real-time block pro ## Repository Structure The workspace contains the following: +* `p2p` library crate for the basic p2p stack used by all nodes; * `rollup` library crate for all core types and functionality; -* `sequencer` binary crate for running a sequencer; and -* `script` binary crate for bootstrapping a local sequencer, sending transactions, and validating sealed blocks. +* `sequencer` binary crate for running a sequencer; +* `rpc` binary crate for running RPC nodes; and +* `script` binary crate for bootstrapping a local sequencer and RPC, sending transactions to the RPC, and validating sealed blocks from the sequencer. ## Usage diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 94d062d..f59b847 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -4,7 +4,8 @@ use std::collections::hash_map::DefaultHasher; use std::error::Error; use std::hash::{Hash, Hasher}; use std::time::Duration; -use tokio::{io, select}; +use tokio::sync::mpsc::Receiver; +use tokio::{io, select, task}; pub use gossipsub::Message as GossipMessage; @@ -72,6 +73,50 @@ impl Network { Ok(Self { swarm }) } + pub fn start(mut outbound: Receiver<(Vec, String)>) -> Receiver { + let (tx, rx) = tokio::sync::mpsc::channel(32); + let mut network = Network::new().unwrap(); + task::spawn(async move { + loop { + select! { + Some((data, topic)) = outbound.recv() => { + let topic = gossipsub::IdentTopic::new(topic); + if let Err(e) = network.swarm.behaviour_mut().gossipsub.publish(topic, data) { + println!("Failed to publish message: {e}"); + } + }, + event = network.swarm.select_next_some() => match event { + SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => { + for (peer_id, _multiaddr) in list { + println!("mDNS discovered a new peer: {peer_id}"); + network.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); + } + }, + SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Expired(list))) => { + for (peer_id, _multiaddr) in list { + println!("mDNS discover peer has expired: {peer_id}"); + network.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id); + } + }, + SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message { + propagation_source: _peer_id, + message_id: _id, + message, + })) => { + tx.send(message).await.unwrap(); + }, + SwarmEvent::NewListenAddr { address, .. } => { + println!("Local node is listening on {address}"); + } + _ => { + } + } + } + } + }); + rx + } + pub async fn poll(&mut self) -> Result, Box> { select! { event = self.swarm.select_next_some() => match event { @@ -90,14 +135,10 @@ impl Network { Ok(None) }, SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message { - propagation_source: peer_id, - message_id: id, + propagation_source: _peer_id, + message_id: _id, message, })) => { - println!( - "poll Got message: '{}' with id: {id} from peer: {peer_id}", - String::from_utf8_lossy(&message.data), - ); Ok(Some(message)) }, SwarmEvent::NewListenAddr { address, .. } => { diff --git a/rollup/src/sequencer.rs b/rollup/src/sequencer.rs index 955f01d..74a03a6 100644 --- a/rollup/src/sequencer.rs +++ b/rollup/src/sequencer.rs @@ -1,7 +1,15 @@ use std::sync::Arc; use log::info; -use tokio::sync::Mutex; +use p2p::GossipMessage; +use serde_json::json; +use tokio::{ + sync::{ + mpsc::{Receiver, Sender}, + Mutex, + }, + task, +}; use crate::{ Block, BlockHeader, Blockchain, SignedBlockHeader, SignedTransaction, Signer, Transaction, @@ -10,16 +18,30 @@ use crate::{ pub struct TransactionSubmitter { transactions_pool: Arc>>, + outbound: Sender<(Vec, String)>, } impl TransactionSubmitter { - pub fn new(transactions_pool: Arc>>) -> Self { - TransactionSubmitter { transactions_pool } + pub fn new( + transactions_pool: Arc>>, + outbound: Sender<(Vec, String)>, + ) -> Self { + TransactionSubmitter { + transactions_pool, + outbound, + } } pub async fn submit(&self, transaction: SignedTransaction) { let transactions_pool = self.transactions_pool.clone(); - transactions_pool.lock().await.push(transaction); + transactions_pool.lock().await.push(transaction.clone()); + self.outbound + .send(( + json!(transaction).to_string().as_bytes().to_vec(), + "transactions".to_string(), + )) + .await + .unwrap(); } } @@ -44,7 +66,26 @@ impl Sequencer { signer: impl Into, transactions_pool: Arc>>, blockchain: Arc>, + mut inbound: Receiver, ) -> Self { + let tx_pool = transactions_pool.clone(); + task::spawn(async move { + loop { + if let Some(msg) = inbound.recv().await { + match msg.topic.as_str() { + "transactions" => { + let transaction: SignedTransaction = + serde_json::from_slice(&msg.data).unwrap(); + tx_pool.lock().await.push(transaction); + } + "block" => { + unimplemented!("Handle incoming blocks"); + } + _ => {} + } + } + } + }); Sequencer { signer: signer.into(), transactions_pool, diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml new file mode 100644 index 0000000..b37b015 --- /dev/null +++ b/rpc/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "rpc" +version = "0.1.0" +edition = "2021" + +[dependencies] +bincode = "1.3" +clap = { version = "4.0", features = ["env"] } +rocket = { version = "0.5", features = ["json"] } +rollup = { package = "rollup", path = "../rollup", version = "0.1.0" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +sha2 = "0.10" +env_logger = { workspace = true } +tokio = { version = "1", features = ["full"] } +p2p = { package = "p2p", path = "../p2p", version = "0.1.0" } diff --git a/rpc/src/main.rs b/rpc/src/main.rs new file mode 100644 index 0000000..ea136ff --- /dev/null +++ b/rpc/src/main.rs @@ -0,0 +1,67 @@ +#[macro_use] +extern crate rocket; + +use std::sync::Arc; + +use rocket::State; +use rocket::{serde::json::Json, Config}; +use rollup::{Blockchain, SignedTransaction, TransactionSubmitter}; +use serde_json::{json, Value}; +use tokio::sync::Mutex; + +/// Accepts a transaction and adds it to the respective transaction pools. +#[post("/", data = "")] +async fn submit( + submitter: &State, + payload: Json, +) -> Value { + // Extract the transaction from the payload. + let transaction = payload.into_inner(); + let tx_digest = transaction.transaction.hash(); + + // Add the transaction to the pool. + submitter.submit(transaction).await; + + // Respond with the transaction digest. + json!({ "tx_digest": tx_digest.to_string() }) +} + +/// Returns the head block of the blockchain. +#[get("/")] +async fn head(chain: &State>>) -> Value { + // Retrieve the head block from the sequencer and return it. + let head = chain.lock().await.head(); + json!(head) +} + +#[launch] +#[tokio::main] +async fn rocket() -> _ { + env_logger::init(); + // Set up sequencer. + let pool = Arc::new(tokio::sync::Mutex::new(vec![])); + let chain = Arc::new(tokio::sync::Mutex::new(Blockchain::default())); + let (tx_out, rx_out) = tokio::sync::mpsc::channel::<(Vec, String)>(32); + let mut rx_in = p2p::Network::start(rx_out); + let submitter = TransactionSubmitter::new(pool, tx_out); + + // Spawn block producing sequencer task. + tokio::task::spawn(async move { + loop { + let msg = rx_in.recv().await.unwrap(); + println!("RPC Received message: {:?}", msg); + } + }); + + // Launch the HTTP server. + let mut config = Config { + log_level: rocket::config::LogLevel::Critical, + ..Config::debug_default() + }; + config.port = 8001; + rocket::build() + .configure(config) + .mount("/", routes![submit, head]) + .manage(submitter) + .manage(chain) +} diff --git a/script/src/main.rs b/script/src/main.rs index ef03563..a9de28e 100644 --- a/script/src/main.rs +++ b/script/src/main.rs @@ -4,6 +4,7 @@ use tokio::process::Command; /// Specifies the anticipated URL that the sequencer will listen on. const SEQUENCER_URL: &str = "127.0.0.1:8000"; +const RPC_URL: &str = "127.0.0.1:8001"; /// Runs the sequencer process and blocks on it's completion. async fn run_sequencer(sk: SecretKey) { @@ -22,10 +23,25 @@ async fn run_sequencer(sk: SecretKey) { .expect("Failure while waiting for sequencer process"); } +async fn run_rpc() { + let mut sequencer = Command::new("cargo") + .arg("run") + .arg("--bin") + .arg("rpc") + .arg("--") + .kill_on_drop(true) + .spawn() + .expect("Failed to start sequencer process"); + let _ = sequencer + .wait() + .await + .expect("Failure while waiting for sequencer process"); +} + /// Sends the provided transaction to the sequencer and waits for the response. async fn send_transaction(tx: SignedTransaction) -> Result { reqwest::Client::new() - .post(&format!("http://{}/", SEQUENCER_URL)) + .post(&format!("http://{}/", RPC_URL)) .json(&tx) .send() .await @@ -105,6 +121,9 @@ async fn main() { tokio::spawn(async move { run_sequencer(sk).await; }); + tokio::spawn(async move { + run_rpc().await; + }); // Continuously check the head block. tokio::spawn(async move { diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml index fd24633..07517dd 100644 --- a/sequencer/Cargo.toml +++ b/sequencer/Cargo.toml @@ -13,3 +13,4 @@ serde_json = "1.0" sha2 = "0.10" env_logger = { workspace = true } tokio = { version = "1", features = ["full"] } +p2p = { package = "p2p", path = "../p2p", version = "0.1.0" } diff --git a/sequencer/src/main.rs b/sequencer/src/main.rs index cd4b6d4..2f13ee7 100644 --- a/sequencer/src/main.rs +++ b/sequencer/src/main.rs @@ -42,8 +42,10 @@ async fn rocket() -> _ { let sk = std::env::var("KEY").unwrap(); let pool = Arc::new(tokio::sync::Mutex::new(vec![])); let chain = Arc::new(tokio::sync::Mutex::new(Blockchain::default())); - let mut sequencer = Sequencer::new(sk.as_str(), pool.clone(), chain.clone()); - let submitter = TransactionSubmitter::new(pool); + let (tx_out, rx_out) = tokio::sync::mpsc::channel::<(Vec, String)>(32); + let rx_in = p2p::Network::start(rx_out); + let mut sequencer = Sequencer::new(sk.as_str(), pool.clone(), chain.clone(), rx_in); + let submitter = TransactionSubmitter::new(pool, tx_out); // Spawn block producing sequencer task. tokio::task::spawn(async move {