Skip to content

Commit

Permalink
Merge branch 'implement-append-entries-to-log-storage' of github.com:…
Browse files Browse the repository at this point in the history
…cloudwalk/stratus into follower-mode
  • Loading branch information
renancloudwalk committed Jun 27, 2024
2 parents 052229f + 1c40ef5 commit 3a3de4a
Showing 1 changed file with 26 additions and 0 deletions.
26 changes: 26 additions & 0 deletions src/eth/consensus/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl AppendEntryService for AppendEntryServiceImpl {
let start = std::time::Instant::now();

let consensus = self.consensus.lock().await;
let current_term = consensus.current_term.load(Ordering::SeqCst);
let request_inner = request.into_inner();

if consensus.is_leader() {
Expand All @@ -53,6 +54,13 @@ impl AppendEntryService for AppendEntryServiceImpl {
));
}

// Return error if request term < current term
if request_inner.term < current_term {
let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term);
tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message);
return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message));
}

let executions = request_inner.executions;
let index = request_inner.prev_log_index + 1;
let term = request_inner.prev_log_term;
Expand Down Expand Up @@ -88,6 +96,7 @@ impl AppendEntryService for AppendEntryServiceImpl {
let start = std::time::Instant::now();

let consensus = self.consensus.lock().await;
let current_term = consensus.current_term.load(Ordering::SeqCst);
let request_inner = request.into_inner();

if consensus.is_leader() {
Expand All @@ -98,12 +107,29 @@ impl AppendEntryService for AppendEntryServiceImpl {
));
}

// Return error if request term < current term
if request_inner.term < current_term {
let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term);
tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message);
return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message));
}

let Some(block_entry) = request_inner.block_entry else {
return Err(Status::invalid_argument("empty block entry"));
};

tracing::info!(number = block_entry.number, "appending new block");

let index = request_inner.prev_log_index + 1;
let term = request_inner.prev_log_term;
let data = LogEntryData::BlockEntry(block_entry.clone());

#[cfg(feature = "rocks")]
if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") {
tracing::error!("Failed to save log entry: {:?}", e);
return Err(Status::internal("Failed to save log entry"));
}

let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst);

//TODO FIXME move this code back when we have propagation: let Some(diff) = last_last_arrived_block_number.checked_sub(block_entry.number) else {
Expand Down

0 comments on commit 3a3de4a

Please sign in to comment.