diff --git a/core/src/rayon_async.rs b/core/src/rayon_async.rs index fd109e2..5807552 100644 --- a/core/src/rayon_async.rs +++ b/core/src/rayon_async.rs @@ -9,7 +9,7 @@ where rayon::spawn(move || { let res = func(); - tx.send(res).ok().unwrap(); + tx.send(res).ok(); }); rx.await.unwrap() diff --git a/worker/src/parquet_query/block.rs b/worker/src/parquet_query/block.rs index bacad7e..ec2a86f 100644 --- a/worker/src/parquet_query/block.rs +++ b/worker/src/parquet_query/block.rs @@ -68,22 +68,14 @@ pub async fn query_blocks( .read() .await?; - tokio::task::spawn_blocking(move || { - let mut blocks = BTreeMap::new(); - dbg!(); - while let Some(res) = chunk_rx.blocking_recv() { - dbg!(); - let (i, columns) = res?; - let block_nums = &pruned_blocks_per_rg[i]; - process_cols(&query.mini_query, block_nums, columns, &mut blocks); - dbg!(); - } - dbg!(); + let mut blocks = BTreeMap::new(); + while let Some(res) = chunk_rx.recv().await { + let (i, columns) = res?; + let block_nums = &pruned_blocks_per_rg[i]; + process_cols(&query.mini_query, block_nums, columns, &mut blocks); + } - Ok(blocks) - }) - .await - .unwrap() + Ok(blocks) } fn process_cols( diff --git a/worker/src/parquet_query/log.rs b/worker/src/parquet_query/log.rs index c3c7ebd..dad6a90 100644 --- a/worker/src/parquet_query/log.rs +++ b/worker/src/parquet_query/log.rs @@ -89,26 +89,20 @@ pub async fn query_logs( .read() .await?; - tokio::task::spawn_blocking(move || { - let mut query_result = LogQueryResult { - logs: BTreeMap::new(), - transactions: BTreeSet::new(), - blocks: BTreeSet::new(), - }; + let mut query_result = LogQueryResult { + logs: BTreeMap::new(), + transactions: BTreeSet::new(), + blocks: BTreeSet::new(), + }; + while let Some(res) = chunk_rx.recv().await { dbg!(); - while let Some(res) = chunk_rx.blocking_recv() { - dbg!(); - let (i, columns) = res?; - let log_queries = &pruned_queries_per_rg[i]; - process_cols(&query.mini_query, log_queries, columns, &mut query_result); - dbg!(); - } + let (i, columns) = res?; + let log_queries = &pruned_queries_per_rg[i]; + process_cols(&query.mini_query, log_queries, columns, &mut query_result); dbg!(); + } - Ok(query_result) - }) - .await - .unwrap() + Ok(query_result) } fn process_cols( diff --git a/worker/src/parquet_query/read.rs b/worker/src/parquet_query/read.rs index 8eb1867..d2cab43 100644 --- a/worker/src/parquet_query/read.rs +++ b/worker/src/parquet_query/read.rs @@ -51,8 +51,8 @@ impl bool> ReadParquet { for (i, rg_meta) in metadata.row_groups.into_iter().enumerate() { if (self.rg_filter)(i) { let fields = self.fields.clone(); - let tx = tx.clone(); let path = self.path.clone(); + let tx = tx.clone(); tokio::task::spawn(async move { let open_reader = move || { let path = path.clone(); @@ -81,7 +81,8 @@ impl bool> ReadParquet { }; let mut num_rows = rg_meta.num_rows(); - rayon_async::spawn(move || { + let chunks = rayon_async::spawn(move || { + let mut chunks = Vec::new(); while num_rows > 0 { num_rows = num_rows.saturating_sub(CHUNK_SIZE); let chunk = @@ -98,10 +99,14 @@ impl bool> ReadParquet { (i, c) }); - tx.blocking_send(chunk).ok(); + chunks.push(chunk); } - }) - .await; + + chunks + }).await; + for chunk in chunks { + tx.send(chunk).await.ok(); + } }); } } diff --git a/worker/src/parquet_query/transaction.rs b/worker/src/parquet_query/transaction.rs index 75cd9f0..af9897e 100644 --- a/worker/src/parquet_query/transaction.rs +++ b/worker/src/parquet_query/transaction.rs @@ -104,30 +104,22 @@ pub async fn query_transactions( .read() .await?; - tokio::task::spawn_blocking(move || { - let mut blocks = blocks; - let mut transactions = BTreeMap::new(); - dbg!(); - while let Some(res) = chunk_rx.blocking_recv() { - dbg!(); - let (i, columns) = res?; - let queries = &pruned_queries_per_rg[i]; - process_cols( - &query.mini_query, - &queries.0, - &queries.1, - columns, - &mut blocks, - &mut transactions, - ); - dbg!(); - } - dbg!(); + let mut blocks = blocks; + let mut transactions = BTreeMap::new(); + while let Some(res) = chunk_rx.recv().await { + let (i, columns) = res?; + let queries = &pruned_queries_per_rg[i]; + process_cols( + &query.mini_query, + &queries.0, + &queries.1, + columns, + &mut blocks, + &mut transactions, + ); + } - Ok((transactions, blocks)) - }) - .await - .unwrap() + Ok((transactions, blocks)) } fn process_cols(