Add gossipable trait + relevant logic #45

Draft
wants to merge 59 commits into
base: main
Changes from 41 commits
5cbdd66
feat: add gossipable trait
Sep 7, 2023
a0e662a
feat: add gossip
Sep 7, 2023
aa7f63c
feat: add channel to gossiper
Sep 7, 2023
ed23c1e
feat: add gossipable trait
Sep 7, 2023
0c35d21
feat: add gossip
Sep 7, 2023
86580a6
feat: add channel to gossiper
Sep 7, 2023
bb397b9
Merge remote-tracking branch 'origin/add_gossipable' into add_gossipable
Sep 8, 2023
10657e0
feat: add bloom
Sep 9, 2023
baf2923
feat: add test for bloom
Sep 9, 2023
1fea689
feat: add test for bloom
Sep 9, 2023
9f1ab3b
Merge branch 'ava-labs:main' into add_gossipable
Sanghren Sep 11, 2023
f5a464b
Merge branch 'ava-labs:main' into add_gossipable
Sanghren Sep 11, 2023
7ba33ea
feat: add proto
Sep 11, 2023
cc7edeb
feat: add proto generated file
Sep 11, 2023
231d93b
feat: some cleanup
Sep 12, 2023
c1b42c2
feat: some cleanup
Sep 12, 2023
1f57923
feat: add Namespace to Config
Sep 12, 2023
db4cf30
feat: add test
Sep 12, 2023
e52b0a1
feat: add test
Sep 12, 2023
94085a6
Merge branch 'ava-labs:main' into add_gossipable
Sanghren Sep 12, 2023
3c3e138
feat: remove warning debug print
Sep 12, 2023
c63269c
feat: add handler.rs
Sep 13, 2023
476016a
feat: cargo.toml dep
Sep 13, 2023
139dacb
feat: cargo.toml dep
Sep 14, 2023
b9fc9ec
feat: lint
Sep 14, 2023
a606351
feat: finish handler accept_request
Sep 14, 2023
573db11
feat: unused app-request param
Sep 14, 2023
f1b752a
feat: put tests under mod test
Sep 14, 2023
bbf1488
feat: clean up
Sep 15, 2023
5654fb1
feat: pass reference to bloom.has
Sep 15, 2023
20db932
feat: clean up
Sep 15, 2023
27df668
feat: address comments
Sep 15, 2023
0c446e9
feat: address comments
Sep 15, 2023
9e67429
feat: address comments
Sep 15, 2023
61c8256
feat: address comments
Sep 15, 2023
3a7a5cd
feat: address comments
Sep 15, 2023
8cc6084
feat: use associated type instead of phantomdata
Sep 16, 2023
b676024
feat: address warnings
Sep 18, 2023
acb1de0
Merge branch 'ava-labs:main' into add_gossipable
Sanghren Sep 19, 2023
3bec776
feat: implement an example that run gossip logic
Sep 26, 2023
e046013
feat: implement an example that run gossip logic
Sep 26, 2023
13205e7
feat: make things work - wip
Sep 27, 2023
7b1e8eb
Merge branch 'ava-labs:main' into add_gossipable
Sanghren Sep 29, 2023
f56ab25
feat: wip
Oct 1, 2023
884f1fd
feat: wip
Oct 1, 2023
c5446e6
feat: wip
Oct 1, 2023
e4b3400
feat: wip
Oct 1, 2023
fb0359e
feat: wip
Oct 1, 2023
ccce15b
feat: wip
Oct 1, 2023
774ca6e
feat: wip
Oct 1, 2023
ec6767a
feat: wip
Oct 1, 2023
7db76bc
feat: clean
Oct 1, 2023
36c40dd
feat: clean
Oct 1, 2023
f6e8baf
feat: clean
Oct 1, 2023
672c728
feat: clean
Oct 1, 2023
b89abd9
feat: remove arc for callback
Oct 2, 2023
3faedf9
feat: format
Oct 2, 2023
ff65229
feat: format
Oct 2, 2023
65e8a14
feat: format
Oct 2, 2023
21 changes: 19 additions & 2 deletions core/network/Cargo.toml
@@ -12,25 +12,42 @@ readme = "README.md"

[dependencies]
avalanche-types = { path = "../../crates/avalanche-types", features = ["message"] }
async-trait = { version = "0.1.73", features = [] }
Collaborator

🤔, usually libraries re-export async-trait such that the trait definition remains consistent with the trait implementation.

I know I'll get my answer below, but are we defining traits with the macro, or are we implementing traits with the macro? Or both?

byteorder = "1.4.3"
cert-manager = "0.0.10" # https://github.com/gyuho/cert-manager
log = "0.4.20"
rustls = { version = "0.21.5", features = ["logging", "dangerous_configuration"]} # https://github.com/rustls/rustls/tags
rustls = { version = "0.21.5", features = ["logging", "dangerous_configuration"] } # https://github.com/rustls/rustls/tags
rcgen = "0.10.0"
hyper-rustls = "0.24.1"
rustls-native-certs = "0.6.3"
hyper = { version = "0.14.27", features = ["full"], optional = true }
tokio-rustls = { version = "0.24.1", optional = true }
tokio = { version = "1.32.0", features = ["sync", "time"] }
prost = "0.12.0"
prost-types = "0.12.0"
prost-build = "0.12.0"
bincode = "1.3.3"
serde = { version = "1.0.188", features = ["derive"] }

# for feature "pem"
pem = { version = "3.0.0", optional = true } # https://github.com/jcreekmore/pem-rs


[dev-dependencies]
env_logger = "0.10.0"
mockall = "0.11.4"
proptest = "1.2.0"
Collaborator

👍

random-manager = "0.0.5"
tokio = { version = "1.32.0", features = ["full"] }
testing_logger = "0.1.1"
Collaborator

🤔, not sure we need this as it looks like it's unmaintained. I guess I'll see where it's used.

tokio = { version = "1.32.0", features = ["sync", "time", "rt-multi-thread"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.17"

[build-dependencies]
# ref. https://github.com/hyperium/tonic/tags
# ref. https://github.com/hyperium/tonic/tree/master/tonic-build
tonic-build = "0.9.2"

[features]
default = ["rustls", "pem_encoding"]
rustls = ["hyper", "tokio-rustls"]
10 changes: 10 additions & 0 deletions core/network/build.rs
@@ -0,0 +1,10 @@
/// ref. <https://github.com/hyperium/tonic/tree/master/tonic-build>
fn main() {
    tonic_build::configure()
        .out_dir("./src/p2p")
        .build_server(true)
        .build_client(true)
        .compile(&["./src/p2p/gossip/sdk.proto"], &["./src/p2p/gossip/"])
        .unwrap();

}
242 changes: 242 additions & 0 deletions core/network/examples/peer_gossip.rs
@@ -0,0 +1,242 @@
use std::error::Error;
use std::hash::Hash;
use tokio::sync::Mutex;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use network::p2p::gossip::gossip::{Config, Gossiper};
use tokio::sync::mpsc::{channel};
use avalanche_types::ids::Id;
use network::p2p::client::{AppResponseCallback, Client};
use network::p2p::gossip::{Gossipable, Set};
use network::p2p::gossip::handler::{Handler, HandlerConfig, new_handler};
use network::p2p::handler::Handler as TraitHandler;
Collaborator

Prefer merged imports as it's easier to keep them consistent when using automatic imports from tools like rust-analyzer.

Suggested change
use async_trait::async_trait;
use avalanche_types::ids::Id;
use network::p2p::{
client::{AppResponseCallback, Client},
gossip::{
gossip::{Config, Gossiper},
handler::{new_handler, Handler, HandlerConfig},
Gossipable, Set,
},
handler::Handler as TraitHandler,
};
use std::{error::Error, hash::Hash, sync::Arc, time::Duration};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
sync::{mpsc::channel, Mutex},
};

Author

Oh, I overlooked this. I should check how to configure this in CLion.


pub struct TestClient {
pub stream: Arc<Mutex<TcpStream>>,
pub listener: Arc<Mutex<TcpListener>>,
}

#[async_trait]
#[allow(unused_variables)]
impl Client for TestClient {
async fn app_gossip(&mut self, request_bytes: Vec<u8>) {
unimplemented!()
Collaborator

FYI, todo! is a little shorter, and they pretty much do the same thing.
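A minimal sketch of that equivalence, using hypothetical free functions rather than the `Client` methods from this PR: both macros have type `!`, so either one satisfies any return type, and both panic when reached.

```rust
use std::panic;

// Hypothetical placeholder: compiles for any return type because `todo!()` never returns.
fn with_todo() -> u32 {
    todo!()
}

// Same shape with `unimplemented!()`.
fn with_unimplemented() -> u32 {
    unimplemented!()
}

/// Returns true if calling `f` panics.
fn panics(f: fn() -> u32) -> bool {
    panic::catch_unwind(f).is_err()
}
```

The practical difference is intent: `todo!` signals work still to be done, while `unimplemented!` makes no such promise.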

}

async fn app_request_any(&mut self, request_bytes: Vec<u8>, on_response: AppResponseCallback) {
let mut stream_guard = self.stream.try_lock().expect("aa");
Collaborator

🤔, you should be using lock().await here. It's also idiomatic to remove the _guard suffix since the type will deref to the stream anyway.

stream_guard.write_all(&*request_bytes).await.unwrap();

loop {
// Lock the listener and wait for a new connection
let clone = self.listener.clone();
let listener = clone.try_lock().expect("Unable to lock listener");
Collaborator

It's interesting that you chose to put a lock around the listener when you lock it in what seems to be an infinite loop.


match listener.accept().await {
Ok((mut stream, _)) => {
let mut buf = [0u8; 1024];
match stream.read(&mut buf).await {
Collaborator

Hmm... you can't accept any new incoming connections while you're reading the stream here. I think you want to spawn this in a new task. You could use something like an mpsc channel and select! on listener.accept() and rx.recv() to see if there are any errors and shut down.

Ok(n) => {
if n == 0 {
break;
}
println!("Received a message of length: {}", n);
on_response(buf[0..n].to_vec());
}
Err(e) => {
eprintln!("Error reading from stream: {}", e);
break;
}
}
break;
}
Err(e) => {
eprintln!("Error accepting connection: {}", e);
break;
}
}
}
}
}

#[derive(Clone, Hash, Debug)]
struct TestGossipableType {
pub id: Id,
}

impl Default for TestGossipableType {
fn default() -> Self {
TestGossipableType {
id: Default::default(),
}
}
}

impl Gossipable for TestGossipableType {
fn get_id(&self) -> Id {
self.id
}

fn serialize(&self) -> Result<Vec<u8>, Box<dyn Error>> {
Ok(self.id.to_vec())
}

fn deserialize(&mut self, bytes: &[u8]) -> Result<(), Box<dyn Error>> {
self.id = Id::from_slice(bytes);
Ok(())
}
}

// Mock implementation for the Set trait
//ToDo Should we move all tests to a new file ?
#[derive(Debug, Clone, Hash)]
pub struct MockSet<TestGossipableType> {
pub set: Vec<TestGossipableType>,
}

impl<T> MockSet<T> {
pub fn len(&self) -> usize {
println!("{}", self.set.len());
self.set.len()
}
}

impl<T: Gossipable + Sync + Send + Clone + Hash> Set for MockSet<T> {
type Item = T;
fn add(&mut self, _gossipable: T) -> Result<(), Box<dyn Error>> {
Collaborator

Variable names shouldn't start with _ when the variable is actually used; the prefix marks a binding as intentionally unused.

self.set.push(_gossipable.clone());
Ok(())
}

fn iterate(&self, _f: &dyn FnMut(&T) -> bool) {
// Do nothing
}

fn fetch_elements(&self) -> Self::Item {
self.set.get(0).unwrap().clone()
}
}

async fn fake_handler_server_logic(mut socket: TcpStream, client_socket: Arc<Mutex<TcpStream>>, handler: Handler<MockSet<TestGossipableType>>) {

// Initialize a buffer of size 1024.
let mut buf = [0u8; 1024];

loop {
let n = socket.read(&mut buf).await.unwrap();

// Empty, wait 5 sec before next attempt
if n == 0 {
Collaborator

When socket.read returns 0, that means the client has disconnected. You will forever get 0, so this is an infinite loop.

Collaborator

I think if the client disconnects, the read call will error and the program will panic on that unwrap. It wouldn't loop infinitely.

Collaborator

That's not true. Example program: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=d809a48e2983d4fe94833852a8a28961

Client drops the tcp_stream on line 35, and waits for the server to finish up before exiting. While it's in this state, read() repeatedly returns Ok(0)
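To make the end-of-stream handling concrete, here is a rough sketch written against the blocking `std::io::Read` trait rather than tokio's `AsyncReadExt`. That substitution is an assumption made to keep the snippet dependency-free, and `read_until_eof` is a hypothetical helper, not something in this PR. The loop treats `Ok(0)` as "the peer is done" and exits instead of sleeping and retrying.

```rust
use std::io::{self, Read};

/// Reads chunks from `reader` until end-of-stream and returns every byte seen.
fn read_until_eof<R: Read>(mut reader: R) -> io::Result<Vec<u8>> {
    let mut buf = [0u8; 1024];
    let mut received = Vec::new();
    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            // Ok(0) with a non-empty buffer means end-of-stream.
            // Retrying would only yield Ok(0) again, so leave the loop.
            break;
        }
        received.extend_from_slice(&buf[..n]);
    }
    Ok(received)
}
```

The same shape applies to the async version: replace the sleep-and-continue branch with a `break` (or a `return`) when `n == 0`.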

tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}

// Fake test data.
let node_id: avalanche_types::ids::node::Id = avalanche_types::ids::node::Id::from_slice(&random_manager::secure_bytes(20).unwrap());
Collaborator

You should fully qualify Id such that you can write:

let node_id = Id::from_slice(&random_manager::secure_bytes(20).unwrap());


let res_bytes = handler.app_gossip(node_id, buf[0..n].to_vec()).await.expect("Issue while attempting to gossip in fake_handler_logic");

let mut guard = client_socket.try_lock().expect("Lock of client_socket failed");
Collaborator

If someone else was holding the lock around the client_socket, this code would crash. I know it's an example, but all you have to do is call let socket = client_socket.lock().await.
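The difference can be sketched with `std::sync::Mutex` standing in for `tokio::sync::Mutex` (an assumption made so the snippet needs no external crates; the helper names are hypothetical). With the lock already held, `try_lock` fails immediately, whereas a waiting `lock` call succeeds as soon as the holder releases it.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Waits for the lock, then appends `byte`.
fn write_blocking(shared: &Arc<Mutex<Vec<u8>>>, byte: u8) {
    shared.lock().expect("mutex poisoned").push(byte);
}

/// Appends `byte` only if the lock is free right now; returns whether it wrote.
fn write_non_blocking(shared: &Arc<Mutex<Vec<u8>>>, byte: u8) -> bool {
    match shared.try_lock() {
        Ok(mut guard) => {
            guard.push(byte);
            true
        }
        Err(_) => false,
    }
}

/// Holds the lock, attempts both kinds of write, and reports what happened.
fn contended_demo() -> (bool, Vec<u8>) {
    let shared = Arc::new(Mutex::new(Vec::new()));
    let guard = shared.lock().unwrap();

    // The lock is held by `guard`, so the non-blocking write gives up.
    let wrote_without_waiting = write_non_blocking(&shared, 1);

    // The blocking writer just waits on another thread until the lock is released.
    let writer = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || write_blocking(&shared, 2))
    };
    drop(guard);
    writer.join().unwrap();

    let contents = shared.lock().unwrap().clone();
    (wrote_without_waiting, contents)
}
```

In the example code above, calling `.expect(...)` on a failed `try_lock()` turns that "give up" case into a panic, which is the crash being described.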


let _ = guard.write_all(&res_bytes).await;
}
}

async fn start_fake_node(gossip_handler_addr: String, listener_handler_addr: String, client_addr: String) {
Collaborator

Same comment about addresses, they should be &str. You shouldn't be cloning here.

// Initialize the configuration for the gossiper
let config = Config {
namespace: "test".to_string(),
frequency: Duration::from_secs(10),
poll_size: 1, // As we only have 1 other "node" in our test setup, set it to 1
};

// Create a TcpListener to receive messages on.
// Wrapping it in Arc and Mutex to safely share it between threads.
let handler_listener = Arc::new(Mutex::new(TcpListener::bind(listener_handler_addr.clone()).await.unwrap()));
let gossip_listener = Arc::new(Mutex::new(TcpListener::bind(gossip_handler_addr.clone()).await.unwrap()));
Collaborator

You shouldn't be cloning Strings here, you can just pass in &str instead.
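A small sketch of the borrowed-parameter version. `parse_addr` and `node_addrs` are hypothetical helpers that only parse the addresses; they stand in for the binding and connecting done in `start_fake_node`.

```rust
use std::net::{AddrParseError, SocketAddr};

/// Parses a borrowed address string; nothing is cloned or allocated for the input.
fn parse_addr(addr: &str) -> Result<SocketAddr, AddrParseError> {
    addr.parse()
}

/// Takes all three addresses by reference, mirroring the shape of `start_fake_node`.
fn node_addrs(
    gossip_handler_addr: &str,
    listener_handler_addr: &str,
    client_addr: &str,
) -> Result<[SocketAddr; 3], AddrParseError> {
    Ok([
        parse_addr(gossip_handler_addr)?,
        parse_addr(listener_handler_addr)?,
        parse_addr(client_addr)?,
    ])
}
```

String literals are `&'static str`, so passing them straight into a spawned task still works, and a caller that owns a `String` can pass `&owned` and keep using it afterwards.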


// Create a TcpStream to send messages to.
// Wrapping it in Arc and Mutex to safely share it between threads.
let stream = Arc::new(Mutex::new(TcpStream::connect(client_addr).await.unwrap()));

// Initialize the configuration for the handler and create a new handler
let handler_config = HandlerConfig { namespace: "test".to_string(), target_response_size: 100 };
let handler = new_handler(
handler_config,
Arc::new(Mutex::new(MockSet { set: Vec::<TestGossipableType>::new() })),
);

// Clone listener and stream for use inside the spawned task.
let listener_clone = handler_listener.clone();
let stream_clone = stream.clone();
// Spawn an asynchronous task that will handle incoming connections in a loop
tokio::spawn(async move {
// Accept incoming connections and spawn a new task to handle each connection
let (listener_socket, _) = listener_clone.try_lock().expect("Error acquiring lock on listener_clone").accept().await.unwrap();
fake_handler_server_logic(listener_socket, stream_clone.clone(), handler.clone()).await;
});
Collaborator

Here's a cool pattern for cloning

    tokio::spawn({
        let (listener, stream) = (handler_listener.clone(), stream.clone());

        async move {
            // Accept incoming connections and spawn a new task to handle each connection
            let (listener_socket, _) =
                listener
                    .try_lock()
                    .expect("Error acquiring lock on listener")
                    .accept()
                    .await
                    .unwrap();

            fake_handler_server_logic(listener_socket, stream, handler).await;
        }
    });

This also suffers from the same issue, with only one thread being able to access the listener at a time.


// Initialize a MockSet and populate it with some test data
let set = Arc::new(Mutex::new(MockSet { set: Vec::<TestGossipableType>::new() }));
// Generating fake data and pushing to set
{
for _ in 0..3 {
set.try_lock().expect("Error acquiring lock on set").set.push(
TestGossipableType { id: Id::from_slice(&random_manager::secure_bytes(32).unwrap()) }
);
}
}
Collaborator

You don't need the extra code block. The guard is dropped at the end of every loop iteration. If it wasn't, the program would panic trying to acquire the lock again.

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=24083cb02a99d93b16d0d10ff2be5542
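The same point as a dependency-free sketch, with `std::sync::Mutex` standing in for the tokio mutex and `fill` as a hypothetical helper. The guard returned by `try_lock()` is a temporary, so it is dropped at the end of the statement that created it and the next iteration can lock again without any extra block.

```rust
use std::sync::Mutex;

/// Pushes three values, locking once per iteration, and returns the final length.
fn fill(set: &Mutex<Vec<u32>>) -> usize {
    for i in 0..3 {
        // The temporary guard lives only until the end of this statement.
        set.try_lock().expect("lock should be free").push(i);
    }
    // Bind the length first so the guard is released before returning.
    let len = set.try_lock().expect("lock should be free").len();
    len
}
```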


{
assert_eq!(set.try_lock().expect("Failed to acquire lock").set.len().clone(), 3);
}

let (stop_tx, stop_rx) = channel(1);

// Spawn the gossiping task
let set_clone = set.clone();
let gossip_task = tokio::spawn(async move {
// Initialize a TestClient instance with the given stream and listener
let gossip_client = Arc::new(Mutex::new(TestClient { stream: stream.clone(), listener: gossip_listener.clone() }));

// Create a channel for stopping the gossiper

// Initialize the Gossiper with the provided configuration, set, client, and receiver end of the stop channel
let mut gossiper = Gossiper::new(config, set_clone, gossip_client.clone(), stop_rx);

gossiper.gossip().await;
});

// Sleep for a few seconds, make sure the whole process ran
tokio::time::sleep(Duration::from_secs(1)).await;

{
assert_eq!(set.try_lock().expect("Failed to acquire lock").set.len(), 4);
}

// Send the stop signal before awaiting the task.
if stop_tx.send(()).await.is_err() {
eprintln!("Failed to send stop signal");
}

// Await the completion of the gossiping task
let _ = gossip_task.await.expect("Gossip task failed");
}


#[tokio::main]
async fn main() {

// Start the client
// listen on 8080 , send message to 8081
let client_01_handle = tokio::spawn(start_fake_node("127.0.0.1:8080".to_string(), "127.0.0.1:8081".to_string(), "127.0.0.1:8082".to_string()));
let client_02_handle = tokio::spawn(start_fake_node("127.0.0.1:8082".to_string(), "127.0.0.1:8083".to_string(), "127.0.0.1:8080".to_string()));

// Wait for the server and client to complete
client_01_handle.await.unwrap();
client_02_handle.await.unwrap();
}

1 change: 1 addition & 0 deletions core/network/src/lib.rs
@@ -1,2 +1,3 @@
//! A library for building p2p inbound and outbound connections.
pub mod p2p;
pub mod peer;
22 changes: 22 additions & 0 deletions core/network/src/p2p/client.rs
@@ -0,0 +1,22 @@
use std::sync::Arc;
use async_trait::async_trait;

pub type AppResponseCallback = Arc<dyn Fn(Vec<u8>) + Send + Sync>;
Collaborator

Why do you have to wrap the callback in an Arc?
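One possible alternative, sketched under the assumption that each callback is invoked at most once and never shared between callers: an owned `Box<dyn FnOnce>` instead of an `Arc<dyn Fn>`. The alias below reuses the name `AppResponseCallback` for illustration only, and `app_request_any` here is a hypothetical synchronous free function, not the trait method.

```rust
use std::sync::mpsc;

/// Hypothetical alias: an owned callback that may be called exactly once.
type AppResponseCallback = Box<dyn FnOnce(Vec<u8>) + Send>;

/// Stand-in for a client call: hands the request bytes straight to the callback.
fn app_request_any(request_bytes: Vec<u8>, on_response: AppResponseCallback) {
    on_response(request_bytes);
}

/// Sends `payload` through the callback and returns what the callback received.
fn echo_once(payload: Vec<u8>) -> Vec<u8> {
    let (tx, rx) = mpsc::channel();
    app_request_any(
        payload,
        Box::new(move |bytes: Vec<u8>| {
            tx.send(bytes).expect("receiver is still alive");
        }),
    );
    rx.recv().expect("callback should have run")
}
```

`FnOnce` also lets the closure move captured values (such as the channel sender) out when it runs, which `Fn` does not allow.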

#[async_trait]
#[allow(unused_variables)]
pub trait Client: Send + Sync {
    async fn app_request_any(&mut self, request_bytes: Vec<u8>, on_response: AppResponseCallback) {}
    async fn app_request(&mut self, request_bytes: Vec<u8>) {}
    async fn app_gossip(&mut self, request_bytes: Vec<u8>) {}
    async fn app_gossip_specific(&mut self, request_bytes: Vec<u8>) {}
    async fn cross_chain_app_request(&mut self, request_bytes: Vec<u8>) {}
    async fn prefix_message(&mut self, request_bytes: Vec<u8>) {}
}
Collaborator

Not sure if you meant to have a default implementation for all of these methods.

It's also likely that you would want to return Result for at least some of these.
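A rough sketch of what that could look like, simplified to a single synchronous method so it compiles without `async-trait`. `ClientError` and `RecordingClient` are hypothetical names introduced for illustration; the point is that a required method with a `Result` return forces each implementor to handle the failure case explicitly.

```rust
use std::fmt;

/// Hypothetical error type for client operations.
#[derive(Debug, PartialEq)]
enum ClientError {
    NotConnected,
}

impl fmt::Display for ClientError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ClientError::NotConnected => write!(f, "client is not connected"),
        }
    }
}

impl std::error::Error for ClientError {}

trait Client: Send + Sync {
    // No default body: every implementor must say what sending means.
    fn app_gossip(&mut self, request_bytes: Vec<u8>) -> Result<(), ClientError>;
}

/// Hypothetical implementor that records what it was asked to send.
struct RecordingClient {
    connected: bool,
    sent: Vec<Vec<u8>>,
}

impl Client for RecordingClient {
    fn app_gossip(&mut self, request_bytes: Vec<u8>) -> Result<(), ClientError> {
        if !self.connected {
            return Err(ClientError::NotConnected);
        }
        self.sent.push(request_bytes);
        Ok(())
    }
}
```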


pub struct NoOpClient;

unsafe impl Sync for NoOpClient {}
unsafe impl Send for NoOpClient {}
Collaborator

This isn't necessary
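A quick way to check this: `Send` and `Sync` are auto traits, so a field-less struct gets both without any `unsafe impl`. The helper `assert_send_sync` below is a hypothetical compile-time check, not part of the PR.

```rust
pub struct NoOpClient;

/// Compiles only if `T` is both `Send` and `Sync`.
fn assert_send_sync<T: Send + Sync>() {}

/// Fails to compile if `NoOpClient` were missing either auto trait.
fn check_no_op_client() {
    assert_send_sync::<NoOpClient>();
}
```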

impl Client for NoOpClient {

}