From a94b6ac52cbb892c05fa613c075df849b4073c7c Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 14:35:17 -0300 Subject: [PATCH 01/13] add initial append block --- src/eth/consensus/server.rs | 49 +++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 382392dc0..0c1f64866 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -87,8 +87,16 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; + let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); + // Return error if request term < current term + if request_inner.term < current_term { + let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + } + if consensus.is_leader() { tracing::error!(sender = request_inner.leader_id, "append_block_commit called on leader node"); return Err(Status::new( @@ -103,7 +111,48 @@ impl AppendEntryService for AppendEntryServiceImpl { tracing::info!(number = block_entry.number, "appending new block"); + // Return error if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm + if let Some(prev_entry) = consensus.log_entries_storage.get_entry(request_inner.prev_log_index).ok().flatten() { + if prev_entry.term != request_inner.prev_log_term { + let error_message = format!( + "prevLogIndex term mismatch: expected {}, found {} at index {}", + request_inner.prev_log_term, prev_entry.term, request_inner.prev_log_index + ); + tracing::error!( + prev_log_index = request_inner.prev_log_index, + expected_term = request_inner.prev_log_term, + actual_term = prev_entry.term, + "{}", + &error_message + ); + return Err(Status::new((StatusCode::LogMismatch as i32).into(), error_message)); + } + } else { + let error_message = format!("No entry found at prevLogIndex {}", request_inner.prev_log_index); + tracing::error!(prev_log_index = request_inner.prev_log_index, "{}", &error_message); + return Err(Status::new((StatusCode::LogMismatch as i32).into(), error_message)); + } + + // TODO: resolve log inconsistency instead? let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); + if request_inner.prev_log_index != last_last_arrived_block_number { + tracing::error!( + "prevLogIndex mismatch: expected {}, got {}", + last_last_arrived_block_number, + request_inner.prev_log_index + ); + return Err(Status::invalid_argument("empty block entry")); + } + + let index = request_inner.prev_log_index + 1; + let term = request_inner.prev_log_term; + let data = LogEntryData::BlockEntry(block_entry.clone()); + + #[cfg(feature = "rocks")] + if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { + tracing::error!("Failed to save log entry: {:?}", e); + return Err(Status::internal("Failed to save log entry")); + } //TODO FIXME move this code back when we have propagation: let Some(diff) = last_last_arrived_block_number.checked_sub(block_entry.number) else { //TODO FIXME move this code back when we have propagation: tracing::error!( From 9dd82e1aee7828387c2c821c244a3c7b71c58ac0 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 14:47:58 -0300 Subject: [PATCH 02/13] fmt --- src/eth/consensus/server.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 0c1f64866..174ed7796 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -136,12 +136,12 @@ impl AppendEntryService for AppendEntryServiceImpl { // TODO: resolve log inconsistency instead? let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); if request_inner.prev_log_index != last_last_arrived_block_number { - tracing::error!( + let error_message = format!( "prevLogIndex mismatch: expected {}, got {}", - last_last_arrived_block_number, - request_inner.prev_log_index + last_last_arrived_block_number, request_inner.prev_log_index ); - return Err(Status::invalid_argument("empty block entry")); + tracing::error!("{}", &error_message); + return Err(Status::invalid_argument(error_message)); } let index = request_inner.prev_log_index + 1; From d3fd65d72866fb524f90a5d41607b92396a86df1 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 14:56:04 -0300 Subject: [PATCH 03/13] fix --- src/eth/consensus/server.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 174ed7796..f24661226 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -90,13 +90,6 @@ impl AppendEntryService for AppendEntryServiceImpl { let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); - // Return error if request term < current term - if request_inner.term < current_term { - let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); - tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); - return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); - } - if consensus.is_leader() { tracing::error!(sender = request_inner.leader_id, "append_block_commit called on leader node"); return Err(Status::new( @@ -105,6 +98,13 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } + // Return error if request term < current term + if request_inner.term < current_term { + let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + } + let Some(block_entry) = request_inner.block_entry else { return Err(Status::invalid_argument("empty block entry")); }; From 278f4bb4d600cb9cda1386907e2d59d1485454d6 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 15:04:26 -0300 Subject: [PATCH 04/13] revert --- src/eth/consensus/server.rs | 49 ------------------------------------- 1 file changed, 49 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index f24661226..382392dc0 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -87,7 +87,6 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; - let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -98,61 +97,13 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } - // Return error if request term < current term - if request_inner.term < current_term { - let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); - tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); - return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); - } - let Some(block_entry) = request_inner.block_entry else { return Err(Status::invalid_argument("empty block entry")); }; tracing::info!(number = block_entry.number, "appending new block"); - // Return error if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm - if let Some(prev_entry) = consensus.log_entries_storage.get_entry(request_inner.prev_log_index).ok().flatten() { - if prev_entry.term != request_inner.prev_log_term { - let error_message = format!( - "prevLogIndex term mismatch: expected {}, found {} at index {}", - request_inner.prev_log_term, prev_entry.term, request_inner.prev_log_index - ); - tracing::error!( - prev_log_index = request_inner.prev_log_index, - expected_term = request_inner.prev_log_term, - actual_term = prev_entry.term, - "{}", - &error_message - ); - return Err(Status::new((StatusCode::LogMismatch as i32).into(), error_message)); - } - } else { - let error_message = format!("No entry found at prevLogIndex {}", request_inner.prev_log_index); - tracing::error!(prev_log_index = request_inner.prev_log_index, "{}", &error_message); - return Err(Status::new((StatusCode::LogMismatch as i32).into(), error_message)); - } - - // TODO: resolve log inconsistency instead? let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); - if request_inner.prev_log_index != last_last_arrived_block_number { - let error_message = format!( - "prevLogIndex mismatch: expected {}, got {}", - last_last_arrived_block_number, request_inner.prev_log_index - ); - tracing::error!("{}", &error_message); - return Err(Status::invalid_argument(error_message)); - } - - let index = request_inner.prev_log_index + 1; - let term = request_inner.prev_log_term; - let data = LogEntryData::BlockEntry(block_entry.clone()); - - #[cfg(feature = "rocks")] - if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { - tracing::error!("Failed to save log entry: {:?}", e); - return Err(Status::internal("Failed to save log entry")); - } //TODO FIXME move this code back when we have propagation: let Some(diff) = last_last_arrived_block_number.checked_sub(block_entry.number) else { //TODO FIXME move this code back when we have propagation: tracing::error!( From 1787959882f9d90697b4679376ad550b889752b9 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 15:11:14 -0300 Subject: [PATCH 05/13] test --- src/eth/consensus/server.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 382392dc0..02cbcb0f2 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -87,6 +87,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; + let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -97,13 +98,39 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } + // Return error if request term < current term + if request_inner.term < current_term { + let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + } + let Some(block_entry) = request_inner.block_entry else { return Err(Status::invalid_argument("empty block entry")); }; tracing::info!(number = block_entry.number, "appending new block"); + // TODO: resolve log inconsistency instead? let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); + if request_inner.prev_log_index != last_last_arrived_block_number { + tracing::error!( + "prevLogIndex mismatch: expected {}, got {}", + last_last_arrived_block_number, + request_inner.prev_log_index + ); + return Err(Status::invalid_argument("empty block entry")); + } + + let index = request_inner.prev_log_index + 1; + let term = request_inner.prev_log_term; + let data = LogEntryData::BlockEntry(block_entry.clone()); + + #[cfg(feature = "rocks")] + if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { + tracing::error!("Failed to save log entry: {:?}", e); + return Err(Status::internal("Failed to save log entry")); + } //TODO FIXME move this code back when we have propagation: let Some(diff) = last_last_arrived_block_number.checked_sub(block_entry.number) else { //TODO FIXME move this code back when we have propagation: tracing::error!( From e63816cccdd33bba0466404d1af04548367e704b Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 15:14:17 -0300 Subject: [PATCH 06/13] comment out --- src/eth/consensus/server.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 02cbcb0f2..7ff0eca79 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -112,15 +112,15 @@ impl AppendEntryService for AppendEntryServiceImpl { tracing::info!(number = block_entry.number, "appending new block"); // TODO: resolve log inconsistency instead? - let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); - if request_inner.prev_log_index != last_last_arrived_block_number { - tracing::error!( - "prevLogIndex mismatch: expected {}, got {}", - last_last_arrived_block_number, - request_inner.prev_log_index - ); - return Err(Status::invalid_argument("empty block entry")); - } + //let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); + //if request_inner.prev_log_index != last_last_arrived_block_number { + // tracing::error!( + // "prevLogIndex mismatch: expected {}, got {}", + // last_last_arrived_block_number, + // request_inner.prev_log_index + // ); + // return Err(Status::invalid_argument("empty block entry")); + //} let index = request_inner.prev_log_index + 1; let term = request_inner.prev_log_term; From 2be41fc375cfe366e6225e00d0a2143851620fbd Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 15:17:25 -0300 Subject: [PATCH 07/13] comment out --- src/eth/consensus/server.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 7ff0eca79..0e3c404ea 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -122,15 +122,15 @@ impl AppendEntryService for AppendEntryServiceImpl { // return Err(Status::invalid_argument("empty block entry")); //} - let index = request_inner.prev_log_index + 1; - let term = request_inner.prev_log_term; - let data = LogEntryData::BlockEntry(block_entry.clone()); - - #[cfg(feature = "rocks")] - if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { - tracing::error!("Failed to save log entry: {:?}", e); - return Err(Status::internal("Failed to save log entry")); - } + //let index = request_inner.prev_log_index + 1; + //let term = request_inner.prev_log_term; + //let data = LogEntryData::BlockEntry(block_entry.clone()); + + //#[cfg(feature = "rocks")] + //if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { + // tracing::error!("Failed to save log entry: {:?}", e); + // return Err(Status::internal("Failed to save log entry")); + //} //TODO FIXME move this code back when we have propagation: let Some(diff) = last_last_arrived_block_number.checked_sub(block_entry.number) else { //TODO FIXME move this code back when we have propagation: tracing::error!( From b9141aae35a5ba25c24bd99da560f3d00a673e19 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 15:21:29 -0300 Subject: [PATCH 08/13] revert --- src/eth/consensus/server.rs | 29 +---------------------------- 1 file changed, 1 insertion(+), 28 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 0e3c404ea..382392dc0 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -87,7 +87,6 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; - let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -98,39 +97,13 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } - // Return error if request term < current term - if request_inner.term < current_term { - let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); - tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); - return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); - } - let Some(block_entry) = request_inner.block_entry else { return Err(Status::invalid_argument("empty block entry")); }; tracing::info!(number = block_entry.number, "appending new block"); - // TODO: resolve log inconsistency instead? - //let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); - //if request_inner.prev_log_index != last_last_arrived_block_number { - // tracing::error!( - // "prevLogIndex mismatch: expected {}, got {}", - // last_last_arrived_block_number, - // request_inner.prev_log_index - // ); - // return Err(Status::invalid_argument("empty block entry")); - //} - - //let index = request_inner.prev_log_index + 1; - //let term = request_inner.prev_log_term; - //let data = LogEntryData::BlockEntry(block_entry.clone()); - - //#[cfg(feature = "rocks")] - //if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { - // tracing::error!("Failed to save log entry: {:?}", e); - // return Err(Status::internal("Failed to save log entry")); - //} + let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); //TODO FIXME move this code back when we have propagation: let Some(diff) = last_last_arrived_block_number.checked_sub(block_entry.number) else { //TODO FIXME move this code back when we have propagation: tracing::error!( From f7de3eb7829a356a75558bcf5bc065be7d181b67 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 15:36:57 -0300 Subject: [PATCH 09/13] test --- src/eth/consensus/server.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 382392dc0..5869428f3 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -103,6 +103,16 @@ impl AppendEntryService for AppendEntryServiceImpl { tracing::info!(number = block_entry.number, "appending new block"); + let index = request_inner.prev_log_index + 1; + let term = request_inner.prev_log_term; + let data = LogEntryData::BlockEntry(block_entry.clone()); + + #[cfg(feature = "rocks")] + if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { + tracing::error!("Failed to save log entry: {:?}", e); + return Err(Status::internal("Failed to save log entry")); + } + let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); //TODO FIXME move this code back when we have propagation: let Some(diff) = last_last_arrived_block_number.checked_sub(block_entry.number) else { From 53a3458a3a1bc658f0776e7a66c58907f8fa2d3d Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 15:56:10 -0300 Subject: [PATCH 10/13] test term validation --- src/eth/consensus/server.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 5869428f3..7b7eb3850 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -42,6 +42,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; + let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -52,6 +53,13 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } + // Return error if request term < current term + if request_inner.term < current_term { + let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + } + let executions = request_inner.executions; let index = request_inner.prev_log_index + 1; let term = request_inner.prev_log_term; @@ -87,6 +95,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; + let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -97,6 +106,13 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } + // Return error if request term < current term + if request_inner.term < current_term { + let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + } + let Some(block_entry) = request_inner.block_entry else { return Err(Status::invalid_argument("empty block entry")); }; From f3645767c2e3272d37cca9d8f7ac938db961e8cc Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 16:01:06 -0300 Subject: [PATCH 11/13] update e2e check --- chaos/experiments/leader-election.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chaos/experiments/leader-election.sh b/chaos/experiments/leader-election.sh index 6fbe8fcfa..c873a0dd7 100755 --- a/chaos/experiments/leader-election.sh +++ b/chaos/experiments/leader-election.sh @@ -91,7 +91,7 @@ check_leader() { # Check the response for specific strings to determine the node status if [[ "$response" == *"append_transaction_executions called on leader node"* ]]; then return 0 # Success exit code for leader - elif [[ "$response" == *"APPEND_SUCCESS"* ]]; then + elif [[ "$response" == *"TERM_MISMATCH"* ]]; then return 1 # Failure exit code for non-leader fi } From b434dbc9b8ded070d692c082cc44a7e4de3166ba Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 16:18:51 -0300 Subject: [PATCH 12/13] test --- chaos/experiments/leader-election.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chaos/experiments/leader-election.sh b/chaos/experiments/leader-election.sh index c873a0dd7..99208bf91 100755 --- a/chaos/experiments/leader-election.sh +++ b/chaos/experiments/leader-election.sh @@ -91,7 +91,7 @@ check_leader() { # Check the response for specific strings to determine the node status if [[ "$response" == *"append_transaction_executions called on leader node"* ]]; then return 0 # Success exit code for leader - elif [[ "$response" == *"TERM_MISMATCH"* ]]; then + else return 1 # Failure exit code for non-leader fi } From f3e8927dec5d038af0f43914de9b1f9845e88e02 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 16:26:19 -0300 Subject: [PATCH 13/13] revert --- chaos/experiments/leader-election.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chaos/experiments/leader-election.sh b/chaos/experiments/leader-election.sh index 99208bf91..6fbe8fcfa 100755 --- a/chaos/experiments/leader-election.sh +++ b/chaos/experiments/leader-election.sh @@ -91,7 +91,7 @@ check_leader() { # Check the response for specific strings to determine the node status if [[ "$response" == *"append_transaction_executions called on leader node"* ]]; then return 0 # Success exit code for leader - else + elif [[ "$response" == *"APPEND_SUCCESS"* ]]; then return 1 # Failure exit code for non-leader fi }