From 862417a47856277c6c1d96cf841f18f6c7c98f37 Mon Sep 17 00:00:00 2001 From: akhercha Date: Thu, 30 May 2024 22:58:47 +0200 Subject: [PATCH] feat(endpoint_onchain_ohlc): Removed monster SQL query and used Rust --- .../src/handlers/entries/get_onchain/ohlc.rs | 1 - .../infra/repositories/onchain_repository.rs | 159 +++++++----------- 2 files changed, 60 insertions(+), 100 deletions(-) diff --git a/pragma-node/src/handlers/entries/get_onchain/ohlc.rs b/pragma-node/src/handlers/entries/get_onchain/ohlc.rs index cc71e9f2..9de94c3a 100644 --- a/pragma-node/src/handlers/entries/get_onchain/ohlc.rs +++ b/pragma-node/src/handlers/entries/get_onchain/ohlc.rs @@ -44,7 +44,6 @@ pub async fn get_onchain_ohlc( params.network, pair_id.clone(), params.interval, - limit, ) .await .map_err(|db_err| db_err.to_entry_error(&pair_id))?; diff --git a/pragma-node/src/infra/repositories/onchain_repository.rs b/pragma-node/src/infra/repositories/onchain_repository.rs index ff5fb82f..a056f996 100644 --- a/pragma-node/src/infra/repositories/onchain_repository.rs +++ b/pragma-node/src/infra/repositories/onchain_repository.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use bigdecimal::BigDecimal; -use chrono::{Timelike, Utc}; +use chrono::{Duration, NaiveDateTime, Timelike}; use deadpool_diesel::postgres::Pool; use diesel::sql_types::{BigInt, Integer, Numeric, Text, Timestamp, VarChar}; use diesel::{Queryable, QueryableByName, RunQueryDsl}; @@ -14,7 +14,7 @@ use crate::handlers::entries::utils::get_decimals_for_pair; use crate::handlers::entries::{Checkpoint, OnchainEntry, Publisher, PublisherEntry}; use crate::utils::format_bigdecimal_price; -use super::entry_repository::{OHLCEntry, OHLCEntryRaw}; +use super::entry_repository::OHLCEntry; const BACKWARD_TIMESTAMP_INTERVAL: &str = "1 hour"; @@ -476,115 +476,76 @@ pub async fn get_publishers_with_components( Ok(publishers_response) } +#[derive(Debug, Queryable, QueryableByName)] +struct RawEntryPriceWithTimestamp { + #[diesel(sql_type = Numeric)] + price: BigDecimal, + #[diesel(sql_type = Timestamp)] + timestamp: NaiveDateTime, +} + pub async fn get_ohlc( pool: &Pool, network: Network, pair_id: String, interval: Interval, - limit: u64, ) -> Result, InfraError> { - let interval_in_minutes = interval.to_minutes(); - let now = Utc::now().naive_utc(); - let current_time = now - chrono::Duration::minutes((now.minute() % interval_in_minutes) as i64); - - let raw_sql = format!( - r#" - WITH raw_data AS ( - SELECT - pair_id, - price, - timestamp, - (date_trunc('minute', timestamp) + interval '{interval_in_minutes} min' * floor(date_part('minute', timestamp) / {interval_in_minutes})) as interval_start - FROM - {table_name} - WHERE - pair_id = '{pair_id}' - ORDER BY - timestamp DESC - ), - earliest_data AS ( - SELECT MIN(timestamp) AS earliest_timestamp FROM raw_data - ), - ohlc AS ( - SELECT - pair_id, - interval_start, - FIRST_VALUE(price) OVER w as "open", - MAX(price) OVER w as "high", - MIN(price) OVER w as "low", - LAST_VALUE(price) OVER w as "close" - FROM - raw_data - WINDOW w AS ( - PARTITION BY pair_id, interval_start - ORDER BY timestamp - ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING - ) - ), - all_intervals AS ( - SELECT generate_series( - date_trunc('hour', (SELECT earliest_timestamp FROM earliest_data)) + interval '1 hour', - '{current_time}'::timestamp, - interval '{interval_in_minutes} minutes' - ) as interval - ), - filled_intervals AS ( - SELECT - ai.interval as "time", - COALESCE( - (SELECT o.open FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1), - lag((SELECT o.open FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1)) over (order by ai.interval) - ) as "open", - COALESCE( - (SELECT o.high FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1), - lag((SELECT o.high FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1)) over (order by ai.interval) - ) as "high", - COALESCE( - (SELECT o.low FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1), - lag((SELECT o.low FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1)) over (order by ai.interval) - ) as "low", - COALESCE( - (SELECT o.close FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1), - lag((SELECT o.close FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1)) over (order by ai.interval) - ) as "close" - FROM - all_intervals ai - LEFT JOIN - ohlc o ON ai.interval = o.interval_start - ), - current_interval AS ( - SELECT - '{now}'::timestamp as time, - o.open, - o.high, - o.low, - o.close - FROM ohlc o - ORDER BY o.interval_start DESC - LIMIT 1 - ) - SELECT DISTINCT * FROM filled_intervals - UNION ALL - SELECT * FROM current_interval - ORDER BY time DESC - LIMIT {limit}; - "#, - table_name = get_table_name(network, DataType::SpotEntry), - pair_id = pair_id, - interval_in_minutes = interval_in_minutes, - limit = limit, - current_time = current_time, - now = now - ); + let interval_in_minutes = interval.to_minutes() as i64; + let table_name = get_table_name(network, DataType::SpotEntry); + // Use prepared statements with parameters let conn = pool.get().await.map_err(adapt_infra_error)?; - let ohlc_entries = conn - .interact(move |conn| diesel::sql_query(raw_sql).load::(conn)) + let raw_entries: Vec = conn + .interact(move |conn| { + diesel::sql_query(format!( + "SELECT price, timestamp FROM {} WHERE pair_id = $1 ORDER BY timestamp;", + table_name + )) + .bind::(&pair_id) + .load::(conn) + }) .await .map_err(adapt_infra_error)? .map_err(adapt_infra_error)?; - let ohlc_entries: Vec = ohlc_entries.into_iter().collect(); + if raw_entries.is_empty() { + return Err(InfraError::NotFound); + } + let ohlc_entries = compute_ohlc(raw_entries, interval_in_minutes); Ok(ohlc_entries) } + +fn compute_ohlc( + raw_entries: Vec, + interval_in_minutes: i64, +) -> Vec { + let mut ohlc_map: HashMap = HashMap::new(); + + for entry in raw_entries { + let interval_key = entry + .timestamp + .date() + .and_hms_opt(entry.timestamp.hour(), 0, 0) + .unwrap() + + Duration::minutes( + (entry.timestamp.minute() as i64 / interval_in_minutes) * interval_in_minutes, + ); + + let ohlc = ohlc_map.entry(interval_key).or_insert_with(|| OHLCEntry { + time: interval_key, + open: entry.price.clone(), + high: entry.price.clone(), + low: entry.price.clone(), + close: entry.price.clone(), + }); + + ohlc.high = std::cmp::max(ohlc.high.clone(), entry.price.clone()); + ohlc.low = std::cmp::min(ohlc.low.clone(), entry.price.clone()); + ohlc.close = entry.price.clone(); + } + + let mut ohlc_entries: Vec = ohlc_map.into_values().collect(); + ohlc_entries.sort_by(|a, b| b.time.cmp(&a.time)); + ohlc_entries +}