This repository has been archived by the owner on May 23, 2024. It is now read-only.

Deadlock fix (#181)
- Remove usages of channels in blocking threads as much as possible, to prevent deadlocks when only a small number of threads is available in the thread pools (a minimal illustration of the hazard follows the commit metadata below).
ozgrakkurt authored Mar 15, 2023
1 parent d302599 commit a18815d
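
The failure mode the commit message describes: a task parked in a blocking
channel recv() occupies one of tokio's limited blocking-pool threads, while
the task that would send() into the channel sits in the same pool's queue and
never runs. A hypothetical minimal reproduction (not code from this
repository; max_blocking_threads(1) stands in for a pool exhausted under real
load):

use tokio::runtime::Builder;

fn main() {
    let rt = Builder::new_multi_thread()
        .worker_threads(1)
        .max_blocking_threads(1) // tiny blocking pool, as under heavy load
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async {
        // Zero-capacity crossbeam channel: send blocks until a recv is ready.
        let (tx, rx) = crossbeam::channel::bounded(0);

        // Parks the only blocking thread inside a synchronous recv().
        let consumer = tokio::task::spawn_blocking(move || rx.recv().unwrap());

        // Queued behind `consumer`; it never gets a thread, so the send that
        // would wake the consumer never executes. Both tasks wait forever.
        let producer = tokio::task::spawn_blocking(move || tx.send(1u32).unwrap());

        let _ = tokio::join!(consumer, producer); // hangs here
    });
}
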
Showing 8 changed files with 73 additions and 79 deletions.

Makefile.toml: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 [env]
 CARGO_MAKE_EXTEND_WORKSPACE_MAKEFILE = true
-RUST_LOG = "info,aws_config=off"
+RUST_LOG = "debug,aws_config=off,hyper=off,h2=off"
 
 [tasks.ingester]
 workspace = false
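
The RUST_LOG value uses the usual env-filter directive syntax: a bare level
("debug") sets the default for all targets, and target=level entries override
it per crate, with "off" disabling a target entirely. The change raises the
default verbosity from info to debug while keeping the chatty aws_config,
hyper, and h2 crates quiet.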

worker/src/data_ctx.rs: 0 additions & 20 deletions

@@ -243,7 +243,6 @@ impl DataCtx {
             }
             .run()
             .await;
-
             tx.send((res, block_range)).ok();
         });
     }

@@ -311,22 +310,3 @@ impl DataCtx {
         Ok(())
     }
 }
-
-impl FieldSelection {
-    fn with_join_columns(mut self) -> Self {
-        self.block.number = true;
-        self.transaction.hash = true;
-        self.transaction.block_number = true;
-        self.transaction.transaction_index = true;
-        self.transaction.dest = true;
-        self.transaction.source = true;
-        self.transaction.status = true;
-        self.log.block_number = true;
-        self.log.log_index = true;
-        self.log.transaction_index = true;
-        self.log.address = true;
-        self.log.topics = true;
-
-        self
-    }
-}

worker/src/field_selection.rs: 19 additions & 0 deletions

@@ -52,6 +52,25 @@ pub struct FieldSelection {
     pub log: LogFieldSelection,
 }
 
+impl FieldSelection {
+    pub fn with_join_columns(mut self) -> Self {
+        self.block.number = true;
+        self.transaction.hash = true;
+        self.transaction.block_number = true;
+        self.transaction.transaction_index = true;
+        self.transaction.dest = true;
+        self.transaction.source = true;
+        self.transaction.status = true;
+        self.log.block_number = true;
+        self.log.log_index = true;
+        self.log.transaction_index = true;
+        self.log.address = true;
+        self.log.topics = true;
+
+        self
+    }
+}
+
 #[derive(
     Serialize,
     Deserialize,

worker/src/parquet_query/block.rs: 8 additions & 12 deletions

@@ -59,26 +59,22 @@ pub async fn query_blocks(
         }
     };
 
-    let chunk_rx = ReadParquet {
+    let mut chunk_rx = ReadParquet {
         path,
         rg_filter,
         fields,
     }
     .read()
     .await?;
 
-    tokio::task::spawn_blocking(move || {
-        let mut blocks = BTreeMap::new();
-        while let Ok(res) = chunk_rx.recv() {
-            let (i, columns) = res?;
-            let block_nums = &pruned_blocks_per_rg[i];
-            process_cols(&query.mini_query, block_nums, columns, &mut blocks);
-        }
+    let mut blocks = BTreeMap::new();
+    while let Some(res) = chunk_rx.recv().await {
+        let (i, columns) = res?;
+        let block_nums = &pruned_blocks_per_rg[i];
+        process_cols(&query.mini_query, block_nums, columns, &mut blocks);
+    }
 
-        Ok(blocks)
-    })
-    .await
-    .unwrap()
+    Ok(blocks)
 }
 
 fn process_cols(
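
The same restructuring repeats in log.rs and transaction.rs below: the
blocking crossbeam recv loop inside tokio::task::spawn_blocking becomes an
async tokio::sync::mpsc recv loop on the calling task, so no blocking-pool
thread is held while waiting for chunks. A minimal standalone sketch of the
new shape (Chunk and the String error type are hypothetical stand-ins, not
this crate's types):

use std::collections::BTreeMap;
use tokio::sync::mpsc;

type Chunk = (usize, Vec<u64>);
type ChunkRes = Result<Chunk, String>;

async fn collect_chunks(
    mut rx: mpsc::Receiver<ChunkRes>,
) -> Result<BTreeMap<usize, Vec<u64>>, String> {
    let mut out = BTreeMap::new();
    // recv().await suspends the task instead of parking an OS thread, and
    // it yields None once every Sender clone is dropped, ending the loop.
    while let Some(res) = rx.recv().await {
        let (i, data) = res?;
        out.insert(i, data);
    }
    Ok(out)
}
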

worker/src/parquet_query/log.rs: 12 additions & 16 deletions

@@ -80,30 +80,26 @@ pub async fn query_logs(
         !val.is_empty()
     };
 
-    let chunk_rx = ReadParquet {
+    let mut chunk_rx = ReadParquet {
         path,
         rg_filter,
         fields,
     }
     .read()
     .await?;
 
-    tokio::task::spawn_blocking(move || {
-        let mut query_result = LogQueryResult {
-            logs: BTreeMap::new(),
-            transactions: BTreeSet::new(),
-            blocks: BTreeSet::new(),
-        };
-        while let Ok(res) = chunk_rx.recv() {
-            let (i, columns) = res?;
-            let log_queries = &pruned_queries_per_rg[i];
-            process_cols(&query.mini_query, log_queries, columns, &mut query_result);
-        }
+    let mut query_result = LogQueryResult {
+        logs: BTreeMap::new(),
+        transactions: BTreeSet::new(),
+        blocks: BTreeSet::new(),
+    };
+    while let Some(res) = chunk_rx.recv().await {
+        let (i, columns) = res?;
+        let log_queries = &pruned_queries_per_rg[i];
+        process_cols(&query.mini_query, log_queries, columns, &mut query_result);
+    }
 
-        Ok(query_result)
-    })
-    .await
-    .unwrap()
+    Ok(query_result)
 }
 
 fn process_cols(

worker/src/parquet_query/read.rs: 15 additions & 9 deletions

@@ -5,11 +5,13 @@ use arrow2::datatypes::Field;
 use arrow2::io::parquet;
 use arrow2::io::parquet::read::ArrayIter;
 use eth_archive_core::hash::HashMap;
+use eth_archive_core::rayon_async;
 use futures::future::BoxFuture;
 use rayon::prelude::*;
 use std::path::PathBuf;
 use tokio::fs::File;
 use tokio::io::BufReader;
+use tokio::sync::mpsc;
 use tokio_util::compat::TokioAsyncReadCompatExt;
 
 pub struct ReadParquet<F: Fn(usize) -> bool> {

@@ -45,13 +47,12 @@ impl<F: Fn(usize) -> bool> ReadParquet<F> {
                 .map_err(Error::ReadParquet)?
         };
 
-        let (tx, rx) = crossbeam::channel::unbounded();
-
+        let (tx, rx) = mpsc::channel(metadata.row_groups.len());
         for (i, rg_meta) in metadata.row_groups.into_iter().enumerate() {
             if (self.rg_filter)(i) {
                 let fields = self.fields.clone();
-                let tx = tx.clone();
                 let path = self.path.clone();
+                let tx = tx.clone();
                 tokio::task::spawn(async move {
                     let open_reader = move || {
                         let path = path.clone();

@@ -74,13 +75,14 @@ impl<F: Fn(usize) -> bool> ReadParquet<F> {
                     {
                         Ok(columns) => columns,
                         Err(e) => {
-                            tx.send(Err(e)).ok();
+                            tx.send(Err(e)).await.ok();
                             return;
                         }
                     };
 
                     let mut num_rows = rg_meta.num_rows();
-                    tokio::task::spawn_blocking(move || {
+                    let chunks = rayon_async::spawn(move || {
+                        let mut chunks = Vec::new();
                         while num_rows > 0 {
                             num_rows = num_rows.saturating_sub(CHUNK_SIZE);
                             let chunk =

@@ -97,11 +99,15 @@ impl<F: Fn(usize) -> bool> ReadParquet<F> {
                                 (i, c)
                             });
 
-                            tx.send(chunk).ok();
+                            chunks.push(chunk);
                         }
+
+                        chunks
                     })
-                    .await
-                    .unwrap();
+                    .await;
+                    for chunk in chunks {
+                        tx.send(chunk).await.ok();
+                    }
                 });
             }
         }

@@ -110,5 +116,5 @@ impl<F: Fn(usize) -> bool> ReadParquet<F> {
     }
 }
 
-type ChunkReceiver = crossbeam::channel::Receiver<ChunkRes>;
+type ChunkReceiver = mpsc::Receiver<ChunkRes>;
 type ChunkRes = Result<(usize, HashMap<String, Box<dyn Array>>)>;
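
rayon_async::spawn is imported from eth_archive_core and its body is not part
of this diff. A common shape for such a rayon-to-async bridge (an assumption
about what that helper looks like, not a quote of it) pairs rayon::spawn with
a tokio oneshot channel, so CPU-bound decoding runs on rayon's pool without
occupying a tokio blocking thread:

use tokio::sync::oneshot;

// Sketch of a rayon-to-async bridge; the real eth_archive_core::rayon_async
// may differ. Runs f on rayon's CPU pool and awaits its result from async
// code without tying up a tokio blocking thread.
pub async fn spawn<F, T>(f: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    rayon::spawn(move || {
        // If the receiver was dropped, nobody wants the result; discard it.
        let _ = tx.send(f());
    });
    rx.await.expect("rayon task dropped without sending")
}

Note how the diff uses it: the rayon closure only collects decoded chunks
into a Vec and returns them, and the async sends (tx.send(chunk).await)
happen back on the tokio task after the .await, since an mpsc send is a
future and cannot be awaited from inside the rayon pool.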

worker/src/parquet_query/transaction.rs: 16 additions & 20 deletions

@@ -95,34 +95,30 @@ pub async fn query_transactions(
         !tx_queries.is_empty() || !tx_ids.is_empty()
     };
 
-    let chunk_rx = ReadParquet {
+    let mut chunk_rx = ReadParquet {
         path,
         rg_filter,
         fields,
     }
     .read()
     .await?;
 
-    tokio::task::spawn_blocking(move || {
-        let mut blocks = blocks;
-        let mut transactions = BTreeMap::new();
-        while let Ok(res) = chunk_rx.recv() {
-            let (i, columns) = res?;
-            let queries = &pruned_queries_per_rg[i];
-            process_cols(
-                &query.mini_query,
-                &queries.0,
-                &queries.1,
-                columns,
-                &mut blocks,
-                &mut transactions,
-            );
-        }
+    let mut blocks = blocks;
+    let mut transactions = BTreeMap::new();
+    while let Some(res) = chunk_rx.recv().await {
+        let (i, columns) = res?;
+        let queries = &pruned_queries_per_rg[i];
+        process_cols(
+            &query.mini_query,
+            &queries.0,
+            &queries.1,
+            columns,
+            &mut blocks,
+            &mut transactions,
+        );
+    }
 
-        Ok((transactions, blocks))
-    })
-    .await
-    .unwrap()
+    Ok((transactions, blocks))
 }
 
 fn process_cols(

worker/src/serialize_task.rs: 2 additions & 1 deletion

@@ -85,7 +85,8 @@ impl SerializeTask {
     pub async fn join(self) -> Result<Vec<u8>> {
         mem::drop(self.tx);
 
-        self.join_handle.await.map_err(Error::TaskJoinError)
+        let res = self.join_handle.await.map_err(Error::TaskJoinError);
+        res
     }
 
     pub async fn send(&self, msg: (QueryResult, BlockRange)) -> bool {
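
A note on the surrounding context: the mem::drop(self.tx) above is what lets
join terminate. Dropping the last sender presumably closes the channel the
serialize task is receiving on, so that task winds down and the join handle
resolves instead of waiting forever; the new let binding only makes the
return value explicit and does not change behavior.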
