Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

Commit

Permalink
make data_path optional on worker (#111)
Browse files — browse the repository at this point in the history
  • Loading branch information
ozgrakkurt authored Jan 25, 2023
1 parent a4b1358 commit 1c7deb4
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 64 deletions.
44 changes: 22 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion worker/src/config.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct Config {
pub db_path: PathBuf,
/// Path to read parquet files from
#[clap(long)]
pub data_path: PathBuf,
pub data_path: Option<PathBuf>,
#[command(flatten)]
pub ingest: IngestConfig,
#[command(flatten)]
Expand Down
78 changes: 41 additions & 37 deletions worker/src/data_ctx.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -68,41 +68,43 @@ impl DataCtx {
}
});

// this task checks and registers new parquet files to database
tokio::spawn({
let start = db.parquet_height();
let data_path = config.data_path.clone();
let db_writer = db_writer.clone();
if let Some(data_path) = &config.data_path {
// this task checks and registers new parquet files to database
tokio::spawn({
let start = db.parquet_height();
let data_path = data_path.clone();
let db_writer = db_writer.clone();

async move {
if let Err(e) = tokio::fs::create_dir_all(&data_path).await {
eprintln!(
"failed to create missing data directory:\n{}\nstopping parquet watcher",
e
);
return;
}

async move {
if let Err(e) = tokio::fs::create_dir_all(&data_path).await {
eprintln!(
"failed to create missing data directory:\n{}\nstopping parquet watcher",
e
);
return;
}
let mut next_start = start;
loop {
let dir_names = DirName::find_sorted(&data_path, next_start).await.unwrap();

let mut next_start = start;
loop {
let dir_names = DirName::find_sorted(&data_path, next_start).await.unwrap();
for dir_name in dir_names {
if !Self::parquet_folder_is_valid(&data_path, dir_name)
.await
.unwrap()
{
break;
}

for dir_name in dir_names {
if !Self::parquet_folder_is_valid(&data_path, dir_name)
.await
.unwrap()
{
break;
db_writer.register_parquet_folder(dir_name).await;
next_start = dir_name.range.to;
}

db_writer.register_parquet_folder(dir_name).await;
next_start = dir_name.range.to;
tokio::time::sleep(Duration::from_secs(5)).await;
}

tokio::time::sleep(Duration::from_secs(5)).await;
}
}
});
});
}

// this task runs periodical compactions on the database
tokio::spawn(async move {
Expand All @@ -112,12 +114,14 @@ impl DataCtx {
}
});

if let Some(s3_config) = config.s3.into_parsed() {
s3_sync::start(s3_sync::Direction::Down, &config.data_path, &s3_config)
.await
.map_err(Error::StartS3Sync)?;
} else {
log::info!("no s3 config, disabling s3 sync");
if let Some(data_path) = &config.data_path {
if let Some(s3_config) = config.s3.into_parsed() {
s3_sync::start(s3_sync::Direction::Down, data_path, &s3_config)
.await
.map_err(Error::StartS3Sync)?;
} else {
log::info!("no s3 config, disabling s3 sync");
}
}

Ok(Self { config, db })
Expand Down Expand Up @@ -478,7 +482,7 @@ impl DataCtx {
dir_name: DirName,
field_selection: FieldSelection,
) -> Result<LazyFrame> {
let mut path = self.config.data_path.clone();
let mut path = self.config.data_path.as_ref().unwrap().clone();
path.push(dir_name.to_string());

let blocks = {
Expand Down Expand Up @@ -528,7 +532,7 @@ impl DataCtx {
dir_name: DirName,
field_selection: FieldSelection,
) -> Result<LazyFrame> {
let mut path = self.config.data_path.clone();
let mut path = self.config.data_path.as_ref().unwrap().clone();
path.push(dir_name.to_string());

let blocks = {
Expand Down Expand Up @@ -566,7 +570,7 @@ impl DataCtx {
dir_name: DirName,
query: &MiniQuery,
) -> Result<LazyFrame> {
let mut path = self.config.data_path.clone();
let mut path = self.config.data_path.as_ref().unwrap().clone();
path.push(dir_name.to_string());

let mut blocks = {
Expand Down
12 changes: 8 additions & 4 deletions worker/src/db_writer.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -6,7 +6,7 @@ use eth_archive_core::dir_name::DirName;
use eth_archive_core::types::{Block, BlockRange, Log};
use polars::export::arrow::array::BinaryArray;
use polars::prelude::*;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
Expand All @@ -16,18 +16,22 @@ pub struct DbWriter {
}

impl DbWriter {
pub fn new(db: Arc<DbHandle>, data_path: &Path) -> Self {
pub fn new(db: Arc<DbHandle>, data_path: &Option<PathBuf>) -> Self {
let (tx, mut rx) = mpsc::channel::<Job>(4);

let data_path = data_path.to_owned();
let data_path = data_path.as_ref().map(|p| p.to_owned());

std::thread::spawn(move || {
while let Some(job) = rx.blocking_recv() {
loop {
let res = match job.clone() {
Job::WriteBatches(batches) => db.insert_batches(batches),
Job::RegisterParquetFolder(dir_name) => {
Self::handle_register_parquet_folder(&db, &data_path, dir_name)
Self::handle_register_parquet_folder(
&db,
data_path.as_ref().unwrap(),
dir_name,
)
}
Job::RunCompaction => {
db.compact();
Expand Down

0 comments on commit 1c7deb4

Please sign in to comment.