Skip to content

Commit

Permalink
Broadcast various requests as per #4684 (#4920)
Browse files Browse the repository at this point in the history
* multiple broadcast flags

* rewrite with single --broadcast option

* satisfy cargo fmt

* shorten sync-committee-messages

* fix a doc comment and a test

* use strum

* Add broadcast test to simulator

* bring --disable-run-on-all flag back with deprecation notice
  • Loading branch information
uvizhe authored Nov 27, 2023
1 parent e856a90 commit b4556a3
Show file tree
Hide file tree
Showing 16 changed files with 252 additions and 52 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 64 additions & 6 deletions lighthouse/tests/validator_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use validator_client::Config;
use validator_client::{ApiTopic, Config};

use crate::exec::CommandLineTestExec;
use bls::{Keypair, PublicKeyBytes};
Expand Down Expand Up @@ -493,20 +493,78 @@ fn monitoring_endpoint() {
assert_eq!(api_conf.update_period_secs, Some(30));
});
}

#[test]
fn disable_run_on_all_default() {
fn disable_run_on_all_flag() {
CommandLineTest::new()
.flag("disable-run-on-all", None)
.run()
.with_config(|config| {
assert_eq!(config.broadcast_topics, vec![]);
});
// --broadcast flag takes precedence
CommandLineTest::new()
.flag("disable-run-on-all", None)
.flag("broadcast", Some("attestations"))
.run()
.with_config(|config| {
assert_eq!(config.broadcast_topics, vec![ApiTopic::Attestations]);
});
}

#[test]
fn no_broadcast_flag() {
CommandLineTest::new().run().with_config(|config| {
assert!(!config.disable_run_on_all);
assert_eq!(config.broadcast_topics, vec![ApiTopic::Subscriptions]);
});
}

#[test]
fn disable_run_on_all() {
fn broadcast_flag() {
// "none" variant
CommandLineTest::new()
.flag("disable-run-on-all", None)
.flag("broadcast", Some("none"))
.run()
.with_config(|config| {
assert_eq!(config.broadcast_topics, vec![]);
});
// "none" with other values is ignored
CommandLineTest::new()
.flag("broadcast", Some("none,sync-committee"))
.run()
.with_config(|config| {
assert!(config.disable_run_on_all);
assert_eq!(config.broadcast_topics, vec![ApiTopic::SyncCommittee]);
});
// Other valid variants
CommandLineTest::new()
.flag("broadcast", Some("blocks, subscriptions"))
.run()
.with_config(|config| {
assert_eq!(
config.broadcast_topics,
vec![ApiTopic::Blocks, ApiTopic::Subscriptions],
);
});
// Omitted "subscription" overrides default
CommandLineTest::new()
.flag("broadcast", Some("attestations"))
.run()
.with_config(|config| {
assert_eq!(config.broadcast_topics, vec![ApiTopic::Attestations]);
});
}

#[test]
#[should_panic(expected = "Unknown API topic")]
fn wrong_broadcast_flag() {
CommandLineTest::new()
.flag("broadcast", Some("foo, subscriptions"))
.run()
.with_config(|config| {
assert_eq!(
config.broadcast_topics,
vec![ApiTopic::Blocks, ApiTopic::Subscriptions],
);
});
}

Expand Down
2 changes: 1 addition & 1 deletion testing/node_test_rig/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ types = { workspace = true }
tempfile = { workspace = true }
eth2 = { workspace = true }
validator_client = { workspace = true }
validator_dir = { workspace = true }
validator_dir = { workspace = true, features = ["insecure_keys"] }
sensitive_url = { workspace = true }
execution_layer = { workspace = true }
tokio = { workspace = true }
2 changes: 1 addition & 1 deletion testing/node_test_rig/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub use eth2;
pub use execution_layer::test_utils::{
Config as MockServerConfig, MockExecutionConfig, MockServer,
};
pub use validator_client::Config as ValidatorConfig;
pub use validator_client::{ApiTopic, Config as ValidatorConfig};

/// The global timeout for HTTP requests to the beacon node.
const HTTP_TIMEOUT: Duration = Duration::from_secs(8);
Expand Down
26 changes: 21 additions & 5 deletions testing/simulator/src/eth1_sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use futures::prelude::*;
use node_test_rig::environment::RuntimeContext;
use node_test_rig::{
environment::{EnvironmentBuilder, LoggerConfig},
testing_client_config, testing_validator_config, ClientConfig, ClientGenesis, ValidatorFiles,
testing_client_config, testing_validator_config, ApiTopic, ClientConfig, ClientGenesis,
ValidatorFiles,
};
use rayon::prelude::*;
use sensitive_url::SensitiveUrl;
Expand Down Expand Up @@ -159,10 +160,25 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
validator_config.fee_recipient = Some(SUGGESTED_FEE_RECIPIENT.into());
}
println!("Adding validator client {}", i);
network_1
.add_validator_client(validator_config, i, files, i % 2 == 0)
.await
.expect("should add validator");

// Enable broadcast on every 4th node.
if i % 4 == 0 {
validator_config.broadcast_topics = ApiTopic::all();
let beacon_nodes = vec![i, (i + 1) % node_count];
network_1
.add_validator_client_with_fallbacks(
validator_config,
i,
beacon_nodes,
files,
)
.await
} else {
network_1
.add_validator_client(validator_config, i, files, i % 2 == 0)
.await
}
.expect("should add validator");
},
"vc",
);
Expand Down
42 changes: 42 additions & 0 deletions testing/simulator/src/local_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,48 @@ impl<E: EthSpec> LocalNetwork<E> {
Ok(())
}

