Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
4943: Add catch up & shutdown mode r=EdHastingsCasperAssociation a=wojcik91

Added a mode to sync global state and shut down the node.

As discussed followed a simple approach - at the point when node would switch from CatchUp to KeepUp instead shut it down.

Closes casper-network#4799


Co-authored-by: Maciej Wójcik <[email protected]>
  • Loading branch information
casperlabs-bors-ng[bot] and Maciej Wójcik authored Nov 5, 2024
2 parents b531c85 + 5c15266 commit 1305f65
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 7 deletions.
5 changes: 5 additions & 0 deletions node/src/effect/announcements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ pub(crate) enum ControlAnnouncement {
/// The node should shut down with exit code 0 in readiness for the next binary to start.
ShutdownForUpgrade,

/// The node started in catch up and shutdown mode has caught up to tip and can now exit.
ShutdownAfterCatchingUp,

/// The component has encountered a fatal error and cannot continue.
///
/// This usually triggers a shutdown of the application.
Expand Down Expand Up @@ -78,6 +81,7 @@ impl Debug for ControlAnnouncement {
match self {
ControlAnnouncement::ShutdownDueToUserRequest => write!(f, "ShutdownDueToUserRequest"),
ControlAnnouncement::ShutdownForUpgrade => write!(f, "ShutdownForUpgrade"),
ControlAnnouncement::ShutdownAfterCatchingUp => write!(f, "ShutdownAfterCatchingUp"),
ControlAnnouncement::FatalError { file, line, msg } => f
.debug_struct("FatalError")
.field("file", file)
Expand All @@ -102,6 +106,7 @@ impl Display for ControlAnnouncement {
write!(f, "shutdown due to user request")
}
ControlAnnouncement::ShutdownForUpgrade => write!(f, "shutdown for upgrade"),
ControlAnnouncement::ShutdownAfterCatchingUp => write!(f, "shutdown after catching up"),
ControlAnnouncement::FatalError { file, line, msg } => {
write!(f, "fatal error [{}:{}]: {}", file, line, msg)
}
Expand Down
5 changes: 5 additions & 0 deletions node/src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,11 @@ where
Some(ControlAnnouncement::ShutdownForUpgrade) => {
(Effects::new(), Some(ExitCode::Success), QueueKind::Control)
}
Some(ControlAnnouncement::ShutdownAfterCatchingUp) => (
Effects::new(),
Some(ExitCode::CleanExitDontRestart),
QueueKind::Control,
),
Some(ControlAnnouncement::FatalError { file, line, msg }) => {
error!(%file, %line, %msg, "fatal error via control announcement");
(Effects::new(), Some(ExitCode::Abort), QueueKind::Control)
Expand Down
26 changes: 20 additions & 6 deletions node/src/reactor/main_reactor/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
contract_runtime::ExecutionPreState,
diagnostics_port, event_stream_server, network, rest_server, upgrade_watcher,
},
effect::{EffectBuilder, EffectExt, Effects},
effect::{announcements::ControlAnnouncement, EffectBuilder, EffectExt, Effects},
fatal,
reactor::main_reactor::{
catch_up::CatchUpInstruction, genesis_instruction::GenesisInstruction,
Expand Down Expand Up @@ -137,11 +137,19 @@ impl MainReactor {
if let Err(msg) = self.refresh_contract_runtime() {
return (Duration::ZERO, fatal!(effect_builder, "{}", msg).ignore());
}
// purge to avoid polluting the status endpoints w/ stale state
info!("CatchUp: switch to KeepUp");
self.block_synchronizer.purge();
self.state = ReactorState::KeepUp;
(Duration::ZERO, Effects::new())
// shut down instead of switching to KeepUp if catch up and shutdown mode is
// enabled
if self.sync_handling.is_complete_block() {
info!("CatchUp: immediate shutdown after catching up");
self.state = ReactorState::ShutdownAfterCatchingUp;
(Duration::ZERO, Effects::new())
} else {
// purge to avoid polluting the status endpoints w/ stale state
info!("CatchUp: switch to KeepUp");
self.block_synchronizer.purge();
self.state = ReactorState::KeepUp;
(Duration::ZERO, Effects::new())
}
}
},
ReactorState::KeepUp => match self.keep_up_instruction(effect_builder, rng) {
Expand Down Expand Up @@ -226,6 +234,12 @@ impl MainReactor {
}
}
}
ReactorState::ShutdownAfterCatchingUp => {
let effects = effect_builder.immediately().event(|()| {
MainEvent::ControlAnnouncement(ControlAnnouncement::ShutdownAfterCatchingUp)
});
(Duration::ZERO, effects)
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions node/src/reactor/main_reactor/reactor_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize};
/// KeepUp --> Validate
/// Validate --> KeepUp
/// CatchUp --> ShutdownForUpgrade
/// CatchUp --> ShutdownAfterCatchingUp
/// KeepUp --> ShutdownForUpgrade
/// Validate --> ShutdownForUpgrade
/// CatchUp --> Upgrading
Expand Down Expand Up @@ -70,4 +71,6 @@ pub enum ReactorState {
Validate,
/// Node should be shut down for upgrade.
ShutdownForUpgrade,
/// Node should shut down after catching up.
ShutdownAfterCatchingUp,
}
67 changes: 67 additions & 0 deletions node/src/reactor/main_reactor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,73 @@ async fn should_not_historical_sync_no_sync_node() {
.unwrap_err();
}

#[tokio::test]
async fn should_catch_up_and_shutdown() {
let initial_stakes = InitialStakes::Random { count: 5 };
let spec_override = ConfigsOverride {
minimum_block_time: "4seconds".parse().unwrap(),
minimum_era_height: 2,
..Default::default()
};
let mut fixture = TestFixture::new(initial_stakes, Some(spec_override)).await;

// Wait for all nodes to complete block 1.
fixture.run_until_block_height(1, ONE_MIN).await;

// Create a joiner node.
let highest_block = fixture.highest_complete_block();
let trusted_hash = *highest_block.hash();
let trusted_height = highest_block.height();
assert!(
trusted_height > 0,
"trusted height must be non-zero to allow for checking that the joiner doesn't do \
historical syncing"
);

info!("joining node using block {trusted_height} {trusted_hash}");
let secret_key = SecretKey::random(&mut fixture.rng);
let (mut config, storage_dir) = fixture.create_node_config(&secret_key, Some(trusted_hash), 1);
config.node.sync_handling = SyncHandling::CompleteBlock;
let joiner_id = fixture
.add_node(Arc::new(secret_key), config, storage_dir)
.await;

let joiner_avail_range = |nodes: &Nodes| {
nodes
.get(&joiner_id)
.expect("should have joiner")
.main_reactor()
.storage()
.get_available_block_range()
};

// Run until the joiner shuts down after catching up
fixture
.network
.settle_on_node_exit(
&mut fixture.rng,
&joiner_id,
ExitCode::CleanExitDontRestart,
ONE_MIN,
)
.await;

let available_block_range = joiner_avail_range(fixture.network.nodes());

let low = available_block_range.low();
assert!(
low >= trusted_height,
"should not have acquired a block earlier than trusted hash block {low} {trusted_hash}",
);

let highest_block_height = fixture.highest_complete_block().height();
let high = available_block_range.high();
assert!(
low < high && high <= highest_block_height,
"should have acquired more recent blocks before shutting down {low} {high} {highest_block_height}",
);
}

#[tokio::test]
async fn run_equivocator_network() {
let mut rng = crate::new_rng();
Expand Down
3 changes: 3 additions & 0 deletions node/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ impl<REv: 'static> ComponentHarness<REv> {
ControlAnnouncement::ShutdownForUpgrade { .. } => {
panic!("a control announcement requesting a shutdown for upgrade was received")
}
ControlAnnouncement::ShutdownAfterCatchingUp { .. } => {
panic!("a control announcement requesting a shutdown after catching up was received")
}
fatal @ ControlAnnouncement::FatalError { .. } => {
panic!(
"a control announcement requesting a fatal error was received: {}",
Expand Down
63 changes: 63 additions & 0 deletions node/src/testing/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,29 @@ where
.unwrap_or_else(|_| panic!("network did not settle on condition within {:?}", within))
}

/// Runs the main loop of every reactor until a specified node returns the expected exit code.
///
/// Panics if the node does not exit inside of `within`, or if any node returns an unexpected
/// exit code.
pub(crate) async fn settle_on_node_exit(
&mut self,
rng: &mut TestRng,
node_id: &NodeId,
expected: ExitCode,
within: Duration,
) {
time::timeout(
within,
self.settle_on_node_exit_indefinitely(rng, node_id, expected),
)
.await
.unwrap_or_else(|elapsed| {
panic!(
"network did not settle on condition within {within:?}, time elapsed: {elapsed:?}",
)
})
}

/// Keeps cranking the network until every reactor's specified component is in the given state.
///
/// # Panics
Expand Down Expand Up @@ -498,6 +521,46 @@ where
}
}

async fn settle_on_node_exit_indefinitely(
&mut self,
rng: &mut TestRng,
node_id: &NodeId,
expected: ExitCode,
) {
'outer: loop {
let mut event_count = 0;
for node in self.nodes.values_mut() {
let current_node_id = node.reactor().node_id();
match node
.try_crank(rng)
.instrument(error_span!("crank", node_id = %node_id))
.await
{
TryCrankOutcome::NoEventsToProcess => (),
TryCrankOutcome::ProcessedAnEvent => event_count += 1,
TryCrankOutcome::ShouldExit(exit_code)
if (exit_code == expected && current_node_id == *node_id) =>
{
debug!(?expected, ?node_id, "node exited with expected code");
break 'outer;
}
TryCrankOutcome::ShouldExit(exit_code) => {
panic!(
"unexpected exit: expected {expected:?} for node {node_id:?}, got {exit_code:?} for node {current_node_id:?}",
)
}
TryCrankOutcome::Exited => (),
}
}

if event_count == 0 {
// No events processed, wait for a bit to avoid 100% cpu usage.
Instant::advance_time(POLL_INTERVAL.as_millis() as u64);
time::sleep(POLL_INTERVAL).await;
}
}
}

/// Returns the internal map of nodes.
pub(crate) fn nodes(&self) -> &HashMap<NodeId, Runner<ConditionCheckReactor<R>>> {
&self.nodes
Expand Down
8 changes: 8 additions & 0 deletions node/src/types/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub enum SyncHandling {
Ttl,
/// Don't attempt to sync historical blocks.
NoSync,
/// Don't attempt to sync historical blocks and shut down node instead of switching to KeepUp
/// after acquiring the first complete block
CompleteBlock,
}

impl SyncHandling {
Expand All @@ -37,6 +40,11 @@ impl SyncHandling {
pub fn is_no_sync(&self) -> bool {
matches!(self, SyncHandling::NoSync)
}

/// Don't Sync and shut down?
pub fn is_complete_block(&self) -> bool {
matches!(self, SyncHandling::CompleteBlock)
}
}

/// Node fast-sync configuration.
Expand Down
9 changes: 8 additions & 1 deletion resources/test/rest_schema_status.json
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@
"enum": [
"ShutdownForUpgrade"
]
},
{
"description": "Node should shut down after catching up.",
"type": "string",
"enum": [
"ShutdownAfterCatchingUp"
]
}
]
},
Expand Down Expand Up @@ -423,4 +430,4 @@
"additionalProperties": false
}
}
}
}

0 comments on commit 1305f65

Please sign in to comment.