diff --git a/src/eth/storage/hybrid/mod.rs b/src/eth/storage/hybrid/mod.rs
index d687edb35..792f575d2 100644
--- a/src/eth/storage/hybrid/mod.rs
+++ b/src/eth/storage/hybrid/mod.rs
@@ -11,8 +11,11 @@ use async_trait::async_trait;
 use metrics::atomics::AtomicU64;
 use num_traits::cast::ToPrimitive;
 use sqlx::postgres::PgPoolOptions;
+use sqlx::types::Json;
 use sqlx::Pool;
 use sqlx::Postgres;
+use sqlx::QueryBuilder;
+use sqlx::Row;
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::channel;
 use tokio::sync::RwLock;
@@ -57,7 +60,7 @@ pub struct HybridPermanentStorage {
     pool: Arc<Pool<Postgres>>,
     block_number: AtomicU64,
     task_sender: mpsc::Sender<BlockTask>,
-    tasks_pending: Arc<Semaphore>,
+    tasks_pending: Arc<Semaphore>, // TODO change to Mutex<()>
 }
 
 #[derive(Debug)]
@@ -134,8 +137,7 @@ impl HybridPermanentStorage {
                     permit = Some(tasks_pending.acquire().await.expect("semaphore has closed"));
                 }
             } else if let Some(_res) = futures.join_next().await {
-                // res.expect("future failed") XXX
-                continue;
+                futures.spawn(query_executor::commit_eventually(pool_clone, block_task));
             }
         }
     }
@@ -316,20 +318,38 @@ impl PermanentStorage for HybridPermanentStorage {
     async fn read_logs(&self, filter: &LogFilter) -> anyhow::Result<Vec<LogMined>> {
         tracing::debug!(?filter, "reading logs");
 
-        let state_lock = self.lock_read().await;
-        let logs = state_lock
-            .logs
-            .iter()
-            .skip_while(|log| log.block_number < filter.from_block)
-            .take_while(|log| match filter.to_block {
-                Some(to_block) => log.block_number <= to_block,
-                None => true,
+        let log_query_builder = &mut QueryBuilder::new(
+            r#"
+                SELECT log_data
+                FROM neo_logs
+            "#,
+        );
+        log_query_builder.push(" WHERE block_number >= ").push_bind(filter.from_block);
+
+        // only bound the upper block number when to_block is set
+        if let Some(block_number) = filter.to_block {
+            log_query_builder.push(" AND block_number <= ").push_bind(block_number);
+        }
+
+        for address in filter.addresses.iter() {
+            log_query_builder.push(" AND address = ").push_bind(address);
+        }
+
+        let log_query = log_query_builder.build();
+
+        let query_result = log_query.fetch_all(&*self.pool).await?;
+
+        let pg_logs = query_result
+            .into_iter()
+            .map(|row| {
+                let json: Json<LogMined> = row.get("log_data");
+                json.0
             })
             .filter(|log| filter.matches(log))
-            .cloned()
             .collect();
-        Ok(logs)
+
+        Ok(pg_logs)
     }
 
     async fn save_block(&self, block: Block) -> anyhow::Result<(), StorageError> {
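
The rewritten `read_logs` builds its SQL dynamically with sqlx's `QueryBuilder`, appending a bind parameter for each filter field that is present. The sketch below is a minimal, standalone illustration of that composition pattern, not code from this PR: the literal filter values, the `main` wrapper, and the single-line form of the `SELECT` are assumptions for demonstration, while the `neo_logs` table and `log_data` column names come from the diff. It assumes a `sqlx` dependency with the `postgres` feature enabled.

```rust
use sqlx::Postgres;
use sqlx::QueryBuilder;

fn main() {
    // Illustrative values only; in the PR these come from the `LogFilter` argument.
    let from_block = 100_i64;
    let to_block = Some(200_i64);
    let addresses = ["0x01"];

    let mut qb: QueryBuilder<Postgres> = QueryBuilder::new("SELECT log_data FROM neo_logs");
    qb.push(" WHERE block_number >= ").push_bind(from_block);

    // Optional upper bound, mirroring the `if let Some(block_number)` branch in the diff.
    if let Some(block_number) = to_block {
        qb.push(" AND block_number <= ").push_bind(block_number);
    }

    // One predicate per address, mirroring the `for address in filter.addresses` loop.
    for address in addresses {
        qb.push(" AND address = ").push_bind(address);
    }

    // Each push_bind adds a positional placeholder instead of splicing the value into
    // the SQL text, so the generated statement is parameterized, e.g.:
    //   SELECT log_data FROM neo_logs WHERE block_number >= $1
    //     AND block_number <= $2 AND address = $3
    println!("{}", qb.sql());
}
```

Because the bound values travel as query parameters rather than string fragments, the filter values never need escaping; the in-memory `filter.matches(log)` pass in the diff then applies whatever criteria (such as topics) are not expressed in the SQL.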