Skip to content

Commit

Permalink
feat: read_logs (hybrid) (#417)
Browse files Browse the repository at this point in the history
  • Loading branch information
carneiro-cw authored Mar 20, 2024
1 parent 48498ff commit a723fd2
Showing 1 changed file with 33 additions and 13 deletions.
46 changes: 33 additions & 13 deletions src/eth/storage/hybrid/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ use async_trait::async_trait;
use metrics::atomics::AtomicU64;
use num_traits::cast::ToPrimitive;
use sqlx::postgres::PgPoolOptions;
use sqlx::types::Json;
use sqlx::Pool;
use sqlx::Postgres;
use sqlx::QueryBuilder;
use sqlx::Row;
use tokio::sync::mpsc;
use tokio::sync::mpsc::channel;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -57,7 +60,7 @@ pub struct HybridPermanentStorage {
pool: Arc<Pool<Postgres>>,
block_number: AtomicU64,
task_sender: mpsc::Sender<BlockTask>,
tasks_pending: Arc<Semaphore>,
tasks_pending: Arc<Semaphore>, // TODO change to Mutex<()>
}

#[derive(Debug)]
Expand Down Expand Up @@ -134,8 +137,7 @@ impl HybridPermanentStorage {
permit = Some(tasks_pending.acquire().await.expect("semaphore has closed"));
}
} else if let Some(_res) = futures.join_next().await {
// res.expect("future failed") XXX
continue;
futures.spawn(query_executor::commit_eventually(pool_clone, block_task));
}
}
}
Expand Down Expand Up @@ -316,20 +318,38 @@ impl PermanentStorage for HybridPermanentStorage {

async fn read_logs(&self, filter: &LogFilter) -> anyhow::Result<Vec<LogMined>> {
tracing::debug!(?filter, "reading logs");
let state_lock = self.lock_read().await;

let logs = state_lock
.logs
.iter()
.skip_while(|log| log.block_number < filter.from_block)
.take_while(|log| match filter.to_block {
Some(to_block) => log.block_number <= to_block,
None => true,
let log_query_builder = &mut QueryBuilder::new(
r#"
SELECT log_data
FROM neo_logs
"#,
);
log_query_builder.push(" WHERE block_number >= ").push_bind(filter.from_block);

// verifies if to_block exists
if let Some(block_number) = filter.to_block {
log_query_builder.push(" AND block_number <= ").push_bind(block_number);
}

for address in filter.addresses.iter() {
log_query_builder.push(" AND address = ").push_bind(address);
}

let log_query = log_query_builder.build();

let query_result = log_query.fetch_all(&*self.pool).await?;

let pg_logs = query_result
.into_iter()
.map(|row| {
let json: Json<LogMined> = row.get("log_data");
json.0
})
.filter(|log| filter.matches(log))
.cloned()
.collect();
Ok(logs)

Ok(pg_logs)
}

async fn save_block(&self, block: Block) -> anyhow::Result<(), StorageError> {
Expand Down

0 comments on commit a723fd2

Please sign in to comment.