diff --git a/Cargo.lock b/Cargo.lock
index 7df23877..88dd1fe0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1874,7 +1874,7 @@ dependencies = [
 [[package]]
 name = "pragma-monitoring"
 version = "0.1.0"
-source = "git+https://github.com/astraly-labs/pragma-monitoring?rev=5536301#5536301ccac1f5aca23ed86471cf3518388a72c1"
+source = "git+https://github.com/akhercha/pragma-monitoring?branch=dev/naive_datetime_to_utc#b6fee94f191fd8e44daec6160b6852d716c4a9fd"
 dependencies = [
  "arc-swap",
  "axum",
diff --git a/infra/pragma-node/postgres_migrations/01-init.sql b/infra/pragma-node/postgres_migrations/01-init.sql
index 5e51d70d..b2b9810f 100644
--- a/infra/pragma-node/postgres_migrations/01-init.sql
+++ b/infra/pragma-node/postgres_migrations/01-init.sql
@@ -4,10 +4,10 @@ CREATE TABLE mainnet_spot_entry (
     data_id character varying(255) NOT NULL,
     block_hash character varying(255),
     block_number bigint,
-    block_timestamp timestamp without time zone,
+    block_timestamp TIMESTAMPTZ,
     transaction_hash character varying(255),
     price numeric,
-    timestamp timestamp without time zone,
+    timestamp TIMESTAMPTZ,
     publisher character varying(255),
     source character varying(255),
     volume numeric,
@@ -20,10 +20,10 @@ CREATE TABLE spot_entry (
     data_id character varying(255) NOT NULL,
     block_hash character varying(255),
     block_number bigint,
-    block_timestamp timestamp without time zone,
+    block_timestamp TIMESTAMPTZ,
     transaction_hash character varying(255),
     price numeric,
-    timestamp timestamp without time zone,
+    timestamp TIMESTAMPTZ,
     publisher character varying(255),
     source character varying(255),
     volume numeric,
@@ -37,15 +37,15 @@ CREATE TABLE mainnet_future_entry (
     data_id character varying(255),
     block_hash character varying(255),
     block_number bigint,
-    block_timestamp timestamp without time zone,
+    block_timestamp TIMESTAMPTZ,
     transaction_hash character varying(255),
     price numeric,
-    timestamp timestamp without time zone,
+    timestamp TIMESTAMPTZ,
     publisher character varying(255),
     source character varying(255),
     volume numeric,
     _cursor bigint,
-    expiration_timestamp timestamp without time zone
+    expiration_timestamp TIMESTAMPTZ
 );
 
 CREATE TABLE future_entry (
@@ -54,15 +54,15 @@ CREATE TABLE future_entry (
     data_id character varying(255),
     block_hash character varying(255),
     block_number bigint,
-    block_timestamp timestamp without time zone,
+    block_timestamp TIMESTAMPTZ,
     transaction_hash character varying(255),
     price numeric,
-    timestamp timestamp without time zone,
+    timestamp TIMESTAMPTZ,
     publisher character varying(255),
     source character varying(255),
     volume numeric,
     _cursor bigint,
-    expiration_timestamp timestamp without time zone
+    expiration_timestamp TIMESTAMPTZ
 );
 
 CREATE TABLE mainnet_spot_checkpoints (
@@ -71,13 +71,13 @@ CREATE TABLE mainnet_spot_checkpoints (
     data_id character varying(255) NOT NULL,
     block_hash character varying(255),
     block_number bigint,
-    block_timestamp timestamp without time zone,
+    block_timestamp TIMESTAMPTZ,
     transaction_hash character varying(255),
     price numeric,
     sender_address character varying(255),
     aggregation_mode numeric,
     _cursor bigint,
-    timestamp timestamp without time zone,
+    timestamp TIMESTAMPTZ,
     nb_sources_aggregated numeric
 );
 
@@ -87,13 +87,13 @@ CREATE TABLE spot_checkpoints (
     data_id character varying(255) NOT NULL,
     block_hash character varying(255),
     block_number bigint,
-    block_timestamp timestamp without time zone,
+    block_timestamp TIMESTAMPTZ,
     transaction_hash character varying(255),
     price numeric,
     sender_address character varying(255),
     aggregation_mode numeric,
     _cursor bigint,
-    timestamp timestamp without time zone,
+    timestamp TIMESTAMPTZ,
     nb_sources_aggregated numeric
 );
 
@@ -101,13 +101,13 @@ CREATE TABLE vrf_requests (
     network character varying(255),
     request_id numeric,
     seed numeric,
-    created_at timestamp without time zone,
+    created_at TIMESTAMPTZ,
     created_at_tx character varying(255),
     callback_address character varying(255),
     callback_fee_limit numeric,
     num_words numeric,
     requestor_address character varying(255),
-    updated_at timestamp without time zone,
+    updated_at TIMESTAMPTZ,
     updated_at_tx character varying(255),
     status numeric,
     minimum_block_number numeric,
diff --git a/pragma-entities/src/dto/entry.rs b/pragma-entities/src/dto/entry.rs
index 1acec901..8aef16f6 100644
--- a/pragma-entities/src/dto/entry.rs
+++ b/pragma-entities/src/dto/entry.rs
@@ -27,7 +27,7 @@ impl From for Entry {
             pair_id: entry.pair_id,
             publisher: entry.publisher,
             source: entry.source,
-            timestamp: entry.timestamp.and_utc().timestamp_millis() as u64,
+            timestamp: entry.timestamp.timestamp_millis() as u64,
             price: entry.price.to_u128().unwrap_or(0), // change default value ?
         }
     }
diff --git a/pragma-entities/src/models/entry.rs b/pragma-entities/src/models/entry.rs
index f919bf96..512a08f7 100644
--- a/pragma-entities/src/models/entry.rs
+++ b/pragma-entities/src/models/entry.rs
@@ -2,7 +2,7 @@ use crate::dto::entry as dto;
 use crate::models::DieselResult;
 use crate::schema::entries;
 use bigdecimal::BigDecimal;
-use diesel::internal::derives::multiconnection::chrono::NaiveDateTime;
+use chrono::{DateTime, Utc};
 use diesel::upsert::excluded;
 use diesel::{
     AsChangeset, ExpressionMethods, Insertable, PgConnection, PgTextExpressionMethods, QueryDsl,
@@ -19,7 +19,7 @@ pub struct Entry {
     pub pair_id: String,
     pub publisher: String,
     pub source: String,
-    pub timestamp: NaiveDateTime,
+    pub timestamp: DateTime<Utc>,
     pub price: BigDecimal,
 }
 
@@ -29,7 +29,7 @@ pub struct NewEntry {
     pub pair_id: String,
     pub publisher: String,
     pub source: String,
-    pub timestamp: NaiveDateTime,
+    pub timestamp: DateTime<Utc>,
     pub price: BigDecimal,
 }
 
diff --git a/pragma-node/Cargo.toml b/pragma-node/Cargo.toml
index 765ef40e..8fa3db01 100644
--- a/pragma-node/Cargo.toml
+++ b/pragma-node/Cargo.toml
@@ -21,7 +21,7 @@ envy = "0.4.2"
 lazy_static = "1.4.0"
 pragma-common = { path = "../pragma-common", version = "1.0.0" }
 pragma-entities = { path = "../pragma-entities", version = "1.0.0" }
-pragma-monitoring = { git = "https://github.com/astraly-labs/pragma-monitoring", rev = "5536301" }
+pragma-monitoring = { git = "https://github.com/akhercha/pragma-monitoring", branch = "dev/naive_datetime_to_utc" }
 rdkafka = "0.36.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = { version = "1.0", features = ["arbitrary_precision"] }
diff --git a/pragma-node/src/handlers/entries/create_entry.rs b/pragma-node/src/handlers/entries/create_entry.rs
index ea95969a..58195e82 100644
--- a/pragma-node/src/handlers/entries/create_entry.rs
+++ b/pragma-node/src/handlers/entries/create_entry.rs
@@ -145,7 +145,7 @@ pub async fn create_entries(
         .iter()
         .map(|entry| {
             let dt = match DateTime::<Utc>::from_timestamp(entry.base.timestamp as i64, 0) {
-                Some(dt) => dt.naive_utc(),
+                Some(dt) => dt,
                 None => return Err(EntryError::InvalidTimestamp),
             };
 
diff --git a/pragma-node/src/handlers/entries/get_entry.rs b/pragma-node/src/handlers/entries/get_entry.rs
index 6bea68c8..1100901d 100644
--- a/pragma-node/src/handlers/entries/get_entry.rs
+++ b/pragma-node/src/handlers/entries/get_entry.rs
@@ -84,7 +84,7 @@ fn adapt_entry_to_entry_response(
 ) -> GetEntryResponse {
     GetEntryResponse {
         pair_id,
-        timestamp: entry.time.and_utc().timestamp_millis() as u64,
+        timestamp: entry.time.timestamp_millis() as u64,
         num_sources_aggregated: entry.num_sources as usize,
         price: format!(
             "0x{}",
diff --git a/pragma-node/src/handlers/entries/utils.rs b/pragma-node/src/handlers/entries/utils.rs
index b1a1b6a9..6b5ef0f5 100644
--- a/pragma-node/src/handlers/entries/utils.rs
+++ b/pragma-node/src/handlers/entries/utils.rs
@@ -1,5 +1,5 @@
 use bigdecimal::{BigDecimal, ToPrimitive};
-use chrono::NaiveDateTime;
+use chrono::{DateTime, Utc};
 use std::collections::HashMap;
 
 use crate::infra::repositories::entry_repository::MedianEntry;
@@ -49,7 +49,7 @@ pub(crate) fn get_decimals_for_pair(
 #[allow(dead_code)]
 pub(crate) fn compute_median_price_and_time(
     entries: &mut Vec<MedianEntry>,
-) -> Option<(BigDecimal, NaiveDateTime)> {
+) -> Option<(BigDecimal, DateTime<Utc>)> {
     if entries.is_empty() {
         return None;
     }
@@ -108,7 +108,7 @@ mod tests {
 
     fn new_entry(median_price: u32, timestamp: i64) -> MedianEntry {
         MedianEntry {
-            time: DateTime::from_timestamp(timestamp, 0).unwrap().naive_utc(),
+            time: DateTime::from_timestamp(timestamp, 0).unwrap(),
             median_price: median_price.into(),
             num_sources: 5,
         }
diff --git a/pragma-node/src/infra/repositories/entry_repository.rs b/pragma-node/src/infra/repositories/entry_repository.rs
index 4a02b546..7c30ad92 100644
--- a/pragma-node/src/infra/repositories/entry_repository.rs
+++ b/pragma-node/src/infra/repositories/entry_repository.rs
@@ -1,5 +1,5 @@
 use bigdecimal::{BigDecimal, ToPrimitive};
-use chrono::{DateTime, NaiveDateTime};
+use chrono::{DateTime, Utc};
 use diesel::prelude::QueryableByName;
 use diesel::{ExpressionMethods, QueryDsl, Queryable, RunQueryDsl};
 use serde::{Deserialize, Serialize};
@@ -68,7 +68,7 @@ pub async fn _get_all(
 
 #[derive(Debug, Serialize, Queryable)]
 pub struct MedianEntry {
-    pub time: NaiveDateTime,
+    pub time: DateTime<Utc>,
     pub median_price: BigDecimal,
     pub num_sources: i64,
 }
@@ -76,7 +76,7 @@ pub struct MedianEntry {
 #[derive(Serialize, QueryableByName, Clone, Debug)]
 pub struct MedianEntryRaw {
     #[diesel(sql_type = diesel::sql_types::Timestamptz)]
-    pub time: NaiveDateTime,
+    pub time: DateTime<Utc>,
     #[diesel(sql_type = diesel::sql_types::Numeric)]
     pub median_price: BigDecimal,
     #[diesel(sql_type = diesel::sql_types::BigInt)]
@@ -141,14 +141,10 @@ fn calculate_rebased_price(
             base_decimals,
         )
     };
-    let min_timestamp = std::cmp::max(
-        base_entry.time.and_utc().timestamp(),
-        quote_entry.time.and_utc().timestamp(),
-    );
+    let min_timestamp = std::cmp::max(base_entry.time.timestamp(), quote_entry.time.timestamp());
     let num_sources = std::cmp::max(base_entry.num_sources, quote_entry.num_sources);
-    let new_timestamp = DateTime::from_timestamp(min_timestamp, 0)
-        .ok_or(InfraError::InvalidTimeStamp)?
-        .naive_utc();
+    let new_timestamp =
+        DateTime::from_timestamp(min_timestamp, 0).ok_or(InfraError::InvalidTimeStamp)?;
 
     let median_entry = MedianEntry {
         time: new_timestamp,
@@ -554,7 +550,7 @@ pub async fn get_decimals(
 
 #[derive(Debug, Clone, Serialize, Deserialize, Queryable)]
 pub struct OHLCEntry {
-    pub time: NaiveDateTime,
+    pub time: DateTime<Utc>,
     pub open: BigDecimal,
     pub low: BigDecimal,
     pub high: BigDecimal,
@@ -564,7 +560,7 @@ pub struct OHLCEntry {
 #[derive(Serialize, QueryableByName, Clone, Debug)]
 pub struct OHLCEntryRaw {
     #[diesel(sql_type = diesel::sql_types::Timestamptz)]
-    pub time: NaiveDateTime,
+    pub time: DateTime<Utc>,
     #[diesel(sql_type = diesel::sql_types::Numeric)]
     pub open: BigDecimal,
     #[diesel(sql_type = diesel::sql_types::Numeric)]
diff --git a/pragma-node/src/infra/repositories/onchain_repository.rs b/pragma-node/src/infra/repositories/onchain_repository.rs
index c6604e67..aeb5b9a1 100644
--- a/pragma-node/src/infra/repositories/onchain_repository.rs
+++ b/pragma-node/src/infra/repositories/onchain_repository.rs
@@ -1,8 +1,9 @@
 use std::collections::HashMap;
 
 use bigdecimal::BigDecimal;
+use chrono::{DateTime, Utc};
 use deadpool_diesel::postgres::Pool;
-use diesel::sql_types::{BigInt, Integer, Numeric, Text, Timestamp, VarChar};
+use diesel::sql_types::{BigInt, Integer, Numeric, Text, Timestamptz, VarChar};
 use diesel::{Queryable, QueryableByName, RunQueryDsl};
 
 use pragma_common::types::{AggregationMode, DataType, Network};
@@ -42,7 +43,7 @@ impl From for OnchainEntry {
             source: entry.spot_entry.source,
             price: entry.spot_entry.price.to_string(),
             tx_hash: entry.spot_entry.transaction_hash,
-            timestamp: entry.spot_entry.timestamp.and_utc().timestamp() as u64,
+            timestamp: entry.spot_entry.timestamp.timestamp() as u64,
         }
     }
 }
@@ -142,8 +143,8 @@ pub async fn get_sources_and_aggregate(
 
 #[derive(Queryable, QueryableByName)]
 struct EntryTimestamp {
-    #[diesel(sql_type = Timestamp)]
-    pub timestamp: chrono::NaiveDateTime,
+    #[diesel(sql_type = Timestamptz)]
+    pub timestamp: DateTime<Utc>,
 }
 
 // TODO(akhercha): Only works for Spot entries
@@ -179,7 +180,7 @@ pub async fn get_last_updated_timestamp(
         .map_err(adapt_infra_error)?;
 
     let most_recent_entry = raw_entry.first().ok_or(InfraError::NotFound)?;
-    Ok(most_recent_entry.timestamp.and_utc().timestamp() as u64)
+    Ok(most_recent_entry.timestamp.timestamp() as u64)
 }
 
 #[derive(Queryable, QueryableByName)]
@@ -188,8 +189,8 @@ struct RawCheckpoint {
     pub transaction_hash: String,
     #[diesel(sql_type = Numeric)]
     pub price: BigDecimal,
-    #[diesel(sql_type = Timestamp)]
-    pub timestamp: chrono::NaiveDateTime,
+    #[diesel(sql_type = Timestamptz)]
+    pub timestamp: DateTime<Utc>,
     #[diesel(sql_type = VarChar)]
     pub sender_address: String,
 }
@@ -199,7 +200,7 @@ impl RawCheckpoint {
         Checkpoint {
             tx_hash: self.transaction_hash.clone(),
             price: format_bigdecimal_price(self.price.clone(), decimals),
-            timestamp: self.timestamp.and_utc().timestamp() as u64,
+            timestamp: self.timestamp.timestamp() as u64,
             sender_address: self.sender_address.clone(),
         }
     }
@@ -304,8 +305,8 @@ pub struct RawLastPublisherEntryForPair {
     pub price: BigDecimal,
     #[diesel(sql_type = VarChar)]
     pub source: String,
-    #[diesel(sql_type = Timestamp)]
-    pub last_updated_timestamp: chrono::NaiveDateTime,
+    #[diesel(sql_type = Timestamptz)]
+    pub last_updated_timestamp: DateTime<Utc>,
 }
 
 impl RawLastPublisherEntryForPair {
@@ -313,7 +314,7 @@ impl RawLastPublisherEntryForPair {
         let decimals = get_decimals_for_pair(currencies, &self.pair_id);
         PublisherEntry {
             pair_id: self.pair_id.clone(),
-            last_updated_timestamp: self.last_updated_timestamp.and_utc().timestamp() as u64,
+            last_updated_timestamp: self.last_updated_timestamp.timestamp() as u64,
             price: format_bigdecimal_price(self.price.clone(), decimals),
             source: self.source.clone(),
             decimals,
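Note (illustration added alongside the diff, not part of the change set): the patch migrates every timestamp from chrono's NaiveDateTime to the timezone-aware DateTime<Utc>, switches the matching Postgres columns from "timestamp without time zone" to TIMESTAMPTZ, and updates the diesel sql_type annotations from Timestamp to Timestamptz. A minimal sketch of the resulting pattern, assuming chrono >= 0.4.31; the function names below are hypothetical and only mirror the calls used in the diff:

use chrono::{DateTime, Utc};

// Parse a Unix timestamp straight into DateTime<Utc>; no `.naive_utc()` round-trip.
fn parse_timestamp(unix_seconds: i64) -> Option<DateTime<Utc>> {
    DateTime::<Utc>::from_timestamp(unix_seconds, 0)
}

// Epoch accessors are available directly on DateTime<Utc>; no `.and_utc()` needed.
fn to_millis(ts: DateTime<Utc>) -> u64 {
    ts.timestamp_millis() as u64
}

fn main() {
    let ts = parse_timestamp(1_700_000_000).expect("valid timestamp");
    println!("{ts} -> {} ms", to_millis(ts));
}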