From 1d0285102825c903f90ceddfe3cb8763d8707dc4 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 4 Dec 2024 00:00:40 +0000 Subject: [PATCH] implement several broadband loops Signed-off-by: Jason Volk --- src/api/client/relations.rs | 7 +++---- src/api/client/sync/mod.rs | 7 +++++-- src/api/server/send_join.rs | 20 +++++++++++++------ .../rooms/event_handler/resolve_state.rs | 12 +++++++---- .../rooms/event_handler/state_at_incoming.rs | 9 +++++++-- src/service/rooms/pdu_metadata/data.rs | 7 +++++-- src/service/rooms/threads/mod.rs | 7 +++++-- 7 files changed, 47 insertions(+), 22 deletions(-) diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index 902e6be60..de54c4e44 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -1,10 +1,10 @@ use axum::extract::State; use conduit::{ at, - utils::{result::FlatOk, IterStream, ReadyExt}, + utils::{result::FlatOk, stream::WidebandExt, IterStream, ReadyExt}, PduCount, Result, }; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use ruma::{ api::{ client::relations::{ @@ -138,11 +138,10 @@ async fn paginate_relations_with_filter( .is_none_or(|rel_type| pdu.relation_type_equal(rel_type)) }) .stream() - .filter_map(|item| visibility_filter(services, sender_user, item)) .ready_take_while(|(count, _)| Some(*count) != to) + .wide_filter_map(|item| visibility_filter(services, sender_user, item)) .take(limit) .collect() - .boxed() .await; let next_batch = match dir { diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index ba50d77c3..6f7918604 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -1,7 +1,10 @@ mod v3; mod v4; -use conduit::{utils::ReadyExt, PduCount}; +use conduit::{ + utils::stream::{BroadbandExt, ReadyExt}, + PduCount, +}; use futures::StreamExt; use ruma::{RoomId, UserId}; @@ -55,7 +58,7 @@ async fn share_encrypted_room( .state_cache .get_shared_rooms(sender_user, user_id) .ready_filter(|&room_id| Some(room_id) != ignore_room) - .any(|other_room_id| { + .broad_any(|other_room_id| { services .rooms .state_accessor diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index 92ab3b504..d1574e626 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -3,7 +3,12 @@ use std::{borrow::Borrow, collections::HashMap}; use axum::extract::State; -use conduit::{err, pdu::gen_event_id_canonical_json, utils::IterStream, warn, Error, Result}; +use conduit::{ + err, + pdu::gen_event_id_canonical_json, + utils::stream::{IterStream, TryBroadbandExt}, + warn, Error, Result, +}; use futures::{FutureExt, StreamExt, TryStreamExt}; use ruma::{ api::{client::error::ErrorKind, federation::membership::create_join_event}, @@ -160,6 +165,7 @@ async fn create_join_event( .rooms .event_handler .handle_incoming_pdu(&origin, room_id, &event_id, value.clone(), true) + .boxed() .await? .ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?; @@ -172,16 +178,17 @@ async fn create_join_event( .await?; let state = state_ids - .iter() + .values() .try_stream() - .and_then(|(_, event_id)| services.rooms.timeline.get_pdu_json(event_id)) - .and_then(|pdu| { + .broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id)) + .broad_and_then(|pdu| { services .sending .convert_to_outgoing_federation_event(pdu) .map(Ok) }) .try_collect() + .boxed() .await?; let starting_events = state_ids.values().map(Borrow::borrow); @@ -191,14 +198,15 @@ async fn create_join_event( .event_ids_iter(room_id, starting_events) .await? .map(Ok) - .and_then(|event_id| async move { services.rooms.timeline.get_pdu_json(&event_id).await }) - .and_then(|pdu| { + .broad_and_then(|event_id| async move { services.rooms.timeline.get_pdu_json(&event_id).await }) + .broad_and_then(|pdu| { services .sending .convert_to_outgoing_federation_event(pdu) .map(Ok) }) .try_collect() + .boxed() .await?; services.sending.send_pdu_room(room_id, &pdu_id).await?; diff --git a/src/service/rooms/event_handler/resolve_state.rs b/src/service/rooms/event_handler/resolve_state.rs index 81cf77331..3329a1467 100644 --- a/src/service/rooms/event_handler/resolve_state.rs +++ b/src/service/rooms/event_handler/resolve_state.rs @@ -4,7 +4,11 @@ use std::{ sync::Arc, }; -use conduit::{debug, err, implement, utils::IterStream, Result}; +use conduit::{ + debug, err, implement, + utils::stream::{IterStream, WidebandExt}, + Result, +}; use futures::{FutureExt, StreamExt, TryFutureExt}; use ruma::{ state_res::{self, StateMap}, @@ -52,11 +56,11 @@ pub async fn resolve_state( let fork_states: Vec>> = fork_states .into_iter() .stream() - .then(|fork_state| { + .wide_then(|fork_state| { fork_state .into_iter() .stream() - .filter_map(|(k, id)| { + .wide_filter_map(|(k, id)| { self.services .short .get_statekey_from_short(k) @@ -83,7 +87,7 @@ pub async fn resolve_state( let state_events: Vec<_> = state .iter() .stream() - .then(|((event_type, state_key), event_id)| { + .wide_then(|((event_type, state_key), event_id)| { self.services .short .get_or_create_shortstatekey(event_type, state_key) diff --git a/src/service/rooms/event_handler/state_at_incoming.rs b/src/service/rooms/event_handler/state_at_incoming.rs index 96ee9907b..9b30a830f 100644 --- a/src/service/rooms/event_handler/state_at_incoming.rs +++ b/src/service/rooms/event_handler/state_at_incoming.rs @@ -4,7 +4,12 @@ use std::{ sync::Arc, }; -use conduit::{debug, err, implement, result::LogErr, utils::IterStream, PduEvent, Result}; +use conduit::{ + debug, err, implement, + result::LogErr, + utils::stream::{BroadbandExt, IterStream}, + PduEvent, Result, +}; use futures::{FutureExt, StreamExt}; use ruma::{ state_res::{self, StateMap}, @@ -166,7 +171,7 @@ pub(super) async fn state_at_incoming_resolved( new_state .iter() .stream() - .then(|((event_type, state_key), event_id)| { + .broad_then(|((event_type, state_key), event_id)| { self.services .short .get_or_create_shortstatekey(event_type, state_key) diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs index b06e988e8..3d05a1c8f 100644 --- a/src/service/rooms/pdu_metadata/data.rs +++ b/src/service/rooms/pdu_metadata/data.rs @@ -3,7 +3,10 @@ use std::{mem::size_of, sync::Arc}; use arrayvec::ArrayVec; use conduit::{ result::LogErr, - utils::{stream::TryIgnore, u64_from_u8, ReadyExt}, + utils::{ + stream::{TryIgnore, WidebandExt}, + u64_from_u8, ReadyExt, + }, PduCount, PduEvent, }; use database::Map; @@ -67,7 +70,7 @@ impl Data { .ready_take_while(move |key| key.starts_with(&target.to_be_bytes())) .map(|to_from| u64_from_u8(&to_from[8..16])) .map(PduCount::from_unsigned) - .filter_map(move |shorteventid| async move { + .wide_filter_map(move |shorteventid| async move { let pdu_id: RawPduId = PduId { shortroomid, shorteventid, diff --git a/src/service/rooms/threads/mod.rs b/src/service/rooms/threads/mod.rs index 5821f2795..a304e4820 100644 --- a/src/service/rooms/threads/mod.rs +++ b/src/service/rooms/threads/mod.rs @@ -2,7 +2,10 @@ use std::{collections::BTreeMap, sync::Arc}; use conduit::{ err, - utils::{stream::TryIgnore, ReadyExt}, + utils::{ + stream::{TryIgnore, WidebandExt}, + ReadyExt, + }, PduCount, PduEvent, PduId, RawPduId, Result, }; use database::{Deserialized, Map}; @@ -143,7 +146,7 @@ impl Service { .ignore_err() .map(RawPduId::from) .ready_take_while(move |pdu_id| pdu_id.shortroomid() == shortroomid.to_be_bytes()) - .filter_map(move |pdu_id| async move { + .wide_filter_map(move |pdu_id| async move { let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?; let pdu_id: PduId = pdu_id.into();