Skip to content

Commit

Permalink
feat: use the stream and event_metadata tables when discovering events
Browse files Browse the repository at this point in the history
- populated when discovering events
- rewrote IOD using it (been running `recon_lots_of_streams` in a loop and it keeps passing)
- created crate specific InsertResult structs (api, recon, service, store). If any api batch write fails because of something in the service (i.e. no prev), it won't fail other writes in the batch
  • Loading branch information
dav1do committed Jun 14, 2024
1 parent 2198d3a commit 100185b
Show file tree
Hide file tree
Showing 27 changed files with 1,258 additions and 1,050 deletions.
2 changes: 1 addition & 1 deletion api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod server;

pub use resume_token::ResumeToken;

pub use server::{EventStore, InterestStore, Server};
pub use server::{EventInsertResult, EventStore, InterestStore, Server};

#[cfg(test)]
mod tests;
56 changes: 45 additions & 11 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

mod event;

use std::collections::HashMap;
use std::time::Duration;
use std::{future::Future, ops::Range};
use std::{marker::PhantomData, ops::RangeBounds};
Expand Down Expand Up @@ -162,11 +163,28 @@ impl<S: InterestStore> InterestStore for Arc<S> {
}
}

#[derive(Debug, Clone)]
pub struct EventInsertResult {
id: EventId,
// if set, the reason this event couldn't be inserted
failed: Option<String>,
}

impl EventInsertResult {
pub fn new(id: EventId, failed: Option<String>) -> Self {
Self { id, failed }
}

pub fn success(&self) -> bool {
self.failed.is_none()
}
}

