backport: pd: refuse to bootstrap services if the app is not ready (#4436) #4450

Merged 1 commit on May 23, 2024.
9 changes: 8 additions & 1 deletion crates/bin/pd/src/main.rs
@@ -1,8 +1,8 @@
 #![allow(clippy::clone_on_copy)]
 #![deny(clippy::unwrap_used)]
 #![recursion_limit = "512"]
-use std::error::Error;
 use std::io::IsTerminal as _;
+use std::{error::Error, process::exit};
 
 use metrics_tracing_context::{MetricsLayer, TracingContextLayer};
 use metrics_util::layers::Stack;
@@ -112,6 +112,13 @@ async fn main() -> anyhow::Result<()> {
         "starting pd"
     );
 
+    if penumbra_app::app::App::is_ready(storage.latest_snapshot()).await {
+        tracing::info!("application ready to start");
+    } else {
+        tracing::warn!("application is halted, refusing to start");
+        exit(0)
+    }
+
     let abci_server = tokio::task::spawn(
         penumbra_app::server::new(storage.clone()).listen_tcp(abci_bind),
    );
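
This change gates service bootstrap on `App::is_ready`, and exits with status 0 when the chain is halted so that supervisors such as systemd treat the stop as a clean shutdown rather than a crash to retry. A minimal sketch of the same gate follows; `app_is_ready` is a hypothetical stand-in for `penumbra_app::app::App::is_ready`, not part of the diff:

    // Sketch of the startup readiness gate; `app_is_ready` is a
    // hypothetical stand-in for `penumbra_app::app::App::is_ready`.
    use std::process::exit;

    async fn app_is_ready() -> bool {
        // In pd this inspects the latest storage snapshot for a recorded
        // halt; stubbed out here.
        true
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        if app_is_ready().await {
            tracing::info!("application ready to start");
        } else {
            // Exit code 0 keeps supervisors like systemd from restarting
            // a deliberately halted node in a loop.
            tracing::warn!("application is halted, refusing to start");
            exit(0)
        }

        // ... spawn the ABCI server and other services here ...
        Ok(())
    }
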
60 changes: 41 additions & 19 deletions crates/core/app/src/app/mod.rs
@@ -1,4 +1,6 @@
+use std::process;
 use std::sync::Arc;
+use std::time::Duration;
 
 use anyhow::{Context, Result};
 use async_trait::async_trait;
@@ -35,7 +37,8 @@ use tendermint::abci::{self, Event};
 
 use tendermint::v0_37::abci::{request, response};
 use tendermint::validator::Update;
-use tracing::Instrument;
+use tokio::time::sleep;
+use tracing::{instrument, Instrument};
 
 use crate::action_handler::AppActionHandler;
 use crate::genesis::AppState;
@@ -59,22 +62,28 @@ pub struct App {
 
 impl App {
     /// Constructs a new application, using the provided [`Snapshot`].
-    pub async fn new(snapshot: Snapshot) -> Result<Self> {
+    /// Callers should ensure that [`App::is_ready`] returns `true`, but this is not enforced.
+    #[instrument(skip_all)]
+    pub fn new(snapshot: Snapshot) -> Self {
+        tracing::debug!("initializing App instance");
+
         // We perform the `Arc` wrapping of `State` here to ensure
         // there should be no unexpected copies elsewhere.
         let state = Arc::new(StateDelta::new(snapshot));
 
-        // If the state says that the chain is halted, we should not proceed. This is a safety check
-        // to ensure that automatic restarts by software like systemd do not cause the chain to come
-        // back up again after a halt.
-        if state.is_chain_halted(TOTAL_HALT_COUNT).await? {
-            tracing::error!("chain is halted, refusing to restart!");
-            anyhow::bail!("chain is halted, refusing to restart");
-        }
+        Self { state }
+    }
 
-        Ok(Self { state })
+    /// Returns whether the application is ready to start.
+    #[instrument(skip_all, ret)]
+    pub async fn is_ready(state: Snapshot) -> bool {
+        // If the chain is halted, we are not ready to start the application.
+        // This is a safety mechanism to prevent the chain from starting if it
+        // is in a halted state.
+        !state
+            .is_chain_halted(TOTAL_HALT_COUNT)
+            .await
+            .expect("failed to read total halt count")
     }
 
     // StateDelta::apply only works when the StateDelta wraps an underlying
@@ -152,6 +161,18 @@ impl App {
         &mut self,
         proposal: request::PrepareProposal,
     ) -> response::PrepareProposal {
+        if self
+            .state
+            .is_chain_halted(TOTAL_HALT_COUNT)
+            .await
+            .expect("failed to read total halt count")
+        {
+            // If we find ourselves preparing a proposal for a halted chain
+            // we stop abruptly to prevent any progress.
+            // The persistent halt mechanism will prevent restarts until we are ready.
+            process::exit(0);
+        }
+
         let mut included_txs = Vec::new();
         let num_candidate_txs = proposal.txs.len();
         tracing::debug!(
@@ -526,15 +547,16 @@ impl App {
             .await
             .expect("must be able to successfully commit to storage");
 
-        // If we should halt, we should end the process here.
-        if should_halt {
-            tracing::info!("committed block when a chain halt was signaled; exiting now");
-            std::process::exit(0);
-        }
-
-        if is_upgrade_height {
-            tracing::info!("committed block at upgrade height; exiting now");
-            std::process::exit(0);
+        // We want to halt the node, but not before we submit an ABCI `Commit`
+        // response to `CometBFT`. To do this, we schedule a process exit in `2s`,
+        // assuming a `5s` timeout.
+        // See #4443 for more context.
+        if should_halt || is_upgrade_height {
+            tokio::spawn(async move {
+                sleep(Duration::from_secs(2)).await;
+                tracing::info!("halt signal recorded, exiting process");
+                std::process::exit(0);
+            });
         }
 
         tracing::debug!(?jmt_root, "finished committing state");
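
In the commit path, the immediate `std::process::exit(0)` calls are replaced with a deferred exit: the node must first get its ABCI `Commit` response back to CometBFT, so the exit is scheduled two seconds out, against an assumed five-second timeout (see #4443). A standalone sketch of that pattern, with a hypothetical `schedule_halt` helper:

    use std::time::Duration;
    use tokio::time::sleep;

    /// Schedule a process exit without blocking the caller, so an in-flight
    /// response (such as ABCI `Commit`) can be flushed before shutdown.
    fn schedule_halt(delay: Duration) {
        tokio::spawn(async move {
            sleep(delay).await;
            tracing::info!("halt signal recorded, exiting process");
            std::process::exit(0);
        });
    }

    #[tokio::main]
    async fn main() {
        // ... commit state and send the `Commit` response here ...
        schedule_halt(Duration::from_secs(2));
        // The caller keeps running; the spawned task ends the process
        // once the delay elapses.
        sleep(Duration::from_secs(10)).await;
    }
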
3 changes: 1 addition & 2 deletions crates/core/app/src/server.rs
@@ -57,8 +57,7 @@ pub fn new(
             req.create_span()
         }))
         .service(tower_actor::Actor::new(10, |queue: _| {
-            let storage = storage.clone();
-            async move { Mempool::new(storage.clone(), queue).await?.run().await }
+            Mempool::new(storage.clone(), queue).run()
         }));
     let info = Info::new(storage.clone());
     let snapshot = Snapshot {};
18 changes: 6 additions & 12 deletions crates/core/app/src/server/consensus.rs
@@ -36,27 +36,21 @@ impl Consensus {
 
     pub fn new(storage: Storage) -> ConsensusService {
         tower_actor::Actor::new(Self::QUEUE_SIZE, |queue: _| {
-            let storage = storage.clone();
-            async move {
-                Consensus::new_inner(storage.clone(), queue)
-                    .await?
-                    .run()
-                    .await
-            }
+            Consensus::new_inner(storage, queue).run()
         })
     }
 
-    async fn new_inner(
+    fn new_inner(
         storage: Storage,
         queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
-    ) -> Result<Self> {
-        let app = App::new(storage.latest_snapshot()).await?;
+    ) -> Self {
+        let app = App::new(storage.latest_snapshot());
 
-        Ok(Self {
+        Self {
             queue,
             storage,
             app,
-        })
+        }
     }
 
     async fn run(mut self) -> Result<(), tower::BoxError> {
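
Because `App::new` is now synchronous and infallible, `Consensus::new_inner` (like `Mempool::new` below) drops both `async` and `Result`, and the `tower_actor` closure can hand back the `run()` future directly instead of cloning storage into an `async move` block. A minimal sketch of that shape; `spawn_worker` is a hypothetical stand-in for `tower_actor::Actor::new`:

    use std::future::Future;

    struct Worker;

    impl Worker {
        /// Synchronous constructor: nothing to `.await`, so callers need
        /// no wrapping `async move` block.
        fn new() -> Self {
            Worker
        }

        async fn run(self) -> Result<(), tower::BoxError> {
            // ... drain the actor's message queue ...
            Ok(())
        }
    }

    /// Hypothetical stand-in for `tower_actor::Actor::new`: it accepts a
    /// closure that returns the worker's future, as in the diffs above.
    fn spawn_worker<F, Fut>(make: F) -> tokio::task::JoinHandle<Result<(), tower::BoxError>>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<(), tower::BoxError>> + Send + 'static,
    {
        tokio::spawn(make())
    }

    #[tokio::main]
    async fn main() {
        // Before: |queue| { let storage = storage.clone(); async move { ... } }
        // After: the closure hands back the `run()` future directly.
        let handle = spawn_worker(|| Worker::new().run());
        handle.await.expect("worker panicked").expect("worker failed");
    }
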
12 changes: 6 additions & 6 deletions crates/core/app/src/server/mempool.rs
@@ -31,18 +31,18 @@ pub struct Mempool {
 }
 
 impl Mempool {
-    pub async fn new(
+    pub fn new(
         storage: Storage,
         queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
-    ) -> Result<Self> {
-        let app = App::new(storage.latest_snapshot()).await?;
+    ) -> Self {
+        let app = App::new(storage.latest_snapshot());
         let snapshot_rx = storage.subscribe();
 
-        Ok(Self {
+        Self {
             queue,
             app,
             rx_snapshot: snapshot_rx,
-        })
+        }
     }
 
     pub async fn check_tx(&mut self, req: Request) -> Result<Response, tower::BoxError> {
@@ -91,7 +91,7 @@ impl Mempool {
             if let Ok(()) = change {
                 let snapshot = self.rx_snapshot.borrow().clone();
                 tracing::debug!(height = ?snapshot.version(), "resetting ephemeral mempool state");
-                self.app = App::new(snapshot).await?;
+                self.app = App::new(snapshot);
             } else {
                 // TODO: what triggers this, now that the channel is owned by the
                 // shared Storage instance, rather than the consensus worker?
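
The mempool's run loop keeps the same snapshot-subscription pattern, but the reset no longer has an error path, since `App::new` cannot fail. A sketch of that loop, using a plain `tokio::sync::watch` channel in place of `storage.subscribe()` and a `u64` standing in for a snapshot:

    use tokio::sync::watch;

    /// Sketch of the mempool reset loop; `u64` stands in for a snapshot.
    async fn run_worker(mut rx_snapshot: watch::Receiver<u64>) {
        // `changed()` resolves on each new value and errors once the
        // sender is dropped, which ends the loop.
        while rx_snapshot.changed().await.is_ok() {
            let snapshot = *rx_snapshot.borrow();
            tracing::debug!(height = ?snapshot, "resetting ephemeral mempool state");
            // In pd this is now simply `self.app = App::new(snapshot);`,
            // with no `.await?` since construction is infallible.
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = watch::channel(0u64);
        let worker = tokio::spawn(run_worker(rx));
        tx.send(1).expect("receiver alive");
        drop(tx); // closing the channel lets the worker exit
        worker.await.expect("worker task panicked");
    }
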
2 changes: 1 addition & 1 deletion crates/core/app/tests/common/temp_storage_ext.rs
@@ -20,7 +20,7 @@ impl TempStorageExt for TempStorage {
         }
 
         // Apply the genesis state to the storage
-        let mut app = App::new(self.latest_snapshot()).await?;
+        let mut app = App::new(self.latest_snapshot());
         app.init_chain(&genesis).await;
         app.commit(self.deref().clone()).await;