Skip to content

Commit

Permalink
feat(endpoint_onchain_ohlc): Removed monster SQL query and used Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
akhercha committed May 30, 2024
1 parent 2323f06 commit 862417a
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 100 deletions.
1 change: 0 additions & 1 deletion pragma-node/src/handlers/entries/get_onchain/ohlc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ pub async fn get_onchain_ohlc(
params.network,
pair_id.clone(),
params.interval,
limit,
)
.await
.map_err(|db_err| db_err.to_entry_error(&pair_id))?;
Expand Down
159 changes: 60 additions & 99 deletions pragma-node/src/infra/repositories/onchain_repository.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use bigdecimal::BigDecimal;
use chrono::{Timelike, Utc};
use chrono::{Duration, NaiveDateTime, Timelike};
use deadpool_diesel::postgres::Pool;
use diesel::sql_types::{BigInt, Integer, Numeric, Text, Timestamp, VarChar};
use diesel::{Queryable, QueryableByName, RunQueryDsl};
Expand All @@ -14,7 +14,7 @@ use crate::handlers::entries::utils::get_decimals_for_pair;
use crate::handlers::entries::{Checkpoint, OnchainEntry, Publisher, PublisherEntry};
use crate::utils::format_bigdecimal_price;

use super::entry_repository::{OHLCEntry, OHLCEntryRaw};
use super::entry_repository::OHLCEntry;

const BACKWARD_TIMESTAMP_INTERVAL: &str = "1 hour";

Expand Down Expand Up @@ -476,115 +476,76 @@ pub async fn get_publishers_with_components(
Ok(publishers_response)
}

#[derive(Debug, Queryable, QueryableByName)]
struct RawEntryPriceWithTimestamp {
#[diesel(sql_type = Numeric)]
price: BigDecimal,
#[diesel(sql_type = Timestamp)]
timestamp: NaiveDateTime,
}

pub async fn get_ohlc(
pool: &Pool,
network: Network,
pair_id: String,
interval: Interval,
limit: u64,
) -> Result<Vec<OHLCEntry>, InfraError> {
let interval_in_minutes = interval.to_minutes();
let now = Utc::now().naive_utc();
let current_time = now - chrono::Duration::minutes((now.minute() % interval_in_minutes) as i64);

let raw_sql = format!(
r#"
WITH raw_data AS (
SELECT
pair_id,
price,
timestamp,
(date_trunc('minute', timestamp) + interval '{interval_in_minutes} min' * floor(date_part('minute', timestamp) / {interval_in_minutes})) as interval_start
FROM
{table_name}
WHERE
pair_id = '{pair_id}'
ORDER BY
timestamp DESC
),
earliest_data AS (
SELECT MIN(timestamp) AS earliest_timestamp FROM raw_data
),
ohlc AS (
SELECT
pair_id,
interval_start,
FIRST_VALUE(price) OVER w as "open",
MAX(price) OVER w as "high",
MIN(price) OVER w as "low",
LAST_VALUE(price) OVER w as "close"
FROM
raw_data
WINDOW w AS (
PARTITION BY pair_id, interval_start
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
)
),
all_intervals AS (
SELECT generate_series(
date_trunc('hour', (SELECT earliest_timestamp FROM earliest_data)) + interval '1 hour',
'{current_time}'::timestamp,
interval '{interval_in_minutes} minutes'
) as interval
),
filled_intervals AS (
SELECT
ai.interval as "time",
COALESCE(
(SELECT o.open FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1),
lag((SELECT o.open FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1)) over (order by ai.interval)
) as "open",
COALESCE(
(SELECT o.high FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1),
lag((SELECT o.high FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1)) over (order by ai.interval)
) as "high",
COALESCE(
(SELECT o.low FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1),
lag((SELECT o.low FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1)) over (order by ai.interval)
) as "low",
COALESCE(
(SELECT o.close FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1),
lag((SELECT o.close FROM ohlc o WHERE o.interval_start <= ai.interval ORDER BY o.interval_start DESC LIMIT 1)) over (order by ai.interval)
) as "close"
FROM
all_intervals ai
LEFT JOIN
ohlc o ON ai.interval = o.interval_start
),
current_interval AS (
SELECT
'{now}'::timestamp as time,
o.open,
o.high,
o.low,
o.close
FROM ohlc o
ORDER BY o.interval_start DESC
LIMIT 1
)
SELECT DISTINCT * FROM filled_intervals
UNION ALL
SELECT * FROM current_interval
ORDER BY time DESC
LIMIT {limit};
"#,
table_name = get_table_name(network, DataType::SpotEntry),
pair_id = pair_id,
interval_in_minutes = interval_in_minutes,
limit = limit,
current_time = current_time,
now = now
);
let interval_in_minutes = interval.to_minutes() as i64;
let table_name = get_table_name(network, DataType::SpotEntry);

// Use prepared statements with parameters
let conn = pool.get().await.map_err(adapt_infra_error)?;
let ohlc_entries = conn
.interact(move |conn| diesel::sql_query(raw_sql).load::<OHLCEntryRaw>(conn))
let raw_entries: Vec<RawEntryPriceWithTimestamp> = conn
.interact(move |conn| {
diesel::sql_query(format!(
"SELECT price, timestamp FROM {} WHERE pair_id = $1 ORDER BY timestamp;",
table_name
))
.bind::<diesel::sql_types::Text, _>(&pair_id)
.load::<RawEntryPriceWithTimestamp>(conn)
})
.await
.map_err(adapt_infra_error)?
.map_err(adapt_infra_error)?;

let ohlc_entries: Vec<OHLCEntry> = ohlc_entries.into_iter().collect();
if raw_entries.is_empty() {
return Err(InfraError::NotFound);
}

let ohlc_entries = compute_ohlc(raw_entries, interval_in_minutes);
Ok(ohlc_entries)
}

fn compute_ohlc(
raw_entries: Vec<RawEntryPriceWithTimestamp>,
interval_in_minutes: i64,
) -> Vec<OHLCEntry> {
let mut ohlc_map: HashMap<NaiveDateTime, OHLCEntry> = HashMap::new();

for entry in raw_entries {
let interval_key = entry
.timestamp
.date()
.and_hms_opt(entry.timestamp.hour(), 0, 0)
.unwrap()
+ Duration::minutes(
(entry.timestamp.minute() as i64 / interval_in_minutes) * interval_in_minutes,
);

let ohlc = ohlc_map.entry(interval_key).or_insert_with(|| OHLCEntry {
time: interval_key,
open: entry.price.clone(),
high: entry.price.clone(),
low: entry.price.clone(),
close: entry.price.clone(),
});

ohlc.high = std::cmp::max(ohlc.high.clone(), entry.price.clone());
ohlc.low = std::cmp::min(ohlc.low.clone(), entry.price.clone());
ohlc.close = entry.price.clone();
}

let mut ohlc_entries: Vec<OHLCEntry> = ohlc_map.into_values().collect();
ohlc_entries.sort_by(|a, b| b.time.cmp(&a.time));
ohlc_entries
}

0 comments on commit 862417a

Please sign in to comment.