diff --git a/Cargo.toml b/Cargo.toml index 6290186..3394196 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["chain", "shared", "orm", "webserver"] +members = ["block-index", "chain", "shared", "orm", "webserver"] [workspace.package] authors = ["Heliax AG "] @@ -15,6 +15,7 @@ anyhow = "1.0.75" axum = { version = "0.6.20", features = [ "tower-log" ] } axum-macros = "0.3.8" axum-trace-id = "0.1.0" +bincode = "1.3.3" clap = { version = "4.4.2", features = [ "derive", "env" ] } clap-verbosity-flag = "2.1.1" deadpool-diesel = { version = "0.5.0", features = ["postgres"] } @@ -38,5 +39,7 @@ tokio-retry = "0.3" tower = { version = "0.4", features = [ "util", "timeout", "load-shed", "limit", "buffer" ] } tower-http = { version = "0.4.4", features = [ "compression-full", "limit", "trace", "cors" ] } tracing = "0.1" +tracing-appender = "0.2.0" tracing-subscriber = { version = "0.3", features = [ "env-filter" ] } validator = { version = "0.16.0", features = ["derive"] } +xorf = { version = "0.11.0", features = ["serde"]} diff --git a/block-index/Cargo.toml b/block-index/Cargo.toml new file mode 100644 index 0000000..bd5c8ed --- /dev/null +++ b/block-index/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "block-index" +description = "Namada masp indexer block index builder." 
+resolver = "2" +authors.workspace = true +edition.workspace = true +license.workspace = true +readme.workspace = true +version.workspace = true + +[[bin]] +name = "block-index-builder" +path = "src/main.rs" + +[dependencies] +anyhow.workspace = true +bincode.workspace = true +clap-verbosity-flag.workspace = true +clap.workspace = true +deadpool-diesel.workspace = true +diesel.workspace = true +diesel_migrations.workspace = true +orm.workspace = true +shared.workspace = true +tokio.workspace = true +tracing-appender.workspace = true +tracing-subscriber.workspace = true +tracing.workspace = true +xorf.workspace = true + +[build-dependencies] +vergen = { version = "8.0.0", features = ["build", "git", "gitcl"] } diff --git a/block-index/build.rs b/block-index/build.rs new file mode 100644 index 0000000..1ff10a0 --- /dev/null +++ b/block-index/build.rs @@ -0,0 +1,8 @@ +use std::error::Error; + +use vergen::EmitBuilder; + +fn main() -> Result<(), Box> { + EmitBuilder::builder().all_git().emit()?; + Ok(()) +} diff --git a/block-index/src/appstate.rs b/block-index/src/appstate.rs new file mode 100644 index 0000000..69a8492 --- /dev/null +++ b/block-index/src/appstate.rs @@ -0,0 +1,38 @@ +use std::env; + +use anyhow::Context; +use deadpool_diesel::postgres::{Object, Pool as DbPool}; + +#[derive(Clone)] +pub struct AppState { + db: DbPool, +} + +impl AppState { + pub fn new(db_url: String) -> anyhow::Result { + let max_pool_size = env::var("DATABASE_POOL_SIZE") + .unwrap_or_else(|_| 8.to_string()) + .parse::() + .unwrap_or(8_usize); + let pool_manager = deadpool_diesel::Manager::from_config( + db_url, + deadpool_diesel::Runtime::Tokio1, + deadpool_diesel::ManagerConfig { + recycling_method: deadpool_diesel::RecyclingMethod::Verified, + }, + ); + let pool = DbPool::builder(pool_manager) + .max_size(max_pool_size) + .build() + .context("Failed to build Postgres db pool")?; + + Ok(Self { db: pool }) + } + + pub async fn get_db_connection(&self) -> anyhow::Result { + self.db + 
.get()
+            .await
+            .context("Failed to get db connection handle from deadpool")
+    }
+}
diff --git a/block-index/src/config.rs b/block-index/src/config.rs
new file mode 100644
index 0000000..4d11f80
--- /dev/null
+++ b/block-index/src/config.rs
@@ -0,0 +1,41 @@
+use std::num::NonZeroU64;
+
+use clap_verbosity_flag::{InfoLevel, LevelFilter, Verbosity};
+use tracing::Level;
+use tracing_appender::non_blocking::NonBlocking;
+use tracing_subscriber::FmtSubscriber;
+
+#[derive(clap::Parser)]
+pub struct AppConfig {
+    /// Link to the Postgres database
+    #[clap(long, env)]
+    pub database_url: String,
+
+    /// How often (in seconds) a new block index is built
+    #[clap(long, env)]
+    pub interval: Option<NonZeroU64>,
+
+    #[command(flatten)]
+    pub verbosity: Verbosity<InfoLevel>,
+}
+
+pub fn install_tracing_subscriber(
+    verbosity: Verbosity<InfoLevel>,
+    non_blocking_logger: NonBlocking,
+) {
+    let log_level = match verbosity.log_level_filter() {
+        LevelFilter::Off => None,
+        LevelFilter::Error => Some(Level::ERROR),
+        LevelFilter::Warn => Some(Level::WARN),
+        LevelFilter::Info => Some(Level::INFO),
+        LevelFilter::Debug => Some(Level::DEBUG),
+        LevelFilter::Trace => Some(Level::TRACE),
+    };
+    if let Some(log_level) = log_level {
+        let subscriber = FmtSubscriber::builder()
+            .with_max_level(log_level)
+            .with_writer(non_blocking_logger)
+            .finish();
+        tracing::subscriber::set_global_default(subscriber).unwrap();
+    }
+}
diff --git a/block-index/src/main.rs b/block-index/src/main.rs
new file mode 100644
index 0000000..4f80910
--- /dev/null
+++ b/block-index/src/main.rs
@@ -0,0 +1,294 @@
+pub mod appstate;
+pub mod config;
+
+use std::future::{self, Future};
+use std::num::NonZeroU64;
+use std::ops::ControlFlow;
+use std::sync::atomic::{self, AtomicBool};
+use std::sync::{Arc, Mutex};
+use std::task::{Poll, Waker};
+use std::time::Duration;
+
+use anyhow::{anyhow, Context};
+use clap::Parser;
+use deadpool_diesel::postgres::Object;
+use orm::block_index::BlockIndex;
+use orm::schema;
+use
shared::error::{ContextDbInteractError, IntoMainError, MainError};
+use tokio::signal;
+use tokio::time::sleep;
+use xorf::{BinaryFuse16, Filter};
+
+use crate::appstate::AppState;
+use crate::config::AppConfig;
+
+const VERSION_STRING: &str = env!("VERGEN_GIT_SHA");
+
+macro_rules! exit {
+    () => {{
+        tracing::info!("Exiting...");
+        return Ok(());
+    }};
+}
+
+#[tokio::main(worker_threads = 2)]
+async fn main() -> Result<(), MainError> {
+    let AppConfig {
+        verbosity,
+        database_url,
+        interval,
+    } = AppConfig::parse();
+
+    let (non_blocking_logger, _worker) =
+        tracing_appender::non_blocking(std::io::stdout());
+    config::install_tracing_subscriber(verbosity, non_blocking_logger);
+
+    tracing::info!(version = VERSION_STRING, "Started the block index builder");
+    let mut exit_handle = must_exit();
+
+    let app_state = AppState::new(database_url).into_db_error()?;
+
+    if wait_for_migrations(&mut exit_handle, &app_state)
+        .await
+        .is_break()
+    {
+        exit!();
+    }
+    build_block_indexes(&mut exit_handle, interval, &app_state).await;
+
+    exit!();
+}
+
+async fn wait_for_migrations<F>(
+    mut exit_handle: F,
+    app_state: &AppState,
+) -> ControlFlow<()>
+where
+    F: Future<Output = ()> + Unpin,
+{
+    while run_migrations(app_state).await.is_err() {
+        const SLEEP_AMOUNT: Duration = Duration::from_secs(5);
+
+        tracing::info!(after = ?SLEEP_AMOUNT, "Retrying migrations");
+
+        tokio::select! {
+            _ = &mut exit_handle => {
+                return ControlFlow::Break(());
+            }
+            _ = sleep(SLEEP_AMOUNT) => {}
+        }
+    }
+
+    ControlFlow::Continue(())
+}
+
+async fn build_block_indexes<F>(
+    mut exit_handle: F,
+    interval: Option<NonZeroU64>,
+    app_state: &AppState,
+) where
+    F: Future<Output = ()> + Unpin,
+{
+    const DEFAULT_SLEEP_AMOUNT: Duration = Duration::from_secs(30 * 60);
+    let sleep_amount = interval
+        .map(|interval| Duration::from_secs(interval.get()))
+        .unwrap_or(DEFAULT_SLEEP_AMOUNT);
+
+    loop {
+        tracing::debug!(after = ?sleep_amount, "Building new block index");
+
+        tokio::select!
{
+            _ = &mut exit_handle => {
+                return;
+            }
+            _ = sleep(sleep_amount) => {
+                _ = build_new_block_index(app_state).await;
+            }
+        }
+    }
+}
+
+fn must_exit() -> impl Future<Output = ()> {
+    struct ExitHandle {
+        flag: AtomicBool,
+        waker: Mutex<Option<Waker>>,
+    }
+
+    let fut_handle = Arc::new(ExitHandle {
+        flag: AtomicBool::new(false),
+        waker: Mutex::new(None),
+    });
+    let task_handle = Arc::clone(&fut_handle);
+
+    tokio::spawn(async move {
+        let mut interrupt =
+            signal::unix::signal(signal::unix::SignalKind::interrupt())
+                .expect("Failed to install INT signal handler");
+        let mut term =
+            signal::unix::signal(signal::unix::SignalKind::terminate())
+                .expect("Failed to install TERM signal handler");
+        let mut quit = signal::unix::signal(signal::unix::SignalKind::quit())
+            .expect("Failed to install QUIT signal handler");
+
+        let signal_descriptor = tokio::select! {
+            _ = interrupt.recv() => "INT",
+            _ = term.recv() => "TERM",
+            _ = quit.recv() => "QUIT",
+        };
+        tracing::info!(which = signal_descriptor, "Signal received");
+
+        atomic::fence(atomic::Ordering::Release);
+        task_handle.flag.store(true, atomic::Ordering::Relaxed);
+
+        let waker = task_handle.waker.lock().unwrap().take();
+        if let Some(waker) = waker {
+            waker.wake();
+        }
+    });
+
+    future::poll_fn(move |cx| {
+        if fut_handle.flag.load(atomic::Ordering::Relaxed) {
+            atomic::fence(atomic::Ordering::Acquire);
+            Poll::Ready(())
+        } else {
+            *fut_handle.waker.lock().unwrap() = Some(cx.waker().clone());
+            Poll::Pending
+        }
+    })
+}
+
+async fn run_migrations(app_state: &AppState) -> Result<(), MainError> {
+    use diesel_migrations::{
+        embed_migrations, EmbeddedMigrations, MigrationHarness,
+    };
+
+    const MIGRATIONS: EmbeddedMigrations =
+        embed_migrations!("../orm/migrations/");
+
+    async fn run_migrations_inner(conn: Object) -> anyhow::Result<()> {
+        tracing::debug!("Running db migrations...");
+
+        conn.interact(|transaction_conn| {
+            transaction_conn
+                .run_pending_migrations(MIGRATIONS)
+                .map_err(|_| anyhow!("Failed to run db
migrations"))?; + anyhow::Ok(()) + }) + .await + .context_db_interact_error()??; + + tracing::debug!("Finished running db migrations"); + + anyhow::Ok(()) + } + + run_migrations_inner(app_state.get_db_connection().await.into_db_error()?) + .await + .into_db_error() +} + +async fn build_new_block_index(app_state: &AppState) -> Result<(), MainError> { + use diesel::connection::DefaultLoadingMode as DbDefaultLoadingMode; + use diesel::prelude::*; + + tracing::info!("Starting new masp txs block index"); + + tracing::debug!("Reading all block heights with masp transactions from db"); + + let conn = app_state.get_db_connection().await.into_db_error()?; + + let block_heights = conn + .interact(|conn| { + use schema::tx::dsl::*; + + tx.select(block_height) + .distinct() + .load_iter::<_, DbDefaultLoadingMode>(conn) + .context("Failed to query block heights with masp txs")? + .try_fold(Vec::new(), |mut accum, maybe_block_height| { + tracing::debug!("Reading block height entry from db"); + let height: i32 = maybe_block_height.context( + "Failed to get tx block height row data from db", + )?; + tracing::debug!("Read block height entry from db"); + accum.push(u64::try_from(height).context( + "Failed to convert block height from i32 to u64", + )?); + anyhow::Ok(accum) + }) + }) + .await + .context_db_interact_error() + .into_db_error()? 
+        .into_db_error()?;
+
+    let block_heights_len = block_heights.len();
+    tracing::debug!(
+        num_blocks_with_masp_txs = block_heights_len,
+        "Read all block heights with masp transactions from db"
+    );
+
+    let serialized_filter = tokio::task::block_in_place(|| {
+        tracing::debug!(
+            "Building binary fuse xor filter of all heights with masp \
+             transactions"
+        );
+
+        let filter: BinaryFuse16 = block_heights
+            .try_into()
+            .map_err(|err| {
+                anyhow!(
+                    "Failed to convert queried block heights into binary fuse \
+                     xor filter: {err}",
+                )
+            })
+            .into_conversion_error()?;
+
+        let serialized = bincode::serialize(&filter)
+            .context(
+                "Failed to serialize binary fuse xor filter of block heights",
+            )
+            .into_serialization_error()?;
+
+        tracing::debug!(
+            index_len = filter.len(),
+            "Binary fuse xor filter built"
+        );
+
+        Ok(serialized)
+    })?;
+
+    tracing::debug!("Storing binary fuse xor filter in db");
+
+    conn.interact(|conn| {
+        use schema::block_index::dsl::*;
+
+        let db_filter = BlockIndex {
+            id: 0,
+            serialized_data: serialized_filter,
+        };
+
+        diesel::insert_into(block_index)
+            .values(&db_filter)
+            .on_conflict(id)
+            .do_update()
+            .set(serialized_data.eq(&db_filter.serialized_data))
+            .execute(conn)
+            .context("Failed to insert masp txs block index into db")?;
+
+        anyhow::Ok(())
+    })
+    .await
+    .context_db_interact_error()
+    .into_db_error()?
+    .into_db_error()?;
+
+    tracing::debug!("Stored binary fuse xor filter in db");
+
+    tracing::info!(
+        num_blocks_with_masp_txs = block_heights_len,
+        "Built and stored new masp txs block index"
+    );
+
+    Ok(())
+}
diff --git a/orm/migrations/2024-07-23-171457_block_index/down.sql b/orm/migrations/2024-07-23-171457_block_index/down.sql
new file mode 100644
index 0000000..96d4cdc
--- /dev/null
+++ b/orm/migrations/2024-07-23-171457_block_index/down.sql
@@ -0,0 +1 @@
+DROP TABLE block_index;
diff --git a/orm/migrations/2024-07-23-171457_block_index/up.sql b/orm/migrations/2024-07-23-171457_block_index/up.sql
new file mode 100644
index 0000000..42d5bc6
--- /dev/null
+++ b/orm/migrations/2024-07-23-171457_block_index/up.sql
@@ -0,0 +1,5 @@
+CREATE TABLE block_index (
+    id SERIAL PRIMARY KEY,
+    -- NB: serialized with `bincode`
+    serialized_data bytea NOT NULL
+);
diff --git a/orm/src/block_index.rs b/orm/src/block_index.rs
new file mode 100644
index 0000000..3e662ba
--- /dev/null
+++ b/orm/src/block_index.rs
@@ -0,0 +1,11 @@
+use diesel::{Insertable, Queryable, Selectable};
+
+use crate::schema::block_index;
+
+#[derive(Insertable, Queryable, Selectable, Clone)]
+#[diesel(table_name = block_index)]
+#[diesel(check_for_backend(diesel::pg::Pg))]
+pub struct BlockIndex {
+    pub id: i32,
+    pub serialized_data: Vec<u8>,
+}
diff --git a/orm/src/lib.rs b/orm/src/lib.rs
index a96a04c..80dcd1f 100644
--- a/orm/src/lib.rs
+++ b/orm/src/lib.rs
@@ -1,3 +1,4 @@
+pub mod block_index;
 pub mod chain_state;
 pub mod notes_map;
 pub mod schema;
diff --git a/orm/src/schema.rs b/orm/src/schema.rs
index f595415..06d0ffd 100644
--- a/orm/src/schema.rs
+++ b/orm/src/schema.rs
@@ -1,5 +1,12 @@
 // @generated automatically by Diesel CLI.
 
+diesel::table! {
+    block_index (id) {
+        id -> Int4,
+        serialized_data -> Bytea,
+    }
+}
+
 diesel::table! {
     chain_state (id) {
         id -> Int4,
@@ -45,6 +52,7 @@ diesel::table!
{
 }
 
 diesel::allow_tables_to_appear_in_same_query!(
+    block_index,
     chain_state,
     commitment_tree,
     notes_map,
diff --git a/shared/src/error.rs b/shared/src/error.rs
index 0e9afa8..1779d16 100644
--- a/shared/src/error.rs
+++ b/shared/src/error.rs
@@ -25,6 +25,16 @@ pub fn ok<T>(x: T) -> Result<T, MainError> {
 pub trait IntoMainError<T>: Sized {
     fn into_main_error(self, description: &str) -> Result<T, MainError>;
 
+    #[inline]
+    fn into_conversion_error(self) -> Result<T, MainError> {
+        self.into_main_error("Conversion error")
+    }
+
+    #[inline]
+    fn into_serialization_error(self) -> Result<T, MainError> {
+        self.into_main_error("Serialization error")
+    }
+
     #[inline]
     fn into_rpc_error(self) -> Result<T, MainError> {
         self.into_main_error("RPC error")
diff --git a/swagger.yml b/swagger.yml
index 638cd38..fb1fa4c 100644
--- a/swagger.yml
+++ b/swagger.yml
@@ -5,6 +5,15 @@ info:
 servers:
   - url: https://localhost:5000/api/v1
 paths:
+  /block-index:
+    get:
+      responses:
+        '200':
+          description: Compressed (lossy) index of all blocks containing masp txs.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/BlockIndexResponse'
   /health:
     get:
       responses:
@@ -187,3 +196,22 @@ components:
               description: The index of the individual masp transaction in the block.
             description: The batch of masp transactions in this slot.
         description: The vector of masp transactions.
+    BlockIndexResponse:
+      type: object
+      properties:
+        index:
+          description: Compressed (lossy) index of all blocks containing masp txs.
+ type: object + properties: + seed: + type: integer + segment_length: + type: integer + segment_length_mask: + type: integer + segment_count_length: + type: integer + fingerprints: + type: array + items: + type: integer diff --git a/webserver/Cargo.toml b/webserver/Cargo.toml index fc75606..9a5910d 100644 --- a/webserver/Cargo.toml +++ b/webserver/Cargo.toml @@ -21,6 +21,7 @@ anyhow.workspace = true axum-macros.workspace = true axum-trace-id.workspace = true axum.workspace = true +bincode.workspace = true clap.workspace = true deadpool-diesel.workspace = true diesel.workspace = true @@ -39,6 +40,7 @@ tower.workspace = true tracing-subscriber.workspace = true tracing.workspace = true validator.workspace = true +xorf.workspace = true [build-dependencies] vergen = { version = "8.0.0", features = ["build", "git", "gitcl"] } diff --git a/webserver/src/app.rs b/webserver/src/app.rs index 5f2aff0..e3fe800 100644 --- a/webserver/src/app.rs +++ b/webserver/src/app.rs @@ -51,6 +51,10 @@ impl ApplicationServer { .route("/notes-map", get(handler::notes_map::get_notes_map)) .route("/tx", get(handler::tx::get_tx)) .route("/height", get(handler::namada_state::get_latest_height)) + .route( + "/block-index", + get(handler::namada_state::get_block_index), + ) .with_state(common_state) }; diff --git a/webserver/src/error/namada_state.rs b/webserver/src/error/namada_state.rs index 218b35c..c8a0c8c 100644 --- a/webserver/src/error/namada_state.rs +++ b/webserver/src/error/namada_state.rs @@ -6,6 +6,8 @@ use crate::response::api::ApiErrorResponse; #[derive(Error, Debug)] pub enum NamadaStateError { + #[error("Block index not found")] + BlockIndexNotFound, #[error("Database error: {0}")] Database(String), } @@ -13,6 +15,7 @@ pub enum NamadaStateError { impl IntoResponse for NamadaStateError { fn into_response(self) -> Response { let status_code = match self { + NamadaStateError::BlockIndexNotFound => StatusCode::NOT_FOUND, NamadaStateError::Database(_) => 
StatusCode::INTERNAL_SERVER_ERROR,
         };
diff --git a/webserver/src/handler/namada_state.rs b/webserver/src/handler/namada_state.rs
index 059f789..4b7d1a7 100644
--- a/webserver/src/handler/namada_state.rs
+++ b/webserver/src/handler/namada_state.rs
@@ -5,7 +5,7 @@ use axum_trace_id::TraceId;
 use shared::error::InspectWrap;
 
 use crate::error::namada_state::NamadaStateError;
-use crate::response::namada_state::LatestHeightResponse;
+use crate::response::namada_state::{BlockIndexResponse, LatestHeightResponse};
 use crate::state::common::CommonState;
 
 #[debug_handler]
@@ -25,3 +25,23 @@ pub async fn get_latest_height(
         block_height: maybe_height.map(|h| h.0).unwrap_or_default(),
     }))
 }
+
+#[debug_handler]
+pub async fn get_block_index(
+    _trace_id: TraceId<String>,
+    State(state): State<CommonState>,
+) -> Result<Json<BlockIndexResponse>, NamadaStateError> {
+    let maybe_block_index = state
+        .namada_state_service
+        .get_block_index()
+        .await
+        .inspect_wrap("get_block_index", |err| {
+            NamadaStateError::Database(err.to_string())
+        })?;
+
+    if let Some(index) = maybe_block_index {
+        Ok(Json(BlockIndexResponse { index }))
+    } else {
+        Err(NamadaStateError::BlockIndexNotFound)
+    }
+}
diff --git a/webserver/src/repository/namada_state.rs b/webserver/src/repository/namada_state.rs
index ba6fb84..1f13532 100644
--- a/webserver/src/repository/namada_state.rs
+++ b/webserver/src/repository/namada_state.rs
@@ -1,9 +1,9 @@
 use anyhow::Context;
 use diesel::dsl::max;
-use diesel::{QueryDsl, RunQueryDsl};
-use orm::schema::chain_state;
+use diesel::{OptionalExtension, QueryDsl, RunQueryDsl, SelectableHelper};
 use shared::error::ContextDbInteractError;
 use shared::height::BlockHeight;
+use xorf::BinaryFuse16;
 
 use crate::appstate::AppState;
@@ -16,6 +16,8 @@ pub trait NamadaStateRepositoryTrait {
     fn new(app_state: AppState) -> Self;
 
     async fn get_latest_height(&self) -> anyhow::Result<Option<BlockHeight>>;
+
+    async fn get_block_index(&self) -> anyhow::Result<Option<BinaryFuse16>>;
 }
 
 impl NamadaStateRepositoryTrait for NamadaStateRepository {
@@ -31,6 +33,8 @@ impl
NamadaStateRepositoryTrait for NamadaStateRepository {
 
         let block_height = conn
             .interact(move |conn| {
+                use orm::schema::chain_state;
+
                 chain_state::dsl::chain_state
                     .select(max(chain_state::dsl::block_height))
                     .first::<Option<i64>>(conn)
@@ -41,4 +45,43 @@ impl NamadaStateRepositoryTrait for NamadaStateRepository {
 
         Ok(block_height.map(BlockHeight::from))
     }
+
+    async fn get_block_index(&self) -> anyhow::Result<Option<BinaryFuse16>> {
+        let conn = self.app_state.get_db_connection().await.context(
+            "Failed to retrieve connection from the pool of database \
+             connections",
+        )?;
+
+        let maybe_serialized_data = conn
+            .interact(move |conn| {
+                use orm::block_index::BlockIndex;
+                use orm::schema::block_index::dsl::block_index;
+
+                anyhow::Ok(
+                    block_index
+                        .select(BlockIndex::as_select())
+                        .first::<BlockIndex>(conn)
+                        .optional()
+                        .context("Failed to get latest block index from db")?
+                        .map(
+                            |BlockIndex {
+                                 serialized_data, ..
+                             }| serialized_data,
+                        ),
+                )
+            })
+            .await
+            .context_db_interact_error()??;
+
+        tokio::task::block_in_place(|| {
+            maybe_serialized_data
+                .map(|data| {
+                    bincode::deserialize(&data).context(
+                        "Failed to deserialize block index data returned from \
+                         db",
+                    )
+                })
+                .transpose()
+        })
+    }
 }
diff --git a/webserver/src/response/namada_state.rs b/webserver/src/response/namada_state.rs
index 1383a3c..907a6d0 100644
--- a/webserver/src/response/namada_state.rs
+++ b/webserver/src/response/namada_state.rs
@@ -1,6 +1,12 @@
 use serde::{Deserialize, Serialize};
+use xorf::BinaryFuse16;
 
 #[derive(Clone, Debug, Deserialize, Serialize, Default)]
 pub struct LatestHeightResponse {
     pub block_height: u64,
 }
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct BlockIndexResponse {
+    pub index: BinaryFuse16,
+}
diff --git a/webserver/src/service/namada_state.rs b/webserver/src/service/namada_state.rs
index dbc519d..129dc52 100644
--- a/webserver/src/service/namada_state.rs
+++ b/webserver/src/service/namada_state.rs
@@ -22,4 +22,10 @@ impl NamadaStateService {
     ) -> anyhow::Result<Option<BlockHeight>> {
self.namada_state_repo.get_latest_height().await
     }
+
+    pub async fn get_block_index(
+        &self,
+    ) -> anyhow::Result<Option<BinaryFuse16>> {
+        self.namada_state_repo.get_block_index().await
+    }
 }