/// Trait for accessing persistent storage of Events
#[async_trait]
pub trait EventStore: Send + Sync {
/// Returns (new_key, new_value) where true if was newly inserted, false if it already existed.
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<bool>>;
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<EventInsertResult>>;
async fn range_with_values(
&self,
range: Range<EventId>,
Expand Down Expand Up @@ -199,7 +217,7 @@ pub trait EventStore: Send + Sync {

#[async_trait::async_trait]
impl<S: EventStore> EventStore for Arc<S> {
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<bool>> {
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<EventInsertResult>> {
self.as_ref().insert_many(items).await
}

Expand Down Expand Up @@ -241,7 +259,7 @@ impl<S: EventStore> EventStore for Arc<S> {
struct EventInsert {
id: EventId,
data: Vec<u8>,
tx: tokio::sync::oneshot::Sender<Result<bool>>,
tx: tokio::sync::oneshot::Sender<Result<EventInsertResult>>,
}

struct InsertTask {
Expand Down Expand Up @@ -325,25 +343,35 @@ where
if events.is_empty() {
return;
}
let mut oneshots = Vec::with_capacity(events.len());
let mut oneshots = HashMap::with_capacity(events.len());
let mut items = Vec::with_capacity(events.len());
events.drain(..).for_each(|req: EventInsert| {
oneshots.push(req.tx);
oneshots.insert(req.id.to_bytes(), req.tx);
items.push((req.id, req.data));
});
tracing::trace!("calling insert many with {} items.", items.len());
match event_store.insert_many(&items).await {
Ok(results) => {
tracing::debug!("insert many returned {} results.", results.len());
for (tx, result) in oneshots.into_iter().zip(results.into_iter()) {
if let Err(e) = tx.send(Ok(result)) {
tracing::warn!("failed to send success response to api listener: {:?}", e);
for result in results {
if let Some(tx) = oneshots.remove(&result.id.to_bytes()) {
if let Err(e) = tx.send(Ok(result)) {
tracing::warn!(
"failed to send success response to api listener: {:?}",
e
);
}
} else {
tracing::warn!(
"lost channel to respond to API listener for event ID: {:?}",
result.id
);
}
}
}
Err(e) => {
tracing::warn!("failed to insert events: {e}");
for tx in oneshots.into_iter() {
for tx in oneshots.into_values() {
if let Err(e) = tx.send(Err(anyhow::anyhow!("Failed to insert event: {e}"))) {
tracing::warn!("failed to send failed response to api listener: {:?}", e);
}
Expand Down Expand Up @@ -495,15 +523,21 @@ where
.await?
.map_err(|_| ErrorResponse::new("Database service not available".to_owned()))?;

let _new = tokio::time::timeout(INSERT_REQUEST_TIMEOUT, rx)
let new = tokio::time::timeout(INSERT_REQUEST_TIMEOUT, rx)
.await
.map_err(|_| {
ErrorResponse::new("Timeout waiting for database service response".to_owned())
})?
.map_err(|_| ErrorResponse::new("No response. Database service crashed".to_owned()))?
.map_err(|e| ErrorResponse::new(format!("Failed to insert event: {e}")))?;

Ok(EventsPostResponse::Success)
if let Some(failed) = new.failed {
Ok(EventsPostResponse::BadRequest(BadRequestResponse::new(
failed,
)))
} else {
Ok(EventsPostResponse::Success)
}
}

pub async fn post_interests(
Expand Down
55 changes: 53 additions & 2 deletions api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{ops::Range, str::FromStr, sync::Arc};
use crate::server::decode_multibase_data;
use crate::server::BuildResponse;
use crate::server::Server;
use crate::EventInsertResult;
use crate::{EventStore, InterestStore};

use anyhow::Result;
Expand Down Expand Up @@ -121,7 +122,7 @@ mock! {
pub EventStoreTest {}
#[async_trait]
impl EventStore for EventStoreTest {
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<bool>>;
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<EventInsertResult>>;
async fn range_with_values(
&self,
range: Range<EventId>,
Expand Down Expand Up @@ -198,7 +199,12 @@ async fn create_event() {
.expect_insert_many()
.with(predicate::eq(args))
.times(1)
.returning(|_| Ok(vec![true]));
.returning(|input| {
Ok(input
.iter()
.map(|(id, _)| EventInsertResult::new(id.clone(), None))
.collect())
});
let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store));
let resp = server
.events_post(
Expand All @@ -211,6 +217,51 @@ async fn create_event() {
.unwrap();
assert!(matches!(resp, EventsPostResponse::Success));
}

#[tokio::test]
async fn create_event_fails() {
let peer_id = PeerId::random();
let network = Network::Mainnet;
let expected_event_id = EventId::try_from(hex::decode(DATA_EVENT_ID).unwrap()).unwrap();

// Remove whitespace from event CAR file
let event_data = DATA_EVENT_CAR
.chars()
.filter(|c| !c.is_whitespace())
.collect::<String>();
let mock_interest = MockAccessInterestStoreTest::new();
let mut mock_event_store = MockEventStoreTest::new();
mock_get_init_event(&mut mock_event_store);
let args = vec![(
expected_event_id.clone(),
decode_multibase_data(&event_data).unwrap(),
)];

mock_event_store
.expect_insert_many()
.with(predicate::eq(args))
.times(1)
.returning(|input| {
Ok(input
.iter()
.map(|(id, _)| {
EventInsertResult::new(id.clone(), Some("Event is missing prev".to_string()))
})
.collect())
});
let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store));
let resp = server
.events_post(
models::EventData {
data: event_data.to_string(),
},
&Context,
)
.await
.unwrap();
assert!(matches!(resp, EventsPostResponse::BadRequest(_)));
}

#[tokio::test]
#[traced_test]
async fn register_interest_sort_value() {
Expand Down
3 changes: 1 addition & 2 deletions service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ ceramic-store.workspace = true
cid.workspace = true
hex.workspace = true
ipld-core.workspace = true
serde_ipld_dagcbor.workspace = true
iroh-bitswap.workspace = true
multihash-codetable.workspace = true
recon.workspace = true
Expand All @@ -32,8 +31,8 @@ ipld-core.workspace = true
multibase.workspace = true
paste = "1.0"
rand.workspace = true
serde.workspace = true
serde_ipld_dagcbor.workspace = true
serde.workspace = true
test-log.workspace = true
tmpdir.workspace = true
tokio.workspace = true
Expand Down
1 change: 1 addition & 0 deletions service/src/event/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod order_events;
mod ordering_task;
mod service;
mod store;
Expand Down
102 changes: 102 additions & 0 deletions service/src/event/order_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use std::collections::HashSet;

use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool};
use cid::Cid;

use crate::Result;

pub(crate) struct OrderEvents {
pub(crate) deliverable: Vec<EventInsertable>,
pub(crate) missing_history: Vec<EventInsertable>,
}

impl OrderEvents {
/// Groups the events into lists those with a delivered prev and those without. This can be used to return an error if the event is required to have history.
/// The events will be marked as deliverable so that they can be passed directly to the store to be persisted.
pub async fn try_new(
pool: &SqlitePool,
mut candidate_events: Vec<EventInsertable>,
) -> Result<Self> {
// move all the init events to the front so we make sure to add them first and get the deliverable order correct
let new_cids: HashSet<Cid> = HashSet::from_iter(candidate_events.iter().map(|e| e.cid()));
let mut deliverable = Vec::with_capacity(candidate_events.len());
candidate_events.retain(|e| {
if e.deliverable() {
deliverable.push(e.clone());
false
} else {
true
}
});
if candidate_events.is_empty() {
return Ok(OrderEvents {
deliverable,
missing_history: Vec::new(),
});
}

let mut prevs_in_memory = Vec::with_capacity(candidate_events.len());
let mut missing_history = Vec::with_capacity(candidate_events.len());

while let Some(mut event) = candidate_events.pop() {
match &event.prev() {
None => {
unreachable!("Init events should have been filtered out since they're always deliverable");
}
Some(prev) => {
if new_cids.contains(prev) {
prevs_in_memory.push(event.clone());
continue;
} else {
let (_exists, prev_deliverable) =
CeramicOneEvent::deliverable_by_cid(pool, prev).await?;
if prev_deliverable {
event.set_deliverable(true);
deliverable.push(event);
} else {
// technically, we may have the "rosetta stone" event in memory that could unlock this chain, if we loaded everything and recursed,
// but the immediate prev is not in this set and has not been delivered to the client yet, so they shouldn't have known how to
// construct this event so we'll consider this missing history. This can be used to return an error if the event is required to have history.
missing_history.push(event);
}
}
}
}
}

// We add the events to the deliverable list until nothing changes.
// It should be a small set and it will shrink each loop, so continually looping is acceptable.
loop {
let mut made_changes = false;
while let Some(mut event) = prevs_in_memory.pop() {
match &event.prev() {
None => {
unreachable!(
"Init events should have been filtered out of the in memory set"
);
}
Some(prev) => {
// a hashset would be better loopkup but we're not going to have that many events so hashing
// for a handful of lookups and then convert back to a vec probably isn't worth it.
if deliverable.iter().any(|e| e.cid() == *prev) {
event.set_deliverable(true);
deliverable.push(event);
made_changes = true;
} else {
prevs_in_memory.push(event);
}
}
}
}
if !made_changes {
missing_history.extend(prevs_in_memory);
break;
}
}

Ok(OrderEvents {
deliverable,
missing_history,
})
}
}
Loading

0 comments on commit 100185b

Please sign in to comment.