Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2P: Reset syncing on unlinkable blocks #2220

Merged
merged 4 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2124,8 +2124,8 @@ namespace eosio {
void sync_manager::request_next_chunk( const connection_ptr& conn ) REQUIRES(sync_mtx) {
auto chain_info = my_impl->get_chain_info();

fc_dlog( logger, "sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, head: ${h}",
("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_req_span)("h", chain_info.head_num) );
fc_dlog( logger, "sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, head: ${h}, lib: ${lib}",
("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_req_span)("h", chain_info.head_num)("lib", chain_info.lib_num) );

if( chain_info.head_num + sync_req_span < sync_last_requested_num && sync_source && sync_source->current() ) {
fc_dlog( logger, "ignoring request, head is ${h} last req = ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, source connection ${c}",
Expand Down Expand Up @@ -2169,8 +2169,8 @@ namespace eosio {
sync_last_requested_num = end;
sync_source = new_sync_source;
request_sent = true;
new_sync_source->strand.post( [new_sync_source, start, end, head_num=chain_info.head_num]() {
peer_ilog( new_sync_source, "requesting range ${s} to ${e}, head ${h}", ("s", start)("e", end)("h", head_num) );
new_sync_source->strand.post( [new_sync_source, start, end, head_num=chain_info.head_num, lib=chain_info.lib_num]() {
peer_ilog( new_sync_source, "requesting range ${s} to ${e}, head ${h}, lib ${lib}", ("s", start)("e", end)("h", head_num)("lib", lib) );
new_sync_source->request_sync_blocks( start, end );
} );
}
Expand Down Expand Up @@ -2216,8 +2216,10 @@ namespace eosio {

if( sync_state != lib_catchup ) {
set_state( lib_catchup );
sync_next_expected_num = chain_info.lib_num + 1;
} else {
sync_next_expected_num = std::max( chain_info.lib_num + 1, sync_next_expected_num );
}
sync_next_expected_num = std::max( chain_info.lib_num + 1, sync_next_expected_num );

request_next_chunk( c );
}
Expand Down Expand Up @@ -2425,11 +2427,10 @@ namespace eosio {
// called from connection strand
void sync_manager::rejected_block( const connection_ptr& c, uint32_t blk_num ) {
c->block_status_monitor_.rejected();
// reset sync on rejected block
fc::unique_lock g( sync_mtx );
sync_last_requested_num = 0;
if (blk_num < sync_next_expected_num) {
sync_next_expected_num = my_impl->get_chain_lib_num();
}
sync_next_expected_num = my_impl->get_chain_lib_num() + 1;
if( c->block_status_monitor_.max_events_violated()) {
peer_wlog( c, "block ${bn} not accepted, closing connection", ("bn", blk_num) );
sync_source.reset();
Expand Down Expand Up @@ -2504,7 +2505,11 @@ namespace eosio {
c->sync_wait();
}

sync_next_expected_num = blk_num + 1;
if (sync_last_requested_num == 0) { // block was rejected
sync_next_expected_num = my_impl->get_chain_lib_num() + 1;
} else {
sync_next_expected_num = blk_num + 1;
}
}

uint32_t head = my_impl->get_chain_head_num();
Expand Down Expand Up @@ -3756,8 +3761,9 @@ namespace eosio {
// use c in this method instead of this to highlight that all methods called on c-> must be thread safe
connection_ptr c = shared_from_this();

uint32_t lib = cc.last_irreversible_block_num();
try {
if( blk_num <= cc.last_irreversible_block_num() || cc.fetch_block_by_id(blk_id) ) {
if( blk_num <= lib || cc.fetch_block_by_id(blk_id) ) {
c->strand.post( [sync_master = my_impl->sync_master.get(),
dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() {
dispatcher->add_peer_block( blk_id, c->connection_id );
Expand All @@ -3774,7 +3780,7 @@ namespace eosio {

fc::microseconds age( fc::time_point::now() - block->timestamp);
fc_dlog( logger, "received signed_block: #${n} block age in secs = ${age}, connection ${cid}, ${v}",
("n", blk_num)("age", age.to_seconds())("cid", c->connection_id)("v", bsp ? "pre-validated" : "validation pending") );
("n", blk_num)("age", age.to_seconds())("cid", c->connection_id)("v", bsp ? "pre-validated" : "validation pending")("lib", lib) );

go_away_reason reason = no_reason;
bool accepted = false;
Expand Down
5 changes: 5 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ target_include_directories( plugin_test PUBLIC
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_tests/dawn_515/test.sh ${CMAKE_CURRENT_BINARY_DIR}/p2p_tests/dawn_515/test.sh COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/block_log_util_test.py ${CMAKE_CURRENT_BINARY_DIR}/block_log_util_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/block_log_retain_blocks_test.py ${CMAKE_CURRENT_BINARY_DIR}/block_log_retain_blocks_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/bridge_for_fork_test_shape.json ${CMAKE_CURRENT_BINARY_DIR}/bridge_for_fork_test_shape.json COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cluster_launcher.py ${CMAKE_CURRENT_BINARY_DIR}/cluster_launcher.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/distributed-transactions-test.py ${CMAKE_CURRENT_BINARY_DIR}/distributed-transactions-test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/sample-cluster-map.json ${CMAKE_CURRENT_BINARY_DIR}/sample-cluster-map.json COPYONLY)
Expand Down Expand Up @@ -47,6 +48,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cli_test.py ${CMAKE_CURRENT_BINARY_DI
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_streamer_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_streamer_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/large-lib-test.py ${CMAKE_CURRENT_BINARY_DIR}/large-lib-test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/lib_advance_test.py ${CMAKE_CURRENT_BINARY_DIR}/lib_advance_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/http_plugin_test.py ${CMAKE_CURRENT_BINARY_DIR}/http_plugin_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_high_latency_test.py ${CMAKE_CURRENT_BINARY_DIR}/p2p_high_latency_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_multiple_listen_test.py ${CMAKE_CURRENT_BINARY_DIR}/p2p_multiple_listen_test.py COPYONLY)
Expand Down Expand Up @@ -244,6 +246,9 @@ set_property(TEST cli_test PROPERTY LABELS nonparallelizable_tests)
add_test(NAME larger_lib_test COMMAND tests/large-lib-test.py ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST larger_lib_test PROPERTY LABELS nonparallelizable_tests)

add_test(NAME lib_advance_test COMMAND tests/lib_advance_test.py -v ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST lib_advance_test PROPERTY LABELS nonparallelizable_tests)

add_test(NAME http_plugin_test COMMAND tests/http_plugin_test.py ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_tests_properties(http_plugin_test PROPERTIES TIMEOUT 100)
set_property(TEST http_plugin_test PROPERTY LABELS nonparallelizable_tests)
Expand Down
114 changes: 114 additions & 0 deletions tests/bridge_for_fork_test_shape.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
{
"name": "testnet_",
"ssh_helper": {
"ssh_cmd": "/usr/bin/ssh",
"scp_cmd": "/usr/bin/scp",
"ssh_identity": "",
"ssh_args": ""
},
"nodes": {
"bios":{
"name": "bios",
"keys": [
{
"privkey":"5KQwrPbwdL6PhXujxW37FSSQZ1JiwsST4cqQzDeyXtP79zkvFD3",
"pubkey":"EOS6MRyAjQq8ud7hVNYcfnVPJqcVpscN5So8BhtHuGYqET5GDW5CV"
}
],
"peers": [],
"producers": [
"eosio"
],
"dont_start": false
},
"testnet_00":{
"name": "testnet_00",
"keys": [
{
"privkey":"5Jf4sTk7vwX1MYpLJ2eQFanVvKYXFqGBrCyANPukuP2BJ5WAAKZ",
"pubkey":"EOS58B33q9S7oNkgeFfcoW3VJYu4obfDiqn5RHGE2ige6jVjUhymR"
}
],
"peers": [
"bios",
"testnet_01",
"testnet_02",
"testnet_04"
],
"producers": [
"defproducera"
],
"dont_start": false
},
"testnet_01":{
"name": "testnet_01",
"keys": [
{
"privkey":"5HviUPkTEtvF2B1nm8aZUnjma2TzgpKRjuXjwHyy3FME4xDbkZF",
"pubkey":"EOS5CbcTDgbks2ptTxvyCbT9HFbzX7PDHUY2wN4DDnVBhhQr2ZNDE"
}
],
"peers": [
"bios",
"testnet_00",
"testnet_02",
"testnet_04"
],
"producers": [
"defproducerb"
],
"dont_start": false
},
"testnet_02":{
"name": "testnet_02",
"keys": [
{
"privkey":"5KkQbdxFHr8Pg1N3DEMDdU7emFgUTwQvh99FDJrodFhUbbsAtQT",
"pubkey":"EOS6Tkpf8kcDfa32WA9B4nTcEJ64ZdDMSNioDcaL6rzdMwnpzaWJB"
}
],
"peers": [
"bios",
"testnet_01",
"testnet_00",
"testnet_04"
],
"producers": [
"defproducerc"
],
"dont_start": false
},
"testnet_03":{
"name": "testnet_03",
"keys": [
{
"privkey":"5JxTJJegQBpEL1p77TzkN1ompMB9gDwAfjM9chPzFCB4chxmwrE",
"pubkey":"EOS52ntDHqA2qj4xVo7KmxdezMRhvvBqpZBuKYJCsgihisxmywpAx"
}
],
"peers": [
"bios",
"testnet_04"
],
"producers": [
"defproducerd"
],
"dont_start": false
},
"testnet_04":{
"name": "testnet_04",
"keys": [
],
"peers": [
"bios",
"testnet_00",
"testnet_01",
"testnet_02",
"testnet_03"
],
"producers": [
],
"dont_start": false
}
}
}
151 changes: 151 additions & 0 deletions tests/lib_advance_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#!/usr/bin/env python3

import time
import decimal
import json
import math
import re
import signal

from TestHarness import Account, Cluster, Node, TestHelper, Utils, WalletMgr, CORE_SYMBOL
from TestHarness.Node import BlockType

###############################################################
# lib_advance_test
#
# Setup 4 producers separated by a bridge node.
# Kill bridge node, allow both sides to produce and
# verify they can sync back together after they are
# reconnected.
#
###############################################################
Print=Utils.Print
errorExit=Utils.errorExit


args = TestHelper.parse_args({"--dump-error-details","--keep-logs","-v","--leave-running",
"--wallet-port","--unshared"})
Utils.Debug=args.v
totalProducerNodes=4
totalNonProducerNodes=1
totalNodes=totalProducerNodes+totalNonProducerNodes
maxActiveProducers=3
totalProducers=maxActiveProducers
cluster=Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs)
dumpErrorDetails=args.dump_error_details
walletPort=args.wallet_port

walletMgr=WalletMgr(True, port=walletPort)
testSuccessful=False

try:
TestHelper.printSystemInfo("BEGIN")

cluster.setWalletMgr(walletMgr)
Print("Stand up cluster")
specificExtraNodeosArgs={}
# producer nodes will be mapped to 0 through totalProducerNodes-1, so the number totalProducerNodes will be the non-producing node
specificExtraNodeosArgs[totalProducerNodes]="--plugin eosio::test_control_api_plugin"

# *** setup topogrophy ***

# "bridge" shape connects defproducera (node0) defproducerb (node1) defproducerc (node2) to each other and defproducerd (node3)
# and the only connection between those 2 groups is through the bridge (node4)
if cluster.launch(topo="./tests/bridge_for_fork_test_shape.json", pnodes=totalProducerNodes,
totalNodes=totalNodes, totalProducers=totalProducerNodes, loadSystemContract=False,
specificExtraNodeosArgs=specificExtraNodeosArgs) is False:
Utils.cmdError("launcher")
Utils.errorExit("Failed to stand up eos cluster.")
Print("Validating system accounts after bootstrap")
cluster.validateAccounts(None)

# *** identify each node (producers and non-producing node) ***

prodNode0 = cluster.getNode(0)
prodNode1 = cluster.getNode(1)
prodNode2 = cluster.getNode(2)
prodNode3 = cluster.getNode(3) # other side of bridge
nonProdNode = cluster.getNode(4)

prodNodes=[ prodNode0, prodNode1, prodNode2, prodNode3 ]

prodA=prodNode0 # defproducera
prodD=prodNode3 # defproducerc

# *** Identify a block where production is stable ***

#verify nodes are in sync and advancing
cluster.biosNode.kill(signal.SIGTERM)
cluster.waitOnClusterSync(blockAdvancing=5)

libProdABeforeKill = prodA.getIrreversibleBlockNum()
libProdDBeforeKill = prodD.getIrreversibleBlockNum()

# *** Killing the "bridge" node ***
Print('Sending command to kill "bridge" node to separate the 2 producer groups.')
# kill at the beginning of the production window for defproducera, so there is time for the fork for
# defproducerd to grow before it would overtake the fork for defproducera and defproducerb and defproducerc
killAtProducer="defproducera"
nonProdNode.killNodeOnProducer(producer=killAtProducer, whereInSequence=1)

#verify that the non producing node is not alive (and populate the producer nodes with current getInfo data to report if
#an error occurs)
numPasses = 2
blocksPerProducer = 12
blocksPerRound = totalProducers * blocksPerProducer
count = blocksPerRound * numPasses
while nonProdNode.verifyAlive() and count > 0:
# wait on prodNode 0 since it will continue to advance, since defproducera and defproducerb are its producers
Print("Wait for next block")
assert prodA.waitForNextBlock(timeout=6), "Production node A should continue to advance, even after bridge node is killed"
count -= 1

assert not nonProdNode.verifyAlive(), "Bridge node should have been killed if test was functioning correctly."

transferAmount = 10
# Does not use transaction retry (not needed)
transfer = prodD.transferFunds(cluster.eosioAccount, cluster.defproduceraAccount, f"{transferAmount}.0000 {CORE_SYMBOL}", "fund account")
transBlockNum = transfer['processed']['block_num']
transId = prodD.getLastTrackedTransactionId()

beforeBlockNum = prodA.getBlockNum()
prodA.waitForProducer("defproducera")
prodA.waitForProducer("defproducerb")
prodA.waitForProducer("defproducera")
prodA.waitForProducer("defproducerc") # produce enough that sync will have over 30 blocks to sync
assert prodA.waitForLibToAdvance(), "Production node A should advance lib without D"
assert prodD.waitForNextBlock(), "Production node D should continue to advance, even after bridge node is killed"
afterBlockNum = prodA.getBlockNum()

Print("Relaunching the non-producing bridge node to connect the nodes")
if not nonProdNode.relaunch():
errorExit(f"Failure - (non-production) node {nonProdNode.nodeNum} should have restarted")

while prodD.getInfo()['last_irreversible_block_num'] < transBlockNum:
Print("Wait for LIB to move, which indicates prodD may have forked out the branch")
assert prodD.waitForLibToAdvance(60), \
"ERROR: Network did not reach consensus after bridge node was restarted."

assert prodD.waitForLibToAdvance()
assert prodD.waitForProducer("defproducera")
assert prodA.waitForProducer("defproducerd")

for prodNode in prodNodes:
info=prodNode.getInfo()
Print(f"node info: {json.dumps(info, indent=1)}")

assert prodA.getIrreversibleBlockNum() > max(libProdABeforeKill, libProdDBeforeKill)
assert prodD.getIrreversibleBlockNum() > max(libProdABeforeKill, libProdDBeforeKill)

logFile = Utils.getNodeDataDir(prodNode3.nodeId) + "/stderr.txt"
f = open(logFile)
contents = f.read()
if contents.count("3030001 unlinkable_block_exception: Unlinkable block") > (afterBlockNum-beforeBlockNum): # a few are fine
errorExit(f"Node{prodNode3.nodeId} has more than {afterBlockNum-beforeBlockNum} unlinkable blocks: {logFile}.")

testSuccessful=True
finally:
TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails)

errorCode = 0 if testSuccessful else 1
exit(errorCode)
Loading