Skip to content

Commit

Permalink
Merge pull request #17 from weaveVM/dev
Browse files Browse the repository at this point in the history
Double indexing threads: parallel live blockheight & from genesis backfilling
  • Loading branch information
charmful0x authored Sep 26, 2024
2 parents 3c3a463 + 9c78bd1 commit 522e6bd
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 28 deletions.
3 changes: 2 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
archiver_pk=...
archiver_pk="..."
backfill_pk="..."
network="./networks/your_network.json"

DATABASE_HOST=""
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
[package]
name = "wvm-archiver"
version = "0.1.3"
version = "0.2.3"
edition = "2021"
description = "EL data pipeline for WVM testnet v0"
authors = ["charmful0x <[email protected]>"]
license = "MIT"
repository = "https://github.com/weavevm/wvm-archiver"
readme = "README.md"
documentation = "https://docs.wvm.dev"
keywords = ["wvm", "indexer", "arweave"]
keywords = ["wvm", "indexer", "arweave", "etl"]

[dependencies]
anyhow = "1.0.86"
Expand Down
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ While a WeaveVM Archiver node can run without web2 component dependencies, this

```js
archiver_pk="" // WeaveVM archiver PK
backfill_pk="" // WeaveVM backfill PK
network="./networks/your_network.json"

DATABASE_HOST="" // planetscale
Expand All @@ -45,10 +46,11 @@ To start archiving your network block data on WeaveVM:

1. Add your network config file to the [networks](./networks/) directory.
2. Name your config file using snake_case syntax (e.g., `your_network_name.json`).
3. Modify properties that don't have a `wvm_` prefix in the config JSON file.
4. Fund your `archiver_address` with a sufficient amount of tWVM (1 MB costs ~ 5 cents). Check out WVM Faucet to claim $tWVM.
5. Choose a unique `archive_pool_address` that's different from your `archiver_address`.
6. Set up your PlanetScale DB according to `db_schema.sql`.
3. Modify properties that don't have a `wvm_` prefix in the config JSON file. See the [_template.json](./networks/_template.json) file for guidance.
4. Fund your `archiver_address` & `backfill_address` with a sufficient amount of tWVM (1 MB costs ~ 5 cents). Check out [WVM Faucet](https://wvm.dev/faucet) to claim $tWVM. Make sure that the two addresses are distinct.
5. Choose a unique `archive_pool_address` that's different from your `archiver_address` & `backfill_address`
6. Set the `start_block` value to your network's most recent blockheight. This lets the archiver start in sync with the live blockheight while, in parallel, re-indexing from genesis using the `backfill_address`.
7. Set up your PlanetScale DB according to `db_schema.sql`.

### RPC Proxy and Caching

Expand All @@ -64,6 +66,7 @@ docker-compose up -d
```

Finally, you can set eRPC's proxy URL in each relative network config.

```optimism.json
{
"name": "Optimism",
Expand All @@ -73,15 +76,14 @@ Finally, you can set eRPC's proxy URL in each relative network config.
}
```


## How it works

The WeaveVM Archiver node operates as follows:

