feat: use multiple tasks to order events for streams
dav1do committed Dec 5, 2024
1 parent b21f7dc commit cd39825
Showing 1 changed file with 134 additions and 48 deletions.
182 changes: 134 additions & 48 deletions event-svc/src/event/ordering_task.rs
@@ -5,7 +5,9 @@ use anyhow::anyhow;
use ceramic_event::unvalidated;
use cid::Cid;
use ipld_core::ipld::Ipld;
use itertools::Itertools;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinSet;
use tracing::{debug, error, info, trace, warn};

use crate::event::service::PENDING_EVENTS_CHANNEL_DEPTH;
@@ -19,6 +21,12 @@ type EventCid = Cid;
type PrevCid = Cid;

const LOG_EVERY_N_ENTRIES: usize = 10_000;
/// The max number of tasks we'll spawn when ordering events.
/// Each task is given a chunk of streams and will order them.
const MAX_STREAM_PROCESSING_TASKS: usize = 16;
/// The minimum number of streams each task will process. If there are many streams, each task
/// will instead get STREAM_COUNT / MAX_STREAM_PROCESSING_TASKS of them.
const MIN_NUM_STREAMS_PER_BATCH: usize = 25;
/// The number of events we initially pull from the channel when doing startup undelivered batch processing.
/// Being larger was measured to go faster when processing millions of events, however there's no need to
/// allocate such a large array when normally processing events in the background as we tend to keep up.
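As a rough illustration (not part of the diff), the two constants interact like this: the chunk size used later is the larger of stream_count / MAX_STREAM_PROCESSING_TASKS and MIN_NUM_STREAMS_PER_BATCH, so a small backlog yields a few well-filled tasks rather than sixteen tiny ones.

// Sketch only: how the constants bound batch size and task count.
fn chunking(stream_count: usize) -> (usize, usize) {
    let chunk_size =
        (stream_count / MAX_STREAM_PROCESSING_TASKS).max(MIN_NUM_STREAMS_PER_BATCH);
    (chunk_size, stream_count.div_ceil(chunk_size))
}
// e.g. 1_600 streams -> 16 tasks of 100; 40 streams -> 2 tasks (25 + 15); 10 streams -> 1 task.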
@@ -125,13 +133,19 @@ impl OrderingTask {
{
trace!(?recon_events, "new events discovered!");
state.add_inserted_events(recon_events);
// the more events we get in memory, the fewer queries we need to run to find potential history,
// so we read out all the events we can. we do have to yield again, but processing one bigger
// batch proved faster than many smaller batches when handling millions of events at startup
if !rx_inserted.is_empty() {
let to_take = rx_inserted.len();
let mut remaining_events = Vec::with_capacity(to_take);
rx_inserted.recv_many(&mut remaining_events, to_take).await;
state.add_inserted_events(remaining_events);
}

if let Err(should_exit) = state
.process_streams(Arc::clone(&event_access))
.await
.map_err(Self::log_error)
{
if should_exit {
state = match state.process_streams(Arc::clone(&event_access)).await {
Ok(s) => s,
Err(()) => {
error!("Ordering task exiting due to fatal error");
return;
}
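The drain-and-batch trick above can be isolated into a small helper; a minimal sketch (generic payload, not the crate's real types) of awaiting one event and then emptying whatever else is already queued without blocking again:

use tokio::sync::mpsc::Receiver;

// Await the first event, then take everything already buffered so a single
// ordering pass sees the largest possible batch.
async fn recv_batch<T>(rx: &mut Receiver<T>) -> Option<Vec<T>> {
    let first = rx.recv().await?;
    let mut batch = vec![first];
    let queued = rx.len();
    if queued > 0 {
        rx.recv_many(&mut batch, queued).await;
    }
    Some(batch)
}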
@@ -147,28 +161,7 @@
state.add_inserted_events(remaining);
}

let _ = state
.process_streams(event_access)
.await
.map_err(Self::log_error);
}

/// Log an error and return true if it was fatal
fn log_error(err: Error) -> bool {
match err {
Error::Application { error } => {
warn!("Encountered application error: {:?}", error);
false
}
Error::Fatal { error } => {
error!("Encountered fatal error: {:?}", error);
true
}
Error::Transient { error } | Error::InvalidArgument { error } => {
warn!("Encountered error: {:?}", error);
false
}
}
let _ = state.process_streams(event_access).await;
}
}
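The loop's state handling is the owned-state pattern; a compressed sketch using the crate's types as stand-ins (the real loop also drains and batches the channel as shown above):

// The task owns its state and re-binds it each iteration, which is what lets
// process_streams move stream batches into spawned tasks and hand the
// surviving state back. Err(()) signals a fatal error that was already logged.
async fn run_loop(mut state: OrderingState, event_access: Arc<EventAccess>) {
    loop {
        // ... receive and buffer newly inserted events ...
        state = match state.process_streams(Arc::clone(&event_access)).await {
            Ok(next) => next,
            Err(()) => return, // fatal: exit the ordering task
        };
    }
}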

@@ -190,7 +183,7 @@ impl StreamEvent {
}

/// Builds a stream event from the database if it exists.
async fn load_by_cid(event_access: Arc<EventAccess>, cid: EventCid) -> Result<Option<Self>> {
async fn load_by_cid(event_access: &EventAccess, cid: EventCid) -> Result<Option<Self>> {
// TODO: Condense the multiple DB queries happening here into a single query
let (exists, deliverable) = event_access.deliverable_by_cid(&cid).await?;
if exists {
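Several signatures in this diff swap Arc<EventAccess> for &EventAccess; call sites don't change because &Arc<EventAccess> auto-derefs to &EventAccess. A self-contained sketch with a stand-in type:

use std::sync::Arc;

struct EventAccess; // stand-in for the crate's type, for illustration only

fn load(event_access: &EventAccess) { /* run the db query */ }

fn caller(event_access: Arc<EventAccess>) {
    load(&event_access); // &Arc<EventAccess> derefs; no refcount bump per call
}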
Expand Down Expand Up @@ -398,7 +391,7 @@ impl StreamEvents {
}
}

async fn order_events(&mut self, event_access: Arc<EventAccess>) -> Result<()> {
async fn order_events(&mut self, event_access: &EventAccess) -> Result<()> {
// We collect everything we can into memory and then order things.
// If our prev is deliverable then we can mark ourselves as deliverable. If our prev wasn't deliverable yet,
// we track it and repeat (i.e. add it to our state and the set we're iterating to attempt to load its prev).
Expand Down Expand Up @@ -438,7 +431,7 @@ impl StreamEvents {
// nothing to do until it arrives on the channel
}
} else if let Some(discovered_prev) =
StreamEvent::load_by_cid(Arc::clone(&event_access), desired_prev).await?
StreamEvent::load_by_cid(event_access, desired_prev).await?
{
match &discovered_prev {
// we found our prev in the database and it's deliverable, so we're deliverable now
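The invariant behind all of this bookkeeping is small: an event is deliverable once its prev is deliverable, and init events (no prev) always are. A toy in-memory model (u64 standing in for Cid) of the fixed point order_events works toward:

use std::collections::{HashMap, HashSet};

type ToyCid = u64; // stand-in for cid::Cid

// Repeatedly mark events whose prev is already deliverable until nothing
// changes; the result lists deliverable events with every prev before its child.
fn order(prev_of: &HashMap<ToyCid, Option<ToyCid>>) -> Vec<ToyCid> {
    let mut delivered = HashSet::new();
    let mut out = Vec::new();
    let mut changed = true;
    while changed {
        changed = false;
        for (&cid, &prev) in prev_of {
            let ready = prev.map_or(true, |p| delivered.contains(&p));
            if ready && delivered.insert(cid) {
                out.push(cid);
                changed = true;
            }
        }
    }
    out
}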
Expand Down Expand Up @@ -504,6 +497,24 @@ impl OrderingState {
}
}

/// Log an error and return true if it was fatal
fn log_error(err: Error) -> bool {
match err {
Error::Application { error } => {
warn!("Encountered application error: {:?}", error);
false
}
Error::Fatal { error } => {
error!("Encountered fatal error: {:?}", error);
true
}
Error::Transient { error } | Error::InvalidArgument { error } => {
warn!("Encountered error: {:?}", error);
false
}
}
}

/// Update the list of streams to process with new events.
/// Relies on `add_stream_event` to handle updating the internal state.
fn add_inserted_events(&mut self, events: Vec<DiscoveredEvent>) {
@@ -528,40 +539,115 @@

/// Process every stream we know about that has undelivered events that should be "unlocked" now. This could be adjusted to commit things in batches,
/// but for now it assumes it can process all the streams and events in one go. It should be idempotent, so if it fails, it can be retried.
async fn process_streams(&mut self, event_access: Arc<EventAccess>) -> Result<()> {
for (_stream_cid, stream_events) in self.pending_by_stream.iter_mut() {
if stream_events.should_process {
stream_events
.order_events(Arc::clone(&event_access))
.await?;
/// Takes ownership of self and returns it, rather than taking &mut access, to allow processing the streams in multiple tasks.
/// Returns an error if processing was fatal and should not be retried.
async fn process_streams(
mut self,
event_access: Arc<EventAccess>,
) -> std::result::Result<Self, ()> {
let mut to_process = HashMap::new();
self.pending_by_stream.retain(|k, s| {
if s.should_process {
let v = std::mem::take(s);
to_process.insert(*k, v);
false
} else {
true
}
});

// we need to collect everything we get back from the tasks so we can put it back on our state
// in case we fail to persist the updates and need to try again
let mut processed_streams = Vec::with_capacity(to_process.len());

let mut task_set =
Self::spawn_tasks_for_stream_batches(Arc::clone(&event_access), to_process);

while let Some(res) = task_set.join_next().await {
let streams = match res {
Ok(task_res) => match task_res {
Ok(streams) => streams,
Err(error) => {
warn!(?error, "Error encountered processing stream batch");
continue;
}
},
Err(e) => {
error!(error=?e, "Failed to join task spawned to process stream batch");
continue;
}
};
for (_, stream_events) in &streams {
self.deliverable
.extend(stream_events.new_deliverable.iter());
}
processed_streams.extend(streams);
}

match self.persist_ready_events(Arc::clone(&event_access)).await {
match self.persist_ready_events(&event_access).await {
Ok(_) => {}
Err(err) => {
// Clear the queue as we'll rediscover it on the next run, rather than try to double update everything.
// We will no-op the updates so it doesn't really hurt but it's unnecessary.
// The StreamEvents in our pending_by_stream map all have their state updated in memory so we can pick up where we left off.
self.deliverable.clear();
return Err(err);
if Self::log_error(err) {
// if it's fatal we will tell the task to exit
// if not, we return the state to try again
return Err(());
} else {
// Clear the queue as we'll rediscover it on the next run, rather than try to double update everything.
// We will no-op the updates so it doesn't really hurt but it's unnecessary.
// The StreamEvents in our pending_by_stream map all have their state updated in memory so we can pick up where we left off.
self.deliverable.clear();
return Ok(self);
}
}
}
// keep things that still have missing history but don't process them again until we get something new
self.pending_by_stream
.retain(|_, stream_events| !stream_events.processing_completed());
for (cid, mut stream) in processed_streams.into_iter() {
if !stream.processing_completed() {
self.pending_by_stream.insert(cid, stream);
}
}

debug!(remaining_streams=%self.pending_by_stream.len(), "Finished processing streams");
trace!(stream_state=?self, "Finished processing streams");

Ok(())
Ok(self)
}
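// Illustration only (not part of the change): the retain + std::mem::take
// combination used above, isolated. It drains the flagged entries from a map
// in one pass; V: Default lets take() leave a hollow value behind just before
// the entry is removed.
fn drain_flagged<K: Copy + Eq + std::hash::Hash, V: Default>(
    map: &mut HashMap<K, V>,
    flagged: impl Fn(&V) -> bool,
) -> HashMap<K, V> {
    let mut taken = HashMap::new();
    map.retain(|k, v| {
        if flagged(v) {
            taken.insert(*k, std::mem::take(v));
            false // remove the emptied entry
        } else {
            true
        }
    });
    taken
}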

/// Splits `to_process` into chunks and spawns at most `MAX_STREAM_PROCESSING_TASKS` tasks to review and order
/// the events for each chunk of streams.
fn spawn_tasks_for_stream_batches(
event_access: Arc<EventAccess>,
to_process: HashMap<Cid, StreamEvents>,
) -> JoinSet<Result<Vec<(Cid, StreamEvents)>>> {
let mut task_set = JoinSet::new();

let chunk_size =
(to_process.len() / MAX_STREAM_PROCESSING_TASKS).max(MIN_NUM_STREAMS_PER_BATCH);

let chunks = &to_process.into_iter().chunks(chunk_size);
// `chunks` uses a RefCell and isn't `Send`, so we need to allocate again and move the owned batches to avoid sync issues
let mut streams_to_send = Vec::with_capacity(MAX_STREAM_PROCESSING_TASKS);

for chunk in chunks {
let streams: Vec<(Cid, StreamEvents)> = chunk.collect();
streams_to_send.push(streams);
}

for mut streams in streams_to_send.into_iter() {
let event_access_cln = Arc::clone(&event_access);
task_set.spawn(async move {
for (_, stream_events) in streams.iter_mut() {
stream_events.order_events(&event_access_cln).await?;
}
Ok(streams)
});
}
task_set
}
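// Illustration only: the shape of draining a JoinSet whose tasks return
// Result<Vec<T>>, flattening the two error layers (join failure vs. batch
// error) that the loop in process_streams matches on.
async fn collect_batches<T: 'static>(mut set: JoinSet<Result<Vec<T>>>) -> Vec<T> {
    let mut all = Vec::new();
    while let Some(joined) = set.join_next().await {
        match joined {
            Ok(Ok(mut batch)) => all.append(&mut batch),
            Ok(Err(error)) => warn!(?error, "stream batch failed"),
            Err(error) => error!(?error, "stream batch task panicked or was cancelled"),
        }
    }
    all
}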

/// Process all undelivered events in the database. This is a blocking operation that could take a long time.
/// It is intended to be run at startup but could be used on an interval or after some errors to recover.
pub(crate) async fn process_all_undelivered_events(
async fn process_all_undelivered_events(
event_access: Arc<EventAccess>,
max_iterations: usize,
batch_size: u32,
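One subtlety in spawn_tasks_for_stream_batches deserves isolating: itertools' chunks() shares iterator state through a RefCell, so the chunks are not Send and can't move into spawned tasks directly. A minimal sketch of the collect-to-owned workaround under that assumption:

use itertools::Itertools;

// Collect each non-Send chunk into an owned Vec so the data can cross into
// tokio::spawn'd tasks.
fn owned_chunks<T>(items: Vec<T>, chunk_size: usize) -> Vec<Vec<T>> {
    let chunks = items.into_iter().chunks(chunk_size);
    chunks.into_iter().map(|chunk| chunk.collect()).collect()
}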
@@ -726,7 +812,7 @@

/// We should improve the error handling and likely add some batching if the number of ready events is very high.
/// We copy the events up front to avoid losing any events if the task is cancelled.
async fn persist_ready_events(&mut self, event_access: Arc<EventAccess>) -> Result<()> {
async fn persist_ready_events(&mut self, event_access: &EventAccess) -> Result<()> {
if !self.deliverable.is_empty() {
tracing::debug!(count=%self.deliverable.len(), "Marking events as ready to deliver");
let mut tx = event_access.begin_tx().await?;
