From 1c7deb4cf1acf77d5771eb251da6e4e6a9c8319b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96zg=C3=BCr=20Akkurt?= <91746947+ozgrakkurt@users.noreply.github.com> Date: Thu, 26 Jan 2023 00:38:42 +0300 Subject: [PATCH] make data_path optional on worker (#111) --- Cargo.lock | 44 +++++++++++------------ worker/src/config.rs | 2 +- worker/src/data_ctx.rs | 78 ++++++++++++++++++++++------------------- worker/src/db_writer.rs | 12 ++++--- 4 files changed, 72 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 96b50901..e8caaa2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,9 +83,9 @@ dependencies = [ [[package]] name = "actix-rt" -version = "2.7.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ea16c295198e958ef31930a6ef37d0fb64e9ca3b6116e6b93a8bdae96ee1000" +checksum = "15265b6b8e2347670eb363c47fc8c75208b4a4994b27192f345fcbe707804f3e" dependencies = [ "futures-core", "tokio", @@ -93,9 +93,9 @@ dependencies = [ [[package]] name = "actix-server" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0da34f8e659ea1b077bb4637948b815cd3768ad5a188fdcd74ff4d84240cd824" +checksum = "3e8613a75dd50cc45f473cee3c34d59ed677c0f7b44480ce3b8247d7dc519327" dependencies = [ "actix-rt", "actix-service", @@ -202,9 +202,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf6ccdb167abbf410dcb915cabd428929d7f6a04980b54a11f26a39f1c7f7107" +checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" dependencies = [ "cfg-if", "const-random", @@ -283,7 +283,7 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b843531e0a9d8dac44b0aa6adc926b2d970e8a627fe2105cd0498d5f93a6e97f" dependencies = [ - "ahash 0.8.2", + "ahash 0.8.3", "arrow-format", "base64 0.13.1", "bytemuck", @@ -346,9 +346,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.62" +version = "0.1.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "689894c2db1ea643a50834b999abf1c110887402542955ff5451dab8f861f9ed" +checksum = "eff18d764974428cf3a9328e23fc5c986f5fbed46e6cd4cdf42544df5d297ec1" dependencies = [ "proc-macro2", "quote", @@ -928,9 +928,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.1.2" +version = "4.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e638668a62aced2c9fb72b5135a33b4a500485ccf2a0e402e09aa04ab2fc115" +checksum = "f13b9c79b5d1dd500d20ef541215a6423c75829ef43117e1b4d17fd8af0b5d76" dependencies = [ "bitflags", "clap_derive", @@ -1616,7 +1616,7 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" dependencies = [ - "ahash 0.8.2", + "ahash 0.8.3", "rayon", ] @@ -2437,7 +2437,7 @@ version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3ab5c1e10e42a34a44a6e1421869c84ad56fe217c6120cda697c84bc467eb9c" dependencies = [ - "ahash 0.8.2", + "ahash 0.8.3", "anyhow", "arrow2", "bitflags", @@ -2464,7 +2464,7 @@ version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "882e392cbc3e645bfa7ad582065764e21dc271cfeefee904f43a95ccd1b52cb7" dependencies = [ - "ahash 0.8.2", + "ahash 0.8.3", "anyhow", "arrow2", "csv-core", @@ -2490,7 +2490,7 @@ version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6cb356861b8967e39ca6205b8bded68b4986f0edd49f259cebbd59ad056e67cc" dependencies = [ - "ahash 0.8.2", + "ahash 0.8.3", "bitflags", "glob", "polars-arrow", @@ -2540,7 +2540,7 @@ version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f58b4b3daee78b95937930ad3cfc2a454d57802a92a39ee0ce220bb1ae627ef9" dependencies = [ - "ahash 0.8.2", + "ahash 0.8.3", "polars-arrow", "polars-core", "polars-io", @@ -2712,9 +2712,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.10.1" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cac410af5d00ab6884528b4ab69d1e8e146e8d471201800fa1b4524126de6ad3" +checksum = "356a0625f1954f730c0201cdab48611198dc6ce21f4acff55089b5a78e6e835b" dependencies = [ "crossbeam-channel", "crossbeam-deque", @@ -2954,9 +2954,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.8.0" +version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645926f31b250a2dca3c232496c2d898d91036e45ca0e97e0e2390c54e11be36" +checksum = "7c4437699b6d34972de58652c68b98cb5b53a4199ab126db8e20ec8ded29a721" dependencies = [ "bitflags", "core-foundation", @@ -3465,9 +3465,9 @@ checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" [[package]] name = "unicode-bidi" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0046be40136ef78dc325e0edefccf84ccddacd0afcc1ca54103fa3c61bbdab1d" +checksum = "d54675592c1dbefd78cbd98db9bacd89886e1ca50692a0692baefffdeb92dd58" [[package]] name = "unicode-ident" diff --git a/worker/src/config.rs b/worker/src/config.rs index 6ec51b21..c3109ef4 100644 --- a/worker/src/config.rs +++ b/worker/src/config.rs @@ -11,7 +11,7 @@ pub struct Config { pub db_path: PathBuf, /// Path to read parquet files from #[clap(long)] - pub data_path: PathBuf, + pub data_path: Option, #[command(flatten)] pub ingest: IngestConfig, #[command(flatten)] diff --git a/worker/src/data_ctx.rs b/worker/src/data_ctx.rs index 4148650f..f11e1bfe 100644 --- a/worker/src/data_ctx.rs +++ b/worker/src/data_ctx.rs @@ -68,41 +68,43 @@ impl DataCtx { } }); - // this task checks and registers new parquet files to database - tokio::spawn({ - let start = db.parquet_height(); - let data_path = config.data_path.clone(); - let db_writer = db_writer.clone(); + if let Some(data_path) = &config.data_path { + // this task checks and registers new parquet files to database + tokio::spawn({ + let start = db.parquet_height(); + let data_path = data_path.clone(); + let db_writer = db_writer.clone(); + + async move { + if let Err(e) = tokio::fs::create_dir_all(&data_path).await { + eprintln!( + "failed to create missing data directory:\n{}\nstopping parquet watcher", + e + ); + return; + } - async move { - if let Err(e) = tokio::fs::create_dir_all(&data_path).await { - eprintln!( - "failed to create missing data directory:\n{}\nstopping parquet watcher", - e - ); - return; - } + let mut next_start = start; + loop { + let dir_names = DirName::find_sorted(&data_path, next_start).await.unwrap(); - let mut next_start = start; - loop { - let dir_names = DirName::find_sorted(&data_path, next_start).await.unwrap(); + for dir_name in dir_names { + if !Self::parquet_folder_is_valid(&data_path, dir_name) + .await + .unwrap() + { + break; + } - for dir_name in dir_names { - if !Self::parquet_folder_is_valid(&data_path, dir_name) - .await - .unwrap() - { - break; + db_writer.register_parquet_folder(dir_name).await; + next_start = dir_name.range.to; } - db_writer.register_parquet_folder(dir_name).await; - next_start = dir_name.range.to; + tokio::time::sleep(Duration::from_secs(5)).await; } - - tokio::time::sleep(Duration::from_secs(5)).await; } - } - }); + }); + } // this task runs periodical compactions on the database tokio::spawn(async move { @@ -112,12 +114,14 @@ impl DataCtx { } }); - if let Some(s3_config) = config.s3.into_parsed() { - s3_sync::start(s3_sync::Direction::Down, &config.data_path, &s3_config) - .await - .map_err(Error::StartS3Sync)?; - } else { - log::info!("no s3 config, disabling s3 sync"); + if let Some(data_path) = &config.data_path { + if let Some(s3_config) = config.s3.into_parsed() { + s3_sync::start(s3_sync::Direction::Down, data_path, &s3_config) + .await + .map_err(Error::StartS3Sync)?; + } else { + log::info!("no s3 config, disabling s3 sync"); + } } Ok(Self { config, db }) @@ -478,7 +482,7 @@ impl DataCtx { dir_name: DirName, field_selection: FieldSelection, ) -> Result { - let mut path = self.config.data_path.clone(); + let mut path = self.config.data_path.as_ref().unwrap().clone(); path.push(dir_name.to_string()); let blocks = { @@ -528,7 +532,7 @@ impl DataCtx { dir_name: DirName, field_selection: FieldSelection, ) -> Result { - let mut path = self.config.data_path.clone(); + let mut path = self.config.data_path.as_ref().unwrap().clone(); path.push(dir_name.to_string()); let blocks = { @@ -566,7 +570,7 @@ impl DataCtx { dir_name: DirName, query: &MiniQuery, ) -> Result { - let mut path = self.config.data_path.clone(); + let mut path = self.config.data_path.as_ref().unwrap().clone(); path.push(dir_name.to_string()); let mut blocks = { diff --git a/worker/src/db_writer.rs b/worker/src/db_writer.rs index 92442757..c3e9e40e 100644 --- a/worker/src/db_writer.rs +++ b/worker/src/db_writer.rs @@ -6,7 +6,7 @@ use eth_archive_core::dir_name::DirName; use eth_archive_core::types::{Block, BlockRange, Log}; use polars::export::arrow::array::BinaryArray; use polars::prelude::*; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; @@ -16,10 +16,10 @@ pub struct DbWriter { } impl DbWriter { - pub fn new(db: Arc, data_path: &Path) -> Self { + pub fn new(db: Arc, data_path: &Option) -> Self { let (tx, mut rx) = mpsc::channel::(4); - let data_path = data_path.to_owned(); + let data_path = data_path.as_ref().map(|p| p.to_owned()); std::thread::spawn(move || { while let Some(job) = rx.blocking_recv() { @@ -27,7 +27,11 @@ impl DbWriter { let res = match job.clone() { Job::WriteBatches(batches) => db.insert_batches(batches), Job::RegisterParquetFolder(dir_name) => { - Self::handle_register_parquet_folder(&db, &data_path, dir_name) + Self::handle_register_parquet_folder( + &db, + data_path.as_ref().unwrap(), + dir_name, + ) } Job::RunCompaction => { db.compact();