Skip to content

Commit

Permalink
fix: use a reader and a writer task to process undelivered events at …
Browse files Browse the repository at this point in the history
…startup

This helps because all the sorting/reading of event history can happen in one task while the other task finds new events that need to be added. It now mirrors the insert/ordering task flow.
  • Loading branch information
dav1do committed Nov 27, 2024
1 parent 624f1cc commit 5d80f0a
Showing 1 changed file with 82 additions and 41 deletions.
123 changes: 82 additions & 41 deletions event-svc/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use anyhow::anyhow;
use ceramic_event::unvalidated;
use cid::Cid;
use ipld_core::ipld::Ipld;
use tokio::sync::mpsc::Sender;
use tracing::{debug, error, info, trace, warn};

use crate::store::EventAccess;
Expand Down Expand Up @@ -36,8 +37,34 @@ impl OrderingTask {
max_iterations: usize,
batch_size: u32,
) -> Result<usize> {
OrderingState::process_all_undelivered_events(event_access, max_iterations, batch_size)
.await
let (tx, rx_inserted) = tokio::sync::mpsc::channel::<DiscoveredEvent>(10_000);

let event_access_cln = event_access.clone();
let writer_handle =
tokio::task::spawn(async move { Self::run_loop(event_access_cln, rx_inserted).await });
let cnt = match OrderingState::process_all_undelivered_events(
event_access,
max_iterations,
batch_size,
tx,
)
.await
{
Ok(cnt) => cnt,
Err(e) => {
error!("encountered error processing undelivered events: {}", e);
writer_handle.abort();
return Err(Error::new_fatal(anyhow!(
"failed to process undelivered events: {}",
e
)));
}
};
info!("Waiting for {cnt} undelivered events to finish ordering...");
if let Err(e) = writer_handle.await {
error!("event ordering task failed to complete: {}", e);
}
Ok(cnt)
}

/// Spawn a task to run the ordering task background process in a loop
Expand Down Expand Up @@ -76,6 +103,13 @@ impl OrderingTask {
}
}
}
if !rx_inserted.is_empty() {
let mut remaining = Vec::with_capacity(rx_inserted.len());
rx_inserted
.recv_many(&mut remaining, rx_inserted.len())
.await;
state.add_inserted_events(remaining);
}

