diff --git a/Cargo.lock b/Cargo.lock index a538093e..6e05564d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,6 +37,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -815,7 +821,7 @@ dependencies = [ "sha1", "sync_wrapper 1.0.1", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.24.0", "tower 0.5.1", "tower-layer", "tower-service", @@ -1242,6 +1248,12 @@ dependencies = [ "serde", ] +[[package]] +name = "cassowary" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53" + [[package]] name = "castaway" version = "0.1.2" @@ -1491,6 +1503,31 @@ version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +[[package]] +name = "crossterm" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" +dependencies = [ + "bitflags 2.5.0", + "crossterm_winapi", + "libc", + "mio", + "parking_lot", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b" +dependencies = [ + "winapi", +] + [[package]] name = "crunchy" version = "0.2.2" @@ -2082,6 +2119,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -2358,6 +2401,17 @@ version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +[[package]] +name = "hashbrown" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] + [[package]] name = "heck" version = "0.4.1" @@ -2858,6 +2912,12 @@ dependencies = [ "serde", ] +[[package]] +name = "indoc" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5" + [[package]] name = "init-tracing-opentelemetry" version = "0.22.0" @@ -3193,6 +3253,15 @@ dependencies = [ "value-bag", ] +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.2", +] + [[package]] name = "lru-cache" version = "0.1.2" @@ -3298,6 +3367,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.48.0", ] @@ -4145,6 +4215,7 @@ dependencies = [ "bigdecimal", "cainome", "chrono", + "crossterm", "deadpool-diesel", "diesel", "dotenvy", @@ -4159,6 +4230,7 @@ dependencies = [ "pragma-common", "pragma-entities", "pragma-monitoring", + "ratatui", "rdkafka", "redis", "rstest", @@ -4169,8 +4241,10 @@ dependencies = [ "strum 0.26.3", "thiserror", "tokio", + "tokio-tungstenite 0.20.1", "tower-http", "tracing", + "url", "utoipa", "utoipa-swagger-ui", "utoipauto", @@ -4435,6 +4509,24 @@ dependencies = [ "getrandom", ] +[[package]] +name = "ratatui" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ebc917cfb527a566c37ecb94c7e3fd098353516fb4eb6bea17015ade0182425" +dependencies = [ + "bitflags 2.5.0", + "cassowary", + "crossterm", + "indoc", + "itertools", + "lru", + "paste", + "strum 0.25.0", + "unicode-segmentation", + "unicode-width", +] + [[package]] name = "raw-cpuid" version = "11.0.2" @@ -5268,6 +5360,27 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-mio" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34db1a06d485c9142248b7a054f034b349b212551f3dfd19c94d45a754a217cd" +dependencies = [ + "libc", + "mio", + "signal-hook", +] + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -6061,6 +6174,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +dependencies = [ + "futures-util", + "log", + "native-tls", + "tokio", + "tokio-native-tls", + "tungstenite 0.20.1", +] + [[package]] name = "tokio-tungstenite" version = "0.24.0" @@ -6070,7 +6197,7 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.24.0", ] [[package]] @@ -6410,6 +6537,26 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 0.2.12", + "httparse", + "log", + "native-tls", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.24.0" @@ -6488,6 +6635,12 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" +[[package]] +name = "unicode-width" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" + [[package]] name = "unicode-xid" version = "0.2.4" diff --git a/pragma-node/Cargo.toml b/pragma-node/Cargo.toml index d9439b89..ce639546 100644 --- a/pragma-node/Cargo.toml +++ b/pragma-node/Cargo.toml @@ -50,3 +50,7 @@ uuid = { workspace = true, features = ["fast-rng", "v4", "serde"] } [dev-dependencies] rstest = { workspace = true } +tokio-tungstenite = { version = "0.20.1", features = ["connect", "native-tls"] } +url = "2.5.0" +ratatui = "0.24.0" +crossterm = "0.27.0" diff --git a/pragma-node/examples/starkex.rs b/pragma-node/examples/starkex.rs new file mode 100644 index 00000000..5c6bf307 --- /dev/null +++ b/pragma-node/examples/starkex.rs @@ -0,0 +1,390 @@ +use chrono::Utc; +use crossterm::{ + event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode}, + execute, + terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, +}; +use futures_util::{SinkExt as _, StreamExt as _}; +use ratatui::{ + prelude::*, + widgets::{Block, Borders, List, ListItem, Paragraph}, + Terminal, +}; +use serde::{Deserialize, Serialize}; +use starknet::core::utils::parse_cairo_short_string; +use std::env; +use std::{io, sync::mpsc, thread, time::Duration}; +use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; +use url::Url; + +const TEST_PAIRS: &[&str] = &[ + "BTC/USD", + // "ETH/USD", + // "SOL/USD", + // "AVAX/USD", + // "MATIC/USD", + // "ARB/USD", +]; + +const TEST_MARK_PAIRS: &[&str] = &["BTC/USD"]; + +#[derive(Serialize, Deserialize, Debug)] +struct SubscribeMessage { + msg_type: String, + pairs: Vec, +} + +#[derive(Debug, Deserialize)] +#[allow(unused)] +struct SignedPublisherPrice { + oracle_asset_id: String, + oracle_price: String, + signing_key: String, + signature: String, + timestamp: String, +} + +#[derive(Debug, Deserialize)] +#[allow(unused)] +struct AssetOraclePrice { + global_asset_id: String, + median_price: String, + signature: String, + signed_prices: Vec, +} + +#[derive(Debug, Deserialize)] +struct SubscribeToEntryResponse { + oracle_prices: Vec, + timestamp: i64, +} + +#[derive(Debug)] +struct Environment { + ws_url: String, +} + +impl Environment { + fn new() -> Self { + // Default to 'dev' if not specified + let env_type = env::var("PRAGMA_ENV").unwrap_or_else(|_| "dev".to_string()); + + let ws_url = match env_type.as_str() { + "prod" => "wss://ws.pragma.build/node/v1/data/subscribe", + "dev" => "wss://ws.dev.pragma.build/node/v1/data/subscribe", + "local" => "ws://0.0.0.0:3000/node/v1/data/subscribe", + _ => panic!( + "Invalid environment: {}. Use 'prod', 'dev', or 'local'", + env_type + ), + } + .to_string(); + + Environment { ws_url } + } +} + +#[derive(Debug, Deserialize)] +#[allow(unused)] +struct SubscriptionAck { + msg_type: String, + pairs: Vec, +} + +struct App { + subscription_pairs: Vec, + latest_update: Option, + should_quit: bool, + current_time: i64, +} + +impl App { + fn new() -> App { + App { + subscription_pairs: Vec::new(), + latest_update: None, + should_quit: false, + current_time: Utc::now().timestamp(), + } + } +} + +fn main() -> Result<(), Box> { + // Terminal initialization + enable_raw_mode()?; + let mut stdout = io::stdout(); + execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?; + let backend = CrosstermBackend::new(stdout); + let mut terminal = Terminal::new(backend)?; + + // Create channels for WebSocket messages + let (tx, rx) = mpsc::channel(); + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); + let mut app = App::new(); + + // Spawn WebSocket thread + thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let env = Environment::new(); + + // Convert spot pairs + let index_pairs: Vec = TEST_PAIRS.iter().map(|&p| p.to_string()).collect(); + + // Convert mark pairs and add :MARK suffix + let mark_pairs: Vec = TEST_MARK_PAIRS + .iter() + .map(|&p| format!("{}:MARK", p)) + .collect(); + + // Combine both sets of pairs + let mut all_pairs = index_pairs.clone(); + all_pairs.extend(mark_pairs.clone()); + + let url = Url::parse(&env.ws_url).unwrap(); + let (mut socket, _) = connect_async(url).await.unwrap(); + + let subscribe_msg = SubscribeMessage { + msg_type: "subscribe".to_string(), + pairs: all_pairs.clone(), + }; + + let msg_str = serde_json::to_string(&subscribe_msg).unwrap(); + socket.send(Message::text(msg_str)).await.unwrap(); + + loop { + tokio::select! { + Some(message) = socket.next() => { + match message { + Ok(msg) => { + if let Message::Text(text) = msg { + if tx.send(text).is_err() { + break; + } + } + } + Err(e) => { + eprintln!("WebSocket error: {}", e); + break; + } + } + } + Ok(_) = &mut shutdown_rx => { + // Clean shutdown + let _ = socket.close(None).await; + break; + } + } + } + }); + }); + + // Main loop + loop { + app.current_time = Utc::now().timestamp(); + terminal.draw(|f| ui(f, &app))?; + + // Handle events + if event::poll(Duration::from_millis(100))? { + if let Event::Key(key) = event::read()? { + if key.code == KeyCode::Char('q') { + app.should_quit = true; + } + } + } + + // Check for WebSocket messages + if let Ok(text) = rx.try_recv() { + if let Ok(ack) = serde_json::from_str::(&text) { + app.subscription_pairs = ack.pairs; + } else if let Ok(response) = serde_json::from_str::(&text) { + app.latest_update = Some(response); + } + } + + if app.should_quit { + let _ = shutdown_tx.send(()); // Signal WebSocket thread to shutdown + thread::sleep(Duration::from_millis(100)); // Brief pause to allow cleanup + break; + } + } + + // Restore terminal + disable_raw_mode()?; + execute!( + terminal.backend_mut(), + LeaveAlternateScreen, + DisableMouseCapture + )?; + terminal.show_cursor()?; + + Ok(()) +} + +/// Converts a hexadecimal asset ID to a human-readable string format. +/// If the input starts with "0x", attempts to parse it as a Cairo short string. +/// Otherwise, returns the original string. +/// +/// # Arguments +/// * `hex_id` - The hexadecimal asset ID to parse +/// +/// # Returns +/// A String containing either the parsed asset name or the original hex ID +fn parse_hex_asset_id(hex_id: &str) -> String { + if !hex_id.starts_with("0x") { + return hex_id.to_string(); + } + + let hex_str = &hex_id[2..]; + u128::from_str_radix(hex_str, 16) + .ok() + .and_then(|felt| parse_cairo_short_string(&felt.into()).ok()) + .unwrap_or_else(|| hex_id.to_string()) + .replace("/", "") +} + +/// Extracts and formats all received pairs from oracle prices. +/// +/// # Arguments +/// * `oracle_prices` - Slice of AssetOraclePrice containing the received price updates +/// +/// # Returns +/// A Vec containing all formatted asset pairs (e.g., "ETHUSD") +fn get_received_pairs(oracle_prices: &[AssetOraclePrice]) -> Vec { + oracle_prices + .iter() + .map(|p| parse_hex_asset_id(&p.global_asset_id)) + .collect() +} + +/// Identifies which subscribed pairs are missing from the received pairs. +/// Handles the format difference between subscribed pairs (ETH/USD) and received pairs (ETHUSD). +/// +/// # Arguments +/// * `subscribed` - Slice of subscribed pair strings (format: "ETH/USD") +/// * `received` - Slice of received pair strings (format: "ETHUSD") +/// +/// # Returns +/// A Vec containing all subscribed pairs that weren't received +fn get_missing_pairs(subscribed: &[String], received: &[String]) -> Vec { + subscribed + .iter() + .filter(|p| !received.contains(&p.replace("/", ""))) + .cloned() + .collect() +} + +/// Formats the missing pairs into a user-friendly status message. +/// +/// # Arguments +/// * `missing_pairs` - Slice of strings containing the missing pairs +/// +/// # Returns +/// A formatted string either confirming all pairs were received or listing missing pairs +fn format_missing_pairs_text(missing_pairs: &[String]) -> String { + if missing_pairs.is_empty() { + "✅ All pairs received".to_string() + } else { + format!("⚠️ Missing: {}", missing_pairs.join(", ")) + } +} + +fn ui(f: &mut Frame, app: &App) { + let chunks = Layout::default() + .direction(Direction::Vertical) + .constraints([ + Constraint::Length(3), // Subscription status + Constraint::Length(3), // Missing pairs + Constraint::Min(10), // Price updates + ]) + .split(f.size()); + + // Add latency display with milliseconds + if let Some(update) = &app.latest_update { + let latency_ms = (app.current_time - update.timestamp) * 1000; // Convert to milliseconds + let latency_text = Paragraph::new(format!("⏱ Latency: {}ms", latency_ms)) + .alignment(Alignment::Right) + .block(Block::default().borders(Borders::ALL)); + + let latency_area = Rect { + x: chunks[0].width - 25, + y: 0, + width: 25, + height: 3, + }; + f.render_widget(latency_text, latency_area); + } + + // Subscription header + let subscribed_pairs = Paragraph::new(format!( + "Subscribed Pairs: {}", + app.subscription_pairs.join(", ") + )) + .block( + Block::default() + .borders(Borders::ALL) + .title("Subscription Status"), + ); + f.render_widget(subscribed_pairs, chunks[0]); + + // Missing pairs section + if let Some(update) = &app.latest_update { + let received_pairs = get_received_pairs(&update.oracle_prices); + let missing_pairs = get_missing_pairs(&app.subscription_pairs, &received_pairs); + let missing_text = format_missing_pairs_text(&missing_pairs); + + let missing_widget = Paragraph::new(missing_text).block( + Block::default() + .borders(Borders::ALL) + .title("Missing Pairs"), + ); + f.render_widget(missing_widget, chunks[1]); + + // Price updates list + let mut items = vec![]; + for price in &update.oracle_prices { + let asset_display = if price.global_asset_id.starts_with("0x") { + let hex_str = &price.global_asset_id[2..]; + if let Ok(felt) = u128::from_str_radix(hex_str, 16) { + parse_cairo_short_string(&felt.into()) + .unwrap_or_else(|_| price.global_asset_id.clone()) + } else { + price.global_asset_id.clone() + } + } else { + price.global_asset_id.clone() + }; + + items.push(ListItem::new(vec![ + Line::from(format!( + "🔸 Asset: {} ({})", + asset_display, price.global_asset_id + )), + Line::from(format!(" ├─ Median Price: {}", price.median_price)), + Line::from(format!(" ├─ Publishers: {}", price.signed_prices.len())), + ])); + + for (idx, pub_price) in price.signed_prices.iter().enumerate() { + items.push(ListItem::new(vec![ + Line::from(format!( + " {}. Price: {}", + idx + 1, + pub_price.oracle_price + )), + Line::from(format!( + " Key: {}...{}", + &pub_price.signing_key[..10], + &pub_price.signing_key[pub_price.signing_key.len() - 8..] + )), + ])); + } + } + + let prices_list = List::new(items).block( + Block::default() + .borders(Borders::ALL) + .title(format!("Price Updates (Timestamp: {})", update.timestamp)), + ); + f.render_widget(prices_list, chunks[2]); + } +} diff --git a/pragma-node/src/handlers/subscribe_to_entry.rs b/pragma-node/src/handlers/subscribe_to_entry.rs index a4a21d8b..373d0685 100644 --- a/pragma-node/src/handlers/subscribe_to_entry.rs +++ b/pragma-node/src/handlers/subscribe_to_entry.rs @@ -244,7 +244,15 @@ impl WsEntriesHandler { let (usd_pairs, non_usd_pairs): (Vec, Vec) = subscription .get_subscribed_perp_pairs() .into_iter() - .partition(|pair| pair.ends_with("USD")); + .partition(|pair| { + tracing::debug!("Checking pair for USD: {}", pair); + pair.ends_with("USD") + }); + tracing::debug!( + "USD pairs: {:?}, non-USD pairs: {:?}", + usd_pairs, + non_usd_pairs + ); let mark_pricer_usd = IndexPricer::new(usd_pairs, DataType::PerpEntry); let mark_pricer_non_usd = MarkPricer::new(non_usd_pairs, DataType::PerpEntry); diff --git a/pragma-node/src/types/pricer.rs b/pragma-node/src/types/pricer.rs index d60e4609..c83ab2ee 100644 --- a/pragma-node/src/types/pricer.rs +++ b/pragma-node/src/types/pricer.rs @@ -195,6 +195,7 @@ impl Pricer for MarkPricer { ) )] async fn compute(&self, db_pool: &Pool) -> Result, EntryError> { + tracing::debug!("Computing mark prices for pairs: {:?}", self.pairs); if self.pairs.is_empty() { return Ok(vec![]); }