getTransactionResults() {
+ return this.transactionResults;
+ }
+
+ public Coin getTotalFees() {
+ return this.totalPaidFees;
+ }
+
+ public long getTotalGas() {
+ return this.totalGas;
+ }
+
+ public void stop() {
+ this.stopped = true;
+ }
+}
diff --git a/rskj-core/src/main/java/co/rsk/core/bc/BlockChainImpl.java b/rskj-core/src/main/java/co/rsk/core/bc/BlockChainImpl.java
index 2a369dd61ce..6bed3f10649 100644
--- a/rskj-core/src/main/java/co/rsk/core/bc/BlockChainImpl.java
+++ b/rskj-core/src/main/java/co/rsk/core/bc/BlockChainImpl.java
@@ -249,7 +249,7 @@ private ImportResult internalTryToConnect(Block block) {
long saveTime = System.nanoTime();
logger.trace("execute start");
- result = blockExecutor.execute(block, parent.getHeader(), false, noValidation, true);
+ result = blockExecutor.execute(null, 0, block, parent.getHeader(), false, noValidation, true);
logger.trace("execute done");
diff --git a/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java b/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java
index 7f41ae18bd7..afff40d822b 100644
--- a/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java
+++ b/rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java
@@ -18,18 +18,22 @@
package co.rsk.core.bc;
+import co.rsk.config.RskSystemProperties;
import co.rsk.core.Coin;
import co.rsk.core.RskAddress;
import co.rsk.core.TransactionExecutorFactory;
+import co.rsk.core.TransactionListExecutor;
import co.rsk.crypto.Keccak256;
import co.rsk.db.RepositoryLocator;
import co.rsk.metrics.profilers.Metric;
import co.rsk.metrics.profilers.Profiler;
import co.rsk.metrics.profilers.ProfilerFactory;
import com.google.common.annotations.VisibleForTesting;
+import org.ethereum.config.Constants;
import org.ethereum.config.blockchain.upgrades.ActivationConfig;
import org.ethereum.config.blockchain.upgrades.ConsensusRule;
import org.ethereum.core.*;
+import org.ethereum.util.ByteUtil;
import org.ethereum.vm.DataWord;
import org.ethereum.vm.PrecompiledContracts;
import org.ethereum.vm.program.ProgramResult;
@@ -37,8 +41,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.ethereum.config.blockchain.upgrades.ConsensusRule.RSKIP126;
import static org.ethereum.config.blockchain.upgrades.ConsensusRule.RSKIP85;
@@ -48,7 +55,7 @@
* There are two main use cases:
* - execute and validate the block final state
* - execute and complete the block final state
- *
+ *
* Note that this class IS NOT guaranteed to be thread safe because its dependencies might hold state.
*/
public class BlockExecutor {
@@ -58,40 +65,96 @@ public class BlockExecutor {
private final RepositoryLocator repositoryLocator;
private final TransactionExecutorFactory transactionExecutorFactory;
private final ActivationConfig activationConfig;
+ private final boolean remascEnabled;
+ private final Set concurrentContractsDisallowed;
- private final Map transactionResults = new HashMap<>();
+ private final Map transactionResults = new ConcurrentHashMap<>();
private boolean registerProgramResults;
+ private long minSequentialSetGasLimit;
+
+ /**
+ * An array of ExecutorService's of size `Constants.getTransactionExecutionThreads()`. Each parallel list uses an executor
+ * at specific index of this array, so that threads chosen by thread pools cannot be "reused" for executing parallel
+ * lists from same block. Otherwise, that could lead to non-determinism, when trie keys collision may not be detected
+ * on some circumstances.
+ */
+ private final ExecutorService[] execServices;
public BlockExecutor(
- ActivationConfig activationConfig,
RepositoryLocator repositoryLocator,
- TransactionExecutorFactory transactionExecutorFactory) {
+ TransactionExecutorFactory transactionExecutorFactory,
+ RskSystemProperties systemProperties) {
this.repositoryLocator = repositoryLocator;
this.transactionExecutorFactory = transactionExecutorFactory;
- this.activationConfig = activationConfig;
+ this.activationConfig = systemProperties.getActivationConfig();
+ this.remascEnabled = systemProperties.isRemascEnabled();
+ this.concurrentContractsDisallowed = Collections.unmodifiableSet(new HashSet<>(systemProperties.concurrentContractsDisallowed()));
+ this.minSequentialSetGasLimit = systemProperties.getNetworkConstants().getMinSequentialSetGasLimit();
+
+ int numOfParallelList = Constants.getTransactionExecutionThreads();
+ this.execServices = new ExecutorService[numOfParallelList];
+ for (int i = 0; i < numOfParallelList; i++) {
+ execServices[i] = new ThreadPoolExecutorImpl(i);
+ }
+ }
+
+ /**
+ * Precompiled contracts storage is setup like any other contract for consistency. Here, we apply this logic on the
+ * exact activation block.
+ * This method is called automatically for every block except for the Genesis (which makes an explicit call).
+ */
+ public static void maintainPrecompiledContractStorageRoots(Repository track, ActivationConfig.ForBlock activations) {
+ if (activations.isActivating(RSKIP126)) {
+ for (RskAddress addr : PrecompiledContracts.GENESIS_ADDRESSES) {
+ if (!track.isExist(addr)) {
+ track.createAccount(addr);
+ }
+ track.setupContract(addr);
+ }
+ }
+
+ for (Map.Entry e : PrecompiledContracts.CONSENSUS_ENABLED_ADDRESSES.entrySet()) {
+ ConsensusRule contractActivationRule = e.getValue();
+ if (activations.isActivating(contractActivationRule)) {
+ RskAddress addr = e.getKey();
+ track.createAccount(addr);
+ track.setupContract(addr);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public static byte[] calculateLogsBloom(List receipts) {
+ Bloom logBloom = new Bloom();
+
+ for (TransactionReceipt receipt : receipts) {
+ logBloom.or(receipt.getBloomFilter());
+ }
+
+ return logBloom.getData();
}
/**
* Execute and complete a block.
*
- * @param block A block to execute and complete
- * @param parent The parent of the block.
+ * @param block A block to execute and complete
+ * @param parent The parent of the block.
*/
public BlockResult executeAndFill(Block block, BlockHeader parent) {
- BlockResult result = execute(block, parent, true, false, false);
+ BlockResult result = executeForMining(block, parent, true, false, false);
fill(block, result);
return result;
}
@VisibleForTesting
public void executeAndFillAll(Block block, BlockHeader parent) {
- BlockResult result = execute(block, parent, false, true, false);
+ BlockResult result = executeForMining(block, parent, false, true, false);
fill(block, result);
}
@VisibleForTesting
public void executeAndFillReal(Block block, BlockHeader parent) {
- BlockResult result = execute(block, parent, false, false, false);
+ BlockResult result = executeForMining(block, parent, false, false, false);
if (result != BlockResult.INTERRUPTED_EXECUTION_BLOCK_RESULT) {
fill(block, result);
}
@@ -108,6 +171,7 @@ private void fill(Block block, BlockResult result) {
header.setGasUsed(result.getGasUsed());
header.setPaidFees(result.getPaidFees());
header.setLogsBloom(calculateLogsBloom(result.getTransactionReceipts()));
+ header.setTxExecutionSublistsEdges(result.getTxEdges());
block.flushRLP();
profiler.stop(metric);
@@ -116,13 +180,13 @@ private void fill(Block block, BlockResult result) {
/**
* Execute and validate the final state of a block.
*
- * @param block A block to execute and complete
- * @param parent The parent of the block.
+ * @param block A block to execute and complete
+ * @param parent The parent of the block.
* @return true if the block final state is equalBytes to the calculated final state.
*/
@VisibleForTesting
public boolean executeAndValidate(Block block, BlockHeader parent) {
- BlockResult result = execute(block, parent, false, false, false);
+ BlockResult result = execute(null, 0, block, parent, false, false, false);
return this.validate(block, result);
}
@@ -130,8 +194,8 @@ public boolean executeAndValidate(Block block, BlockHeader parent) {
/**
* Validate the final state of a block.
*
- * @param block A block to validate
- * @param result A block result (state root, receipts root, etc...)
+ * @param block A block to validate
+ * @param result A block result (state root, receipts root, etc...)
* @return true if the block final state is equalBytes to the calculated final state.
*/
public boolean validate(Block block, BlockResult result) {
@@ -172,7 +236,7 @@ public boolean validate(Block block, BlockResult result) {
Coin paidFees = result.getPaidFees();
Coin feesPaidToMiner = block.getFeesPaidToMiner();
- if (!paidFees.equals(feesPaidToMiner)) {
+ if (!paidFees.equals(feesPaidToMiner)) {
logger.error("Block {} [{}] given paidFees doesn't match: {} != {}", block.getNumber(), block.getPrintableHash(), feesPaidToMiner, paidFees);
profiler.stop(metric);
return false;
@@ -181,7 +245,7 @@ public boolean validate(Block block, BlockResult result) {
List executedTransactions = result.getExecutedTransactions();
List transactionsList = block.getTransactionsList();
- if (!executedTransactions.equals(transactionsList)) {
+ if (!executedTransactions.equals(transactionsList)) {
logger.error("Block {} [{}] given txs doesn't match: {} != {}", block.getNumber(), block.getPrintableHash(), transactionsList, executedTransactions);
profiler.stop(metric);
return false;
@@ -217,30 +281,45 @@ private boolean validateLogsBloom(BlockHeader header, BlockResult result) {
return Arrays.equals(calculateLogsBloom(result.getTransactionReceipts()), header.getLogsBloom());
}
- @VisibleForTesting
- BlockResult execute(Block block, BlockHeader parent, boolean discardInvalidTxs) {
- return execute(block, parent, discardInvalidTxs, false, true);
- }
-
- public BlockResult execute(Block block, BlockHeader parent, boolean discardInvalidTxs, boolean ignoreReadyToExecute, boolean saveState) {
- return executeInternal(null, 0, block, parent, discardInvalidTxs, ignoreReadyToExecute, saveState);
+ public BlockResult executeForMining(Block block, BlockHeader parent, boolean discardInvalidTxs, boolean ignoreReadyToExecute, boolean saveState) {
+ if (activationConfig.isActive(ConsensusRule.RSKIP144, block.getHeader().getNumber())) {
+ return executeForMiningAfterRSKIP144(block, parent, discardInvalidTxs, ignoreReadyToExecute, saveState);
+ } else {
+ return executeInternal(null, 0, block, parent, discardInvalidTxs, ignoreReadyToExecute, saveState);
+ }
}
/**
* Execute a block while saving the execution trace in the trace processor
*/
- public void traceBlock(
- ProgramTraceProcessor programTraceProcessor,
- int vmTraceOptions,
- Block block,
- BlockHeader parent,
- boolean discardInvalidTxs,
- boolean ignoreReadyToExecute) {
- executeInternal(
- Objects.requireNonNull(programTraceProcessor), vmTraceOptions, block, parent, discardInvalidTxs, ignoreReadyToExecute, false
- );
+ public void traceBlock(ProgramTraceProcessor programTraceProcessor,
+ int vmTraceOptions,
+ Block block,
+ BlockHeader parent,
+ boolean discardInvalidTxs,
+ boolean ignoreReadyToExecute) {
+ execute(Objects.requireNonNull(programTraceProcessor), vmTraceOptions, block, parent, discardInvalidTxs,
+ ignoreReadyToExecute, false);
}
+ public BlockResult execute(@Nullable ProgramTraceProcessor programTraceProcessor,
+ int vmTraceOptions,
+ Block block,
+ BlockHeader parent,
+ boolean discardInvalidTxs,
+ boolean acceptInvalidTransactions,
+ boolean saveState) {
+ if (activationConfig.isActive(ConsensusRule.RSKIP144, block.getHeader().getNumber())) {
+ return executeParallel(programTraceProcessor, vmTraceOptions, block, parent, discardInvalidTxs, acceptInvalidTransactions, saveState);
+ } else {
+ return executeInternal(programTraceProcessor, vmTraceOptions, block, parent, discardInvalidTxs, acceptInvalidTransactions, saveState);
+ }
+ }
+
+ // executes the block before RSKIP 144
+ // when RSKIP 144 is active the block is in executed parallel
+ // miners use executeForMiningAfterRSKIP144 to create the parallel schedule
+ // new blocks are executed with executeParallel
private BlockResult executeInternal(
@Nullable ProgramTraceProcessor programTraceProcessor,
int vmTraceOptions,
@@ -250,8 +329,8 @@ private BlockResult executeInternal(
boolean acceptInvalidTransactions,
boolean saveState) {
boolean vmTrace = programTraceProcessor != null;
- logger.trace("Start executeInternal.");
- logger.trace("applyBlock: block: [{}] tx.list: [{}]", block.getNumber(), block.getTransactionsList().size());
+ logger.trace("Start execute pre RSKIP144.");
+ loggingApplyBlock(block);
// Forks the repo, does not change "repository". It will have a completely different
// image of the repo, where the middle caches are immediately ignored.
@@ -261,7 +340,7 @@ private BlockResult executeInternal(
// in the next block processed.
// Note that creating a snapshot is important when the block is executed twice
// (e.g. once while building the block in tests/mining, and the other when trying
- // to conect the block). This is because the first execution will change the state
+ // to connect the block). This is because the first execution will change the state
// of the repository to the state post execution, so it's necessary to get it to
// the state prior execution again.
Metric metric = profiler.start(Profiler.PROFILING_TYPE.BLOCK_EXECUTE);
@@ -280,7 +359,7 @@ private BlockResult executeInternal(
int txindex = 0;
for (Transaction tx : block.getTransactionsList()) {
- logger.trace("apply block: [{}] tx: [{}] ", block.getNumber(), i);
+ loggingApplyBlockToTx(block, i);
TransactionExecutor txExecutor = transactionExecutorFactory.newInstance(
tx,
@@ -295,121 +374,465 @@ private BlockResult executeInternal(
boolean transactionExecuted = txExecutor.executeTransaction();
if (!acceptInvalidTransactions && !transactionExecuted) {
- if (discardInvalidTxs) {
- logger.warn("block: [{}] discarded tx: [{}]", block.getNumber(), tx.getHash());
- continue;
- } else {
- logger.warn("block: [{}] execution interrupted because of invalid tx: [{}]",
- block.getNumber(), tx.getHash());
- profiler.stop(metric);
- return BlockResult.INTERRUPTED_EXECUTION_BLOCK_RESULT;
+ if (!discardInvalidTxs) {
+ return getBlockResultAndLogExecutionInterrupted(block, metric, tx);
}
+ loggingDiscardedBlock(block, tx);
+ continue;
}
- executedTransactions.add(tx);
+ registerExecutedTx(programTraceProcessor, vmTrace, executedTransactions, tx, txExecutor);
+
+ long gasUsed = txExecutor.getGasConsumed();
+ totalGasUsed += gasUsed;
+
+ totalPaidFees = addTotalPaidFees(totalPaidFees, txExecutor);
+
+ deletedAccounts.addAll(txExecutor.getResult().getDeleteAccounts());
+
+ TransactionReceipt receipt = buildTransactionReceipt(tx, txExecutor, gasUsed, totalGasUsed);
+
+ loggingExecuteTxAndReceipt(block, i, tx);
+
+ i++;
+
+ receipts.add(receipt);
+
+ loggingTxDone();
+ }
+
+
+ saveOrCommitTrackState(saveState, track);
+
+ BlockResult result = new BlockResult(
+ block,
+ executedTransactions,
+ receipts,
+ null,
+ totalGasUsed,
+ totalPaidFees,
+ vmTrace ? null : track.getTrie()
+
+ );
+ profiler.stop(metric);
+ logger.trace("End execute pre RSKIP144.");
+ return result;
+ }
+
+ private BlockResult executeParallel(
+ @Nullable ProgramTraceProcessor programTraceProcessor,
+ int vmTraceOptions,
+ Block block,
+ BlockHeader parent,
+ boolean discardInvalidTxs,
+ boolean acceptInvalidTransactions,
+ boolean saveState) {
+ boolean vmTrace = programTraceProcessor != null;
+ logger.trace("Start executeParallel.");
+ loggingApplyBlock(block);
+
+ // Forks the repo, does not change "repository". It will have a completely different
+ // image of the repo, where the middle caches are immediately ignored.
+ // In fact, while cloning everything, it asserts that no cache elements remains.
+ // (see assertNoCache())
+ // Which means that you must commit changes and save them to be able to recover
+ // in the next block processed.
+ // Note that creating a snapshot is important when the block is executed twice
+ // (e.g. once while building the block in tests/mining, and the other when trying
+ // to conect the block). This is because the first execution will change the state
+ // of the repository to the state post execution, so it's necessary to get it to
+ // the state prior execution again.
+ Metric metric = profiler.start(Profiler.PROFILING_TYPE.BLOCK_EXECUTE);
+
+ ReadWrittenKeysTracker readWrittenKeysTracker = new ReadWrittenKeysTracker();
+ Repository track = repositoryLocator.startTrackingAt(parent, readWrittenKeysTracker);
+
+ maintainPrecompiledContractStorageRoots(track, activationConfig.forBlock(block.getNumber()));
+ readWrittenKeysTracker.clear();
+
+ short[] txExecutionEdges = block.getHeader().getTxExecutionSublistsEdges();
+
+ // if the number of parallel lists is less than 2, then there's no need to execute in another thread. The work can
+ // be done in the same thread (in-line) without any threads switching.
+ ExecutorCompletionService completionService = new ExecutorCompletionService<>(txExecutionEdges.length > 1 ? new Executor() {
+ private final AtomicInteger parallelListIndex = new AtomicInteger(0); // 'parallelListIndex' should not exceed Constants.getTransactionExecutionThreads()
- if (this.registerProgramResults) {
- this.transactionResults.put(tx.getHash(), txExecutor.getResult());
+ @Override
+ public void execute(@Nonnull Runnable command) {
+ execServices[parallelListIndex.getAndIncrement()].execute(command);
}
+ } : Runnable::run);
+ List transactionListExecutors = new ArrayList<>();
- if (vmTrace) {
- txExecutor.extractTrace(programTraceProcessor);
+ short start = 0;
+
+ for (short end : txExecutionEdges) {
+ List sublist = block.getTransactionsList().subList(start, end);
+ TransactionListExecutor txListExecutor = new TransactionListExecutor(
+ sublist,
+ block,
+ transactionExecutorFactory,
+ track.startTracking(),
+ vmTrace,
+ vmTraceOptions,
+ new HashSet<>(),
+ discardInvalidTxs,
+ acceptInvalidTransactions,
+ new HashMap<>(),
+ new HashMap<>(),
+ new HashMap<>(),
+ registerProgramResults,
+ programTraceProcessor,
+ start,
+ Coin.ZERO,
+ remascEnabled,
+ concurrentContractsDisallowed,
+ BlockUtils.getSublistGasLimit(block, false, minSequentialSetGasLimit)
+ );
+ completionService.submit(txListExecutor);
+ transactionListExecutors.add(txListExecutor);
+ start = end;
+ }
+
+ for (int i = 0; i < transactionListExecutors.size(); i++) {
+ try {
+ Future success = completionService.take();
+ if (!Boolean.TRUE.equals(success.get())) {
+ transactionListExecutors.forEach(TransactionListExecutor::stop);
+ profiler.stop(metric);
+ return BlockResult.INTERRUPTED_EXECUTION_BLOCK_RESULT;
+ }
+ } catch (InterruptedException e) {
+ logger.warn("block: [{}]/[{}] execution was interrupted", block.getNumber(), block.getHash());
+ logger.trace("", e);
+ Thread.currentThread().interrupt();
+ profiler.stop(metric);
+ return BlockResult.INTERRUPTED_EXECUTION_BLOCK_RESULT;
+ } catch (ExecutionException e) {
+ logger.warn("block: [{}]/[{}] execution failed", block.getNumber(), block.getHash());
+ logger.trace("", e);
+ profiler.stop(metric);
+ return BlockResult.INTERRUPTED_EXECUTION_BLOCK_RESULT;
}
+ }
- logger.trace("tx executed");
+ // Review collision
+ if (readWrittenKeysTracker.detectCollision()) {
+ logger.warn("block: [{}]/[{}] execution failed. Block data: [{}]", block.getNumber(), block.getHash(), ByteUtil.toHexString(block.getEncoded()));
+ profiler.stop(metric);
+ return BlockResult.INTERRUPTED_EXECUTION_BLOCK_RESULT;
+ }
- // No need to commit the changes here. track.commit();
+ // Merge maps.
+ Map executedTransactions = new HashMap<>();
+ Set deletedAccounts = new HashSet<>();
+ Map receipts = new HashMap<>();
+ Map mergedTransactionResults = new HashMap<>();
+ Coin totalPaidFees = Coin.ZERO;
+ long totalGasUsed = 0;
- logger.trace("track commit");
+ for (TransactionListExecutor tle : transactionListExecutors) {
+ tle.getRepository().commit();
+ deletedAccounts.addAll(tle.getDeletedAccounts());
+ executedTransactions.putAll(tle.getExecutedTransactions());
+ receipts.putAll(tle.getReceipts());
+ mergedTransactionResults.putAll(tle.getTransactionResults());
+ totalPaidFees = totalPaidFees.add(tle.getTotalFees());
+ totalGasUsed += tle.getTotalGas();
+ }
- long gasUsed = txExecutor.getGasUsed();
- totalGasUsed += gasUsed;
- Coin paidFees = txExecutor.getPaidFees();
- if (paidFees != null) {
- totalPaidFees = totalPaidFees.add(paidFees);
+ // execute remaining transactions after the parallel subsets
+ List sublist = block.getTransactionsList().subList(start, block.getTransactionsList().size());
+ TransactionListExecutor txListExecutor = new TransactionListExecutor(
+ sublist,
+ block,
+ transactionExecutorFactory,
+ track,
+ vmTrace,
+ vmTraceOptions,
+ deletedAccounts,
+ discardInvalidTxs,
+ acceptInvalidTransactions,
+ receipts,
+ executedTransactions,
+ mergedTransactionResults,
+ registerProgramResults,
+ programTraceProcessor,
+ start,
+ totalPaidFees,
+ remascEnabled,
+ Collections.emptySet(), // precompiled contracts are always allowed in a sequential list, as there's no concurrency in it
+ BlockUtils.getSublistGasLimit(block, true, minSequentialSetGasLimit)
+ );
+ Boolean success = txListExecutor.call();
+ if (!Boolean.TRUE.equals(success)) {
+ return BlockResult.INTERRUPTED_EXECUTION_BLOCK_RESULT;
+ }
+
+ Coin totalBlockPaidFees = txListExecutor.getTotalFees();
+ totalGasUsed += txListExecutor.getTotalGas();
+
+ saveOrCommitTrackState(saveState, track);
+
+ BlockResult result = new BlockResult(
+ block,
+ new LinkedList<>(executedTransactions.values()),
+ new LinkedList<>(receipts.values()),
+ txExecutionEdges,
+ totalGasUsed,
+ totalBlockPaidFees,
+ vmTrace ? null : track.getTrie()
+ );
+ profiler.stop(metric);
+ logger.trace("End executeParallel.");
+ return result;
+ }
+
+ private BlockResult executeForMiningAfterRSKIP144(
+ Block block,
+ BlockHeader parent,
+ boolean discardInvalidTxs,
+ boolean acceptInvalidTransactions,
+ boolean saveState) {
+ logger.trace("Start executeForMining.");
+ List transactionsList = block.getTransactionsList();
+ loggingApplyBlock(block);
+
+ // Forks the repo, does not change "repository". It will have a completely different
+ // image of the repo, where the middle caches are immediately ignored.
+ // In fact, while cloning everything, it asserts that no cache elements remains.
+ // (see assertNoCache())
+ // Which means that you must commit changes and save them to be able to recover
+ // in the next block processed.
+ // Note that creating a snapshot is important when the block is executed twice
+ // (e.g. once while building the block in tests/mining, and the other when trying
+ // to conect the block). This is because the first execution will change the state
+ // of the repository to the state post execution, so it's necessary to get it to
+ // the state prior execution again.
+ Metric metric = profiler.start(Profiler.PROFILING_TYPE.BLOCK_EXECUTE);
+
+ IReadWrittenKeysTracker readWrittenKeysTracker = new ReadWrittenKeysTracker();
+ Repository track = repositoryLocator.startTrackingAt(parent, readWrittenKeysTracker);
+
+ maintainPrecompiledContractStorageRoots(track, activationConfig.forBlock(block.getNumber()));
+ readWrittenKeysTracker.clear();
+
+ int i = 1;
+ long totalGasUsed = 0;
+ Coin totalPaidFees = Coin.ZERO;
+ Map receiptsByTx = new HashMap<>();
+ Set deletedAccounts = new HashSet<>();
+
+ int txindex = 0;
+
+ int transactionExecutionThreads = Constants.getTransactionExecutionThreads();
+ ParallelizeTransactionHandler parallelizeTransactionHandler = new ParallelizeTransactionHandler((short) transactionExecutionThreads, block, minSequentialSetGasLimit);
+
+ for (Transaction tx : transactionsList) {
+ loggingApplyBlockToTx(block, i);
+
+ TransactionExecutor txExecutor = transactionExecutorFactory.newInstance(
+ tx,
+ txindex,
+ block.getCoinbase(),
+ track,
+ block,
+ parallelizeTransactionHandler.getGasUsedInSequential(),
+ false,
+ 0,
+ deletedAccounts,
+ true,
+ Math.max(BlockUtils.getSublistGasLimit(block, true, minSequentialSetGasLimit), BlockUtils.getSublistGasLimit(block, false, minSequentialSetGasLimit))
+ );
+ boolean transactionExecuted = txExecutor.executeTransaction();
+
+ if (!acceptInvalidTransactions && !transactionExecuted) {
+ if (!discardInvalidTxs) {
+ return getBlockResultAndLogExecutionInterrupted(block, metric, tx);
+ }
+
+ loggingDiscardedBlock(block, tx);
+ txindex++;
+ continue;
}
- deletedAccounts.addAll(txExecutor.getResult().getDeleteAccounts());
+ Optional sublistGasAccumulated = addTxToSublistAndGetAccumulatedGas(
+ readWrittenKeysTracker,
+ parallelizeTransactionHandler,
+ tx,
+ tx.isRemascTransaction(txindex, transactionsList.size()),
+ txExecutor.getGasConsumed(),
+ txExecutor.precompiledContractsCalled().stream().anyMatch(this.concurrentContractsDisallowed::contains));
+
+ if (!acceptInvalidTransactions && !sublistGasAccumulated.isPresent()) {
+ if (!discardInvalidTxs) {
+ return getBlockResultAndLogExecutionInterrupted(block, metric, tx);
+ }
- TransactionReceipt receipt = new TransactionReceipt();
- receipt.setGasUsed(gasUsed);
- receipt.setCumulativeGas(totalGasUsed);
+ loggingDiscardedBlock(block, tx);
+ txindex++;
+ continue;
+ }
+
+ registerTxExecutedForMiningAfterRSKIP144(readWrittenKeysTracker, tx, txExecutor);
- receipt.setTxStatus(txExecutor.getReceipt().isSuccessful());
- receipt.setTransaction(tx);
- receipt.setLogInfoList(txExecutor.getVMLogs());
- receipt.setStatus(txExecutor.getReceipt().getStatus());
+ long gasUsed = txExecutor.getGasConsumed();
+ totalGasUsed += gasUsed;
+ totalPaidFees = addTotalPaidFees(totalPaidFees, txExecutor);
+
+ deletedAccounts.addAll(txExecutor.getResult().getDeleteAccounts());
- logger.trace("block: [{}] executed tx: [{}]", block.getNumber(), tx.getHash());
+ //orElseGet is used for testing only when acceptInvalidTransactions is set.
+ long cumulativeGas = sublistGasAccumulated
+ .orElseGet(() -> parallelizeTransactionHandler.getGasUsedIn((short) Constants.getTransactionExecutionThreads()));
+ TransactionReceipt receipt = buildTransactionReceipt(tx, txExecutor, gasUsed, cumulativeGas);
- logger.trace("tx[{}].receipt", i);
+ loggingExecuteTxAndReceipt(block, i, tx);
i++;
+ txindex++;
- receipts.add(receipt);
+ receiptsByTx.put(tx, receipt);
- logger.trace("tx done");
+ loggingTxDone();
}
- logger.trace("End txs executions.");
- if (saveState) {
- logger.trace("Saving track.");
- track.save();
- logger.trace("End saving track.");
- } else {
- logger.trace("Committing track.");
- track.commit();
- logger.trace("End committing track.");
+ if (totalPaidFees.compareTo(Coin.ZERO) > 0) {
+ if (remascEnabled) {
+ track.addBalance(PrecompiledContracts.REMASC_ADDR, totalPaidFees);
+ } else {
+ track.addBalance(block.getCoinbase(), totalPaidFees);
+ }
}
- logger.trace("Building execution results.");
+ saveOrCommitTrackState(saveState, track);
+
+ List executedTransactions = parallelizeTransactionHandler.getTransactionsInOrder();
+ short[] sublistOrder = parallelizeTransactionHandler.getTransactionsPerSublistInOrder();
+ List receipts = getTransactionReceipts(receiptsByTx, executedTransactions);
+
BlockResult result = new BlockResult(
block,
executedTransactions,
receipts,
+ sublistOrder,
totalGasUsed,
totalPaidFees,
- vmTrace ? null : track.getTrie()
+ track.getTrie()
);
profiler.stop(metric);
- logger.trace("End executeInternal.");
+ logger.trace("End executeForMining.");
return result;
}
- /**
- * Precompiled contracts storage is setup like any other contract for consistency. Here, we apply this logic on the
- * exact activation block.
- * This method is called automatically for every block except for the Genesis (which makes an explicit call).
- */
- public static void maintainPrecompiledContractStorageRoots(Repository track, ActivationConfig.ForBlock activations) {
- if (activations.isActivating(RSKIP126)) {
- for (RskAddress addr : PrecompiledContracts.GENESIS_ADDRESSES) {
- if (!track.isExist(addr)) {
- track.createAccount(addr);
- }
- track.setupContract(addr);
- }
+ private void registerExecutedTx(ProgramTraceProcessor programTraceProcessor, boolean vmTrace, List executedTransactions, Transaction tx, TransactionExecutor txExecutor) {
+ executedTransactions.add(tx);
+
+ if (this.registerProgramResults) {
+ this.transactionResults.put(tx.getHash(), txExecutor.getResult());
}
- for (Map.Entry e : PrecompiledContracts.CONSENSUS_ENABLED_ADDRESSES.entrySet()) {
- ConsensusRule contractActivationRule = e.getValue();
- if (activations.isActivating(contractActivationRule)) {
- RskAddress addr = e.getKey();
- track.createAccount(addr);
- track.setupContract(addr);
- }
+ if (vmTrace) {
+ txExecutor.extractTrace(programTraceProcessor);
}
+
+ loggingTxExecuted();
}
- @VisibleForTesting
- public static byte[] calculateLogsBloom(List receipts) {
- Bloom logBloom = new Bloom();
+ private Coin addTotalPaidFees(Coin totalPaidFees, TransactionExecutor txExecutor) {
+ Coin paidFees = txExecutor.getPaidFees();
+ if (paidFees != null) {
+ totalPaidFees = totalPaidFees.add(paidFees);
+ }
+ return totalPaidFees;
+ }
- for (TransactionReceipt receipt : receipts) {
- logBloom.or(receipt.getBloomFilter());
+ private void registerTxExecutedForMiningAfterRSKIP144(IReadWrittenKeysTracker readWrittenKeysTracker, Transaction tx, TransactionExecutor txExecutor) {
+ readWrittenKeysTracker.clear();
+
+ if (this.registerProgramResults) {
+ this.transactionResults.put(tx.getHash(), txExecutor.getResult());
}
- return logBloom.getData();
+ loggingTxExecuted();
+ }
+
+ private List getTransactionReceipts(Map receiptsByTx, List executedTransactions) {
+ List receipts = new ArrayList<>();
+
+ for (Transaction tx : executedTransactions) {
+ receipts.add(receiptsByTx.get(tx));
+ }
+ return receipts;
+ }
+
+ private Optional addTxToSublistAndGetAccumulatedGas(IReadWrittenKeysTracker readWrittenKeysTracker, ParallelizeTransactionHandler parallelizeTransactionHandler, Transaction tx, boolean isRemascTransaction, long gasUsed, boolean isSequentialSublistRequired) {
+ Optional sublistGasAccumulated;
+
+ if (isRemascTransaction) {
+ sublistGasAccumulated = parallelizeTransactionHandler.addRemascTransaction(tx, gasUsed);
+ } else if (isSequentialSublistRequired) {
+ sublistGasAccumulated = parallelizeTransactionHandler.addTxToSequentialSublist(tx, gasUsed);
+ } else {
+ sublistGasAccumulated = parallelizeTransactionHandler.addTransaction(tx, readWrittenKeysTracker.getThisThreadReadKeys(), readWrittenKeysTracker.getThisThreadWrittenKeys(), gasUsed);
+ }
+ return sublistGasAccumulated;
+ }
+
+ private void saveOrCommitTrackState(boolean saveState, Repository track) {
+ logger.trace("End txs executions.");
+ if (saveState) {
+ logger.trace("Saving track.");
+ track.save();
+ logger.trace("End saving track.");
+ } else {
+ logger.trace("Committing track.");
+ track.commit();
+ logger.trace("End committing track.");
+ }
+ }
+
+ private TransactionReceipt buildTransactionReceipt(Transaction tx, TransactionExecutor txExecutor, long gasUsed, long cumulativeGas) {
+ TransactionReceipt receipt = new TransactionReceipt();
+ receipt.setGasUsed(gasUsed);
+ receipt.setTxStatus(txExecutor.getReceipt().isSuccessful());
+ receipt.setTransaction(tx);
+ receipt.setLogInfoList(txExecutor.getVMLogs());
+ receipt.setStatus(txExecutor.getReceipt().getStatus());
+ receipt.setCumulativeGas(cumulativeGas);
+ return receipt;
+ }
+
+ private BlockResult getBlockResultAndLogExecutionInterrupted(Block block, Metric metric, Transaction tx) {
+ logger.warn("block: [{}]/[{}] execution interrupted because of invalid tx: [{}]",
+ block.getNumber(), block.getHash(), tx.getHash());
+ profiler.stop(metric);
+ return BlockResult.INTERRUPTED_EXECUTION_BLOCK_RESULT;
+ }
+
+ private void loggingTxExecuted() {
+ logger.trace("tx executed");
+ }
+
+ private void loggingTxDone() {
+ logger.trace("tx done");
+ }
+
+ private void loggingDiscardedBlock(Block block, Transaction tx) {
+ logger.warn("block: [{}] discarded tx: [{}]", block.getNumber(), tx.getHash());
+ }
+
+ private void loggingApplyBlock(Block block) {
+ logger.trace("applyBlock: block: [{}] tx.list: [{}]", block.getNumber(), block.getTransactionsList().size());
+ }
+
+ private void loggingApplyBlockToTx(Block block, int i) {
+ logger.trace("apply block: [{}] tx: [{}] ", block.getNumber(), i);
+ }
+
+ private void loggingExecuteTxAndReceipt(Block block, int i, Transaction tx) {
+ logger.trace("block: [{}] executed tx: [{}]", block.getNumber(), tx.getHash());
+ logger.trace("tx[{}].receipt", i);
}
public ProgramResult getProgramResult(Keccak256 txhash) {
@@ -420,4 +843,53 @@ public void setRegisterProgramResults(boolean value) {
this.registerProgramResults = value;
this.transactionResults.clear();
}
+
+ /**
+ * This implementation mimics a thread pool returned by {@link Executors#newCachedThreadPool()}, meaning that
+ * its core pool size is zero and maximum pool size is unbounded, while each thead created has keep-alive time - 15 mins.
+ */
+ private static final class ThreadPoolExecutorImpl extends ThreadPoolExecutor {
+ private static final long KEEP_ALIVE_TIME_IN_SECS = 15*60L; /* 15 minutes */
+
+ public ThreadPoolExecutorImpl(int parallelListIndex) {
+ super(0, Integer.MAX_VALUE,
+ KEEP_ALIVE_TIME_IN_SECS, TimeUnit.SECONDS,
+ new SynchronousQueue<>(), new ThreadFactoryImpl(parallelListIndex));
+ }
+
+ @Override
+ protected void beforeExecute(Thread t, Runnable r) {
+ logger.debug("[{}]: before execution", t);
+ super.beforeExecute(t, r);
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ if (t == null) {
+ logger.debug("[{}]: after execution", Thread.currentThread());
+ } else {
+ logger.warn("[{}]: failed execution", Thread.currentThread(), t);
+ }
+ }
+ }
+
+ /**
+ * A utility class that helps to specify a proper thread name in the form of `BlockExecutorWorker--`.
+ */
+ private static final class ThreadFactoryImpl implements ThreadFactory {
+ private final AtomicInteger cnt = new AtomicInteger(0);
+ private final int parallelListIndex;
+
+ ThreadFactoryImpl(int parallelListIndex) {
+ this.parallelListIndex = parallelListIndex;
+ }
+
+ @Override
+ public Thread newThread(@Nonnull Runnable r) {
+ String threadName = "BlockExecutorWorker-" + parallelListIndex + "-" + cnt.getAndIncrement();
+ logger.info("New block execution thread [{}] for parallel list [{}] has been created", threadName, parallelListIndex);
+ return new Thread(r, threadName);
+ }
+ }
}
diff --git a/rskj-core/src/main/java/co/rsk/core/bc/BlockResult.java b/rskj-core/src/main/java/co/rsk/core/bc/BlockResult.java
index bb52ec2e364..b8f3468b9d3 100644
--- a/rskj-core/src/main/java/co/rsk/core/bc/BlockResult.java
+++ b/rskj-core/src/main/java/co/rsk/core/bc/BlockResult.java
@@ -24,6 +24,7 @@
import org.ethereum.core.Transaction;
import org.ethereum.core.TransactionReceipt;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -35,7 +36,7 @@ public class BlockResult {
null,
Collections.emptyList(),
Collections.emptyList(),
- 0,
+ new short[0], 0,
Coin.ZERO,
null
);
@@ -49,11 +50,13 @@ public class BlockResult {
// It is for optimizing switching between states. Instead of using the "stateRoot" field,
// which requires regenerating the trie, using the finalState field does not.
private final Trie finalState;
+ private final short[] txEdges;
public BlockResult(
Block block,
List executedTransactions,
List transactionReceipts,
+ short[] txEdges,
long gasUsed,
Coin paidFees,
Trie finalState) {
@@ -63,12 +66,14 @@ public BlockResult(
this.gasUsed = gasUsed;
this.paidFees = paidFees;
this.finalState = finalState;
+ this.txEdges = txEdges != null? Arrays.copyOf(txEdges, txEdges.length) : null;
}
-
public Block getBlock() {
return block;
}
+ public short[] getTxEdges() { return this.txEdges != null ? Arrays.copyOf(txEdges, txEdges.length) : null; }
+
public List getExecutedTransactions() { return executedTransactions; }
public List getTransactionReceipts() {
diff --git a/rskj-core/src/main/java/co/rsk/core/bc/BlockUtils.java b/rskj-core/src/main/java/co/rsk/core/bc/BlockUtils.java
index afe66fd0a4e..f8a305799f2 100644
--- a/rskj-core/src/main/java/co/rsk/core/bc/BlockUtils.java
+++ b/rskj-core/src/main/java/co/rsk/core/bc/BlockUtils.java
@@ -20,10 +20,12 @@
import co.rsk.crypto.Keccak256;
import co.rsk.net.NetBlockStore;
+import org.ethereum.config.Constants;
import org.ethereum.core.Block;
import org.ethereum.core.BlockHeader;
import org.ethereum.core.Blockchain;
import org.ethereum.db.BlockInformation;
+import org.ethereum.vm.GasCost;
import java.util.*;
import java.util.stream.Collectors;
@@ -116,4 +118,77 @@ public static List sortBlocksByNumber(List blocks) {
.collect(Collectors.toList());
}
+ /**
+ * Calculate the gas limit of a sublist, depending on the sublist type (sequential and parallel), from the block
+ * gas limit. The distribution can be performed one of two ways:
+ *
+ * 1. The block gas limit is divided equally among all sublists. If the division was not even (results in a decimal
+ * number) then the extra gas limit gets added to the sequential sublist.
+ *
+ * 2. The sequential sublist gets assigned a fixed value, determined by minSequentialSetGasLimit and additional
+ * gas limit is calculated by subtracting minSequentialSetGasLimit form block gas limit, the result is then divided
+ * by the amount of transaction execution threads. If the division for the parallel sublists was not even (results
+ * in a decimal number) then the extra gas limit gets added to the sequential sublist.
+ *
+ *
+ * @param block the block to get the gas limit from
+ * @param forSequentialTxSet a boolean the indicates if the gas limit beign calculated is for a sequential
+ * sublist or a paralle one.
+ * @param minSequentialSetGasLimit The minimum gas limit value the sequential sublist can have, configured by
+ * network in {@link Constants}.
+ *
+ * @return set of ancestors block hashes
+ */
+ public static long getSublistGasLimit(Block block, boolean forSequentialTxSet, long minSequentialSetGasLimit) {
+ long blockGasLimit = GasCost.toGas(block.getGasLimit());
+ int transactionExecutionThreadCount = Constants.getTransactionExecutionThreads();
+
+ /*
+ This if determines which distribution approach will be performed. If the result of multiplying the minSequentialSetGasLimit
+ by transactionExecutionThreadCount + 1 (where transactionExecutionThreadCount is the parallel sublist count and
+ the + 1 represents the sequential sublist) is less than the block gas limit then the equal division approach is performed,
+ otherwise the second approach, where the parallel sublists get less gas limit than the sequential sublist, is executed.
+ */
+ if((transactionExecutionThreadCount + 1) * minSequentialSetGasLimit <= blockGasLimit) {
+ long parallelTxSetGasLimit = blockGasLimit / (transactionExecutionThreadCount + 1);
+
+ if (forSequentialTxSet) {
+ /*
+ Subtract the total parallel sublist gas limit (parallelTxSetGasLimit) from the block gas limit to get
+ the sequential sublist gas limit + the possible extra gas limit and return it.
+ */
+ return blockGasLimit - (transactionExecutionThreadCount * parallelTxSetGasLimit);
+ }
+
+ return parallelTxSetGasLimit;
+ } else {
+ // Check if the block gas limit is less than the sequential gas limit.
+ if (blockGasLimit <= minSequentialSetGasLimit) {
+ /*
+ If this method execution is for a sequential sublist then return the total block gas limit. This will
+ skip the parallel sublist gas limit calculation since there will not be any gas limit left.
+ */
+ if (forSequentialTxSet) {
+ return blockGasLimit;
+ }
+
+ // If this method execution is NOT for a sequential sublist then return 0.
+ return 0;
+ }
+
+ long additionalGasLimit = (blockGasLimit - minSequentialSetGasLimit);
+ long parallelTxSetGasLimit = additionalGasLimit / (transactionExecutionThreadCount);
+
+ /*
+ If this method execution is for a sequential sublist then calculate the possible extra gas limit by subtracting
+ the total parallel sublist gas limit (parallelTxSetGasLimit) from additionalGasLimit and add the result to
+ minSequentialSetGasLimit
+ */
+ if (forSequentialTxSet) {
+ long extraGasLimit = additionalGasLimit - (parallelTxSetGasLimit * transactionExecutionThreadCount);
+ return minSequentialSetGasLimit + extraGasLimit;
+ }
+ return parallelTxSetGasLimit;
+ }
+ }
}
diff --git a/rskj-core/src/main/java/co/rsk/core/bc/IReadWrittenKeysTracker.java b/rskj-core/src/main/java/co/rsk/core/bc/IReadWrittenKeysTracker.java
new file mode 100644
index 00000000000..fdc29fef320
--- /dev/null
+++ b/rskj-core/src/main/java/co/rsk/core/bc/IReadWrittenKeysTracker.java
@@ -0,0 +1,42 @@
+/*
+ * This file is part of RskJ
+ * Copyright (C) 2019 RSK Labs Ltd.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program. If not, see .
+ */
+
+package co.rsk.core.bc;
+
+import org.ethereum.db.ByteArrayWrapper;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface IReadWrittenKeysTracker {
+ Set getThisThreadReadKeys();
+
+ Set getThisThreadWrittenKeys();
+
+ Map> getReadKeysByThread();
+
+ Map> getWrittenKeysByThread();
+
+ void addNewReadKey(ByteArrayWrapper key);
+
+ void addNewWrittenKey(ByteArrayWrapper key);
+
+ boolean detectCollision();
+
+ void clear();
+}
diff --git a/rskj-core/src/main/java/co/rsk/core/bc/ParallelizeTransactionHandler.java b/rskj-core/src/main/java/co/rsk/core/bc/ParallelizeTransactionHandler.java
new file mode 100644
index 00000000000..814554f3e33
--- /dev/null
+++ b/rskj-core/src/main/java/co/rsk/core/bc/ParallelizeTransactionHandler.java
@@ -0,0 +1,290 @@
+/*
+ * This file is part of RskJ
+ * Copyright (C) 2017 RSK Labs Ltd.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program. If not, see .
+ */
+
+package co.rsk.core.bc;
+
+import co.rsk.core.RskAddress;
+import org.ethereum.core.Block;
+import org.ethereum.core.Transaction;
+import org.ethereum.db.ByteArrayWrapper;
+import org.ethereum.vm.GasCost;
+
+import java.util.*;
+
+public class ParallelizeTransactionHandler {
+ private final HashMap sublistsHavingWrittenToKey;
+ private final HashMap> sublistsHavingReadFromKey;
+ private final Map sublistOfSender;
+ private final ArrayList sublists;
+
+ public ParallelizeTransactionHandler(short numberOfSublists, Block block, long minSequentialSetGasLimit) {
+ this.sublistOfSender = new HashMap<>();
+ this.sublistsHavingWrittenToKey = new HashMap<>();
+ this.sublistsHavingReadFromKey = new HashMap<>();
+ this.sublists = new ArrayList<>();
+ for (short i = 0; i < numberOfSublists; i++){
+ this.sublists.add(new TransactionSublist(BlockUtils.getSublistGasLimit(block, false, minSequentialSetGasLimit), false));
+ }
+ this.sublists.add(new TransactionSublist(BlockUtils.getSublistGasLimit(block, true, minSequentialSetGasLimit), true));
+ }
+
+ public Optional addTransaction(Transaction tx, Set newReadKeys, Set newWrittenKeys, long gasUsedByTx) {
+ TransactionSublist sublistCandidate = getSublistCandidates(tx, newReadKeys, newWrittenKeys);
+
+ if (sublistDoesNotHaveEnoughGas(tx, sublistCandidate)) {
+ if (sublistCandidate.isSequential()) {
+ return Optional.empty();
+ }
+ sublistCandidate = getSequentialSublist();
+
+ if (sublistDoesNotHaveEnoughGas(tx, sublistCandidate)) {
+ return Optional.empty();
+ }
+ }
+
+ sublistCandidate.addTransaction(tx, gasUsedByTx);
+ addNewKeysToMaps(tx.getSender(), sublistCandidate, newReadKeys, newWrittenKeys);
+ return Optional.of(sublistCandidate.getGasUsed());
+ }
+
+ private boolean sublistDoesNotHaveEnoughGas(Transaction tx, TransactionSublist sublistCandidate) {
+ return !sublistCandidate.hasGasAvailable(GasCost.toGas(tx.getGasLimit()));
+ }
+
+ public Optional addRemascTransaction(Transaction tx, long gasUsedByTx) {
+ TransactionSublist sequentialSublist = getSequentialSublist();
+ sequentialSublist.addTransaction(tx, gasUsedByTx);
+ return Optional.of(sequentialSublist.getGasUsed());
+ }
+
+ // Unlike the smart contracts execution, Precompiled contracts persist the changes in the repository when they
+ // complete the execution. So, there is no way for the ParallelTransactionHandler to track the keys if the execution
+ // fails since the Precompiled contract rolls back the repository.
+ // Therefore, the tx is added to the sequential sublist to avoid possible race conditions.
+ public Optional addTxToSequentialSublist(Transaction tx, long gasUsedByTx) {
+ TransactionSublist sequentialSublist = getSequentialSublist();
+
+ if (sublistDoesNotHaveEnoughGas(tx, sequentialSublist)) {
+ return Optional.empty();
+ }
+
+ sequentialSublist.addTransaction(tx, gasUsedByTx);
+ addNewKeysToMaps(tx.getSender(), sequentialSublist, new HashSet<>(), new HashSet<>());
+
+ return Optional.of(sequentialSublist.getGasUsed());
+ }
+
+ public long getGasUsedIn(Short sublistId) {
+
+ if (sublistId < 0 || sublistId >= sublists.size()) {
+ throw new NoSuchElementException();
+ }
+
+ return this.sublists.get(sublistId).getGasUsed();
+ }
+
+ public List getTransactionsInOrder() {
+ List txs = new ArrayList<>();
+ for (TransactionSublist sublist: this.sublists) {
+ txs.addAll(sublist.getTransactions());
+ }
+ return txs;
+ }
+
+ public short[] getTransactionsPerSublistInOrder() {
+ List sublistSizes = new ArrayList<>();
+ short sublistEdges = 0;
+
+ for (TransactionSublist sublist: this.sublists) {
+ if (sublist.getTransactions().isEmpty() || sublist.isSequential()) {
+ continue;
+ }
+ sublistEdges += (short) sublist.getTransactions().size();
+ sublistSizes.add(sublistEdges);
+ }
+
+ short[] sublistOrder = new short[sublistSizes.size()];
+ int i = 0;
+ for (Short size: sublistSizes) {
+ sublistOrder[i] = size;
+ i++;
+ }
+
+ return sublistOrder;
+ }
+
+ public long getGasUsedInSequential() {
+ return getSequentialSublist().getGasUsed();
+ }
+
+ private void addNewKeysToMaps(RskAddress sender, TransactionSublist sublist, Set newReadKeys, Set newWrittenKeys) {
+ for (ByteArrayWrapper key : newReadKeys) {
+ Set sublistsAlreadyRead = sublistsHavingReadFromKey.getOrDefault(key, new HashSet<>());
+ sublistsAlreadyRead.add(sublist);
+ sublistsHavingReadFromKey.put(key, sublistsAlreadyRead);
+ }
+
+ if (sublist.isSequential()) {
+ sublistOfSender.put(sender, sublist);
+ } else {
+ sublistOfSender.putIfAbsent(sender, sublist);
+ }
+
+ for (ByteArrayWrapper key: newWrittenKeys) {
+ sublistsHavingWrittenToKey.put(key, sublist);
+ }
+ }
+
+ private Optional getSublistBySender(Transaction tx) {
+ return Optional.ofNullable(sublistOfSender.get(tx.getSender()));
+ }
+
+ private Optional getAvailableSublistWithLessUsedGas(long txGasLimit) {
+ long gasUsed = Long.MAX_VALUE;
+ Optional sublistCandidate = Optional.empty();
+
+ for (TransactionSublist sublist : sublists) {
+ if (!sublist.isSequential() && sublist.hasGasAvailable(txGasLimit) && sublist.getGasUsed() < gasUsed) {
+ sublistCandidate = Optional.of(sublist);
+ gasUsed = sublist.getGasUsed();
+ }
+ }
+
+ return sublistCandidate;
+ }
+
+ private TransactionSublist getSublistCandidates(Transaction tx, Set newReadKeys, Set newWrittenKeys) {
+ Optional sublistCandidate = getSublistBySender(tx);
+
+ if (sublistCandidate.isPresent() && sublistCandidate.get().isSequential()) {
+ // there is a tx with the same sender in the sequential sublist
+ return sublistCandidate.get();
+ }
+
+ // analyze reads
+ for (ByteArrayWrapper newReadKey : newReadKeys) {
+ // read - written
+ if (sublistsHavingWrittenToKey.containsKey(newReadKey)) {
+ TransactionSublist sublist = sublistsHavingWrittenToKey.get(newReadKey);
+
+ if (sublist.isSequential()) {
+ // there is a tx with read-written collision in sequential sublist
+ return sublist;
+ }
+ if (!sublistCandidate.isPresent()) {
+ // this is the new candidate
+ sublistCandidate = Optional.of(sublist);
+ } else if (!sublistCandidate.get().equals(sublist)) {
+ // use the sequential sublist (greedy decision)
+ return getSequentialSublist();
+ }
+ }
+ }
+
+ // analyze writes
+ for (ByteArrayWrapper newWrittenKey : newWrittenKeys) {
+ // write - written
+ if (sublistsHavingWrittenToKey.containsKey(newWrittenKey)) {
+ TransactionSublist sublist = sublistsHavingWrittenToKey.get(newWrittenKey);
+
+ if (sublist.isSequential()) {
+ // there is a tx with write-written collision in sequential sublist
+ return sublist;
+ }
+ if (!sublistCandidate.isPresent()) {
+ // this is the new candidate
+ sublistCandidate = Optional.of(sublist);
+ } else if (!sublistCandidate.get().equals(sublist)) {
+ // use the sequential sublist (greedy decision)
+ return getSequentialSublist();
+ }
+ }
+
+ // write - read
+ if (sublistsHavingReadFromKey.containsKey(newWrittenKey)) {
+ Set setOfsublists = sublistsHavingReadFromKey.get(newWrittenKey);
+ if (setOfsublists.size() > 1) {
+ // there is a write-read collision with multiple sublists
+ return getSequentialSublist();
+ }
+
+ // there is only one colluded sublist
+ TransactionSublist sublist = getNextSublist(setOfsublists);
+ if (!sublistCandidate.isPresent()) {
+ // if there is no candidate, take the colluded sublist
+ sublistCandidate = Optional.of(sublist);
+ } else if (!sublistCandidate.get().equals(sublist)) {
+ // otherwise, check if the sublist is different from the candidate and return the sequential
+ return getSequentialSublist();
+ }
+ }
+ }
+
+ // if there is no candidate use the sublist with more gas available
+ // if the is no more gas available in any parallel sublist use the sequential
+ return sublistCandidate
+ .orElseGet(() -> getAvailableSublistWithLessUsedGas(GasCost.toGas(tx.getGasLimit()))
+ .orElseGet(this::getSequentialSublist));
+ }
+
+ private TransactionSublist getNextSublist(Set sublist) {
+ return sublist.iterator().next();
+ }
+
+ private TransactionSublist getSequentialSublist() {
+ return this.sublists.get(this.sublists.size()-1);
+ }
+
+ private static class TransactionSublist {
+
+ private final long gasLimit;
+ private final boolean isSequential;
+ private final List transactions;
+ private long gasUsedInSublist;
+
+ public TransactionSublist(long sublistGasLimit, boolean isSequential) {
+ this.gasLimit = sublistGasLimit;
+ this.isSequential = isSequential;
+ this.transactions = new ArrayList<>();
+ this.gasUsedInSublist = 0;
+ }
+
+ private void addTransaction(Transaction tx, long gasUsedByTx) {
+ transactions.add(tx);
+ gasUsedInSublist = gasUsedInSublist + gasUsedByTx;
+ }
+
+ private boolean hasGasAvailable(long txGasLimit) {
+ //TODO(JULI): Re-check a thousand of times this line.
+ long cumulativeGas = GasCost.add(gasUsedInSublist, txGasLimit);
+ return cumulativeGas <= gasLimit;
+ }
+
+ public long getGasUsed() {
+ return gasUsedInSublist;
+ }
+
+ public List getTransactions() {
+ return new ArrayList<>(this.transactions);
+ }
+
+ public boolean isSequential() {
+ return isSequential;
+ }
+ }
+}
diff --git a/rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java b/rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java
new file mode 100644
index 00000000000..968465ba20d
--- /dev/null
+++ b/rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java
@@ -0,0 +1,135 @@
+/*
+ * This file is part of RskJ
+ * Copyright (C) 2017 RSK Labs Ltd.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program. If not, see .
+ */
+
+package co.rsk.core.bc;
+
+import org.ethereum.db.ByteArrayWrapper;
+
+import java.util.*;
+
+public class ReadWrittenKeysTracker implements IReadWrittenKeysTracker {
+
+ private Map> readKeysByThread;
+
+ private Map> writtenKeysByThread;
+
+
+ public ReadWrittenKeysTracker() {
+ this.readKeysByThread = new HashMap<>();
+ this.writtenKeysByThread = new HashMap<>();
+ }
+
+ @Override
+ public Set getThisThreadReadKeys(){
+ long threadId = Thread.currentThread().getId();
+ if (this.readKeysByThread.containsKey(threadId)) {
+ return new HashSet<>(this.readKeysByThread.get(threadId));
+ } else {
+ return new HashSet<>();
+ }
+ }
+
+ @Override
+ public Set getThisThreadWrittenKeys(){
+ long threadId = Thread.currentThread().getId();
+ if (this.writtenKeysByThread.containsKey(threadId)) {
+ return new HashSet<>(this.writtenKeysByThread.get(threadId));
+ } else {
+ return new HashSet<>();
+ }
+ }
+
+ @Override
+ public Map> getReadKeysByThread() {
+ return new HashMap<>(this.readKeysByThread);
+ }
+
+ @Override
+ public Map> getWrittenKeysByThread() {
+ return new HashMap<>(this.writtenKeysByThread);
+ }
+
+ @Override
+ public synchronized void addNewReadKey(ByteArrayWrapper key) {
+ long threadId = Thread.currentThread().getId();
+ addNewReadKeyToThread(threadId,key);
+ }
+
+ public synchronized void addNewReadKeyToThread(long threadId,ByteArrayWrapper key) {
+ Set readKeys = readKeysByThread.containsKey(threadId)? readKeysByThread.get(threadId) : new HashSet<>();
+ readKeys.add(key);
+ readKeysByThread.put(threadId, readKeys);
+ }
+ public synchronized void removeReadKeyToThread(long threadId,ByteArrayWrapper key) {
+ Set readKeys = readKeysByThread.containsKey(threadId)? readKeysByThread.get(threadId) : new HashSet<>();
+ readKeys.remove(key);
+ readKeysByThread.put(threadId, readKeys);
+ }
+
+ @Override
+ public synchronized void addNewWrittenKey(ByteArrayWrapper key) {
+ long threadId = Thread.currentThread().getId();
+ addNewWrittenKeyToThread(threadId,key);
+
+ }
+ public synchronized void removeWrittenKeyToThread(long threadId,ByteArrayWrapper key) {
+ Set writtenKeys = writtenKeysByThread.containsKey(threadId)? writtenKeysByThread.get(threadId) : new HashSet<>();
+ writtenKeys.remove(key);
+ writtenKeysByThread.put(threadId, writtenKeys);
+ }
+
+ public synchronized void addNewWrittenKeyToThread(long threadId,ByteArrayWrapper key) {
+ Set writtenKeys = writtenKeysByThread.containsKey(threadId)? writtenKeysByThread.get(threadId) : new HashSet<>();
+ writtenKeys.add(key);
+ writtenKeysByThread.put(threadId, writtenKeys);
+ }
+
+ @Override
+ public synchronized void clear() {
+ this.readKeysByThread = new HashMap<>();
+ this.writtenKeysByThread = new HashMap<>();
+ }
+
+ public boolean detectCollision() {
+ Set threads = new HashSet<>();
+ threads.addAll(readKeysByThread.keySet());
+ threads.addAll(writtenKeysByThread.keySet());
+
+ for (Long threadId : threads) {
+ Set baseReadKeys = readKeysByThread.getOrDefault(threadId, new HashSet<>());
+ Set baseWrittenKeys = writtenKeysByThread.getOrDefault(threadId, new HashSet<>());
+
+ for (Long threadId2 : threads) {
+ if (threadId >= threadId2) {
+ continue;
+ }
+
+ Set temporalReadKeys = readKeysByThread.getOrDefault(threadId2, new HashSet<>());
+ Set temporalWrittenKeys = writtenKeysByThread.getOrDefault(threadId2, new HashSet<>());
+
+ boolean isDisjoint = Collections.disjoint(baseWrittenKeys, temporalWrittenKeys) && Collections.disjoint(baseWrittenKeys, temporalReadKeys)
+ && Collections.disjoint(baseReadKeys, temporalWrittenKeys);
+
+ if (!isDisjoint) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+}
diff --git a/rskj-core/src/main/java/co/rsk/core/types/bytes/Bytes.java b/rskj-core/src/main/java/co/rsk/core/types/bytes/Bytes.java
index ce988888068..cb4c16d17d3 100644
--- a/rskj-core/src/main/java/co/rsk/core/types/bytes/Bytes.java
+++ b/rskj-core/src/main/java/co/rsk/core/types/bytes/Bytes.java
@@ -23,7 +23,6 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.util.Arrays;
import java.util.Objects;
/**
@@ -140,8 +139,8 @@ public byte byteAt(int index) {
}
@Override
- public byte[] copyArrayOfRange(int from, int to) {
- return Arrays.copyOfRange(byteArray, from, to);
+ public void arraycopy(int srcPos, byte[] dest, int destPos, int length) {
+ System.arraycopy(byteArray, srcPos, dest, destPos, length);
}
@Override
diff --git a/rskj-core/src/main/java/co/rsk/core/types/bytes/BytesSlice.java b/rskj-core/src/main/java/co/rsk/core/types/bytes/BytesSlice.java
index 5210ff8eea0..ac3616fd8f0 100644
--- a/rskj-core/src/main/java/co/rsk/core/types/bytes/BytesSlice.java
+++ b/rskj-core/src/main/java/co/rsk/core/types/bytes/BytesSlice.java
@@ -18,11 +18,62 @@
package co.rsk.core.types.bytes;
+import java.util.Arrays;
+
/**
* A {@link BytesSlice} is a subsequence of bytes backed by another broader byte sequence.
*/
public interface BytesSlice extends HexPrintableBytes {
+ /**
+ * Copies an array from the {@link BytesSlice} source, beginning at the
+ * specified position, to the specified position of the destination array.
+ * A subsequence of array components are copied from this instance to the
+ * destination array referenced by {@code dest}. The number of components
+ * copied is equal to the {@code length} argument. The components at
+ * positions {@code srcPos} through {@code srcPos+length-1} in the source
+ * array are copied into positions {@code destPos} through
+ * {@code destPos+length-1}, respectively, of the destination
+ * array.
+ *
+ * If the underlying byte array and {@code dest} argument refer to the
+ * same array object, then the copying is performed as if the
+ * components at positions {@code srcPos} through
+ * {@code srcPos+length-1} were first copied to a temporary
+ * array with {@code length} components and then the contents of
+ * the temporary array were copied into positions
+ * {@code destPos} through {@code destPos+length-1} of the
+ * destination array.
+ *
+ * If {@code dest} is {@code null}, then a
+ * {@code NullPointerException} is thrown.
+ *
+ * Otherwise, if any of the following is true, an
+ * {@code IndexOutOfBoundsException} is
+ * thrown and the destination is not modified:
+ *
+ * - The {@code srcPos} argument is negative.
+ *
- The {@code destPos} argument is negative.
+ *
- The {@code length} argument is negative.
+ *
- {@code srcPos+length} is greater than
+ * {@code src.length}, the length of the source array.
+ *
- {@code destPos+length} is greater than
+ * {@code dest.length}, the length of the destination array.
+ *
+ *
+ *
+ * Note: this method mimics behaviour of {@link System#arraycopy(Object, int, Object, int, int)}
+ *
+ * @param srcPos starting position in the source array.
+ * @param dest the destination array.
+ * @param destPos starting position in the destination data.
+ * @param length the number of array elements to be copied.
+ * @throws IndexOutOfBoundsException if copying would cause
+ * access of data outside array bounds.
+ * @throws NullPointerException if {@code dest} is {@code null}.
+ */
+ void arraycopy(int srcPos, byte[] dest, int destPos, int length);
+
/**
* Copies the specified range of the specified array into a new array.
* The initial index of the range (from) must lie between zero
@@ -37,17 +88,30 @@ public interface BytesSlice extends HexPrintableBytes {
* greater than or equal to original.length - from. The length
* of the returned array will be to - from.
*
+ *
+ * Note: this method mimics behaviour of {@link Arrays#copyOfRange(Object[], int, int)}
+ *
* @param from the initial index of the range to be copied, inclusive
* @param to the final index of the range to be copied, exclusive.
* (This index may lie outside the array.)
* @return a new array containing the specified range from the original array,
* truncated or padded with zeros to obtain the required length
- * @throws ArrayIndexOutOfBoundsException if {@code from < 0}
+ * @throws IndexOutOfBoundsException if {@code from < 0}
* or {@code from > original.length}
* @throws IllegalArgumentException if from > to
- * @throws NullPointerException if original is null
*/
- byte[] copyArrayOfRange(int from, int to);
+ default byte[] copyArrayOfRange(int from, int to) {
+ if (from < 0 || from > length()) {
+ throw new IndexOutOfBoundsException("invalid 'from': " + from);
+ }
+ int newLength = to - from;
+ if (newLength < 0) {
+ throw new IllegalArgumentException(from + " > " + to);
+ }
+ byte[] copy = new byte[newLength];
+ arraycopy(from, copy, 0, Math.min(length() - from, newLength));
+ return copy;
+ }
default byte[] copyArray() {
return copyArrayOfRange(0, length());
@@ -104,11 +168,17 @@ public byte byteAt(int index) {
}
@Override
- public byte[] copyArrayOfRange(int from, int to) {
- if (from < 0 || from > to || to > length()) {
- throw new IndexOutOfBoundsException("invalid 'from' and/or 'to': [" + from + ";" + to + ")");
+ public void arraycopy(int srcPos, byte[] dest, int destPos, int length) {
+ if (length < 0) {
+ throw new IndexOutOfBoundsException("invalid 'length': " + length);
+ }
+ if (srcPos < 0 || srcPos + length > length()) {
+ throw new IndexOutOfBoundsException("invalid 'srcPos' and/or 'length': [" + srcPos + ";" + length + ")");
+ }
+ if (destPos < 0 || destPos + length > dest.length) {
+ throw new IndexOutOfBoundsException("invalid 'destPos' and/or 'length': [" + destPos + ";" + length + ")");
}
- return originBytes.copyArrayOfRange(this.from + from, this.from + to);
+ originBytes.arraycopy(this.from + srcPos, dest, destPos, length);
}
@Override
diff --git a/rskj-core/src/main/java/co/rsk/core/types/bytes/HexPrintableBytes.java b/rskj-core/src/main/java/co/rsk/core/types/bytes/HexPrintableBytes.java
index d700c0cc6a6..3f91f918eda 100644
--- a/rskj-core/src/main/java/co/rsk/core/types/bytes/HexPrintableBytes.java
+++ b/rskj-core/src/main/java/co/rsk/core/types/bytes/HexPrintableBytes.java
@@ -77,7 +77,7 @@ public String toFormattedString(@Nonnull HexPrintableBytes printableBytes, int o
}
if (length > 32) {
- return printableBytes.toHexString(off, 15) + ".." + printableBytes.toHexString(off + length - 15, 15);
+ return printableBytes.toHexString(off, 16) + ".." + printableBytes.toHexString(off + length - 15, 15);
}
return printableBytes.toHexString(off, length);
}
diff --git a/rskj-core/src/main/java/co/rsk/db/MutableTrieCache.java b/rskj-core/src/main/java/co/rsk/db/MutableTrieCache.java
index 997b07defd2..64f8bc729e9 100644
--- a/rskj-core/src/main/java/co/rsk/db/MutableTrieCache.java
+++ b/rskj-core/src/main/java/co/rsk/db/MutableTrieCache.java
@@ -31,6 +31,7 @@
import java.nio.charset.StandardCharsets;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
public class MutableTrieCache implements MutableTrie {
@@ -40,14 +41,14 @@ public class MutableTrieCache implements MutableTrie {
private MutableTrie trie;
// We use a single cache to mark both changed elements and removed elements.
// null value means the element has been removed.
- private final Map> cache;
+ private final Map>> cache;
// this logs recursive delete operations to be performed at commit time
private final Set deleteRecursiveLog;
public MutableTrieCache(MutableTrie parentTrie) {
trie = parentTrie;
- cache = new HashMap<>();
+ cache = new ConcurrentHashMap<>();
deleteRecursiveLog = new HashSet<>();
}
@@ -75,7 +76,7 @@ private Optional internalGet(
ByteArrayWrapper wrapper = new ByteArrayWrapper(key);
ByteArrayWrapper accountWrapper = getAccountWrapper(wrapper);
- Map accountItems = cache.get(accountWrapper);
+ Map> accountItems = cache.get(accountWrapper);
boolean isDeletedAccount = deleteRecursiveLog.contains(accountWrapper);
if (accountItems == null || !accountItems.containsKey(wrapper)) {
if (isDeletedAccount) {
@@ -85,14 +86,14 @@ private Optional internalGet(
return Optional.ofNullable(trieRetriever.apply(key));
}
- byte[] cacheItem = accountItems.get(wrapper);
- if (cacheItem == null) {
+ Optional cacheItem = accountItems.get(wrapper);
+ if (!cacheItem.isPresent()) {
// deleted account key
return Optional.empty();
}
// cached account key
- return Optional.ofNullable(cacheTransformer.apply(cacheItem));
+ return Optional.ofNullable(cacheTransformer.apply(cacheItem.get()));
}
public Iterator getStorageKeys(RskAddress addr) {
@@ -100,7 +101,7 @@ public Iterator getStorageKeys(RskAddress addr) {
ByteArrayWrapper accountWrapper = getAccountWrapper(new ByteArrayWrapper(accountStoragePrefixKey));
boolean isDeletedAccount = deleteRecursiveLog.contains(accountWrapper);
- Map accountItems = cache.get(accountWrapper);
+ Map> accountItems = cache.get(accountWrapper);
if (accountItems == null && isDeletedAccount) {
return Collections.emptyIterator();
}
@@ -139,8 +140,8 @@ public void put(ByteArrayWrapper wrapper, byte[] value) {
// in cache with null or in deleteCache. Here we have the choice to
// to add it to cache with null value or to deleteCache.
ByteArrayWrapper accountWrapper = getAccountWrapper(wrapper);
- Map accountMap = cache.computeIfAbsent(accountWrapper, k -> new HashMap<>());
- accountMap.put(wrapper, value);
+ Map> accountMap = cache.computeIfAbsent(accountWrapper, k -> new ConcurrentHashMap<>());
+ accountMap.put(wrapper, Optional.ofNullable(value));
}
@Override
@@ -179,7 +180,7 @@ public void commit() {
cache.forEach((accountKey, accountData) -> {
if (accountData != null) {
// cached account
- accountData.forEach((realKey, value) -> this.trie.put(realKey, value));
+ accountData.forEach((realKey, value) -> this.trie.put(realKey, value.orElse(null)));
}
});
@@ -242,7 +243,7 @@ public Optional getValueHash(byte[] key) {
private static class StorageKeysIterator implements Iterator {
private final Iterator keysIterator;
- private final Map accountItems;
+ private final Map> accountItems;
private final RskAddress address;
private final int storageKeyOffset = (
TrieKeyMapper.domainPrefix().length +
@@ -252,11 +253,11 @@ private static class StorageKeysIterator implements Iterator {
* Byte.SIZE;
private final TrieKeyMapper trieKeyMapper;
private DataWord currentStorageKey;
- private Iterator> accountIterator;
+ private Iterator>> accountIterator;
StorageKeysIterator(
Iterator keysIterator,
- Map accountItems,
+ Map> accountItems,
RskAddress addr,
TrieKeyMapper trieKeyMapper) {
this.keysIterator = keysIterator;
@@ -275,8 +276,8 @@ public boolean hasNext() {
DataWord item = keysIterator.next();
ByteArrayWrapper fullKey = getCompleteKey(item);
if (accountItems.containsKey(fullKey)) {
- byte[] value = accountItems.remove(fullKey);
- if (value == null){
+ Optional value = accountItems.remove(fullKey);
+ if (!value.isPresent()){
continue;
}
}
@@ -289,9 +290,9 @@ public boolean hasNext() {
}
while (accountIterator.hasNext()) {
- Map.Entry entry = accountIterator.next();
+ Map.Entry> entry = accountIterator.next();
byte[] key = entry.getKey().getData();
- if (entry.getValue() != null && key.length * Byte.SIZE > storageKeyOffset) {
+ if (entry.getValue().isPresent() && key.length * Byte.SIZE > storageKeyOffset) {
// cached account key
currentStorageKey = getPartialKey(key);
return true;
diff --git a/rskj-core/src/main/java/co/rsk/db/RepositoryLocator.java b/rskj-core/src/main/java/co/rsk/db/RepositoryLocator.java
index 3a3eef66fec..7d328039bc5 100644
--- a/rskj-core/src/main/java/co/rsk/db/RepositoryLocator.java
+++ b/rskj-core/src/main/java/co/rsk/db/RepositoryLocator.java
@@ -18,6 +18,7 @@
package co.rsk.db;
+import co.rsk.core.bc.IReadWrittenKeysTracker;
import co.rsk.crypto.Keccak256;
import co.rsk.trie.MutableTrie;
import co.rsk.trie.Trie;
@@ -77,6 +78,13 @@ public Repository startTrackingAt(BlockHeader header) {
.orElseThrow(() -> trieNotFoundException(header));
}
+ public Repository startTrackingAt(BlockHeader header, IReadWrittenKeysTracker tracker) {
+ return mutableTrieSnapshotAt(header)
+ .map(MutableTrieCache::new)
+ .map(mutableTrieCache -> new MutableRepository(mutableTrieCache, tracker))
+ .orElseThrow(() -> trieNotFoundException(header));
+ }
+
private IllegalArgumentException trieNotFoundException(BlockHeader header) {
return new IllegalArgumentException(String.format(
"The trie with root %s is missing in this store", header.getHash()
diff --git a/rskj-core/src/main/java/co/rsk/mine/BlockToMineBuilder.java b/rskj-core/src/main/java/co/rsk/mine/BlockToMineBuilder.java
index 42b895dd20c..dfa07fed3a0 100644
--- a/rskj-core/src/main/java/co/rsk/mine/BlockToMineBuilder.java
+++ b/rskj-core/src/main/java/co/rsk/mine/BlockToMineBuilder.java
@@ -249,6 +249,7 @@ private BlockHeader createHeader(
.setMinimumGasPrice(minimumGasPrice)
.setUncleCount(uncles.size())
.setUmmRoot(ummRoot)
+ .setCreateParallelCompliantHeader(activationConfig.isActive(ConsensusRule.RSKIP144, blockNumber))
.build();
newHeader.setDifficulty(difficultyCalculator.calcDifficulty(newHeader, newBlockParentHeader));
diff --git a/rskj-core/src/main/java/co/rsk/net/NodeBlockProcessor.java b/rskj-core/src/main/java/co/rsk/net/NodeBlockProcessor.java
index 2e2a303dc6c..1a3a7b52afc 100644
--- a/rskj-core/src/main/java/co/rsk/net/NodeBlockProcessor.java
+++ b/rskj-core/src/main/java/co/rsk/net/NodeBlockProcessor.java
@@ -227,7 +227,7 @@ public void processBodyRequest(@Nonnull final Peer sender, long requestId, @Nonn
return;
}
- Message responseMessage = new BodyResponseMessage(requestId, block.getTransactionsList(), block.getUncleList());
+ Message responseMessage = new BodyResponseMessage(requestId, block.getTransactionsList(), block.getUncleList(), block.getHeader().getExtension());
sender.sendMessage(responseMessage);
}
diff --git a/rskj-core/src/main/java/co/rsk/net/handler/TxPendingValidator.java b/rskj-core/src/main/java/co/rsk/net/handler/TxPendingValidator.java
index 24416ac005f..54b81c17b5a 100644
--- a/rskj-core/src/main/java/co/rsk/net/handler/TxPendingValidator.java
+++ b/rskj-core/src/main/java/co/rsk/net/handler/TxPendingValidator.java
@@ -18,11 +18,13 @@
package co.rsk.net.handler;
import co.rsk.core.Coin;
+import co.rsk.core.bc.BlockUtils;
import co.rsk.net.TransactionValidationResult;
import co.rsk.net.handler.txvalidator.*;
import org.bouncycastle.util.BigIntegers;
import org.ethereum.config.Constants;
import org.ethereum.config.blockchain.upgrades.ActivationConfig;
+import org.ethereum.config.blockchain.upgrades.ConsensusRule;
import org.ethereum.core.AccountState;
import org.ethereum.core.Block;
import org.ethereum.core.SignatureCache;
@@ -69,10 +71,14 @@ public TxPendingValidator(Constants constants, ActivationConfig activationConfig
}
public TransactionValidationResult isValid(Transaction tx, Block executionBlock, @Nullable AccountState state) {
- BigInteger blockGasLimit = BigIntegers.fromUnsignedByteArray(executionBlock.getGasLimit());
+ long executionBlockNumber = executionBlock.getNumber();
+ ActivationConfig.ForBlock activations = activationConfig.forBlock(executionBlockNumber);
+ BigInteger gasLimit = activations.isActive(ConsensusRule.RSKIP144)
+ ? BigInteger.valueOf(Math.max(BlockUtils.getSublistGasLimit(executionBlock, true, constants.getMinSequentialSetGasLimit()), BlockUtils.getSublistGasLimit(executionBlock, false, constants.getMinSequentialSetGasLimit())))
+ : BigIntegers.fromUnsignedByteArray(executionBlock.getGasLimit());
Coin minimumGasPrice = executionBlock.getMinimumGasPrice();
long bestBlockNumber = executionBlock.getNumber();
- long basicTxCost = tx.transactionCost(constants, activationConfig.forBlock(bestBlockNumber), signatureCache);
+ long basicTxCost = tx.transactionCost(constants, activations, signatureCache);
if (state == null && basicTxCost != 0) {
if (logger.isTraceEnabled()) {
@@ -81,7 +87,7 @@ public TransactionValidationResult isValid(Transaction tx, Block executionBlock,
return TransactionValidationResult.withError("the sender account doesn't exist");
}
- if(tx.isInitCodeSizeInvalidForTx(activationConfig.forBlock(bestBlockNumber))) {
+ if (tx.isInitCodeSizeInvalidForTx(activationConfig.forBlock(bestBlockNumber))) {
return TransactionValidationResult.withError("transaction's init code size is invalid");
}
@@ -90,7 +96,7 @@ public TransactionValidationResult isValid(Transaction tx, Block executionBlock,
}
for (TxValidatorStep step : validatorSteps) {
- TransactionValidationResult validationResult = step.validate(tx, state, blockGasLimit, minimumGasPrice, bestBlockNumber, basicTxCost == 0);
+ TransactionValidationResult validationResult = step.validate(tx, state, gasLimit, minimumGasPrice, executionBlockNumber, basicTxCost == 0);
if (!validationResult.transactionIsValid()) {
logger.info("[tx={}] validation failed with error: {}", tx.getHash(), validationResult.getErrorMessage());
return validationResult;
diff --git a/rskj-core/src/main/java/co/rsk/net/messages/BlockHeadersResponseMessage.java b/rskj-core/src/main/java/co/rsk/net/messages/BlockHeadersResponseMessage.java
index ada7749f442..d38324cb303 100644
--- a/rskj-core/src/main/java/co/rsk/net/messages/BlockHeadersResponseMessage.java
+++ b/rskj-core/src/main/java/co/rsk/net/messages/BlockHeadersResponseMessage.java
@@ -32,7 +32,7 @@ public BlockHeadersResponseMessage(long id, List headers) {
@Override
protected byte[] getEncodedMessageWithoutId() {
byte[][] rlpHeaders = this.blockHeaders.stream()
- .map(BlockHeader::getFullEncoded)
+ .map(BlockHeader::getEncodedCompressed)
.toArray(byte[][]::new);
return RLP.encodeList(RLP.encodeList(rlpHeaders));
diff --git a/rskj-core/src/main/java/co/rsk/net/messages/BodyResponseMessage.java b/rskj-core/src/main/java/co/rsk/net/messages/BodyResponseMessage.java
index 41c73c76de7..b857f262fc9 100644
--- a/rskj-core/src/main/java/co/rsk/net/messages/BodyResponseMessage.java
+++ b/rskj-core/src/main/java/co/rsk/net/messages/BodyResponseMessage.java
@@ -1,6 +1,8 @@
package co.rsk.net.messages;
+import com.google.common.collect.Lists;
import org.ethereum.core.BlockHeader;
+import org.ethereum.core.BlockHeaderExtension;
import org.ethereum.core.Transaction;
import org.ethereum.util.RLP;
@@ -10,14 +12,16 @@
* Created by ajlopez on 25/08/2017.
*/
public class BodyResponseMessage extends MessageWithId {
- private long id;
- private List transactions;
- private List uncles;
+ private final long id;
+ private final List transactions;
+ private final List uncles;
+ private final BlockHeaderExtension blockHeaderExtension;
- public BodyResponseMessage(long id, List transactions, List uncles) {
+ public BodyResponseMessage(long id, List transactions, List uncles, BlockHeaderExtension blockHeaderExtension) {
this.id = id;
this.transactions = transactions;
this.uncles = uncles;
+ this.blockHeaderExtension = blockHeaderExtension;
}
@Override
@@ -26,6 +30,7 @@ public BodyResponseMessage(long id, List transactions, List getTransactions() { return this.transactions; }
public List getUncles() { return this.uncles; }
+ public BlockHeaderExtension getBlockHeaderExtension() { return this.blockHeaderExtension; }
@Override
protected byte[] getEncodedMessageWithoutId() {
@@ -40,7 +45,13 @@ protected byte[] getEncodedMessageWithoutId() {
rlpUncles[k] = this.uncles.get(k).getFullEncoded();
}
- return RLP.encodeList(RLP.encodeList(rlpTransactions), RLP.encodeList(rlpUncles));
+ List elements = Lists.newArrayList(RLP.encodeList(rlpTransactions), RLP.encodeList(rlpUncles));
+
+ if (this.blockHeaderExtension != null) {
+ elements.add(BlockHeaderExtension.toEncoded(blockHeaderExtension));
+ }
+
+ return RLP.encodeList(elements.toArray(new byte[][]{}));
}
@Override
diff --git a/rskj-core/src/main/java/co/rsk/net/messages/MessageType.java b/rskj-core/src/main/java/co/rsk/net/messages/MessageType.java
index ddb5ac7534e..2b8da7a78d0 100644
--- a/rskj-core/src/main/java/co/rsk/net/messages/MessageType.java
+++ b/rskj-core/src/main/java/co/rsk/net/messages/MessageType.java
@@ -143,7 +143,7 @@ public Message createMessage(BlockFactory blockFactory, RLPList list) {
for (int k = 0; k < rlpHeaders.size(); k++) {
RLPElement element = rlpHeaders.get(k);
- BlockHeader header = blockFactory.decodeHeader(element.getRLPData());
+ BlockHeader header = blockFactory.decodeHeader(element.getRLPData(), true);
headers.add(header);
}
@@ -229,10 +229,14 @@ public Message createMessage(BlockFactory blockFactory, RLPList list) {
for (int j = 0; j < rlpUncles.size(); j++) {
RLPElement element = rlpUncles.get(j);
- uncles.add(blockFactory.decodeHeader(element.getRLPData()));
+ uncles.add(blockFactory.decodeHeader(element.getRLPData(), false));
}
- return new BodyResponseMessage(id, transactions, uncles);
+ BlockHeaderExtension blockHeaderExtension = message.size() == 3
+ ? BlockHeaderExtension.fromEncoded(message.get(2).getRLPData())
+ : null;
+
+ return new BodyResponseMessage(id, transactions, uncles, blockHeaderExtension);
}
},
SKELETON_REQUEST_MESSAGE(16) {
diff --git a/rskj-core/src/main/java/co/rsk/net/sync/DownloadingBackwardsBodiesSyncState.java b/rskj-core/src/main/java/co/rsk/net/sync/DownloadingBackwardsBodiesSyncState.java
index e3a663c4939..e59da6fcaac 100644
--- a/rskj-core/src/main/java/co/rsk/net/sync/DownloadingBackwardsBodiesSyncState.java
+++ b/rskj-core/src/main/java/co/rsk/net/sync/DownloadingBackwardsBodiesSyncState.java
@@ -94,6 +94,7 @@ public void newBody(BodyResponseMessage body, Peer peer) {
return;
}
+ requestedHeader.setExtension(body.getBlockHeaderExtension());
Block block = blockFactory.newBlock(requestedHeader, body.getTransactions(), body.getUncles());
block.seal();
diff --git a/rskj-core/src/main/java/co/rsk/net/sync/DownloadingBodiesSyncState.java b/rskj-core/src/main/java/co/rsk/net/sync/DownloadingBodiesSyncState.java
index dc1eedf0dd4..1eb1d3e1255 100644
--- a/rskj-core/src/main/java/co/rsk/net/sync/DownloadingBodiesSyncState.java
+++ b/rskj-core/src/main/java/co/rsk/net/sync/DownloadingBodiesSyncState.java
@@ -116,6 +116,7 @@ public void newBody(BodyResponseMessage message, Peer peer) {
// we already checked that this message was expected
BlockHeader header = pendingBodyResponses.remove(requestId).header;
+ header.setExtension(message.getBlockHeaderExtension());
Block block;
try {
block = blockFactory.newBlock(header, message.getTransactions(), message.getUncles());
diff --git a/rskj-core/src/main/java/co/rsk/peg/BridgeEvents.java b/rskj-core/src/main/java/co/rsk/peg/BridgeEvents.java
index 4683068e18c..9202aebebfe 100644
--- a/rskj-core/src/main/java/co/rsk/peg/BridgeEvents.java
+++ b/rskj-core/src/main/java/co/rsk/peg/BridgeEvents.java
@@ -5,110 +5,80 @@
public enum BridgeEvents {
- LOCK_BTC("lock_btc",
- new CallTransaction.Param[]{
- new CallTransaction.Param(true, Fields.RECEIVER, SolidityType.getType(SolidityType.ADDRESS)),
- new CallTransaction.Param(false, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
- new CallTransaction.Param(false, "senderBtcAddress", SolidityType.getType(SolidityType.STRING)),
- new CallTransaction.Param(false, Fields.AMOUNT, SolidityType.getType(SolidityType.INT256))
- }
- ),
- PEGIN_BTC("pegin_btc",
- new CallTransaction.Param[]{
- new CallTransaction.Param(true, Fields.RECEIVER, SolidityType.getType(SolidityType.ADDRESS)),
- new CallTransaction.Param(true, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
- new CallTransaction.Param(false, Fields.AMOUNT, SolidityType.getType(SolidityType.INT256)),
- new CallTransaction.Param(false, "protocolVersion", SolidityType.getType(SolidityType.INT256))
- }
- ),
- REJECTED_PEGIN("rejected_pegin",
- new CallTransaction.Param[]{
- new CallTransaction.Param(true, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
- new CallTransaction.Param(false, Fields.REASON, SolidityType.getType(SolidityType.INT256))
- }
- ),
- UNREFUNDABLE_PEGIN("unrefundable_pegin",
- new CallTransaction.Param[]{
- new CallTransaction.Param(true, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
- new CallTransaction.Param(false, Fields.REASON, SolidityType.getType(SolidityType.INT256))
- }
- ),
- UPDATE_COLLECTIONS("update_collections",
- new CallTransaction.Param[]{
- new CallTransaction.Param(false, Fields.SENDER, SolidityType.getType(SolidityType.ADDRESS))
- }
- ),
- ADD_SIGNATURE("add_signature",
- new CallTransaction.Param[]{
- new CallTransaction.Param(true, Fields.RELEASE_RSK_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
- new CallTransaction.Param(true, "federatorRskAddress", SolidityType.getType(SolidityType.ADDRESS)),
- new CallTransaction.Param(false, "federatorBtcPublicKey", SolidityType.getType(SolidityType.BYTES))
- }
- ),
- RELEASE_BTC("release_btc",
- new CallTransaction.Param[]{
- new CallTransaction.Param(true, Fields.RELEASE_RSK_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
- new CallTransaction.Param(false, "btcRawTransaction", SolidityType.getType(SolidityType.BYTES))
- }
- ),
- COMMIT_FEDERATION("commit_federation",
- new CallTransaction.Param[]{
- new CallTransaction.Param(false, "oldFederationBtcPublicKeys", SolidityType.getType(SolidityType.BYTES)),
- new CallTransaction.Param(false, "oldFederationBtcAddress", SolidityType.getType(SolidityType.STRING)),
- new CallTransaction.Param(false, "newFederationBtcPublicKeys", SolidityType.getType(SolidityType.BYTES)),
- new CallTransaction.Param(false, "newFederationBtcAddress", SolidityType.getType(SolidityType.STRING)),
- new CallTransaction.Param(false, "activationHeight", SolidityType.getType(SolidityType.INT256))
- }
- ),
- RELEASE_REQUESTED("release_requested",
- new CallTransaction.Param[]{
- new CallTransaction.Param(true, "rskTxHash", SolidityType.getType(SolidityType.BYTES32)),
- new CallTransaction.Param(true, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
- new CallTransaction.Param(false, Fields.AMOUNT, SolidityType.getType(SolidityType.UINT256))
- }
- ),
- RELEASE_REQUEST_RECEIVED_LEGACY("release_request_received",
- new CallTransaction.Param[]{
- new CallTransaction.Param(true, Fields.SENDER, SolidityType.getType(SolidityType.ADDRESS)),
- new CallTransaction.Param(false, "btcDestinationAddress", SolidityType.getType(SolidityType.BYTES)),
- new CallTransaction.Param(false, Fields.AMOUNT, SolidityType.getType(SolidityType.UINT256))
- }
- ),
- RELEASE_REQUEST_REJECTED("release_request_rejected",
- new CallTransaction.Param[]{
- new CallTransaction.Param(true, Fields.SENDER, SolidityType.getType(SolidityType.ADDRESS)),
- new CallTransaction.Param(false, Fields.AMOUNT, SolidityType.getType(SolidityType.UINT256)),
- new CallTransaction.Param(false, Fields.REASON, SolidityType.getType(SolidityType.INT256))
- }
- ),
- BATCH_PEGOUT_CREATED("batch_pegout_created",
- new CallTransaction.Param[]{
- new CallTransaction.Param(true, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
- new CallTransaction.Param(false, Fields.RELEASE_RSK_TX_HASHES, SolidityType.getType(SolidityType.BYTES))
- }
- ),
- RELEASE_REQUEST_RECEIVED("release_request_received",
- new CallTransaction.Param[]{
+ LOCK_BTC("lock_btc", new CallTransaction.Param[] {
+ new CallTransaction.Param(true, Fields.RECEIVER, SolidityType.getType(SolidityType.ADDRESS)),
+ new CallTransaction.Param(false, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
+ new CallTransaction.Param(false, "senderBtcAddress", SolidityType.getType(SolidityType.STRING)),
+ new CallTransaction.Param(false, Fields.AMOUNT, SolidityType.getType(SolidityType.INT256))
+ }),
+ PEGIN_BTC("pegin_btc", new CallTransaction.Param[] {
+ new CallTransaction.Param(true, Fields.RECEIVER, SolidityType.getType(SolidityType.ADDRESS)),
+ new CallTransaction.Param(true, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
+ new CallTransaction.Param(false, Fields.AMOUNT, SolidityType.getType(SolidityType.INT256)),
+ new CallTransaction.Param(false, "protocolVersion", SolidityType.getType(SolidityType.INT256))
+ }),
+ REJECTED_PEGIN("rejected_pegin", new CallTransaction.Param[] {
+ new CallTransaction.Param(true, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
+ new CallTransaction.Param(false, Fields.REASON, SolidityType.getType(SolidityType.INT256))
+ }),
+ UNREFUNDABLE_PEGIN("unrefundable_pegin", new CallTransaction.Param[] {
+ new CallTransaction.Param(true, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
+ new CallTransaction.Param(false, Fields.REASON, SolidityType.getType(SolidityType.INT256))
+ }),
+ UPDATE_COLLECTIONS("update_collections", new CallTransaction.Param[] {
+ new CallTransaction.Param(false, Fields.SENDER, SolidityType.getType(SolidityType.ADDRESS))
+ }),
+ ADD_SIGNATURE("add_signature", new CallTransaction.Param[] {
+ new CallTransaction.Param(true, Fields.RELEASE_RSK_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
+ new CallTransaction.Param(true, "federatorRskAddress", SolidityType.getType(SolidityType.ADDRESS)),
+ new CallTransaction.Param(false, "federatorBtcPublicKey", SolidityType.getType(SolidityType.BYTES))
+ }),
+ RELEASE_BTC("release_btc", new CallTransaction.Param[] {
+ new CallTransaction.Param(true, Fields.RELEASE_RSK_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
+ new CallTransaction.Param(false, "btcRawTransaction", SolidityType.getType(SolidityType.BYTES))
+ }),
+ COMMIT_FEDERATION("commit_federation", new CallTransaction.Param[] {
+ new CallTransaction.Param(false, "oldFederationBtcPublicKeys", SolidityType.getType(SolidityType.BYTES)),
+ new CallTransaction.Param(false, "oldFederationBtcAddress", SolidityType.getType(SolidityType.STRING)),
+ new CallTransaction.Param(false, "newFederationBtcPublicKeys", SolidityType.getType(SolidityType.BYTES)),
+ new CallTransaction.Param(false, "newFederationBtcAddress", SolidityType.getType(SolidityType.STRING)),
+ new CallTransaction.Param(false, "activationHeight", SolidityType.getType(SolidityType.INT256))
+ }),
+ RELEASE_REQUESTED("release_requested", new CallTransaction.Param[] {
+ new CallTransaction.Param(true, "rskTxHash", SolidityType.getType(SolidityType.BYTES32)),
+ new CallTransaction.Param(true, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
+ new CallTransaction.Param(false, Fields.AMOUNT, SolidityType.getType(SolidityType.UINT256))
+ }),
+ RELEASE_REQUEST_RECEIVED_LEGACY("release_request_received", new CallTransaction.Param[] {
new CallTransaction.Param(true, Fields.SENDER, SolidityType.getType(SolidityType.ADDRESS)),
- new CallTransaction.Param(false, "btcDestinationAddress", SolidityType.getType(SolidityType.STRING)),
- new CallTransaction.Param(false, Fields.AMOUNT, SolidityType.getType(SolidityType.UINT256))
- }
- ),
- PEGOUT_CONFIRMED("pegout_confirmed",
- new CallTransaction.Param[]{
- new CallTransaction.Param(true, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
- new CallTransaction.Param(false, "pegoutCreationRskBlockNumber", SolidityType.getType(SolidityType.UINT256))
- }
- ),
- PEGOUT_TRANSACTION_CREATED("pegout_transaction_created",
- new CallTransaction.Param[]{
- new CallTransaction.Param(true, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
- new CallTransaction.Param(false, Fields.UTXO_OUTPOINT_VALUES, SolidityType.getType(SolidityType.BYTES))
- }
- );
+ new CallTransaction.Param(false, Fields.BTC_DESTINATION_ADDRESS, SolidityType.getType(SolidityType.BYTES)),
+ new CallTransaction.Param(false, Fields.AMOUNT, SolidityType.getType(SolidityType.UINT256))
+ }),
+ RELEASE_REQUEST_REJECTED("release_request_rejected", new CallTransaction.Param[] {
+ new CallTransaction.Param(true, Fields.SENDER, SolidityType.getType(SolidityType.ADDRESS)),
+ new CallTransaction.Param(false, Fields.AMOUNT, SolidityType.getType(SolidityType.UINT256)),
+ new CallTransaction.Param(false, Fields.REASON, SolidityType.getType(SolidityType.INT256))
+ }),
+ BATCH_PEGOUT_CREATED("batch_pegout_created", new CallTransaction.Param[] {
+ new CallTransaction.Param(true, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
+ new CallTransaction.Param(false, Fields.RELEASE_RSK_TX_HASHES, SolidityType.getType(SolidityType.BYTES))
+ }),
+ RELEASE_REQUEST_RECEIVED("release_request_received", new CallTransaction.Param[] {
+ new CallTransaction.Param(true, Fields.SENDER, SolidityType.getType(SolidityType.ADDRESS)),
+ new CallTransaction.Param(false, Fields.BTC_DESTINATION_ADDRESS, SolidityType.getType(SolidityType.STRING)),
+ new CallTransaction.Param(false, Fields.AMOUNT, SolidityType.getType(SolidityType.UINT256))
+ }),
+ PEGOUT_CONFIRMED("pegout_confirmed", new CallTransaction.Param[] {
+ new CallTransaction.Param(true, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
+ new CallTransaction.Param(false, "pegoutCreationRskBlockNumber", SolidityType.getType(SolidityType.UINT256))
+ }),
+ PEGOUT_TRANSACTION_CREATED("pegout_transaction_created", new CallTransaction.Param[] {
+ new CallTransaction.Param(true, Fields.BTC_TX_HASH, SolidityType.getType(SolidityType.BYTES32)),
+ new CallTransaction.Param(false, Fields.UTXO_OUTPOINT_VALUES, SolidityType.getType(SolidityType.BYTES))
+ });
- private String eventName;
- private CallTransaction.Param[] params;
+ private final String eventName;
+ private final CallTransaction.Param[] params;
BridgeEvents(String eventName, CallTransaction.Param[] params) {
this.eventName = eventName;
@@ -128,5 +98,6 @@ private static class Fields {
private static final String RELEASE_RSK_TX_HASH = "releaseRskTxHash";
private static final String RELEASE_RSK_TX_HASHES = "releaseRskTxHashes";
private static final String UTXO_OUTPOINT_VALUES = "utxoOutpointValues";
+ private static final String BTC_DESTINATION_ADDRESS = "btcDestinationAddress";
}
}
diff --git a/rskj-core/src/main/java/co/rsk/peg/BridgeMethods.java b/rskj-core/src/main/java/co/rsk/peg/BridgeMethods.java
index 2fbdb688a55..80458a14139 100644
--- a/rskj-core/src/main/java/co/rsk/peg/BridgeMethods.java
+++ b/rskj-core/src/main/java/co/rsk/peg/BridgeMethods.java
@@ -19,6 +19,7 @@
import static org.ethereum.config.blockchain.upgrades.ConsensusRule.*;
+import java.math.BigInteger;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
@@ -31,7 +32,11 @@
import org.ethereum.vm.MessageCall.MsgType;
/**
- * This enum holds the basic information of the Bridge contract methods: the ABI, the cost and the implementation.
+ * Represents the methods of the Bridge contract, encapsulating details such as
+ * the Application Binary Interface (ABI), execution costs, and method implementations.
+ *
+ * Each enum constant corresponds to a specific method of the Bridge contract,
+ * defining its signature and providing the necessary information for execution.
*/
public enum BridgeMethods {
ADD_FEDERATOR_PUBLIC_KEY(
@@ -41,7 +46,7 @@ public enum BridgeMethods {
new String[]{"int256"}
),
fixedCost(13000L),
- (BridgeMethodExecutorTyped) Bridge::addFederatorPublicKey,
+ (BridgeMethodExecutorTyped) Bridge::addFederatorPublicKey,
activations -> !activations.isActive(RSKIP123),
fixedPermission(false)
),
@@ -52,7 +57,7 @@ public enum BridgeMethods {
new String[]{"int256"}
),
fixedCost(13000L),
- (BridgeMethodExecutorTyped) Bridge::addFederatorPublicKeyMultikey,
+ (BridgeMethodExecutorTyped) Bridge::addFederatorPublicKeyMultikey,
activations -> activations.isActive(RSKIP123),
fixedPermission(false)
),
@@ -63,7 +68,7 @@ public enum BridgeMethods {
new String[]{"int256"}
),
fixedCost(25000L),
- (BridgeMethodExecutorTyped) Bridge::addOneOffLockWhitelistAddress,
+ (BridgeMethodExecutorTyped) Bridge::addOneOffLockWhitelistAddress,
activations -> !activations.isActive(RSKIP87),
fixedPermission(false)
),
@@ -74,7 +79,7 @@ public enum BridgeMethods {
new String[]{"int256"}
),
fixedCost(25000L), // using same gas estimation as ADD_LOCK_WHITELIST_ADDRESS
- (BridgeMethodExecutorTyped) Bridge::addOneOffLockWhitelistAddress,
+ (BridgeMethodExecutorTyped) Bridge::addOneOffLockWhitelistAddress,
activations -> activations.isActive(RSKIP87),
fixedPermission(false)
),
@@ -85,7 +90,7 @@ public enum BridgeMethods {
new String[]{"int256"}
),
fixedCost(25000L), // using same gas estimation as ADD_LOCK_WHITELIST_ADDRESS
- (BridgeMethodExecutorTyped) Bridge::addUnlimitedLockWhitelistAddress,
+ (BridgeMethodExecutorTyped) Bridge::addUnlimitedLockWhitelistAddress,
activations -> activations.isActive(RSKIP87),
fixedPermission(false)
),
@@ -106,7 +111,7 @@ public enum BridgeMethods {
new String[]{"int256"}
),
fixedCost(38000L),
- (BridgeMethodExecutorTyped) Bridge::commitFederation,
+ (BridgeMethodExecutorTyped) Bridge::commitFederation,
fixedPermission(false)
),
CREATE_FEDERATION(
@@ -116,7 +121,7 @@ public enum BridgeMethods {
new String[]{"int256"}
),
fixedCost(11000L),
- (BridgeMethodExecutorTyped) Bridge::createFederation,
+ (BridgeMethodExecutorTyped) Bridge::createFederation,
fixedPermission(false)
),
GET_BTC_BLOCKCHAIN_BEST_CHAIN_HEIGHT(
@@ -126,7 +131,7 @@ public enum BridgeMethods {
new String[]{"int"}
),
fixedCost(19000L),
- (BridgeMethodExecutorTyped) Bridge::getBtcBlockchainBestChainHeight,
+ (BridgeMethodExecutorTyped) Bridge::getBtcBlockchainBestChainHeight,
fromMethod(Bridge::getBtcBlockchainBestChainHeightOnlyAllowsLocalCalls),
CallTypeHelper.ALLOW_STATIC_CALL
),
@@ -137,7 +142,7 @@ public enum BridgeMethods {
new String[]{"int"}
),
fixedCost(20000L),
- (BridgeMethodExecutorTyped) Bridge::getBtcBlockchainInitialBlockHeight,
+ (BridgeMethodExecutorTyped) Bridge::getBtcBlockchainInitialBlockHeight,
activations -> activations.isActive(RSKIP89),
fixedPermission(true),
CallTypeHelper.ALLOW_STATIC_CALL
@@ -149,7 +154,7 @@ public enum BridgeMethods {
new String[]{"string[]"}
),
fixedCost(76000L),
- (BridgeMethodExecutorTyped) Bridge::getBtcBlockchainBlockLocator,
+ (BridgeMethodExecutorTyped