From cd3982555028f394851b1bcc652e987ba66d61ea Mon Sep 17 00:00:00 2001
From: David Estes
Date: Thu, 5 Dec 2024 12:03:11 -0700
Subject: [PATCH] feat: use multiple tasks to order events for streams

---
 event-svc/src/event/ordering_task.rs | 182 ++++++++++++++++++++-------
 1 file changed, 134 insertions(+), 48 deletions(-)

diff --git a/event-svc/src/event/ordering_task.rs b/event-svc/src/event/ordering_task.rs
index 980a3caf..f89b8435 100644
--- a/event-svc/src/event/ordering_task.rs
+++ b/event-svc/src/event/ordering_task.rs
@@ -5,7 +5,9 @@ use anyhow::anyhow;
 use ceramic_event::unvalidated;
 use cid::Cid;
 use ipld_core::ipld::Ipld;
+use itertools::Itertools;
 use tokio::sync::mpsc::Sender;
+use tokio::task::JoinSet;
 use tracing::{debug, error, info, trace, warn};
 
 use crate::event::service::PENDING_EVENTS_CHANNEL_DEPTH;
@@ -19,6 +21,12 @@ type EventCid = Cid;
 type PrevCid = Cid;
 
 const LOG_EVERY_N_ENTRIES: usize = 10_000;
+/// The max number of tasks we'll spawn when ordering events.
+/// Each task is given a chunk of streams and will order them.
+const MAX_STREAM_PROCESSING_TASKS: usize = 16;
+/// The minimum number of streams each task will process. If there are many streams, each task
+/// gets a batch of roughly STREAM_NUM / MAX_STREAM_PROCESSING_TASKS streams.
+const MIN_NUM_STREAMS_PER_BATCH: usize = 25;
 /// The number of events we initially pull from the channel when doing startup undelivered batch processing.
 /// Being larger was measured to go faster when processing millions of events, however there's no need to
 /// allocate such a large array when normally processing events in the background as we tend to keep up.
@@ -125,13 +133,19 @@
                 {
                     trace!(?recon_events, "new events discovered!");
                     state.add_inserted_events(recon_events);
+                    // The more events we have in memory, the fewer queries we need to run to find potential history,
+                    // so we read out everything currently buffered on the channel. We do have to yield again, but one
+                    // bigger batch went faster than many smaller batches when processing millions of events at startup.
+                    if !rx_inserted.is_empty() {
+                        let to_take = rx_inserted.len();
+                        let mut remaining_events = Vec::with_capacity(to_take);
+                        rx_inserted.recv_many(&mut remaining_events, to_take).await;
+                        state.add_inserted_events(remaining_events);
+                    }
 
-                    if let Err(should_exit) = state
-                        .process_streams(Arc::clone(&event_access))
-                        .await
-                        .map_err(Self::log_error)
-                    {
-                        if should_exit {
+                    state = match state.process_streams(Arc::clone(&event_access)).await {
+                        Ok(s) => s,
+                        Err(()) => {
                             error!("Ordering task exiting due to fatal error");
                             return;
                         }
@@ -147,28 +161,7 @@
             state.add_inserted_events(remaining);
         }
 
-        let _ = state
-            .process_streams(event_access)
-            .await
-            .map_err(Self::log_error);
-    }
-
-    /// Log an error and return a true if it was fatal
-    fn log_error(err: Error) -> bool {
-        match err {
-            Error::Application { error } => {
-                warn!("Encountered application error: {:?}", error);
-                false
-            }
-            Error::Fatal { error } => {
-                error!("Encountered fatal error: {:?}", error);
-                true
-            }
-            Error::Transient { error } | Error::InvalidArgument { error } => {
-                warn!("Encountered error: {:?}", error);
-                false
-            }
-        }
+        let _ = state.process_streams(event_access).await;
     }
 }
 
@@ -190,7 +183,7 @@ impl StreamEvent {
     }
 
     /// Builds a stream event from the database if it exists.
-    async fn load_by_cid(event_access: Arc<EventAccess>, cid: EventCid) -> Result<Option<Self>> {
+    async fn load_by_cid(event_access: &EventAccess, cid: EventCid) -> Result<Option<Self>> {
         // TODO: Condense the multiple DB queries happening here into a single query
         let (exists, deliverable) = event_access.deliverable_by_cid(&cid).await?;
         if exists {
@@ -398,7 +391,7 @@ impl StreamEvents {
         }
     }
 
-    async fn order_events(&mut self, event_access: Arc<EventAccess>) -> Result<()> {
+    async fn order_events(&mut self, event_access: &EventAccess) -> Result<()> {
         // We collect everything we can into memory and then order things.
         // If our prev is deliverable then we can mark ourselves as deliverable. If our prev wasn't deliverable yet,
         // we track it and repeat (i.e. add it to our state and the set we're iterating to attempt to load its prev).
@@ -438,7 +431,7 @@ impl StreamEvents {
                     // nothing to do until it arrives on the channel
                 }
             } else if let Some(discovered_prev) =
-                StreamEvent::load_by_cid(Arc::clone(&event_access), desired_prev).await?
+                StreamEvent::load_by_cid(event_access, desired_prev).await?
             {
                 match &discovered_prev {
                     // we found our prev in the database and it's deliverable, so we're deliverable now
@@ -504,6 +497,24 @@ impl OrderingState {
         }
     }
 
+    /// Log an error and return true if it was fatal
+    fn log_error(err: Error) -> bool {
+        match err {
+            Error::Application { error } => {
+                warn!("Encountered application error: {:?}", error);
+                false
+            }
+            Error::Fatal { error } => {
+                error!("Encountered fatal error: {:?}", error);
+                true
+            }
+            Error::Transient { error } | Error::InvalidArgument { error } => {
+                warn!("Encountered error: {:?}", error);
+                false
+            }
+        }
+    }
+
     /// Update the list of streams to process with new events.
     /// Relies on `add_stream_event` to handle updating the internal state.
     fn add_inserted_events(&mut self, events: Vec) {
@@ -528,40 +539,115 @@ impl OrderingState {
 
     /// Process every stream we know about that has undelivered events that should be "unlocked" now. This could be adjusted to commit things in batches,
     /// but for now it assumes it can process all the streams and events in one go. It should be idempotent, so if it fails, it can be retried.
-    async fn process_streams(&mut self, event_access: Arc<EventAccess>) -> Result<()> {
-        for (_stream_cid, stream_events) in self.pending_by_stream.iter_mut() {
-            if stream_events.should_process {
-                stream_events
-                    .order_events(Arc::clone(&event_access))
-                    .await?;
+    /// Takes ownership of `self` and returns it (rather than taking `&mut self`) so the streams can be processed across multiple tasks.
+    /// Returns an error if processing was fatal and should not be retried.
+    async fn process_streams(
+        mut self,
+        event_access: Arc<EventAccess>,
+    ) -> std::result::Result<Self, ()> {
+        let mut to_process = HashMap::new();
+        self.pending_by_stream.retain(|k, s| {
+            if s.should_process {
+                let v = std::mem::take(s);
+                to_process.insert(*k, v);
+                false
+            } else {
+                true
+            }
+        });
+
+        // we need to collect everything we get back from the tasks so we can put it back on our state
+        // in case we fail to persist the updates and need to try again
+        let mut processed_streams = Vec::with_capacity(to_process.len());
+
+        let mut task_set =
+            Self::spawn_tasks_for_stream_batches(Arc::clone(&event_access), to_process);
+
+        while let Some(res) = task_set.join_next().await {
+            let streams = match res {
+                Ok(task_res) => match task_res {
+                    Ok(streams) => streams,
+                    Err(error) => {
+                        warn!(?error, "Error encountered processing stream batch");
+                        continue;
+                    }
+                },
+                Err(e) => {
+                    error!(error=?e, "Failed to join task spawned to process stream batch");
+                    continue;
+                }
+            };
+            for (_, stream_events) in &streams {
                 self.deliverable
                     .extend(stream_events.new_deliverable.iter());
             }
+            processed_streams.extend(streams);
         }
 
-        match self.persist_ready_events(Arc::clone(&event_access)).await {
+        match self.persist_ready_events(&event_access).await {
             Ok(_) => {}
             Err(err) => {
-                // Clear the queue as we'll rediscover it on the next run, rather than try to double update everything.
-                // We will no-op the updates so it doesn't really hurt but it's unnecessary.
-                // The StreamEvents in our pending_by_stream map all have their state updated in memory so we can pick up where we left off.
-                self.deliverable.clear();
-                return Err(err);
+                if Self::log_error(err) {
+                    // if it's fatal we will tell the task to exit
+                    // if not, we return the state to try again
+                    return Err(());
+                } else {
+                    // Clear the queue as we'll rediscover it on the next run, rather than try to double update everything.
+                    // We will no-op the updates so it doesn't really hurt but it's unnecessary.
+                    // The StreamEvents in our pending_by_stream map all have their state updated in memory so we can pick up where we left off.
+                    self.deliverable.clear();
+                    return Ok(self);
+                }
             }
         }
         // keep things that still have missing history but don't process them again until we get something new
-        self.pending_by_stream
-            .retain(|_, stream_events| !stream_events.processing_completed());
+        for (cid, mut stream) in processed_streams.into_iter() {
+            if !stream.processing_completed() {
+                self.pending_by_stream.insert(cid, stream);
+            }
+        }
 
         debug!(remaining_streams=%self.pending_by_stream.len(), "Finished processing streams");
         trace!(stream_state=?self, "Finished processing streams");
 
-        Ok(())
+        Ok(self)
+    }
+
+    /// Splits `to_process` into chunks and spawns at most `MAX_STREAM_PROCESSING_TASKS` tasks to review and order
+    /// the events for each chunk of streams.
+    fn spawn_tasks_for_stream_batches(
+        event_access: Arc<EventAccess>,
+        to_process: HashMap<Cid, StreamEvents>,
+    ) -> JoinSet<Result<Vec<(Cid, StreamEvents)>>> {
+        let mut task_set = JoinSet::new();
+
+        let chunk_size =
+            (to_process.len() / MAX_STREAM_PROCESSING_TASKS).max(MIN_NUM_STREAMS_PER_BATCH);
+
+        let chunks = &to_process.into_iter().chunks(chunk_size);
+        // `chunks` uses a RefCell and isn't `Send`, so we allocate again and move the owned batches to avoid sync issues
+        let mut streams_to_send = Vec::with_capacity(MAX_STREAM_PROCESSING_TASKS);
+
+        for chunk in chunks {
+            let streams: Vec<(Cid, StreamEvents)> = chunk.collect();
+            streams_to_send.push(streams);
+        }
+
+        for mut streams in streams_to_send.into_iter() {
+            let event_access_cln = Arc::clone(&event_access);
+            task_set.spawn(async move {
+                for (_, stream_events) in streams.iter_mut() {
+                    stream_events.order_events(&event_access_cln).await?;
+                }
+                Ok(streams)
+            });
+        }
+        task_set
     }
 
     /// Process all undelivered events in the database. This is a blocking operation that could take a long time.
     /// It is intended to be run at startup but could be used on an interval or after some errors to recover.
-    pub(crate) async fn process_all_undelivered_events(
+    async fn process_all_undelivered_events(
         event_access: Arc<EventAccess>,
         max_iterations: usize,
         batch_size: u32,
@@ -726,7 +812,7 @@ impl OrderingState {
 
     /// We should improve the error handling and likely add some batching if the number of ready events is very high.
     /// We copy the events up front to avoid losing any events if the task is cancelled.
-    async fn persist_ready_events(&mut self, event_access: Arc<EventAccess>) -> Result<()> {
+    async fn persist_ready_events(&mut self, event_access: &EventAccess) -> Result<()> {
         if !self.deliverable.is_empty() {
             tracing::debug!(count=%self.deliverable.len(), "Marking events as ready to deliver");
             let mut tx = event_access.begin_tx().await?;
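
As a rough standalone illustration of the fan-out approach used by spawn_tasks_for_stream_batches above: split the pending work into at most a fixed number of batches, order each batch on its own task via a JoinSet, and collect results as tasks complete. This is a sketch only, not code from this repository; Stream, order, fan_out, MAX_TASKS, and MIN_PER_BATCH are placeholder names standing in for the real StreamEvents/order_events machinery, and errors are reduced to logging a failed batch.

use tokio::task::JoinSet;

/// Placeholder for per-stream state (stands in for StreamEvents).
struct Stream {
    id: u64,
    events: Vec<String>,
}

/// Placeholder for the per-stream ordering work (stands in for order_events).
fn order(stream: &mut Stream) {
    stream.events.sort();
}

const MAX_TASKS: usize = 16;
const MIN_PER_BATCH: usize = 25;

/// Split `streams` into batches and order each batch on its own task.
async fn fan_out(streams: Vec<Stream>) -> Vec<Stream> {
    // Batch size mirrors the patch: divide across MAX_TASKS, but never go
    // below MIN_PER_BATCH so tiny workloads stay on a single task.
    let chunk_size = (streams.len() / MAX_TASKS).max(MIN_PER_BATCH);

    let mut batches: Vec<Vec<Stream>> = Vec::new();
    let mut iter = streams.into_iter().peekable();
    while iter.peek().is_some() {
        batches.push(iter.by_ref().take(chunk_size).collect());
    }

    let mut task_set = JoinSet::new();
    for mut batch in batches {
        task_set.spawn(async move {
            for stream in batch.iter_mut() {
                order(stream);
            }
            batch
        });
    }

    // Collect results as tasks complete; a failed task is logged and skipped,
    // mirroring the patch's "log and continue" handling of join errors.
    let mut done = Vec::new();
    while let Some(res) = task_set.join_next().await {
        match res {
            Ok(batch) => done.extend(batch),
            Err(err) => eprintln!("batch task failed: {err}"),
        }
    }
    done
}

#[tokio::main]
async fn main() {
    let streams: Vec<Stream> = (0..100u64)
        .map(|id| Stream { id, events: vec!["b".to_string(), "a".to_string()] })
        .collect();
    let ordered = fan_out(streams).await;
    println!("ordered {} streams; first id: {:?}", ordered.len(), ordered.first().map(|s| s.id));
}

The `.max(MIN_PER_BATCH)` keeps small workloads on a single task while capping the number of tasks for large ones, which is the same trade-off MAX_STREAM_PROCESSING_TASKS and MIN_NUM_STREAMS_PER_BATCH encode in the patch.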
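
The run-loop change that drains the channel before processing follows a related idea: after receiving one batch, pull whatever is already buffered so each process_streams call works on as large a batch as possible. A minimal standalone sketch of that draining pattern under assumed placeholder types (a u32 payload and a local channel, not the service's actual event types):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(1024);

    // Simulate a producer that has already queued several events.
    for i in 0..10u32 {
        tx.send(i).await.unwrap();
    }

    // Wait for at least one event, then drain whatever else is buffered
    // so the batch handed to the processor is as large as possible.
    if let Some(first) = rx.recv().await {
        let mut batch = vec![first];
        let buffered = rx.len();
        if buffered > 0 {
            rx.recv_many(&mut batch, buffered).await;
        }
        println!("processing a batch of {} events", batch.len());
    }
}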