Skip to content

Commit

Permalink
Add block sync bytes received metric and use it in sync throttle test.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgiszczak committed Oct 4, 2023
1 parent 7acac0c commit ff7a8a1
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 13 deletions.
1 change: 1 addition & 0 deletions plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ namespace eosio {
std::chrono::nanoseconds last_bytes_received{0};
size_t bytes_sent{0};
std::chrono::nanoseconds last_bytes_sent{0};
size_t block_sync_bytes_received{0};
size_t block_sync_bytes_sent{0};
std::chrono::nanoseconds connection_start_time{0};
std::string log_p2p_address;
Expand Down
12 changes: 8 additions & 4 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,7 @@ namespace eosio {
/// Returns the value currently held by the atomic `last_bytes_received` member.
std::chrono::nanoseconds get_last_bytes_received() const {
   return last_bytes_received.load();
}
/// Returns the value currently held by the atomic `bytes_sent` counter.
size_t get_bytes_sent() const {
   return bytes_sent.load();
}
/// Returns the value currently held by the atomic `last_bytes_sent` member.
std::chrono::nanoseconds get_last_bytes_sent() const {
   return last_bytes_sent.load();
}
/// Returns the value currently held by the atomic `block_sync_bytes_received` counter.
size_t get_block_sync_bytes_received() const {
   return block_sync_bytes_received.load();
}
/// Returns the value currently held by the atomic `block_sync_bytes_sent` counter.
size_t get_block_sync_bytes_sent() const {
   return block_sync_bytes_sent.load();
}
/// Returns the value currently held by the atomic `remote_endpoint_port` member.
boost::asio::ip::port_type get_remote_endpoint_port() const {
   return remote_endpoint_port.load();
}
void set_heartbeat_timeout(std::chrono::milliseconds msec) {
Expand Down Expand Up @@ -888,6 +889,7 @@ namespace eosio {
std::atomic<size_t> bytes_received{0};
std::atomic<std::chrono::nanoseconds> last_bytes_received{0ns};
std::atomic<size_t> bytes_sent{0};
std::atomic<size_t> block_sync_bytes_received{0};
std::atomic<size_t> block_sync_bytes_sent{0};
std::atomic<std::chrono::nanoseconds> last_bytes_sent{0ns};
std::atomic<boost::asio::ip::port_type> remote_endpoint_port{0};
Expand Down Expand Up @@ -1739,6 +1741,7 @@ namespace eosio {
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(get_time() - connection_start_time);
auto current_rate = double(block_sync_bytes_sent) / elapsed.count();
if( current_rate >= block_sync_rate_limit ) {
peer_dlog( this, "throttling block sync to peer ${host}:${port}", ("host", log_remote_endpoint_ip)("port", log_remote_endpoint_port));
return false;
}
}
Expand Down Expand Up @@ -3019,7 +3022,6 @@ namespace eosio {
fc::raw::unpack( peek_ds, which ); // throw away
block_header bh;
fc::raw::unpack( peek_ds, bh );

const block_id_type blk_id = bh.calculate_id();
const uint32_t blk_num = last_received_block_num = block_header::num_from_id(blk_id);
// don't add_peer_block because we have not validated this block header yet
Expand Down Expand Up @@ -3053,6 +3055,7 @@ namespace eosio {
return true;
}
} else {
block_sync_bytes_received += message_length;
my_impl->sync_master->sync_recv_block(shared_from_this(), blk_id, blk_num, false);
}

Expand Down Expand Up @@ -4728,7 +4731,8 @@ namespace eosio {
fc::unique_lock g_conn(c->conn_mtx);
boost::asio::ip::address_v6::bytes_type addr = c->remote_endpoint_ip_array;
g_conn.unlock();
net_plugin::p2p_per_connection_metrics::connection_metric metrics{
per_connection.peers.emplace_back(
net_plugin::p2p_per_connection_metrics::connection_metric{
.connection_id = c->connection_id
, .address = addr
, .port = c->get_remote_endpoint_port()
Expand All @@ -4742,11 +4746,11 @@ namespace eosio {
, .last_bytes_received = c->get_last_bytes_received()
, .bytes_sent = c->get_bytes_sent()
, .last_bytes_sent = c->get_last_bytes_sent()
, .block_sync_bytes_received = c->get_block_sync_bytes_received()
, .block_sync_bytes_sent = c->get_block_sync_bytes_sent()
, .connection_start_time = c->connection_start_time
, .log_p2p_address = c->log_p2p_address
};
per_connection.peers.push_back(metrics);
});
}
g.unlock();
update_p2p_connection_metrics({num_peers+num_bp_peers, num_clients, std::move(per_connection)});
Expand Down
1 change: 1 addition & 0 deletions plugins/prometheus_plugin/metrics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ struct catalog_type {
add_and_set_gauge("last_bytes_received", peer.last_bytes_received.count());
add_and_set_gauge("bytes_sent", peer.bytes_sent);
add_and_set_gauge("last_bytes_sent", peer.last_bytes_sent.count());
add_and_set_gauge("block_sync_bytes_received", peer.block_sync_bytes_received);
add_and_set_gauge("block_sync_bytes_sent", peer.block_sync_bytes_sent);
add_and_set_gauge("connection_start_time", peer.connection_start_time.count());
add_and_set_gauge(peer.log_p2p_address, 0); // Empty gauge; we only want the label
Expand Down
99 changes: 90 additions & 9 deletions tests/p2p_sync_throttle_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#!/usr/bin/env python3

import math
import re
import requests
import signal
import time

Expand All @@ -13,6 +16,7 @@
#
###############################################################

PROMETHEUS_URL = '/v1/prometheus/metrics'

Print=Utils.Print
errorExit=Utils.errorExit
Expand All @@ -38,6 +42,19 @@
cluster=Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs)
walletMgr=WalletMgr(True)

def readMetrics(host: str, port: str):
    """Fetch the Prometheus metrics page from a node's HTTP endpoint.

    Issues a GET to http://<host>:<port> + PROMETHEUS_URL with a 10 second
    timeout. A non-200 status is reported through errorExit. Connection and
    timeout exceptions raised by requests are not caught here; callers handle
    them.

    Returns the requests response object.
    """
    metricsUrl = f'http://{host}:{port}{PROMETHEUS_URL}'
    reply = requests.get(metricsUrl, timeout=10)
    if reply.status_code == 200:
        return reply
    errorExit(f'Prometheus metrics URL returned {reply.status_code}: {reply.url}')
    return reply

def extractPrometheusMetric(connID: str, metric: str, text: str):
    """Return the integer value of one per-connection gauge from Prometheus text output.

    Looks for a line of the form
        nodeos_p2p_connections{connid_<connID>="<metric>"} <value>
    in `text` and returns <value> as an int.

    Parameters:
        connID: connection id as it appears in the label name.
        metric: metric name as it appears in the label value.
        text:   full body of the metrics response to search.

    Raises:
        ValueError: if the metric line is absent, or its value is not an integer.
    """
    searchStr = f'nodeos_p2p_connections{{connid_{connID}="{metric}"}} '
    labelPos = text.find(searchStr)
    if labelPos < 0:
        # Without this check a -1 from find() would silently produce a bogus slice offset.
        raise ValueError(f'Prometheus metric not found: {searchStr.strip()}')
    begin = labelPos + len(searchStr)
    # Search the `text` argument itself, not a global response object.
    end = text.find('\n', begin)
    if end < 0:
        # Final line without a trailing newline: the value runs to the end of the text.
        end = len(text)
    return int(text[begin:end])

# Captures (connection id, remote port) from each per-connection address label line.
# The id group accepts one or more digits so connections with ids of 10 or more are not dropped.
prometheusHostPortPattern = re.compile(r'^nodeos_p2p_connections.connid_([0-9]+)="localhost:([0-9]*)', re.MULTILINE)

try:
TestHelper.printSystemInfo("BEGIN")

Expand All @@ -46,10 +63,7 @@
Print(f'producing nodes: {pnodes}, delay between nodes launch: {delay} second{"s" if delay != 1 else ""}')

Print("Stand up cluster")
if args.plugin:
extraNodeosArgs = ''.join([i+j for i,j in zip([' --plugin '] * len(args.plugin), args.plugin)])
else:
extraNodeosArgs = ''
extraNodeosArgs = '--plugin eosio::prometheus_plugin --connection-cleanup-period 3'
# Custom topology is a line of singly connected nodes from highest node number in sequence to lowest,
# the reverse of the usual TestHarness line topology.
if cluster.launch(pnodes=pnodes, unstartedNodes=2, totalNodes=total_nodes, prodCount=prod_count,
Expand Down Expand Up @@ -112,19 +126,86 @@
cluster.launchUnstarted(2)

throttledNode = cluster.getNode(3)
time.sleep(15)
while True:
try:
response = readMetrics(throttlingNode.host, throttlingNode.port)
except (requests.ConnectionError, requests.ReadTimeout) as e:
# waiting for node to finish startup and respond
time.sleep(0.5)
else:
connPorts = prometheusHostPortPattern.findall(response.text)
if len(connPorts) < 3:
# wait for node to be connected
time.sleep(0.5)
continue
Print('Throttling Node Start State')
#Print(response.text)
throttlingNodePortMap = {port: id for id, port in connPorts}
startSyncThrottlingBytesSent = extractPrometheusMetric(throttlingNodePortMap['9879'],
'block_sync_bytes_sent',
response.text)
Print(f'Start sync throttling bytes sent: {startSyncThrottlingBytesSent}')
break
while True:
try:
response = readMetrics(throttledNode.host, throttledNode.port)
except (requests.ConnectionError, requests.ReadTimeout) as e:
# waiting for node to finish startup and respond
time.sleep(0.5)
else:
if 'nodeos_p2p_connections{connid_2' not in response.text:
# wait for sending node to be connected
continue
Print('Throttled Node Start State')
#Print(response.text)
connPorts = prometheusHostPortPattern.findall(response.text)
throttledNodePortMap = {port: id for id, port in connPorts}
startSyncThrottledBytesReceived = extractPrometheusMetric(throttledNodePortMap['9878'],
'block_sync_bytes_received',
response.text)
Print(f'Start sync throttled bytes received: {startSyncThrottledBytesReceived}')
break

# Throttling node was offline during block generation and once online receives blocks as fast as possible while
# transmitting blocks to the next node in line at the above throttle setting.
assert throttlingNode.waitForBlock(endLargeBlocksHeadBlock), f'wait for block {endLargeBlocksHeadBlock} on throttled node timed out'
endThrottlingSync = time.time()
try:
response = readMetrics(throttlingNode.host, throttlingNode.port)
except (requests.ConnectionError, requests.ReadTimeout) as e:
errorExit(str(e))
else:
Print('Throttling Node End State')
#Print(response.text)
endSyncThrottlingBytesSent = extractPrometheusMetric(throttlingNodePortMap['9879'],
'block_sync_bytes_sent',
response.text)
Print(f'End sync throttling bytes sent: {endSyncThrottlingBytesSent}')
# Throttled node is connecting to a listen port with a block sync throttle applied so it will receive
# blocks more slowly during syncing than an unthrottled node.
assert throttledNode.waitForBlock(endLargeBlocksHeadBlock, timeout=90), f'Wait for block {endLargeBlocksHeadBlock} on sync node timed out'
endThrottledSync = time.time()
Print(f'Unthrottled sync time: {endThrottlingSync - clusterStart} seconds')
Print(f'Throttled sync time: {endThrottledSync - clusterStart} seconds')
# 15 seconds chosen as the minimum reasonable sync time differential given the throttle and the average block size.
assert endThrottledSync - clusterStart > endThrottlingSync - clusterStart + 15, 'Throttled sync time must be at least 15 seconds greater than unthrottled'
try:
response = readMetrics(throttledNode.host, throttledNode.port)
except (requests.ConnectionError, requests.ReadTimeout) as e:
errorExit(str(e))
else:
Print('Throttled Node End State')
#Print(response.text)
endSyncThrottledBytesReceived = extractPrometheusMetric(throttledNodePortMap['9878'],
'block_sync_bytes_received',
response.text)
Print(f'End sync throttled bytes received: {endSyncThrottledBytesReceived}')
throttlingElapsed = endThrottlingSync - clusterStart
throttledElapsed = endThrottledSync - clusterStart
Print(f'Unthrottled sync time: {throttlingElapsed} seconds')
Print(f'Throttled sync time: {throttledElapsed} seconds')
# Sanity check
assert throttledElapsed > throttlingElapsed + 15, 'Throttled sync time must be at least 15 seconds greater than unthrottled'
# Calculate block receive rate
calculatedRate = (endSyncThrottledBytesReceived - startSyncThrottledBytesReceived)/throttledElapsed
#assert math.isclose(calculatedRate, 40000, rel_tol=0.01), f'Throttled bytes receive rate must be near 40,000, was {calculatedRate}'
assert calculatedRate < 40000, f'Throttled bytes receive rate must be less than 40,000, was {calculatedRate}'

testSuccessful=True
finally:
Expand Down

0 comments on commit ff7a8a1

Please sign in to comment.