Skip to content

Commit

Permalink
fix: start up ordering task bug (#516)
Browse files Browse the repository at this point in the history
* fix: using id instead of prev for start up ordering

added test to expose problem (fails before, passes now)

* fix: remove unnecessary log statement

was made info during deubgging, maybe merits trace but removing for now
  • Loading branch information
dav1do authored Sep 9, 2024
1 parent e58b206 commit 243307d
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 10 deletions.
59 changes: 50 additions & 9 deletions event-svc/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ impl OrderingState {
.id()
.expect("id must exist for non-init events");
let prev = parsed_event
.id()
.prev()
.expect("prev must exist for non-init events");

self.add_stream_event(
Expand Down Expand Up @@ -624,7 +624,7 @@ mod test {
assert_eq!(0, processed);
}

async fn insert_10_with_9_undelivered(pool: &SqlitePool) {
async fn insert_10_with_9_undelivered(pool: &SqlitePool) -> Vec<EventInsertable> {
let mut insertable = get_n_insertable_events(10).await;
let mut init = insertable.remove(0);
init.set_deliverable(true);
Expand All @@ -641,6 +641,7 @@ mod test {
.unwrap();
assert_eq!(1, new.inserted.len());
assert_eq!(1, new.inserted.iter().filter(|e| e.deliverable).count());
insertable
}

#[test(tokio::test)]
Expand Down Expand Up @@ -731,11 +732,36 @@ mod test {
async fn test_process_all_undelivered_one_batch() {
let pool = SqlitePool::connect_in_memory().await.unwrap();
// create 5 streams with 9 undelivered events each
insert_10_with_9_undelivered(&pool).await;
insert_10_with_9_undelivered(&pool).await;
insert_10_with_9_undelivered(&pool).await;
insert_10_with_9_undelivered(&pool).await;
insert_10_with_9_undelivered(&pool).await;
let expected_a = Vec::from_iter(
insert_10_with_9_undelivered(&pool)
.await
.into_iter()
.map(|e| *e.cid()),
);
let expected_b = Vec::from_iter(
insert_10_with_9_undelivered(&pool)
.await
.into_iter()
.map(|e| *e.cid()),
);
let expected_c = Vec::from_iter(
insert_10_with_9_undelivered(&pool)
.await
.into_iter()
.map(|e| *e.cid()),
);
let expected_d = Vec::from_iter(
insert_10_with_9_undelivered(&pool)
.await
.into_iter()
.map(|e| *e.cid()),
);
let expected_e = Vec::from_iter(
insert_10_with_9_undelivered(&pool)
.await
.into_iter()
.map(|e| *e.cid()),
);

let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
.await
Expand All @@ -745,9 +771,24 @@ mod test {
.await
.unwrap();

let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
let (_hw, cids) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
.await
.unwrap();
assert_eq!(50, event.len());
assert_eq!(50, cids.len());
assert_eq!(expected_a, build_expected(&cids, &expected_a));
assert_eq!(expected_b, build_expected(&cids, &expected_b));
assert_eq!(expected_c, build_expected(&cids, &expected_c));
assert_eq!(expected_d, build_expected(&cids, &expected_d));
assert_eq!(expected_e, build_expected(&cids, &expected_e));
}

fn build_expected(new_cids: &[Cid], expected: &[Cid]) -> Vec<Cid> {
let mut output = Vec::with_capacity(new_cids.len());
for e in new_cids {
if expected.iter().any(|i| i == e) {
output.push(*e);
}
}
output
}
}
1 change: 0 additions & 1 deletion recon/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,6 @@ where
/// - before we sign off on a conversation as either the initiator or responder
/// - when our in memory list gets too large
async fn persist_all(&mut self) -> Result<()> {
tracing::info!("calling persist all: {}", self.event_q.len());
if self.event_q.is_empty() {
return Ok(());
}
Expand Down

0 comments on commit 243307d

Please sign in to comment.