feat: retrieve node bin version and peer-id from within the container when it starts
bochaco committed Nov 29, 2024
1 parent 0d08b6e commit 2e33e49
Showing 16 changed files with 179 additions and 1,506 deletions.
1,416 changes: 85 additions & 1,331 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions Cargo.toml
@@ -32,18 +32,17 @@ leptos = { version = "0.6" }
leptos_axum = { version = "0.6", optional = true }
leptos_meta = { version = "0.6" }
leptos_router = { version = "0.6" }
libp2p-identity = { version = "0.2.1", features = ["peerid","ed25519"], optional = true }
rand = "0.8.5"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"], optional = true }
semver = { version = "1.0.20", optional = true }
serde = "1.0.207"
serde_json = "1.0"
server_fn = { version = "0.6", optional = true }
sn_protocol = { version = "0.17.6", features = ["rpc"], optional = true }
sqlx = { version = "0.8.0", default-features = false, features = ["derive","migrate","runtime-tokio","tls-rustls","sqlite"], optional = true }
thiserror = "1"
tonic = { version = "0.6.2", default-features = false, features = ["tls"], optional = true }
tokio = { version = "1", default-features = false, features = ["rt-multi-thread"], optional = true }
tower = { version = "0.4", optional = true }
tower = { version = "0.4", features = ["util"], optional = true }
tower-http = { version = "0.5", default-features = false, features = ["fs"], optional = true }
url = "2"
wasm-bindgen = "=0.2.95"
@@ -64,12 +63,11 @@ ssr = [
"dep:i2cdev",
"dep:lcd",
"dep:leptos_axum",
"dep:libp2p-identity",
"dep:reqwest",
"dep:semver",
"dep:server_fn",
"dep:sn_protocol",
"dep:sqlx",
"dep:tonic",
"dep:tokio",
"dep:tower",
"dep:tower-http",
2 changes: 2 additions & 0 deletions formica.Dockerfile
@@ -29,6 +29,7 @@ RUN /app/safenode --version
# Set any required env variables
# Set default port numbers for node and its RPC API
ENV NODE_PORT=12000
# RPC API usage is deprecated
ENV RPC_PORT=13000
ENV METRICS_PORT=14000

@@ -37,6 +38,7 @@ ENV METRICS_PORT=14000
ENV REWARDS_ADDR_ARG=''

EXPOSE $NODE_PORT
# RPC API usage is deprecated
EXPOSE $RPC_PORT
EXPOSE $METRICS_PORT

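The image already runs `/app/safenode --version` at build time; per the commit message, the running container now reports its binary version (and peer-id) itself once it starts. Below is a minimal sketch of how a startup hook could capture the version; the helper name and the assumption that the version is the last token of the first `--version` output line are illustrative only, as the exact format is not shown in this diff.

```rust
use std::process::Command;

// Hypothetical helper: run the node binary with --version and keep the
// last whitespace-separated token of the first output line as the version.
fn node_bin_version(bin_path: &str) -> Option<String> {
    let output = Command::new(bin_path).arg("--version").output().ok()?;
    let stdout = String::from_utf8(output.stdout).ok()?;
    stdout
        .lines()
        .next()?
        .split_whitespace()
        .last()
        .map(str::to_string)
}
```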
20 changes: 5 additions & 15 deletions src/add_node.rs
@@ -10,7 +10,6 @@ use std::num::ParseIntError;

// TODO: find next available port numbers by looking at already used ones
const DEFAULT_NODE_PORT: u16 = 12000;
const DEFAULT_RPC_API_PORT: u16 = 13000;
const DEFAULT_METRICS_PORT: u16 = 14000;

// Expected length of entered hex-encoded rewards address.
@@ -19,23 +18,21 @@ const REWARDS_ADDR_LENGTH: usize = 40;
#[component]
pub fn AddNodeView() -> impl IntoView {
let port = create_rw_signal(Ok(DEFAULT_NODE_PORT));
let rpc_port = create_rw_signal(Ok(DEFAULT_RPC_API_PORT));
let metrics_port = create_rw_signal(Ok(DEFAULT_METRICS_PORT));
let rewards_addr = create_rw_signal(Err((
"Enter a rewards address".to_string(),
"0x".to_string(),
)));
let add_node = create_action(
move |(port, rpc_port, metrics_port, rewards_addr): &(u16, u16, u16, String)| {
move |(port, metrics_port, rewards_addr): &(u16, u16, String)| {
let port = *port;
let rpc_port = *rpc_port;
let metrics_port = *metrics_port;
let rewards_addr = rewards_addr
.strip_prefix("0x")
.unwrap_or(rewards_addr)
.to_string();
async move {
let _ = add_node_instance(port, rpc_port, metrics_port, rewards_addr).await;
let _ = add_node_instance(port, metrics_port, rewards_addr).await;
}
},
);
@@ -83,11 +80,6 @@ pub fn AddNodeView() -> impl IntoView {
default=DEFAULT_NODE_PORT
label="Port number:"
/>
<PortNumberInput
signal=rpc_port
default=DEFAULT_RPC_API_PORT
label="RPC API port number:"
/>
<PortNumberInput
signal=metrics_port
default=DEFAULT_METRICS_PORT
@@ -98,19 +90,17 @@ pub fn AddNodeView() -> impl IntoView {
<button
type="button"
disabled=move || {
port.get().is_err() || rpc_port.get().is_err()
|| metrics_port.get().is_err()
port.get().is_err() || metrics_port.get().is_err()
|| rewards_addr.get().is_err()
}
on:click=move |_| {
modal_visibility.set(false);
if let (Ok(p), Ok(r), Ok(m), Ok(addr)) = (
if let (Ok(p), Ok(m), Ok(addr)) = (
port.get(),
rpc_port.get(),
metrics_port.get(),
rewards_addr.get(),
) {
add_node.dispatch((p, r, m, addr));
add_node.dispatch((p, m, addr));
}
}
class="text-white bg-gray-800 hover:bg-gray-900 focus:outline-none focus:ring-4 focus:ring-gray-300 font-medium rounded-lg text-sm px-5 py-2.5 me-2 mb-2 dark:bg-gray-800 dark:hover:bg-gray-700 dark:focus:ring-gray-700 dark:border-gray-700"
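For context, the call site above now passes three values. The matching server function is defined elsewhere in the crate and is not part of this diff; the shape below is an assumption inferred from the call `add_node_instance(port, metrics_port, rewards_addr)` and the crate's Leptos 0.6 dependency.

```rust
use leptos::*;

// Assumed shape after the RPC-port removal; the real definition is not in this diff.
#[server(AddNodeInstance, "/api")]
pub async fn add_node_instance(
    port: u16,
    metrics_port: u16,
    rewards_addr: String,
) -> Result<(), ServerFnError> {
    // Create the node container with the given ports and rewards address.
    Ok(())
}
```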
1 change: 0 additions & 1 deletion src/app.rs
@@ -90,7 +90,6 @@ pub struct ServerGlobalState {
pub docker_client: super::docker_client::DockerClient,
pub latest_bin_version: Arc<Mutex<Option<String>>>,
pub nodes_metrics: Arc<Mutex<super::metrics_client::NodesMetrics>>,
pub server_api_hit: Arc<Mutex<bool>>,
pub node_status_locked: Arc<Mutex<HashSet<super::node_instance::ContainerId>>>,
pub updated_settings_tx: broadcast::Sender<AppSettings>,
}
55 changes: 16 additions & 39 deletions src/bg_tasks.rs
@@ -5,7 +5,6 @@ use super::{
lcd::display_stats_on_lcd,
metrics_client::{NodeMetricsClient, NodesMetrics},
node_instance::{ContainerId, NodeInstanceInfo},
node_rpc_client::NodeRpcClient,
server_api::helper_upgrade_node_instance,
};
use alloy::{
@@ -125,17 +124,13 @@
latest_bin_version: Arc<Mutex<Option<String>>>,
nodes_metrics: Arc<Mutex<NodesMetrics>>,
db_client: DbClient,
server_api_hit: Arc<Mutex<bool>>,
node_status_locked: Arc<Mutex<HashSet<ContainerId>>>,
mut updated_settings_rx: broadcast::Receiver<AppSettings>,
settings: AppSettings,
) {
logging::log!("App settings to use: {settings:#?}");
let mut ctx = TasksContext::from(settings);

// we start a count down to stop polling RPC API when there is no active client
let mut poll_rpc_countdown = 5;

// helper which creates a new contract if the newly configured values are valid.
let update_token_contract = |ctx: &TasksContext| match ctx.parse_token_addr_and_rpc_url() {
(Some(token_address), Some(rpc_url)) => {
@@ -232,14 +227,7 @@
));
},
_ = ctx.nodes_metrics_polling.tick() => {
if *server_api_hit.lock().await {
// reset the countdown to five more cycles
poll_rpc_countdown = 5;
*server_api_hit.lock().await = false;
} else if poll_rpc_countdown > 0 {
poll_rpc_countdown -= 1;
}
let poll_rpc_api = poll_rpc_countdown > 0;
let query_bin_version = ctx.app_settings.lcd_display_enabled;

// we don't spawn a task for this one just in case it's taking
// too long to complete and we may start overwhelming the backend
Expand All @@ -249,7 +237,7 @@ pub fn spawn_bg_tasks(
&nodes_metrics,
&db_client,
&node_status_locked,
poll_rpc_api,
query_bin_version,
&lcd_stats
).await;
// reset interval to start next period from this instant,
@@ -359,7 +347,7 @@ async fn update_nodes_info(
nodes_metrics: &Arc<Mutex<NodesMetrics>>,
db_client: &DbClient,
node_status_locked: &Arc<Mutex<HashSet<ContainerId>>>,
poll_rpc_api: bool,
query_bin_version: bool,
lcd_stats: &Arc<Mutex<HashMap<String, String>>>,
) {
let containers = docker_client
@@ -376,7 +364,6 @@
}

// let's collect stats to update LCD (if enabled)
let mut balance = alloy::primitives::U256::from(0);
let mut net_size = 0;
let mut weights = 0;
let mut num_active_nodes = 0;
@@ -389,20 +376,6 @@
if node_info.status.is_active() {
num_active_nodes += 1;

if poll_rpc_api {
// let's fetch up to date info using its RPC API
if let Some(port) = node_info.rpc_api_port {
match NodeRpcClient::new(&node_info.node_ip, port) {
Ok(node_rpc_client) => {
node_rpc_client.update_node_info(&mut node_info).await;
}
Err(err) => {
logging::log!("Failed to connect to RPC API endpoint: {err}")
}
}
}
}

if let Some(metrics_port) = node_info.metrics_port {
// let's now collect metrics from the node
let metrics_client = NodeMetricsClient::new(&node_info.node_ip, metrics_port);
@@ -417,22 +390,26 @@
}
}

balance += node_info.balance.unwrap_or_default();
net_size +=
node_info.connected_peers.unwrap_or_default() * node_info.net_size.unwrap_or_default();
weights += node_info.connected_peers.unwrap_or_default();
records += node_info.records.unwrap_or_default();
if let Some(ref version) = node_info.bin_version {
bin_version.insert(version.clone());
}

let update_status = !node_status_locked
.lock()
.await
.contains(&node_info.container_id);
db_client
.update_node_metadata(&node_info, update_status)
.await;

net_size +=
node_info.connected_peers.unwrap_or_default() * node_info.net_size.unwrap_or_default();
weights += node_info.connected_peers.unwrap_or_default();
records += node_info.records.unwrap_or_default();
if query_bin_version {
if let Some(ref version) = db_client
.get_node_bin_version(&node_info.container_id)
.await
{
bin_version.insert(version.clone());
}
}
}

let mut updated_vals = vec![(
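The countdown-driven RPC polling above is replaced by a single flag: bin versions are only gathered when the LCD display is enabled, since that is now their sole consumer in this loop. A stripped-down sketch of the polling pattern follows; the function name and the 5-second period are invented for illustration.

```rust
use tokio::time::{interval, Duration};

// Minimal polling loop: on each tick, decide whether to also query cached
// bin versions based on the LCD setting (period is an assumption).
async fn poll_metrics(lcd_display_enabled: bool) {
    let mut polling = interval(Duration::from_secs(5));
    loop {
        polling.tick().await;
        // Equivalent of the flag computed in the select! arm above:
        let query_bin_version = lcd_display_enabled;
        if query_bin_version {
            // ...fetch cached bin versions for the LCD stats here...
        }
    }
}
```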
28 changes: 19 additions & 9 deletions src/db_client.rs
@@ -59,7 +59,6 @@ struct CachedNodeMetadata {
peer_id: String,
bin_version: String,
port: u16,
rpc_api_port: u16,
rewards: String,
balance: String,
records: String,
@@ -83,9 +82,6 @@ impl CachedNodeMetadata {
if self.port > 0 {
info.port = Some(self.port);
}
if self.rpc_api_port > 0 {
info.rpc_api_port = Some(self.rpc_api_port);
}
if !self.rewards.is_empty() {
if let Ok(v) = U256::from_str(&self.rewards) {
info.rewards = Some(v.into());
@@ -169,6 +165,22 @@ impl DbClient {
}
}

// Retrieve node binary version from local cache DB
pub async fn get_node_bin_version(&self, container_id: &str) -> Option<String> {
let db_lock = self.db.lock().await;
match sqlx::query("SELECT bin_version FROM nodes WHERE container_id=?")
.bind(container_id)
.fetch_all(&*db_lock)
.await
{
Ok(records) => records.first().map(|r| r.get("bin_version")),
Err(err) => {
logging::log!("Sqlite bin version query error: {err}");
None
}
}
}

// Retrieve the list of nodes which have a binary version not matching the provided version
// TODO: use semantic version to make the comparison.
pub async fn get_outdated_nodes_list(
Expand All @@ -177,7 +189,7 @@ impl DbClient {
) -> Result<Vec<(ContainerId, String)>, DbError> {
let db_lock = self.db.lock().await;
let data = sqlx::query(
"SELECT container_id,bin_version FROM nodes WHERE status = ? AND bin_version != ?",
"SELECT container_id, bin_version FROM nodes WHERE status = ? AND bin_version != ?",
)
.bind(json!(NodeStatus::Active).to_string())
.bind(version)
@@ -198,16 +210,14 @@
let query_str = format!(
"INSERT OR REPLACE INTO nodes (\
container_id, status, port, \
rpc_api_port, records, \
connected_peers, kbuckets_peers \
) VALUES (?, ?, ?, ?, ?, ?, ?)"
records, connected_peers, kbuckets_peers \
) VALUES (?, ?, ?, ?, ?, ?)"
);

match sqlx::query(&query_str)
.bind(info.container_id.clone())
.bind(json!(info.status).to_string())
.bind(info.port.clone())
.bind(info.rpc_api_port.clone())
.bind(info.records.map_or("".to_string(), |v| v.to_string()))
.bind(
info.connected_peers
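A sketch of how a caller might use the new cache accessor, in the spirit of the outdated-nodes query above; the plain string inequality mirrors that query's comparison and its TODO about switching to semantic versioning. The function name is hypothetical.

```rust
// Hypothetical caller: decide whether a node's cached binary version lags
// behind a given latest version (plain string comparison, as in the query above).
async fn is_outdated(db_client: &DbClient, container_id: &str, latest: &str) -> bool {
    match db_client.get_node_bin_version(container_id).await {
        Some(version) => version != latest,
        None => false, // no cached version yet; nothing to compare
    }
}
```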