1. It starts downloading the target EVM network block data from the RPC you provide in the network config file.
2. The node begins pulling blocks from the `start_block` defined in the network's config file.
3. The block data is then serialized in [borsh](https://borsh.io) format and compressed using Brotli.
4. The serialized-compressed data is pushed to WeaveVM as calldata transaction from the `archiver_address` to the `archive_pool_address`.
4. The serialized-compressed data is pushed to WeaveVM as a calldata transaction from the `archiver_address` (live blocks) or the `backfill_address` (backfilled blocks) to the `archive_pool_address`.
5. Simultaneously, the resulting TXID from pushing data to WeaveVM and the archived EVM block ID are indexed in the cloud for faster data retrieval.

## Server Methods
Expand Down
13 changes: 13 additions & 0 deletions networks/_template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"name": "$YOUR_NETWORK_NAME",
"network_chain_id": 1, // your network ID
"wvm_chain_id": 9496,
"network_rpc": "https://", // your network public RPC URL
"wvm_rpc": "https://testnet-rpc.wvm.dev",
"block_time": 1, // your network block time in seconds
"start_block": 7777, // the blockheight at which you want to start
// archiving; using the most recent blockheight is advised
"archiver_address": "$ARCHIVING_ADDRESS_1", // it archives starting from "start_block"
"backfill_address": "$ARCHIVING_ADDRESS_2", // it archives from genesis until start_block
"archive_pool_address": "$DIFFERENT_ADDRESS" // most common is 0x0 address (null)
}
3 changes: 2 additions & 1 deletion networks/metis.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"network_rpc": "https://andromeda.metis.io/?owner=1088",
"wvm_rpc": "https://testnet-rpc.wvm.dev",
"block_time": 9,
"start_block": 18039936,
"start_block": 18574702,
"archiver_address": "0x197f818c1313DC58b32D88078ecdfB40EA822614",
"backfill_address": "0x123463a4B065722E99115D6c222f267d9cABb524",
"archive_pool_address": "0xa2A0D977847805fE224B789D8C4d3D711ab251e7"
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub mod utils;
pub mod utils;
5 changes: 5 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::utils::archive_block::sprint_blocks_archiving;
use crate::utils::backfill_genesis::backfill_from_genesis;
use crate::utils::schema::Network;
use crate::utils::server_handlers::{handle_block, handle_block_raw, handle_info, handle_weave_gm};
use axum::{routing::get, Router};
Expand All @@ -21,5 +22,9 @@ async fn main() -> shuttle_axum::ShuttleAxum {
task::spawn(async move {
sprint_blocks_archiving().await;
});
// backfill blocks from genesis till network.start_block
task::spawn(async move {
backfill_from_genesis().await.unwrap();
});
Ok(router.into())
}
21 changes: 15 additions & 6 deletions src/utils/archive_block.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::utils::get_block::{by_number, get_current_block_number};
use crate::utils::planetscale::{ps_archive_block, ps_get_latest_block_id};
use crate::utils::schema::{Block, Network};
use crate::utils::transaction::send_wvm_calldata;
use crate::utils::transaction::{send_wvm_calldata, send_wvm_calldata_backfill};
use anyhow::Error;
use std::{thread, time::Duration};

