From 91348ad5771bbe5470a0ff5df73d9dc5e0d5044d Mon Sep 17 00:00:00 2001 From: Pushkar Mishra Date: Wed, 20 Nov 2024 23:46:43 +0530 Subject: [PATCH] init Signed-off-by: Pushkar Mishra --- .../src/python/statistics/long_ratio.rs | 2 +- .../analysis/src/statistics/long_ratio.rs | 20 +- nautilus_core/common/src/cache/mod.rs | 102 +- nautilus_core/common/src/xrate.rs | 480 +++++++- nautilus_core/model/src/accounts/any.rs | 41 +- nautilus_core/model/src/accounts/base.rs | 14 +- nautilus_core/model/src/accounts/cash.rs | 31 + nautilus_core/model/src/events/order/any.rs | 48 +- .../model/src/events/position/mod.rs | 24 +- nautilus_core/portfolio/src/manager.rs | 656 ++++++++++- nautilus_core/portfolio/src/portfolio.rs | 1027 ++++++++++++++++- nautilus_core/risk/src/engine/mod.rs | 28 +- 12 files changed, 2288 insertions(+), 185 deletions(-) diff --git a/nautilus_core/analysis/src/python/statistics/long_ratio.rs b/nautilus_core/analysis/src/python/statistics/long_ratio.rs index 7053d477690..c76157bdd95 100644 --- a/nautilus_core/analysis/src/python/statistics/long_ratio.rs +++ b/nautilus_core/analysis/src/python/statistics/long_ratio.rs @@ -31,7 +31,7 @@ impl LongRatio { } #[pyo3(name = "calculate_from_positions")] - fn py_calculate_from_positions(&mut self, positions: Vec) -> Option { + fn py_calculate_from_positions(&mut self, positions: Vec) -> Option { self.calculate_from_positions(&positions) } } diff --git a/nautilus_core/analysis/src/statistics/long_ratio.rs b/nautilus_core/analysis/src/statistics/long_ratio.rs index 2543a7fc44b..12b8756b73d 100644 --- a/nautilus_core/analysis/src/statistics/long_ratio.rs +++ b/nautilus_core/analysis/src/statistics/long_ratio.rs @@ -38,7 +38,7 @@ impl LongRatio { } impl PortfolioStatistic for LongRatio { - type Item = String; + type Item = f64; fn name(&self) -> String { stringify!(LongRatio).to_string() @@ -55,7 +55,9 @@ impl PortfolioStatistic for LongRatio { .collect(); let value = longs.len() as f64 / positions.len() as f64; - Some(format!("{:.1$}", value, self.precision)) + + let scale = 10f64.powi(self.precision as i32); + Some((value * scale).round() / scale) } } @@ -131,7 +133,7 @@ mod tests { let result = long_ratio.calculate_from_positions(&positions); assert!(result.is_some()); - assert_eq!(result.unwrap(), "1.00"); + assert_eq!(result.unwrap(), 1.00); } #[test] @@ -145,7 +147,7 @@ mod tests { let result = long_ratio.calculate_from_positions(&positions); assert!(result.is_some()); - assert_eq!(result.unwrap(), "0.00"); + assert_eq!(result.unwrap(), 0.00); } #[test] @@ -160,7 +162,7 @@ mod tests { let result = long_ratio.calculate_from_positions(&positions); assert!(result.is_some()); - assert_eq!(result.unwrap(), "0.50"); + assert_eq!(result.unwrap(), 0.50); } #[test] @@ -174,7 +176,7 @@ mod tests { let result = long_ratio.calculate_from_positions(&positions); assert!(result.is_some()); - assert_eq!(result.unwrap(), "0.667"); + assert_eq!(result.unwrap(), 0.667); } #[test] @@ -184,7 +186,7 @@ mod tests { let result = long_ratio.calculate_from_positions(&positions); assert!(result.is_some()); - assert_eq!(result.unwrap(), "1.00"); + assert_eq!(result.unwrap(), 1.00); } #[test] @@ -194,7 +196,7 @@ mod tests { let result = long_ratio.calculate_from_positions(&positions); assert!(result.is_some()); - assert_eq!(result.unwrap(), "0.00"); + assert_eq!(result.unwrap(), 0.00); } #[test] @@ -208,7 +210,7 @@ mod tests { let result = long_ratio.calculate_from_positions(&positions); assert!(result.is_some()); - assert_eq!(result.unwrap(), "1"); + assert_eq!(result.unwrap(), 1.00); } #[test] diff --git a/nautilus_core/common/src/cache/mod.rs b/nautilus_core/common/src/cache/mod.rs index 5659127e315..756856544a2 100644 --- a/nautilus_core/common/src/cache/mod.rs +++ b/nautilus_core/common/src/cache/mod.rs @@ -44,7 +44,7 @@ use nautilus_model::{ enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType}, identifiers::{ AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId, - OrderListId, PositionId, StrategyId, Venue, VenueOrderId, + OrderListId, PositionId, StrategyId, Symbol, Venue, VenueOrderId, }, instruments::{any::InstrumentAny, synthetic::SyntheticInstrument}, orderbook::book::OrderBook, @@ -52,10 +52,13 @@ use nautilus_model::{ position::Position, types::{currency::Currency, price::Price, quantity::Quantity}, }; +use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use ustr::Ustr; -use crate::{enums::SerializationEncoding, msgbus::database::DatabaseConfig}; +use crate::{ + enums::SerializationEncoding, msgbus::database::DatabaseConfig, xrate::get_exchange_rate, +}; /// Configuration for `Cache` instances. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -1142,6 +1145,11 @@ impl Cache { database.add_instrument(&instrument)?; } + // match instrument { + // InstrumentAny::CurrencyPair(_) | InstrumentAny::CryptoPerpetual(_) => {} + // _ => {} + // } + self.instruments.insert(instrument.id(), instrument); Ok(()) } @@ -2474,6 +2482,81 @@ impl Cache { self.bar_count(bar_type) > 0 } + #[must_use] + pub fn get_xrate( + &self, + venue: Venue, + from_currency: Currency, + to_currency: Currency, + price_type: PriceType, + ) -> Decimal { + if from_currency == to_currency { + return Decimal::ONE; + } + + let (bid_quote, ask_quote) = self.build_quote_table(&venue); + + // TODO: improve error and complete this fn + get_exchange_rate(from_currency, to_currency, price_type, bid_quote, ask_quote) + } + + fn build_quote_table( + &self, + venue: &Venue, + ) -> (HashMap, HashMap) { + let mut bid_quotes = HashMap::new(); + let mut ask_quotes = HashMap::new(); + + for instrument_id in self.instruments.keys() { + if instrument_id.venue != *venue { + continue; + } + + let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) { + // Use quote ticks if available + if let Some(tick) = ticks.front() { + (tick.bid_price, tick.ask_price) + } else { + continue; // Empty ticks vector + } + } else { + // Fall back to bars if no quotes available + let bid_bar = self + .bars + .iter() + .find(|(k, _)| { + k.instrument_id() == *instrument_id + && matches!(k.spec().price_type, PriceType::Bid) + }) + .map(|(_, v)| v); + + let ask_bar = self + .bars + .iter() + .find(|(k, _)| { + k.instrument_id() == *instrument_id + && matches!(k.spec().price_type, PriceType::Ask) + }) + .map(|(_, v)| v); + + match (bid_bar, ask_bar) { + (Some(bid), Some(ask)) => { + let bid_price = bid.front().unwrap().close; + let ask_price = ask.front().unwrap().close; + + (bid_price, ask_price) + } + _ => continue, // Missing either bid or ask bar + } + }; + + bid_quotes.insert(instrument_id.symbol, bid_price.as_decimal()); + ask_quotes.insert(instrument_id.symbol, ask_price.as_decimal()); + } + + (bid_quotes, ask_quotes) + } + // -- INSTRUMENT QUERIES ---------------------------------------------------------------------- /// Returns a reference to the instrument for the given `instrument_id` (if found). @@ -2554,6 +2637,12 @@ impl Cache { self.accounts.get(account_id) } + /// Returns a mutable reference to the account for the given `account_id` (if found). + #[must_use] + pub fn mut_account(&mut self, account_id: &AccountId) -> Option<&mut AccountAny> { + self.accounts.get_mut(account_id) + } + /// Returns a reference to the account for the given `venue` (if found). #[must_use] pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> { @@ -2563,6 +2652,15 @@ impl Cache { .and_then(|account_id| self.accounts.get(account_id)) } + /// Returns a reference to the account for the given `venue` (if found). + #[must_use] + pub fn mut_account_for_venue(&mut self, venue: &Venue) -> Option<&mut AccountAny> { + self.index + .venue_account + .get(venue) + .and_then(|account_id| self.accounts.get_mut(account_id)) + } + /// Returns a reference to the account ID for the given `venue` (if found). #[must_use] pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> { diff --git a/nautilus_core/common/src/xrate.rs b/nautilus_core/common/src/xrate.rs index 89b5ac99221..14273c76646 100644 --- a/nautilus_core/common/src/xrate.rs +++ b/nautilus_core/common/src/xrate.rs @@ -27,21 +27,19 @@ use itertools::Itertools; use nautilus_core::correctness::{check_equal_usize, check_map_not_empty, FAILED}; use nautilus_model::{enums::PriceType, identifiers::Symbol, types::currency::Currency}; use rust_decimal::Decimal; -use rust_decimal_macros::dec; use ustr::Ustr; -const DECIMAL_ONE: Decimal = dec!(1.0); -const DECIMAL_TWO: Decimal = dec!(2.0); - +// TODO: Improve efficiency: Check Top Comment /// Returns the calculated exchange rate for the given price type using the /// given dictionary of bid and ask quotes. +#[must_use] pub fn get_exchange_rate( from_currency: Currency, to_currency: Currency, price_type: PriceType, quotes_bid: HashMap, quotes_ask: HashMap, -) -> anyhow::Result { +) -> Decimal { check_map_not_empty("es_bid, stringify!(quotes_bid)).expect(FAILED); check_map_not_empty("es_ask, stringify!(quotes_ask)).expect(FAILED); check_equal_usize( @@ -53,91 +51,453 @@ pub fn get_exchange_rate( .expect(FAILED); if from_currency == to_currency { - return Ok(DECIMAL_ONE); // No conversion necessary + return Decimal::ONE; } - let calculation_quotes: HashMap = match price_type { + let calculation_quotes = match price_type { PriceType::Bid => quotes_bid, PriceType::Ask => quotes_ask, - PriceType::Mid => { - let mut calculation_quotes = HashMap::new(); - for (symbol, bid_quote) in "es_bid { - if let Some(ask_quote) = quotes_ask.get(symbol) { - calculation_quotes.insert(*symbol, (bid_quote + ask_quote) / DECIMAL_TWO); - } - } - calculation_quotes + PriceType::Mid => quotes_bid + .iter() + .map(|(k, v)| { + let ask = quotes_ask.get(k).unwrap_or(v); + (*k, (v + ask) / Decimal::TWO) + }) + .collect(), + _ => { + panic!("Cannot calculate exchange rate for PriceType: {price_type:?}"); } - _ => panic!("Cannot calculate exchange rate for PriceType {price_type:?}"), }; + let mut codes = HashSet::new(); let mut exchange_rates: HashMap> = HashMap::new(); // Build quote table for (symbol, quote) in &calculation_quotes { + // Split symbol into currency pairs let pieces: Vec<&str> = symbol.as_str().split('/').collect(); let code_lhs = Ustr::from(pieces[0]); let code_rhs = Ustr::from(pieces[1]); - let lhs_rates = exchange_rates.entry(code_lhs).or_default(); - lhs_rates.insert(code_lhs, Decimal::new(1, 0)); - lhs_rates.insert(code_rhs, *quote); + codes.insert(code_lhs); + codes.insert(code_rhs); - let rhs_rates = exchange_rates.entry(code_rhs).or_default(); - rhs_rates.insert(code_lhs, Decimal::new(1, 0)); - rhs_rates.insert(code_rhs, *quote); - } + // Initialize currency dictionaries if they don't exist + exchange_rates.entry(code_lhs).or_default(); + exchange_rates.entry(code_rhs).or_default(); - // Clone exchange_rates to avoid borrowing conflicts - let exchange_rates_cloned = exchange_rates.clone(); + // Add base rates + if let Some(rates_lhs) = exchange_rates.get_mut(&code_lhs) { + rates_lhs.insert(code_lhs, Decimal::ONE); + rates_lhs.insert(code_rhs, *quote); + } + if let Some(rates_rhs) = exchange_rates.get_mut(&code_rhs) { + rates_rhs.insert(code_rhs, Decimal::ONE); + } + } // Generate possible currency pairs from all symbols - let mut codes: HashSet<&Ustr> = HashSet::new(); - for (code_lhs, code_rhs) in exchange_rates_cloned.keys().flat_map(|k| { - exchange_rates_cloned - .keys() - .map(move |code_rhs| (k, code_rhs)) - }) { - codes.insert(code_lhs); - codes.insert(code_rhs); - } - let _code_perms: Vec<(&Ustr, &Ustr)> = codes + let code_perms: Vec<(Ustr, Ustr)> = codes .iter() .cartesian_product(codes.iter()) .filter(|(a, b)| a != b) .map(|(a, b)| (*a, *b)) .collect(); - // TODO: Unable to solve borrowing issues for now (see top comment) // Calculate currency inverses - // for (perm_0, perm_1) in code_perms.iter() { - // let exchange_rates_perm_0 = exchange_rates.entry(**perm_0).or_insert_with(HashMap::new); - // let exchange_rates_perm_1 = exchange_rates.entry(**perm_1).or_insert_with(HashMap::new); - // if !exchange_rates_perm_0.contains_key(perm_1) { - // if let Some(rate) = exchange_rates_perm_0.get(perm_1) { - // exchange_rates_perm_1 - // .entry(**perm_0) - // .or_insert_with(|| Decimal::new(1, 0) / rate); - // } - // } - // if !exchange_rates_perm_1.contains_key(perm_0) { - // if let Some(rate) = exchange_rates_perm_1.get(perm_0) { - // exchange_rates_perm_0 - // .entry(**perm_1) - // .or_insert_with(|| Decimal::new(1, 0) / rate); - // } - // } - // } + for (perm0, perm1) in &code_perms { + // First direction: perm0 -> perm1 + let rate_0_to_1 = exchange_rates + .get(perm0) + .and_then(|rates| rates.get(perm1)) + .copied(); + if let Some(rate) = rate_0_to_1 { + if let Some(xrate_perm1) = exchange_rates.get_mut(perm1) { + if !xrate_perm1.contains_key(perm0) { + xrate_perm1.insert(*perm0, Decimal::ONE / rate); + } + } + } + + // Second direction: perm1 -> perm0 + let rate_1_to_0 = exchange_rates + .get(perm1) + .and_then(|rates| rates.get(perm0)) + .copied(); + + if let Some(rate) = rate_1_to_0 { + if let Some(xrate_perm0) = exchange_rates.get_mut(perm0) { + if !xrate_perm0.contains_key(perm1) { + xrate_perm0.insert(*perm1, Decimal::ONE / rate); + } + } + } + } + + // Check if we already have the rate if let Some(quotes) = exchange_rates.get(&from_currency.code) { - if let Some(xrate) = quotes.get(&to_currency.code) { - return Ok(*xrate); + if let Some(&rate) = quotes.get(&to_currency.code) { + return rate; + } + } + + // Calculate remaining exchange rates through common currencies + for (perm0, perm1) in &code_perms { + // Skip if rate already exists + if exchange_rates + .get(perm1) + .map_or(false, |rates| rates.contains_key(perm0)) + { + continue; + } + + // Search for common currency + for code in &codes { + // First check: rates through common currency + let rates_through_common = { + let rates_perm0 = exchange_rates.get(perm0); + let rates_perm1 = exchange_rates.get(perm1); + + match (rates_perm0, rates_perm1) { + (Some(rates0), Some(rates1)) => { + if let (Some(&rate1), Some(&rate2)) = (rates0.get(code), rates1.get(code)) { + Some((rate1, rate2)) + } else { + None + } + } + _ => None, + } + }; + + // Second check: rates from code's perspective + let rates_from_code = if rates_through_common.is_none() { + if let Some(rates_code) = exchange_rates.get(code) { + if let (Some(&rate1), Some(&rate2)) = + (rates_code.get(perm0), rates_code.get(perm1)) + { + Some((rate1, rate2)) + } else { + None + } + } else { + None + } + } else { + None + }; + + // Apply the found rates if any + if let Some((common_rate1, common_rate2)) = rates_through_common.or(rates_from_code) { + // Insert forward rate + if let Some(rates_perm1) = exchange_rates.get_mut(perm1) { + rates_perm1.insert(*perm0, common_rate2 / common_rate1); + } + + // Insert inverse rate + if let Some(rates_perm0) = exchange_rates.get_mut(perm0) { + if !rates_perm0.contains_key(perm1) { + rates_perm0.insert(*perm1, common_rate1 / common_rate2); + } + } + } } } - // TODO: Improve efficiency - let empty: HashMap = HashMap::new(); - let quotes = exchange_rates.get(&from_currency.code).unwrap_or(&empty); + let xrate = exchange_rates + .get(&from_currency.code) + .and_then(|quotes| quotes.get(&to_currency.code)) + .copied() + .unwrap_or(Decimal::ZERO); + + xrate +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use rust_decimal::prelude::FromPrimitive; + use rust_decimal_macros::dec; + + use super::*; + + // Helper function to create test quotes + fn setup_test_quotes() -> (HashMap, HashMap) { + let mut quotes_bid = HashMap::new(); + let mut quotes_ask = HashMap::new(); + + // Direct pairs + quotes_bid.insert(Symbol::from_str_unchecked("EUR/USD"), dec!(1.1000)); + quotes_ask.insert(Symbol::from_str_unchecked("EUR/USD"), dec!(1.1002)); + + quotes_bid.insert(Symbol::from_str_unchecked("GBP/USD"), dec!(1.3000)); + quotes_ask.insert(Symbol::from_str_unchecked("GBP/USD"), dec!(1.3002)); + + quotes_bid.insert(Symbol::from_str_unchecked("USD/JPY"), dec!(110.00)); + quotes_ask.insert(Symbol::from_str_unchecked("USD/JPY"), dec!(110.02)); + + quotes_bid.insert(Symbol::from_str_unchecked("AUD/USD"), dec!(0.7500)); + quotes_ask.insert(Symbol::from_str_unchecked("AUD/USD"), dec!(0.7502)); + + (quotes_bid, quotes_ask) + } - Ok(quotes.get(&to_currency.code).copied().unwrap_or(dec!(0.0))) + #[test] + /// Test same currency conversion + fn test_same_currency() { + let (quotes_bid, quotes_ask) = setup_test_quotes(); + let rate = get_exchange_rate( + Currency::from_str("USD").unwrap(), + Currency::from_str("USD").unwrap(), + PriceType::Mid, + quotes_bid, + quotes_ask, + ); + assert_eq!(rate, Decimal::ONE); + } + + #[test] + /// Test direct pair conversion + fn test_direct_pair() { + let (quotes_bid, quotes_ask) = setup_test_quotes(); + + // Test bid price + let rate_bid = get_exchange_rate( + Currency::from_str("EUR").unwrap(), + Currency::from_str("USD").unwrap(), + PriceType::Bid, + quotes_bid.clone(), + quotes_ask.clone(), + ); + assert_eq!(rate_bid, dec!(1.1000)); + + // Test ask price + let rate_ask = get_exchange_rate( + Currency::from_str("EUR").unwrap(), + Currency::from_str("USD").unwrap(), + PriceType::Ask, + quotes_bid.clone(), + quotes_ask.clone(), + ); + assert_eq!(rate_ask, dec!(1.1002)); + + // Test mid price + let rate_mid = get_exchange_rate( + Currency::from_str("EUR").unwrap(), + Currency::from_str("USD").unwrap(), + PriceType::Mid, + quotes_bid, + quotes_ask, + ); + assert_eq!(rate_mid, dec!(1.1001)); + } + + #[test] + /// Test inverse pair calculation + fn test_inverse_pair() { + let (quotes_bid, quotes_ask) = setup_test_quotes(); + + let rate = get_exchange_rate( + Currency::from_str("USD").unwrap(), + Currency::from_str("EUR").unwrap(), + PriceType::Mid, + quotes_bid, + quotes_ask, + ); + + // USD/EUR should be approximately 1/1.1001 + let expected = Decimal::ONE / dec!(1.1001); + assert!((rate - expected).abs() < dec!(0.0001)); + } + + #[test] + /// Test cross pair calculation through USD + fn test_cross_pair_through_usd() { + let (quotes_bid, quotes_ask) = setup_test_quotes(); + + let rate = get_exchange_rate( + Currency::from_str("EUR").unwrap(), + Currency::from_str("JPY").unwrap(), + PriceType::Mid, + quotes_bid, + quotes_ask, + ); + + // EUR/JPY should be approximately EUR/USD * USD/JPY + let expected = dec!(1.1001) * dec!(110.01); + assert!((rate - expected).abs() < dec!(0.01)); + } + + #[test] + /// Test cross pair calculation through multiple paths + fn test_multiple_path_cross_pair() { + let (quotes_bid, quotes_ask) = setup_test_quotes(); + + let rate = get_exchange_rate( + Currency::from_str("GBP").unwrap(), + Currency::from_str("AUD").unwrap(), + PriceType::Mid, + quotes_bid, + quotes_ask, + ); + + // GBP/AUD should be calculated through USD + // GBP/USD * (1/AUD/USD) + let expected = dec!(1.3001) / dec!(0.7501); + assert!((rate - expected).abs() < dec!(0.01)); + } + + #[test] + /// Test handling of missing pairs + fn test_missing_pairs() { + let mut quotes_bid = HashMap::new(); + let mut quotes_ask = HashMap::new(); + + // Only adding one pair + quotes_bid.insert(Symbol::from_str_unchecked("EUR/USD"), dec!(1.1000)); + quotes_ask.insert(Symbol::from_str_unchecked("EUR/USD"), dec!(1.1002)); + + let rate = get_exchange_rate( + Currency::from_str("EUR").unwrap(), + Currency::from_str("JPY").unwrap(), + PriceType::Mid, + quotes_bid, + quotes_ask, + ); + + assert_eq!(rate, Decimal::ZERO); // Should return 0 for impossible conversions + } + + #[test] + #[should_panic] + /// Test empty quotes handling + fn test_empty_quotes() { + let quotes_bid = HashMap::new(); + let quotes_ask = HashMap::new(); + + get_exchange_rate( + Currency::from_str("EUR").unwrap(), + Currency::from_str("USD").unwrap(), + PriceType::Mid, + quotes_bid, + quotes_ask, + ); + } + + #[test] + #[should_panic] + /// Test unequal quotes length handling + fn test_unequal_quotes_length() { + let mut quotes_bid = HashMap::new(); + let mut quotes_ask = HashMap::new(); + + quotes_bid.insert(Symbol::from_str_unchecked("EUR/USD"), dec!(1.1000)); + quotes_bid.insert(Symbol::from_str_unchecked("GBP/USD"), dec!(1.3000)); + quotes_ask.insert(Symbol::from_str_unchecked("EUR/USD"), dec!(1.1002)); + + get_exchange_rate( + Currency::from_str("EUR").unwrap(), + Currency::from_str("USD").unwrap(), + PriceType::Mid, + quotes_bid, + quotes_ask, + ); + } + + #[test] + #[should_panic] + /// Test invalid price type handling + fn test_invalid_price_type() { + let (quotes_bid, quotes_ask) = setup_test_quotes(); + + get_exchange_rate( + Currency::from_str("EUR").unwrap(), + Currency::from_str("USD").unwrap(), + PriceType::Last, // Invalid price type + quotes_bid, + quotes_ask, + ); + } + + #[test] + /// Test extensive cross pairs + fn test_extensive_cross_pairs() { + let mut quotes_bid = HashMap::new(); + let mut quotes_ask = HashMap::new(); + + // Create a complex network of currency pairs + let pairs = vec![ + ("EUR/USD", (1.1000, 1.1002)), + ("GBP/USD", (1.3000, 1.3002)), + ("USD/JPY", (110.00, 110.02)), + ("EUR/GBP", (0.8461, 0.8463)), + ("AUD/USD", (0.7500, 0.7502)), + ("NZD/USD", (0.7000, 0.7002)), + ("USD/CAD", (1.2500, 1.2502)), + ]; + + for (pair, (bid, ask)) in pairs { + quotes_bid.insert( + Symbol::from_str_unchecked(pair), + Decimal::from_f64(bid).unwrap(), + ); + quotes_ask.insert( + Symbol::from_str_unchecked(pair), + Decimal::from_f64(ask).unwrap(), + ); + } + + // Test various cross pairs + let test_pairs = vec![ + ("EUR", "JPY", 121.022), // EUR/USD * USD/JPY + ("GBP", "JPY", 143.024), // GBP/USD * USD/JPY + ("AUD", "JPY", 82.51), // AUD/USD * USD/JPY + ("EUR", "CAD", 1.375), // EUR/USD * USD/CAD + ("NZD", "CAD", 0.875), // NZD/USD * USD/CAD + ("AUD", "NZD", 1.071), // AUD/USD / NZD/USD + ]; + + for (from, to, expected) in test_pairs { + let rate = get_exchange_rate( + Currency::from_str(from).unwrap(), + Currency::from_str(to).unwrap(), + PriceType::Mid, + quotes_bid.clone(), + quotes_ask.clone(), + ); + + let expected_dec = Decimal::from_f64(expected).unwrap(); + assert!( + (rate - expected_dec).abs() < dec!(0.01), + "Failed for pair {from}/{to}: got {rate}, expected {expected_dec}" + ); + } + } + + #[test] + /// Test rate consistency + fn test_rate_consistency() { + let (quotes_bid, quotes_ask) = setup_test_quotes(); + + let rate_eur_usd = get_exchange_rate( + Currency::from_str("EUR").unwrap(), + Currency::from_str("USD").unwrap(), + PriceType::Mid, + quotes_bid.clone(), + quotes_ask.clone(), + ); + + let rate_usd_eur = get_exchange_rate( + Currency::from_str("USD").unwrap(), + Currency::from_str("EUR").unwrap(), + PriceType::Mid, + quotes_bid, + quotes_ask, + ); + + // Check if one rate is the inverse of the other + assert!((rate_eur_usd * rate_usd_eur - Decimal::ONE).abs() < dec!(0.0001)); + } } diff --git a/nautilus_core/model/src/accounts/any.rs b/nautilus_core/model/src/accounts/any.rs index e839d897717..54c9166ebd0 100644 --- a/nautilus_core/model/src/accounts/any.rs +++ b/nautilus_core/model/src/accounts/any.rs @@ -13,15 +13,19 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- +use std::collections::HashMap; + use serde::{Deserialize, Serialize}; use crate::{ accounts::{base::Account, cash::CashAccount, margin::MarginAccount}, enums::AccountType, - events::account::state::AccountState, + events::{account::state::AccountState, order::OrderFilled}, identifiers::AccountId, + instruments::any::InstrumentAny, + position::Position, + types::{balance::AccountBalance, currency::Currency, money::Money}, }; - #[derive(Debug, Clone, Serialize, Deserialize)] pub enum AccountAny { Margin(MarginAccount), @@ -58,6 +62,27 @@ impl AccountAny { } } + pub fn balances(&self) -> HashMap { + match self { + AccountAny::Margin(margin) => margin.balances(), + AccountAny::Cash(cash) => cash.balances(), + } + } + + pub fn balances_locked(&self) -> HashMap { + match self { + AccountAny::Margin(margin) => margin.balances_locked(), + AccountAny::Cash(cash) => cash.balances_locked(), + } + } + + pub fn base_currency(&self) -> Option { + match self { + AccountAny::Margin(margin) => margin.base_currency(), + AccountAny::Cash(cash) => cash.base_currency(), + } + } + pub fn from_events(events: Vec) -> anyhow::Result { if events.is_empty() { anyhow::bail!("No order events provided to create `AccountAny`"); @@ -70,6 +95,18 @@ impl AccountAny { } Ok(account) } + + pub fn calculate_pnls( + &self, + instrument: InstrumentAny, // TODO: Make this a reference + fill: OrderFilled, // TODO: Make this a reference + position: Option, + ) -> anyhow::Result> { + match self { + AccountAny::Margin(margin) => margin.calculate_pnls(instrument, fill, position), + AccountAny::Cash(cash) => cash.calculate_pnls(instrument, fill, position), + } + } } impl From for AccountAny { diff --git a/nautilus_core/model/src/accounts/base.rs b/nautilus_core/model/src/accounts/base.rs index fbfe3bcdbbb..bb827921210 100644 --- a/nautilus_core/model/src/accounts/base.rs +++ b/nautilus_core/model/src/accounts/base.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; -use rust_decimal::prelude::ToPrimitive; +use rust_decimal::{prelude::ToPrimitive, Decimal}; use serde::{Deserialize, Serialize}; use crate::{ @@ -135,6 +135,18 @@ impl BaseAccount { } } + pub fn update_commissions(&mut self, commission: Money) { + if commission.as_decimal() == Decimal::ZERO { + return; + } + + let currency = commission.currency; + let total_commissions = self.commissions.get(¤cy).unwrap_or(&0.0); + + self.commissions + .insert(currency, total_commissions + commission.as_f64()); + } + pub fn base_apply(&mut self, event: AccountState) { self.update_balances(event.balances.clone()); self.events.push(event); diff --git a/nautilus_core/model/src/accounts/cash.rs b/nautilus_core/model/src/accounts/cash.rs index adee23b08b6..0b656d476ea 100644 --- a/nautilus_core/model/src/accounts/cash.rs +++ b/nautilus_core/model/src/accounts/cash.rs @@ -19,6 +19,7 @@ use std::{ ops::{Deref, DerefMut}, }; +use rust_decimal::{prelude::ToPrimitive, Decimal}; use serde::{Deserialize, Serialize}; use crate::{ @@ -66,6 +67,36 @@ impl CashAccount { pub const fn is_unleveraged(&self) -> bool { false } + + pub fn recalculate_balance(&mut self, currency: Currency) { + let current_balance = match self.balances.get(¤cy) { + Some(balance) => *balance, + None => { + return; + } + }; + + let total_locked = self + .balances + .values() + .filter(|balance| balance.currency == currency) + .fold(Decimal::ZERO, |acc, balance| { + acc + balance.locked.as_decimal() + }); + + let new_balance = AccountBalance::new( + current_balance.total, + Money::new(total_locked.to_f64().unwrap(), currency), + Money::new( + (current_balance.total.as_decimal() - total_locked) + .to_f64() + .unwrap(), + currency, + ), + ); + + self.balances.insert(currency, new_balance); + } } impl Account for CashAccount { diff --git a/nautilus_core/model/src/events/order/any.rs b/nautilus_core/model/src/events/order/any.rs index 59e02cd0785..e763578d02c 100644 --- a/nautilus_core/model/src/events/order/any.rs +++ b/nautilus_core/model/src/events/order/any.rs @@ -26,7 +26,7 @@ use crate::{ OrderPendingUpdate, OrderRejected, OrderReleased, OrderSubmitted, OrderTriggered, OrderUpdated, }, - identifiers::{ClientOrderId, StrategyId, TraderId}, + identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId}, }; /// Wraps an `OrderEvent` allowing polymorphism. @@ -144,6 +144,52 @@ impl OrderEventAny { } } + #[must_use] + pub fn account_id(&self) -> Option { + match self { + Self::Initialized(event) => event.account_id(), + Self::Denied(event) => event.account_id(), + Self::Emulated(event) => event.account_id(), + Self::Released(event) => event.account_id(), + Self::Submitted(event) => event.account_id(), + Self::Accepted(event) => event.account_id(), + Self::Rejected(event) => event.account_id(), + Self::Canceled(event) => event.account_id(), + Self::Expired(event) => event.account_id(), + Self::Triggered(event) => event.account_id(), + Self::PendingUpdate(event) => event.account_id(), + Self::PendingCancel(event) => event.account_id(), + Self::ModifyRejected(event) => event.account_id(), + Self::CancelRejected(event) => event.account_id(), + Self::Updated(event) => event.account_id(), + Self::PartiallyFilled(event) => event.account_id(), + Self::Filled(event) => event.account_id(), + } + } + + #[must_use] + pub fn instrument_id(&self) -> InstrumentId { + match self { + Self::Initialized(event) => event.instrument_id(), + Self::Denied(event) => event.instrument_id(), + Self::Emulated(event) => event.instrument_id(), + Self::Released(event) => event.instrument_id(), + Self::Submitted(event) => event.instrument_id(), + Self::Accepted(event) => event.instrument_id(), + Self::Rejected(event) => event.instrument_id(), + Self::Canceled(event) => event.instrument_id(), + Self::Expired(event) => event.instrument_id(), + Self::Triggered(event) => event.instrument_id(), + Self::PendingUpdate(event) => event.instrument_id(), + Self::PendingCancel(event) => event.instrument_id(), + Self::ModifyRejected(event) => event.instrument_id(), + Self::CancelRejected(event) => event.instrument_id(), + Self::Updated(event) => event.instrument_id(), + Self::PartiallyFilled(event) => event.instrument_id(), + Self::Filled(event) => event.instrument_id(), + } + } + #[must_use] pub fn strategy_id(&self) -> StrategyId { match self { diff --git a/nautilus_core/model/src/events/position/mod.rs b/nautilus_core/model/src/events/position/mod.rs index 5338a1d1629..a81dd4e0b46 100644 --- a/nautilus_core/model/src/events/position/mod.rs +++ b/nautilus_core/model/src/events/position/mod.rs @@ -13,10 +13,10 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -use crate::events::position::{ - changed::PositionChanged, closed::PositionClosed, opened::PositionOpened, +use crate::{ + events::position::{changed::PositionChanged, closed::PositionClosed, opened::PositionOpened}, + identifiers::{AccountId, InstrumentId}, }; - pub mod changed; pub mod closed; pub mod opened; @@ -27,3 +27,21 @@ pub enum PositionEvent { PositionChanged(PositionChanged), PositionClosed(PositionClosed), } + +impl PositionEvent { + pub fn instrument_id(&self) -> InstrumentId { + match self { + PositionEvent::PositionOpened(position) => position.instrument_id, + PositionEvent::PositionChanged(position) => position.instrument_id, + PositionEvent::PositionClosed(position) => position.instrument_id, + } + } + + pub fn account_id(&self) -> AccountId { + match self { + PositionEvent::PositionOpened(position) => position.account_id, + PositionEvent::PositionChanged(position) => position.account_id, + PositionEvent::PositionClosed(position) => position.account_id, + } + } +} diff --git a/nautilus_core/portfolio/src/manager.rs b/nautilus_core/portfolio/src/manager.rs index c1edae0f75b..4f43db88a7c 100644 --- a/nautilus_core/portfolio/src/manager.rs +++ b/nautilus_core/portfolio/src/manager.rs @@ -15,31 +15,30 @@ //! Provides account management functionality. -// Under development -#![allow(dead_code)] -#![allow(unused_variables)] - use std::{cell::RefCell, rc::Rc}; use nautilus_common::{cache::Cache, clock::Clock}; -use nautilus_core::nanos::UnixNanos; +use nautilus_core::{ffi::uuid::uuid4_new, nanos::UnixNanos}; use nautilus_model::{ - accounts::{any::AccountAny, cash::CashAccount, margin::MarginAccount}, - enums::OrderSideSpecified, + accounts::{any::AccountAny, base::Account, cash::CashAccount, margin::MarginAccount}, + enums::{AccountType, OrderSide, OrderSideSpecified, PriceType}, events::{account::state::AccountState, order::OrderFilled}, instruments::any::InstrumentAny, orders::any::OrderAny, position::Position, - types::money::Money, + types::{balance::AccountBalance, money::Money}, }; -use rust_decimal::Decimal; - +use rust_decimal::{prelude::ToPrimitive, Decimal}; pub struct AccountsManager { clock: Rc>, cache: Rc>, } impl AccountsManager { + pub fn new(clock: Rc>, cache: Rc>) -> Self { + Self { clock, cache } + } + #[must_use] pub fn update_balances( &self, @@ -47,65 +46,645 @@ impl AccountsManager { instrument: InstrumentAny, fill: OrderFilled, ) -> AccountState { - todo!() + let cache = self.cache.borrow(); + let position_id = if let Some(position_id) = fill.position_id { + position_id + } else { + let positions_open = cache.positions_open(None, Some(&fill.instrument_id), None, None); + positions_open + .first() + .unwrap_or_else(|| panic!("List of Positions is empty")) + .id + }; + + let position = cache.position(&position_id); + + let pnls = account.calculate_pnls(instrument, fill, position.cloned()); + + // Calculate final PnL including commissions + match account.base_currency() { + Some(base_currency) => { + let pnl = pnls.map_or_else( + |_| Money::new(0.0, base_currency), + |pnl_list| { + pnl_list + .first() + .copied() + .unwrap_or_else(|| Money::new(0.0, base_currency)) + }, + ); + + self.update_balance_single_currency(account.clone(), &fill, pnl); + } + None => { + if let Ok(mut pnl_list) = pnls { + self.update_balance_multi_currency(account.clone(), fill, &mut pnl_list); + } + } + } + + // Generate and return account state + self.generate_account_state(account, fill.ts_event) } #[must_use] pub fn update_orders( &self, - account: AccountAny, + account: &mut AccountAny, instrument: InstrumentAny, - orders_open: &[OrderAny], + orders_open: Vec<&OrderAny>, ts_event: UnixNanos, - ) -> AccountState { - todo!() + ) -> Option { + match account { + AccountAny::Cash(cash_account) => { + self.update_balance_locked(cash_account, instrument, orders_open, ts_event) + } + AccountAny::Margin(margin_account) => { + self.update_margin_init(margin_account, instrument, orders_open, ts_event) + } + } } + // TODO: too many clones inside this #[must_use] pub fn update_positions( &self, - account: MarginAccount, + account: &mut MarginAccount, instrument: InstrumentAny, - positions: &[Position], + positions: Vec<&Position>, ts_event: UnixNanos, - ) -> AccountState { - todo!() + ) -> Option { + // Initialize variables + let mut total_margin_maint = Decimal::ZERO; + let mut base_xrate = Decimal::ZERO; + let mut currency = instrument.settlement_currency(); + + // Process each position + for position in positions { + // Verify position is for correct instrument + assert_eq!( + position.instrument_id, + instrument.id(), + "Position not for instrument {}", + instrument.id() + ); + + // Skip closed positions + if !position.is_open() { + continue; + } + + let margin_maint = match instrument { + InstrumentAny::Betting(i) => account.calculate_maintenance_margin( + i, + position.quantity, + instrument.make_price(position.avg_px_open), + None, + ), + InstrumentAny::BinaryOption(i) => account.calculate_maintenance_margin( + i, + position.quantity, + instrument.make_price(position.avg_px_open), + None, + ), + InstrumentAny::CryptoFuture(i) => account.calculate_maintenance_margin( + i, + position.quantity, + instrument.make_price(position.avg_px_open), + None, + ), + InstrumentAny::CryptoPerpetual(i) => account.calculate_maintenance_margin( + i, + position.quantity, + instrument.make_price(position.avg_px_open), + None, + ), + InstrumentAny::CurrencyPair(i) => account.calculate_maintenance_margin( + i, + position.quantity, + instrument.make_price(position.avg_px_open), + None, + ), + InstrumentAny::Equity(i) => account.calculate_maintenance_margin( + i, + position.quantity, + instrument.make_price(position.avg_px_open), + None, + ), + InstrumentAny::FuturesContract(i) => account.calculate_maintenance_margin( + i, + position.quantity, + instrument.make_price(position.avg_px_open), + None, + ), + InstrumentAny::FuturesSpread(i) => account.calculate_maintenance_margin( + i, + position.quantity, + instrument.make_price(position.avg_px_open), + None, + ), + InstrumentAny::OptionsContract(i) => account.calculate_maintenance_margin( + i, + position.quantity, + instrument.make_price(position.avg_px_open), + None, + ), + InstrumentAny::OptionsSpread(i) => account.calculate_maintenance_margin( + i, + position.quantity, + instrument.make_price(position.avg_px_open), + None, + ), + }; + + let mut margin_maint = margin_maint.as_decimal(); + + // Handle base currency conversion if needed + if let Some(base_currency) = account.base_currency { + if base_xrate.is_zero() { + // Cache base currency and calculate exchange rate + currency = base_currency; + base_xrate = self.calculate_xrate_to_base( + AccountAny::Margin(account.clone()), + instrument.clone(), + position.entry.as_specified(), + ); + + if base_xrate == Decimal::ZERO { + log::debug!("Cannot calculate maintenance (position) margin: insufficient data for {}/{}", instrument.settlement_currency(), base_currency); + return None; + } + } + + // Apply base exchange rate + margin_maint = (margin_maint * base_xrate).round_dp(currency.precision.into()); + } + + // Increment total maintenance margin + total_margin_maint += margin_maint; + } + + // Create Money object for margin maintenance + let margin_maint_money = Money::new(total_margin_maint.to_f64()?, currency); + + // Update account margin maintenance + account.update_maintenance_margin(instrument.id(), margin_maint_money); + + // Log the update + log::info!( + "{} margin_maint={}", + instrument.id(), + margin_maint_money.to_string() + ); + + // Generate and return account state + Some(self.generate_account_state(AccountAny::Margin(account.clone()), ts_event)) } fn update_balance_locked( &self, - account: CashAccount, + account: &mut CashAccount, instrument: InstrumentAny, - fill: OrderFilled, - ) -> AccountState { - todo!() + orders_open: Vec<&OrderAny>, + ts_event: UnixNanos, + ) -> Option { + if orders_open.is_empty() { + let balance = account.balances.remove(&instrument.quote_currency()); + if let Some(balance) = balance { + account.recalculate_balance(balance.currency); + } + return Some(self.generate_account_state(AccountAny::Cash(account.clone()), ts_event)); + } + + // Initialize variables + let mut total_locked = Decimal::ZERO; + let mut base_xrate = Decimal::ZERO; + + let mut currency = instrument.settlement_currency(); + + // Process each open order + for order in orders_open { + // Verify order is for correct instrument + assert_eq!( + order.instrument_id(), + instrument.id(), + "Order not for instrument {}", + instrument.id() + ); + assert!(order.is_open(), "Order is not open"); + + // Skip orders without price or trigger price + if order.price().is_none() && order.trigger_price().is_none() { + continue; + } + + // Calculate locked balance for this order + let price = if order.price().is_some() { + order.price() + } else { + order.trigger_price() + }; + + let mut locked = account + .calculate_balance_locked( + instrument.clone(), + order.order_side(), + order.quantity(), + price?, + None, + ) + .unwrap() + .as_decimal(); + + // Handle base currency conversion if needed + if let Some(base_curr) = account.base_currency() { + if base_xrate.is_zero() { + // Cache base currency and calculate exchange rate + currency = base_curr; + base_xrate = self.calculate_xrate_to_base( + AccountAny::Cash(account.clone()), + instrument.clone(), + order.order_side_specified(), + ); + } + + // Apply base exchange rate and round to currency precision + locked = (locked * base_xrate).round_dp(u32::from(currency.precision)); + } + + // Add to total locked amount + total_locked += locked; + } + + // Create Money object for locked balance + let locked_money = Money::new(total_locked.to_f64()?, currency); + + // Update account locked balance + if let Some(balance) = account.balances.get_mut(&instrument.quote_currency()) { + balance.locked = locked_money; + let currency = balance.currency; + account.recalculate_balance(currency); + } + + log::info!( + "{} balance_locked={}", + instrument.id(), + locked_money.to_string() + ); + + // Generate and return account state + Some(self.generate_account_state(AccountAny::Cash(account.clone()), ts_event)) } fn update_margin_init( &self, - account: MarginAccount, + account: &mut MarginAccount, instrument: InstrumentAny, - orders_open: &[OrderAny], + orders_open: Vec<&OrderAny>, ts_event: UnixNanos, - ) -> AccountState { - todo!() + ) -> Option { + let mut total_margin_init = Decimal::ZERO; + let mut base_xrate = Decimal::ZERO; + let mut currency = instrument.settlement_currency(); + + for order in orders_open { + assert_eq!( + order.instrument_id(), + instrument.id(), + "Order not for instrument {}", + instrument.id() + ); + + if !order.is_open() || (order.price().is_none() && order.trigger_price().is_none()) { + continue; + } + + let price = if order.price().is_some() { + order.price() + } else { + order.trigger_price() + }; + + let margin_init = match instrument { + InstrumentAny::Betting(i) => { + account.calculate_initial_margin(i, order.quantity(), price?, None) + } + InstrumentAny::BinaryOption(i) => { + account.calculate_initial_margin(i, order.quantity(), price?, None) + } + InstrumentAny::CryptoFuture(i) => { + account.calculate_initial_margin(i, order.quantity(), price?, None) + } + InstrumentAny::CryptoPerpetual(i) => { + account.calculate_initial_margin(i, order.quantity(), price?, None) + } + InstrumentAny::CurrencyPair(i) => { + account.calculate_initial_margin(i, order.quantity(), price?, None) + } + InstrumentAny::Equity(i) => { + account.calculate_initial_margin(i, order.quantity(), price?, None) + } + InstrumentAny::FuturesContract(i) => { + account.calculate_initial_margin(i, order.quantity(), price?, None) + } + InstrumentAny::FuturesSpread(i) => { + account.calculate_initial_margin(i, order.quantity(), price?, None) + } + InstrumentAny::OptionsContract(i) => { + account.calculate_initial_margin(i, order.quantity(), price?, None) + } + InstrumentAny::OptionsSpread(i) => { + account.calculate_initial_margin(i, order.quantity(), price?, None) + } + }; + + let mut margin_init = margin_init.as_decimal(); + + if let Some(base_currency) = account.base_currency { + if base_xrate.is_zero() { + currency = base_currency; + base_xrate = self.calculate_xrate_to_base( + AccountAny::Margin(account.clone()), + instrument.clone(), + order.order_side_specified(), + ); + + if base_xrate == Decimal::ZERO { + log::debug!( + "Cannot calculate initial margin: insufficient data for {}/{}", + instrument.settlement_currency(), + base_currency + ); + continue; + } + } + + margin_init = (margin_init * base_xrate).round_dp(currency.precision.into()); + } + + total_margin_init += margin_init; + } + + let money = Money::new(total_margin_init.to_f64().unwrap_or(0.0), currency); + let margin_init_money = { + account.update_initial_margin(instrument.id(), money); + money + }; + + log::info!( + "{} margin_init={}", + instrument.id(), + margin_init_money.to_string() + ); + + Some(self.generate_account_state(AccountAny::Margin(account.clone()), ts_event)) } - fn update_balance_single_currency(&self, account: AccountAny, fill: OrderFilled, pnl: Money) { - todo!() + fn update_balance_single_currency( + &self, + account: AccountAny, + fill: &OrderFilled, + mut pnl: Money, + ) { + let base_currency = if let Some(currency) = account.base_currency() { + currency + } else { + log::error!("Account has no base currency set"); + return; + }; + + let mut balances = Vec::new(); + let mut commission = fill.commission; + + if let Some(ref mut comm) = commission { + if comm.currency != base_currency { + let xrate = self.cache.borrow().get_xrate( + fill.instrument_id.venue, + comm.currency, + base_currency, + if fill.order_side == OrderSide::Sell { + PriceType::Bid + } else { + PriceType::Ask + }, + ); + + if xrate.is_zero() { + log::error!( + "Cannot calculate account state: insufficient data for {}/{}", + comm.currency, + base_currency + ); + return; + } + + *comm = Money::new((comm.as_decimal() * xrate).to_f64().unwrap(), base_currency); + } + } + + if pnl.currency != base_currency { + let xrate = self.cache.borrow().get_xrate( + fill.instrument_id.venue, + pnl.currency, + base_currency, + if fill.order_side == OrderSide::Sell { + PriceType::Bid + } else { + PriceType::Ask + }, + ); + + if xrate.is_zero() { + log::error!( + "Cannot calculate account state: insufficient data for {}/{}", + pnl.currency, + base_currency + ); + return; + } + + pnl = Money::new((pnl.as_decimal() * xrate).to_f64().unwrap(), base_currency); + } + + if let Some(comm) = commission { + pnl -= comm; + } + + if pnl.is_zero() { + return; + } + + let existing_balances = account.balances(); + let balance = if let Some(b) = existing_balances.get(&pnl.currency) { + b + } else { + log::error!( + "Cannot complete transaction: no balance for {}", + pnl.currency + ); + return; + }; + + let new_balance = + AccountBalance::new(balance.total + pnl, balance.locked, balance.free + pnl); + balances.push(new_balance); + + match account { + AccountAny::Cash(mut cash) => { + cash.update_balances(balances); + if let Some(comm) = commission { + cash.update_commissions(comm); + } + } + AccountAny::Margin(mut margin) => { + margin.update_balances(balances); + if let Some(comm) = commission { + margin.update_commissions(comm); + } + } + } } fn update_balance_multi_currency( &self, account: AccountAny, fill: OrderFilled, - pnls: &[Money], + pnls: &mut [Money], ) { - todo!() + let mut new_balances = Vec::new(); + let commission = fill.commission; + let mut apply_commission = commission.map_or(false, |c| !c.is_zero()); + + for pnl in pnls.iter_mut() { + if apply_commission && pnl.currency == commission.unwrap().currency { + *pnl -= commission.unwrap(); + apply_commission = false; + } + + if pnl.is_zero() { + continue; // No Adjustment + } + + let currency = pnl.currency; + let balances = account.balances(); + + let new_balance = if let Some(balance) = balances.get(¤cy) { + let new_total = balance.total.as_f64() + pnl.as_f64(); + let new_free = balance.free.as_f64() + pnl.as_f64(); + let total = Money::new(new_total, currency); + let free = Money::new(new_free, currency); + + if new_total < 0.0 { + log::error!( + "AccountBalanceNegative: balance = {}, currency = {}", + total.as_decimal(), + currency + ); + return; + } + if new_free < 0.0 { + log::error!( + "AccountMarginExceeded: balance = {}, margin = {}, currency = {}", + total.as_decimal(), + balance.locked.as_decimal(), + currency + ); + return; + } + + AccountBalance::new(total, balance.locked, free) + } else { + if pnl.as_decimal() < Decimal::ZERO { + log::error!( + "Cannot complete transaction: no {} to deduct a {} realized PnL from", + currency, + pnl + ); + return; + } + AccountBalance::new(*pnl, Money::new(0.0, currency), *pnl) + }; + + new_balances.push(new_balance); + } + + if apply_commission { + // if apply_commission is true, then commission is not none. + let commission = commission.unwrap(); + let currency = commission.currency; + let balances = account.balances(); + + let commission_balance = if let Some(balance) = balances.get(¤cy) { + let new_total = balance.total.as_decimal() - commission.as_decimal(); + let new_free = balance.free.as_decimal() - commission.as_decimal(); + AccountBalance::new( + Money::new(new_total.to_f64().unwrap(), currency), + balance.locked, + Money::new(new_free.to_f64().unwrap(), currency), + ) + } else { + if commission.as_decimal() > Decimal::ZERO { + log::error!( + "Cannot complete transaction: no {} balance to deduct a {} commission from", + currency, + commission + ); + return; + } + AccountBalance::new( + Money::new(0.0, currency), + Money::new(0.0, currency), + Money::new(0.0, currency), + ) + }; + new_balances.push(commission_balance); + } + + if new_balances.is_empty() { + return; + } + + match account { + AccountAny::Cash(mut cash) => { + cash.update_balances(new_balances); + if let Some(commission) = commission { + cash.update_commissions(commission); + } + } + AccountAny::Margin(mut margin) => { + margin.update_balances(new_balances); + if let Some(commission) = commission { + margin.update_commissions(commission); + } + } + } } fn generate_account_state(&self, account: AccountAny, ts_event: UnixNanos) -> AccountState { - todo!() + match account { + AccountAny::Cash(cash_account) => AccountState::new( + cash_account.id, + AccountType::Cash, + cash_account.balances.clone().into_values().collect(), + vec![], + false, + uuid4_new(), + ts_event, + self.clock.borrow().timestamp_ns(), + cash_account.base_currency(), + ), + AccountAny::Margin(margin_account) => AccountState::new( + margin_account.id, + AccountType::Cash, + vec![], + margin_account.margins.clone().into_values().collect(), + false, + uuid4_new(), + ts_event, + self.clock.borrow().timestamp_ns(), + margin_account.base_currency(), + ), + } } fn calculate_xrate_to_base( @@ -114,6 +693,17 @@ impl AccountsManager { instrument: InstrumentAny, side: OrderSideSpecified, ) -> Decimal { - todo!() + match account.base_currency() { + None => Decimal::ONE, + Some(base_curr) => self.cache.borrow().get_xrate( + instrument.id().venue, + instrument.settlement_currency(), + base_curr, + match side { + OrderSideSpecified::Sell => PriceType::Bid, + OrderSideSpecified::Buy => PriceType::Ask, + }, + ), + } } } diff --git a/nautilus_core/portfolio/src/portfolio.rs b/nautilus_core/portfolio/src/portfolio.rs index 835e37e7959..4f9e5f111dd 100644 --- a/nautilus_core/portfolio/src/portfolio.rs +++ b/nautilus_core/portfolio/src/portfolio.rs @@ -18,34 +18,52 @@ // Under development #![allow(dead_code)] #![allow(unused_variables)] +// improve error handling: TODO use std::{ cell::RefCell, collections::{HashMap, HashSet}, rc::Rc, + sync::Arc, }; -use nautilus_analysis::analyzer::PortfolioAnalyzer; +use nautilus_analysis::{ + analyzer::PortfolioAnalyzer, + statistics::{ + expectancy::Expectancy, long_ratio::LongRatio, loser_max::MaxLoser, loser_min::MinLoser, + profit_factor::ProfitFactor, returns_avg::ReturnsAverage, + returns_avg_loss::ReturnsAverageLoss, returns_avg_win::ReturnsAverageWin, + returns_volatility::ReturnsVolatility, risk_return_ratio::RiskReturnRatio, + sharpe_ratio::SharpeRatio, sortino_ratio::SortinoRatio, win_rate::WinRate, + winner_avg::AvgWinner, winner_max::MaxWinner, winner_min::MinWinner, + }, +}; use nautilus_common::{cache::Cache, clock::Clock, msgbus::MessageBus}; use nautilus_model::{ accounts::any::AccountAny, data::quote::QuoteTick, - enums::OrderSide, + enums::{OrderSide, OrderType, PositionSide, PriceType}, events::{account::state::AccountState, order::OrderEventAny, position::PositionEvent}, identifiers::{InstrumentId, Venue}, instruments::any::InstrumentAny, + orders::any::OrderAny, position::Position, - types::money::Money, + types::{currency::Currency, money::Money, price::Price}, +}; +use rust_decimal::{ + prelude::{FromPrimitive, ToPrimitive}, + Decimal, }; -use rust_decimal::Decimal; +use ustr::Ustr; + +use crate::manager::AccountsManager; pub struct Portfolio { clock: Rc>, cache: Rc>, msgbus: Rc>, - accounts: HashMap, + accounts: AccountsManager, analyzer: PortfolioAnalyzer, - // venue: Option, // Added for completeness but was meant to be a "temporary hack" unrealized_pnls: HashMap, realized_pnls: HashMap, net_positions: HashMap, @@ -54,9 +72,72 @@ pub struct Portfolio { } impl Portfolio { - // pub fn set_specific_venue(&mut self, venue: Venue) { // Lets try not to use this? - // todo!() - // } + pub fn new( + msgbus: Rc>, + cache: Rc>, + clock: Rc>, + ) -> Self { + let mut analyzer = PortfolioAnalyzer::new(); + + // Register default statistics + analyzer.register_statistic(Arc::new(MaxWinner {})); + analyzer.register_statistic(Arc::new(AvgWinner {})); + analyzer.register_statistic(Arc::new(MinWinner {})); + analyzer.register_statistic(Arc::new(MinLoser {})); + analyzer.register_statistic(Arc::new(MaxLoser {})); + analyzer.register_statistic(Arc::new(Expectancy {})); + analyzer.register_statistic(Arc::new(WinRate {})); + analyzer.register_statistic(Arc::new(ReturnsVolatility::new(None))); + analyzer.register_statistic(Arc::new(ReturnsAverage {})); + analyzer.register_statistic(Arc::new(ReturnsAverageLoss {})); + analyzer.register_statistic(Arc::new(ReturnsAverageWin {})); + analyzer.register_statistic(Arc::new(SharpeRatio::new(None))); + analyzer.register_statistic(Arc::new(SortinoRatio::new(None))); + analyzer.register_statistic(Arc::new(ProfitFactor {})); + analyzer.register_statistic(Arc::new(RiskReturnRatio {})); + analyzer.register_statistic(Arc::new(LongRatio::new(None))); + + { + let burrowed_msgbus = msgbus.borrow(); + + // todo + // Register endpoints + // burrowed_msgbus.register("Portfolio.update_account", update_account); + // msgbus + // .borrow() + // .register("Portfolio.update_account", Self::update_account); + + // Require subscriptions + // burrowed_msgbus.subscribe("data.quotes.*", handler, Some(10)); + // burrowed_msgbus.subscribe("events.order.*", handler, Some(10)); + // burrowed_msgbus.subscribe("events.position.*", handler, Some(10)); + // burrowed_msgbus.subscribe("events.account.*", handler, Some(10)); + } + Self { + clock: clock.clone(), + cache: cache.clone(), + msgbus, + accounts: AccountsManager::new(clock, cache), + analyzer, + unrealized_pnls: HashMap::new(), + realized_pnls: HashMap::new(), + net_positions: HashMap::new(), + pending_calcs: HashSet::new(), + initialized: false, + } + } + + pub fn reset(&mut self) { + log::debug!("RESETTING"); + + self.net_positions.clear(); + self.unrealized_pnls.clear(); + self.realized_pnls.clear(); + self.pending_calcs.clear(); + self.analyzer.reset(); + + log::debug!("READY"); + } // -- QUERIES --------------------------------------------------------------------------------- @@ -71,138 +152,966 @@ impl Portfolio { } #[must_use] - pub fn account(&self, venue: &Venue) -> Option<&AccountAny> { - self.accounts.get(venue) + pub fn balances_locked(&self, venue: &Venue) -> HashMap { + self.cache.borrow().account_for_venue(venue).map_or_else( + || { + log::error!( + "Cannot get balances locked: no account generated for {}", + venue + ); + HashMap::new() + }, + nautilus_model::accounts::any::AccountAny::balances_locked, + ) } #[must_use] - pub fn balances_locked(&self, venue: &Venue) -> HashMap { - todo!() + pub fn margins_init(&self, venue: &Venue) -> HashMap { + self.cache.borrow().account_for_venue(venue).map_or_else( + || { + log::error!( + "Cannot get initial (order) margins: no account registered for {}", + venue + ); + HashMap::new() + }, + |account| match account { + AccountAny::Margin(margin_account) => margin_account.initial_margins(), + AccountAny::Cash(_) => { + log::warn!("Initial margins not applicable for cash account"); + HashMap::new() + } + }, + ) } #[must_use] - pub fn margins_init(&self, venue: &Venue) -> HashMap { - todo!() + pub fn margins_maint(&self, venue: &Venue) -> HashMap { + self.cache.borrow().account_for_venue(venue).map_or_else( + || { + log::error!( + "Cannot get maintenance (position) margins: no account registered for {}", + venue + ); + HashMap::new() + }, + |account| match account { + AccountAny::Margin(margin_account) => margin_account.maintenance_margins(), + AccountAny::Cash(_) => { + log::warn!("Maintenance margins not applicable for cash account"); + HashMap::new() + } + }, + ) } #[must_use] - pub fn margins_maint(&self, venue: &Venue) -> HashMap { - todo!() - } + pub fn unrealized_pnls(&mut self, venue: &Venue) -> HashMap { + let instrument_ids = { + let borrowed_cache = self.cache.borrow(); + let positions = borrowed_cache.positions(Some(venue), None, None, None); - #[must_use] - pub fn unrealized_pnls(&self, venue: &Venue) -> HashMap { - todo!() + if positions.is_empty() { + return HashMap::new(); // Nothing to calculate + } + + let instrument_ids: HashSet = + positions.iter().map(|p| p.instrument_id).collect(); + + instrument_ids + }; + + let mut unrealized_pnls: HashMap = HashMap::new(); + + for instrument_id in instrument_ids { + if let Some(&pnl) = self.unrealized_pnls.get(&instrument_id) { + // PnL already calculated + *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(); + continue; + } + + // Calculate PnL + match self.calculate_unrealized_pnl(&instrument_id) { + Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(), + None => continue, + } + } + + unrealized_pnls + .into_iter() + .map(|(currency, amount)| (currency, Money::new(amount, currency))) + .collect() } #[must_use] - pub fn realized_pnls(&self, venue: &Venue) -> HashMap { - todo!() + pub fn realized_pnls(&mut self, venue: &Venue) -> HashMap { + let instrument_ids = { + let borrowed_cache = self.cache.borrow(); + let positions = borrowed_cache.positions(Some(venue), None, None, None); + + if positions.is_empty() { + return HashMap::new(); // Nothing to calculate + } + + let instrument_ids: HashSet = + positions.iter().map(|p| p.instrument_id).collect(); + + instrument_ids + }; + + let mut realized_pnls: HashMap = HashMap::new(); + + for instrument_id in instrument_ids { + if let Some(&pnl) = self.realized_pnls.get(&instrument_id) { + // PnL already calculated + *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(); + continue; + } + + // Calculate PnL + match self.calculate_realized_pnl(&instrument_id) { + Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(), + None => continue, + } + } + + realized_pnls + .into_iter() + .map(|(currency, amount)| (currency, Money::new(amount, currency))) + .collect() } #[must_use] - pub fn net_exposures(&self, venue: &Venue) -> HashMap { - todo!() + pub fn net_exposures(&self, venue: &Venue) -> Option> { + let borrowed_cache = self.cache.borrow(); + let account = if let Some(account) = borrowed_cache.account_for_venue(venue) { + account + } else { + log::error!( + "Cannot calculate net exposures: no account registered for {}", + venue + ); + return None; // Cannot calculate + }; + + let positions_open = borrowed_cache.positions_open(Some(venue), None, None, None); + if positions_open.is_empty() { + return Some(HashMap::new()); // Nothing to calculate + } + + let mut net_exposures: HashMap = HashMap::new(); + + for position in positions_open { + let instrument = + if let Some(instrument) = borrowed_cache.instrument(&position.instrument_id) { + instrument + } else { + log::error!( + "Cannot calculate net exposures: no instrument for {}", + position.instrument_id + ); + return None; // Cannot calculate + }; + + if position.side == PositionSide::Flat { + log::error!( + "Cannot calculate net exposures: position is flat for {}", + position.instrument_id + ); + continue; // Nothing to calculate + } + + let last = self.get_last_price(position)?; + let xrate = self.calculate_xrate_to_base(instrument, account, position.entry); + if xrate == 0.0 { + log::error!( + "Cannot calculate net exposures: insufficient data for {}/{:?}", + instrument.settlement_currency(), + account.base_currency() + ); + return None; // Cannot calculate + } + + let settlement_currency = account + .base_currency() + .unwrap_or_else(|| instrument.settlement_currency()); + + let net_exposure = instrument + .calculate_notional_value(position.quantity, last, None) + .as_f64() + * xrate; + + let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into())) + .round() + / 10f64.powi(settlement_currency.precision.into()); + + *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure; + } + + Some( + net_exposures + .into_iter() + .map(|(currency, amount)| (currency, Money::new(amount, currency))) + .collect(), + ) } #[must_use] - pub fn unrealized_pnl(&self, instrument_id: &InstrumentId) -> Option { - todo!() + pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option { + self.unrealized_pnls + .get(instrument_id) + .copied() + .or_else(|| { + let pnl = self.calculate_unrealized_pnl(instrument_id)?; + self.unrealized_pnls.insert(*instrument_id, pnl); + Some(pnl) + }) } #[must_use] - pub fn realized_pnl(&self, instrument_id: &InstrumentId) -> Option { - todo!() + pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option { + self.realized_pnls.get(instrument_id).copied().or_else(|| { + let pnl = self.calculate_realized_pnl(instrument_id)?; + self.realized_pnls.insert(*instrument_id, pnl); + Some(pnl) + }) } #[must_use] pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option { - todo!() + let borrowed_cache = self.cache.borrow(); + let account = if let Some(account) = borrowed_cache.account_for_venue(&instrument_id.venue) + { + account + } else { + log::error!( + "Cannot calculate net exposure: no account registered for {}", + instrument_id.venue + ); + return None; + }; + + let instrument = if let Some(instrument) = borrowed_cache.instrument(instrument_id) { + instrument + } else { + log::error!( + "Cannot calculate net exposure: no instrument for {}", + instrument_id + ); + return None; + }; + + let positions_open = borrowed_cache.positions_open( + None, // Faster query filtering + Some(instrument_id), + None, + None, + ); + + if positions_open.is_empty() { + return Some(Money::new(0.0, instrument.settlement_currency())); + } + + let mut net_exposure = 0.0; + + for position in positions_open { + let last = self.get_last_price(position)?; + let xrate = self.calculate_xrate_to_base(instrument, account, position.entry); + if xrate == 0.0 { + log::error!( + "Cannot calculate net exposure: insufficient data for {}/{:?}", + instrument.settlement_currency(), + account.base_currency() + ); + return None; + } + + let notional_value = instrument + .calculate_notional_value(position.quantity, last, None) + .as_f64(); + + net_exposure += notional_value * xrate; + } + + let settlement_currency = account + .base_currency() + .unwrap_or_else(|| instrument.settlement_currency()); + + Some(Money::new(net_exposure, settlement_currency)) } #[must_use] - pub fn net_position(&self, instrument_id: &InstrumentId) -> Option { - todo!() + pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal { + self.net_positions + .get(instrument_id) + .copied() + .unwrap_or(Decimal::ZERO) } #[must_use] pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool { - todo!() + self.net_positions + .get(instrument_id) + .copied() + .map_or_else(|| false, |net_position| net_position > Decimal::ZERO) } #[must_use] pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool { - todo!() + self.net_positions + .get(instrument_id) + .copied() + .map_or_else(|| false, |net_position| net_position < Decimal::ZERO) } #[must_use] pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool { - todo!() + self.net_positions + .get(instrument_id) + .copied() + .map_or_else(|| true, |net_position| net_position == Decimal::ZERO) } #[must_use] pub fn is_completely_flat(&self) -> bool { - todo!() + for net_position in self.net_positions.values() { + if *net_position != Decimal::ZERO { + return false; + } + } + + true } // -- COMMANDS -------------------------------------------------------------------------------- pub fn initialize_orders(&mut self) { - todo!() + let borrowed_cache = self.cache.borrow(); + let all_orders_open = borrowed_cache.orders_open(None, None, None, None); + let mut instruments = HashSet::new(); + + for order in all_orders_open.clone() { + instruments.insert(order.instrument_id()); + } + + let mut initialized = true; + + for instrument_id in instruments { + let instrument = if let Some(instrument) = borrowed_cache.instrument(&instrument_id) { + instrument + } else { + log::error!( + "Cannot update initial (order) margin: no instrument found for {}", + instrument_id + ); + initialized = false; + break; + }; + + let orders_open = borrowed_cache.orders_open(None, Some(&instrument_id), None, None); + + let mut borrowed_cache = self.cache.borrow_mut(); + let account = + if let Some(account) = borrowed_cache.mut_account_for_venue(&instrument_id.venue) { + account + } else { + log::error!( + "Cannot update initial (order) margin: no account registered for {}", + instrument_id.venue + ); + initialized = false; + break; + }; + + let result = self.accounts.update_orders( + account, + instrument.clone(), + orders_open, + self.clock.borrow().timestamp_ns(), + ); + + if result.is_none() { + initialized = false; + } + } + + let open_count = all_orders_open.len(); + log::info!( + "Initialized {} open order{}", + open_count, + if open_count == 1 { "" } else { "s" } + ); + self.initialized = initialized; } pub fn initialize_positions(&mut self) { - todo!() + self.unrealized_pnls.clear(); + self.realized_pnls.clear(); + let all_positions_open: Vec; + let mut instruments = HashSet::new(); + { + let borrowed_cache = self.cache.borrow(); + all_positions_open = borrowed_cache + .positions_open(None, None, None, None) + .into_iter() + .cloned() + .collect(); + for position in &all_positions_open { + instruments.insert(position.instrument_id); + } + } + + let mut initialized = true; + + for instrument_id in instruments { + let positions_open: Vec = { + let borrowed_cache = self.cache.borrow(); + borrowed_cache + .positions_open(None, Some(&instrument_id), None, None) + .into_iter() + .cloned() + .collect() + }; + + self.update_net_position(&instrument_id, positions_open); + + let calculated_unrealized_pnl = self.calculate_unrealized_pnl(&instrument_id).unwrap(); + let calculated_realized_pnl = self.calculate_realized_pnl(&instrument_id).unwrap(); + + self.unrealized_pnls + .insert(instrument_id, calculated_unrealized_pnl); + self.realized_pnls + .insert(instrument_id, calculated_realized_pnl); + + let mut borrowed_cache = self.cache.borrow_mut(); + let account = + if let Some(account) = borrowed_cache.mut_account_for_venue(&instrument_id.venue) { + account + } else { + log::error!( + "Cannot update maintenance (position) margin: no account registered for {}", + instrument_id.venue + ); + initialized = false; + break; + }; + + let account = match account { + AccountAny::Cash(_) => continue, + AccountAny::Margin(margin_account) => margin_account, + }; + + let borrowed_cache = self.cache.borrow_mut(); + let instrument = if let Some(instrument) = borrowed_cache.instrument(&instrument_id) { + instrument + } else { + log::error!( + "Cannot update maintenance (position) margin: no instrument found for {}", + instrument_id + ); + initialized = false; + break; + }; + + let result = self.accounts.update_positions( + account, + instrument.clone(), + self.cache + .borrow() + .positions_open(None, Some(&instrument_id), None, None), + self.clock.borrow().timestamp_ns(), + ); + + if result.is_none() { + initialized = false; + } + } + + let open_count = all_positions_open.len(); + self.initialized = initialized; + log::info!( + "Initialized {} open position{}", + open_count, + if open_count == 1 { "" } else { "s" } + ); } pub fn update_quote_tick(&mut self, quote: &QuoteTick) { - todo!() + self.unrealized_pnls.remove("e.instrument_id); + + if self.initialized || !self.pending_calcs.contains("e.instrument_id) { + return; + } + + let result_init: Option; + let mut result_maint = None; + + let account = { + let mut borrowed_cache = self.cache.borrow_mut(); + let account = if let Some(account) = + borrowed_cache.mut_account_for_venue("e.instrument_id.venue) + { + account + } else { + log::error!( + "Cannot update tick: no account registered for {}", + quote.instrument_id.venue + ); + return; + }; + + let borrowed_cache = self.cache.borrow(); + let instrument = + if let Some(instrument) = borrowed_cache.instrument("e.instrument_id) { + instrument.clone() + } else { + log::error!( + "Cannot update tick: no instrument found for {}", + quote.instrument_id + ); + return; + }; + + // Clone the orders and positions to own the data + let orders_open: Vec = borrowed_cache + .orders_open(None, Some("e.instrument_id), None, None) + .iter() + .map(|o| (*o).clone()) + .collect(); + + let positions_open: Vec = borrowed_cache + .positions_open(None, Some("e.instrument_id), None, None) + .iter() + .map(|p| (*p).clone()) + .collect(); + + result_init = self.accounts.update_orders( + account, + instrument.clone(), + orders_open.iter().collect(), + self.clock.borrow().timestamp_ns(), + ); + + if let AccountAny::Margin(margin_account) = account { + result_maint = self.accounts.update_positions( + margin_account, + instrument, + positions_open.iter().collect(), + self.clock.borrow().timestamp_ns(), + ); + } + + account.clone() + }; // All borrows are dropped here + + let result_unrealized_pnl: Option = + self.calculate_unrealized_pnl("e.instrument_id); + + if result_init.is_some() + && (matches!(account, AccountAny::Cash(_)) + || (result_maint.is_some() && result_unrealized_pnl.is_some())) + { + self.pending_calcs.remove("e.instrument_id); + if self.pending_calcs.is_empty() { + self.initialized = true; + } + } } pub fn update_account(&mut self, event: &AccountState) { - todo!() + let mut borrowed_cache = self.cache.borrow_mut(); + + let account = if let Some(existing) = borrowed_cache.account(&event.account_id) { + let mut account = existing.clone(); + account.apply(event.clone()); + borrowed_cache.update_account(account.clone()).unwrap(); + account + } else { + let account = AccountAny::from_events(vec![event.clone()]).unwrap(); + borrowed_cache.add_account(account.clone()).unwrap(); + account + }; + + log::info!("Updated {}", event); } pub fn update_order(&mut self, event: &OrderEventAny) { - todo!() + let mut borrowed_cache = self.cache.borrow_mut(); + let account_id = match event.account_id() { + Some(account_id) => account_id, + None => { + return; // No Account Assigned + } + }; + + let account = if let Some(account) = borrowed_cache.mut_account(&account_id) { + account + } else { + log::error!( + "Cannot update order: no account registered for {}", + account_id + ); + return; + }; + + match account { + AccountAny::Cash(cash_account) => { + if !cash_account.base.calculate_account_state { + return; + } + } + AccountAny::Margin(margin_account) => { + if !margin_account.base.calculate_account_state { + return; + } + } + } + + match event { + OrderEventAny::Accepted(_) + | OrderEventAny::Canceled(_) + | OrderEventAny::Rejected(_) + | OrderEventAny::Updated(_) + | OrderEventAny::Filled(_) => {} + _ => { + return; + } + } + + let borrowed_cache = self.cache.borrow(); + let order = if let Some(order) = borrowed_cache.order(&event.client_order_id()) { + order + } else { + log::error!( + "Cannot update order: {} not found in the cache", + event.client_order_id() + ); + return; // No Order Found + }; + + if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit + { + return; // No change to account state + } + + let instrument = + if let Some(instrument_id) = borrowed_cache.instrument(&event.instrument_id()) { + instrument_id + } else { + log::error!( + "Cannot update order: no instrument found for {}", + event.instrument_id() + ); + return; + }; + + if let OrderEventAny::Filled(order_filled) = event { + let _ = + self.accounts + .update_balances(account.clone(), instrument.clone(), *order_filled); + + // let unrealized_pnl = self.calculate_unrealized_pnl(&order_filled.instrument_id); : TODO + // self.unrealized_pnls + // .insert(event.instrument_id(), unrealized_pnl.unwrap()); + } + + let orders_open = + borrowed_cache.orders_open(None, Some(&event.instrument_id()), None, None); + + let account_state = self.accounts.update_orders( + account, + instrument.clone(), + orders_open, + self.clock.borrow().timestamp_ns(), + ); + + if let Some(account_state) = account_state { + self.msgbus.borrow().publish( + &Ustr::from(&format!("events.account.{}", account.id())), + &account_state, + ); + } else { + log::debug!("Added pending calculation for {}", instrument.id()); + self.pending_calcs.insert(instrument.id()); + } + + log::debug!("Updated {}", event); } pub fn update_position(&mut self, event: &PositionEvent) { - todo!() + let instrument_id = event.instrument_id(); + + let positions_open: Vec = { + let borrowed_cache = self.cache.borrow(); + + borrowed_cache + .positions_open(None, Some(&instrument_id), None, None) + .iter() + .map(|o| (*o).clone()) + .collect() + }; + + self.update_net_position(&instrument_id, positions_open.clone()); + + let calculated_unrealized_pnl = self.calculate_unrealized_pnl(&instrument_id).unwrap(); + let calculated_realized_pnl = self.calculate_realized_pnl(&instrument_id).unwrap(); + + self.unrealized_pnls + .insert(event.instrument_id(), calculated_unrealized_pnl); + self.realized_pnls + .insert(event.instrument_id(), calculated_realized_pnl); + + let mut borrowed_cache = self.cache.borrow_mut(); + let account = borrowed_cache.mut_account(&event.account_id()); + + if let Some(AccountAny::Margin(margin_account)) = account { + if !margin_account.calculate_account_state { + return; // Nothing to calculate + }; + + let borrowed_cache = self.cache.borrow(); + let instrument = if let Some(instrument) = borrowed_cache.instrument(&instrument_id) { + instrument + } else { + log::error!( + "Cannot update position: no instrument found for {}", + instrument_id + ); + return; + }; + + let _ = self.accounts.update_positions( + margin_account, + instrument.clone(), + positions_open.iter().collect(), + self.clock.borrow().timestamp_ns(), + ); + } else if account.is_none() { + log::error!( + "Cannot update position: no account registered for {}", + event.account_id() + ); + } } // -- INTERNAL -------------------------------------------------------------------------------- - // fn net_position(&self, instrument_id: &InstrumentId) -> Decimal { // Same as above? - // todo!() - // } + fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec) { + let mut net_position = Decimal::ZERO; - fn update_net_position( - &self, - instrument_id: &InstrumentId, - positions_open: Vec<&Position>, - ) -> Decimal { - todo!() + for open_position in positions_open { + net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO); + } + + let existing_position = self.net_position(instrument_id); + if existing_position != net_position { + self.net_positions.insert(*instrument_id, net_position); + log::info!("{} net_position={}", instrument_id, net_position); + } } - fn calculate_unrealized_pnl(&self, instrument_id: &InstrumentId) -> Money { - todo!() + fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option { + let borrowed_cache = self.cache.borrow(); + + let account = if let Some(account) = borrowed_cache.account_for_venue(&instrument_id.venue) + { + account + } else { + log::error!( + "Cannot calculate unrealized PnL: no account registered for {}", + instrument_id.venue + ); + return None; + }; + + let instrument = if let Some(instrument) = borrowed_cache.instrument(instrument_id) { + instrument + } else { + log::error!( + "Cannot calculate unrealized PnL: no instrument for {}", + instrument_id + ); + return None; + }; + + let currency = account + .base_currency() + .unwrap_or_else(|| instrument.settlement_currency()); + + let positions_open = borrowed_cache.positions_open( + None, // Faster query filtering + Some(instrument_id), + None, + None, + ); + + if positions_open.is_empty() { + return Some(Money::new(0.0, currency)); + } + + let mut total_pnl = 0.0; + + for position in positions_open { + if position.instrument_id != *instrument_id { + continue; // Nothing to calculate + } + + if position.side == PositionSide::Flat { + continue; // Nothing to calculate + } + + let last = if let Some(price) = self.get_last_price(position) { + price + } else { + log::debug!( + "Cannot calculate unrealized PnL: no prices for {}", + instrument_id + ); + self.pending_calcs.insert(*instrument_id); + return None; // Cannot calculate + }; + + let mut pnl = position.unrealized_pnl(last).as_f64(); + + if let Some(base_currency) = account.base_currency() { + let xrate = self.calculate_xrate_to_base(instrument, account, position.entry); + + if xrate == 0.0 { + log::debug!( + "Cannot calculate unrealized PnL: insufficient data for {}/{}", + instrument.settlement_currency(), + base_currency + ); + self.pending_calcs.insert(*instrument_id); + return None; + } + + let scale = 10f64.powi(currency.precision.into()); + pnl = ((pnl * xrate) * scale).round() / scale; + } + + total_pnl += pnl; + } + + Some(Money::new(total_pnl, currency)) } - fn calculate_realized_pnl(&self, instrument_id: &InstrumentId) -> Money { - todo!() + fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option { + let borrowed_cache = self.cache.borrow(); + + let account = if let Some(account) = borrowed_cache.account_for_venue(&instrument_id.venue) + { + account + } else { + log::error!( + "Cannot calculate realized PnL: no account registered for {}", + instrument_id.venue + ); + return None; + }; + + let instrument = if let Some(instrument) = borrowed_cache.instrument(instrument_id) { + instrument + } else { + log::error!( + "Cannot calculate realized PnL: no instrument for {}", + instrument_id + ); + return None; + }; + + let currency = account + .base_currency() + .unwrap_or_else(|| instrument.settlement_currency()); + + let positions = borrowed_cache.positions( + None, // Faster query filtering + Some(instrument_id), + None, + None, + ); + + if positions.is_empty() { + return Some(Money::new(0.0, currency)); + } + + let mut total_pnl = 0.0; + + for position in positions { + if position.instrument_id != *instrument_id { + continue; // Nothing to calculate + } + + if position.side == PositionSide::Flat { + continue; // Nothing to calculate + } + + let mut pnl = position.realized_pnl?.as_f64(); + + if let Some(base_currency) = account.base_currency() { + let xrate = self.calculate_xrate_to_base(instrument, account, position.entry); + + if xrate == 0.0 { + log::debug!( + "Cannot calculate realized PnL: insufficient data for {}/{}", + instrument.settlement_currency(), + base_currency + ); + self.pending_calcs.insert(*instrument_id); + return None; // Cannot calculate + } + + let scale = 10f64.powi(currency.precision.into()); + pnl = ((pnl * xrate) * scale).round() / scale; + } + + total_pnl += pnl; + } + + Some(Money::new(total_pnl, currency)) } - fn get_last_price(&self, position: &Position) -> Money { - todo!() + fn get_last_price(&self, position: &Position) -> Option { + let price_type = match position.side { + PositionSide::Long => PriceType::Bid, + PositionSide::Short => PriceType::Ask, + _ => panic!("invalid `PositionSide`, was {}", position.side), + }; + + let borrowed_cache = self.cache.borrow(); + + borrowed_cache + .price(&position.instrument_id, price_type) + .or_else(|| borrowed_cache.price(&position.instrument_id, PriceType::Last)) } fn calculate_xrate_to_base( &self, - account: &AccountAny, instrument: &InstrumentAny, + account: &AccountAny, side: OrderSide, ) -> f64 { - todo!() + match account.base_currency() { + Some(base_currency) => { + let price_type = if side == OrderSide::Buy { + PriceType::Bid + } else { + PriceType::Ask + }; + + self.cache + .borrow() + .get_xrate( + instrument.id().venue, + instrument.settlement_currency(), + base_currency, + price_type, + ) + .to_f64() + // todo: improve error + .expect("Fails to convert Decimal to f64") + } + None => 1.0, // No conversion needed + } } } diff --git a/nautilus_core/risk/src/engine/mod.rs b/nautilus_core/risk/src/engine/mod.rs index 5f009fc2787..e3ade98e741 100644 --- a/nautilus_core/risk/src/engine/mod.rs +++ b/nautilus_core/risk/src/engine/mod.rs @@ -193,9 +193,9 @@ impl RiskEngine { } fn handle_submit_order_cache(cache: &Rc>, submit_order: &SubmitOrder) { - let mut burrowed_cache = cache.borrow_mut(); - if !burrowed_cache.order_exists(&submit_order.client_order_id) { - burrowed_cache + let mut borrowed_cache = cache.borrow_mut(); + if !borrowed_cache.order_exists(&submit_order.client_order_id) { + borrowed_cache .add_order(submit_order.order.clone(), None, None, false) .map_err(|e| { log::error!("Cannot add order to cache: {e}"); @@ -205,8 +205,8 @@ impl RiskEngine { } fn get_existing_order(cache: &Rc>, order: &ModifyOrder) -> Option { - let burrowed_cache = cache.borrow(); - if let Some(order) = burrowed_cache.order(&order.client_order_id) { + let borrowed_cache = cache.borrow(); + if let Some(order) = borrowed_cache.order(&order.client_order_id) { Some(order.clone()) } else { log::error!( @@ -424,8 +424,8 @@ impl RiskEngine { // VALIDATE COMMAND //////////////////////////////////////////////////////////////////////////////// let order_exists = { - let burrowed_cache = self.cache.borrow(); - burrowed_cache.order(&command.client_order_id).cloned() + let borrowed_cache = self.cache.borrow(); + borrowed_cache.order(&command.client_order_id).cloned() }; let order = if let Some(order) = order_exists { @@ -633,16 +633,16 @@ impl RiskEngine { last_px = match order { OrderAny::Market(_) | OrderAny::MarketToLimit(_) => { if last_px.is_none() { - let burrowed_cache = self.cache.borrow(); - if let Some(last_quote) = burrowed_cache.quote(&instrument.id()) { + let borrowed_cache = self.cache.borrow(); + if let Some(last_quote) = borrowed_cache.quote(&instrument.id()) { match order.order_side() { OrderSide::Buy => Some(last_quote.ask_price), OrderSide::Sell => Some(last_quote.bid_price), _ => panic!("Invalid order side"), } } else { - let burrowed_cache = self.cache.borrow(); - let last_trade = burrowed_cache.trade(&instrument.id()); + let borrowed_cache = self.cache.borrow(); + let last_trade = borrowed_cache.trade(&instrument.id()); if let Some(last_trade) = last_trade { Some(last_trade.price) @@ -937,9 +937,9 @@ impl RiskEngine { return; } - let mut burrowed_cache = self.cache.borrow_mut(); - if !burrowed_cache.order_exists(&order.client_order_id()) { - burrowed_cache + let mut borrowed_cache = self.cache.borrow_mut(); + if !borrowed_cache.order_exists(&order.client_order_id()) { + borrowed_cache .add_order(order.clone(), None, None, false) .map_err(|e| { log::error!("Cannot add order to cache: {e}");