Skip to content

Commit

Permalink
chore: add horizontal block sync (#942)
Browse files Browse the repository at this point in the history
* chore: add horizontal block sync

* lint
  • Loading branch information
renancloudwalk authored May 28, 2024
1 parent 846283c commit 5efea25
Show file tree
Hide file tree
Showing 6 changed files with 305 additions and 90 deletions.
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use nom::sequence::separated_pair;
use nom::IResult;

fn main() {
tonic_build::compile_protos("proto/raft.proto").unwrap();
tonic_build::compile_protos("proto/append_entry.proto").unwrap();
// any code change
println!("cargo:rerun-if-changed=src/");
// used in signatures codegen
Expand Down
151 changes: 151 additions & 0 deletions proto/append_entry.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
syntax = "proto3";

package append_entry;

// Enum to represent status codes
enum StatusCode {
OK = 0; // Request was processed successfully. No further action needed.
ERROR = 1; // General error. Investigate the cause.
NOT_LEADER = 2; // Node is not the current leader. Retry with the correct leader.
LEADER_CHANGED = 3; // Leadership has changed. Re-evaluate the leader and retry.
LOG_MISMATCH = 4; // Log mismatch. Adjust prevLogIndex and prevLogTerm, then resend entries.
TERM_MISMATCH = 5; // Term mismatch. Leader’s term outdated, step down and initiate a new election.
RETRY = 6; // Temporary issue. Wait and retry the request.
APPEND_SUCCESS = 7; // Entry appended successfully. Update commit index.
APPEND_FAILURE_GENERAL = 8; // General failure. Retry or investigate.
TERM_OUTDATED = 9; // Leader’s term outdated. Leader should step down.
STORAGE_ERROR = 10; // Persistent storage error. Handle the error and retry.
LEADER_COMMIT_INDEX_ADVANCED = 11; // Leader’s commit index advanced. Follower needs to catch up.
FOLLOWER_IN_CANDIDATE_STATE = 12; // Follower is in candidate state. Leader may need to step down.
FOLLOWER_IN_LEADER_STATE = 13; // Follower believes it is the leader. Resolve split-brain scenario.
ENTRY_ALREADY_EXISTS = 14; // Entry already exists. Verify log consistency.
RETRY_LATER = 15; // Follower requests retry later. Wait and retry the request.
}

// Transaction Execution message
message Transaction {
string hash = 1;
string nonce = 2;
string blockHash = 3;
string blockNumber = 4;
string transactionIndex = 5;
string from = 6;
string to = 7;
string value = 8;
string gasPrice = 9;
string gas = 10;
string input = 11;
string v = 12;
string r = 13;
string s = 14;
string type = 15;
string chainId = 16;
string publicKey = 17;
string raw = 18;
string standardV = 19;
}

// Transaction Receipt message
message Receipt {
string transactionHash = 1;
string transactionIndex = 2;
string blockHash = 3;
string blockNumber = 4;
string from = 5;
string to = 6;
string cumulativeGasUsed = 7;
string gasUsed = 8;
string contractAddress = 9;
repeated Log logs = 10;
string status = 11;
string logsBloom = 12;
string effectiveGasPrice = 13;
}

// Log message
message Log {
string address = 1;
repeated string topics = 2;
string data = 3;
string blockHash = 4;
string blockNumber = 5;
string transactionHash = 6;
string transactionIndex = 7;
string logIndex = 8;
string transactionLogIndex = 9;
bool removed = 10;
}

// Execution Result message
message ExecutionResult {
int64 block_timestamp = 1;
bool receipt_applied = 2;
string result = 3;
string output = 4;
repeated Log logs = 5;
string gas = 6;
}

// Transaction Execution message
message TransactionExecution {
Transaction tx = 1;
Receipt receipt = 2;
ExecutionResult result = 3;
}

// Append Transaction Execution message
message AppendTransactionExecutionsRequest {
uint64 term = 1;
uint64 prevLogIndex = 2;
uint64 prevLogTerm = 3;
repeated TransactionExecution executions = 4;
}

message AppendTransactionExecutionsResponse {
StatusCode status = 1;
string message = 2;
uint64 last_committed_block_number = 3;
}

// Block Header message
message BlockHeader {
uint64 number = 1;
string hash = 2;
string transactions_root = 3;
string gas_used = 4;
string gas_limit = 5;
string bloom = 6;
uint64 timestamp = 7;
string parent_hash = 8;
string author = 9;
bytes extra_data = 10;
string miner = 11;
string difficulty = 12;
string receipts_root = 13;
string uncle_hash = 14;
uint64 size = 15;
string state_root = 16;
string total_difficulty = 17;
string nonce = 18;
}

// Append Block Commit message
message AppendBlockCommitRequest {
uint64 term = 1;
uint64 prevLogIndex = 2;
uint64 prevLogTerm = 3;
BlockHeader header = 4;
repeated string transactionHashes = 5;
}

message AppendBlockCommitResponse {
StatusCode status = 1;
string message = 2;
uint64 last_committed_block_number = 3;
}

// Service definition
service AppendEntryService {
rpc AppendTransactionExecutions (AppendTransactionExecutionsRequest) returns (AppendTransactionExecutionsResponse);
rpc AppendBlockCommit (AppendBlockCommitRequest) returns (AppendBlockCommitResponse);
}
20 changes: 0 additions & 20 deletions proto/raft.proto

This file was deleted.

1 change: 0 additions & 1 deletion src/bin/run_with_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {
let storage = config.storage.init().await?;
let consensus = Arc::new(Consensus::new(config.clone().leader_node)); // in development, with no leader configured, the current node ends up being the leader
let (http_url, ws_url) = consensus.get_chain_url(config.clone());
consensus.sender.send("Consensus initialized.".to_string()).await.unwrap();
let chain = Arc::new(BlockchainClient::new_http_ws(&http_url, ws_url.as_deref(), config.online.external_rpc_timeout).await?);

let relayer = config.relayer.init(Arc::clone(&storage)).await?;
Expand Down
12 changes: 8 additions & 4 deletions src/eth/block_miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ impl BlockMiner {
let tx_hash = tx_execution.hash();
self.storage.save_execution(tx_execution.clone()).await?;

if let Some(consensus) = &self.consensus {
let execution = format!("{:?}", tx_execution.clone());
consensus.sender.send(execution).await.unwrap();
}
//TODO implement full gRPC for tx execution: if let Some(consensus) = &self.consensus {
//TODO implement full gRPC for tx execution: let execution = format!("{:?}", tx_execution.clone());
//TODO implement full gRPC for tx execution: consensus.sender.send(execution).await.unwrap();
//TODO implement full gRPC for tx execution: }

// decide what to do based on mining mode
match self.mode {
Expand Down Expand Up @@ -214,6 +214,10 @@ impl BlockMiner {
let block_header = block.header.clone();
let block_logs: Vec<LogMined> = block.transactions.iter().flat_map(|tx| &tx.logs).cloned().collect();

if let Some(consensus) = &self.consensus {
consensus.sender.send(block.clone()).await?;
}

// persist block
self.storage.save_block(block).await?;
self.storage.set_mined_block_number(block_number).await?;
Expand Down
Loading

0 comments on commit 5efea25

Please sign in to comment.