Skip to content

Commit

Permalink
Merge branch 'main' into sload-replication-trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
renancloudwalk authored Mar 8, 2024
2 parents 7f5c17b + 99c5ab3 commit 1a8bf27
Show file tree
Hide file tree
Showing 21 changed files with 144 additions and 53 deletions.
14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,20 @@ phf_codegen = "0.11.2"
# ------------------------------------------------------------------------------

[[bin]]
name = "importer-download"
path = "src/bin/importer/importer-download.rs"
name = "rpc-downloader"
path = "src/bin/rpc_downloader.rs"

[[bin]]
name = "importer-import"
path = "src/bin/importer/importer-import.rs"
name = "importer-offline"
path = "src/bin/importer_offline.rs"

[[bin]]
name = "rpc-server-poller"
path = "src/bin/rpc_server_poller.rs"
name = "importer-online"
path = "src/bin/importer_online.rs"

[[bin]]
name = "state-validator"
path = "src/bin/state-validator.rs"
path = "src/bin/state_validator.rs"

# ------------------------------------------------------------------------------
# Lints
Expand Down
31 changes: 27 additions & 4 deletions e2e/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion e2e/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"ethers": "^6.9.0",
"hardhat": "^2.19.2",
"prettier": "^3.1.1",
"web3-types": "^1.3.1"
"web3-types": "^1.3.1",
"ws": "^8.16.0"
}
}
19 changes: 18 additions & 1 deletion e2e/test/e2e-json-rpc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Block } from "web3-types";

import { ALICE, BOB } from "./helpers/account";
import { isStratus } from "./helpers/network";
import { CHAIN_ID, CHAIN_ID_DEC, TEST_BALANCE, ZERO, send, sendExpect, sendRawTransaction } from "./helpers/rpc";
import { CHAIN_ID, CHAIN_ID_DEC, TEST_BALANCE, ZERO, send, sendAndGetError, sendExpect, sendRawTransaction, subscribeAndGetId } from "./helpers/rpc";

describe("JSON-RPC", () => {
describe("State", () => {
Expand Down Expand Up @@ -123,6 +123,23 @@ describe("JSON-RPC", () => {
await send("evm_setNextBlockTimestamp", [0]);
})
});
});

describe("Subscription", () => {
describe("HTTP", () => {
it("eth_subscribe fails with code 32603", async () => {
const error = await sendAndGetError("eth_subscribe", ["newHeads"]);
expect(error).to.not.be.null;
expect(error.code).eq(-32603); // Internal error
});
});

describe("WebSocket", () => {
it("Subscribe to newHeads receives subscription id", async () => {
const waitTimeInMilliseconds = 40;
const id = await subscribeAndGetId("newHeads", waitTimeInMilliseconds);
expect(id).to.not.be.undefined;
});
});
});
});
44 changes: 42 additions & 2 deletions e2e/test/helpers/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { JsonRpcProvider, keccak256 } from "ethers";
import { config, ethers } from "hardhat";
import { HttpNetworkConfig } from "hardhat/types";
import { Numbers } from "web3-types";
import { WebSocket, WebSocketServer } from "ws";