pub async fn add_validator_client_with_fallbacks(
&self,
mut validator_config: ValidatorConfig,
validator_index: usize,
beacon_nodes: Vec<usize>,
validator_files: ValidatorFiles,
) -> Result<(), String> {
let context = self
.context
.service_context(format!("validator_{}", validator_index));
let self_1 = self.clone();
let mut beacon_node_urls = vec![];
for beacon_node in beacon_nodes {
let socket_addr = {
let read_lock = self.beacon_nodes.read();
let beacon_node = read_lock
.get(beacon_node)
.ok_or_else(|| format!("No beacon node for index {}", beacon_node))?;
beacon_node
.client
.http_api_listen_addr()
.expect("Must have http started")
};
let beacon_node = SensitiveUrl::parse(
format!("http://{}:{}", socket_addr.ip(), socket_addr.port()).as_str(),
)
.unwrap();
beacon_node_urls.push(beacon_node);
}

validator_config.beacon_nodes = beacon_node_urls;

let validator_client = LocalValidatorClient::production_with_insecure_keypairs(
context,
validator_config,
validator_files,
)
.await?;
self_1.validator_clients.write().push(validator_client);
Ok(())
}

/// For all beacon nodes in `Self`, return a HTTP client to access each nodes HTTP API.
pub fn remote_nodes(&self) -> Result<Vec<BeaconNodeHttpClient>, String> {
let beacon_nodes = self.beacon_nodes.read();
Expand Down
1 change: 1 addition & 0 deletions validator_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,4 @@ malloc_utils = { workspace = true }
sysinfo = { workspace = true }
system_health = { path = "../common/system_health" }
logging = { workspace = true }
strum = { workspace = true }
5 changes: 3 additions & 2 deletions validator_client/src/attestation_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced};
use crate::{
duties_service::{DutiesService, DutyAndProof},
http_metrics::metrics,
Expand Down Expand Up @@ -433,9 +433,10 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
// Post the attestations to the BN.
match self
.beacon_nodes
.first_success(
.request(
RequireSynced::No,
OfflineOnFailure::Yes,
ApiTopic::Attestations,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
Expand Down
57 changes: 47 additions & 10 deletions validator_client/src/beacon_node_fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::http_metrics::metrics::{inc_counter_vec, ENDPOINT_ERRORS, ENDPOINT_RE
use environment::RuntimeContext;
use eth2::BeaconNodeHttpClient;
use futures::future;
use serde::{Deserialize, Serialize};
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::fmt;
Expand All @@ -15,6 +16,7 @@ use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use strum::{EnumString, EnumVariantNames};
use tokio::{sync::RwLock, time::sleep};
use types::{ChainSpec, Config, EthSpec};

Expand Down Expand Up @@ -330,22 +332,22 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
pub struct BeaconNodeFallback<T, E> {
candidates: Vec<CandidateBeaconNode<E>>,
slot_clock: Option<T>,
disable_run_on_all: bool,
broadcast_topics: Vec<ApiTopic>,
spec: ChainSpec,
log: Logger,
}

impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
pub fn new(
candidates: Vec<CandidateBeaconNode<E>>,
disable_run_on_all: bool,
broadcast_topics: Vec<ApiTopic>,
spec: ChainSpec,
log: Logger,
) -> Self {
Self {
candidates,
slot_clock: None,
disable_run_on_all,
broadcast_topics,
spec,
log,
}
Expand Down Expand Up @@ -579,7 +581,7 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
/// It returns a list of errors along with the beacon node id that failed for `func`.
/// Since this ignores the actual result of `func`, this function should only be used for beacon
/// node calls whose results we do not care about, only that they completed successfully.
pub async fn run_on_all<'a, F, O, Err, R>(
pub async fn broadcast<'a, F, O, Err, R>(
&'a self,
require_synced: RequireSynced,
offline_on_failure: OfflineOnFailure,
Expand Down Expand Up @@ -687,25 +689,60 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
}

/// Call `func` on first beacon node that returns success or on all beacon nodes
/// depending on the value of `disable_run_on_all`.
pub async fn run<'a, F, Err, R>(
/// depending on the `topic` and configuration.
pub async fn request<'a, F, Err, R>(
&'a self,
require_synced: RequireSynced,
offline_on_failure: OfflineOnFailure,
topic: ApiTopic,
func: F,
) -> Result<(), Errors<Err>>
where
F: Fn(&'a BeaconNodeHttpClient) -> R,
R: Future<Output = Result<(), Err>>,
Err: Debug,
{
if self.disable_run_on_all {
if self.broadcast_topics.contains(&topic) {
self.broadcast(require_synced, offline_on_failure, func)
.await
} else {
self.first_success(require_synced, offline_on_failure, func)
.await?;
Ok(())
} else {
self.run_on_all(require_synced, offline_on_failure, func)
.await
}
}
}

/// Serves as a cue for `BeaconNodeFallback` to tell which requests need to be broadcasted.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize, EnumString, EnumVariantNames)]
#[strum(serialize_all = "kebab-case")]
pub enum ApiTopic {
Attestations,
Blocks,
Subscriptions,
SyncCommittee,
}

impl ApiTopic {
pub fn all() -> Vec<ApiTopic> {
use ApiTopic::*;
vec![Attestations, Blocks, Subscriptions, SyncCommittee]
}
}

#[cfg(test)]
mod test {
use super::*;
use std::str::FromStr;
use strum::VariantNames;

#[test]
fn api_topic_all() {
let all = ApiTopic::all();
assert_eq!(all.len(), ApiTopic::VARIANTS.len());
assert!(ApiTopic::VARIANTS
.iter()
.map(|topic| ApiTopic::from_str(topic).unwrap())
.eq(all.into_iter()));
}
}
Loading

0 comments on commit b4556a3

Please sign in to comment.