Skip to content

Commit

Permalink
feat: endpoint ohlc (#43)
Browse files Browse the repository at this point in the history
* feat(endpoint_onchain_ohlc): Initial work for OHLC endpoint

* feat(endpoint_onchain_ohlc): Fixed compilation error

* feat(endpoint_onchain_ohlc): Added ohlc computation

* feat(endpoint_onchain_ohlc): Added FromIterator trait

* feat(endpoint_onchain_ohlc): Fixed earliest timesamp

* feat(endpoint_onchain_ohlc): Removed monster SQL query and used Rust

* feat(endpoint_onchain_ohlc): Updated OHLC computation with limit parameter

* feat(endpoint_onchain_ohlc): TODO

* feat(endpoint_onchain_ohlc): Repushed query

* feat(endpoint_onchain_ohlc): Added ws endpoint for OHLC

* feat(endpoint_onchain_ohlc): compute OHLC data (ws)

* feat(endpoint_onchain_ohlc): refinements from review

* feat(endpoint_onchain_ohlc): Added docs for context
  • Loading branch information
akhercha authored Jun 5, 2024
1 parent 1f6f75d commit 7db884e
Show file tree
Hide file tree
Showing 17 changed files with 544 additions and 49 deletions.
58 changes: 58 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion infra/pragma-ingestor/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.75 as builder
FROM rust:1.78 as builder

WORKDIR /home/pragma-ingestor

Expand Down
2 changes: 1 addition & 1 deletion infra/pragma-node/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.75 as builder
FROM rust:1.78 as builder

WORKDIR /home/pragma-node

Expand Down
1 change: 1 addition & 0 deletions pragma-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
chrono = { version = "0.4.26", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down
123 changes: 109 additions & 14 deletions pragma-common/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use chrono::{DateTime, Timelike, Utc};
use serde::Deserialize;
use utoipa::ToSchema;

Expand All @@ -12,20 +13,6 @@ pub enum AggregationMode {
Twap,
}

// Supported Aggregation Intervals
#[derive(Default, Debug, Deserialize, ToSchema, Clone, Copy)]
pub enum Interval {
#[serde(rename = "1min")]
#[default]
OneMinute,
#[serde(rename = "15min")]
FifteenMinutes,
#[serde(rename = "1h")]
OneHour,
#[serde(rename = "2h")]
TwoHours,
}

#[derive(Default, Debug, Deserialize, ToSchema, Clone, Copy)]
pub enum Network {
#[serde(rename = "testnet")]
Expand All @@ -43,3 +30,111 @@ pub enum DataType {
#[serde(rename = "future_entry")]
FutureEntry,
}

// Supported Aggregation Intervals
#[derive(Default, Debug, Deserialize, ToSchema, Clone, Copy)]
pub enum Interval {
#[serde(rename = "1min")]
#[default]
OneMinute,
#[serde(rename = "15min")]
FifteenMinutes,
#[serde(rename = "1h")]
OneHour,
#[serde(rename = "2h")]
TwoHours,
}

impl Interval {
pub fn to_minutes(&self) -> i64 {
match self {
Interval::OneMinute => 1,
Interval::FifteenMinutes => 15,
Interval::OneHour => 60,
Interval::TwoHours => 120,
}
}

pub fn to_seconds(&self) -> i64 {
self.to_minutes() * 60
}

/// Align a datetime to the nearest interval boundary.
///
/// This function ensures that the given datetime is aligned to the self interval.
/// For example, if the interval is 15 minutes, a datetime like 20:17 will be
/// adjusted to 20:15, so that it falls on the boundary of the interval.
pub fn align_timestamp(&self, dt: DateTime<Utc>) -> DateTime<Utc> {
let interval_minutes = self.to_minutes();
let dt_minutes = dt.minute() as i64;
let total_minutes = dt.hour() as i64 * 60 + dt_minutes;

let aligned_total_minutes = (total_minutes / interval_minutes) * interval_minutes;
let aligned_hours = aligned_total_minutes / 60;
let aligned_minutes = aligned_total_minutes % 60;

dt.with_minute(aligned_minutes as u32)
.unwrap()
.with_hour(aligned_hours as u32)
.unwrap()
.with_second(0)
.unwrap()
}
}

#[cfg(test)]
mod tests {
use super::Interval;
use chrono::{DateTime, Utc};

#[test]
fn test_align_timestamp() {
let test_inputs = [
(
Interval::OneMinute,
vec![
("2021-01-01T00:00:00Z", "2021-01-01 00:00:00 UTC"),
("2021-01-01T00:00:30Z", "2021-01-01 00:00:00 UTC"),
("2021-01-01T00:01:00Z", "2021-01-01 00:01:00 UTC"),
("2021-01-01T00:01:30Z", "2021-01-01 00:01:00 UTC"),
],
),
(
Interval::FifteenMinutes,
vec![
("2021-01-01T00:00:00Z", "2021-01-01 00:00:00 UTC"),
("2021-01-01T00:00:30Z", "2021-01-01 00:00:00 UTC"),
("2021-01-01T00:01:30Z", "2021-01-01 00:00:00 UTC"),
("2021-01-01T00:00:30Z", "2021-01-01 00:00:00 UTC"),
("2021-01-01T00:15:00Z", "2021-01-01 00:15:00 UTC"),
("2021-01-01T00:22:30Z", "2021-01-01 00:15:00 UTC"),
],
),
(
Interval::OneHour,
vec![
("2021-01-01T00:00:00Z", "2021-01-01 00:00:00 UTC"),
("2021-01-01T00:30:00Z", "2021-01-01 00:00:00 UTC"),
("2021-01-01T01:00:00Z", "2021-01-01 01:00:00 UTC"),
("2021-01-01T01:30:00Z", "2021-01-01 01:00:00 UTC"),
],
),
(
Interval::TwoHours,
vec![
("2021-01-01T00:00:00Z", "2021-01-01 00:00:00 UTC"),
("2021-01-01T01:30:00Z", "2021-01-01 00:00:00 UTC"),
("2021-01-01T02:00:00Z", "2021-01-01 02:00:00 UTC"),
("2021-01-01T02:30:00Z", "2021-01-01 02:00:00 UTC"),
],
),
];
for (interval, test_case) in test_inputs.iter() {
for (input, expected) in test_case.iter() {
let dt: DateTime<Utc> = DateTime::parse_from_rfc3339(input).unwrap().to_utc();
let aligned_dt = interval.align_timestamp(dt);
assert_eq!(aligned_dt.to_string(), *expected);
}
}
}
}
3 changes: 3 additions & 0 deletions pragma-entities/src/models/entry_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub enum VolatilityError {
pub enum EntryError {
#[error("internal server error")]
InternalServerError,
#[error("bad request")]
BadRequest,
#[error("entry not found: {0}")]
NotFound(String),
#[error("infra error: {0}")]
Expand Down Expand Up @@ -73,6 +75,7 @@ impl IntoResponse for EntryError {
StatusCode::INTERNAL_SERVER_ERROR,
format!("Publisher error: {}", err),
),
Self::BadRequest => (StatusCode::BAD_REQUEST, "Bad request".to_string()),
_ => (
StatusCode::INTERNAL_SERVER_ERROR,
String::from("Internal server error"),
Expand Down
21 changes: 21 additions & 0 deletions pragma-entities/src/models/publisher_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,30 @@ use axum::Json;
use serde_json::json;
use utoipa::ToSchema;

use crate::error::InfraError;

#[derive(Debug, thiserror::Error, ToSchema)]
pub enum PublisherError {
#[error("internal server error")]
InternalServerError,
#[error("invalid key : {0}")]
InvalidKey(String),
#[error("invalid address : {0}")]
InvalidAddress(String),
#[error("inactive publisher : {0}")]
InactivePublisher(String),
#[error("no publishers found")]
NotFound,
}

impl From<InfraError> for PublisherError {
fn from(error: InfraError) -> Self {
match error {
InfraError::InternalServerError => Self::InternalServerError,
InfraError::NotFound => Self::NotFound,
_ => Self::InternalServerError,
}
}
}

impl IntoResponse for PublisherError {
Expand All @@ -29,6 +45,11 @@ impl IntoResponse for PublisherError {
StatusCode::FORBIDDEN,
format!("Inactive Publisher: {}", publisher_name),
),
Self::NotFound => (StatusCode::NOT_FOUND, "No publishers found".to_string()),
_ => (
StatusCode::INTERNAL_SERVER_ERROR,
"Internal Server Error".to_string(),
),
};
(
status,
Expand Down
2 changes: 1 addition & 1 deletion pragma-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"


[dependencies]
axum = { version = "0.6", features = ["macros"] }
axum = { version = "0.6", features = ["macros", "ws"] }
axum-macros = "0.3"
bigdecimal = { version = "0.4.1", features = ["serde"] }
chrono = { version = "0.4.26", features = ["serde"] }
Expand Down
2 changes: 1 addition & 1 deletion pragma-node/src/handlers/entries/get_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub async fn get_entry(
// Construct pair id
let pair_id = currency_pair_to_pair_id(&pair.0, &pair.1);

let now = chrono::Utc::now().naive_utc().and_utc().timestamp_millis() as u64;
let now = chrono::Utc::now().timestamp_millis() as u64;

let timestamp = if let Some(timestamp) = params.timestamp {
timestamp
Expand Down
2 changes: 1 addition & 1 deletion pragma-node/src/handlers/entries/get_ohlc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub async fn get_ohlc(
// Construct pair id
let pair_id = currency_pair_to_pair_id(&pair.0, &pair.1);

let now = chrono::Utc::now().naive_utc().and_utc().timestamp_millis() as u64;
let now = chrono::Utc::now().timestamp_millis() as u64;

let timestamp = if let Some(timestamp) = params.timestamp {
timestamp
Expand Down
2 changes: 1 addition & 1 deletion pragma-node/src/handlers/entries/get_onchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn get_onchain(
tracing::info!("Received get onchain entry request for pair {:?}", pair);

let pair_id: String = currency_pair_to_pair_id(&pair.0, &pair.1);
let now = chrono::Utc::now().naive_utc().and_utc().timestamp() as u64;
let now = chrono::Utc::now().timestamp() as u64;
let timestamp = if let Some(timestamp) = params.timestamp {
if timestamp > now {
return Err(EntryError::InvalidTimestamp);
Expand Down
Loading

0 comments on commit 7db884e

Please sign in to comment.