From 7db884eb840a38ba136ce4fdd88e3f03a704aeae Mon Sep 17 00:00:00 2001 From: adel Date: Wed, 5 Jun 2024 13:38:24 +0200 Subject: [PATCH] feat: endpoint ohlc (#43) * feat(endpoint_onchain_ohlc): Initial work for OHLC endpoint * feat(endpoint_onchain_ohlc): Fixed compilation error * feat(endpoint_onchain_ohlc): Added ohlc computation * feat(endpoint_onchain_ohlc): Added FromIterator trait * feat(endpoint_onchain_ohlc): Fixed earliest timesamp * feat(endpoint_onchain_ohlc): Removed monster SQL query and used Rust * feat(endpoint_onchain_ohlc): Updated OHLC computation with limit parameter * feat(endpoint_onchain_ohlc): TODO * feat(endpoint_onchain_ohlc): Repushed query * feat(endpoint_onchain_ohlc): Added ws endpoint for OHLC * feat(endpoint_onchain_ohlc): compute OHLC data (ws) * feat(endpoint_onchain_ohlc): refinements from review * feat(endpoint_onchain_ohlc): Added docs for context --- Cargo.lock | 58 +++++ infra/pragma-ingestor/Dockerfile | 2 +- infra/pragma-node/Dockerfile | 2 +- pragma-common/Cargo.toml | 1 + pragma-common/src/types.rs | 123 +++++++++-- pragma-entities/src/models/entry_error.rs | 3 + pragma-entities/src/models/publisher_error.rs | 21 ++ pragma-node/Cargo.toml | 2 +- pragma-node/src/handlers/entries/get_entry.rs | 2 +- pragma-node/src/handlers/entries/get_ohlc.rs | 2 +- .../src/handlers/entries/get_onchain/mod.rs | 2 +- .../src/handlers/entries/get_onchain/ohlc.rs | 101 ++++++++- pragma-node/src/handlers/entries/mod.rs | 35 ++- .../infra/repositories/entry_repository.rs | 32 ++- .../infra/repositories/onchain_repository.rs | 200 +++++++++++++++++- pragma-node/src/main.rs | 3 + pragma-node/src/routes.rs | 4 +- 17 files changed, 544 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 92ab4a7d..8d3c5ffd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -165,6 +165,7 @@ dependencies = [ "async-trait", "axum-core", "axum-macros", + "base64 0.21.7", "bitflags 1.3.2", "bytes", "futures-util", @@ -183,8 +184,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -477,6 +480,12 @@ dependencies = [ "syn 2.0.57", ] +[[package]] +name = "data-encoding" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" + [[package]] name = "deadpool" version = "0.9.5" @@ -1827,6 +1836,7 @@ dependencies = [ name = "pragma-common" version = "1.0.0" dependencies = [ + "chrono", "serde", "tracing", "tracing-subscriber", @@ -2610,6 +2620,17 @@ dependencies = [ "syn 2.0.57", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.8" @@ -3165,6 +3186,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -3345,6 +3378,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -3410,6 +3462,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utoipa" version = "4.2.0" diff --git a/infra/pragma-ingestor/Dockerfile b/infra/pragma-ingestor/Dockerfile index cfb15427..9196887b 100644 --- a/infra/pragma-ingestor/Dockerfile +++ b/infra/pragma-ingestor/Dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.75 as builder +FROM rust:1.78 as builder WORKDIR /home/pragma-ingestor diff --git a/infra/pragma-node/Dockerfile b/infra/pragma-node/Dockerfile index 64056e6e..324c6f8d 100644 --- a/infra/pragma-node/Dockerfile +++ b/infra/pragma-node/Dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.75 as builder +FROM rust:1.78 as builder WORKDIR /home/pragma-node diff --git a/pragma-common/Cargo.toml b/pragma-common/Cargo.toml index a406080c..c18d9433 100644 --- a/pragma-common/Cargo.toml +++ b/pragma-common/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +chrono = { version = "0.4.26", features = ["serde"] } serde = { version = "1.0", features = ["derive"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/pragma-common/src/types.rs b/pragma-common/src/types.rs index 77a47f09..096236f8 100644 --- a/pragma-common/src/types.rs +++ b/pragma-common/src/types.rs @@ -1,3 +1,4 @@ +use chrono::{DateTime, Timelike, Utc}; use serde::Deserialize; use utoipa::ToSchema; @@ -12,20 +13,6 @@ pub enum AggregationMode { Twap, } -// Supported Aggregation Intervals -#[derive(Default, Debug, Deserialize, ToSchema, Clone, Copy)] -pub enum Interval { - #[serde(rename = "1min")] - #[default] - OneMinute, - #[serde(rename = "15min")] - FifteenMinutes, - #[serde(rename = "1h")] - OneHour, - #[serde(rename = "2h")] - TwoHours, -} - #[derive(Default, Debug, Deserialize, ToSchema, Clone, Copy)] pub enum Network { #[serde(rename = "testnet")] @@ -43,3 +30,111 @@ pub enum DataType { #[serde(rename = "future_entry")] FutureEntry, } + +// Supported Aggregation Intervals +#[derive(Default, Debug, Deserialize, ToSchema, Clone, Copy)] +pub enum Interval { + #[serde(rename = "1min")] + #[default] + OneMinute, + #[serde(rename = "15min")] + FifteenMinutes, + #[serde(rename = "1h")] + OneHour, + #[serde(rename = "2h")] + TwoHours, +} + +impl Interval { + pub fn to_minutes(&self) -> i64 { + match self { + Interval::OneMinute => 1, + Interval::FifteenMinutes => 15, + Interval::OneHour => 60, + Interval::TwoHours => 120, + } + } + + pub fn to_seconds(&self) -> i64 { + self.to_minutes() * 60 + } + + /// Align a datetime to the nearest interval boundary. + /// + /// This function ensures that the given datetime is aligned to the self interval. + /// For example, if the interval is 15 minutes, a datetime like 20:17 will be + /// adjusted to 20:15, so that it falls on the boundary of the interval. + pub fn align_timestamp(&self, dt: DateTime) -> DateTime { + let interval_minutes = self.to_minutes(); + let dt_minutes = dt.minute() as i64; + let total_minutes = dt.hour() as i64 * 60 + dt_minutes; + + let aligned_total_minutes = (total_minutes / interval_minutes) * interval_minutes; + let aligned_hours = aligned_total_minutes / 60; + let aligned_minutes = aligned_total_minutes % 60; + + dt.with_minute(aligned_minutes as u32) + .unwrap() + .with_hour(aligned_hours as u32) + .unwrap() + .with_second(0) + .unwrap() + } +} + +#[cfg(test)] +mod tests { + use super::Interval; + use chrono::{DateTime, Utc}; + + #[test] + fn test_align_timestamp() { + let test_inputs = [ + ( + Interval::OneMinute, + vec![ + ("2021-01-01T00:00:00Z", "2021-01-01 00:00:00 UTC"), + ("2021-01-01T00:00:30Z", "2021-01-01 00:00:00 UTC"), + ("2021-01-01T00:01:00Z", "2021-01-01 00:01:00 UTC"), + ("2021-01-01T00:01:30Z", "2021-01-01 00:01:00 UTC"), + ], + ), + ( + Interval::FifteenMinutes, + vec![ + ("2021-01-01T00:00:00Z", "2021-01-01 00:00:00 UTC"), + ("2021-01-01T00:00:30Z", "2021-01-01 00:00:00 UTC"), + ("2021-01-01T00:01:30Z", "2021-01-01 00:00:00 UTC"), + ("2021-01-01T00:00:30Z", "2021-01-01 00:00:00 UTC"), + ("2021-01-01T00:15:00Z", "2021-01-01 00:15:00 UTC"), + ("2021-01-01T00:22:30Z", "2021-01-01 00:15:00 UTC"), + ], + ), + ( + Interval::OneHour, + vec![ + ("2021-01-01T00:00:00Z", "2021-01-01 00:00:00 UTC"), + ("2021-01-01T00:30:00Z", "2021-01-01 00:00:00 UTC"), + ("2021-01-01T01:00:00Z", "2021-01-01 01:00:00 UTC"), + ("2021-01-01T01:30:00Z", "2021-01-01 01:00:00 UTC"), + ], + ), + ( + Interval::TwoHours, + vec![ + ("2021-01-01T00:00:00Z", "2021-01-01 00:00:00 UTC"), + ("2021-01-01T01:30:00Z", "2021-01-01 00:00:00 UTC"), + ("2021-01-01T02:00:00Z", "2021-01-01 02:00:00 UTC"), + ("2021-01-01T02:30:00Z", "2021-01-01 02:00:00 UTC"), + ], + ), + ]; + for (interval, test_case) in test_inputs.iter() { + for (input, expected) in test_case.iter() { + let dt: DateTime = DateTime::parse_from_rfc3339(input).unwrap().to_utc(); + let aligned_dt = interval.align_timestamp(dt); + assert_eq!(aligned_dt.to_string(), *expected); + } + } + } +} diff --git a/pragma-entities/src/models/entry_error.rs b/pragma-entities/src/models/entry_error.rs index 6e717275..ddf85661 100644 --- a/pragma-entities/src/models/entry_error.rs +++ b/pragma-entities/src/models/entry_error.rs @@ -17,6 +17,8 @@ pub enum VolatilityError { pub enum EntryError { #[error("internal server error")] InternalServerError, + #[error("bad request")] + BadRequest, #[error("entry not found: {0}")] NotFound(String), #[error("infra error: {0}")] @@ -73,6 +75,7 @@ impl IntoResponse for EntryError { StatusCode::INTERNAL_SERVER_ERROR, format!("Publisher error: {}", err), ), + Self::BadRequest => (StatusCode::BAD_REQUEST, "Bad request".to_string()), _ => ( StatusCode::INTERNAL_SERVER_ERROR, String::from("Internal server error"), diff --git a/pragma-entities/src/models/publisher_error.rs b/pragma-entities/src/models/publisher_error.rs index efaeab68..67260338 100644 --- a/pragma-entities/src/models/publisher_error.rs +++ b/pragma-entities/src/models/publisher_error.rs @@ -4,14 +4,30 @@ use axum::Json; use serde_json::json; use utoipa::ToSchema; +use crate::error::InfraError; + #[derive(Debug, thiserror::Error, ToSchema)] pub enum PublisherError { + #[error("internal server error")] + InternalServerError, #[error("invalid key : {0}")] InvalidKey(String), #[error("invalid address : {0}")] InvalidAddress(String), #[error("inactive publisher : {0}")] InactivePublisher(String), + #[error("no publishers found")] + NotFound, +} + +impl From for PublisherError { + fn from(error: InfraError) -> Self { + match error { + InfraError::InternalServerError => Self::InternalServerError, + InfraError::NotFound => Self::NotFound, + _ => Self::InternalServerError, + } + } } impl IntoResponse for PublisherError { @@ -29,6 +45,11 @@ impl IntoResponse for PublisherError { StatusCode::FORBIDDEN, format!("Inactive Publisher: {}", publisher_name), ), + Self::NotFound => (StatusCode::NOT_FOUND, "No publishers found".to_string()), + _ => ( + StatusCode::INTERNAL_SERVER_ERROR, + "Internal Server Error".to_string(), + ), }; ( status, diff --git a/pragma-node/Cargo.toml b/pragma-node/Cargo.toml index 04fe8342..0be1c13e 100644 --- a/pragma-node/Cargo.toml +++ b/pragma-node/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [dependencies] -axum = { version = "0.6", features = ["macros"] } +axum = { version = "0.6", features = ["macros", "ws"] } axum-macros = "0.3" bigdecimal = { version = "0.4.1", features = ["serde"] } chrono = { version = "0.4.26", features = ["serde"] } diff --git a/pragma-node/src/handlers/entries/get_entry.rs b/pragma-node/src/handlers/entries/get_entry.rs index 1100901d..c74f416f 100644 --- a/pragma-node/src/handlers/entries/get_entry.rs +++ b/pragma-node/src/handlers/entries/get_entry.rs @@ -34,7 +34,7 @@ pub async fn get_entry( // Construct pair id let pair_id = currency_pair_to_pair_id(&pair.0, &pair.1); - let now = chrono::Utc::now().naive_utc().and_utc().timestamp_millis() as u64; + let now = chrono::Utc::now().timestamp_millis() as u64; let timestamp = if let Some(timestamp) = params.timestamp { timestamp diff --git a/pragma-node/src/handlers/entries/get_ohlc.rs b/pragma-node/src/handlers/entries/get_ohlc.rs index f6d64c96..10e83c7a 100644 --- a/pragma-node/src/handlers/entries/get_ohlc.rs +++ b/pragma-node/src/handlers/entries/get_ohlc.rs @@ -31,7 +31,7 @@ pub async fn get_ohlc( // Construct pair id let pair_id = currency_pair_to_pair_id(&pair.0, &pair.1); - let now = chrono::Utc::now().naive_utc().and_utc().timestamp_millis() as u64; + let now = chrono::Utc::now().timestamp_millis() as u64; let timestamp = if let Some(timestamp) = params.timestamp { timestamp diff --git a/pragma-node/src/handlers/entries/get_onchain/mod.rs b/pragma-node/src/handlers/entries/get_onchain/mod.rs index 622ce8fd..c0960556 100644 --- a/pragma-node/src/handlers/entries/get_onchain/mod.rs +++ b/pragma-node/src/handlers/entries/get_onchain/mod.rs @@ -40,7 +40,7 @@ pub async fn get_onchain( tracing::info!("Received get onchain entry request for pair {:?}", pair); let pair_id: String = currency_pair_to_pair_id(&pair.0, &pair.1); - let now = chrono::Utc::now().naive_utc().and_utc().timestamp() as u64; + let now = chrono::Utc::now().timestamp() as u64; let timestamp = if let Some(timestamp) = params.timestamp { if timestamp > now { return Err(EntryError::InvalidTimestamp); diff --git a/pragma-node/src/handlers/entries/get_onchain/ohlc.rs b/pragma-node/src/handlers/entries/get_onchain/ohlc.rs index c68091b3..babc6b67 100644 --- a/pragma-node/src/handlers/entries/get_onchain/ohlc.rs +++ b/pragma-node/src/handlers/entries/get_onchain/ohlc.rs @@ -1,2 +1,99 @@ -// TODO(akhercha): ohldc endpoint -// https://buildonpragma.notion.site/Pragma-API-fc14ba680030470cab61ee58098b135f +use std::time::Duration; + +use axum::extract::{Query, State}; +use axum::response::IntoResponse; +use serde_json::json; + +use pragma_common::types::{Interval, Network}; + +use crate::handlers::entries::utils::currency_pair_to_pair_id; +use crate::handlers::entries::GetOnchainOHLCParams; +use crate::infra::repositories::entry_repository::OHLCEntry; +use crate::infra::repositories::onchain_repository::get_ohlc; +use crate::utils::PathExtractor; +use crate::AppState; + +use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; + +pub const WS_UPDATING_INTERVAL_IN_SECONDS: u64 = 10; + +#[utoipa::path( + get, + path = "/node/v1/onchain/ws/ohlc/{base}/{quote}", + responses( + ( + status = 200, + description = "Get OHLC data for a pair continuously updated through a ws connection", + body = GetOnchainOHLCResponse + ) + ), + params( + ("base" = String, Path, description = "Base Asset"), + ("quote" = String, Path, description = "Quote Asset"), + ("network" = Network, Query, description = "Network"), + ("interval" = Interval, Query, description = "Interval of the OHLC data"), + ), +)] +pub async fn get_onchain_ohlc_ws( + ws: WebSocketUpgrade, + State(state): State, + PathExtractor(pair): PathExtractor<(String, String)>, + Query(params): Query, +) -> impl IntoResponse { + let pair_id = currency_pair_to_pair_id(&pair.0, &pair.1); + ws.on_upgrade(move |socket| { + handle_ohlc_ws(socket, state, pair_id, params.network, params.interval) + }) +} + +async fn handle_ohlc_ws( + mut socket: WebSocket, + state: AppState, + pair_id: String, + network: Network, + interval: Interval, +) { + // Initial OHLC to compute + let mut ohlc_to_compute = 10; + let mut update_interval = + tokio::time::interval(Duration::from_secs(WS_UPDATING_INTERVAL_IN_SECONDS)); + + let mut ohlc_data: Vec = Vec::new(); + + loop { + update_interval.tick().await; + match get_ohlc( + &mut ohlc_data, + &state.postgres_pool, + network, + pair_id.clone(), + interval, + ohlc_to_compute, + ) + .await + { + Ok(()) => { + if socket + .send(Message::Text(serde_json::to_string(&ohlc_data).unwrap())) + .await + .is_err() + { + break; + } + } + Err(e) => { + if socket + .send(Message::Text(json!({ "error": e.to_string() }).to_string())) + .await + .is_err() + { + break; + } + } + } + // After the first request, we only get the latest interval + if !ohlc_data.is_empty() { + ohlc_to_compute = 1; + } + } +} diff --git a/pragma-node/src/handlers/entries/mod.rs b/pragma-node/src/handlers/entries/mod.rs index ba6c0fe8..31e5426a 100644 --- a/pragma-node/src/handlers/entries/mod.rs +++ b/pragma-node/src/handlers/entries/mod.rs @@ -79,7 +79,7 @@ impl Default for GetOnchainParams { Self { network: Network::default(), aggregation: None, - timestamp: Some(chrono::Utc::now().naive_utc().and_utc().timestamp() as u64), + timestamp: Some(chrono::Utc::now().timestamp() as u64), } } } @@ -140,6 +140,17 @@ pub struct GetEntryParams { pub aggregation: Option, } +impl Default for GetEntryParams { + fn default() -> Self { + Self { + timestamp: Some(chrono::Utc::now().timestamp_millis() as u64), + interval: Some(Interval::default()), + routing: Some(false), + aggregation: Some(AggregationMode::default()), + } + } +} + #[derive(Debug, Default, Deserialize, IntoParams, ToSchema)] pub struct GetOnchainPublishersParams { pub network: Network, @@ -167,16 +178,18 @@ pub struct Publisher { pub components: Vec, } -#[derive(Debug, Serialize, Deserialize, ToSchema)] +#[derive(Debug, Default, Serialize, Deserialize, ToSchema)] pub struct GetOnchainPublishersResponse(pub Vec); -impl Default for GetEntryParams { - fn default() -> Self { - Self { - timestamp: Some(chrono::Utc::now().timestamp_millis() as u64), - interval: Some(Interval::default()), - routing: Some(false), - aggregation: Some(AggregationMode::default()), - } - } +#[derive(Debug, Default, Deserialize, IntoParams, ToSchema)] +pub struct GetOnchainOHLCParams { + pub network: Network, + pub interval: Interval, + pub limit: Option, +} + +#[derive(Debug, Default, Serialize, Deserialize, ToSchema)] +pub struct GetOnchainOHLCResponse { + pub pair_id: String, + pub data: Vec, } diff --git a/pragma-node/src/infra/repositories/entry_repository.rs b/pragma-node/src/infra/repositories/entry_repository.rs index 7c30ad92..6b23d06e 100644 --- a/pragma-node/src/infra/repositories/entry_repository.rs +++ b/pragma-node/src/infra/repositories/entry_repository.rs @@ -463,12 +463,10 @@ pub async fn get_entries_between( end_timestamp: u64, ) -> Result, InfraError> { let conn = pool.get().await.map_err(adapt_infra_error)?; - let start_datetime = DateTime::from_timestamp(start_timestamp as i64, 0) - .ok_or(InfraError::InvalidTimeStamp)? - .naive_utc(); - let end_datetime = DateTime::from_timestamp(end_timestamp as i64, 0) - .ok_or(InfraError::InvalidTimeStamp)? - .naive_utc(); + let start_datetime = + DateTime::from_timestamp(start_timestamp as i64, 0).ok_or(InfraError::InvalidTimeStamp)?; + let end_datetime = + DateTime::from_timestamp(end_timestamp as i64, 0).ok_or(InfraError::InvalidTimeStamp)?; let raw_sql = r#" SELECT @@ -488,8 +486,8 @@ pub async fn get_entries_between( .interact(move |conn| { diesel::sql_query(raw_sql) .bind::(pair_id) - .bind::(start_datetime) - .bind::(end_datetime) + .bind::(start_datetime) + .bind::(end_datetime) .load::(conn) }) .await @@ -571,6 +569,24 @@ pub struct OHLCEntryRaw { pub close: BigDecimal, } +impl From for OHLCEntry { + fn from(raw: OHLCEntryRaw) -> Self { + OHLCEntry { + time: raw.time, + open: raw.open, + high: raw.high, + low: raw.low, + close: raw.close, + } + } +} + +impl FromIterator for Vec { + fn from_iter>(iter: T) -> Self { + iter.into_iter().map(OHLCEntry::from).collect() + } +} + pub async fn get_ohlc( pool: &deadpool_diesel::postgres::Pool, pair_id: String, diff --git a/pragma-node/src/infra/repositories/onchain_repository.rs b/pragma-node/src/infra/repositories/onchain_repository.rs index aeb5b9a1..0133a9ba 100644 --- a/pragma-node/src/infra/repositories/onchain_repository.rs +++ b/pragma-node/src/infra/repositories/onchain_repository.rs @@ -1,12 +1,12 @@ use std::collections::HashMap; use bigdecimal::BigDecimal; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Duration, Utc}; use deadpool_diesel::postgres::Pool; use diesel::sql_types::{BigInt, Integer, Numeric, Text, Timestamptz, VarChar}; use diesel::{Queryable, QueryableByName, RunQueryDsl}; -use pragma_common::types::{AggregationMode, DataType, Network}; +use pragma_common::types::{AggregationMode, DataType, Interval, Network}; use pragma_entities::error::{adapt_infra_error, InfraError}; use pragma_monitoring::models::SpotEntry; @@ -14,15 +14,14 @@ use crate::handlers::entries::utils::get_decimals_for_pair; use crate::handlers::entries::{Checkpoint, OnchainEntry, Publisher, PublisherEntry}; use crate::utils::format_bigdecimal_price; -const BACKWARD_TIMESTAMP_INTERVAL: &str = "1 hour"; +use super::entry_repository::OHLCEntry; -#[allow(dead_code)] +const BACKWARD_TIMESTAMP_INTERVAL: &str = "1 hour"; fn get_table_name(network: Network, data_type: DataType) -> &'static str { match (network, data_type) { (Network::Testnet, DataType::SpotEntry) => "spot_entry", (Network::Mainnet, DataType::SpotEntry) => "mainnet_spot_entry", - // TODO(akhercha): Future tables not used yet (Network::Testnet, DataType::FutureEntry) => "future_entry", (Network::Mainnet, DataType::FutureEntry) => "mainnet_future_entry", } @@ -148,7 +147,6 @@ struct EntryTimestamp { } // TODO(akhercha): Only works for Spot entries -// TODO(akhercha): Give different result than onchain oracle sometimes pub async fn get_last_updated_timestamp( pool: &Pool, network: Network, @@ -344,7 +342,7 @@ async fn get_all_publishers_updates( r#" SELECT publisher, - COUNT(*) FILTER (WHERE block_timestamp >= NOW() - INTERVAL '1 day') AS daily_updates, + COUNT(*) FILTER (WHERE timestamp >= NOW() - INTERVAL '1 day') AS daily_updates, COUNT(*) AS total_updates, COUNT(DISTINCT pair_id) AS nb_feeds FROM @@ -477,3 +475,191 @@ pub async fn get_publishers_with_components( Ok(publishers_response) } + +// --- onchain OHLC --- + +pub async fn get_ohlc( + ohlc_data: &mut Vec, + pool: &Pool, + network: Network, + pair_id: String, + interval: Interval, + data_to_retrieve: i64, +) -> Result<(), InfraError> { + let now = Utc::now(); + let aligned_current_timestamp = interval.align_timestamp(now); + let start_timestamp = if data_to_retrieve > 1 { + aligned_current_timestamp + - Duration::minutes(interval.to_minutes() * (data_to_retrieve * 10)) + } else { + aligned_current_timestamp + }; + + let entries = get_entries_from_timestamp(pool, network, &pair_id, start_timestamp).await?; + update_ohlc_data( + ohlc_data, + entries, + interval, + now, + start_timestamp, + data_to_retrieve == 1, + ); + + Ok(()) +} + +async fn get_entries_from_timestamp( + pool: &Pool, + network: Network, + pair_id: &str, + start_timestamp: DateTime, +) -> Result, InfraError> { + let raw_sql = format!( + r#" + SELECT + * + FROM + {table_name} + WHERE + pair_id = '{pair_id}' + AND timestamp >= '{start_timestamp}' + ORDER BY + timestamp + ASC + "#, + table_name = get_table_name(network, DataType::SpotEntry), + pair_id = pair_id, + start_timestamp = start_timestamp + ); + + let conn = pool.get().await.map_err(adapt_infra_error)?; + let entries: Vec = conn + .interact(move |conn| diesel::sql_query(raw_sql).load::(conn)) + .await + .map_err(adapt_infra_error)? + .map_err(adapt_infra_error)?; + Ok(entries) +} + +/// Compute the OHLC data from the entries for the given interval. +/// +/// The function updates the `ohlc_data` vector with the computed OHLC entries +/// between the `start_timestamp` and the current timestamp. +/// - for the first call, the olhc_data vector is empty and will be populated +/// by multiple OHLC entries - depending on start_timestamp. +/// - for the next calls, the function will update the last OHLC entry in the vector +/// until it closes the current interval. (for example, current timetamp is +/// 23h17 and interval is 15mn: we will update this last non finished interval +/// between 23h15 & 23h17). +/// - at some point, current timestamp will close the current interval and +/// we will lock this last interval, for example in our last example 23h15 +/// to 23h30, if it's 23h30m03s now, we close the 23h15->23h30 interval +/// and start a new one from 23h30 to 23h30m03s (current time). +fn update_ohlc_data( + ohlc_data: &mut Vec, + entries: Vec, + interval: Interval, + now: DateTime, + mut start_timestamp: DateTime, + only_update_last: bool, +) { + let interval_duration = Duration::minutes(interval.to_minutes()); + + // Remove the last not complete interval to update it. + // This is because the last entry correspond to the interval + // closing with current timestamp (so not complete yet). + if only_update_last { + ohlc_data.pop(); + } + + while start_timestamp < now { + let mut end_current_interval = start_timestamp + interval_duration; + let mut ohlc_end_interval = std::cmp::min(end_current_interval, now); + + let last_ohlc_entry: Option<&OHLCEntry> = ohlc_data.last(); + + // If the current time slipped into a new interval, we move + // start_timestamp to the previous interval - so that we + // don't miss the last complete interval + if let Some(last_ohlc_entry) = last_ohlc_entry { + if only_update_last && (ohlc_end_interval - last_ohlc_entry.time) > interval_duration { + start_timestamp = last_ohlc_entry.time; + ohlc_end_interval = interval.align_timestamp(ohlc_end_interval); + end_current_interval = start_timestamp + interval_duration; + } + } + + // get all price entries for the delimited interval + let entries_for_interval = + get_entries_for_interval(&entries, start_timestamp, ohlc_end_interval); + + // & compute ohlc from either price entries / last OHLC computed if no entries + // are available for the current interval + let maybe_ohlc = + compute_ohlc_from_entries(&entries_for_interval, ohlc_end_interval, last_ohlc_entry); + if let Some(ohlc) = maybe_ohlc { + ohlc_data.push(ohlc); + } + + // & increase the timestamp for the next interval + start_timestamp = end_current_interval; + } +} + +fn compute_ohlc_from_entries( + entries: &[&SpotEntry], + end_interval: DateTime, + last_ohlc_computed: Option<&OHLCEntry>, +) -> Option { + if entries.is_empty() && last_ohlc_computed.is_none() { + return None; + } + + if !entries.is_empty() { + // Safe to unwrap since we checked that entries is not empty + Some(OHLCEntry { + open: entries.first().unwrap().price.clone(), + high: entries + .iter() + .map(|entry| entry.price.clone()) + .max() + .unwrap(), + low: entries + .iter() + .map(|entry| entry.price.clone()) + .min() + .unwrap(), + close: entries.last().unwrap().price.clone(), + time: end_interval, + }) + } else if last_ohlc_computed.is_some() { + // If no data is available for the current interval and we have + // a last OHLC computed, we use the last close price as the + // OHLC values for the current interval. + let last_ohlc_computed = last_ohlc_computed.unwrap(); + Some(OHLCEntry { + open: last_ohlc_computed.close.clone(), + high: last_ohlc_computed.close.clone(), + low: last_ohlc_computed.close.clone(), + close: last_ohlc_computed.close.clone(), + time: end_interval, + }) + } else { + None + } +} + +/// Get all entries for a given interval. +/// The interval is defined by the start_timestamp and the end_current_interval. +fn get_entries_for_interval( + entries: &[SpotEntry], + start_timestamp: DateTime, + end_current_interval: DateTime, +) -> Vec<&SpotEntry> { + entries + .iter() + .filter(|entry| { + (entry.timestamp >= start_timestamp) && (entry.timestamp <= end_current_interval) + }) + .collect::>() +} diff --git a/pragma-node/src/main.rs b/pragma-node/src/main.rs index da2ca82f..03a441e8 100644 --- a/pragma-node/src/main.rs +++ b/pragma-node/src/main.rs @@ -36,6 +36,7 @@ async fn main() { handlers::entries::get_onchain::get_onchain, handlers::entries::get_onchain::checkpoints::get_onchain_checkpoints, handlers::entries::get_onchain::publishers::get_onchain_publishers, + handlers::entries::get_onchain::ohlc::get_onchain_ohlc_ws, ), components( schemas(pragma_entities::dto::Entry, pragma_entities::EntryError), @@ -54,6 +55,8 @@ async fn main() { handlers::entries::GetOnchainCheckpointsResponse, handlers::entries::GetOnchainPublishersParams, handlers::entries::GetOnchainPublishersResponse, + handlers::entries::GetOnchainOHLCParams, + handlers::entries::GetOnchainOHLCResponse, ), schemas( handlers::entries::Entry, diff --git a/pragma-node/src/routes.rs b/pragma-node/src/routes.rs index 3bf354b4..63ae83c8 100644 --- a/pragma-node/src/routes.rs +++ b/pragma-node/src/routes.rs @@ -6,7 +6,8 @@ use utoipa::OpenApi as OpenApiT; use utoipa_swagger_ui::SwaggerUi; use crate::handlers::entries::get_onchain::{ - checkpoints::get_onchain_checkpoints, get_onchain, publishers::get_onchain_publishers, + checkpoints::get_onchain_checkpoints, get_onchain, ohlc::get_onchain_ohlc_ws, + publishers::get_onchain_publishers, }; use crate::handlers::entries::{create_entries, get_entry, get_ohlc, get_volatility}; use crate::AppState; @@ -46,6 +47,7 @@ fn onchain_routes(state: AppState) -> Router { .route("/:base/:quote", get(get_onchain)) .route("/checkpoints/:base/:quote", get(get_onchain_checkpoints)) .route("/publishers", get(get_onchain_publishers)) + .route("/ws/ohlc/:base/:quote", get(get_onchain_ohlc_ws)) .with_state(state) }