Implement Append Entries to Log #1199

Merged · 83 commits · Jun 28, 2024

Commits
ba60eaa
initial commit
gabriel-aranha-cw Jun 21, 2024
d31316f
Merge branch 'main' into implement-append-entries-to-log-storage
gabriel-aranha-cw Jun 21, 2024
07178d0
merge
gabriel-aranha-cw Jun 21, 2024
7d66d37
Merge branch 'implement-append-entries-to-log-storage' of https://git…
gabriel-aranha-cw Jun 21, 2024
94c24dc
implement custom storage prefix
gabriel-aranha-cw Jun 21, 2024
20320ca
fix var name
gabriel-aranha-cw Jun 21, 2024
c824dc9
fix e2e clean up
gabriel-aranha-cw Jun 21, 2024
d4bd2ef
change clone
gabriel-aranha-cw Jun 21, 2024
92d0ee3
debug logs
gabriel-aranha-cw Jun 21, 2024
731efa3
refactor
gabriel-aranha-cw Jun 21, 2024
67ad762
Merge branch 'main' into implement-append-entries-to-log-storage
gabriel-aranha-cw Jun 21, 2024
69599d8
lint
gabriel-aranha-cw Jun 21, 2024
0f81353
test fix
gabriel-aranha-cw Jun 21, 2024
8bd833d
temp fix
gabriel-aranha-cw Jun 21, 2024
14f65ad
revert
gabriel-aranha-cw Jun 21, 2024
928453e
add todo
gabriel-aranha-cw Jun 21, 2024
4851e16
implement tx execution save log
gabriel-aranha-cw Jun 21, 2024
1a41013
fmt
gabriel-aranha-cw Jun 21, 2024
0ca93ec
lint
gabriel-aranha-cw Jun 21, 2024
8c713e4
add block back
gabriel-aranha-cw Jun 21, 2024
d5ddc45
test
gabriel-aranha-cw Jun 21, 2024
d057de0
fix
gabriel-aranha-cw Jun 21, 2024
f4a4097
fix rocks flag
gabriel-aranha-cw Jun 21, 2024
16128ad
fix
gabriel-aranha-cw Jun 21, 2024
f11340a
refactor
gabriel-aranha-cw Jun 21, 2024
15d7182
lint
gabriel-aranha-cw Jun 21, 2024
9a3fef0
Merge branch 'main' into implement-append-entries-to-log-storage
gabriel-aranha-cw Jun 21, 2024
5e34376
Merge branch 'main' into implement-append-entries-to-log-storage
renancloudwalk Jun 23, 2024
775dfe2
chore: return error if we have a conflict appending entries (#1218)
renancloudwalk Jun 24, 2024
470c722
chore: save log entries at executions
renancloudwalk Jun 24, 2024
b566763
lint
renancloudwalk Jun 24, 2024
b72bed1
Merge branch 'main' into append-entries-at-raft-server
renancloudwalk Jun 24, 2024
e308fe7
chore: test server
renancloudwalk Jun 24, 2024
0a7cec4
Merge branch 'main' into implement-append-entries-to-log-storage
renancloudwalk Jun 24, 2024
1008ee2
Merge branch 'append-entries-at-raft-server' of github.com:cloudwalk/…
renancloudwalk Jun 24, 2024
1a91f27
Merge branch 'main' of github.com:cloudwalk/stratus into append-entri…
renancloudwalk Jun 24, 2024
5d6c628
fix: append log entries test
renancloudwalk Jun 24, 2024
d5c49d9
lint
renancloudwalk Jun 24, 2024
79ba3b5
Merge branch 'append-entries-at-raft-server' of github.com:cloudwalk/…
renancloudwalk Jun 24, 2024
b6bd324
add conditional rocks save entry
gabriel-aranha-cw Jun 24, 2024
328eaf2
improving resiliency
gabriel-aranha-cw Jun 24, 2024
39547e9
remove debug logs
gabriel-aranha-cw Jun 24, 2024
5134ce6
improve
gabriel-aranha-cw Jun 24, 2024
27027b0
improve
gabriel-aranha-cw Jun 24, 2024
f2412fe
fmt
gabriel-aranha-cw Jun 24, 2024
b980f0b
fmt
gabriel-aranha-cw Jun 24, 2024
5a31ffc
fix broadcast
gabriel-aranha-cw Jun 24, 2024
8fce3a3
remove log
gabriel-aranha-cw Jun 24, 2024
03079a2
lint
gabriel-aranha-cw Jun 24, 2024
56eb883
add save_log_entry tests
gabriel-aranha-cw Jun 24, 2024
0907d71
comment
gabriel-aranha-cw Jun 24, 2024
4d957b1
Merge branch 'main' into implement-append-entries-to-log-storage
gabriel-aranha-cw Jun 24, 2024
85ab899
Merge branch 'main' into implement-append-entries-to-log-storage
gabriel-aranha-cw Jun 25, 2024
50cf104
fix lint
gabriel-aranha-cw Jun 25, 2024
8ecc967
Merge branch 'main' into implement-append-entries-to-log-storage
gabriel-aranha-cw Jun 25, 2024
62e5fe2
fix conflicts
gabriel-aranha-cw Jun 25, 2024
adbf89a
Merge branch 'main' of github.com:cloudwalk/stratus into implement-ap…
renancloudwalk Jun 26, 2024
19ff157
Merge branch 'main' into implement-append-entries-to-log-storage
gabriel-aranha-cw Jun 26, 2024
6c1d8e1
Merge branch 'main' into implement-append-entries-to-log-storage
renancloudwalk Jun 26, 2024
a94b6ac
add initial append block
gabriel-aranha-cw Jun 26, 2024
9dd82e1
fmt
gabriel-aranha-cw Jun 26, 2024
d3fd65d
fix
gabriel-aranha-cw Jun 26, 2024
278f4bb
revert
gabriel-aranha-cw Jun 26, 2024
1787959
test
gabriel-aranha-cw Jun 26, 2024
e63816c
comment out
gabriel-aranha-cw Jun 26, 2024
2be41fc
comment out
gabriel-aranha-cw Jun 26, 2024
b9141aa
revert
gabriel-aranha-cw Jun 26, 2024
f7de3eb
test
gabriel-aranha-cw Jun 26, 2024
53a3458
test term validation
gabriel-aranha-cw Jun 26, 2024
f364576
update e2e check
gabriel-aranha-cw Jun 26, 2024
b434dbc
test
gabriel-aranha-cw Jun 26, 2024
f3e8927
revert
gabriel-aranha-cw Jun 26, 2024
1c40ef5
Merge branch 'main' into implement-append-entries-to-log-storage
gabriel-aranha-cw Jun 27, 2024
286d719
revert
gabriel-aranha-cw Jun 27, 2024
5e78af1
fix main merge
gabriel-aranha-cw Jun 27, 2024
a811fac
type
gabriel-aranha-cw Jun 27, 2024
1298d07
fix
gabriel-aranha-cw Jun 27, 2024
158b95a
fix
gabriel-aranha-cw Jun 27, 2024
174d5f7
add term check
gabriel-aranha-cw Jun 27, 2024
525a95a
test unit
gabriel-aranha-cw Jun 27, 2024
ca394a8
fix
gabriel-aranha-cw Jun 27, 2024
054e736
doc
gabriel-aranha-cw Jun 27, 2024
a2d7a2d
Merge branch 'main' into implement-append-entries-to-log-storage
renancloudwalk Jun 28, 2024
4 changes: 2 additions & 2 deletions chaos/experiments/leader-election.sh
@@ -86,7 +86,7 @@ check_leader() {
local grpc_address=$1

# Send the gRPC request using grpcurl and capture both stdout and stderr
response=$(grpcurl -import-path static/proto -proto append_entry.proto -plaintext -d '{"term": 5, "prevLogIndex": 0, "prevLogTerm": 4, "leader_id": "leader_id_value", "block_entry": {"number": 1, "hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "transactions_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "gas_used": 999, "gas_limit": 999, "bloom": "ZXh0cmFfZGF0YV92YWx1ZQ==", "timestamp": 123456789, "parent_hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "author": "ZXh0cmFfZGF0YV92YWx1ZQ==", "extra_data": "ZXh0cmFfZGF0YV92YWx1ZQ==", "miner": "ZXh0cmFfZGF0YV92YWx1ZQ==", "receipts_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "uncle_hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "size": 12345, "state_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "transaction_hashes": []}}' "$grpc_address" append_entry.AppendEntryService/AppendBlockCommit 2>&1)
response=$(grpcurl -import-path static/proto -proto append_entry.proto -plaintext -d '{"term": 999999999, "prevLogIndex": 0, "prevLogTerm": 0, "leader_id": "leader_id_value", "block_entry": {"number": 1, "hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "transactions_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "gas_used": 999, "gas_limit": 999, "bloom": "ZXh0cmFfZGF0YV92YWx1ZQ==", "timestamp": 123456789, "parent_hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "author": "ZXh0cmFfZGF0YV92YWx1ZQ==", "extra_data": "ZXh0cmFfZGF0YV92YWx1ZQ==", "miner": "ZXh0cmFfZGF0YV92YWx1ZQ==", "receipts_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "uncle_hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "size": 12345, "state_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "transaction_hashes": []}}' "$grpc_address" append_entry.AppendEntryService/AppendBlockCommit 2>&1)

# Check the response for specific strings to determine the node status
if [[ "$response" == *"append_transaction_executions called on leader node"* ]]; then
@@ -113,7 +113,7 @@ find_leader() {

# Function to remove rocks-path directory
remove_rocks_path() {
rm -rf data/
find . -type d -name "tmp_rocks_*" -print0 | xargs -0 rm -rf
}

# Function to run the election test
1 change: 1 addition & 0 deletions src/bin/run_with_importer.rs
@@ -24,6 +24,7 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {
let miner = config.miner.init_external_mode(Arc::clone(&storage), None)?;
let consensus = Consensus::new(
Arc::clone(&storage),
config.storage.perm_storage.rocks_path_prefix.clone(),
config.clone().candidate_peers.clone(),
Some(config.clone()),
config.address,
1 change: 0 additions & 1 deletion src/config.rs
@@ -846,7 +846,6 @@ pub struct PermanentStorageConfig {
#[arg(long = "perm-storage", env = "PERM_STORAGE")]
pub perm_storage_kind: PermanentStorageKind,

#[cfg(feature = "rocks")]
/// RocksDB storage path prefix to execute multiple local Stratus instances.
#[arg(long = "rocks-path-prefix", env = "ROCKS_PATH_PREFIX")]
pub rocks_path_prefix: Option<String>,
140 changes: 136 additions & 4 deletions src/eth/consensus/append_log_entries_storage.rs
@@ -7,15 +7,32 @@ use rocksdb::Options;
use rocksdb::DB;

use super::log_entry::LogEntry;
use super::log_entry::LogEntryData;

pub struct AppendLogEntriesStorage {
db: DB,
}

impl AppendLogEntriesStorage {
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
pub fn new(path: Option<String>) -> Result<Self> {
let mut opts = Options::default();
opts.create_if_missing(true);

let path = if let Some(prefix) = path {
// run some checks on the given prefix
assert!(!prefix.is_empty(), "given prefix for RocksDB is empty, try not providing the flag");
if Path::new(&prefix).is_dir() || Path::new(&prefix).iter().count() > 1 {
tracing::warn!(?prefix, "given prefix for RocksDB might put it in another folder");
}

let path = format!("{prefix}-log-entries-rocksdb");
tracing::info!("starting rocksdb log entries storage - at custom path: '{:?}'", path);
path
} else {
tracing::info!("starting rocksdb log entries storage - at default path: 'data/log-entries-rocksdb'"); // TODO: keep inside data?
"data/log-entries-rocksdb".to_string()
};

let db = DB::open(&opts, path).context("Failed to open RocksDB")?;
Ok(Self { db })
}
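The prefix handling in `new` above (custom prefix becomes `<prefix>-log-entries-rocksdb`, otherwise a default path is used) can be sketched in isolation. This is a hypothetical helper for illustration only — `resolve_log_path` does not exist in the PR; it just mirrors the branch logic without opening RocksDB:

```rust
use std::path::Path;

// Hypothetical helper mirroring AppendLogEntriesStorage::new's path logic:
// a non-empty prefix maps to "<prefix>-log-entries-rocksdb", no prefix maps
// to the default "data/log-entries-rocksdb".
fn resolve_log_path(prefix: Option<String>) -> String {
    match prefix {
        Some(prefix) => {
            assert!(!prefix.is_empty(), "empty prefix; omit the flag instead");
            // same heuristic warning as the PR: a prefix that is a directory
            // or contains separators may place the DB somewhere unexpected
            if Path::new(&prefix).is_dir() || Path::new(&prefix).iter().count() > 1 {
                eprintln!("warning: prefix {prefix:?} may place the DB in another folder");
            }
            format!("{prefix}-log-entries-rocksdb")
        }
        None => "data/log-entries-rocksdb".to_string(),
    }
}

fn main() {
    assert_eq!(resolve_log_path(None), "data/log-entries-rocksdb");
    assert_eq!(resolve_log_path(Some("tmp_rocks_1".into())), "tmp_rocks_1-log-entries-rocksdb");
}
```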
@@ -81,6 +98,35 @@ impl AppendLogEntriesStorage {
None => Ok(0), // Default to 0 if not set
}
}

pub fn save_log_entry(&self, index: u64, term: u64, data: LogEntryData, entry_type: &str) -> Result<()> {
tracing::debug!(index, term, "Creating {} log entry", entry_type);
let log_entry = LogEntry { term, index, data };
tracing::debug!(index = log_entry.index, term = log_entry.term, "{} log entry created", entry_type);

tracing::debug!("Checking for existing {} entry at new index", entry_type);
match self.get_entry(log_entry.index) {
Ok(Some(existing_entry)) =>
if existing_entry.term != log_entry.term {
tracing::warn!(
index = log_entry.index,
"Conflicting entry found, deleting existing entry and all that follow it"
);
self.delete_entries_from(log_entry.index)?;
},
Ok(None) => {
// No existing entry at this index, proceed to save the new entry
}
Err(e) => {
tracing::error!(index = log_entry.index, "Error retrieving entry: {}", e);
return Err(anyhow::anyhow!("Error retrieving entry: {}", e));
}
}

tracing::debug!("Appending new {} log entry", entry_type);
self.save_entry(&log_entry)
.map_err(|e| anyhow::anyhow!("Failed to append {} log entry: {}", entry_type, e))
}
}

#[cfg(test)]
@@ -89,12 +135,11 @@ mod tests {

use super::*;
use crate::eth::consensus::tests::factories::*;
use crate::eth::consensus::LogEntryData;

fn setup_storage() -> AppendLogEntriesStorage {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path();
AppendLogEntriesStorage::new(temp_path).unwrap()
let temp_path = temp_dir.path().to_str().expect("Failed to get temp path").to_string();
AppendLogEntriesStorage::new(Some(temp_path)).unwrap()
}

#[test]
@@ -192,4 +237,91 @@
panic!("Expected TransactionExecutionEntries");
}
}

#[test]
fn test_save_log_entry_no_conflict() {
let storage = setup_storage();
let index = 1;
let term = 1;
let log_entry_data = create_mock_log_entry_data_block();

storage.save_log_entry(index, term, log_entry_data.clone(), "block").unwrap();
let retrieved_entry = storage.get_entry(index).unwrap().unwrap();

assert_eq!(retrieved_entry.index, index);
assert_eq!(retrieved_entry.term, term);

if let LogEntryData::BlockEntry(ref block) = retrieved_entry.data {
if let LogEntryData::BlockEntry(ref expected_block) = log_entry_data {
assert_eq!(block.hash, expected_block.hash);
assert_eq!(block.number, expected_block.number);
assert_eq!(block.parent_hash, expected_block.parent_hash);
assert_eq!(block.uncle_hash, expected_block.uncle_hash);
assert_eq!(block.transactions_root, expected_block.transactions_root);
assert_eq!(block.state_root, expected_block.state_root);
assert_eq!(block.receipts_root, expected_block.receipts_root);
assert_eq!(block.miner, expected_block.miner);
assert_eq!(block.extra_data, expected_block.extra_data);
assert_eq!(block.size, expected_block.size);
assert_eq!(block.gas_limit, expected_block.gas_limit);
assert_eq!(block.gas_used, expected_block.gas_used);
assert_eq!(block.timestamp, expected_block.timestamp);
assert_eq!(block.bloom, expected_block.bloom);
assert_eq!(block.author, expected_block.author);
assert_eq!(block.transaction_hashes, expected_block.transaction_hashes);
} else {
panic!("Expected BlockEntry");
}
} else {
panic!("Expected BlockEntry");
}
}

#[test]
fn test_save_log_entry_with_conflict() {
let storage = setup_storage();
let index = 1;
let term = 1;
let conflicting_term = 2;
let log_entry_data = create_mock_log_entry_data_block();

// Save initial log entry
storage.save_log_entry(index, term, log_entry_data.clone(), "block").unwrap();

// Save conflicting log entry at the same index but with a different term
storage.save_log_entry(index, conflicting_term, log_entry_data.clone(), "block").unwrap();

// Assert no entries exist after the conflicting entry's index, confirming that the conflicting entry and all that follow it were deleted
assert!(storage.get_entry(index + 1).unwrap().is_none());

// Retrieve the entry at the index and assert it matches the conflicting term entry
let retrieved_entry = storage.get_entry(index).unwrap().unwrap();
assert_eq!(retrieved_entry.index, index);
assert_eq!(retrieved_entry.term, conflicting_term);

if let LogEntryData::BlockEntry(ref block) = retrieved_entry.data {
if let LogEntryData::BlockEntry(ref expected_block) = log_entry_data {
assert_eq!(block.hash, expected_block.hash);
assert_eq!(block.number, expected_block.number);
assert_eq!(block.parent_hash, expected_block.parent_hash);
assert_eq!(block.uncle_hash, expected_block.uncle_hash);
assert_eq!(block.transactions_root, expected_block.transactions_root);
assert_eq!(block.state_root, expected_block.state_root);
assert_eq!(block.receipts_root, expected_block.receipts_root);
assert_eq!(block.miner, expected_block.miner);
assert_eq!(block.extra_data, expected_block.extra_data);
assert_eq!(block.size, expected_block.size);
assert_eq!(block.gas_limit, expected_block.gas_limit);
assert_eq!(block.gas_used, expected_block.gas_used);
assert_eq!(block.timestamp, expected_block.timestamp);
assert_eq!(block.bloom, expected_block.bloom);
assert_eq!(block.author, expected_block.author);
assert_eq!(block.transaction_hashes, expected_block.transaction_hashes);
} else {
panic!("Expected BlockEntry");
}
} else {
panic!("Expected BlockEntry");
}
}
}
92 changes: 80 additions & 12 deletions src/eth/consensus/mod.rs
@@ -63,6 +63,8 @@ use append_entry::RequestVoteRequest;
use append_entry::StatusCode;
use append_entry::TransactionExecutionEntry;

#[cfg(feature = "rocks")]
use self::append_log_entries_storage::AppendLogEntriesStorage;
use self::log_entry::LogEntryData;
use super::primitives::TransactionExecution;
use super::primitives::TransactionInput;
@@ -164,6 +166,8 @@ pub struct Consensus {
broadcast_sender: broadcast::Sender<LogEntryData>, //propagates the blocks
importer_config: Option<RunWithImporterConfig>, //HACK this is used with sync online only
storage: Arc<StratusStorage>,
#[cfg(feature = "rocks")]
log_entries_storage: Arc<AppendLogEntriesStorage>,
peers: Arc<RwLock<HashMap<PeerAddress, PeerTuple>>>,
#[allow(dead_code)]
direct_peers: Vec<String>,
@@ -180,8 +184,10 @@ pub struct Consensus {
}

impl Consensus {
#[allow(clippy::too_many_arguments)] //TODO: refactor into consensus config
pub async fn new(
storage: Arc<StratusStorage>,
log_storage_path: Option<String>,
direct_peers: Vec<String>,
importer_config: Option<RunWithImporterConfig>,
jsonrpc_address: SocketAddr,
@@ -197,6 +203,8 @@ impl Consensus {
let consensus = Self {
broadcast_sender,
storage,
#[cfg(feature = "rocks")]
log_entries_storage: Arc::new(AppendLogEntriesStorage::new(log_storage_path).unwrap()),
peers,
direct_peers,
current_term: AtomicU64::new(0),
@@ -385,19 +393,48 @@ impl Consensus {
}

fn initialize_transaction_execution_queue(consensus: Arc<Consensus>) {
//TODO add data to consensus-log-transactions
//TODO rediscover followers on comunication error
//XXX FIXME deal with the scenario where a transactionHash arrives after the block, in this case before saving the block LogEntry, it should ALWAYS wait for all transaction hashes
//TODO maybe check if I'm currently the leader?
// TODO: add data to consensus-log-transactions
// TODO: rediscover followers on communication error
// XXX FIXME: deal with the scenario where a transactionHash arrives after the block;
// in this case, before saving the block LogEntry, it should ALWAYS wait for all transaction hashes
// TODO: maybe check if I'm currently the leader?

spawn_named("consensus::transaction_execution_queue", async move {
let interval = Duration::from_millis(40);
loop {
tokio::time::sleep(interval).await;

if consensus.is_leader() {
let mut queue = consensus.transaction_execution_queue.lock().await;
let executions = queue.drain(..).collect::<Vec<_>>();
drop(queue);

#[cfg(feature = "rocks")]
{
tracing::debug!(executions_len = executions.len(), "Processing transaction executions");
if !executions.is_empty() {
let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0);
tracing::debug!(last_index, "Last index fetched");

let current_term = consensus.current_term.load(Ordering::SeqCst);
tracing::debug!(current_term, "Current term loaded");

match consensus.log_entries_storage.save_log_entry(
last_index + 1,
current_term,
LogEntryData::TransactionExecutionEntries(executions.clone()),
"transaction",
) {
Ok(_) => {
tracing::debug!("Transaction execution entry saved successfully");
}
Err(e) => {
tracing::error!("Failed to save transaction execution entry: {:?}", e);
}
}
}
}

let peers = consensus.peers.read().await;
for (_, (peer, _)) in peers.iter() {
let mut peer_clone = peer.clone();
@@ -420,14 +457,14 @@ impl Consensus {
loop {
tokio::select! {
Ok(tx) = rx_pending_txs.recv() => {
tracing::debug!("Attempting to receive transaction execution");
if consensus.is_leader() {
tracing::info!(tx_hash = %tx.hash(), "received transaction execution to send to followers");
if tx.is_local() {
tracing::debug!(tx_hash = %tx.hash(), "skipping local transaction because only external transactions are supported for now");
continue;
}

//TODO XXX save transaction to appendEntries log
let transaction = vec![tx.to_append_entry_transaction()];
let transaction_entry = LogEntryData::TransactionExecutionEntries(transaction);
if consensus.broadcast_sender.send(transaction_entry).is_err() {
@@ -437,13 +474,44 @@
}
Ok(block) = rx_blocks.recv() => {
if consensus.is_leader() {
tracing::info!(number = block.header.number.as_u64(), "received block to send to followers");

//TODO save block to appendEntries log
//TODO before saving check if all transaction_hashes are already in the log
let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new()));
if consensus.broadcast_sender.send(block_entry).is_err() {
tracing::error!("failed to broadcast block");
tracing::info!(number = block.header.number.as_u64(), "Leader received block to send to followers");

#[cfg(feature = "rocks")]
{
//TODO: before saving check if all transaction_hashes are already in the log
let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0);
tracing::debug!(last_index, "Last index fetched");

let current_term = consensus.current_term.load(Ordering::SeqCst);
tracing::debug!(current_term, "Current term loaded");

let transaction_hashes: Vec<Vec<u8>> = block.transactions.iter().map(|tx| tx.input.hash.to_string().into_bytes()).collect();

match consensus.log_entries_storage.save_log_entry(
last_index + 1,
current_term,
LogEntryData::BlockEntry(block.header.to_append_entry_block_header(transaction_hashes.clone())),
"block",
) {
Ok(_) => {
tracing::debug!("Block entry saved successfully");
let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(transaction_hashes));
if consensus.broadcast_sender.send(block_entry).is_err() {
tracing::error!("Failed to broadcast block");
}
}
Err(e) => {
tracing::error!("Failed to save block entry: {:?}", e);
}
}
}
#[cfg(not(feature = "rocks"))]
{
let transaction_hashes: Vec<Vec<u8>> = block.transactions.iter().map(|tx| tx.input.hash.to_string().into_bytes()).collect();
let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(transaction_hashes));
if consensus.broadcast_sender.send(block_entry).is_err() {
tracing::error!("Failed to broadcast block");
}
}
}
}