Skip to content

Commit

Permalink
Improve the start vm_thread
Browse files Browse the repository at this point in the history
Move the thread_vm start in the new()
Remove the condvar workaround from thread::spawn
  • Loading branch information
adrien-zinger committed Dec 13, 2021
1 parent 3276f1b commit 53d26b7
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 97 deletions.
23 changes: 10 additions & 13 deletions massa-execution/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use massa_models::{Block, BlockHashMap};
use std::collections::VecDeque;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{error, info};

/// A sender of execution commands.
#[derive(Clone)]
Expand Down Expand Up @@ -35,16 +36,15 @@ pub struct ExecutionManager {
}

impl ExecutionManager {
pub async fn stop(
self,
execution_event_receiver: ExecutionEventReceiver,
) -> Result<(), ExecutionError> {
pub async fn stop(self) -> Result<(), ExecutionError> {
drop(self.manager_tx);
execution_event_receiver.drain().await;
match self.join_handle.await {
Err(_) => Err(ExecutionError::JoinError),
_ => Ok(()),
}
if let Err(err) = self.join_handle.await {
error!("execution worker crashed: {}", err);
return Err(ExecutionError::JoinError);
};

info!("execution worker finished cleanly");
Ok(())
}
}

Expand Down Expand Up @@ -72,16 +72,13 @@ pub async fn start_controller(

// Unbounded, as execution is limited per metering already.
let (event_tx, event_rx) = mpsc::unbounded_channel::<ExecutionEvent>();

let mut worker = ExecutionWorker::new(cfg, thread_count, event_tx, command_rx, manager_rx)?;

let worker = ExecutionWorker::new(cfg, thread_count, event_tx, command_rx, manager_rx)?;
let join_handle = tokio::spawn(async move {
match worker.run_loop().await {
Err(err) => Err(err),
Ok(v) => Ok(v),
}
});

Ok((
ExecutionCommandSender(command_tx),
ExecutionEventReceiver(event_rx),
Expand Down
14 changes: 4 additions & 10 deletions massa-execution/src/tests/scenarios_mandatories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,21 @@ async fn test_execution_basic() {
#[tokio::test]
#[serial]
async fn test_execution_shutdown() {
let (_command_sender, event_receiver, manager) = start_controller(ExecutionConfig {}, 2)
let (_command_sender, _event_receiver, manager) = start_controller(ExecutionConfig {}, 2)
.await
.expect("Failed to start execution.");
manager
.stop(event_receiver)
.await
.expect("Failed to stop execution.");
manager.stop().await.expect("Failed to stop execution.");
}

#[tokio::test]
#[serial]
async fn test_sending_command() {
let (mut command_sender, event_receiver, manager) = start_controller(ExecutionConfig {}, 2)
let (mut command_sender, _event_receiver, manager) = start_controller(ExecutionConfig {}, 2)
.await
.expect("Failed to start execution.");
command_sender
.update_blockclique(Default::default(), Default::default())
.await
.expect("Failed to send command");
manager
.stop(event_receiver)
.await
.expect("Failed to stop execution.");
manager.stop().await.expect("Failed to stop execution.");
}
2 changes: 2 additions & 0 deletions massa-execution/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ pub(crate) enum ExecutionRequest {
ResetToFinalState,
/// Shutdown state, set by the worker to signal shutdown to the VM thread.
Shutdown,
/// Starting state
Starting,
}

pub(crate) type ExecutionQueue = Arc<(Mutex<VecDeque<ExecutionRequest>>, Condvar)>;
122 changes: 50 additions & 72 deletions massa-execution/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::vm::VM;
use crate::{config::ExecutionConfig, types::ExecutionStep};
use massa_models::{Block, BlockHashMap, BlockId, Slot};
use tokio::sync::mpsc;
use tracing::debug;

/// Commands sent to the `execution` component.
#[derive(Debug)]
Expand Down Expand Up @@ -48,22 +49,44 @@ pub struct ExecutionWorker {
/// pending CSS final blocks
ordered_pending_css_final_blocks: Vec<(BlockId, Block)>,
/// VM
vm_thread: Option<JoinHandle<()>>,
vm_thread: JoinHandle<()>,
/// VM execution requests queue
request_queue: ExecutionQueue,
execution_queue: ExecutionQueue,
}

impl ExecutionWorker {
pub fn new(
_cfg: ExecutionConfig,
cfg: ExecutionConfig,
thread_count: u8,
event_sender: mpsc::UnboundedSender<ExecutionEvent>,
controller_command_rx: mpsc::Receiver<ExecutionCommand>,
controller_manager_rx: mpsc::Receiver<ExecutionManagementCommand>,
) -> Result<ExecutionWorker, ExecutionError> {
let execution_queue = ExecutionQueue::default();
let execution_queue_clone = execution_queue.clone();
let cfg_clone = cfg.clone();
// Start vm thread
let vm_thread = thread::spawn(move || {
let mut vm = VM::new(cfg);
let (lock, condvar) = &*execution_queue_clone;
let mut requests = lock.lock().unwrap();
requests.push_back(ExecutionRequest::Starting);
// Run until shutdown.
loop {
match &requests.pop_front() {
Some(ExecutionRequest::RunFinalStep(step)) => vm.run_final_step(step),
Some(ExecutionRequest::RunActiveStep(step)) => vm.run_active_step(step),
Some(ExecutionRequest::ResetToFinalState) => vm.reset_to_final(),
Some(ExecutionRequest::Shutdown) => return,
Some(ExecutionRequest::Starting) => {}
None => panic!("Unexpected request None"),
};
requests = condvar.wait(requests).unwrap();
}
});
// return execution worker
Ok(ExecutionWorker {
_cfg,
_cfg: cfg_clone,
thread_count,
controller_command_rx,
controller_manager_rx,
Expand All @@ -73,121 +96,76 @@ impl ExecutionWorker {
last_active_slot: Slot::new(0, 0),
ordered_active_blocks: Default::default(),
ordered_pending_css_final_blocks: Default::default(),
vm_thread: None,
request_queue: ExecutionQueue::default(),
vm_thread,
execution_queue,
})
}

fn start_thread(&mut self) {
let reqs_clone = self.request_queue.clone();
let cfg = self._cfg.clone();
// Start vm thread
self.vm_thread = Some(thread::spawn(move || {
let mut vm = VM::new(cfg);
// Run until shutdown.
let condvar = &reqs_clone.1;
condvar.notify_one();
loop {
let (lock, condvar) = &*reqs_clone;
let lock = lock.lock().unwrap();
let mut requests = condvar.wait(lock).unwrap();
if let Some(request) = &requests.pop_front() {
match request {
ExecutionRequest::RunFinalStep(step) => vm.run_final_step(step),
ExecutionRequest::RunActiveStep(step) => vm.run_active_step(step),
ExecutionRequest::ResetToFinalState => vm.reset_to_final(),
ExecutionRequest::Shutdown => return,
};
} else {
panic!("Unexpected execution queue state.")
}
}
}));
// Wait for the VM thread to have started
let _started_flag = self
.request_queue
.1
.wait(self.request_queue.0.lock().unwrap())
.unwrap();
}

// asks the VM to reset to its final
pub fn reset_to_final(&mut self) {
let (queue_lock, condvar) = &*self.request_queue;
let (queue_lock, condvar) = &*self.execution_queue;
let queue_guard = &mut queue_lock.lock().unwrap();
// cancel all non-final requests
// Final execution requests are left to maintain final state consistency
queue_guard.retain(|req| match req {
ExecutionRequest::RunFinalStep(..) => true,
ExecutionRequest::RunActiveStep(..) => false,
ExecutionRequest::ResetToFinalState => false,
ExecutionRequest::Shutdown => true,
queue_guard.retain(|req| {
matches!(
req,
ExecutionRequest::RunFinalStep(..) | ExecutionRequest::Shutdown
)
});
// request reset to final state
queue_guard.push_back(ExecutionRequest::ResetToFinalState);
// notify
condvar.notify_one();
}

fn stop_thread(&mut self) -> anyhow::Result<()> {
self.push_request(ExecutionRequest::Shutdown);
if let Some(th) = self.vm_thread.take() {
match th.join() {
Err(_) => anyhow::bail!("Failed joining vm thread"),
_ => return Ok(()),
}
}
anyhow::bail!("Failed joining vm thread")
}

/// runs an SCE-active step (slot)
///
/// # Arguments
/// * slot: target slot
/// * block: None if miss, Some(block_id, block) otherwise
fn push_request(&mut self, request: ExecutionRequest) {
let (queue_lock, condvar) = &*self.request_queue;
let (queue_lock, condvar) = &*self.execution_queue;
let queue_guard = &mut queue_lock.lock().unwrap();
queue_guard.push_back(request);
condvar.notify_one();
}

pub async fn run_loop(&mut self) -> Result<(), ExecutionError> {
self.start_thread();
pub async fn run_loop(mut self) -> Result<(), ExecutionError> {
loop {
tokio::select! {
// Process management commands
cmd = self.controller_manager_rx.recv() => {
self.stop_thread().unwrap();
match cmd {
None => break,
Some(_) => {}
}
},
_ = self.controller_manager_rx.recv() => break,

// Process commands
Some(cmd) = self.controller_command_rx.recv() => self.process_command(cmd).await?,
Some(cmd) = self.controller_command_rx.recv() => self.process_command(cmd)?,
}
}
// Shutdown VM, cancel all pending execution requests
self.push_request(ExecutionRequest::Shutdown);
if self.vm_thread.join().is_err() {
debug!("Failed joining vm thread")
}
Ok(())
}

/// Process a given command.
/// Proces a given command.
///
/// # Argument
/// * cmd: command to process
async fn process_command(&mut self, cmd: ExecutionCommand) -> Result<(), ExecutionError> {
fn process_command(&mut self, cmd: ExecutionCommand) -> Result<(), ExecutionError> {
match cmd {
ExecutionCommand::BlockCliqueChanged {
blockclique,
finalized_blocks,
} => {
self.blockclique_changed(blockclique, finalized_blocks)
.await?;
self.blockclique_changed(blockclique, finalized_blocks)?;
}
}
Ok(())
}

async fn blockclique_changed(
fn blockclique_changed(
&mut self,
blockclique: BlockHashMap<Block>,
finalized_blocks: BlockHashMap<Block>,
Expand Down
4 changes: 2 additions & 2 deletions massa-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,14 @@ async fn stop(
api_private_handle.stop();

// stop consensus controller
let (protocol_event_receiver, execution_event_receiver) = consensus_manager
let (protocol_event_receiver, _execution_event_receiver) = consensus_manager
.stop(consensus_event_receiver)
.await
.expect("consensus shutdown failed");

// Stop execution controller.
execution_manager
.stop(execution_event_receiver)
.stop()
.await
.expect("Failed to shutdown execution.");

Expand Down

0 comments on commit 53d26b7

Please sign in to comment.