pub async fn archive(block_number: Option<u64>) -> Result<String, Error> {
pub async fn archive(block_number: Option<u64>, is_backfill: bool) -> Result<String, Error> {
let network = Network::config();
let start_block = network.start_block;
let block_to_archive = block_number.unwrap_or(start_block);
Expand All @@ -23,7 +23,12 @@ pub async fn archive(block_number: Option<u64>) -> Result<String, Error> {
// println!("borsh vec length: {:?}", borsh_res.len());
// println!("brotli vec length: {:?}", brotli_res.len());

let txid = send_wvm_calldata(brotli_res).await.unwrap();
let txid = if is_backfill {
send_wvm_calldata_backfill(brotli_res).await.unwrap()
} else {
send_wvm_calldata(brotli_res).await.unwrap()
};

Ok(txid)
}

Expand All @@ -33,16 +38,20 @@ pub async fn sprint_blocks_archiving() {
let mut current_block_number = get_current_block_number().await.as_u64();
let ps_latest_archived_block = ps_get_latest_block_id().await;
// it defaults to network.start_block if planestcale fails
let mut start_block = ps_latest_archived_block;
let mut start_block = if ps_latest_archived_block < network.start_block {
network.start_block
} else {
ps_latest_archived_block
};

loop {
if ps_latest_archived_block < current_block_number - 1 {
if start_block < current_block_number - 1 {
println!("\n{}", "#".repeat(100));
println!(
"\nARCHIVING BLOCK #{} of Network {} -- ChainId: {}\n",
start_block, network.name, network.network_chain_id
);
let archive_txid = archive(Some(start_block)).await.unwrap();
let archive_txid = archive(Some(start_block), false).await.unwrap();
let _ = ps_archive_block(&start_block, &archive_txid).await;
start_block += 1;
println!("\n{}", "#".repeat(100));
Expand Down
27 changes: 27 additions & 0 deletions src/utils/backfill_genesis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::utils::archive_block::archive;
use crate::utils::planetscale::ps_archive_block;
use crate::utils::schema::Network;
use anyhow::{Error, Ok};

/// Archives every block that precedes `network.start_block`, starting at genesis (block 0).
///
/// Each block is archived through [`archive`] with `is_backfill = true`, and the resulting
/// txid is then handed to [`ps_archive_block`] for indexing.
///
/// The block at `network.start_block` itself is intentionally left out: the live archiving
/// loop (`sprint_blocks_archiving`) begins at `network.start_block`, so including it here
/// would archive that block twice.
///
/// # Errors
///
/// Returns the first error reported by [`archive`]; later blocks are not processed.
pub async fn backfill_from_genesis() -> Result<(), Error> {
    let network = Network::config();
    let config_start_block = network.start_block;

    // Nothing precedes genesis, so there is nothing to backfill.
    if config_start_block == 0 {
        return Ok(());
    }

    // Walk the range lazily: materializing it into a Vec would allocate one u64 per block
    // (tens of millions of entries for a mature chain) before archiving even starts.
    for block_number in 0..config_start_block {
        println!("\n{}", "#".repeat(100));
        println!(
            "\nARCHIVING **BACKFILL** BLOCK #{} of Network {} -- ChainId: {}\n",
            block_number, network.name, network.network_chain_id
        );
        let archive_txid = archive(Some(block_number), true).await?;
        // Indexing stays best-effort, as in the live archiving loop: the outcome is ignored.
        let _ = ps_archive_block(&block_number, &archive_txid).await;
        println!("\n{}", "#".repeat(100));
    }

    Ok(())
}
1 change: 1 addition & 0 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod archive_block;
pub mod backfill_genesis;
pub mod env_var;
pub mod get_block;
pub mod planetscale;
Expand Down
29 changes: 19 additions & 10 deletions src/utils/planetscale.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::utils::env_var::get_env_var;
use crate::utils::schema::{Network, PsGetBlockTxid, PsGetExtremeBlock};
use crate::utils::schema::{Network, PsGetBlockTxid, PsGetExtremeBlock, PsGetTotalBlocksCount};
use anyhow::Error;
use planetscale_driver::{query, PSConnection};
use serde_json::Value;
Expand All @@ -22,12 +22,13 @@ pub async fn ps_archive_block(
let wvm_calldata_txid = wvm_calldata_txid.trim_matches('"');
let conn = ps_init().await;

let res =
query("INSERT INTO WeaveVMArchiver(NetworkBlockId, WeaveVMArchiveTxid) VALUES($0, \"$1\")")
.bind(network_block_id)
.bind(wvm_calldata_txid)
.execute(&conn)
.await;
let res = query(
"INSERT INTO WeaveVMArchiverMetis(NetworkBlockId, WeaveVMArchiveTxid) VALUES($0, \"$1\")",
)
.bind(network_block_id)
.bind(wvm_calldata_txid)
.execute(&conn)
.await;

match res {
Ok(result) => {
Expand All @@ -46,7 +47,7 @@ pub async fn ps_get_latest_block_id() -> u64 {
let conn = ps_init().await;

let latest_archived: u64 =
query("SELECT MAX(NetworkBlockId) AS LatestNetworkBlockId FROM WeaveVMArchiver;")
query("SELECT MAX(NetworkBlockId) AS LatestNetworkBlockId FROM WeaveVMArchiverMetis;")
.fetch_scalar(&conn)
.await
.unwrap_or(network.start_block);
Expand All @@ -59,7 +60,7 @@ pub async fn ps_get_archived_block_txid(id: u64) -> Value {
let conn = ps_init().await;

let query_formatted = format!(
"SELECT WeaveVMArchiveTxid FROM WeaveVMArchiver WHERE NetworkBlockId = {}",
"SELECT WeaveVMArchiveTxid FROM WeaveVMArchiverMetis WHERE NetworkBlockId = {}",
id
);
let txid: PsGetBlockTxid = query(&query_formatted).fetch_one(&conn).await.unwrap();
Expand All @@ -78,7 +79,7 @@ pub async fn ps_get_blocks_extremes(extreme: &str) -> Value {
};

let query_formatted = format!(
"SELECT NetworkBlockId FROM WeaveVMArchiver ORDER BY NetworkBlockId {} LIMIT 1;",
"SELECT NetworkBlockId FROM WeaveVMArchiverMetis ORDER BY NetworkBlockId {} LIMIT 1;",
query_type
);

Expand All @@ -87,3 +88,11 @@ pub async fn ps_get_blocks_extremes(extreme: &str) -> Value {
let res = serde_json::json!(query);
res
}

/// Fetches the value this crate reports as the total number of archived blocks.
///
/// Runs `SELECT MAX(Id)` against the `WeaveVMArchiverMetis` table and returns the row
/// parsed into a [`PsGetTotalBlocksCount`].
///
/// # Panics
///
/// Panics if the query fails or its result cannot be parsed.
pub async fn ps_get_archived_blocks_count() -> PsGetTotalBlocksCount {
    let connection = ps_init().await;

    // NOTE(review): MAX(Id) only equals the number of rows if `Id` values have no gaps,
    // and it may not be parseable when the table is empty -- confirm against db_schema.sql.
    let max_id_query = "SELECT MAX(Id) FROM WeaveVMArchiverMetis;";

    query(max_id_query).fetch_one(&connection).await.unwrap()
}
9 changes: 8 additions & 1 deletion src/utils/schema.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::utils::env_var::get_env_var;
use crate::utils::get_block::get_current_block_number;
use crate::utils::planetscale::ps_get_archived_blocks_count;
use crate::utils::transaction::get_archiver_balance;
use borsh::{from_slice, to_vec};
use borsh_derive::{BorshDeserialize, BorshSerialize};
Expand All @@ -22,6 +23,7 @@ pub struct Network {
pub block_time: u32,
pub start_block: u64, // as per ethers_provider
pub archiver_address: String,
pub backfill_address: String,
pub archive_pool_address: String,
}

Expand Down Expand Up @@ -120,6 +122,11 @@ pub struct PsGetExtremeBlock {
pub block_id: u64,
}

/// Single-column row produced by `ps_get_archived_blocks_count`, which fills it from a
/// `SELECT MAX(Id)` query on the archive table.
#[derive(Database, Debug, Serialize)]
pub struct PsGetTotalBlocksCount {
/// Value of `MAX(Id)`, used by `InfoServerResponse::new` as the total archived blocks figure.
pub count: u64,
}

#[derive(Debug, Serialize)]
pub struct InfoServerResponse {
first_block: Option<u64>,
Expand All @@ -136,7 +143,7 @@ pub struct InfoServerResponse {
impl InfoServerResponse {
pub async fn new(first_block: Option<u64>, last_block: Option<u64>) -> InfoServerResponse {
let network = Network::config();
let total_archived_blocks = last_block.unwrap_or(0) - first_block.unwrap_or(0);
let total_archived_blocks = (ps_get_archived_blocks_count().await).count;
let archiver_balance = get_archiver_balance().await;
let archiver_balance = Some(archiver_balance).unwrap();
let current_live_block = get_current_block_number().await.as_u64();
Expand Down
21 changes: 21 additions & 0 deletions src/utils/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,27 @@ pub async fn send_wvm_calldata(block_data: Vec<u8>) -> Result<String, Box<dyn st
Ok(txid)
}

/// Sends `block_data` to WeaveVM as a calldata transaction signed by the backfill wallet.
///
/// The transaction goes from `network.backfill_address` to `network.archive_pool_address`
/// and is signed with the private key stored in the `backfill_pk` environment variable.
///
/// # Errors
///
/// Returns an error if `backfill_pk` cannot be read or parsed as a wallet, if either
/// configured address fails to parse, or if `send_transaction` fails.
///
/// # Panics
///
/// Panics inside `assert_non_zero_balance` when the backfill balance cannot be fetched
/// or is zero.
pub async fn send_wvm_calldata_backfill(
    block_data: Vec<u8>,
) -> Result<String, Box<dyn std::error::Error>> {
    let network = Network::config();
    let provider = Network::provider(&network, true).await;
    // Surface a missing key as an error instead of panicking in a Result-returning function.
    let private_key = get_env_var("backfill_pk")
        .map_err(|err| format!("failed to read backfill_pk: {:?}", err))?;
    let wallet: LocalWallet = private_key
        .parse::<LocalWallet>()?
        .with_chain_id(network.wvm_chain_id);
    // The provider is still needed for the balance check below, so only it is cloned.
    let client = SignerMiddleware::new(provider.clone(), wallet);

    let address_from = network.backfill_address.parse::<Address>()?;
    let address_to = network.archive_pool_address.parse::<Address>()?;
    // check backfill address tWVM balance (non-zero)
    assert_non_zero_balance(&provider, &address_from).await;
    // send calldata tx to WeaveVM
    let txid = send_transaction(&client, &address_from, &address_to, block_data).await?;

    Ok(txid)
}

async fn assert_non_zero_balance(provider: &Provider<Http>, address: &Address) {
let balance = provider.get_balance(address.clone(), None).await.unwrap();
assert!(balance > 0.into());
Expand Down

0 comments on commit 522e6bd

Please sign in to comment.