diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index 13638ed6230..5d82572470c 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -15,7 +15,7 @@ use std::{collections::BTreeSet, fmt, sync::Arc}; use as_variant::as_variant; -use eyeball_im::{ObservableVectorEntry, VectorDiff}; +use eyeball_im::VectorDiff; use eyeball_im_util::vector::VectorObserverExt; use futures_core::Stream; use imbl::Vector; @@ -51,9 +51,17 @@ use tracing::{ debug, error, field, field::debug, info, info_span, instrument, trace, warn, Instrument as _, }; -pub(super) use self::state::{ - AllRemoteEvents, FullEventMeta, PendingEdit, PendingEditKind, TimelineMetadata, - TimelineNewItemPosition, TimelineState, TimelineStateTransaction, +#[cfg(test)] +pub(super) use self::observable_items::ObservableItems; +pub(super) use self::{ + observable_items::{ + AllRemoteEvents, ObservableItemsEntry, ObservableItemsTransaction, + ObservableItemsTransactionEntry, + }, + state::{ + FullEventMeta, PendingEdit, PendingEditKind, TimelineMetadata, TimelineNewItemPosition, + TimelineState, TimelineStateTransaction, + }, }; use super::{ event_handler::TimelineEventKind, @@ -77,6 +85,7 @@ use crate::{ unable_to_decrypt_hook::UtdHookManager, }; +mod observable_items; mod state; /// Data associated to the current timeline focus. @@ -458,7 +467,7 @@ impl TimelineController

{ /// /// Cheap because `im::Vector` is cheap to clone. pub(super) async fn items(&self) -> Vector> { - self.state.read().await.items.clone() + self.state.read().await.items.clone_items() } pub(super) async fn subscribe( @@ -466,7 +475,7 @@ impl TimelineController

{ ) -> (Vector>, impl Stream>> + Send) { trace!("Creating timeline items signal"); let state = self.state.read().await; - (state.items.clone(), state.items.subscribe().into_stream()) + (state.items.clone_items(), state.items.subscribe().into_stream()) } pub(super) async fn subscribe_batched( @@ -474,7 +483,7 @@ impl TimelineController

{ ) -> (Vector>, impl Stream>>>) { trace!("Creating timeline items signal"); let state = self.state.read().await; - (state.items.clone(), state.items.subscribe().into_batched_stream()) + (state.items.clone_items(), state.items.subscribe().into_batched_stream()) } pub(super) async fn subscribe_filter_map( @@ -591,7 +600,7 @@ impl TimelineController

{ if reaction_info.is_some() { let new_item = item.with_reactions(reactions); - state.items.set(item_pos, new_item); + state.items.replace(item_pos, new_item); } else { warn!("reaction is missing on the item, not removing it locally, but sending redaction."); } @@ -615,7 +624,7 @@ impl TimelineController

{ .or_default() .insert(user_id.to_owned(), reaction_info); let new_item = item.with_reactions(reactions); - state.items.set(item_pos, new_item); + state.items.replace(item_pos, new_item); } else { warn!("couldn't find item to re-add reaction anymore; maybe it's been redacted?"); } @@ -811,7 +820,7 @@ impl TimelineController

{ { trace!("updated reaction status to sent"); entry.status = ReactionStatus::RemoteToRemote(event_id.to_owned()); - txn.items.set(item_pos, event_item.with_reactions(reactions)); + txn.items.replace(item_pos, event_item.with_reactions(reactions)); txn.commit(); return; } @@ -857,7 +866,7 @@ impl TimelineController

{ } let new_item = item.with_inner_kind(local_item.with_send_state(send_state)); - txn.items.set(idx, new_item); + txn.items.replace(idx, new_item); txn.commit(); } @@ -904,7 +913,7 @@ impl TimelineController

{ let mut reactions = item.reactions().clone(); if reactions.remove_reaction(&full_key.sender, &full_key.key).is_some() { let updated_item = item.with_reactions(reactions); - state.items.set(idx, updated_item); + state.items.replace(idx, updated_item); } else { warn!( "missing reaction {} for sender {} on timeline item", @@ -961,7 +970,7 @@ impl TimelineController

{ prev_item.internal_id.to_owned(), ); - txn.items.set(idx, new_item); + txn.items.replace(idx, new_item); // This doesn't change the original sending time, so there's no need to adjust // day dividers. @@ -1128,7 +1137,7 @@ impl TimelineController

{ let new_item = entry.with_kind(TimelineItemKind::Event( event_item.with_sender_profile(profile_state.clone()), )); - ObservableVectorEntry::set(&mut entry, new_item); + ObservableItemsEntry::replace(&mut entry, new_item); } }); } @@ -1154,7 +1163,7 @@ impl TimelineController

{ let updated_item = event_item.with_sender_profile(TimelineDetails::Ready(profile)); let new_item = entry.with_kind(updated_item); - ObservableVectorEntry::set(&mut entry, new_item); + ObservableItemsEntry::replace(&mut entry, new_item); } None => { if !event_item.sender_profile().is_unavailable() { @@ -1162,7 +1171,7 @@ impl TimelineController

{ let updated_item = event_item.with_sender_profile(TimelineDetails::Unavailable); let new_item = entry.with_kind(updated_item); - ObservableVectorEntry::set(&mut entry, new_item); + ObservableItemsEntry::replace(&mut entry, new_item); } else { debug!(event_id, transaction_id, "Profile already marked unavailable"); } @@ -1198,7 +1207,7 @@ impl TimelineController

{ let updated_item = event_item.with_sender_profile(TimelineDetails::Ready(profile)); let new_item = entry.with_kind(updated_item); - ObservableVectorEntry::set(&mut entry, new_item); + ObservableItemsEntry::replace(&mut entry, new_item); } } None => { @@ -1207,7 +1216,7 @@ impl TimelineController

{ let updated_item = event_item.with_sender_profile(TimelineDetails::Unavailable); let new_item = entry.with_kind(updated_item); - ObservableVectorEntry::set(&mut entry, new_item); + ObservableItemsEntry::replace(&mut entry, new_item); } else { debug!(event_id, transaction_id, "Profile already marked unavailable"); } @@ -1316,7 +1325,7 @@ impl TimelineController

{ trace!("Adding local reaction to local echo"); let new_item = item.with_reactions(reactions); - state.items.set(item_pos, new_item); + state.items.replace(item_pos, new_item); // Add it to the reaction map, so we can discard it later if needs be. state.meta.reactions.map.insert( @@ -1456,7 +1465,7 @@ impl TimelineController { event, }), )); - state.items.set(index, TimelineItem::new(item, internal_id)); + state.items.replace(index, TimelineItem::new(item, internal_id)); Ok(()) } @@ -1481,13 +1490,22 @@ impl TimelineController { match receipt_type { SendReceiptType::Read => { - if let Some((old_pub_read, _)) = - state.meta.user_receipt(own_user_id, ReceiptType::Read, room).await + if let Some((old_pub_read, _)) = state + .meta + .user_receipt( + own_user_id, + ReceiptType::Read, + room, + state.items.all_remote_events(), + ) + .await { trace!(%old_pub_read, "found a previous public receipt"); - if let Some(relative_pos) = - state.meta.compare_events_positions(&old_pub_read, event_id) - { + if let Some(relative_pos) = state.meta.compare_events_positions( + &old_pub_read, + event_id, + state.items.all_remote_events(), + ) { trace!("event referred to new receipt is {relative_pos:?} the previous receipt"); return relative_pos == RelativePosition::After; } @@ -1500,9 +1518,11 @@ impl TimelineController { state.latest_user_read_receipt(own_user_id, room).await { trace!(%old_priv_read, "found a previous private receipt"); - if let Some(relative_pos) = - state.meta.compare_events_positions(&old_priv_read, event_id) - { + if let Some(relative_pos) = state.meta.compare_events_positions( + &old_priv_read, + event_id, + state.items.all_remote_events(), + ) { trace!("event referred to new receipt is {relative_pos:?} the previous receipt"); return relative_pos == RelativePosition::After; } @@ -1511,9 +1531,11 @@ impl TimelineController { SendReceiptType::FullyRead => { if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await { - if let Some(relative_pos) = - state.meta.compare_events_positions(&prev_event_id, event_id) - { + if let Some(relative_pos) = state.meta.compare_events_positions( + &prev_event_id, + event_id, + state.items.all_remote_events(), + ) { return relative_pos == RelativePosition::After; } } @@ -1529,7 +1551,7 @@ impl TimelineController { /// it's folded into another timeline item. pub(crate) async fn latest_event_id(&self) -> Option { let state = self.state.read().await; - state.meta.all_remote_events.last().map(|event_meta| &event_meta.event_id).cloned() + state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned() } } @@ -1575,7 +1597,7 @@ async fn fetch_replied_to_event( let event_item = item.with_content(TimelineItemContent::Message(reply), None); let new_timeline_item = TimelineItem::new(event_item, internal_id); - state.items.set(index, new_timeline_item); + state.items.replace(index, new_timeline_item); // Don't hold the state lock while the network request is made drop(state); diff --git a/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs b/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs new file mode 100644 index 00000000000..e5f62d70716 --- /dev/null +++ b/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs @@ -0,0 +1,1578 @@ +// Copyright 2024 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + cmp::Ordering, + collections::{vec_deque::Iter, VecDeque}, + ops::Deref, + sync::Arc, +}; + +use eyeball_im::{ + ObservableVector, ObservableVectorEntries, ObservableVectorEntry, ObservableVectorTransaction, + ObservableVectorTransactionEntry, VectorSubscriber, +}; +use imbl::Vector; +use ruma::EventId; + +use super::{state::EventMeta, TimelineItem}; + +/// An `ObservableItems` is a type similar to +/// [`ObservableVector>`] except the API is limited and, +/// internally, maintains the mapping between remote events and timeline items. +#[derive(Debug)] +pub struct ObservableItems { + /// All timeline items. + /// + /// Yeah, there are here! This [`ObservableVector`] contains all the + /// timeline items that are rendered in your magnificent Matrix client. + /// + /// These items are the _core_ of the timeline, see [`TimelineItem`] to + /// learn more. + items: ObservableVector>, + + /// List of all the remote events as received in the timeline, even the ones + /// that are discarded in the timeline items. + /// + /// The list of all remote events is used to compute the read receipts and + /// read markers; additionally it's used to map events to timeline items, + /// for more info about that, take a look at the documentation for + /// [`EventMeta::timeline_item_index`]. + all_remote_events: AllRemoteEvents, +} + +impl ObservableItems { + /// Create an empty `ObservableItems`. + pub fn new() -> Self { + Self { + // Upstream default capacity is currently 16, which is making + // sliding-sync tests with 20 events lag. This should still be + // small enough. + items: ObservableVector::with_capacity(32), + all_remote_events: AllRemoteEvents::default(), + } + } + + /// Get a reference to all remote events. + pub fn all_remote_events(&self) -> &AllRemoteEvents { + &self.all_remote_events + } + + /// Check whether there is timeline items. + pub fn is_empty(&self) -> bool { + self.items.is_empty() + } + + /// Subscribe to timeline item updates. + pub fn subscribe(&self) -> VectorSubscriber> { + self.items.subscribe() + } + + /// Get a clone of all timeline items. + /// + /// Note that it doesn't clone `Self`, only the inner timeline items. + pub fn clone_items(&self) -> Vector> { + self.items.clone() + } + + /// Start a new transaction to make multiple updates as one unit. + pub fn transaction(&mut self) -> ObservableItemsTransaction<'_> { + ObservableItemsTransaction { + items: self.items.transaction(), + all_remote_events: &mut self.all_remote_events, + } + } + + /// Replace the timeline item at position `timeline_item_index` by + /// `timeline_item`. + /// + /// # Panics + /// + /// Panics if `timeline_item_index > total_number_of_timeline_items`. + pub fn replace( + &mut self, + timeline_item_index: usize, + timeline_item: Arc, + ) -> Arc { + self.items.set(timeline_item_index, timeline_item) + } + + /// Get an iterator over all the entries in this `ObservableItems`. + pub fn entries(&mut self) -> ObservableItemsEntries<'_> { + ObservableItemsEntries(self.items.entries()) + } + + /// Call the given closure for every element in this `ObservableItems`, + /// with an entry struct that allows updating that element. + pub fn for_each(&mut self, mut f: F) + where + F: FnMut(ObservableItemsEntry<'_>), + { + self.items.for_each(|entry| f(ObservableItemsEntry(entry))) + } +} + +// It's fine to deref to an immutable reference to `Vector`. +// +// We don't want, however, to deref to a mutable reference: it should be done +// via proper methods to control precisely the mapping between remote events and +// timeline items. +impl Deref for ObservableItems { + type Target = Vector>; + + fn deref(&self) -> &Self::Target { + &self.items + } +} + +/// An iterator that yields entries into an `ObservableItems`. +/// +/// It doesn't implement [`Iterator`] though because of a lifetime conflict: the +/// returned `Iterator::Item` could live longer than the `Iterator` itself. +/// Ideally, `Iterator::next` should take a `&'a mut self`, but this is not +/// possible. +pub struct ObservableItemsEntries<'a>(ObservableVectorEntries<'a, Arc>); + +impl ObservableItemsEntries<'_> { + /// Advance this iterator, yielding an `ObservableItemsEntry` for the next + /// item in the timeline, or `None` if all items have been visited. + pub fn next(&mut self) -> Option> { + self.0.next().map(ObservableItemsEntry) + } +} + +/// A handle to a single timeline item in an `ObservableItems`. +#[derive(Debug)] +pub struct ObservableItemsEntry<'a>(ObservableVectorEntry<'a, Arc>); + +impl ObservableItemsEntry<'_> { + /// Replace the timeline item by `timeline_item`. + pub fn replace(this: &mut Self, timeline_item: Arc) -> Arc { + ObservableVectorEntry::set(&mut this.0, timeline_item) + } +} + +// It's fine to deref to an immutable reference to `Arc`. +// +// We don't want, however, to deref to a mutable reference: it should be done +// via proper methods to control precisely the mapping between remote events and +// timeline items. +impl Deref for ObservableItemsEntry<'_> { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// A transaction that allows making multiple updates to an `ObservableItems` as +/// an atomic unit. +/// +/// For updates from the transaction to have affect, it has to be finalized with +/// [`ObservableItemsTransaction::commit`]. If the transaction is dropped +/// without that method being called, the updates will be discarded. +#[derive(Debug)] +pub struct ObservableItemsTransaction<'observable_items> { + items: ObservableVectorTransaction<'observable_items, Arc>, + all_remote_events: &'observable_items mut AllRemoteEvents, +} + +impl<'observable_items> ObservableItemsTransaction<'observable_items> { + /// Get a reference to the timeline item at position `timeline_item_index`. + pub fn get(&self, timeline_item_index: usize) -> Option<&Arc> { + self.items.get(timeline_item_index) + } + + /// Get a reference to all remote events. + pub fn all_remote_events(&self) -> &AllRemoteEvents { + self.all_remote_events + } + + /// Remove a remote event at the `event_index` position. + /// + /// Not to be confused with removing a timeline item! + pub fn remove_remote_event(&mut self, event_index: usize) -> Option { + self.all_remote_events.remove(event_index) + } + + /// Push a new remote event at the front of all remote events. + /// + /// Not to be confused with pushing a timeline item to the front! + pub fn push_front_remote_event(&mut self, event_meta: EventMeta) { + self.all_remote_events.push_front(event_meta); + } + + /// Push a new remote event at the back of all remote events. + /// + /// Not to be confused with pushing a timeline item to the back! + pub fn push_back_remote_event(&mut self, event_meta: EventMeta) { + self.all_remote_events.push_back(event_meta); + } + + /// Get a remote event by using an event ID. + pub fn get_remote_event_by_event_id_mut( + &mut self, + event_id: &EventId, + ) -> Option<&mut EventMeta> { + self.all_remote_events.get_by_event_id_mut(event_id) + } + + /// Replace a timeline item at position `timeline_item_index` by + /// `timeline_item`. + pub fn replace( + &mut self, + timeline_item_index: usize, + timeline_item: Arc, + ) -> Arc { + self.items.set(timeline_item_index, timeline_item) + } + + /// Remove a timeline item at position `timeline_item_index`. + pub fn remove(&mut self, timeline_item_index: usize) -> Arc { + let removed_timeline_item = self.items.remove(timeline_item_index); + self.all_remote_events.timeline_item_has_been_removed_at(timeline_item_index); + + removed_timeline_item + } + + /// Insert a new `timeline_item` at position `timeline_item_index`, with an + /// optionally associated `event_index`. + /// + /// If `event_index` is `Some(_)`, it means `timeline_item_index` has an + /// associated remote event (at position `event_index`) that maps to it. + /// Otherwise, if it is `None`, it means there is no remote event associated + /// to it; that's the case for virtual timeline item for example. See + /// [`EventMeta::timeline_item_index`] to learn more. + pub fn insert( + &mut self, + timeline_item_index: usize, + timeline_item: Arc, + event_index: Option, + ) { + self.items.insert(timeline_item_index, timeline_item); + self.all_remote_events.timeline_item_has_been_inserted_at(timeline_item_index, event_index); + } + + /// Push a new `timeline_item` at position 0, with an optionally associated + /// `event_index`. + /// + /// If `event_index` is `Some(_)`, it means `timeline_item_index` has an + /// associated remote event (at position `event_index`) that maps to it. + /// Otherwise, if it is `None`, it means there is no remote event associated + /// to it; that's the case for virtual timeline item for example. See + /// [`EventMeta::timeline_item_index`] to learn more. + pub fn push_front(&mut self, timeline_item: Arc, event_index: Option) { + self.items.push_front(timeline_item); + self.all_remote_events.timeline_item_has_been_inserted_at(0, event_index); + } + + /// Push a new `timeline_item` at position `len() - 1`, with an optionally + /// associated `event_index`. + /// + /// If `event_index` is `Some(_)`, it means `timeline_item_index` has an + /// associated remote event (at position `event_index`) that maps to it. + /// Otherwise, if it is `None`, it means there is no remote event associated + /// to it; that's the case for virtual timeline item for example. See + /// [`EventMeta::timeline_item_index`] to learn more. + pub fn push_back(&mut self, timeline_item: Arc, event_index: Option) { + self.items.push_back(timeline_item); + self.all_remote_events + .timeline_item_has_been_inserted_at(self.items.len().saturating_sub(1), event_index); + } + + /// Clear all timeline items and all remote events. + pub fn clear(&mut self) { + self.items.clear(); + self.all_remote_events.clear(); + } + + /// Call the given closure for every element in this `ObservableItems`, + /// with an entry struct that allows updating that element. + pub fn for_each(&mut self, mut f: F) + where + F: FnMut(ObservableItemsTransactionEntry<'_, 'observable_items>), + { + self.items.for_each(|entry| { + f(ObservableItemsTransactionEntry { entry, all_remote_events: self.all_remote_events }) + }) + } + + /// Commit this transaction, persisting the changes and notifying + /// subscribers. + pub fn commit(self) { + self.items.commit() + } +} + +// It's fine to deref to an immutable reference to `Vector`. +// +// We don't want, however, to deref to a mutable reference: it should be done +// via proper methods to control precisely the mapping between remote events and +// timeline items. +impl Deref for ObservableItemsTransaction<'_> { + type Target = Vector>; + + fn deref(&self) -> &Self::Target { + &self.items + } +} + +/// A handle to a single timeline item in an `ObservableItemsTransaction`. +pub struct ObservableItemsTransactionEntry<'observable_transaction_items, 'observable_items> { + entry: ObservableVectorTransactionEntry< + 'observable_transaction_items, + 'observable_items, + Arc, + >, + all_remote_events: &'observable_transaction_items mut AllRemoteEvents, +} + +impl ObservableItemsTransactionEntry<'_, '_> { + /// Replace the timeline item by `timeline_item`. + pub fn replace(this: &mut Self, timeline_item: Arc) -> Arc { + ObservableVectorTransactionEntry::set(&mut this.entry, timeline_item) + } + + /// Remove this timeline item. + pub fn remove(this: Self) { + let entry_index = ObservableVectorTransactionEntry::index(&this.entry); + + ObservableVectorTransactionEntry::remove(this.entry); + this.all_remote_events.timeline_item_has_been_removed_at(entry_index); + } +} + +// It's fine to deref to an immutable reference to `Arc`. +// +// We don't want, however, to deref to a mutable reference: it should be done +// via proper methods to control precisely the mapping between remote events and +// timeline items. +impl Deref for ObservableItemsTransactionEntry<'_, '_> { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.entry + } +} + +#[cfg(test)] +mod observable_items_tests { + use std::ops::Not; + + use assert_matches::assert_matches; + use eyeball_im::VectorDiff; + use ruma::{ + events::room::message::{MessageType, TextMessageEventContent}, + owned_user_id, MilliSecondsSinceUnixEpoch, + }; + use stream_assert::assert_next_matches; + + use super::*; + use crate::timeline::{ + controller::{EventTimelineItemKind, RemoteEventOrigin}, + event_item::RemoteEventTimelineItem, + EventTimelineItem, Message, TimelineDetails, TimelineItemContent, TimelineUniqueId, + }; + + fn item(event_id: &str) -> Arc { + TimelineItem::new( + EventTimelineItem::new( + owned_user_id!("@ivan:mnt.io"), + TimelineDetails::Unavailable, + MilliSecondsSinceUnixEpoch(0u32.into()), + TimelineItemContent::Message(Message { + msgtype: MessageType::Text(TextMessageEventContent::plain("hello")), + in_reply_to: None, + thread_root: None, + edited: false, + mentions: None, + }), + EventTimelineItemKind::Remote(RemoteEventTimelineItem { + event_id: event_id.parse().unwrap(), + transaction_id: None, + read_receipts: Default::default(), + is_own: false, + is_highlighted: false, + encryption_info: None, + original_json: None, + latest_edit_json: None, + origin: RemoteEventOrigin::Sync, + }), + Default::default(), + false, + ), + TimelineUniqueId(format!("__id_{event_id}")), + ) + } + + fn read_marker() -> Arc { + TimelineItem::read_marker() + } + + fn event_meta(event_id: &str) -> EventMeta { + EventMeta { event_id: event_id.parse().unwrap(), timeline_item_index: None, visible: false } + } + + macro_rules! assert_event_id { + ( $timeline_item:expr, $event_id:literal $( , $message:expr )? $(,)? ) => { + assert_eq!($timeline_item.as_event().unwrap().event_id().unwrap().as_str(), $event_id $( , $message)? ); + }; + } + + macro_rules! assert_mapping { + ( on $transaction:ident: + | event_id | event_index | timeline_item_index | + | $( - )+ | $( - )+ | $( - )+ | + $( + | $event_id:literal | $event_index:literal | $( $timeline_item_index:literal )? | + )+ + ) => { + let all_remote_events = $transaction .all_remote_events(); + + $( + // Remote event exists at this index… + assert_matches!(all_remote_events.0.get( $event_index ), Some(EventMeta { event_id, timeline_item_index, .. }) => { + // … this is the remote event with the expected event ID + assert_eq!( + event_id.as_str(), + $event_id , + concat!("event #", $event_index, " should have ID ", $event_id) + ); + + + // (tiny hack to handle the case where `$timeline_item_index` is absent) + #[allow(unused_variables)] + let timeline_item_index_is_expected = false; + $( + let timeline_item_index_is_expected = true; + let _ = $timeline_item_index; + )? + + if timeline_item_index_is_expected.not() { + // … this remote event does NOT map to a timeline item index + assert!( + timeline_item_index.is_none(), + concat!("event #", $event_index, " with ID ", $event_id, " should NOT map to a timeline item index" ) + ); + } + + $( + // … this remote event maps to a timeline item index + assert_eq!( + *timeline_item_index, + Some( $timeline_item_index ), + concat!("event #", $event_index, " with ID ", $event_id, " should map to timeline item #", $timeline_item_index ) + ); + + // … this timeline index exists + assert_matches!( $transaction .get( $timeline_item_index ), Some(timeline_item) => { + // … this timelime item has the expected event ID + assert_event_id!( + timeline_item, + $event_id , + concat!("timeline item #", $timeline_item_index, " should map to event ID ", $event_id ) + ); + }); + )? + }); + )* + } +} + + #[test] + fn test_is_empty() { + let mut items = ObservableItems::new(); + + assert!(items.is_empty()); + + // Push one event to check if `is_empty` returns false. + let mut transaction = items.transaction(); + transaction.push_back(item("$ev0"), Some(0)); + transaction.commit(); + + assert!(items.is_empty().not()); + } + + #[test] + fn test_subscribe() { + let mut items = ObservableItems::new(); + let mut subscriber = items.subscribe().into_stream(); + + // Push one event to check the subscriber is emitting something. + let mut transaction = items.transaction(); + transaction.push_back(item("$ev0"), Some(0)); + transaction.commit(); + + // It does! + assert_next_matches!(subscriber, VectorDiff::PushBack { value: event } => { + assert_event_id!(event, "$ev0"); + }); + } + + #[test] + fn test_clone_items() { + let mut items = ObservableItems::new(); + + let mut transaction = items.transaction(); + transaction.push_back(item("$ev0"), Some(0)); + transaction.push_back(item("$ev1"), Some(1)); + transaction.commit(); + + let items = items.clone_items(); + assert_eq!(items.len(), 2); + assert_event_id!(items[0], "$ev0"); + assert_event_id!(items[1], "$ev1"); + } + + #[test] + fn test_replace() { + let mut items = ObservableItems::new(); + + // Push one event that will be replaced. + let mut transaction = items.transaction(); + transaction.push_back(item("$ev0"), Some(0)); + transaction.commit(); + + // That's time to replace it! + items.replace(0, item("$ev1")); + + let items = items.clone_items(); + assert_eq!(items.len(), 1); + assert_event_id!(items[0], "$ev1"); + } + + #[test] + fn test_entries() { + let mut items = ObservableItems::new(); + + // Push events to iterate on. + let mut transaction = items.transaction(); + transaction.push_back(item("$ev0"), Some(0)); + transaction.push_back(item("$ev1"), Some(1)); + transaction.push_back(item("$ev2"), Some(2)); + transaction.commit(); + + let mut entries = items.entries(); + + assert_matches!(entries.next(), Some(entry) => { + assert_event_id!(entry, "$ev0"); + }); + assert_matches!(entries.next(), Some(entry) => { + assert_event_id!(entry, "$ev1"); + }); + assert_matches!(entries.next(), Some(entry) => { + assert_event_id!(entry, "$ev2"); + }); + assert_matches!(entries.next(), None); + } + + #[test] + fn test_entry_replace() { + let mut items = ObservableItems::new(); + + // Push events to iterate on. + let mut transaction = items.transaction(); + transaction.push_back(item("$ev0"), Some(0)); + transaction.commit(); + + let mut entries = items.entries(); + + // Replace one event by another one. + assert_matches!(entries.next(), Some(mut entry) => { + assert_event_id!(entry, "$ev0"); + ObservableItemsEntry::replace(&mut entry, item("$ev1")); + }); + assert_matches!(entries.next(), None); + + // Check the new event. + let mut entries = items.entries(); + + assert_matches!(entries.next(), Some(entry) => { + assert_event_id!(entry, "$ev1"); + }); + assert_matches!(entries.next(), None); + } + + #[test] + fn test_for_each() { + let mut items = ObservableItems::new(); + + // Push events to iterate on. + let mut transaction = items.transaction(); + transaction.push_back(item("$ev0"), Some(0)); + transaction.push_back(item("$ev1"), Some(1)); + transaction.push_back(item("$ev2"), Some(2)); + transaction.commit(); + + let mut nth = 0; + + // Iterate over events. + items.for_each(|entry| { + match nth { + 0 => { + assert_event_id!(entry, "$ev0"); + } + 1 => { + assert_event_id!(entry, "$ev1"); + } + 2 => { + assert_event_id!(entry, "$ev2"); + } + _ => unreachable!(), + } + + nth += 1; + }); + } + + #[test] + fn test_transaction_commit() { + let mut items = ObservableItems::new(); + + // Don't commit the transaction. + let mut transaction = items.transaction(); + transaction.push_back(item("$ev0"), Some(0)); + drop(transaction); + + assert!(items.is_empty()); + + // Commit the transaction. + let mut transaction = items.transaction(); + transaction.push_back(item("$ev0"), Some(0)); + transaction.commit(); + + assert!(items.is_empty().not()); + } + + #[test] + fn test_transaction_get() { + let mut items = ObservableItems::new(); + + let mut transaction = items.transaction(); + transaction.push_back(item("$ev0"), Some(0)); + + assert_matches!(transaction.get(0), Some(event) => { + assert_event_id!(event, "$ev0"); + }); + } + + #[test] + fn test_transaction_replace() { + let mut items = ObservableItems::new(); + + let mut transaction = items.transaction(); + transaction.push_back(item("$ev0"), Some(0)); + transaction.replace(0, item("$ev1")); + + assert_matches!(transaction.get(0), Some(event) => { + assert_event_id!(event, "$ev1"); + }); + } + + #[test] + fn test_transaction_insert() { + let mut items = ObservableItems::new(); + + let mut transaction = items.transaction(); + + // Remote event with its timeline item. + transaction.push_back_remote_event(event_meta("$ev0")); + transaction.insert(0, item("$ev0"), Some(0)); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | // new + } + + // Timeline item without a remote event (for example a read marker). + transaction.insert(0, read_marker(), None); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 1 | // has shifted + } + + // Remote event with its timeline item. + transaction.push_back_remote_event(event_meta("$ev1")); + transaction.insert(2, item("$ev1"), Some(1)); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 1 | + | "$ev1" | 1 | 2 | // new + } + + // Remote event without a timeline item (for example a state event). + transaction.push_back_remote_event(event_meta("$ev2")); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 1 | + | "$ev1" | 1 | 2 | + | "$ev2" | 2 | | // new + } + + // Remote event with its timeline item. + transaction.push_back_remote_event(event_meta("$ev3")); + transaction.insert(3, item("$ev3"), Some(3)); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 1 | + | "$ev1" | 1 | 2 | + | "$ev2" | 2 | | + | "$ev3" | 3 | 3 | // new + } + + // Timeline item with a remote event, but late. + // I don't know if this case is possible in reality, but let's be robust. + transaction.insert(3, item("$ev2"), Some(2)); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 1 | + | "$ev1" | 1 | 2 | + | "$ev2" | 2 | 3 | // updated + | "$ev3" | 3 | 4 | // has shifted + } + + // Let's move the read marker for the fun. + transaction.remove(0); + transaction.insert(2, read_marker(), None); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | // has shifted + | "$ev1" | 1 | 1 | // has shifted + | "$ev2" | 2 | 3 | + | "$ev3" | 3 | 4 | + } + + assert_eq!(transaction.len(), 5); + } + + #[test] + fn test_transaction_push_front() { + let mut items = ObservableItems::new(); + + let mut transaction = items.transaction(); + + // Remote event with its timeline item. + transaction.push_front_remote_event(event_meta("$ev0")); + transaction.push_front(item("$ev0"), Some(0)); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | // new + } + + // Timeline item without a remote event (for example a read marker). + transaction.push_front(read_marker(), None); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 1 | // has shifted + } + + // Remote event with its timeline item. + transaction.push_front_remote_event(event_meta("$ev1")); + transaction.push_front(item("$ev1"), Some(0)); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev1" | 0 | 0 | // new + | "$ev0" | 1 | 2 | // has shifted + } + + // Remote event without a timeline item (for example a state event). + transaction.push_front_remote_event(event_meta("$ev2")); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev2" | 0 | | + | "$ev1" | 1 | 0 | // has shifted + | "$ev0" | 2 | 2 | // has shifted + } + + // Remote event with its timeline item. + transaction.push_front_remote_event(event_meta("$ev3")); + transaction.push_front(item("$ev3"), Some(0)); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev3" | 0 | 0 | // new + | "$ev2" | 1 | | + | "$ev1" | 2 | 1 | // has shifted + | "$ev0" | 3 | 3 | // has shifted + } + + assert_eq!(transaction.len(), 4); + } + + #[test] + fn test_transaction_push_back() { + let mut items = ObservableItems::new(); + + let mut transaction = items.transaction(); + + // Remote event with its timeline item. + transaction.push_back_remote_event(event_meta("$ev0")); + transaction.push_back(item("$ev0"), Some(0)); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | // new + } + + // Timeline item without a remote event (for example a read marker). + transaction.push_back(read_marker(), None); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | + } + + // Remote event with its timeline item. + transaction.push_back_remote_event(event_meta("$ev1")); + transaction.push_back(item("$ev1"), Some(1)); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | + | "$ev1" | 1 | 2 | // new + } + + // Remote event without a timeline item (for example a state event). + transaction.push_back_remote_event(event_meta("$ev2")); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | + | "$ev1" | 1 | 2 | + | "$ev2" | 2 | | // new + } + + // Remote event with its timeline item. + transaction.push_back_remote_event(event_meta("$ev3")); + transaction.push_back(item("$ev3"), Some(3)); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | + | "$ev1" | 1 | 2 | + | "$ev2" | 2 | | + | "$ev3" | 3 | 3 | // new + } + + assert_eq!(transaction.len(), 4); + } + + #[test] + fn test_transaction_remove() { + let mut items = ObservableItems::new(); + + let mut transaction = items.transaction(); + + // Remote event with its timeline item. + transaction.push_back_remote_event(event_meta("$ev0")); + transaction.push_back(item("$ev0"), Some(0)); + + // Timeline item without a remote event (for example a read marker). + transaction.push_back(read_marker(), None); + + // Remote event with its timeline item. + transaction.push_back_remote_event(event_meta("$ev1")); + transaction.push_back(item("$ev1"), Some(1)); + + // Remote event without a timeline item (for example a state event). + transaction.push_back_remote_event(event_meta("$ev2")); + + // Remote event with its timeline item. + transaction.push_back_remote_event(event_meta("$ev3")); + transaction.push_back(item("$ev3"), Some(3)); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | + | "$ev1" | 1 | 2 | + | "$ev2" | 2 | | + | "$ev3" | 3 | 3 | + } + + // Remove the timeline item that has no event. + transaction.remove(1); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | + | "$ev1" | 1 | 1 | // has shifted + | "$ev2" | 2 | | + | "$ev3" | 3 | 2 | // has shifted + } + + // Remove an timeline item that has an event. + transaction.remove(1); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | + | "$ev1" | 1 | | // has been removed + | "$ev2" | 2 | | + | "$ev3" | 3 | 1 | // has shifted + } + + // Remove the last timeline item to test off by 1 error. + transaction.remove(1); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | + | "$ev1" | 1 | | + | "$ev2" | 2 | | + | "$ev3" | 3 | | // has been removed + } + + // Remove all the items \o/ + transaction.remove(0); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | | // has been removed + | "$ev1" | 1 | | + | "$ev2" | 2 | | + | "$ev3" | 3 | | + } + + assert!(transaction.is_empty()); + } + + #[test] + fn test_transaction_clear() { + let mut items = ObservableItems::new(); + + let mut transaction = items.transaction(); + + // Remote event with its timeline item. + transaction.push_back_remote_event(event_meta("$ev0")); + transaction.push_back(item("$ev0"), Some(0)); + + // Timeline item without a remote event (for example a read marker). + transaction.push_back(read_marker(), None); + + // Remote event with its timeline item. + transaction.push_back_remote_event(event_meta("$ev1")); + transaction.push_back(item("$ev1"), Some(1)); + + // Remote event without a timeline item (for example a state event). + transaction.push_back_remote_event(event_meta("$ev2")); + + // Remote event with its timeline item. + transaction.push_back_remote_event(event_meta("$ev3")); + transaction.push_back(item("$ev3"), Some(3)); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | + | "$ev1" | 1 | 2 | + | "$ev2" | 2 | | + | "$ev3" | 3 | 3 | + } + + assert_eq!(transaction.all_remote_events().0.len(), 4); + assert_eq!(transaction.len(), 4); + + // Let's clear everything. + transaction.clear(); + + assert!(transaction.all_remote_events().0.is_empty()); + assert!(transaction.is_empty()); + } + + #[test] + fn test_transaction_for_each() { + let mut items = ObservableItems::new(); + + // Push events to iterate on. + let mut transaction = items.transaction(); + transaction.push_back(item("$ev0"), Some(0)); + transaction.push_back(item("$ev1"), Some(1)); + transaction.push_back(item("$ev2"), Some(2)); + + let mut nth = 0; + + // Iterate over events. + transaction.for_each(|entry| { + match nth { + 0 => { + assert_event_id!(entry, "$ev0"); + } + 1 => { + assert_event_id!(entry, "$ev1"); + } + 2 => { + assert_event_id!(entry, "$ev2"); + } + _ => unreachable!(), + } + + nth += 1; + }); + } + + #[test] + fn test_transaction_for_each_remove() { + let mut items = ObservableItems::new(); + + // Push events to iterate on. + let mut transaction = items.transaction(); + + transaction.push_back_remote_event(event_meta("$ev0")); + transaction.push_back(item("$ev0"), Some(0)); + + transaction.push_back_remote_event(event_meta("$ev1")); + transaction.push_back(item("$ev1"), Some(1)); + + transaction.push_back_remote_event(event_meta("$ev2")); + transaction.push_back(item("$ev2"), Some(2)); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | + | "$ev1" | 1 | 1 | + | "$ev2" | 2 | 2 | + } + + // Iterate over events, and remove one. + transaction.for_each(|entry| { + if entry.as_event().unwrap().event_id().unwrap().as_str() == "$ev1" { + ObservableItemsTransactionEntry::remove(entry); + } + }); + + assert_mapping! { + on transaction: + + | event_id | event_index | timeline_item_index | + |----------|-------------|---------------------| + | "$ev0" | 0 | 0 | + | "$ev2" | 2 | 1 | // has shifted + } + + assert_eq!(transaction.all_remote_events().0.len(), 3); + assert_eq!(transaction.len(), 2); + } +} + +/// A type for all remote events. +/// +/// Having this type helps to know exactly which parts of the code and how they +/// use all remote events. It also helps to give a bit of semantics on top of +/// them. +#[derive(Clone, Debug, Default)] +pub struct AllRemoteEvents(VecDeque); + +impl AllRemoteEvents { + /// Return a front-to-back iterator over all remote events. + pub fn iter(&self) -> Iter<'_, EventMeta> { + self.0.iter() + } + + /// Remove all remote events. + fn clear(&mut self) { + self.0.clear(); + } + + /// Insert a new remote event at the front of all the others. + fn push_front(&mut self, event_meta: EventMeta) { + // If there is an associated `timeline_item_index`, shift all the + // `timeline_item_index` that come after this one. + if let Some(new_timeline_item_index) = event_meta.timeline_item_index { + self.increment_all_timeline_item_index_after(new_timeline_item_index); + } + + // Push the event. + self.0.push_front(event_meta) + } + + /// Insert a new remote event at the back of all the others. + fn push_back(&mut self, event_meta: EventMeta) { + // If there is an associated `timeline_item_index`, shift all the + // `timeline_item_index` that come after this one. + if let Some(new_timeline_item_index) = event_meta.timeline_item_index { + self.increment_all_timeline_item_index_after(new_timeline_item_index); + } + + // Push the event. + self.0.push_back(event_meta) + } + + /// Remove one remote event at a specific index, and return it if it exists. + fn remove(&mut self, event_index: usize) -> Option { + // Remove the event. + let event_meta = self.0.remove(event_index)?; + + // If there is an associated `timeline_item_index`, shift all the + // `timeline_item_index` that come after this one. + if let Some(removed_timeline_item_index) = event_meta.timeline_item_index { + self.decrement_all_timeline_item_index_after(removed_timeline_item_index); + }; + + Some(event_meta) + } + + /// Return a reference to the last remote event if it exists. + pub fn last(&self) -> Option<&EventMeta> { + self.0.back() + } + + /// Return the index of the last remote event if it exists. + pub fn last_index(&self) -> Option { + self.0.len().checked_sub(1) + } + + /// Get a mutable reference to a specific remote event by its ID. + pub fn get_by_event_id_mut(&mut self, event_id: &EventId) -> Option<&mut EventMeta> { + self.0.iter_mut().rev().find(|event_meta| event_meta.event_id == event_id) + } + + /// Shift to the right all timeline item indexes that are equal to or + /// greater than `new_timeline_item_index`. + fn increment_all_timeline_item_index_after(&mut self, new_timeline_item_index: usize) { + for event_meta in self.0.iter_mut() { + if let Some(timeline_item_index) = event_meta.timeline_item_index.as_mut() { + if *timeline_item_index >= new_timeline_item_index { + *timeline_item_index += 1; + } + } + } + } + + /// Shift to the left all timeline item indexes that are greater than + /// `removed_wtimeline_item_index`. + fn decrement_all_timeline_item_index_after(&mut self, removed_timeline_item_index: usize) { + for event_meta in self.0.iter_mut() { + if let Some(timeline_item_index) = event_meta.timeline_item_index.as_mut() { + if *timeline_item_index > removed_timeline_item_index { + *timeline_item_index -= 1; + } + } + } + } + + /// Notify that a timeline item has been inserted at + /// `new_timeline_item_index`. If `event_index` is `Some(_)`, it means the + /// remote event at `event_index` must be mapped to + /// `new_timeline_item_index`. + fn timeline_item_has_been_inserted_at( + &mut self, + new_timeline_item_index: usize, + event_index: Option, + ) { + self.increment_all_timeline_item_index_after(new_timeline_item_index); + + if let Some(event_index) = event_index { + if let Some(event_meta) = self.0.get_mut(event_index) { + event_meta.timeline_item_index = Some(new_timeline_item_index); + } + } + } + + /// Notify that a timeline item has been removed at + /// `new_timeline_item_index`. If `event_index` is `Some(_)`, it means the + /// remote event at `event_index` must be unmapped. + fn timeline_item_has_been_removed_at(&mut self, timeline_item_index_to_remove: usize) { + for event_meta in self.0.iter_mut() { + let mut remove_timeline_item_index = false; + + // A `timeline_item_index` is removed. Let's shift all indexes that come + // after the removed one. + if let Some(timeline_item_index) = event_meta.timeline_item_index.as_mut() { + match (*timeline_item_index).cmp(&timeline_item_index_to_remove) { + Ordering::Equal => { + remove_timeline_item_index = true; + } + + Ordering::Greater => { + *timeline_item_index -= 1; + } + + Ordering::Less => {} + } + } + + // This is the `event_meta` that holds the `timeline_item_index` that is being + // removed. So let's clean it. + if remove_timeline_item_index { + event_meta.timeline_item_index = None; + } + } + } +} + +#[cfg(test)] +mod all_remote_events_tests { + use assert_matches::assert_matches; + use ruma::event_id; + + use super::{AllRemoteEvents, EventMeta}; + + fn event_meta(event_id: &str, timeline_item_index: Option) -> EventMeta { + EventMeta { event_id: event_id.parse().unwrap(), timeline_item_index, visible: false } + } + + macro_rules! assert_events { + ( $events:ident, [ $( ( $event_id:literal, $timeline_item_index:expr ) ),* $(,)? ] ) => { + let mut iter = $events .iter(); + + $( + assert_matches!(iter.next(), Some(EventMeta { event_id, timeline_item_index, .. }) => { + assert_eq!(event_id.as_str(), $event_id ); + assert_eq!(*timeline_item_index, $timeline_item_index ); + }); + )* + + assert!(iter.next().is_none(), "Not all events have been asserted"); + } + } + + #[test] + fn test_clear() { + let mut events = AllRemoteEvents::default(); + + // Push some events. + events.push_back(event_meta("$ev0", None)); + events.push_back(event_meta("$ev1", None)); + events.push_back(event_meta("$ev2", None)); + + assert_eq!(events.iter().count(), 3); + + // And clear them! + events.clear(); + + assert_eq!(events.iter().count(), 0); + } + + #[test] + fn test_push_front() { + let mut events = AllRemoteEvents::default(); + + // Push front on an empty set, nothing particular. + events.push_front(event_meta("$ev0", Some(1))); + + // Push front with no `timeline_item_index`. + events.push_front(event_meta("$ev1", None)); + + // Push front with a `timeline_item_index`. + events.push_front(event_meta("$ev2", Some(0))); + + // Push front with the same `timeline_item_index`. + events.push_front(event_meta("$ev3", Some(0))); + + assert_events!( + events, + [ + // `timeline_item_index` is untouched + ("$ev3", Some(0)), + // `timeline_item_index` has been shifted once + ("$ev2", Some(1)), + // no `timeline_item_index` + ("$ev1", None), + // `timeline_item_index` has been shifted twice + ("$ev0", Some(3)), + ] + ); + } + + #[test] + fn test_push_back() { + let mut events = AllRemoteEvents::default(); + + // Push back on an empty set, nothing particular. + events.push_back(event_meta("$ev0", Some(0))); + + // Push back with no `timeline_item_index`. + events.push_back(event_meta("$ev1", None)); + + // Push back with a `timeline_item_index`. + events.push_back(event_meta("$ev2", Some(1))); + + // Push back with a `timeline_item_index` pointing to a timeline item that is + // not the last one. Is it possible in practise? Normally not, but let's test + // it anyway. + events.push_back(event_meta("$ev3", Some(1))); + + assert_events!( + events, + [ + // `timeline_item_index` is untouched + ("$ev0", Some(0)), + // no `timeline_item_index` + ("$ev1", None), + // `timeline_item_index` has been shifted once + ("$ev2", Some(2)), + // `timeline_item_index` is untouched + ("$ev3", Some(1)), + ] + ); + } + + #[test] + fn test_remove() { + let mut events = AllRemoteEvents::default(); + + // Push some events. + events.push_back(event_meta("$ev0", Some(0))); + events.push_back(event_meta("$ev1", Some(1))); + events.push_back(event_meta("$ev2", None)); + events.push_back(event_meta("$ev3", Some(2))); + + // Assert initial state. + assert_events!( + events, + [("$ev0", Some(0)), ("$ev1", Some(1)), ("$ev2", None), ("$ev3", Some(2))] + ); + + // Remove two events. + events.remove(2); // $ev2 has no `timeline_item_index` + events.remove(1); // $ev1 has a `timeline_item_index` + + assert_events!( + events, + [ + ("$ev0", Some(0)), + // `timeline_item_index` has shifted once + ("$ev3", Some(1)), + ] + ); + } + + #[test] + fn test_last() { + let mut events = AllRemoteEvents::default(); + + assert!(events.last().is_none()); + assert!(events.last_index().is_none()); + + // Push some events. + events.push_back(event_meta("$ev0", Some(0))); + events.push_back(event_meta("$ev1", Some(1))); + + assert_matches!(events.last(), Some(EventMeta { event_id, .. }) => { + assert_eq!(event_id.as_str(), "$ev1"); + }); + assert_eq!(events.last_index(), Some(1)); + } + + #[test] + fn test_get_by_event_by_mut() { + let mut events = AllRemoteEvents::default(); + + // Push some events. + events.push_back(event_meta("$ev0", Some(0))); + events.push_back(event_meta("$ev1", Some(1))); + + assert!(events.get_by_event_id_mut(event_id!("$ev0")).is_some()); + assert!(events.get_by_event_id_mut(event_id!("$ev42")).is_none()); + } + + #[test] + fn test_timeline_item_has_been_inserted_at() { + let mut events = AllRemoteEvents::default(); + + // Push some events. + events.push_back(event_meta("$ev0", Some(0))); + events.push_back(event_meta("$ev1", Some(1))); + events.push_back(event_meta("$ev2", None)); + events.push_back(event_meta("$ev3", None)); + events.push_back(event_meta("$ev4", Some(2))); + events.push_back(event_meta("$ev5", Some(3))); + events.push_back(event_meta("$ev6", None)); + + // A timeline item has been inserted at index 2, and maps to no event. + events.timeline_item_has_been_inserted_at(2, None); + + assert_events!( + events, + [ + ("$ev0", Some(0)), + ("$ev1", Some(1)), + ("$ev2", None), + ("$ev3", None), + // `timeline_item_index` is shifted once + ("$ev4", Some(3)), + // `timeline_item_index` is shifted once + ("$ev5", Some(4)), + ("$ev6", None), + ] + ); + + // A timeline item has been inserted at the back, and maps to `$ev6`. + events.timeline_item_has_been_inserted_at(5, Some(6)); + + assert_events!( + events, + [ + ("$ev0", Some(0)), + ("$ev1", Some(1)), + ("$ev2", None), + ("$ev3", None), + ("$ev4", Some(3)), + ("$ev5", Some(4)), + // `timeline_item_index` has been updated + ("$ev6", Some(5)), + ] + ); + } + + #[test] + fn test_timeline_item_has_been_removed_at() { + let mut events = AllRemoteEvents::default(); + + // Push some events. + events.push_back(event_meta("$ev0", Some(0))); + events.push_back(event_meta("$ev1", Some(1))); + events.push_back(event_meta("$ev2", None)); + events.push_back(event_meta("$ev3", None)); + events.push_back(event_meta("$ev4", Some(3))); + events.push_back(event_meta("$ev5", Some(4))); + events.push_back(event_meta("$ev6", None)); + + // A timeline item has been removed at index 2, which maps to no event. + events.timeline_item_has_been_removed_at(2); + + assert_events!( + events, + [ + ("$ev0", Some(0)), + ("$ev1", Some(1)), + ("$ev2", None), + ("$ev3", None), + // `timeline_item_index` is shifted once + ("$ev4", Some(2)), + // `timeline_item_index` is shifted once + ("$ev5", Some(3)), + ("$ev6", None), + ] + ); + + // A timeline item has been removed at index 2, which maps to `$ev4`. + events.timeline_item_has_been_removed_at(2); + + assert_events!( + events, + [ + ("$ev0", Some(0)), + ("$ev1", Some(1)), + ("$ev2", None), + ("$ev3", None), + // `timeline_item_index` has been updated + ("$ev4", None), + // `timeline_item_index` has shifted once + ("$ev5", Some(2)), + ("$ev6", None), + ] + ); + + // A timeline item has been removed at index 0, which maps to `$ev0`. + events.timeline_item_has_been_removed_at(0); + + assert_events!( + events, + [ + // `timeline_item_index` has been updated + ("$ev0", None), + // `timeline_item_index` has shifted once + ("$ev1", Some(0)), + ("$ev2", None), + ("$ev3", None), + ("$ev4", None), + // `timeline_item_index` has shifted once + ("$ev5", Some(1)), + ("$ev6", None), + ] + ); + } +} diff --git a/crates/matrix-sdk-ui/src/timeline/controller/state.rs b/crates/matrix-sdk-ui/src/timeline/controller/state.rs index 1800bebdab7..878573af3ea 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/state.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/state.rs @@ -13,13 +13,12 @@ // limitations under the License. use std::{ - collections::{vec_deque::Iter, HashMap, VecDeque}, + collections::HashMap, future::Future, num::NonZeroUsize, sync::{Arc, RwLock}, }; -use eyeball_im::{ObservableVector, ObservableVectorTransaction, ObservableVectorTransactionEntry}; use itertools::Itertools as _; use matrix_sdk::{ deserialized_responses::SyncTimelineEvent, ring_buffer::RingBuffer, send_queue::SendHandle, @@ -44,7 +43,13 @@ use ruma::{ }; use tracing::{debug, instrument, trace, warn}; -use super::{HandleManyEventsResult, TimelineFocusKind, TimelineSettings}; +use super::{ + observable_items::{ + AllRemoteEvents, ObservableItems, ObservableItemsTransaction, + ObservableItemsTransactionEntry, + }, + HandleManyEventsResult, TimelineFocusKind, TimelineSettings, +}; use crate::{ events::SyncTimelineEventWithoutContent, timeline::{ @@ -89,7 +94,7 @@ impl From for TimelineItemPosition { #[derive(Debug)] pub(in crate::timeline) struct TimelineState { - pub items: ObservableVector>, + pub items: ObservableItems, pub meta: TimelineMetadata, /// The kind of focus of this timeline. @@ -106,10 +111,7 @@ impl TimelineState { is_room_encrypted: Option, ) -> Self { Self { - // Upstream default capacity is currently 16, which is making - // sliding-sync tests with 20 events lag. This should still be - // small enough. - items: ObservableVector::with_capacity(32), + items: ObservableItems::new(), meta: TimelineMetadata::new( own_user_id, room_version, @@ -329,6 +331,7 @@ impl TimelineState { pub(super) fn transaction(&mut self) -> TimelineStateTransaction<'_> { let items = self.items.transaction(); let meta = self.meta.clone(); + TimelineStateTransaction { items, previous_meta: &mut self.meta, @@ -341,7 +344,7 @@ impl TimelineState { pub(in crate::timeline) struct TimelineStateTransaction<'a> { /// A vector transaction over the items themselves. Holds temporary state /// until committed. - pub items: ObservableVectorTransaction<'a, Arc>, + pub items: ObservableItemsTransaction<'a>, /// A clone of the previous meta, that we're operating on during the /// transaction, and that will be committed to the previous meta location in @@ -544,7 +547,7 @@ impl TimelineStateTransaction<'_> { }; // Remember the event before returning prematurely. - // See [`TimelineMetadata::all_remote_events`]. + // See [`ObservableItems::all_remote_events`]. self.add_or_update_remote_event( event_meta, position, @@ -583,7 +586,7 @@ impl TimelineStateTransaction<'_> { }; // Remember the event before returning prematurely. - // See [`TimelineMetadata::all_remote_events`]. + // See [`ObservableItems::all_remote_events`]. self.add_or_update_remote_event( event_meta, position, @@ -609,7 +612,7 @@ impl TimelineStateTransaction<'_> { }; // Remember the event. - // See [`TimelineMetadata::all_remote_events`]. + // See [`ObservableItems::all_remote_events`]. self.add_or_update_remote_event(event_meta, position, room_data_provider, settings).await; let sender_profile = room_data_provider.profile_from_user_id(&sender).await; @@ -621,7 +624,7 @@ impl TimelineStateTransaction<'_> { read_receipts: if settings.track_read_receipts && should_add { self.meta.read_receipts.compute_event_receipts( &event_id, - &self.meta.all_remote_events, + self.items.all_remote_events(), matches!(position, TimelineItemPosition::End { .. }), ) } else { @@ -654,7 +657,7 @@ impl TimelineStateTransaction<'_> { // Remove all remote events and the read marker self.items.for_each(|entry| { if entry.is_remote_event() || entry.is_read_marker() { - ObservableVectorTransactionEntry::remove(entry); + ObservableItemsTransactionEntry::remove(entry); } }); @@ -700,7 +703,7 @@ impl TimelineStateTransaction<'_> { } /// Add or update a remote event in the - /// [`TimelineMetadata::all_remote_events`] collection. + /// [`ObservableItems::all_remote_events`] collection. /// /// This method also adjusts read receipt if needed. async fn add_or_update_remote_event( @@ -710,7 +713,7 @@ impl TimelineStateTransaction<'_> { room_data_provider: &P, settings: &TimelineSettings, ) { - // Detect if an event already exists in [`TimelineMetadata::all_remote_events`]. + // Detect if an event already exists in [`ObservableItems::all_remote_events`]. // // Returns its position, in this case. fn event_already_exists( @@ -723,27 +726,27 @@ impl TimelineStateTransaction<'_> { match position { TimelineItemPosition::Start { .. } => { if let Some(pos) = - event_already_exists(event_meta.event_id, &self.meta.all_remote_events) + event_already_exists(event_meta.event_id, self.items.all_remote_events()) { - self.meta.all_remote_events.remove(pos); + self.items.remove_remote_event(pos); } - self.meta.all_remote_events.push_front(event_meta.base_meta()) + self.items.push_front_remote_event(event_meta.base_meta()) } TimelineItemPosition::End { .. } => { if let Some(pos) = - event_already_exists(event_meta.event_id, &self.meta.all_remote_events) + event_already_exists(event_meta.event_id, self.items.all_remote_events()) { - self.meta.all_remote_events.remove(pos); + self.items.remove_remote_event(pos); } - self.meta.all_remote_events.push_back(event_meta.base_meta()); + self.items.push_back_remote_event(event_meta.base_meta()); } TimelineItemPosition::UpdateDecrypted { .. } => { if let Some(event) = - self.meta.all_remote_events.get_by_event_id_mut(event_meta.event_id) + self.items.get_remote_event_by_event_id_mut(event_meta.event_id) { if event.visible != event_meta.visible { event.visible = event_meta.visible; @@ -787,7 +790,7 @@ impl TimelineStateTransaction<'_> { // Replace the existing item with a new version with the right encryption flag let item = item.with_kind(cloned_event); - self.items.set(idx, item); + self.items.replace(idx, item); } } } @@ -916,13 +919,6 @@ pub(in crate::timeline) struct TimelineMetadata { /// the device has terabytes of RAM. next_internal_id: u64, - /// List of all the remote events as received in the timeline, even the ones - /// that are discarded in the timeline items. - /// - /// This is useful to get this for the moment as it helps the `Timeline` to - /// compute read receipts and read markers. - pub all_remote_events: AllRemoteEvents, - /// State helping matching reactions to their associated events, and /// stashing pending reactions. pub reactions: Reactions, @@ -965,7 +961,6 @@ impl TimelineMetadata { ) -> Self { Self { own_user_id, - all_remote_events: Default::default(), next_internal_id: Default::default(), reactions: Default::default(), pending_poll_events: Default::default(), @@ -985,7 +980,6 @@ impl TimelineMetadata { pub(crate) fn clear(&mut self) { // Note: we don't clear the next internal id to avoid bad cases of stale unique // ids across timeline clears. - self.all_remote_events.clear(); self.reactions.clear(); self.pending_poll_events.clear(); self.pending_edits.clear(); @@ -1006,6 +1000,7 @@ impl TimelineMetadata { &self, event_a: &EventId, event_b: &EventId, + all_remote_events: &AllRemoteEvents, ) -> Option { if event_a == event_b { return Some(RelativePosition::Same); @@ -1013,11 +1008,11 @@ impl TimelineMetadata { // We can make early returns here because we know all events since the end of // the timeline, so the first event encountered is the oldest one. - for meta in self.all_remote_events.iter().rev() { - if meta.event_id == event_a { + for event_meta in all_remote_events.iter().rev() { + if event_meta.event_id == event_a { return Some(RelativePosition::Before); } - if meta.event_id == event_b { + if event_meta.event_id == event_b { return Some(RelativePosition::After); } } @@ -1040,10 +1035,7 @@ impl TimelineMetadata { } /// Try to update the read marker item in the timeline. - pub(crate) fn update_read_marker( - &mut self, - items: &mut ObservableVectorTransaction<'_, Arc>, - ) { + pub(crate) fn update_read_marker(&mut self, items: &mut ObservableItemsTransaction<'_>) { let Some(fully_read_event) = &self.fully_read_event else { return }; trace!(?fully_read_event, "Updating read marker"); @@ -1092,7 +1084,8 @@ impl TimelineMetadata { (None, Some(idx)) => { // Only insert the read marker if it is not at the end of the timeline. if idx + 1 < items.len() { - items.insert(idx + 1, TimelineItem::read_marker()); + let idx = idx + 1; + items.insert(idx, TimelineItem::read_marker(), None); self.has_up_to_date_read_marker_item = true; } else { // The next event might require a read marker to be inserted at the current @@ -1126,7 +1119,7 @@ impl TimelineMetadata { // Since the fully-read event's index was shifted to the left // by one position by the remove call above, insert the fully- // read marker at its previous position, rather than that + 1 - items.insert(to, read_marker); + items.insert(to, read_marker, None); self.has_up_to_date_read_marker_item = true; } else { self.has_up_to_date_read_marker_item = false; @@ -1136,51 +1129,6 @@ impl TimelineMetadata { } } -/// A type for all remote events. -/// -/// Having this type helps to know exactly which parts of the code and how they -/// use all remote events. It also helps to give a bit of semantics on top of -/// them. -#[derive(Clone, Debug, Default)] -pub(crate) struct AllRemoteEvents(VecDeque); - -impl AllRemoteEvents { - /// Return a front-to-back iterator over all remote events. - pub fn iter(&self) -> Iter<'_, EventMeta> { - self.0.iter() - } - - /// Remove all remote events. - pub fn clear(&mut self) { - self.0.clear(); - } - - /// Insert a new remote event at the front of all the others. - pub fn push_front(&mut self, event_meta: EventMeta) { - self.0.push_front(event_meta) - } - - /// Insert a new remote event at the back of all the others. - pub fn push_back(&mut self, event_meta: EventMeta) { - self.0.push_back(event_meta) - } - - /// Remove one remote event at a specific index, and return it if it exists. - pub fn remove(&mut self, event_index: usize) -> Option { - self.0.remove(event_index) - } - - /// Return a reference to the last remote event if it exists. - pub fn last(&self) -> Option<&EventMeta> { - self.0.back() - } - - /// Get a mutable reference to a specific remote event by its ID. - pub fn get_by_event_id_mut(&mut self, event_id: &EventId) -> Option<&mut EventMeta> { - self.0.iter_mut().rev().find(|event_meta| event_meta.event_id == event_id) - } -} - /// Full metadata about an event. /// /// Only used to group function parameters. @@ -1199,7 +1147,11 @@ pub(crate) struct FullEventMeta<'a> { impl FullEventMeta<'_> { fn base_meta(&self) -> EventMeta { - EventMeta { event_id: self.event_id.to_owned(), visible: self.visible } + EventMeta { + event_id: self.event_id.to_owned(), + visible: self.visible, + timeline_item_index: None, + } } } @@ -1208,6 +1160,70 @@ impl FullEventMeta<'_> { pub(crate) struct EventMeta { /// The ID of the event. pub event_id: OwnedEventId, + /// Whether the event is among the timeline items. pub visible: bool, + + /// Foundation for the mapping between remote events to timeline items. + /// + /// Let's explain it. The events represent the first set and are stored in + /// [`ObservableItems::all_remote_events`], and the timeline + /// items represent the second set and are stored in + /// [`ObservableItems::items`]. + /// + /// Each event is mapped to at most one timeline item: + /// + /// - `None` if the event isn't rendered in the timeline (e.g. some state + /// events, or malformed events) or is rendered as a timeline item that + /// attaches to or groups with another item, like reactions, + /// - `Some(_)` if the event is rendered in the timeline. + /// + /// This is neither a surjection nor an injection. Every timeline item may + /// not be attached to an event, for example with a virtual timeline item. + /// We can formulate other rules: + /// + /// - a timeline item that doesn't _move_ and that is represented by an + /// event has a mapping to an event, + /// - a virtual timeline item has no mapping to an event. + /// + /// Imagine the following remote events: + /// + /// | index | remote events | + /// +-------+---------------+ + /// | 0 | `$ev0` | + /// | 1 | `$ev1` | + /// | 2 | `$ev2` | + /// | 3 | `$ev3` | + /// | 4 | `$ev4` | + /// | 5 | `$ev5` | + /// + /// Once rendered in a timeline, it for example produces: + /// + /// | index | item | related items | + /// +-------+-------------------+----------------------+ + /// | 0 | content of `$ev0` | | + /// | 1 | content of `$ev2` | reaction with `$ev4` | + /// | 2 | day divider | | + /// | 3 | content of `$ev3` | | + /// | 4 | content of `$ev5` | | + /// + /// Note the day divider that is a virtual item. Also note `$ev4` which is + /// a reaction to `$ev2`. Finally note that `$ev1` is not rendered in + /// the timeline. + /// + /// The mapping between remote event index to timeline item index will look + /// like this: + /// + /// | remote event index | timeline item index | comment | + /// +--------------------+---------------------+--------------------------------------------+ + /// | 0 | `Some(0)` | `$ev0` is rendered as the #0 timeline item | + /// | 1 | `None` | `$ev1` isn't rendered in the timeline | + /// | 2 | `Some(1)` | `$ev2` is rendered as the #1 timeline item | + /// | 3 | `Some(3)` | `$ev3` is rendered as the #3 timeline item | + /// | 4 | `None` | `$ev4` is a reaction to item #1 | + /// | 5 | `Some(4)` | `$ev5` is rendered as the #4 timeline item | + /// + /// Note that the #2 timeline item (the day divider) doesn't map to any + /// remote event, but if it moves, it has an impact on this mapping. + pub timeline_item_index: Option, } diff --git a/crates/matrix-sdk-ui/src/timeline/day_dividers.rs b/crates/matrix-sdk-ui/src/timeline/day_dividers.rs index cbd76e8cce7..ba9cd9175b3 100644 --- a/crates/matrix-sdk-ui/src/timeline/day_dividers.rs +++ b/crates/matrix-sdk-ui/src/timeline/day_dividers.rs @@ -17,13 +17,13 @@ use std::{fmt::Display, sync::Arc}; -use eyeball_im::ObservableVectorTransaction; use ruma::MilliSecondsSinceUnixEpoch; use tracing::{error, event_enabled, instrument, trace, warn, Level}; use super::{ - controller::TimelineMetadata, util::timestamp_to_date, TimelineItem, TimelineItemKind, - VirtualTimelineItem, + controller::{ObservableItemsTransaction, TimelineMetadata}, + util::timestamp_to_date, + TimelineItem, TimelineItemKind, VirtualTimelineItem, }; /// Algorithm ensuring that day dividers are adjusted correctly, according to @@ -81,11 +81,7 @@ impl DayDividerAdjuster { /// Ensures that date separators are properly inserted/removed when needs /// be. #[instrument(skip_all)] - pub fn run( - &mut self, - items: &mut ObservableVectorTransaction<'_, Arc>, - meta: &mut TimelineMetadata, - ) { + pub fn run(&mut self, items: &mut ObservableItemsTransaction<'_>, meta: &mut TimelineMetadata) { // We're going to record vector operations like inserting, replacing and // removing day dividers. Since we may remove or insert new items, // recorded offsets will change as we're iterating over the array. The @@ -284,11 +280,7 @@ impl DayDividerAdjuster { } } - fn process_ops( - &self, - items: &mut ObservableVectorTransaction<'_, Arc>, - meta: &mut TimelineMetadata, - ) { + fn process_ops(&self, items: &mut ObservableItemsTransaction<'_>, meta: &mut TimelineMetadata) { // Record the deletion offset. let mut offset = 0i64; // Remember what the maximum index was, so we can assert that it's @@ -309,11 +301,11 @@ impl DayDividerAdjuster { // Keep push semantics, if we're inserting at the front or the back. if at == items.len() { - items.push_back(item); + items.push_back(item, None); } else if at == 0 { - items.push_front(item); + items.push_front(item, None); } else { - items.insert(at, item); + items.insert(at, item, None); } offset += 1; @@ -338,7 +330,7 @@ impl DayDividerAdjuster { unique_id.to_owned(), ); - items.set(at, item); + items.replace(at, item); max_i = i; } @@ -366,7 +358,7 @@ impl DayDividerAdjuster { /// Returns a report if and only if there was at least one error. fn check_invariants<'a, 'o>( &mut self, - items: &'a ObservableVectorTransaction<'o, Arc>, + items: &'a ObservableItemsTransaction<'o>, initial_state: Option>>, ) -> Option> { let mut report = DayDividerInvariantsReport { @@ -512,7 +504,7 @@ struct DayDividerInvariantsReport<'a, 'o> { /// The operations that have been applied on the list. operations: Vec, /// Final state after inserting the day dividers. - final_state: &'a ObservableVectorTransaction<'o, Arc>, + final_state: &'a ObservableItemsTransaction<'o>, /// Errors encountered in the algorithm. errors: Vec, } @@ -608,10 +600,9 @@ enum DayDividerInsertError { #[cfg(test)] mod tests { use assert_matches2::assert_let; - use eyeball_im::ObservableVector; use ruma::{owned_event_id, owned_user_id, uint, MilliSecondsSinceUnixEpoch}; - use super::DayDividerAdjuster; + use super::{super::controller::ObservableItems, DayDividerAdjuster}; use crate::timeline::{ controller::TimelineMetadata, event_item::{EventTimelineItemKind, RemoteEventTimelineItem}, @@ -654,7 +645,7 @@ mod tests { #[test] fn test_no_trailing_day_divider() { - let mut items = ObservableVector::new(); + let mut items = ObservableItems::new(); let mut txn = items.transaction(); let mut meta = test_metadata(); @@ -663,9 +654,12 @@ mod tests { let timestamp_next_day = MilliSecondsSinceUnixEpoch((42 + 3600 * 24 * 1000).try_into().unwrap()); - txn.push_back(meta.new_timeline_item(event_with_ts(timestamp))); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp_next_day))); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker)); + txn.push_back(meta.new_timeline_item(event_with_ts(timestamp)), None); + txn.push_back( + meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp_next_day)), + None, + ); + txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker), None); let mut adjuster = DayDividerAdjuster::default(); adjuster.run(&mut txn, &mut meta); @@ -688,7 +682,7 @@ mod tests { #[test] fn test_read_marker_in_between_event_and_day_divider() { - let mut items = ObservableVector::new(); + let mut items = ObservableItems::new(); let mut txn = items.transaction(); let mut meta = test_metadata(); @@ -699,10 +693,13 @@ mod tests { assert_ne!(timestamp_to_date(timestamp), timestamp_to_date(timestamp_next_day)); let event = event_with_ts(timestamp); - txn.push_back(meta.new_timeline_item(event.clone())); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp_next_day))); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker)); - txn.push_back(meta.new_timeline_item(event)); + txn.push_back(meta.new_timeline_item(event.clone()), None); + txn.push_back( + meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp_next_day)), + None, + ); + txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker), None); + txn.push_back(meta.new_timeline_item(event), None); let mut adjuster = DayDividerAdjuster::default(); adjuster.run(&mut txn, &mut meta); @@ -720,7 +717,7 @@ mod tests { #[test] fn test_read_marker_in_between_day_dividers() { - let mut items = ObservableVector::new(); + let mut items = ObservableItems::new(); let mut txn = items.transaction(); let mut meta = test_metadata(); @@ -730,12 +727,12 @@ mod tests { MilliSecondsSinceUnixEpoch((42 + 3600 * 24 * 1000).try_into().unwrap()); assert_ne!(timestamp_to_date(timestamp), timestamp_to_date(timestamp_next_day)); - txn.push_back(meta.new_timeline_item(event_with_ts(timestamp))); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp))); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp))); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker)); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp))); - txn.push_back(meta.new_timeline_item(event_with_ts(timestamp_next_day))); + txn.push_back(meta.new_timeline_item(event_with_ts(timestamp)), None); + txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)), None); + txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)), None); + txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker), None); + txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)), None); + txn.push_back(meta.new_timeline_item(event_with_ts(timestamp_next_day)), None); let mut adjuster = DayDividerAdjuster::default(); adjuster.run(&mut txn, &mut meta); @@ -754,7 +751,7 @@ mod tests { #[test] fn test_remove_all_day_dividers() { - let mut items = ObservableVector::new(); + let mut items = ObservableItems::new(); let mut txn = items.transaction(); let mut meta = test_metadata(); @@ -764,10 +761,10 @@ mod tests { MilliSecondsSinceUnixEpoch((42 + 3600 * 24 * 1000).try_into().unwrap()); assert_ne!(timestamp_to_date(timestamp), timestamp_to_date(timestamp_next_day)); - txn.push_back(meta.new_timeline_item(event_with_ts(timestamp_next_day))); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp))); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp))); - txn.push_back(meta.new_timeline_item(event_with_ts(timestamp_next_day))); + txn.push_back(meta.new_timeline_item(event_with_ts(timestamp_next_day)), None); + txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)), None); + txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)), None); + txn.push_back(meta.new_timeline_item(event_with_ts(timestamp_next_day)), None); let mut adjuster = DayDividerAdjuster::default(); adjuster.run(&mut txn, &mut meta); @@ -784,16 +781,16 @@ mod tests { #[test] fn test_event_read_marker_spurious_day_divider() { - let mut items = ObservableVector::new(); + let mut items = ObservableItems::new(); let mut txn = items.transaction(); let mut meta = test_metadata(); let timestamp = MilliSecondsSinceUnixEpoch(uint!(42)); - txn.push_back(meta.new_timeline_item(event_with_ts(timestamp))); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker)); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp))); + txn.push_back(meta.new_timeline_item(event_with_ts(timestamp)), None); + txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker), None); + txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)), None); let mut adjuster = DayDividerAdjuster::default(); adjuster.run(&mut txn, &mut meta); @@ -810,16 +807,16 @@ mod tests { #[test] fn test_multiple_trailing_day_dividers() { - let mut items = ObservableVector::new(); + let mut items = ObservableItems::new(); let mut txn = items.transaction(); let mut meta = test_metadata(); let timestamp = MilliSecondsSinceUnixEpoch(uint!(42)); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker)); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp))); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp))); + txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker), None); + txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)), None); + txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)), None); let mut adjuster = DayDividerAdjuster::default(); adjuster.run(&mut txn, &mut meta); @@ -834,14 +831,14 @@ mod tests { #[test] fn test_start_with_read_marker() { - let mut items = ObservableVector::new(); + let mut items = ObservableItems::new(); let mut txn = items.transaction(); let mut meta = test_metadata(); let timestamp = MilliSecondsSinceUnixEpoch(uint!(42)); - txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker)); - txn.push_back(meta.new_timeline_item(event_with_ts(timestamp))); + txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker), None); + txn.push_back(meta.new_timeline_item(event_with_ts(timestamp)), None); let mut adjuster = DayDividerAdjuster::default(); adjuster.run(&mut txn, &mut meta); diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index cc01448714b..1eba4147626 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -15,7 +15,6 @@ use std::sync::Arc; use as_variant::as_variant; -use eyeball_im::{ObservableVectorTransaction, ObservableVectorTransactionEntry}; use indexmap::IndexMap; use matrix_sdk::{ crypto::types::events::UtdCause, @@ -51,29 +50,24 @@ use ruma::{ use tracing::{debug, error, field::debug, info, instrument, trace, warn}; use super::{ - controller::{PendingEditKind, TimelineMetadata, TimelineStateTransaction}, + controller::{ + ObservableItemsTransaction, ObservableItemsTransactionEntry, PendingEdit, PendingEditKind, + TimelineMetadata, TimelineStateTransaction, + }, day_dividers::DayDividerAdjuster, event_item::{ extract_bundled_edit_event_json, extract_poll_edit_content, extract_room_msg_edit_content, AnyOtherFullStateEventContent, EventSendState, EventTimelineItemKind, - LocalEventTimelineItem, PollState, Profile, ReactionsByKeyBySender, RemoteEventOrigin, - RemoteEventTimelineItem, TimelineEventItemId, + LocalEventTimelineItem, PollState, Profile, ReactionInfo, ReactionStatus, + ReactionsByKeyBySender, RemoteEventOrigin, RemoteEventTimelineItem, TimelineEventItemId, }, - reactions::FullReactionKey, + reactions::{FullReactionKey, PendingReaction}, + traits::RoomDataProvider, util::{rfind_event_by_id, rfind_event_item}, - EventTimelineItem, InReplyToDetails, OtherState, Sticker, TimelineDetails, TimelineItem, - TimelineItemContent, -}; -use crate::{ - events::SyncTimelineEventWithoutContent, - timeline::{ - controller::PendingEdit, - event_item::{ReactionInfo, ReactionStatus}, - reactions::PendingReaction, - traits::RoomDataProvider, - RepliedToEvent, - }, + EventTimelineItem, InReplyToDetails, OtherState, RepliedToEvent, Sticker, TimelineDetails, + TimelineItem, TimelineItemContent, }; +use crate::events::SyncTimelineEventWithoutContent; /// When adding an event, useful information related to the source of the event. pub(super) enum Flow { @@ -330,7 +324,7 @@ pub(super) struct HandleEventResult { /// existing timeline item, transforming that item or creating a new one, /// updating the reactive Vec). pub(super) struct TimelineEventHandler<'a, 'o> { - items: &'a mut ObservableVectorTransaction<'o, Arc>, + items: &'a mut ObservableItemsTransaction<'o>, meta: &'a mut TimelineMetadata, ctx: TimelineEventContext, result: HandleEventResult, @@ -504,14 +498,14 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { trace!("No new item added"); if let Flow::Remote { - position: TimelineItemPosition::UpdateDecrypted { timeline_item_index: idx }, + position: TimelineItemPosition::UpdateDecrypted { timeline_item_index }, .. } = self.ctx.flow { // If add was not called, that means the UTD event is one that // wouldn't normally be visible. Remove it. trace!("Removing UTD that was successfully retried"); - self.items.remove(idx); + self.items.remove(timeline_item_index); self.result.item_removed = true; } @@ -576,7 +570,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { Self::maybe_update_responses(self.items, &replacement.event_id, &new_item); // Update the event itself. - self.items.set(item_pos, TimelineItem::new(new_item, internal_id)); + self.items.replace(item_pos, TimelineItem::new(new_item, internal_id)); self.result.items_updated += 1; } } else if let Flow::Remote { position, raw_event, .. } = &self.ctx.flow { @@ -730,7 +724,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { }, ); - self.items.set(idx, event_item.with_reactions(reactions)); + self.items.replace(idx, event_item.with_reactions(reactions)); self.result.items_updated += 1; } else { @@ -794,7 +788,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { }; trace!("Applying poll start edit."); - self.items.set(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned())); + self.items.replace(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned())); self.result.items_updated += 1; } @@ -898,7 +892,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { ); trace!("Adding poll response."); - self.items.set(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned())); + self.items.replace(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned())); self.result.items_updated += 1; } @@ -917,7 +911,8 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { let new_item = item.with_content(TimelineItemContent::Poll(poll_state), None); trace!("Ending poll."); - self.items.set(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned())); + self.items + .replace(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned())); self.result.items_updated += 1; } Err(_) => { @@ -959,7 +954,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { // the replied-to event there as well. Self::maybe_update_responses(self.items, &redacted, &new_item); - self.items.set(idx, TimelineItem::new(new_item, internal_id)); + self.items.replace(idx, TimelineItem::new(new_item, internal_id)); self.result.items_updated += 1; } } else { @@ -1000,7 +995,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { let mut reactions = item.reactions.clone(); if reactions.remove_reaction(&sender, &key).is_some() { trace!("Removing reaction"); - self.items.set(item_pos, item.with_reactions(reactions)); + self.items.replace(item_pos, item.with_reactions(reactions)); self.result.items_updated += 1; return true; } @@ -1010,6 +1005,16 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { } /// Add a new event item in the timeline. + /// + /// # Safety + /// + /// This method is not marked as unsafe **but** it manipulates + /// [`TimelineMetadata::all_remote_events`]. 2 rules **must** be respected: + /// + /// 1. the remote event of the item being added **must** be present in + /// `all_remote_events`, + /// 2. the lastly added or updated remote event must be associated to the + /// timeline item being added here. fn add_item( &mut self, content: TimelineItemContent, @@ -1082,7 +1087,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { trace!("Adding new local timeline item"); let item = self.meta.new_timeline_item(item); - self.items.push_back(item); + self.items.push_back(item, None); } Flow::Remote { position: TimelineItemPosition::Start { .. }, event_id, .. } => { @@ -1099,7 +1104,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { trace!("Adding new remote timeline item at the start"); let item = self.meta.new_timeline_item(item); - self.items.push_front(item); + self.items.push_front(item, Some(0)); } Flow::Remote { @@ -1141,7 +1146,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { // If the old item is the last one and no day divider // changes need to happen, replace and return early. trace!(idx, "Replacing existing event"); - self.items.set(idx, TimelineItem::new(item, old_item_id.to_owned())); + self.items.replace(idx, TimelineItem::new(item, old_item_id.to_owned())); return; } @@ -1153,19 +1158,6 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { // will run to re-add the removed item } - // Local echoes that are pending should stick to the bottom, - // find the latest event that isn't that. - let latest_event_idx = self - .items - .iter() - .enumerate() - .rev() - .find_map(|(idx, item)| (!item.as_event()?.is_local_echo()).then_some(idx)); - - // Insert the next item after the latest event item that's not a - // pending local echo, or at the start if there is no such item. - let insert_idx = latest_event_idx.map_or(0, |idx| idx + 1); - trace!("Adding new remote timeline item after all non-pending events"); let new_item = match removed_event_item_id { // If a previous version of the same item (usually a local @@ -1175,13 +1167,52 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { None => self.meta.new_timeline_item(item), }; - // Keep push semantics, if we're inserting at the front or the back. - if insert_idx == self.items.len() { - self.items.push_back(new_item); - } else if insert_idx == 0 { - self.items.push_front(new_item); + // Local events are always at the bottom. Let's find the latest remote event + // and insert after it, otherwise, if there is no remote event, insert at 0. + let timeline_item_index = self + .items + .iter() + .enumerate() + .rev() + .find_map(|(timeline_item_index, timeline_item)| { + (!timeline_item.as_event()?.is_local_echo()) + .then_some(timeline_item_index + 1) + }) + .unwrap_or(0); + + let event_index = self + .items + .all_remote_events() + .last_index() + // The last remote event is necessarily associated to this + // timeline item, see the contract of this method. Let's fallback to a similar + // value as `timeline_item_index` instead of panicking. + .or_else(|| { + error!(?event_id, "Failed to read the last event index from `AllRemoteEvents`: at least one event must be present"); + + Some(0) + }); + + // Try to keep precise insertion semantics here, in this exact order: + // + // * _push back_ when the new item is inserted after all items (the assumption + // being that this is the hot path, because most of the time new events + // come from the sync), + // * _push front_ when the new item is inserted at index 0, + // * _insert_ otherwise. + + if timeline_item_index == self.items.len() { + trace!("Adding new remote timeline item at the back"); + self.items.push_back(new_item, event_index); + } else if timeline_item_index == 0 { + trace!("Adding new remote timeline item at the front"); + self.items.push_front(new_item, event_index); } else { - self.items.insert(insert_idx, new_item); + trace!( + timeline_item_index, + "Adding new remote timeline item at specific index" + ); + self.items.insert(timeline_item_index, new_item, event_index); } } @@ -1196,7 +1227,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { Self::maybe_update_responses(self.items, decrypted_event_id, &item); let internal_id = self.items[*idx].internal_id.clone(); - self.items.set(*idx, TimelineItem::new(item, internal_id)); + self.items.replace(*idx, TimelineItem::new(item, internal_id)); } } @@ -1209,7 +1240,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { /// After updating the timeline item `new_item` which id is /// `target_event_id`, update other items that are responses to this item. fn maybe_update_responses( - items: &mut ObservableVectorTransaction<'_, Arc>, + items: &mut ObservableItemsTransaction<'_>, target_event_id: &EventId, new_item: &EventTimelineItem, ) { @@ -1229,7 +1260,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { TimelineItemContent::Message(message.with_in_reply_to(in_reply_to)); let new_reply_item = entry.with_kind(event_item.with_content(new_reply_content, None)); - ObservableVectorTransactionEntry::set(&mut entry, new_reply_item); + ObservableItemsTransactionEntry::replace(&mut entry, new_reply_item); } }); } diff --git a/crates/matrix-sdk-ui/src/timeline/item.rs b/crates/matrix-sdk-ui/src/timeline/item.rs index 8096da48c1a..6487d28f6d7 100644 --- a/crates/matrix-sdk-ui/src/timeline/item.rs +++ b/crates/matrix-sdk-ui/src/timeline/item.rs @@ -26,13 +26,14 @@ use super::{EventTimelineItem, VirtualTimelineItem}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct TimelineUniqueId(pub String); +/// The type of timeline item. #[derive(Clone, Debug)] #[allow(clippy::large_enum_variant)] pub enum TimelineItemKind { /// An event or aggregation of multiple events. Event(EventTimelineItem), /// An item that doesn't correspond to an event, for example the user's - /// own read marker. + /// own read marker, or a day divider. Virtual(VirtualTimelineItem), } diff --git a/crates/matrix-sdk-ui/src/timeline/read_receipts.rs b/crates/matrix-sdk-ui/src/timeline/read_receipts.rs index a12ebbb0967..00964de20be 100644 --- a/crates/matrix-sdk-ui/src/timeline/read_receipts.rs +++ b/crates/matrix-sdk-ui/src/timeline/read_receipts.rs @@ -12,9 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{cmp::Ordering, collections::HashMap, sync::Arc}; +use std::{cmp::Ordering, collections::HashMap}; -use eyeball_im::ObservableVectorTransaction; use futures_core::Stream; use indexmap::IndexMap; use ruma::{ @@ -27,7 +26,8 @@ use tracing::{debug, error, warn}; use super::{ controller::{ - AllRemoteEvents, FullEventMeta, TimelineMetadata, TimelineState, TimelineStateTransaction, + AllRemoteEvents, FullEventMeta, ObservableItemsTransaction, TimelineMetadata, + TimelineState, TimelineStateTransaction, }, traits::RoomDataProvider, util::{rfind_event_by_id, RelativePosition}, @@ -99,9 +99,10 @@ impl ReadReceipts { &mut self, new_receipt: FullReceipt<'_>, is_own_user_id: bool, - all_events: &AllRemoteEvents, - timeline_items: &mut ObservableVectorTransaction<'_, Arc>, + timeline_items: &mut ObservableItemsTransaction<'_>, ) { + let all_events = timeline_items.all_remote_events(); + // Get old receipt. let old_receipt = self.get_latest(new_receipt.user_id, &new_receipt.receipt_type); if old_receipt @@ -284,11 +285,7 @@ struct ReadReceiptTimelineUpdate { impl ReadReceiptTimelineUpdate { /// Remove the old receipt from the corresponding timeline item. - fn remove_old_receipt( - &self, - items: &mut ObservableVectorTransaction<'_, Arc>, - user_id: &UserId, - ) { + fn remove_old_receipt(&self, items: &mut ObservableItemsTransaction<'_>, user_id: &UserId) { let Some(event_id) = &self.old_event_id else { // Nothing to do. return; @@ -310,7 +307,7 @@ impl ReadReceiptTimelineUpdate { receipt doesn't have a receipt for the user" ); } - items.set(receipt_pos, TimelineItem::new(event_item, event_item_id)); + items.replace(receipt_pos, TimelineItem::new(event_item, event_item_id)); } else { warn!("received a read receipt for a local item, this should not be possible"); } @@ -319,7 +316,7 @@ impl ReadReceiptTimelineUpdate { /// Add the new receipt to the corresponding timeline item. fn add_new_receipt( self, - items: &mut ObservableVectorTransaction<'_, Arc>, + items: &mut ObservableItemsTransaction<'_>, user_id: OwnedUserId, receipt: Receipt, ) { @@ -339,7 +336,7 @@ impl ReadReceiptTimelineUpdate { if let Some(remote_event_item) = event_item.as_remote_mut() { remote_event_item.read_receipts.insert(user_id, receipt); - items.set(receipt_pos, TimelineItem::new(event_item, event_item_id)); + items.replace(receipt_pos, TimelineItem::new(event_item, event_item_id)); } else { warn!("received a read receipt for a local item, this should not be possible"); } @@ -348,7 +345,7 @@ impl ReadReceiptTimelineUpdate { /// Apply this update to the timeline. fn apply( self, - items: &mut ObservableVectorTransaction<'_, Arc>, + items: &mut ObservableItemsTransaction<'_>, user_id: OwnedUserId, receipt: Receipt, ) { @@ -386,7 +383,6 @@ impl TimelineStateTransaction<'_> { self.meta.read_receipts.maybe_update_read_receipt( full_receipt, is_own_user_id, - &self.meta.all_remote_events, &mut self.items, ); } @@ -421,7 +417,6 @@ impl TimelineStateTransaction<'_> { self.meta.read_receipts.maybe_update_read_receipt( full_receipt, user_id == own_user_id, - &self.meta.all_remote_events, &mut self.items, ); } @@ -450,7 +445,6 @@ impl TimelineStateTransaction<'_> { self.meta.read_receipts.maybe_update_read_receipt( full_receipt, is_own_event, - &self.meta.all_remote_events, &mut self.items, ); } @@ -460,8 +454,8 @@ impl TimelineStateTransaction<'_> { pub(super) fn maybe_update_read_receipts_of_prev_event(&mut self, event_id: &EventId) { // Find the previous visible event, if there is one. let Some(prev_event_meta) = self - .meta - .all_remote_events + .items + .all_remote_events() .iter() .rev() // Find the event item. @@ -491,7 +485,7 @@ impl TimelineStateTransaction<'_> { let read_receipts = self.meta.read_receipts.compute_event_receipts( &remote_prev_event_item.event_id, - &self.meta.all_remote_events, + self.items.all_remote_events(), false, ); @@ -501,7 +495,7 @@ impl TimelineStateTransaction<'_> { } remote_prev_event_item.read_receipts = read_receipts; - self.items.set(prev_item_pos, TimelineItem::new(prev_event_item, prev_event_item_id)); + self.items.replace(prev_item_pos, TimelineItem::new(prev_event_item, prev_event_item_id)); } } @@ -539,18 +533,24 @@ impl TimelineState { user_id: &UserId, room_data_provider: &P, ) -> Option<(OwnedEventId, Receipt)> { - let public_read_receipt = - self.meta.user_receipt(user_id, ReceiptType::Read, room_data_provider).await; - let private_read_receipt = - self.meta.user_receipt(user_id, ReceiptType::ReadPrivate, room_data_provider).await; + let all_remote_events = self.items.all_remote_events(); + let public_read_receipt = self + .meta + .user_receipt(user_id, ReceiptType::Read, room_data_provider, all_remote_events) + .await; + let private_read_receipt = self + .meta + .user_receipt(user_id, ReceiptType::ReadPrivate, room_data_provider, all_remote_events) + .await; // Let's assume that a private read receipt should be more recent than a public // read receipt, otherwise there's no point in the private read receipt, // and use it as default. - match self - .meta - .compare_optional_receipts(public_read_receipt.as_ref(), private_read_receipt.as_ref()) - { + match self.meta.compare_optional_receipts( + public_read_receipt.as_ref(), + private_read_receipt.as_ref(), + self.items.all_remote_events(), + ) { Ordering::Greater => public_read_receipt, Ordering::Less => private_read_receipt, _ => unreachable!(), @@ -572,16 +572,19 @@ impl TimelineState { // Let's assume that a private read receipt should be more recent than a public // read receipt, otherwise there's no point in the private read receipt, // and use it as default. - let (latest_receipt_id, _) = - match self.meta.compare_optional_receipts(public_read_receipt, private_read_receipt) { - Ordering::Greater => public_read_receipt?, - Ordering::Less => private_read_receipt?, - _ => unreachable!(), - }; + let (latest_receipt_id, _) = match self.meta.compare_optional_receipts( + public_read_receipt, + private_read_receipt, + self.items.all_remote_events(), + ) { + Ordering::Greater => public_read_receipt?, + Ordering::Less => private_read_receipt?, + _ => unreachable!(), + }; // Find the corresponding visible event. - self.meta - .all_remote_events + self.items + .all_remote_events() .iter() .rev() .skip_while(|ev| ev.event_id != *latest_receipt_id) @@ -601,6 +604,7 @@ impl TimelineMetadata { user_id: &UserId, receipt_type: ReceiptType, room_data_provider: &P, + all_remote_events: &AllRemoteEvents, ) -> Option<(OwnedEventId, Receipt)> { if let Some(receipt) = self.read_receipts.get_latest(user_id, &receipt_type) { // Since it is in the timeline, it should be the most recent. @@ -620,6 +624,7 @@ impl TimelineMetadata { match self.compare_optional_receipts( main_thread_read_receipt.as_ref(), unthreaded_read_receipt.as_ref(), + all_remote_events, ) { Ordering::Greater => main_thread_read_receipt, Ordering::Less => unthreaded_read_receipt, @@ -637,6 +642,7 @@ impl TimelineMetadata { &self, lhs: Option<&(OwnedEventId, Receipt)>, rhs_or_default: Option<&(OwnedEventId, Receipt)>, + all_remote_events: &AllRemoteEvents, ) -> Ordering { // If we only have one, use it. let Some((lhs_event_id, lhs_receipt)) = lhs else { @@ -647,7 +653,9 @@ impl TimelineMetadata { }; // Compare by position in the timeline. - if let Some(relative_pos) = self.compare_events_positions(lhs_event_id, rhs_event_id) { + if let Some(relative_pos) = + self.compare_events_positions(lhs_event_id, rhs_event_id, all_remote_events) + { if relative_pos == RelativePosition::Before { return Ordering::Greater; }