Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

Commit

Permalink
qwe
Browse files Browse the repository at this point in the history
  • Loading branch information
ozgrakkurt committed Mar 15, 2023
1 parent 90f9823 commit e1fda79
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 61 deletions.
2 changes: 1 addition & 1 deletion core/src/rayon_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ where

rayon::spawn(move || {
let res = func();
tx.send(res).ok().unwrap();
tx.send(res).ok();
});

rx.await.unwrap()
Expand Down
22 changes: 7 additions & 15 deletions worker/src/parquet_query/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,14 @@ pub async fn query_blocks(
.read()
.await?;

tokio::task::spawn_blocking(move || {
let mut blocks = BTreeMap::new();
dbg!();
while let Some(res) = chunk_rx.blocking_recv() {
dbg!();
let (i, columns) = res?;
let block_nums = &pruned_blocks_per_rg[i];
process_cols(&query.mini_query, block_nums, columns, &mut blocks);
dbg!();
}
dbg!();
let mut blocks = BTreeMap::new();
while let Some(res) = chunk_rx.recv().await {
let (i, columns) = res?;
let block_nums = &pruned_blocks_per_rg[i];
process_cols(&query.mini_query, block_nums, columns, &mut blocks);
}

Ok(blocks)
})
.await
.unwrap()
Ok(blocks)
}

fn process_cols(
Expand Down
28 changes: 11 additions & 17 deletions worker/src/parquet_query/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,20 @@ pub async fn query_logs(
.read()
.await?;

tokio::task::spawn_blocking(move || {
let mut query_result = LogQueryResult {
logs: BTreeMap::new(),
transactions: BTreeSet::new(),
blocks: BTreeSet::new(),
};
let mut query_result = LogQueryResult {
logs: BTreeMap::new(),
transactions: BTreeSet::new(),
blocks: BTreeSet::new(),
};
while let Some(res) = chunk_rx.recv().await {
dbg!();
while let Some(res) = chunk_rx.blocking_recv() {
dbg!();
let (i, columns) = res?;
let log_queries = &pruned_queries_per_rg[i];
process_cols(&query.mini_query, log_queries, columns, &mut query_result);
dbg!();
}
let (i, columns) = res?;
let log_queries = &pruned_queries_per_rg[i];
process_cols(&query.mini_query, log_queries, columns, &mut query_result);
dbg!();
}

Ok(query_result)
})
.await
.unwrap()
Ok(query_result)
}

fn process_cols(
Expand Down
15 changes: 10 additions & 5 deletions worker/src/parquet_query/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ impl<F: Fn(usize) -> bool> ReadParquet<F> {
for (i, rg_meta) in metadata.row_groups.into_iter().enumerate() {
if (self.rg_filter)(i) {
let fields = self.fields.clone();
let tx = tx.clone();
let path = self.path.clone();
let tx = tx.clone();
tokio::task::spawn(async move {
let open_reader = move || {
let path = path.clone();
Expand Down Expand Up @@ -81,7 +81,8 @@ impl<F: Fn(usize) -> bool> ReadParquet<F> {
};

let mut num_rows = rg_meta.num_rows();
rayon_async::spawn(move || {
let chunks = rayon_async::spawn(move || {
let mut chunks = Vec::new();
while num_rows > 0 {
num_rows = num_rows.saturating_sub(CHUNK_SIZE);
let chunk =
Expand All @@ -98,10 +99,14 @@ impl<F: Fn(usize) -> bool> ReadParquet<F> {
(i, c)
});

tx.blocking_send(chunk).ok();
chunks.push(chunk);
}
})
.await;

chunks
}).await;
for chunk in chunks {
tx.send(chunk).await.ok();
}
});
}
}
Expand Down
38 changes: 15 additions & 23 deletions worker/src/parquet_query/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,30 +104,22 @@ pub async fn query_transactions(
.read()
.await?;

tokio::task::spawn_blocking(move || {
let mut blocks = blocks;
let mut transactions = BTreeMap::new();
dbg!();
while let Some(res) = chunk_rx.blocking_recv() {
dbg!();
let (i, columns) = res?;
let queries = &pruned_queries_per_rg[i];
process_cols(
&query.mini_query,
&queries.0,
&queries.1,
columns,
&mut blocks,
&mut transactions,
);
dbg!();
}
dbg!();
let mut blocks = blocks;
let mut transactions = BTreeMap::new();
while let Some(res) = chunk_rx.recv().await {
let (i, columns) = res?;
let queries = &pruned_queries_per_rg[i];
process_cols(
&query.mini_query,
&queries.0,
&queries.1,
columns,
&mut blocks,
&mut transactions,
);
}

Ok((transactions, blocks))
})
.await
.unwrap()
Ok((transactions, blocks))
}

fn process_cols(
Expand Down

0 comments on commit e1fda79

Please sign in to comment.