From 56f7c5b3dd4e92d95d185d2359b3c161da6113b7 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 10 Dec 2024 09:50:29 -0700 Subject: [PATCH 1/2] feat: stop synchronizing interests With this change the interests ring is no longer synchronized between peers. A local node will still use the interest-svc to know its own interests and the beginning of each Recon conversation negotiates shared interests. Therefore it is no longer necessary to synchronize interests. In the future we may decide that we want to use Recon to sync interests instead of a linear sharing of interests before each conversation, however that is only a performance optimization that is not important at this stage. Fixes #610 Fixes #611 --- one/src/daemon.rs | 11 ++--------- one/src/network.rs | 7 +++---- p2p/src/behaviour.rs | 15 +++++++-------- p2p/src/node.rs | 23 +++++++++-------------- p2p/src/swarm.rs | 16 +++++++--------- recon/src/libp2p.rs | 33 +++++++++------------------------ recon/src/libp2p/handler.rs | 31 +++++-------------------------- recon/src/libp2p/stream_set.rs | 7 +------ 8 files changed, 43 insertions(+), 100 deletions(-) diff --git a/one/src/daemon.rs b/one/src/daemon.rs index d2189ac5..45230821 100644 --- a/one/src/daemon.rs +++ b/one/src/daemon.rs @@ -19,7 +19,7 @@ use ceramic_sql::sqlite::SqlitePool; use clap::Args; use object_store::aws::AmazonS3Builder; use object_store::local::LocalFileSystem; -use recon::{FullInterests, Recon, ReconInterestProvider}; +use recon::{Recon, ReconInterestProvider}; use signal_hook::consts::signal::*; use signal_hook_tokio::Signals; use std::sync::Arc; @@ -530,13 +530,6 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { // Construct a recon implementation for peers. let recon_peer = Recon::new(peer_svc.clone(), PeerKeyInterests, recon_metrics.clone()); - // Construct a recon implementation for interests. - let recon_interest = Recon::new( - interest_svc.clone(), - FullInterests::default(), - recon_metrics.clone(), - ); - // Construct a recon implementation for models. let recon_model = Recon::new( model_svc.clone(), @@ -545,7 +538,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { recon_metrics, ); - let recons = Some((recon_peer, recon_interest, recon_model)); + let recons = Some((recon_peer, recon_model)); let ipfs_metrics = ceramic_metrics::MetricsHandle::register(ceramic_kubo_rpc::IpfsMetrics::register); let p2p_metrics = MetricsHandle::register(ceramic_p2p::Metrics::register); diff --git a/one/src/network.rs b/one/src/network.rs index 8ed52a2a..e447a5b8 100644 --- a/one/src/network.rs +++ b/one/src/network.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use async_trait::async_trait; -use ceramic_core::{EventId, Interest, NodeId, NodeKey, PeerKey}; +use ceramic_core::{EventId, NodeId, NodeKey, PeerKey}; use ceramic_kubo_rpc::{IpfsMetrics, IpfsMetricsMiddleware, IpfsService}; use ceramic_p2p::{Config as P2pConfig, Libp2pConfig, Node, PeerService}; use iroh_rpc_client::P2pClient; @@ -34,18 +34,17 @@ impl BuilderState for WithP2p {} /// Configure the p2p service impl Builder { - pub async fn with_p2p( + pub async fn with_p2p( self, libp2p_config: Libp2pConfig, node_key: NodeKey, peer_svc: impl PeerService + 'static, - recons: Option<(P, I, M)>, + recons: Option<(P, M)>, block_store: Arc, metrics: ceramic_p2p::Metrics, ) -> anyhow::Result> where P: Recon, - I: Recon, M: Recon, S: iroh_bitswap::Store, { diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index cee741d0..1bd8581f 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -1,7 +1,7 @@ use std::time::Duration; use anyhow::Result; -use ceramic_core::{EventId, Interest, PeerKey}; +use ceramic_core::{EventId, PeerKey}; use iroh_bitswap::{Bitswap, Block, Config as BitswapConfig}; use libp2p::{ autonat, @@ -36,7 +36,7 @@ pub const AGENT_VERSION: &str = concat!("ceramic-one/", env!("CARGO_PKG_VERSION" /// Libp2p behaviour for the node. #[derive(NetworkBehaviour)] #[behaviour(to_swarm = "Event")] -pub(crate) struct NodeBehaviour +pub(crate) struct NodeBehaviour where S: iroh_bitswap::Store + Send + Sync, { @@ -56,13 +56,12 @@ where relay: Toggle, relay_client: Toggle, dcutr: Toggle, - recon: Toggle>, + recon: Toggle>, } -impl NodeBehaviour +impl NodeBehaviour where P: Recon + Send + Sync, - I: Recon + Send + Sync, M: Recon + Send + Sync, S: iroh_bitswap::Store + Send + Sync, { @@ -70,7 +69,7 @@ where local_key: &Keypair, config: &Libp2pConfig, relay_client: Option, - recons: Option<(P, I, M)>, + recons: Option<(P, M)>, block_store: Arc, peers_tx: tokio::sync::mpsc::Sender, metrics: Metrics, @@ -186,8 +185,8 @@ where .with_max_pending_incoming(Some(config.max_conns_pending_in)) .with_max_established_per_peer(Some(config.max_conns_per_peer)), ); - let recon = recons.map(|(peer, interest, model)| { - recon::libp2p::Behaviour::new(peer, interest, model, recon::libp2p::Config::default()) + let recon = recons.map(|(peer, model)| { + recon::libp2p::Behaviour::new(peer, model, recon::libp2p::Config::default()) }); Ok(NodeBehaviour { ping: Ping::default(), diff --git a/p2p/src/node.rs b/p2p/src/node.rs index fdccee70..ebc034a8 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -7,7 +7,7 @@ use std::{sync::atomic::Ordering, time::Duration}; use ahash::AHashMap; use anyhow::{anyhow, bail, Context, Result}; -use ceramic_core::{EventId, Interest, NodeKey, PeerKey}; +use ceramic_core::{EventId, NodeKey, PeerKey}; use ceramic_metrics::{libp2p_metrics, Recorder}; use cid::Cid; use futures_util::stream::StreamExt; @@ -62,15 +62,14 @@ pub enum NetworkEvent { /// Node implements a peer to peer node that participates on the Ceramic network. /// /// Node provides an external API via RpcMessages. -pub struct Node +pub struct Node where P: Recon, - I: Recon, M: Recon, S: iroh_bitswap::Store, { metrics: Metrics, - swarm: Swarm>, + swarm: Swarm>, supported_protocols: HashSet, net_receiver_in: Receiver, dial_queries: AHashMap>>>, @@ -92,10 +91,9 @@ where active_address_probe: Option, } -impl fmt::Debug for Node +impl fmt::Debug for Node where P: Recon, - I: Recon, M: Recon, S: iroh_bitswap::Store, { @@ -128,10 +126,9 @@ const NICE_INTERVAL: Duration = Duration::from_secs(6); const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60); const EXPIRY_INTERVAL: Duration = Duration::from_secs(1); -impl Drop for Node +impl Drop for Node where P: Recon, - I: Recon, M: Recon, S: iroh_bitswap::Store, { @@ -143,12 +140,10 @@ where // Allow IntoConnectionHandler deprecated associated type. // We are not using IntoConnectionHandler directly only referencing the type as part of this event signature. -type NodeSwarmEvent = - SwarmEvent< as NetworkBehaviour>::ToSwarm>; -impl Node +type NodeSwarmEvent = SwarmEvent< as NetworkBehaviour>::ToSwarm>; +impl Node where P: Recon + Send + Sync, - I: Recon + Send + Sync, M: Recon + Send + Sync, S: iroh_bitswap::Store + Send + Sync, { @@ -157,7 +152,7 @@ where rpc_addr: P2pAddr, node_key: NodeKey, peer_svc: impl PeerService + 'static, - recons: Option<(P, I, M)>, + recons: Option<(P, M)>, block_store: Arc, metrics: Metrics, ) -> Result { @@ -494,7 +489,7 @@ where #[tracing::instrument(skip_all)] async fn handle_swarm_event( &mut self, - event: NodeSwarmEvent, + event: NodeSwarmEvent, ) -> Result> { libp2p_metrics().record(&event); match event { diff --git a/p2p/src/swarm.rs b/p2p/src/swarm.rs index 2095292c..13352cf0 100644 --- a/p2p/src/swarm.rs +++ b/p2p/src/swarm.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use ceramic_core::{EventId, Interest, PeerKey}; +use ceramic_core::{EventId, PeerKey}; use libp2p::{dns, noise, relay, tcp, tls, yamux, Swarm, SwarmBuilder}; use libp2p_identity::Keypair; use recon::{libp2p::Recon, Sha256a}; @@ -28,17 +28,16 @@ fn get_dns_config() -> (dns::ResolverConfig, dns::ResolverOpts) { } } -pub(crate) async fn build_swarm( +pub(crate) async fn build_swarm( config: &Libp2pConfig, keypair: Keypair, - recons: Option<(P, I, M)>, + recons: Option<(P, M)>, block_store: Arc, peers_tx: tokio::sync::mpsc::Sender, metrics: Metrics, -) -> Result>> +) -> Result>> where P: Recon, - I: Recon, M: Recon, S: iroh_bitswap::Store, { @@ -105,18 +104,17 @@ where } } -fn new_behavior( +fn new_behavior( config: &Libp2pConfig, keypair: &Keypair, relay_client: Option, - recons: Option<(P, I, M)>, + recons: Option<(P, M)>, block_store: Arc, peers_tx: tokio::sync::mpsc::Sender, metrics: Metrics, -) -> Result> +) -> Result> where P: Recon + Send, - I: Recon + Send, M: Recon + Send, S: iroh_bitswap::Store, { diff --git a/recon/src/libp2p.rs b/recon/src/libp2p.rs index bf25cd92..46f29d52 100644 --- a/recon/src/libp2p.rs +++ b/recon/src/libp2p.rs @@ -20,7 +20,7 @@ mod upgrade; pub use crate::protocol::Recon; pub use stream_set::StreamSet; -use ceramic_core::{EventId, Interest, PeerKey}; +use ceramic_core::{EventId, PeerKey}; use futures::{future::BoxFuture, FutureExt}; use libp2p::{ core::ConnectedPoint, @@ -43,8 +43,6 @@ use crate::{ /// Name of the Recon protocol for synchronizing peers pub const PROTOCOL_NAME_PEER: &str = "/ceramic/recon/0.1.0/peer"; -/// Name of the Recon protocol for synchronizing interests -pub const PROTOCOL_NAME_INTEREST: &str = "/ceramic/recon/0.1.0/interest"; /// Name of the Recon protocol for synchronizing models pub const PROTOCOL_NAME_MODEL: &str = "/ceramic/recon/0.1.0/model"; @@ -76,9 +74,8 @@ impl Default for Config { /// The Behavior tracks all peers on the network that speak the Recon protocol. /// It is responsible for starting and stopping syncs with various peers depending on the needs of /// the application. -pub struct Behaviour { +pub struct Behaviour { peer: P, - interest: I, model: M, config: Config, peers: BTreeMap, @@ -87,15 +84,13 @@ pub struct Behaviour { next_sync: Option>, } -impl std::fmt::Debug for Behaviour +impl std::fmt::Debug for Behaviour where P: std::fmt::Debug, - I: std::fmt::Debug, M: std::fmt::Debug, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Behaviour") - .field("interest", &self.interest) .field("model", &self.model) .field("config", &self.config) .field("peers", &self.peers) @@ -148,18 +143,16 @@ pub enum PeerStatus { Stopped, } -impl Behaviour { +impl Behaviour { /// Create a new Behavior with the provided Recon implementation. - pub fn new(peer: P, interest: I, model: M, config: Config) -> Self + pub fn new(peer: P, model: M, config: Config) -> Self where P: Recon, - I: Recon, M: Recon, { let (tx, rx) = tokio::sync::mpsc::channel(1000); Self { peer, - interest, model, config, peers: BTreeMap::new(), @@ -178,13 +171,12 @@ impl Behaviour { } } -impl NetworkBehaviour for Behaviour +impl NetworkBehaviour for Behaviour where P: Recon, - I: Recon, M: Recon, { - type ConnectionHandler = Handler; + type ConnectionHandler = Handler; type ToSwarm = Event; @@ -205,13 +197,8 @@ where next_sync: BTreeMap::from_iter([ // Schedule all stream_sets initially (StreamSet::Peer, Instant::now()), - // Schedule interests after peers - ( - StreamSet::Interest, - Instant::now() + Duration::from_millis(1), - ), - // Schedule models after interests - (StreamSet::Model, Instant::now() + Duration::from_millis(2)), + // Schedule models after peers + (StreamSet::Model, Instant::now() + Duration::from_millis(1)), ]), sync_delay: Default::default(), }); @@ -395,7 +382,6 @@ where connection_id, handler::State::WaitingInbound, self.peer.clone(), - self.interest.clone(), self.model.clone(), )) } @@ -416,7 +402,6 @@ where stream_set: StreamSet::Peer, }, self.peer.clone(), - self.interest.clone(), self.model.clone(), )) } diff --git a/recon/src/libp2p/handler.rs b/recon/src/libp2p/handler.rs index f4e45927..dfec0fa1 100644 --- a/recon/src/libp2p/handler.rs +++ b/recon/src/libp2p/handler.rs @@ -5,7 +5,7 @@ use std::{collections::VecDeque, task::Poll}; use anyhow::Result; -use ceramic_core::{EventId, Interest, PeerKey}; +use ceramic_core::{EventId, PeerKey}; use libp2p::{ futures::FutureExt, swarm::{ @@ -22,20 +22,18 @@ use crate::{ }; #[derive(Debug)] -pub struct Handler { +pub struct Handler { remote_peer_id: PeerId, connection_id: ConnectionId, peer: P, - interest: I, model: M, state: State, behavior_events_queue: VecDeque, } -impl Handler +impl Handler where P: Recon, - I: Recon, M: Recon, { pub fn new( @@ -43,14 +41,12 @@ where connection_id: ConnectionId, state: State, peer: P, - interest: I, model: M, ) -> Self { Self { remote_peer_id: peer_id, connection_id, peer, - interest, model, state, behavior_events_queue: VecDeque::new(), @@ -156,10 +152,9 @@ pub enum FromHandler { }, } -impl ConnectionHandler for Handler +impl ConnectionHandler for Handler where P: Recon + Clone + Send + 'static, - I: Recon + Clone + Send + 'static, M: Recon + Clone + Send + 'static, { type FromBehaviour = FromBehaviour; @@ -173,7 +168,7 @@ where &self, ) -> libp2p::swarm::SubstreamProtocol { SubstreamProtocol::new( - MultiReadyUpgrade::new(vec![StreamSet::Peer, StreamSet::Interest, StreamSet::Model]), + MultiReadyUpgrade::new(vec![StreamSet::Peer, StreamSet::Model]), (), ) } @@ -276,14 +271,6 @@ where stream, ) .boxed(), - StreamSet::Interest => protocol::respond_synchronize( - self.remote_peer_id, - self.connection_id, - stream_set, - self.interest.clone(), - stream, - ) - .boxed(), StreamSet::Model => protocol::respond_synchronize( self.remote_peer_id, self.connection_id, @@ -321,14 +308,6 @@ where stream, ) .boxed(), - StreamSet::Interest => protocol::initiate_synchronize( - self.remote_peer_id, - self.connection_id, - stream_set, - self.interest.clone(), - stream, - ) - .boxed(), StreamSet::Model => protocol::initiate_synchronize( self.remote_peer_id, self.connection_id, diff --git a/recon/src/libp2p/stream_set.rs b/recon/src/libp2p/stream_set.rs index e5dcf446..879e31f4 100644 --- a/recon/src/libp2p/stream_set.rs +++ b/recon/src/libp2p/stream_set.rs @@ -1,14 +1,12 @@ use anyhow::anyhow; -use super::{PROTOCOL_NAME_INTEREST, PROTOCOL_NAME_MODEL, PROTOCOL_NAME_PEER}; +use super::{PROTOCOL_NAME_MODEL, PROTOCOL_NAME_PEER}; /// Represents a stream set key #[derive(Copy, Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] pub enum StreamSet { /// Stream set of peer ranges Peer, - /// Stream set of interest ranges - Interest, /// Stream set of models Model, } @@ -18,7 +16,6 @@ impl StreamSet { pub fn sort_key(&self) -> &str { match self { StreamSet::Peer => "peer", - StreamSet::Interest => "interest", StreamSet::Model => "model", } } @@ -31,7 +28,6 @@ impl TryFrom<&str> for StreamSet { match value { "peer" => Ok(StreamSet::Peer), "model" => Ok(StreamSet::Model), - "interest" => Ok(StreamSet::Interest), _ => Err(anyhow!("unknown sort_key {}", value)), } } @@ -40,7 +36,6 @@ impl TryFrom<&str> for StreamSet { impl AsRef for StreamSet { fn as_ref(&self) -> &str { match self { - StreamSet::Interest => PROTOCOL_NAME_INTEREST, StreamSet::Peer => PROTOCOL_NAME_PEER, StreamSet::Model => PROTOCOL_NAME_MODEL, } From 593fe6128469403eb0375145009053e3074c9b2a Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 10 Dec 2024 13:03:44 -0700 Subject: [PATCH 2/2] fix: test --- p2p/tests/node.rs | 7 +---- recon/src/libp2p/tests.rs | 66 +++++++-------------------------------- 2 files changed, 13 insertions(+), 60 deletions(-) diff --git a/p2p/tests/node.rs b/p2p/tests/node.rs index e40ef6ca..676a65f1 100644 --- a/p2p/tests/node.rs +++ b/p2p/tests/node.rs @@ -6,7 +6,7 @@ use ceramic_event_svc::store::SqlitePool; use iroh_rpc_client::P2pClient; use iroh_rpc_types::Addr; use libp2p::{Multiaddr, PeerId}; -use recon::{FullInterests, Recon, ReconInterestProvider}; +use recon::{Recon, ReconInterestProvider}; use test_log::test; use ceramic_p2p::{Config, Metrics, NetworkEvent, Node, PeerKeyInterests}; @@ -50,11 +50,6 @@ impl TestRunnerBuilder { Arc::clone(&peer_svc), Some(( Recon::new(peer_svc, PeerKeyInterests, recon_metrics.clone()), - Recon::new( - Arc::clone(&interest_svc), - FullInterests::default(), - recon_metrics.clone(), - ), Recon::new( Arc::clone(&event_svc), ReconInterestProvider::new(node_key.id(), interest_svc), diff --git a/recon/src/libp2p/tests.rs b/recon/src/libp2p/tests.rs index ed11d688..27c2d810 100644 --- a/recon/src/libp2p/tests.rs +++ b/recon/src/libp2p/tests.rs @@ -120,24 +120,12 @@ macro_rules! setup_test { Metrics::register(&mut Registry::default()), ); - let alice_interest = Recon::new( - $alice_interest, - FullInterests::default(), - Metrics::register(&mut Registry::default()), - ); - let bob_peer = Recon::new( $bob_peer, FullInterests::default(), Metrics::register(&mut Registry::default()), ); - let bob_interest = Recon::new( - $bob_interest, - FullInterests::default(), - Metrics::register(&mut Registry::default()), - ); - let bob = Recon::new( $bob_store, FullInterests::default(), @@ -151,11 +139,9 @@ macro_rules! setup_test { per_peer_maximum_sync_delay: std::time::Duration::from_millis(1000), }; let swarm1 = Swarm::new_ephemeral(|_| { - crate::libp2p::Behaviour::new(alice_peer, alice_interest, alice, config.clone()) - }); - let swarm2 = Swarm::new_ephemeral(|_| { - crate::libp2p::Behaviour::new(bob_peer, bob_interest, bob, config) + crate::libp2p::Behaviour::new(alice_peer, alice, config.clone()) }); + let swarm2 = Swarm::new_ephemeral(|_| crate::libp2p::Behaviour::new(bob_peer, bob, config)); (swarm1, swarm2) }}; @@ -179,7 +165,7 @@ async fn in_sync_no_overlap() { swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; - let (p1_events, p2_events): ([crate::libp2p::Event; 6], [crate::libp2p::Event; 6]) = + let (p1_events, p2_events): ([crate::libp2p::Event; 4], [crate::libp2p::Event; 4]) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; assert_in_sync(p2, p1_events); @@ -208,7 +194,7 @@ async fn initiator_model_error() { swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; - let (p1_events, p2_events): ([crate::libp2p::Event; 6], [crate::libp2p::Event; 5]) = + let (p1_events, p2_events): ([crate::libp2p::Event; 4], [crate::libp2p::Event; 3]) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; for ev in p1_events.iter().chain(p2_events.iter()) { @@ -216,7 +202,7 @@ async fn initiator_model_error() { } assert_eq!( - p1_events[5], + p1_events[3], crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm2.local_peer_id().to_owned(), status: PeerStatus::Failed { @@ -248,14 +234,14 @@ async fn responder_model_error() { swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; - let (p1_events, p2_events): ([crate::libp2p::Event; 6], [crate::libp2p::Event; 6]) = + let (p1_events, p2_events): ([crate::libp2p::Event; 4], [crate::libp2p::Event; 4]) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; for ev in p1_events.iter().chain(p2_events.iter()) { info!("{:?}", ev); } assert_eq!( - p2_events[5], + p2_events[3], crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm1.local_peer_id().to_owned(), status: PeerStatus::Failed { @@ -288,7 +274,7 @@ async fn model_error_backoff() { swarm2.connect(&mut swarm1).await; // Expect interests to sync twice in a row since models fail to sync - let (p1_events, p2_events): ([crate::libp2p::Event; 18], [crate::libp2p::Event; 18]) = + let (p1_events, p2_events): ([crate::libp2p::Event; 12], [crate::libp2p::Event; 12]) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; let events = [p1_events, p2_events]; @@ -317,27 +303,18 @@ async fn model_error_backoff() { // First peers sync Some(StreamSet::Peer), Some(StreamSet::Peer), - // First interests sync - Some(StreamSet::Interest), - Some(StreamSet::Interest), // First model sync Some(StreamSet::Model), Some(StreamSet::Model), // Second peers sync Some(StreamSet::Peer), Some(StreamSet::Peer), - // Second interests sync - Some(StreamSet::Interest), - Some(StreamSet::Interest), // Second model sync with initial short backoff Some(StreamSet::Model), Some(StreamSet::Model), // Third peers sync Some(StreamSet::Peer), Some(StreamSet::Peer), - // Third interests sync - Some(StreamSet::Interest), - Some(StreamSet::Interest), // Third model sync is skipped because the backoff pushed it past the peer sync Some(StreamSet::Peer), Some(StreamSet::Peer), @@ -348,7 +325,7 @@ async fn model_error_backoff() { ); // Assert we saw the errors assert_eq!( - events[1][5], + events[1][3], crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm1.local_peer_id().to_owned(), status: PeerStatus::Failed { @@ -357,7 +334,7 @@ async fn model_error_backoff() { }) ); assert_eq!( - events[1][11], + events[1][7], crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm1.local_peer_id().to_owned(), status: PeerStatus::Failed { @@ -376,7 +353,7 @@ fn into_peer_event(ev: crate::libp2p::Event) -> PeerEvent { } } -fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 6]) { +fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 4]) { assert_eq!( into_peer_event(events[0].clone()), PeerEvent { @@ -398,25 +375,6 @@ fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 6]) { ); assert_eq!( into_peer_event(events[2].clone()), - PeerEvent { - remote_peer_id: id, - status: PeerStatus::Started { - stream_set: StreamSet::Interest - } - } - ); - assert_eq!( - into_peer_event(events[3].clone()), - PeerEvent { - remote_peer_id: id, - status: PeerStatus::Synchronized { - stream_set: StreamSet::Interest, - new_count: 0, - } - } - ); - assert_eq!( - into_peer_event(events[4].clone()), PeerEvent { remote_peer_id: id, status: PeerStatus::Started { @@ -425,7 +383,7 @@ fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 6]) { } ); assert_eq!( - into_peer_event(events[5].clone()), + into_peer_event(events[3].clone()), PeerEvent { remote_peer_id: id, status: PeerStatus::Synchronized {