diff --git a/one/src/daemon.rs b/one/src/daemon.rs index d2189ac5..45230821 100644 --- a/one/src/daemon.rs +++ b/one/src/daemon.rs @@ -19,7 +19,7 @@ use ceramic_sql::sqlite::SqlitePool; use clap::Args; use object_store::aws::AmazonS3Builder; use object_store::local::LocalFileSystem; -use recon::{FullInterests, Recon, ReconInterestProvider}; +use recon::{Recon, ReconInterestProvider}; use signal_hook::consts::signal::*; use signal_hook_tokio::Signals; use std::sync::Arc; @@ -530,13 +530,6 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { // Construct a recon implementation for peers. let recon_peer = Recon::new(peer_svc.clone(), PeerKeyInterests, recon_metrics.clone()); - // Construct a recon implementation for interests. - let recon_interest = Recon::new( - interest_svc.clone(), - FullInterests::default(), - recon_metrics.clone(), - ); - // Construct a recon implementation for models. let recon_model = Recon::new( model_svc.clone(), @@ -545,7 +538,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { recon_metrics, ); - let recons = Some((recon_peer, recon_interest, recon_model)); + let recons = Some((recon_peer, recon_model)); let ipfs_metrics = ceramic_metrics::MetricsHandle::register(ceramic_kubo_rpc::IpfsMetrics::register); let p2p_metrics = MetricsHandle::register(ceramic_p2p::Metrics::register); diff --git a/one/src/network.rs b/one/src/network.rs index 8ed52a2a..e447a5b8 100644 --- a/one/src/network.rs +++ b/one/src/network.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use async_trait::async_trait; -use ceramic_core::{EventId, Interest, NodeId, NodeKey, PeerKey}; +use ceramic_core::{EventId, NodeId, NodeKey, PeerKey}; use ceramic_kubo_rpc::{IpfsMetrics, IpfsMetricsMiddleware, IpfsService}; use ceramic_p2p::{Config as P2pConfig, Libp2pConfig, Node, PeerService}; use iroh_rpc_client::P2pClient; @@ -34,18 +34,17 @@ impl BuilderState for WithP2p {} /// Configure the p2p service impl Builder { - pub async fn with_p2p( + pub async fn with_p2p( self, libp2p_config: Libp2pConfig, node_key: NodeKey, peer_svc: impl PeerService + 'static, - recons: Option<(P, I, M)>, + recons: Option<(P, M)>, block_store: Arc, metrics: ceramic_p2p::Metrics, ) -> anyhow::Result> where P: Recon, - I: Recon, M: Recon, S: iroh_bitswap::Store, { diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index cee741d0..1bd8581f 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -1,7 +1,7 @@ use std::time::Duration; use anyhow::Result; -use ceramic_core::{EventId, Interest, PeerKey}; +use ceramic_core::{EventId, PeerKey}; use iroh_bitswap::{Bitswap, Block, Config as BitswapConfig}; use libp2p::{ autonat, @@ -36,7 +36,7 @@ pub const AGENT_VERSION: &str = concat!("ceramic-one/", env!("CARGO_PKG_VERSION" /// Libp2p behaviour for the node. #[derive(NetworkBehaviour)] #[behaviour(to_swarm = "Event")] -pub(crate) struct NodeBehaviour +pub(crate) struct NodeBehaviour where S: iroh_bitswap::Store + Send + Sync, { @@ -56,13 +56,12 @@ where relay: Toggle, relay_client: Toggle, dcutr: Toggle, - recon: Toggle>, + recon: Toggle>, } -impl NodeBehaviour +impl NodeBehaviour where P: Recon + Send + Sync, - I: Recon + Send + Sync, M: Recon + Send + Sync, S: iroh_bitswap::Store + Send + Sync, { @@ -70,7 +69,7 @@ where local_key: &Keypair, config: &Libp2pConfig, relay_client: Option, - recons: Option<(P, I, M)>, + recons: Option<(P, M)>, block_store: Arc, peers_tx: tokio::sync::mpsc::Sender, metrics: Metrics, @@ -186,8 +185,8 @@ where .with_max_pending_incoming(Some(config.max_conns_pending_in)) .with_max_established_per_peer(Some(config.max_conns_per_peer)), ); - let recon = recons.map(|(peer, interest, model)| { - recon::libp2p::Behaviour::new(peer, interest, model, recon::libp2p::Config::default()) + let recon = recons.map(|(peer, model)| { + recon::libp2p::Behaviour::new(peer, model, recon::libp2p::Config::default()) }); Ok(NodeBehaviour { ping: Ping::default(), diff --git a/p2p/src/node.rs b/p2p/src/node.rs index fdccee70..ebc034a8 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -7,7 +7,7 @@ use std::{sync::atomic::Ordering, time::Duration}; use ahash::AHashMap; use anyhow::{anyhow, bail, Context, Result}; -use ceramic_core::{EventId, Interest, NodeKey, PeerKey}; +use ceramic_core::{EventId, NodeKey, PeerKey}; use ceramic_metrics::{libp2p_metrics, Recorder}; use cid::Cid; use futures_util::stream::StreamExt; @@ -62,15 +62,14 @@ pub enum NetworkEvent { /// Node implements a peer to peer node that participates on the Ceramic network. /// /// Node provides an external API via RpcMessages. -pub struct Node +pub struct Node where P: Recon, - I: Recon, M: Recon, S: iroh_bitswap::Store, { metrics: Metrics, - swarm: Swarm>, + swarm: Swarm>, supported_protocols: HashSet, net_receiver_in: Receiver, dial_queries: AHashMap>>>, @@ -92,10 +91,9 @@ where active_address_probe: Option, } -impl fmt::Debug for Node +impl fmt::Debug for Node where P: Recon, - I: Recon, M: Recon, S: iroh_bitswap::Store, { @@ -128,10 +126,9 @@ const NICE_INTERVAL: Duration = Duration::from_secs(6); const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60); const EXPIRY_INTERVAL: Duration = Duration::from_secs(1); -impl Drop for Node +impl Drop for Node where P: Recon, - I: Recon, M: Recon, S: iroh_bitswap::Store, { @@ -143,12 +140,10 @@ where // Allow IntoConnectionHandler deprecated associated type. // We are not using IntoConnectionHandler directly only referencing the type as part of this event signature. -type NodeSwarmEvent = - SwarmEvent< as NetworkBehaviour>::ToSwarm>; -impl Node +type NodeSwarmEvent = SwarmEvent< as NetworkBehaviour>::ToSwarm>; +impl Node where P: Recon + Send + Sync, - I: Recon + Send + Sync, M: Recon + Send + Sync, S: iroh_bitswap::Store + Send + Sync, { @@ -157,7 +152,7 @@ where rpc_addr: P2pAddr, node_key: NodeKey, peer_svc: impl PeerService + 'static, - recons: Option<(P, I, M)>, + recons: Option<(P, M)>, block_store: Arc, metrics: Metrics, ) -> Result { @@ -494,7 +489,7 @@ where #[tracing::instrument(skip_all)] async fn handle_swarm_event( &mut self, - event: NodeSwarmEvent, + event: NodeSwarmEvent, ) -> Result> { libp2p_metrics().record(&event); match event { diff --git a/p2p/src/swarm.rs b/p2p/src/swarm.rs index 2095292c..13352cf0 100644 --- a/p2p/src/swarm.rs +++ b/p2p/src/swarm.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use ceramic_core::{EventId, Interest, PeerKey}; +use ceramic_core::{EventId, PeerKey}; use libp2p::{dns, noise, relay, tcp, tls, yamux, Swarm, SwarmBuilder}; use libp2p_identity::Keypair; use recon::{libp2p::Recon, Sha256a}; @@ -28,17 +28,16 @@ fn get_dns_config() -> (dns::ResolverConfig, dns::ResolverOpts) { } } -pub(crate) async fn build_swarm( +pub(crate) async fn build_swarm( config: &Libp2pConfig, keypair: Keypair, - recons: Option<(P, I, M)>, + recons: Option<(P, M)>, block_store: Arc, peers_tx: tokio::sync::mpsc::Sender, metrics: Metrics, -) -> Result>> +) -> Result>> where P: Recon, - I: Recon, M: Recon, S: iroh_bitswap::Store, { @@ -105,18 +104,17 @@ where } } -fn new_behavior( +fn new_behavior( config: &Libp2pConfig, keypair: &Keypair, relay_client: Option, - recons: Option<(P, I, M)>, + recons: Option<(P, M)>, block_store: Arc, peers_tx: tokio::sync::mpsc::Sender, metrics: Metrics, -) -> Result> +) -> Result> where P: Recon + Send, - I: Recon + Send, M: Recon + Send, S: iroh_bitswap::Store, { diff --git a/p2p/tests/node.rs b/p2p/tests/node.rs index e40ef6ca..676a65f1 100644 --- a/p2p/tests/node.rs +++ b/p2p/tests/node.rs @@ -6,7 +6,7 @@ use ceramic_event_svc::store::SqlitePool; use iroh_rpc_client::P2pClient; use iroh_rpc_types::Addr; use libp2p::{Multiaddr, PeerId}; -use recon::{FullInterests, Recon, ReconInterestProvider}; +use recon::{Recon, ReconInterestProvider}; use test_log::test; use ceramic_p2p::{Config, Metrics, NetworkEvent, Node, PeerKeyInterests}; @@ -50,11 +50,6 @@ impl TestRunnerBuilder { Arc::clone(&peer_svc), Some(( Recon::new(peer_svc, PeerKeyInterests, recon_metrics.clone()), - Recon::new( - Arc::clone(&interest_svc), - FullInterests::default(), - recon_metrics.clone(), - ), Recon::new( Arc::clone(&event_svc), ReconInterestProvider::new(node_key.id(), interest_svc), diff --git a/recon/src/libp2p.rs b/recon/src/libp2p.rs index bf25cd92..46f29d52 100644 --- a/recon/src/libp2p.rs +++ b/recon/src/libp2p.rs @@ -20,7 +20,7 @@ mod upgrade; pub use crate::protocol::Recon; pub use stream_set::StreamSet; -use ceramic_core::{EventId, Interest, PeerKey}; +use ceramic_core::{EventId, PeerKey}; use futures::{future::BoxFuture, FutureExt}; use libp2p::{ core::ConnectedPoint, @@ -43,8 +43,6 @@ use crate::{ /// Name of the Recon protocol for synchronizing peers pub const PROTOCOL_NAME_PEER: &str = "/ceramic/recon/0.1.0/peer"; -/// Name of the Recon protocol for synchronizing interests -pub const PROTOCOL_NAME_INTEREST: &str = "/ceramic/recon/0.1.0/interest"; /// Name of the Recon protocol for synchronizing models pub const PROTOCOL_NAME_MODEL: &str = "/ceramic/recon/0.1.0/model"; @@ -76,9 +74,8 @@ impl Default for Config { /// The Behavior tracks all peers on the network that speak the Recon protocol. /// It is responsible for starting and stopping syncs with various peers depending on the needs of /// the application. -pub struct Behaviour { +pub struct Behaviour { peer: P, - interest: I, model: M, config: Config, peers: BTreeMap, @@ -87,15 +84,13 @@ pub struct Behaviour { next_sync: Option>, } -impl std::fmt::Debug for Behaviour +impl std::fmt::Debug for Behaviour where P: std::fmt::Debug, - I: std::fmt::Debug, M: std::fmt::Debug, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Behaviour") - .field("interest", &self.interest) .field("model", &self.model) .field("config", &self.config) .field("peers", &self.peers) @@ -148,18 +143,16 @@ pub enum PeerStatus { Stopped, } -impl Behaviour { +impl Behaviour { /// Create a new Behavior with the provided Recon implementation. - pub fn new(peer: P, interest: I, model: M, config: Config) -> Self + pub fn new(peer: P, model: M, config: Config) -> Self where P: Recon, - I: Recon, M: Recon, { let (tx, rx) = tokio::sync::mpsc::channel(1000); Self { peer, - interest, model, config, peers: BTreeMap::new(), @@ -178,13 +171,12 @@ impl Behaviour { } } -impl NetworkBehaviour for Behaviour +impl NetworkBehaviour for Behaviour where P: Recon, - I: Recon, M: Recon, { - type ConnectionHandler = Handler; + type ConnectionHandler = Handler; type ToSwarm = Event; @@ -205,13 +197,8 @@ where next_sync: BTreeMap::from_iter([ // Schedule all stream_sets initially (StreamSet::Peer, Instant::now()), - // Schedule interests after peers - ( - StreamSet::Interest, - Instant::now() + Duration::from_millis(1), - ), - // Schedule models after interests - (StreamSet::Model, Instant::now() + Duration::from_millis(2)), + // Schedule models after peers + (StreamSet::Model, Instant::now() + Duration::from_millis(1)), ]), sync_delay: Default::default(), }); @@ -395,7 +382,6 @@ where connection_id, handler::State::WaitingInbound, self.peer.clone(), - self.interest.clone(), self.model.clone(), )) } @@ -416,7 +402,6 @@ where stream_set: StreamSet::Peer, }, self.peer.clone(), - self.interest.clone(), self.model.clone(), )) } diff --git a/recon/src/libp2p/handler.rs b/recon/src/libp2p/handler.rs index f4e45927..dfec0fa1 100644 --- a/recon/src/libp2p/handler.rs +++ b/recon/src/libp2p/handler.rs @@ -5,7 +5,7 @@ use std::{collections::VecDeque, task::Poll}; use anyhow::Result; -use ceramic_core::{EventId, Interest, PeerKey}; +use ceramic_core::{EventId, PeerKey}; use libp2p::{ futures::FutureExt, swarm::{ @@ -22,20 +22,18 @@ use crate::{ }; #[derive(Debug)] -pub struct Handler { +pub struct Handler { remote_peer_id: PeerId, connection_id: ConnectionId, peer: P, - interest: I, model: M, state: State, behavior_events_queue: VecDeque, } -impl Handler +impl Handler where P: Recon, - I: Recon, M: Recon, { pub fn new( @@ -43,14 +41,12 @@ where connection_id: ConnectionId, state: State, peer: P, - interest: I, model: M, ) -> Self { Self { remote_peer_id: peer_id, connection_id, peer, - interest, model, state, behavior_events_queue: VecDeque::new(), @@ -156,10 +152,9 @@ pub enum FromHandler { }, } -impl ConnectionHandler for Handler +impl ConnectionHandler for Handler where P: Recon + Clone + Send + 'static, - I: Recon + Clone + Send + 'static, M: Recon + Clone + Send + 'static, { type FromBehaviour = FromBehaviour; @@ -173,7 +168,7 @@ where &self, ) -> libp2p::swarm::SubstreamProtocol { SubstreamProtocol::new( - MultiReadyUpgrade::new(vec![StreamSet::Peer, StreamSet::Interest, StreamSet::Model]), + MultiReadyUpgrade::new(vec![StreamSet::Peer, StreamSet::Model]), (), ) } @@ -276,14 +271,6 @@ where stream, ) .boxed(), - StreamSet::Interest => protocol::respond_synchronize( - self.remote_peer_id, - self.connection_id, - stream_set, - self.interest.clone(), - stream, - ) - .boxed(), StreamSet::Model => protocol::respond_synchronize( self.remote_peer_id, self.connection_id, @@ -321,14 +308,6 @@ where stream, ) .boxed(), - StreamSet::Interest => protocol::initiate_synchronize( - self.remote_peer_id, - self.connection_id, - stream_set, - self.interest.clone(), - stream, - ) - .boxed(), StreamSet::Model => protocol::initiate_synchronize( self.remote_peer_id, self.connection_id, diff --git a/recon/src/libp2p/stream_set.rs b/recon/src/libp2p/stream_set.rs index e5dcf446..879e31f4 100644 --- a/recon/src/libp2p/stream_set.rs +++ b/recon/src/libp2p/stream_set.rs @@ -1,14 +1,12 @@ use anyhow::anyhow; -use super::{PROTOCOL_NAME_INTEREST, PROTOCOL_NAME_MODEL, PROTOCOL_NAME_PEER}; +use super::{PROTOCOL_NAME_MODEL, PROTOCOL_NAME_PEER}; /// Represents a stream set key #[derive(Copy, Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] pub enum StreamSet { /// Stream set of peer ranges Peer, - /// Stream set of interest ranges - Interest, /// Stream set of models Model, } @@ -18,7 +16,6 @@ impl StreamSet { pub fn sort_key(&self) -> &str { match self { StreamSet::Peer => "peer", - StreamSet::Interest => "interest", StreamSet::Model => "model", } } @@ -31,7 +28,6 @@ impl TryFrom<&str> for StreamSet { match value { "peer" => Ok(StreamSet::Peer), "model" => Ok(StreamSet::Model), - "interest" => Ok(StreamSet::Interest), _ => Err(anyhow!("unknown sort_key {}", value)), } } @@ -40,7 +36,6 @@ impl TryFrom<&str> for StreamSet { impl AsRef for StreamSet { fn as_ref(&self) -> &str { match self { - StreamSet::Interest => PROTOCOL_NAME_INTEREST, StreamSet::Peer => PROTOCOL_NAME_PEER, StreamSet::Model => PROTOCOL_NAME_MODEL, } diff --git a/recon/src/libp2p/tests.rs b/recon/src/libp2p/tests.rs index ed11d688..27c2d810 100644 --- a/recon/src/libp2p/tests.rs +++ b/recon/src/libp2p/tests.rs @@ -120,24 +120,12 @@ macro_rules! setup_test { Metrics::register(&mut Registry::default()), ); - let alice_interest = Recon::new( - $alice_interest, - FullInterests::default(), - Metrics::register(&mut Registry::default()), - ); - let bob_peer = Recon::new( $bob_peer, FullInterests::default(), Metrics::register(&mut Registry::default()), ); - let bob_interest = Recon::new( - $bob_interest, - FullInterests::default(), - Metrics::register(&mut Registry::default()), - ); - let bob = Recon::new( $bob_store, FullInterests::default(), @@ -151,11 +139,9 @@ macro_rules! setup_test { per_peer_maximum_sync_delay: std::time::Duration::from_millis(1000), }; let swarm1 = Swarm::new_ephemeral(|_| { - crate::libp2p::Behaviour::new(alice_peer, alice_interest, alice, config.clone()) - }); - let swarm2 = Swarm::new_ephemeral(|_| { - crate::libp2p::Behaviour::new(bob_peer, bob_interest, bob, config) + crate::libp2p::Behaviour::new(alice_peer, alice, config.clone()) }); + let swarm2 = Swarm::new_ephemeral(|_| crate::libp2p::Behaviour::new(bob_peer, bob, config)); (swarm1, swarm2) }}; @@ -179,7 +165,7 @@ async fn in_sync_no_overlap() { swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; - let (p1_events, p2_events): ([crate::libp2p::Event; 6], [crate::libp2p::Event; 6]) = + let (p1_events, p2_events): ([crate::libp2p::Event; 4], [crate::libp2p::Event; 4]) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; assert_in_sync(p2, p1_events); @@ -208,7 +194,7 @@ async fn initiator_model_error() { swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; - let (p1_events, p2_events): ([crate::libp2p::Event; 6], [crate::libp2p::Event; 5]) = + let (p1_events, p2_events): ([crate::libp2p::Event; 4], [crate::libp2p::Event; 3]) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; for ev in p1_events.iter().chain(p2_events.iter()) { @@ -216,7 +202,7 @@ async fn initiator_model_error() { } assert_eq!( - p1_events[5], + p1_events[3], crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm2.local_peer_id().to_owned(), status: PeerStatus::Failed { @@ -248,14 +234,14 @@ async fn responder_model_error() { swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; - let (p1_events, p2_events): ([crate::libp2p::Event; 6], [crate::libp2p::Event; 6]) = + let (p1_events, p2_events): ([crate::libp2p::Event; 4], [crate::libp2p::Event; 4]) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; for ev in p1_events.iter().chain(p2_events.iter()) { info!("{:?}", ev); } assert_eq!( - p2_events[5], + p2_events[3], crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm1.local_peer_id().to_owned(), status: PeerStatus::Failed { @@ -288,7 +274,7 @@ async fn model_error_backoff() { swarm2.connect(&mut swarm1).await; // Expect interests to sync twice in a row since models fail to sync - let (p1_events, p2_events): ([crate::libp2p::Event; 18], [crate::libp2p::Event; 18]) = + let (p1_events, p2_events): ([crate::libp2p::Event; 12], [crate::libp2p::Event; 12]) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; let events = [p1_events, p2_events]; @@ -317,27 +303,18 @@ async fn model_error_backoff() { // First peers sync Some(StreamSet::Peer), Some(StreamSet::Peer), - // First interests sync - Some(StreamSet::Interest), - Some(StreamSet::Interest), // First model sync Some(StreamSet::Model), Some(StreamSet::Model), // Second peers sync Some(StreamSet::Peer), Some(StreamSet::Peer), - // Second interests sync - Some(StreamSet::Interest), - Some(StreamSet::Interest), // Second model sync with initial short backoff Some(StreamSet::Model), Some(StreamSet::Model), // Third peers sync Some(StreamSet::Peer), Some(StreamSet::Peer), - // Third interests sync - Some(StreamSet::Interest), - Some(StreamSet::Interest), // Third model sync is skipped because the backoff pushed it past the peer sync Some(StreamSet::Peer), Some(StreamSet::Peer), @@ -348,7 +325,7 @@ async fn model_error_backoff() { ); // Assert we saw the errors assert_eq!( - events[1][5], + events[1][3], crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm1.local_peer_id().to_owned(), status: PeerStatus::Failed { @@ -357,7 +334,7 @@ async fn model_error_backoff() { }) ); assert_eq!( - events[1][11], + events[1][7], crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm1.local_peer_id().to_owned(), status: PeerStatus::Failed { @@ -376,7 +353,7 @@ fn into_peer_event(ev: crate::libp2p::Event) -> PeerEvent { } } -fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 6]) { +fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 4]) { assert_eq!( into_peer_event(events[0].clone()), PeerEvent { @@ -398,25 +375,6 @@ fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 6]) { ); assert_eq!( into_peer_event(events[2].clone()), - PeerEvent { - remote_peer_id: id, - status: PeerStatus::Started { - stream_set: StreamSet::Interest - } - } - ); - assert_eq!( - into_peer_event(events[3].clone()), - PeerEvent { - remote_peer_id: id, - status: PeerStatus::Synchronized { - stream_set: StreamSet::Interest, - new_count: 0, - } - } - ); - assert_eq!( - into_peer_event(events[4].clone()), PeerEvent { remote_peer_id: id, status: PeerStatus::Started { @@ -425,7 +383,7 @@ fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 6]) { } ); assert_eq!( - into_peer_event(events[5].clone()), + into_peer_event(events[3].clone()), PeerEvent { remote_peer_id: id, status: PeerStatus::Synchronized {