import { TestContractBalances, TestContractCounter } from "../../typechain-types";
import { Account, CHARLIE } from "./account";
Expand Down Expand Up @@ -64,9 +65,9 @@ if (process.env.RPC_LOG) {
// Helper functions
// -----------------------------------------------------------------------------

// Sends a RPC request to the blockchain.
// Sends a RPC request to the blockchain, returning full response.
var requestId = 0;
export async function send(method: string, params: any[] = []): Promise<any> {
export async function sendAndGetFullResponse(method: string, params: any[] = []): Promise<any> {
for (const i in params) {
const param = params[i];
if (param instanceof Account) {
Expand All @@ -91,9 +92,22 @@ export async function send(method: string, params: any[] = []): Promise<any> {
console.log("RESP <-", JSON.stringify(response.data));
}

return response;
}

// Sends a RPC request to the blockchain, returning its result field.
export async function send(method: string, params: any[] = []): Promise<any> {
const response = await sendAndGetFullResponse(method, params);
return response.data.result;
}

// Sends a RPC request to the blockchain, returning its error field.
// Use it when you expect the RPC call to fail.
export async function sendAndGetError(method: string, params: any[] = []): Promise<any> {
const response = await sendAndGetFullResponse(method, params);
return response.data.error;
}

// Sends a RPC request to the blockchain and applies the expect function to the result.
export async function sendExpect(method: string, params: any[] = []): Promise<Chai.Assertion> {
return expect(await send(method, params));
Expand Down Expand Up @@ -189,3 +203,29 @@ export async function sendGetBlockNumber(): Promise<number> {
const result = await send("eth_blockNumber");
return parseInt(result, 16);
}

/// Start a subscription and returns its id
/// Waits at the most for the specified time
/// An error or timeout will result in undefined
export async function subscribeAndGetId(subscription: string, waitTimeInMilliseconds: number): Promise<string | undefined> {
const socket = new WebSocket(providerUrl.replace("http", "ws"));
let subsId = undefined;

socket.addEventListener("open", function () {
socket.send(JSON.stringify({ jsonrpc: "2.0", id: 0, method: "eth_subscribe", params: [subscription] }));
});

socket.addEventListener('message', function (event: { data: string }) {
//console.log('Message from server ', event.data);
if (event.data.includes("id")) {
subsId = JSON.parse(event.data).result;
}
socket.close();
});

// Wait for the specified time, if necessary
if (subsId === undefined && waitTimeInMilliseconds > 0)
await new Promise(resolve => setTimeout(resolve, waitTimeInMilliseconds));

return subsId;
}
26 changes: 17 additions & 9 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import '.justfile_helpers' # _lint, _outdated

# Environment variables (automatically set in all actions).
export RUST_BACKTRACE := "1"
export RUST_LOG := env("RUST_LOG", "stratus=info,importer-download=info,importer-importer=info")
export RUST_LOG := env("RUST_LOG", "stratus=info,rpc-downloader=info,importer-offline=info,importer-online=info,state-validator=info")

# Default URLs that can be passed as argument.
postgres_url := env("POSTGRES_URL", "postgres://postgres:[email protected]:5432/stratus")
Expand Down Expand Up @@ -86,15 +86,23 @@ update:
cargo update stratus

# ------------------------------------------------------------------------------
# Importer tasks
# Jobs
# ------------------------------------------------------------------------------
# Importer: Download external RPC blocks to temporary storage
importer-download *args="":
cargo run --bin importer-download --features dev --release -- --postgres {{postgres_url}} --external-rpc {{testnet_url}} {{args}}
# Job: Download external RPC blocks and receipts to temporary storage
rpc-downloader *args="":
cargo run --bin rpc-downloader --features dev --release -- --postgres {{postgres_url}} --external-rpc {{testnet_url}} {{args}}

# Importer: Import downloaded external RPC blocks to Stratus storage
importer-import *args="":
cargo run --bin importer-import --features dev --release -- --postgres {{postgres_url}} {{args}}
# Job: Import external RPC blocks from temporary storage to Stratus storage
importer-offline *args="":
cargo run --bin importer-offline --features dev --release -- --postgres {{postgres_url}} {{args}}

# Job: Import external RPC blocks from external RPC endpoint to Stratus storage
importer-online *args="":
cargo run --bin importer-online --features dev --release -- --external-rpc {{testnet_url}} {{args}}

# Job: Validate Stratus storage slots matches reference slots
state-validator *args="":
cargo run --bin state-validator --features dev --release -- --method {{testnet_url}} {{args}}

# ------------------------------------------------------------------------------
# Test tasks
Expand Down Expand Up @@ -262,7 +270,7 @@ e2e-flamegraph:

# Run cargo flamegraph with necessary environment variables
echo "Running cargo flamegraph..."
CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --bin rpc-server-poller --deterministic -- --external-rpc=http://localhost:3003/rpc --storage={{postgres_url}}
CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --bin importer-online --deterministic -- --external-rpc=http://localhost:3003/rpc --storage={{postgres_url}}


# ------------------------------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions src/bin/helpers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod postgres;
pub use postgres::*;
14 changes: 7 additions & 7 deletions src/bin/importer/_postgres.rs → src/bin/helpers/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use stratus::log_and_err;
pub async fn pg_retrieve_max_external_block(pg: &Postgres, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Option<BlockNumber>> {
tracing::debug!(%start, %end, "retrieving max external block");

let result = sqlx::query_file_scalar!("src/bin/importer/sql/select_max_external_block_in_range.sql", start.as_i64(), end.as_i64())
let result = sqlx::query_file_scalar!("src/bin/helpers/sql/select_max_external_block_in_range.sql", start.as_i64(), end.as_i64())
.fetch_one(&pg.connection_pool)
.await;

Expand All @@ -33,7 +33,7 @@ pub async fn pg_retrieve_max_external_block(pg: &Postgres, start: BlockNumber, e

pub async fn pg_retrieve_external_blocks_in_range(pg: &Postgres, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Vec<BlockRow>> {
tracing::debug!(%start, %end, "retrieving external blocks in range");
let result = sqlx::query_file!("src/bin/importer/sql/select_external_blocks_in_range.sql", start.as_i64(), end.as_i64())
let result = sqlx::query_file!("src/bin/helpers/sql/select_external_blocks_in_range.sql", start.as_i64(), end.as_i64())
.fetch_all(&pg.connection_pool)
.await;

Expand All @@ -58,7 +58,7 @@ pub async fn pg_retrieve_external_blocks_in_range(pg: &Postgres, start: BlockNum
pub async fn pg_retrieve_external_receipts_in_range(pg: &Postgres, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Vec<ReceiptRow>> {
tracing::debug!(%start, %end, "retrieving external receipts in range");

let result = sqlx::query_file!("src/bin/importer/sql/select_external_receipts_in_range.sql", start.as_i64(), end.as_i64())
let result = sqlx::query_file!("src/bin/helpers/sql/select_external_receipts_in_range.sql", start.as_i64(), end.as_i64())
.fetch_all(&pg.connection_pool)
.await;

Expand All @@ -83,7 +83,7 @@ pub async fn pg_retrieve_external_receipts_in_range(pg: &Postgres, start: BlockN
pub async fn pg_retrieve_external_balances(pg: &Postgres) -> anyhow::Result<Vec<BalanceRow>> {
tracing::debug!("retrieving external balances");

let result = sqlx::query_file_as!(BalanceRow, "src/bin/importer/sql/select_external_balances.sql")
let result = sqlx::query_file_as!(BalanceRow, "src/bin/helpers/sql/select_external_balances.sql")
.fetch_all(&pg.connection_pool)
.await;

Expand All @@ -100,7 +100,7 @@ pub async fn pg_insert_external_balance(pg: &Postgres, address: Address, balance
tracing::debug!(%address, %balance, "saving external balance");

let result = sqlx::query_file!(
"src/bin/importer/sql/insert_external_balance.sql",
"src/bin/helpers/sql/insert_external_balance.sql",
address.as_ref(),
TryInto::<BigDecimal>::try_into(balance)?
)
Expand All @@ -124,7 +124,7 @@ pub async fn pg_insert_external_block_and_receipts(
let mut tx = pg.start_transaction().await?;

// insert block
let result = sqlx::query_file!("src/bin/importer/sql/insert_external_block.sql", number.as_i64(), block)
let result = sqlx::query_file!("src/bin/helpers/sql/insert_external_block.sql", number.as_i64(), block)
.execute(&mut *tx)
.await;

Expand All @@ -138,7 +138,7 @@ pub async fn pg_insert_external_block_and_receipts(

// insert receipts
for (hash, receipt) in receipts {
let result = sqlx::query_file!("src/bin/importer/sql/insert_external_receipt.sql", hash.as_ref(), number.as_i64(), receipt)
let result = sqlx::query_file!("src/bin/helpers/sql/insert_external_receipt.sql", hash.as_ref(), number.as_i64(), receipt)
.execute(&mut *tx)
.await;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
mod _postgres;
mod helpers;

use std::cmp::min;
use std::collections::HashMap;
use std::sync::Arc;

use _postgres::*;
use anyhow::anyhow;
use futures::try_join;
use futures::StreamExt;
use helpers::*;
use itertools::Itertools;
use stratus::config::ImporterImportConfig;
use stratus::config::ImporterOfflineConfig;
use stratus::eth::primitives::Account;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::BlockSelection;
Expand All @@ -35,7 +35,7 @@ type BacklogTask = (Vec<BlockRow>, Vec<ReceiptRow>);
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
// init services
let config: ImporterImportConfig = init_global_services();
let config: ImporterOfflineConfig = init_global_services();
let pg = Arc::new(Postgres::new(&config.postgres_url).await?);
let storage = config.init_storage().await?;
let executor = config.init_executor(Arc::clone(&storage));
Expand Down
4 changes: 2 additions & 2 deletions src/bin/rpc_server_poller.rs → src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use stratus::config::RpcPollerConfig;
use stratus::config::ImporterOnlineConfig;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::Hash;
use stratus::infra::BlockchainClient;
Expand All @@ -13,7 +13,7 @@ const POLL_LATENCY: Duration = Duration::from_secs(1);

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
let config: RpcPollerConfig = init_global_services();
let config: ImporterOnlineConfig = init_global_services();

let chain = Arc::new(BlockchainClient::new(&config.external_rpc, Duration::from_secs(1))?);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
mod _postgres;
mod helpers;

use std::cmp::min;
use std::sync::Arc;
use std::time::Duration;

use _postgres::*;
use anyhow::anyhow;
use anyhow::Context;
use futures::StreamExt;
use futures::TryStreamExt;
use helpers::*;
use itertools::Itertools;
use stratus::config::ImporterDownloadConfig;
use stratus::config::RpcDownloaderConfig;
use stratus::eth::primitives::Address;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::Hash;
Expand All @@ -29,7 +29,7 @@ const NETWORK_TIMEOUT: Duration = Duration::from_secs(2);
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
// init services
let config: ImporterDownloadConfig = init_global_services();
let config: RpcDownloaderConfig = init_global_services();
let pg = Arc::new(Postgres::new(&config.postgres_url).await?);
let chain = Arc::new(BlockchainClient::new(&config.external_rpc, NETWORK_TIMEOUT)?);

Expand Down
File renamed without changes.
Loading

0 comments on commit 1a8bf27

Please sign in to comment.