Skip to content

Commit

Permalink
Merge pull request #3031 from massalabs/add_exec_logs
Browse files Browse the repository at this point in the history
Add exec logs
  • Loading branch information
damip authored Sep 17, 2022
2 parents 1feffc9 + b4fe19c commit f0ae0c3
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 2 deletions.
20 changes: 20 additions & 0 deletions massa-execution-worker/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use massa_models::{block::BlockId, slot::Slot};
use massa_storage::Storage;
use parking_lot::{Condvar, Mutex, RwLock};
use std::collections::{BTreeMap, HashMap};
use std::fmt::Display;
use std::sync::Arc;
use tracing::info;

Expand All @@ -33,6 +34,25 @@ pub(crate) struct ExecutionInputData {
pub readonly_requests: RequestQueue<ReadOnlyExecutionRequest, ExecutionOutput>,
}

impl Display for ExecutionInputData {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"stop={:?}, finalized={:?}, blockclique={:?}, readonly={:?}",
self.stop,
self.finalized_blocks
.iter()
.map(|(slot, (id, _))| (*slot, *id))
.collect::<BTreeMap<Slot, BlockId>>(),
self.new_blockclique.as_ref().map(|bq| bq
.iter()
.map(|(slot, (id, _))| (*slot, *id))
.collect::<BTreeMap<Slot, BlockId>>()),
self.readonly_requests
)
}
}

impl ExecutionInputData {
/// Creates a new empty `ExecutionInputData`
pub fn new(config: ExecutionConfig) -> Self {
Expand Down
2 changes: 2 additions & 0 deletions massa-execution-worker/src/request_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::collections::VecDeque;
use std::sync::mpsc::Sender;

/// Represents an execution request T coupled with an MPSC sender for a result of type R
#[derive(Debug)]
pub(crate) struct RequestWithResponseSender<T, R> {
/// The underlying execution request
request: T,
Expand Down Expand Up @@ -47,6 +48,7 @@ impl<T, R> RequestWithResponseSender<T, R> {
/// Structure representing an execution request queue with maximal length.
/// Each request is a `RequestWithResponseSender` that comes with an MPSC sender
/// to return the execution result when the execution is over (or an error).
#[derive(Debug)]
pub(crate) struct RequestQueue<T, R> {
/// Max number of item in the queue.
/// When the queue is full, extra new items are cancelled and dropped.
Expand Down
32 changes: 30 additions & 2 deletions massa-execution-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use massa_storage::Storage;
use massa_time::MassaTime;
use parking_lot::{Condvar, Mutex, RwLock};
use std::{collections::HashMap, sync::Arc};
use tracing::{info, warn};
use tracing::{debug, info, warn};

/// Structure gathering all elements needed by the execution thread
pub(crate) struct ExecutionThread {
Expand Down Expand Up @@ -252,9 +252,13 @@ impl ExecutionThread {
return false;
}

debug!("entered execute_one_final_slot");

// w-lock execution state
let mut exec_state = self.execution_state.write();

debug!("locked execution state in execute_one_final_slot");

// get the slot just after the last executed final slot
let slot = exec_state
.final_cursor
Expand All @@ -269,12 +273,19 @@ impl ExecutionThread {
let target_id = exec_target.as_ref().map(|(b_id, _)| *b_id);

// check if the final slot is cached at the front of the speculative execution history
debug!(
"execute_one_final_slot: checking cache for slot {} target {:?}",
slot,
exec_target.as_ref().map(|(s, _)| *s)
);
if let Some(exec_out) = exec_state.pop_first_execution_result() {
if exec_out.slot == slot && exec_out.block_id == target_id {
// speculative execution front result matches what we want to compute

// apply the cached output and return
exec_state.apply_final_execution_output(exec_out);

debug!("execute_one_final_slot: found in cache, applied cache");
return true;
} else {
// speculative cache mismatch
Expand All @@ -295,10 +306,13 @@ impl ExecutionThread {
exec_state.clear_history();

// execute slot
debug!("execute_one_final_slot: execution started");
let exec_out = exec_state.execute_slot(slot, exec_target, &self.selector);
debug!("execute_one_final_slot: execution finished");

// apply execution output to final state
exec_state.apply_final_execution_output(exec_out);
debug!("execute_one_final_slot: execution result applied");

true
}
Expand All @@ -322,7 +336,9 @@ impl ExecutionThread {
/// returns true if something was executed
fn execute_one_active_slot(&mut self) -> bool {
// write-lock the execution state
debug!("execute_one_active_slot: execution started");
let mut exec_state = self.execution_state.write();
debug!("execute_one_active_slot: execution state locked");

// get the next active slot
let slot = exec_state
Expand All @@ -333,14 +349,24 @@ impl ExecutionThread {
// choose the execution target
let exec_target = match self.active_slots.get(&slot) {
Some(b_store) => b_store.as_ref().map(|(b_id, bs)| (*b_id, bs.clone())),
_ => return false,
_ => {
debug!("execute_one_active_slot: no target for slot {}", slot);
return false;
}
};

// execute the slot
debug!(
"execute_one_active_slot: executing slot {} target = {:?}",
slot,
exec_target.as_ref().map(|(s, _)| *s)
);
let exec_out = exec_state.execute_slot(slot, exec_target, &self.selector);
debug!("execute_one_active_slot: execution sfinished");

// apply execution output to active state
exec_state.apply_active_execution_output(exec_out);
debug!("execute_one_active_slot: execution state applied");

true
}
Expand Down Expand Up @@ -539,6 +565,8 @@ impl ExecutionThread {
// 2 - speculative executions
// 3 - read-only executions
while let Some(input_data) = self.wait_loop_event() {
debug!("Execution loop triggered, input_data = {}", input_data);

// update the sequence of final slots given the newly finalized blocks
self.update_final_slots(input_data.finalized_blocks);

Expand Down

0 comments on commit f0ae0c3

Please sign in to comment.