Skip to content

Commit

Permalink
dev(naive_datetime_to_utc): Converted NaiveDateTimes to DateTime&lt;Utc&gt; (title truncated; PR number elided)
Browse files Browse the repository at this point in the history
  • Loading branch information
akhercha authored Jun 3, 2024
1 parent b4c6cd0 commit 5a8c03a
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 16 additions & 16 deletions infra/pragma-node/postgres_migrations/01-init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ CREATE TABLE mainnet_spot_entry (
data_id character varying(255) NOT NULL,
block_hash character varying(255),
block_number bigint,
block_timestamp timestamp without time zone,
block_timestamp TIMESTAMPTZ,
transaction_hash character varying(255),
price numeric,
timestamp timestamp without time zone,
timestamp TIMESTAMPTZ,
publisher character varying(255),
source character varying(255),
volume numeric,
Expand All @@ -20,10 +20,10 @@ CREATE TABLE spot_entry (
data_id character varying(255) NOT NULL,
block_hash character varying(255),
block_number bigint,
block_timestamp timestamp without time zone,
block_timestamp TIMESTAMPTZ,
transaction_hash character varying(255),
price numeric,
timestamp timestamp without time zone,
timestamp TIMESTAMPTZ,
publisher character varying(255),
source character varying(255),
volume numeric,
Expand All @@ -37,15 +37,15 @@ CREATE TABLE mainnet_future_entry (
data_id character varying(255),
block_hash character varying(255),
block_number bigint,
block_timestamp timestamp without time zone,
block_timestamp TIMESTAMPTZ,
transaction_hash character varying(255),
price numeric,
timestamp timestamp without time zone,
timestamp TIMESTAMPTZ,
publisher character varying(255),
source character varying(255),
volume numeric,
_cursor bigint,
expiration_timestamp timestamp without time zone
expiration_timestamp TIMESTAMPTZ
);

CREATE TABLE future_entry (
Expand All @@ -54,15 +54,15 @@ CREATE TABLE future_entry (
data_id character varying(255),
block_hash character varying(255),
block_number bigint,
block_timestamp timestamp without time zone,
block_timestamp TIMESTAMPTZ,
transaction_hash character varying(255),
price numeric,
timestamp timestamp without time zone,
timestamp TIMESTAMPTZ,
publisher character varying(255),
source character varying(255),
volume numeric,
_cursor bigint,
expiration_timestamp timestamp without time zone
expiration_timestamp TIMESTAMPTZ
);

CREATE TABLE mainnet_spot_checkpoints (
Expand All @@ -71,13 +71,13 @@ CREATE TABLE mainnet_spot_checkpoints (
data_id character varying(255) NOT NULL,
block_hash character varying(255),
block_number bigint,
block_timestamp timestamp without time zone,
block_timestamp TIMESTAMPTZ,
transaction_hash character varying(255),
price numeric,
sender_address character varying(255),
aggregation_mode numeric,
_cursor bigint,
timestamp timestamp without time zone,
timestamp TIMESTAMPTZ,
nb_sources_aggregated numeric
);

Expand All @@ -87,27 +87,27 @@ CREATE TABLE spot_checkpoints (
data_id character varying(255) NOT NULL,
block_hash character varying(255),
block_number bigint,
block_timestamp timestamp without time zone,
block_timestamp TIMESTAMPTZ,
transaction_hash character varying(255),
price numeric,
sender_address character varying(255),
aggregation_mode numeric,
_cursor bigint,
timestamp timestamp without time zone,
timestamp TIMESTAMPTZ,
nb_sources_aggregated numeric
);

CREATE TABLE vrf_requests (
network character varying(255),
request_id numeric,
seed numeric,
created_at timestamp without time zone,
created_at TIMESTAMPTZ,
created_at_tx character varying(255),
callback_address character varying(255),
callback_fee_limit numeric,
num_words numeric,
requestor_address character varying(255),
updated_at timestamp without time zone,
updated_at TIMESTAMPTZ,
updated_at_tx character varying(255),
status numeric,
minimum_block_number numeric,
Expand Down
2 changes: 1 addition & 1 deletion pragma-entities/src/dto/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl From<crate::Entry> for Entry {
pair_id: entry.pair_id,
publisher: entry.publisher,
source: entry.source,
timestamp: entry.timestamp.and_utc().timestamp_millis() as u64,
timestamp: entry.timestamp.timestamp_millis() as u64,
price: entry.price.to_u128().unwrap_or(0), // change default value ?
}
}
Expand Down
6 changes: 3 additions & 3 deletions pragma-entities/src/models/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::dto::entry as dto;
use crate::models::DieselResult;
use crate::schema::entries;
use bigdecimal::BigDecimal;
use diesel::internal::derives::multiconnection::chrono::NaiveDateTime;
use chrono::{DateTime, Utc};
use diesel::upsert::excluded;
use diesel::{
AsChangeset, ExpressionMethods, Insertable, PgConnection, PgTextExpressionMethods, QueryDsl,
Expand All @@ -19,7 +19,7 @@ pub struct Entry {
pub pair_id: String,
pub publisher: String,
pub source: String,
pub timestamp: NaiveDateTime,
pub timestamp: DateTime<Utc>,
pub price: BigDecimal,
}

Expand All @@ -29,7 +29,7 @@ pub struct NewEntry {
pub pair_id: String,
pub publisher: String,
pub source: String,
pub timestamp: NaiveDateTime,
pub timestamp: DateTime<Utc>,
pub price: BigDecimal,
}

Expand Down
2 changes: 1 addition & 1 deletion pragma-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ envy = "0.4.2"
lazy_static = "1.4.0"
pragma-common = { path = "../pragma-common", version = "1.0.0" }
pragma-entities = { path = "../pragma-entities", version = "1.0.0" }
pragma-monitoring = { git = "https://github.com/astraly-labs/pragma-monitoring", rev = "5536301" }
pragma-monitoring = { git = "https://github.com/akhercha/pragma-monitoring", branch = "dev/naive_datetime_to_utc" }
rdkafka = "0.36.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["arbitrary_precision"] }
Expand Down
2 changes: 1 addition & 1 deletion pragma-node/src/handlers/entries/create_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ pub async fn create_entries(
.iter()
.map(|entry| {
let dt = match DateTime::<Utc>::from_timestamp(entry.base.timestamp as i64, 0) {
Some(dt) => dt.naive_utc(),
Some(dt) => dt,
None => return Err(EntryError::InvalidTimestamp),
};

Expand Down
2 changes: 1 addition & 1 deletion pragma-node/src/handlers/entries/get_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ fn adapt_entry_to_entry_response(
) -> GetEntryResponse {
GetEntryResponse {
pair_id,
timestamp: entry.time.and_utc().timestamp_millis() as u64,
timestamp: entry.time.timestamp_millis() as u64,
num_sources_aggregated: entry.num_sources as usize,
price: format!(
"0x{}",
Expand Down
6 changes: 3 additions & 3 deletions pragma-node/src/handlers/entries/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bigdecimal::{BigDecimal, ToPrimitive};
use chrono::NaiveDateTime;
use chrono::{DateTime, Utc};
use std::collections::HashMap;

use crate::infra::repositories::entry_repository::MedianEntry;
Expand Down Expand Up @@ -49,7 +49,7 @@ pub(crate) fn get_decimals_for_pair(
#[allow(dead_code)]
pub(crate) fn compute_median_price_and_time(
entries: &mut Vec<MedianEntry>,
) -> Option<(BigDecimal, NaiveDateTime)> {
) -> Option<(BigDecimal, DateTime<Utc>)> {
if entries.is_empty() {
return None;
}
Expand Down Expand Up @@ -108,7 +108,7 @@ mod tests {

fn new_entry(median_price: u32, timestamp: i64) -> MedianEntry {
MedianEntry {
time: DateTime::from_timestamp(timestamp, 0).unwrap().naive_utc(),
time: DateTime::from_timestamp(timestamp, 0).unwrap(),
median_price: median_price.into(),
num_sources: 5,
}
Expand Down
20 changes: 8 additions & 12 deletions pragma-node/src/infra/repositories/entry_repository.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bigdecimal::{BigDecimal, ToPrimitive};
use chrono::{DateTime, NaiveDateTime};
use chrono::{DateTime, Utc};
use diesel::prelude::QueryableByName;
use diesel::{ExpressionMethods, QueryDsl, Queryable, RunQueryDsl};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -68,15 +68,15 @@ pub async fn _get_all(

#[derive(Debug, Serialize, Queryable)]
pub struct MedianEntry {
pub time: NaiveDateTime,
pub time: DateTime<Utc>,
pub median_price: BigDecimal,
pub num_sources: i64,
}

#[derive(Serialize, QueryableByName, Clone, Debug)]
pub struct MedianEntryRaw {
#[diesel(sql_type = diesel::sql_types::Timestamptz)]
pub time: NaiveDateTime,
pub time: DateTime<Utc>,
#[diesel(sql_type = diesel::sql_types::Numeric)]
pub median_price: BigDecimal,
#[diesel(sql_type = diesel::sql_types::BigInt)]
Expand Down Expand Up @@ -141,14 +141,10 @@ fn calculate_rebased_price(
base_decimals,
)
};
let min_timestamp = std::cmp::max(
base_entry.time.and_utc().timestamp(),
quote_entry.time.and_utc().timestamp(),
);
let min_timestamp = std::cmp::max(base_entry.time.timestamp(), quote_entry.time.timestamp());
let num_sources = std::cmp::max(base_entry.num_sources, quote_entry.num_sources);
let new_timestamp = DateTime::from_timestamp(min_timestamp, 0)
.ok_or(InfraError::InvalidTimeStamp)?
.naive_utc();
let new_timestamp =
DateTime::from_timestamp(min_timestamp, 0).ok_or(InfraError::InvalidTimeStamp)?;

let median_entry = MedianEntry {
time: new_timestamp,
Expand Down Expand Up @@ -554,7 +550,7 @@ pub async fn get_decimals(

#[derive(Debug, Clone, Serialize, Deserialize, Queryable)]
pub struct OHLCEntry {
pub time: NaiveDateTime,
pub time: DateTime<Utc>,
pub open: BigDecimal,
pub low: BigDecimal,
pub high: BigDecimal,
Expand All @@ -564,7 +560,7 @@ pub struct OHLCEntry {
#[derive(Serialize, QueryableByName, Clone, Debug)]
pub struct OHLCEntryRaw {
#[diesel(sql_type = diesel::sql_types::Timestamptz)]
pub time: NaiveDateTime,
pub time: DateTime<Utc>,
#[diesel(sql_type = diesel::sql_types::Numeric)]
pub open: BigDecimal,
#[diesel(sql_type = diesel::sql_types::Numeric)]
Expand Down
23 changes: 12 additions & 11 deletions pragma-node/src/infra/repositories/onchain_repository.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::collections::HashMap;

use bigdecimal::BigDecimal;
use chrono::{DateTime, Utc};
use deadpool_diesel::postgres::Pool;
use diesel::sql_types::{BigInt, Integer, Numeric, Text, Timestamp, VarChar};
use diesel::sql_types::{BigInt, Integer, Numeric, Text, Timestamptz, VarChar};
use diesel::{Queryable, QueryableByName, RunQueryDsl};

use pragma_common::types::{AggregationMode, DataType, Network};
Expand Down Expand Up @@ -42,7 +43,7 @@ impl From<SpotEntryWithAggregatedPrice> for OnchainEntry {
source: entry.spot_entry.source,
price: entry.spot_entry.price.to_string(),
tx_hash: entry.spot_entry.transaction_hash,
timestamp: entry.spot_entry.timestamp.and_utc().timestamp() as u64,
timestamp: entry.spot_entry.timestamp.timestamp() as u64,
}
}
}
Expand Down Expand Up @@ -142,8 +143,8 @@ pub async fn get_sources_and_aggregate(

#[derive(Queryable, QueryableByName)]
struct EntryTimestamp {
#[diesel(sql_type = Timestamp)]
pub timestamp: chrono::NaiveDateTime,
#[diesel(sql_type = Timestamptz)]
pub timestamp: DateTime<Utc>,
}

// TODO(akhercha): Only works for Spot entries
Expand Down Expand Up @@ -179,7 +180,7 @@ pub async fn get_last_updated_timestamp(
.map_err(adapt_infra_error)?;

let most_recent_entry = raw_entry.first().ok_or(InfraError::NotFound)?;
Ok(most_recent_entry.timestamp.and_utc().timestamp() as u64)
Ok(most_recent_entry.timestamp.timestamp() as u64)
}

#[derive(Queryable, QueryableByName)]
Expand All @@ -188,8 +189,8 @@ struct RawCheckpoint {
pub transaction_hash: String,
#[diesel(sql_type = Numeric)]
pub price: BigDecimal,
#[diesel(sql_type = Timestamp)]
pub timestamp: chrono::NaiveDateTime,
#[diesel(sql_type = Timestamptz)]
pub timestamp: DateTime<Utc>,
#[diesel(sql_type = VarChar)]
pub sender_address: String,
}
Expand All @@ -199,7 +200,7 @@ impl RawCheckpoint {
Checkpoint {
tx_hash: self.transaction_hash.clone(),
price: format_bigdecimal_price(self.price.clone(), decimals),
timestamp: self.timestamp.and_utc().timestamp() as u64,
timestamp: self.timestamp.timestamp() as u64,
sender_address: self.sender_address.clone(),
}
}
Expand Down Expand Up @@ -304,16 +305,16 @@ pub struct RawLastPublisherEntryForPair {
pub price: BigDecimal,
#[diesel(sql_type = VarChar)]
pub source: String,
#[diesel(sql_type = Timestamp)]
pub last_updated_timestamp: chrono::NaiveDateTime,
#[diesel(sql_type = Timestamptz)]
pub last_updated_timestamp: DateTime<Utc>,
}

impl RawLastPublisherEntryForPair {
pub fn to_publisher_entry(&self, currencies: &HashMap<String, BigDecimal>) -> PublisherEntry {
let decimals = get_decimals_for_pair(currencies, &self.pair_id);
PublisherEntry {
pair_id: self.pair_id.clone(),
last_updated_timestamp: self.last_updated_timestamp.and_utc().timestamp() as u64,
last_updated_timestamp: self.last_updated_timestamp.timestamp() as u64,
price: format_bigdecimal_price(self.price.clone(), decimals),
source: self.source.clone(),
decimals,
Expand Down

0 comments on commit 5a8c03a

Please sign in to comment.