From 0acf722a53315ba60dc69e10ef9101e4a57a99bd Mon Sep 17 00:00:00 2001 From: elsirion Date: Fri, 22 Nov 2024 23:54:31 +0100 Subject: [PATCH 1/5] fix: use linear pagination algorithm for operation log --- fedimint-client/src/db.rs | 2 +- fedimint-client/src/oplog.rs | 245 ++++++++++++++++++++++++++++++----- 2 files changed, 213 insertions(+), 34 deletions(-) diff --git a/fedimint-client/src/db.rs b/fedimint-client/src/db.rs index 11bfcb9ef22..98dcdb26e60 100644 --- a/fedimint-client/src/db.rs +++ b/fedimint-client/src/db.rs @@ -117,7 +117,7 @@ impl_db_record!( ); /// Key used to lookup operation log entries in chronological order -#[derive(Debug, Clone, Copy, Encodable, Decodable, Serialize)] +#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Encodable, Decodable, Serialize)] pub struct ChronologicalOperationLogKey { pub creation_time: std::time::SystemTime, pub operation_id: OperationId, diff --git a/fedimint-client/src/oplog.rs b/fedimint-client/src/oplog.rs index be41c610e73..a28fb8c2fa9 100644 --- a/fedimint-client/src/oplog.rs +++ b/fedimint-client/src/oplog.rs @@ -1,6 +1,9 @@ +use std::collections::HashSet; use std::fmt::Debug; use std::future; use std::io::{Read, Write}; +use std::ops::Range; +use std::time::Duration; use async_stream::stream; use fedimint_core::core::OperationId; @@ -62,47 +65,109 @@ impl OperationLog { pub async fn list_operations( &self, limit: usize, - start_after: Option, + last_seen: Option, ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> { + const EPOCH_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 7); + let mut dbtx = self.db.begin_transaction_nc().await; - let operations: Vec = dbtx - .find_by_prefix_sorted_descending(&ChronologicalOperationLogKeyPrefix) - .await - .map(|(key, ())| key) - // FIXME: this is a schlemil-the-painter algorithm that will take longer the further - // back in history one goes. To avoid that I see two options: - // 1. Add a reference to the previous operation to each operation log entry, - // essentially creating a linked list, which seem a little bit inelegant. - // 2. Add an option to prefix queries that allows to specify a start key - // - // The current implementation may also skip operations due to `SystemTime` not being - // guaranteed to be monotonous. The linked list approach would also fix that. - .skip_while(|key| { - let skip = if let Some(start_after) = start_after { - key.creation_time >= start_after.creation_time - } else { - false - }; - - std::future::ready(skip) - }) - .take(limit) - .collect::>() - .await; + let operation_log_keys = if let Some(start_after) = last_seen { + let Some(last_key) = dbtx + .find_by_prefix(&ChronologicalOperationLogKeyPrefix) + .await + .map(|(key, ())| key) + .next() + .await + else { + return vec![]; + }; + + // We want to fetch all operations that were created before `start_after`, going + // backwards in time. This means "start" generally means a later time than + // "end". Only when creating a rust Range we have to swap the terminology (see + // comment there). + let epochs_rev_ranges = (0..) + .map(|epoch| start_after.creation_time - epoch * EPOCH_DURATION) + // We want to get all operation log keys in the range [last_key, start_after). Sp as + // long as the start time is greater than the last key's creation time, we have to + // keep going. 
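+                // For example (simplifying times to integers): with `start_after` at 100,
+                // `last_key` at 55 and an epoch length of 10, the generated start times
+                // are 100, 90, 80, 70 and 60; the next candidate, 50, is already older
+                // than `last_key`, so the iteration stops there.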
+ .take_while(|&start_time| start_time >= last_key.creation_time) + .map(|start_time| { + let end_time = start_time - EPOCH_DURATION; + + // In the edge case that there were two events logged at exactly the same time + // we need to specify the correct operation_id for the first key. Otherwise, we + // could miss entries. + let start_key = if start_time == start_after.creation_time { + start_after + } else { + ChronologicalOperationLogKey { + creation_time: start_time, + operation_id: OperationId([0; 32]), + } + }; + + // We could also special-case the last key here, but it's not necessary, making + // it last_key if end_time < last_key.creation_time. We know there are no + // entries beyond last_key though, so the range query will be equivalent either + // way. + let end_key = ChronologicalOperationLogKey { + creation_time: end_time, + operation_id: OperationId([0; 32]), + }; + + // We want to go backwards using a forward range query. This means we have to + // swap the start and end keys and then reverse the vector returned by the + // query. + Range { + start: end_key, + end: start_key, + } + }); + + let mut operation_log_keys = Vec::with_capacity(limit); + for key_range_rev in epochs_rev_ranges { + let epoch_operation_log_keys_rev = dbtx + .find_by_range(key_range_rev) + .await + .map(|(key, ())| key) + .collect::>() + .await; + + for operation_log_key in epoch_operation_log_keys_rev.into_iter().rev() { + operation_log_keys.push(operation_log_key); + if operation_log_keys.len() >= limit { + break; + } + } + } + + debug_assert!( + operation_log_keys.iter().collect::>().len() == operation_log_keys.len(), + "Operation log keys returned are not unique" + ); - let mut operation_entries = Vec::with_capacity(operations.len()); + operation_log_keys + } else { + dbtx.find_by_prefix_sorted_descending(&ChronologicalOperationLogKeyPrefix) + .await + .map(|(key, ())| key) + .take(limit) + .collect::>() + .await + }; - for operation in operations { - let entry = dbtx + let mut operation_log_entries = Vec::with_capacity(operation_log_keys.len()); + for operation_log_key in operation_log_keys { + let operation_log_entry = dbtx .get_value(&OperationLogKey { - operation_id: operation.operation_id, + operation_id: operation_log_key.operation_id, }) .await .expect("Inconsistent DB"); - operation_entries.push((operation, entry)); + operation_log_entries.push((operation_log_key, operation_log_entry)); } - operation_entries + operation_log_entries } pub async fn get_operation(&self, operation_id: OperationId) -> Option { @@ -348,15 +413,19 @@ where #[cfg(test)] mod tests { + use std::time::{Duration, SystemTime}; + use fedimint_core::core::OperationId; use fedimint_core::db::mem_impl::MemDatabase; - use fedimint_core::db::{Database, IRawDatabaseExt}; + use fedimint_core::db::{ + Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped, IRawDatabaseExt, + }; use fedimint_core::module::registry::ModuleRegistry; use futures::stream::StreamExt; use serde::{Deserialize, Serialize}; use super::UpdateStreamOrOutcome; - use crate::db::ChronologicalOperationLogKey; + use crate::db::{ChronologicalOperationLogKey, OperationLogKey}; use crate::oplog::{OperationLog, OperationLogEntry}; #[test] @@ -498,4 +567,114 @@ mod tests { assert_eq!(page.len(), 8); assert_page_entries(page, 9); } + + #[tokio::test] + async fn test_pagination_empty() { + let db = Database::new(MemDatabase::new(), ModuleRegistry::default()); + let op_log = OperationLog::new(db.clone()); + + let page = op_log.list_operations(10, None).await; + 
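+        // An empty operation log must yield an empty first page.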
assert!(page.is_empty()); + } + + #[tokio::test] + async fn test_pagination_multiple_operations_same_time() { + async fn insert_oplog(dbtx: &mut DatabaseTransaction<'_>, idx: u8, time: u64) { + let operation_id = OperationId([idx; 32]); + // Some time in the 2010s + let creation_time = SystemTime::UNIX_EPOCH + + Duration::from_secs(60 * 60 * 24 * 365 * 40) + + Duration::from_secs(time * 60 * 60 * 24); + + dbtx.insert_new_entry( + &OperationLogKey { operation_id }, + &OperationLogEntry { + operation_module_kind: "operation_type".to_string(), + meta: serde_json::Value::Null, + outcome: None, + }, + ) + .await; + dbtx.insert_new_entry( + &ChronologicalOperationLogKey { + creation_time, + operation_id, + }, + &(), + ) + .await; + } + + async fn assert_pages(operation_log: &OperationLog, pages: Vec>) { + let mut previous_last_element: Option = None; + for reference_page in pages { + let page = operation_log + .list_operations(10, previous_last_element) + .await; + assert_eq!(page.len(), reference_page.len()); + assert_eq!( + page.iter() + .map(|(operation_log_key, _)| operation_log_key.operation_id) + .collect::>(), + reference_page + .iter() + .map(|&x| OperationId([x; 32])) + .collect::>() + ); + previous_last_element = page.last().map(|(key, _)| key).copied(); + } + } + + let db = Database::new(MemDatabase::new(), ModuleRegistry::default()); + let op_log = OperationLog::new(db.clone()); + + let mut dbtx = db.begin_transaction().await; + for operation_idx in 0u8..10 { + insert_oplog(&mut dbtx.to_ref_nc(), operation_idx, 1).await; + } + dbtx.commit_tx().await; + assert_pages(&op_log, vec![vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0], vec![]]).await; + + let mut dbtx = db.begin_transaction().await; + for operation_idx in 10u8..16 { + insert_oplog(&mut dbtx.to_ref_nc(), operation_idx, 2).await; + } + for operation_idx in 16u8..22 { + insert_oplog(&mut dbtx.to_ref_nc(), operation_idx, 3).await; + } + dbtx.commit_tx().await; + assert_pages( + &op_log, + vec![ + vec![21, 20, 19, 18, 17, 16, 15, 14, 13, 12], + vec![11, 10, 9, 8, 7, 6, 5, 4, 3, 2], + vec![1, 0], + vec![], + ], + ) + .await; + + let mut dbtx = db.begin_transaction().await; + for operation_idx in 22u8..31 { + // 9 times one operation every 10 days + insert_oplog( + &mut dbtx.to_ref_nc(), + operation_idx, + 10 * u64::from(operation_idx), + ) + .await; + } + dbtx.commit_tx().await; + assert_pages( + &op_log, + vec![ + vec![30, 29, 28, 27, 26, 25, 24, 23, 22, 21], + vec![20, 19, 18, 17, 16, 15, 14, 13, 12, 11], + vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1], + vec![0], + vec![], + ], + ) + .await; + } } From 08f9ee5c6f8b27727685bc1ca60aa296e737f80b Mon Sep 17 00:00:00 2001 From: elsirion Date: Tue, 26 Nov 2024 14:59:16 +0100 Subject: [PATCH 2/5] chore: cache the oldest operation log entry --- fedimint-client/src/oplog.rs | 54 ++++++++++++++++++++++++++++++------ flake.nix | 42 ++++++++++++++++------------ 2 files changed, 70 insertions(+), 26 deletions(-) diff --git a/fedimint-client/src/oplog.rs b/fedimint-client/src/oplog.rs index a28fb8c2fa9..302540501ba 100644 --- a/fedimint-client/src/oplog.rs +++ b/fedimint-client/src/oplog.rs @@ -17,6 +17,7 @@ use fedimint_logging::LOG_CLIENT; use futures::{stream, Stream, StreamExt}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use tokio::sync::OnceCell; use tracing::{error, instrument, warn}; use crate::db::{ @@ -26,11 +27,34 @@ use crate::db::{ #[derive(Debug, Clone)] pub struct OperationLog { db: Database, + oldest_entry: OnceCell, } impl OperationLog { pub fn new(db: Database) -> Self { 
- Self { db } + Self { + db, + oldest_entry: OnceCell::new(), + } + } + + /// Will return the oldest operation log key in the database and cache the + /// result. If no entry exists yet the DB will be queried on each call till + /// an entry is present. + async fn get_oldest_operation_log_key(&self) -> Option { + let mut dbtx = self.db.begin_transaction_nc().await; + self.oldest_entry + .get_or_try_init(move || async move { + dbtx.find_by_prefix(&ChronologicalOperationLogKeyPrefix) + .await + .map(|(key, ())| key) + .next() + .await + .ok_or(()) + }) + .await + .ok() + .copied() } pub async fn add_operation_log_entry( @@ -71,13 +95,7 @@ impl OperationLog { let mut dbtx = self.db.begin_transaction_nc().await; let operation_log_keys = if let Some(start_after) = last_seen { - let Some(last_key) = dbtx - .find_by_prefix(&ChronologicalOperationLogKeyPrefix) - .await - .map(|(key, ())| key) - .next() - .await - else { + let Some(oldest_entry_key) = self.get_oldest_operation_log_key().await else { return vec![]; }; @@ -90,7 +108,7 @@ impl OperationLog { // We want to get all operation log keys in the range [last_key, start_after). Sp as // long as the start time is greater than the last key's creation time, we have to // keep going. - .take_while(|&start_time| start_time >= last_key.creation_time) + .take_while(|&start_time| start_time >= oldest_entry_key.creation_time) .map(|start_time| { let end_time = start_time - EPOCH_DURATION; @@ -677,4 +695,22 @@ mod tests { ) .await; } + + #[tokio::test] + async fn test_pagination_empty_then_not() { + let db = Database::new(MemDatabase::new(), ModuleRegistry::default()); + let op_log = OperationLog::new(db.clone()); + + let page = op_log.list_operations(10, None).await; + assert!(page.is_empty()); + + let mut dbtx = db.begin_transaction().await; + op_log + .add_operation_log_entry(&mut dbtx.to_ref_nc(), OperationId([0; 32]), "foo", "bar") + .await; + dbtx.commit_tx().await; + + let page = op_log.list_operations(10, None).await; + assert_eq!(page.len(), 1); + } } diff --git a/flake.nix b/flake.nix index 83722351c7e..b570f5279a7 100644 --- a/flake.nix +++ b/flake.nix @@ -413,23 +413,31 @@ }; bootstrap = pkgs.mkShell { nativeBuildInputs = with pkgs; [ cachix ]; }; - } // lib.attrsets.optionalAttrs (lib.lists.elem system ["x86_64-linux" "x86_64-darwin" "aarch64-darwin"]) { - # Shell with extra stuff to support cross-compilation with `cargo build --target ` - # - # This will pull extra stuff so to save time and download time to most common developers, - # was moved into another shell. - cross = flakeboxLib.mkDevShell ( - commonShellArgs - // craneMultiBuild.commonEnvsCrossShell - // { - toolchain = toolchainAll; - shellHook = '' - export REPO_ROOT="$(git rev-parse --show-toplevel)" - export PATH="$REPO_ROOT/bin:$PATH" - ''; - } - ); - }; + } + // + lib.attrsets.optionalAttrs + (lib.lists.elem system [ + "x86_64-linux" + "x86_64-darwin" + "aarch64-darwin" + ]) + { + # Shell with extra stuff to support cross-compilation with `cargo build --target ` + # + # This will pull extra stuff so to save time and download time to most common developers, + # was moved into another shell. 
+ cross = flakeboxLib.mkDevShell ( + commonShellArgs + // craneMultiBuild.commonEnvsCrossShell + // { + toolchain = toolchainAll; + shellHook = '' + export REPO_ROOT="$(git rev-parse --show-toplevel)" + export PATH="$REPO_ROOT/bin:$PATH" + ''; + } + ); + }; in { inherit devShells; From c8af9b17253e11817ef83a41db27b2e473017505 Mon Sep 17 00:00:00 2001 From: elsirion Date: Tue, 26 Nov 2024 23:28:52 +0100 Subject: [PATCH 3/5] chore: split `list_operations` for better readability --- fedimint-client/src/oplog.rs | 164 +++++++++++++++++++---------------- 1 file changed, 89 insertions(+), 75 deletions(-) diff --git a/fedimint-client/src/oplog.rs b/fedimint-client/src/oplog.rs index 302540501ba..99a7f60ab80 100644 --- a/fedimint-client/src/oplog.rs +++ b/fedimint-client/src/oplog.rs @@ -93,86 +93,42 @@ impl OperationLog { ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> { const EPOCH_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 7); - let mut dbtx = self.db.begin_transaction_nc().await; - let operation_log_keys = if let Some(start_after) = last_seen { - let Some(oldest_entry_key) = self.get_oldest_operation_log_key().await else { - return vec![]; - }; - - // We want to fetch all operations that were created before `start_after`, going - // backwards in time. This means "start" generally means a later time than - // "end". Only when creating a rust Range we have to swap the terminology (see - // comment there). - let epochs_rev_ranges = (0..) - .map(|epoch| start_after.creation_time - epoch * EPOCH_DURATION) - // We want to get all operation log keys in the range [last_key, start_after). Sp as - // long as the start time is greater than the last key's creation time, we have to - // keep going. - .take_while(|&start_time| start_time >= oldest_entry_key.creation_time) - .map(|start_time| { - let end_time = start_time - EPOCH_DURATION; - - // In the edge case that there were two events logged at exactly the same time - // we need to specify the correct operation_id for the first key. Otherwise, we - // could miss entries. - let start_key = if start_time == start_after.creation_time { - start_after - } else { - ChronologicalOperationLogKey { - creation_time: start_time, - operation_id: OperationId([0; 32]), - } - }; - - // We could also special-case the last key here, but it's not necessary, making - // it last_key if end_time < last_key.creation_time. We know there are no - // entries beyond last_key though, so the range query will be equivalent either - // way. - let end_key = ChronologicalOperationLogKey { - creation_time: end_time, - operation_id: OperationId([0; 32]), - }; - - // We want to go backwards using a forward range query. This means we have to - // swap the start and end keys and then reverse the vector returned by the - // query. 
- Range { - start: end_key, - end: start_key, - } - }); - - let mut operation_log_keys = Vec::with_capacity(limit); - for key_range_rev in epochs_rev_ranges { - let epoch_operation_log_keys_rev = dbtx - .find_by_range(key_range_rev) - .await - .map(|(key, ())| key) - .collect::>() - .await; + let start_after_key = last_seen.unwrap_or_else(|| ChronologicalOperationLogKey { + creation_time: now(), + operation_id: OperationId([0; 32]), + }); - for operation_log_key in epoch_operation_log_keys_rev.into_iter().rev() { - operation_log_keys.push(operation_log_key); - if operation_log_keys.len() >= limit { - break; - } - } - } - - debug_assert!( - operation_log_keys.iter().collect::>().len() == operation_log_keys.len(), - "Operation log keys returned are not unique" - ); + let Some(oldest_entry_key) = self.get_oldest_operation_log_key().await else { + return vec![]; + }; - operation_log_keys - } else { - dbtx.find_by_prefix_sorted_descending(&ChronologicalOperationLogKeyPrefix) + let mut dbtx = self.db.begin_transaction_nc().await; + let mut operation_log_keys = Vec::with_capacity(limit); + + // Find all the operation log keys in the requested window. Since we decided to + // not introduce a find_by_range_rev function we have to jump through some + // hoops, see also the comments in rev_epoch_ranges. + // TODO: Implement using find_by_range_rev if ever introduced + for key_range_rev in rev_epoch_ranges(start_after_key, oldest_entry_key, EPOCH_DURATION) { + let epoch_operation_log_keys_rev = dbtx + .find_by_range(key_range_rev) .await .map(|(key, ())| key) - .take(limit) .collect::>() - .await - }; + .await; + + for operation_log_key in epoch_operation_log_keys_rev.into_iter().rev() { + operation_log_keys.push(operation_log_key); + if operation_log_keys.len() >= limit { + break; + } + } + } + + debug_assert!( + operation_log_keys.iter().collect::>().len() == operation_log_keys.len(), + "Operation log keys returned are not unique" + ); let mut operation_log_entries = Vec::with_capacity(operation_log_keys.len()); for operation_log_key in operation_log_keys { @@ -242,6 +198,64 @@ impl OperationLog { } } +/// Returns an iterator over the ranges of operation log keys, starting from the +/// most recent range and going backwards in time till slightly later than +/// `last_entry`. +/// +/// Simplifying keys to integers and assuming a `start_after` of 100, a +/// `last_entry` of 55 and an `epoch_duration` of 10 the ranges would be: +/// ```text +/// [90..100, 80..90, 70..80, 60..70, 50..60] +/// ``` +fn rev_epoch_ranges( + start_after: ChronologicalOperationLogKey, + last_entry: ChronologicalOperationLogKey, + epoch_duration: Duration, +) -> impl Iterator> { + // We want to fetch all operations that were created before `start_after`, going + // backwards in time. This means "start" generally means a later time than + // "end". Only when creating a rust Range we have to swap the terminology (see + // comment there). + (0..) + .map(move |epoch| start_after.creation_time - epoch * epoch_duration) + // We want to get all operation log keys in the range [last_key, start_after). So as + // long as the start time is greater than the last key's creation time, we have to + // keep going. + .take_while(move |&start_time| start_time >= last_entry.creation_time) + .map(move |start_time| { + let end_time = start_time - epoch_duration; + + // In the edge case that there were two events logged at exactly the same time + // we need to specify the correct operation_id for the first key. 
Otherwise, we + // could miss entries. + let start_key = if start_time == start_after.creation_time { + start_after + } else { + ChronologicalOperationLogKey { + creation_time: start_time, + operation_id: OperationId([0; 32]), + } + }; + + // We could also special-case the last key here, but it's not necessary, making + // it last_key if end_time < last_key.creation_time. We know there are no + // entries beyond last_key though, so the range query will be equivalent either + // way. + let end_key = ChronologicalOperationLogKey { + creation_time: end_time, + operation_id: OperationId([0; 32]), + }; + + // We want to go backwards using a forward range query. This means we have to + // swap the start and end keys and then reverse the vector returned by the + // query. + Range { + start: end_key, + end: start_key, + } + }) +} + /// Represents an operation triggered by a user, typically related to sending or /// receiving money. /// From c4602e337f53be85f298161803be07e02442f238 Mon Sep 17 00:00:00 2001 From: elsirion Date: Tue, 26 Nov 2024 23:32:23 +0100 Subject: [PATCH 4/5] chore: rename `list_operations` to `paginate_operations_rev` --- fedimint-cli/src/client.rs | 2 +- fedimint-client/src/lib.rs | 2 +- fedimint-client/src/oplog.rs | 27 +++++++++++++++----- modules/fedimint-wallet-tests/tests/tests.rs | 5 +++- 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/fedimint-cli/src/client.rs b/fedimint-cli/src/client.rs index fd832fd2784..bb9091322f5 100644 --- a/fedimint-cli/src/client.rs +++ b/fedimint-cli/src/client.rs @@ -532,7 +532,7 @@ pub async fn handle_command( let operations = client .operation_log() - .list_operations(limit, None) + .paginate_operations_rev(limit, None) .await .into_iter() .map(|(k, v)| { diff --git a/fedimint-client/src/lib.rs b/fedimint-client/src/lib.rs index 663dcd1f355..de7d69ed9b9 100644 --- a/fedimint-client/src/lib.rs +++ b/fedimint-client/src/lib.rs @@ -2128,7 +2128,7 @@ impl Client { } "list_operations" => { // TODO: support pagination - let operations = self.operation_log().list_operations(usize::MAX, None).await; + let operations = self.operation_log().paginate_operations_rev(usize::MAX, None).await; yield serde_json::to_value(operations)?; } "has_pending_recoveries" => { diff --git a/fedimint-client/src/oplog.rs b/fedimint-client/src/oplog.rs index 99a7f60ab80..7b26c1629d3 100644 --- a/fedimint-client/src/oplog.rs +++ b/fedimint-client/src/oplog.rs @@ -84,9 +84,18 @@ impl OperationLog { .await; } + #[deprecated(since = "0.6.0", note = "Use `paginate_operations_rev` instead")] + pub async fn list_operations( + &self, + limit: usize, + last_seen: Option, + ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> { + self.paginate_operations_rev(limit, last_seen).await + } + /// Returns the last `limit` operations. To fetch the next page, pass the /// last operation's [`ChronologicalOperationLogKey`] as `start_after`. 
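+    /// Pages are returned newest-first; a page ends once `limit` entries have been
+    /// collected or the oldest stored operation has been reached.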
- pub async fn list_operations( + pub async fn paginate_operations_rev( &self, limit: usize, last_seen: Option, @@ -589,13 +598,17 @@ mod tests { let mut previous_last_element = None; for page_idx in 0u8..9 { - let page = op_log.list_operations(10, previous_last_element).await; + let page = op_log + .paginate_operations_rev(10, previous_last_element) + .await; assert_eq!(page.len(), 10); previous_last_element = Some(page[9].0); assert_page_entries(page, page_idx); } - let page = op_log.list_operations(10, previous_last_element).await; + let page = op_log + .paginate_operations_rev(10, previous_last_element) + .await; assert_eq!(page.len(), 8); assert_page_entries(page, 9); } @@ -605,7 +618,7 @@ mod tests { let db = Database::new(MemDatabase::new(), ModuleRegistry::default()); let op_log = OperationLog::new(db.clone()); - let page = op_log.list_operations(10, None).await; + let page = op_log.paginate_operations_rev(10, None).await; assert!(page.is_empty()); } @@ -641,7 +654,7 @@ mod tests { let mut previous_last_element: Option = None; for reference_page in pages { let page = operation_log - .list_operations(10, previous_last_element) + .paginate_operations_rev(10, previous_last_element) .await; assert_eq!(page.len(), reference_page.len()); assert_eq!( @@ -715,7 +728,7 @@ mod tests { let db = Database::new(MemDatabase::new(), ModuleRegistry::default()); let op_log = OperationLog::new(db.clone()); - let page = op_log.list_operations(10, None).await; + let page = op_log.paginate_operations_rev(10, None).await; assert!(page.is_empty()); let mut dbtx = db.begin_transaction().await; @@ -724,7 +737,7 @@ mod tests { .await; dbtx.commit_tx().await; - let page = op_log.list_operations(10, None).await; + let page = op_log.paginate_operations_rev(10, None).await; assert_eq!(page.len(), 1); } } diff --git a/modules/fedimint-wallet-tests/tests/tests.rs b/modules/fedimint-wallet-tests/tests/tests.rs index 3f050791d9b..816817eb752 100644 --- a/modules/fedimint-wallet-tests/tests/tests.rs +++ b/modules/fedimint-wallet-tests/tests/tests.rs @@ -187,7 +187,10 @@ async fn on_chain_peg_in_and_peg_out_happy_case() -> anyhow::Result<()> { .await?; // Test operation is created - let operations = client.operation_log().list_operations(10, None).await; + let operations = client + .operation_log() + .paginate_operations_rev(10, None) + .await; assert_eq!(operations.len(), 1, "Expecting only the peg-in operation"); let deposit_operation_id = operations[0].0.operation_id; From 07a30920e78c3631c3eeea232d5079626dc03eff Mon Sep 17 00:00:00 2001 From: elsirion Date: Fri, 29 Nov 2024 11:07:51 +0100 Subject: [PATCH 5/5] fix: add buffer against potential flakiness --- fedimint-client/src/oplog.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/fedimint-client/src/oplog.rs b/fedimint-client/src/oplog.rs index 7b26c1629d3..3fa9d0482c5 100644 --- a/fedimint-client/src/oplog.rs +++ b/fedimint-client/src/oplog.rs @@ -103,7 +103,10 @@ impl OperationLog { const EPOCH_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 7); let start_after_key = last_seen.unwrap_or_else(|| ChronologicalOperationLogKey { - creation_time: now(), + // We don't expect any operations from the future to exist, since SystemTime isn't + // monotone and CI can be overloaded at times we add a small buffer to avoid flakiness + // in tests. + creation_time: now() + Duration::from_secs(30), operation_id: OperationId([0; 32]), });