let _ = state
.process_streams(event_access)
Expand Down Expand Up @@ -495,6 +529,7 @@ impl OrderingState {
event_access: Arc<EventAccess>,
max_iterations: usize,
batch_size: u32,
tx: Sender<DiscoveredEvent>,
) -> Result<usize> {
info!("Attempting to process all undelivered events. This could take some time.");
let mut state = Self::new();
Expand All @@ -516,7 +551,7 @@ impl OrderingState {
// In this case, we won't discover them until we start running recon with a peer, so maybe we should drop them
// or otherwise mark them ignored somehow. When this function ends, we do drop everything so for now it's probably okay.
let number_processed = state
.process_undelivered_events_batch(Arc::clone(&event_access), undelivered)
.process_undelivered_events_batch(undelivered, &tx)
.await?;
event_cnt += number_processed;
if event_cnt % LOG_EVERY_N_ENTRIES < number_processed {
Expand All @@ -536,28 +571,46 @@ impl OrderingState {

async fn process_undelivered_events_batch(
&mut self,
event_access: Arc<EventAccess>,
event_data: Vec<(Cid, unvalidated::Event<Ipld>)>,
tx: &Sender<DiscoveredEvent>,
) -> Result<usize> {
trace!(cnt=%event_data.len(), "Processing undelivered events batch");
let mut event_cnt = 0;
let mut discovered_inits = Vec::new();
for (cid, parsed_event) in event_data {
event_cnt += 1;
if parsed_event.is_init() {
discovered_inits.push(cid);
if let Err(e) = tx
.send(DiscoveredEvent {
cid,
prev: None,
id: cid,
known_deliverable: true,
})
.await
{
todo!();
};
continue;
}

event_cnt += 1;
let stream_cid = parsed_event.stream_cid();
let prev = parsed_event
.prev()
.expect("prev must exist for non-init events");

self.add_stream_event(
*stream_cid,
StreamEvent::Undelivered(StreamEventMetadata::new(cid, *prev)),
);
if let Err(e) = tx
.send(DiscoveredEvent {
cid,
prev: Some(*prev),
id: *stream_cid,
known_deliverable: false,
})
.await
{
todo!();
}
}
// while undelivered init events should be unreachable, we can fix the state if it happens so we won't panic in release mode
// and simply correct things in the database. We could make this fatal in the future, but for now it's just a warning to
Expand All @@ -569,13 +622,7 @@ impl OrderingState {
"Found init events in undelivered batch. This should never happen.",
);
debug_assert!(false);
let mut tx = event_access.begin_tx().await?;
for cid in discovered_inits {
event_access.mark_ready_to_deliver(&mut tx, &cid).await?;
}
tx.commit().await?;
}
self.process_streams(event_access).await?;

Ok(event_cnt)
}
Expand Down Expand Up @@ -630,7 +677,7 @@ mod test {
async fn test_undelivered_batch_empty() {
let pool = SqlitePool::connect_in_memory().await.unwrap();
let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
let processed = OrderingState::process_all_undelivered_events(event_access, 10, 100)
let processed = OrderingTask::process_all_undelivered_events(event_access, 1, 5)
.await
.unwrap();
assert_eq!(0, processed);
Expand Down Expand Up @@ -673,28 +720,21 @@ mod test {
let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
assert_eq!(1, events.len());

let processed =
OrderingState::process_all_undelivered_events(Arc::clone(&event_access), 1, 5)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events(event_access.clone(), 1, 5)
.await
.unwrap();
assert_eq!(5, processed);
let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
assert_eq!(6, events.len());
// the last 5 are processed and we have 10 delivered
let processed =
OrderingState::process_all_undelivered_events(Arc::clone(&event_access), 1, 5)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events(event_access.clone(), 1, 5)
.await
.unwrap();
assert_eq!(4, processed);
let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
assert_eq!(10, events.len());

// nothing left
let processed =
OrderingState::process_all_undelivered_events(Arc::clone(&event_access), 1, 100)
.await
.unwrap();

let processed = OrderingTask::process_all_undelivered_events(event_access.clone(), 1, 5)
.await
.unwrap();
assert_eq!(0, processed);
}

Expand All @@ -711,12 +751,13 @@ mod test {

let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(5, event.len());
let _res = OrderingState::process_all_undelivered_events(Arc::clone(&event_access), 4, 10)
let processed = OrderingTask::process_all_undelivered_events(event_access.clone(), 4, 10)
.await
.unwrap();

let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(45, event.len());
assert_eq!(40, processed);
}

#[test(tokio::test)]
Expand All @@ -732,16 +773,15 @@ mod test {

let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(5, event.len());
let _res = OrderingState::process_all_undelivered_events(
Arc::clone(&event_access),
100_000_000,
5,
)
.await
.unwrap();

let processed =
OrderingTask::process_all_undelivered_events(event_access.clone(), 100_000_000, 5)
.await
.unwrap();

let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(50, event.len());
assert_eq!(45, processed);
}

#[test(tokio::test)]
Expand Down Expand Up @@ -782,12 +822,13 @@ mod test {

let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(5, event.len());
let _res = OrderingState::process_all_undelivered_events(Arc::clone(&event_access), 1, 100)
let processed = OrderingTask::process_all_undelivered_events(event_access.clone(), 1, 100)
.await
.unwrap();

let (_hw, cids) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(50, cids.len());
assert_eq!(45, processed);
assert_eq!(expected_a, build_expected(&cids, &expected_a));
assert_eq!(expected_b, build_expected(&cids, &expected_b));
assert_eq!(expected_c, build_expected(&cids, &expected_c));
Expand Down

0 comments on commit 5d80f0a

Please sign in to comment.