
feat: capture stream metadata in database and use for IOD #375

Draft · wants to merge 9 commits into base: feat/aes-130-data-migrations
6 changes: 6 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -151,7 +151,7 @@ serde_qs = "0.10.1"
serde_with = "2.1"
sha2 = { version = "0.10", default-features = false }
smallvec = "1.10"
sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio"] }
sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "chrono"] }
ssh-key = { version = "0.5.1", default-features = false }
ssi = { version = "0.7", features = ["ed25519"] }
swagger = { version = "6.1", features = [
2 changes: 1 addition & 1 deletion api/src/lib.rs
@@ -3,7 +3,7 @@ mod server;

pub use resume_token::ResumeToken;

pub use server::{EventStore, InterestStore, Server};
pub use server::{EventInsertResult, EventStore, InterestStore, Server};

#[cfg(test)]
mod tests;
56 changes: 45 additions & 11 deletions api/src/server.rs
@@ -6,6 +6,7 @@

mod event;

use std::collections::HashMap;
use std::time::Duration;
use std::{future::Future, ops::Range};
use std::{marker::PhantomData, ops::RangeBounds};
@@ -162,11 +163,28 @@ impl<S: InterestStore> InterestStore for Arc<S> {
}
}

#[derive(Debug, Clone)]
pub struct EventInsertResult {
id: EventId,
// if set, the reason this event couldn't be inserted
failed: Option<String>,
}

impl EventInsertResult {
pub fn new(id: EventId, failed: Option<String>) -> Self {
Self { id, failed }
}

pub fn success(&self) -> bool {
self.failed.is_none()
}
}
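For illustration, a caller that batches inserts can split a result set on success(); a minimal sketch (the results vector is assumed, not part of this diff):

// Sketch: partition a batch of insert results into accepted and rejected
// events (`results: Vec<EventInsertResult>` is assumed).
let (accepted, rejected): (Vec<_>, Vec<_>) =
    results.into_iter().partition(EventInsertResult::success);
tracing::debug!("{} events accepted, {} rejected", accepted.len(), rejected.len());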

/// Trait for accessing persistent storage of Events
#[async_trait]
pub trait EventStore: Send + Sync {
/// Insert many events; returns an EventInsertResult per event indicating whether it was inserted or why it was rejected.
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<bool>>;
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<EventInsertResult>>;
async fn range_with_values(
&self,
range: Range<EventId>,
@@ -199,7 +217,7 @@ pub trait EventStore: Send + Sync {

#[async_trait::async_trait]
impl<S: EventStore> EventStore for Arc<S> {
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<bool>> {
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<EventInsertResult>> {
self.as_ref().insert_many(items).await
}

@@ -241,7 +259,7 @@ impl<S: EventStore> EventStore for Arc<S> {
struct EventInsert {
id: EventId,
data: Vec<u8>,
tx: tokio::sync::oneshot::Sender<Result<bool>>,
tx: tokio::sync::oneshot::Sender<Result<EventInsertResult>>,
}

struct InsertTask {
@@ -325,25 +343,35 @@ where
if events.is_empty() {
return;
}
let mut oneshots = Vec::with_capacity(events.len());
let mut oneshots = HashMap::with_capacity(events.len());
let mut items = Vec::with_capacity(events.len());
events.drain(..).for_each(|req: EventInsert| {
oneshots.push(req.tx);
oneshots.insert(req.id.to_bytes(), req.tx);
items.push((req.id, req.data));
});
tracing::trace!("calling insert many with {} items.", items.len());
match event_store.insert_many(&items).await {
Ok(results) => {
tracing::debug!("insert many returned {} results.", results.len());
for (tx, result) in oneshots.into_iter().zip(results.into_iter()) {
if let Err(e) = tx.send(Ok(result)) {
tracing::warn!("failed to send success response to api listener: {:?}", e);
for result in results {
if let Some(tx) = oneshots.remove(&result.id.to_bytes()) {
if let Err(e) = tx.send(Ok(result)) {
tracing::warn!(
"failed to send success response to api listener: {:?}",
e
);
}
} else {
tracing::warn!(
"lost channel to respond to API listener for event ID: {:?}",
result.id
);
}
}
}
Err(e) => {
tracing::warn!("failed to insert events: {e}");
for tx in oneshots.into_iter() {
for tx in oneshots.into_values() {
if let Err(e) = tx.send(Err(anyhow::anyhow!("Failed to insert event: {e}"))) {
tracing::warn!("failed to send failed response to api listener: {:?}", e);
}
@@ -495,15 +523,21 @@ where
.await?
.map_err(|_| ErrorResponse::new("Database service not available".to_owned()))?;

let _new = tokio::time::timeout(INSERT_REQUEST_TIMEOUT, rx)
let new = tokio::time::timeout(INSERT_REQUEST_TIMEOUT, rx)
.await
.map_err(|_| {
ErrorResponse::new("Timeout waiting for database service response".to_owned())
})?
.map_err(|_| ErrorResponse::new("No response. Database service crashed".to_owned()))?
.map_err(|e| ErrorResponse::new(format!("Failed to insert event: {e}")))?;

Ok(EventsPostResponse::Success)
if let Some(failed) = new.failed {
Ok(EventsPostResponse::BadRequest(BadRequestResponse::new(
failed,
)))
} else {
Ok(EventsPostResponse::Success)
}
}

pub async fn post_interests(
55 changes: 53 additions & 2 deletions api/src/tests.rs
@@ -5,6 +5,7 @@ use std::{ops::Range, str::FromStr, sync::Arc};
use crate::server::decode_multibase_data;
use crate::server::BuildResponse;
use crate::server::Server;
use crate::EventInsertResult;
use crate::{EventStore, InterestStore};

use anyhow::Result;
@@ -121,7 +122,7 @@ mock! {
pub EventStoreTest {}
#[async_trait]
impl EventStore for EventStoreTest {
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<bool>>;
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<EventInsertResult>>;
async fn range_with_values(
&self,
range: Range<EventId>,
@@ -198,7 +199,12 @@ async fn create_event() {
.expect_insert_many()
.with(predicate::eq(args))
.times(1)
.returning(|_| Ok(vec![true]));
.returning(|input| {
Ok(input
.iter()
.map(|(id, _)| EventInsertResult::new(id.clone(), None))
.collect())
});
let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store));
let resp = server
.events_post(
@@ -211,6 +217,51 @@
.unwrap();
assert!(matches!(resp, EventsPostResponse::Success));
}

#[tokio::test]
async fn create_event_fails() {
let peer_id = PeerId::random();
let network = Network::Mainnet;
let expected_event_id = EventId::try_from(hex::decode(DATA_EVENT_ID).unwrap()).unwrap();

// Remove whitespace from event CAR file
let event_data = DATA_EVENT_CAR
.chars()
.filter(|c| !c.is_whitespace())
.collect::<String>();
let mock_interest = MockAccessInterestStoreTest::new();
let mut mock_event_store = MockEventStoreTest::new();
mock_get_init_event(&mut mock_event_store);
let args = vec![(
expected_event_id.clone(),
decode_multibase_data(&event_data).unwrap(),
)];

mock_event_store
.expect_insert_many()
.with(predicate::eq(args))
.times(1)
.returning(|input| {
Ok(input
.iter()
.map(|(id, _)| {
EventInsertResult::new(id.clone(), Some("Event is missing prev".to_string()))
})
.collect())
});
let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store));
let resp = server
.events_post(
models::EventData {
data: event_data.to_string(),
},
&Context,
)
.await
.unwrap();
assert!(matches!(resp, EventsPostResponse::BadRequest(_)));
}

#[tokio::test]
#[traced_test]
async fn register_interest_sort_value() {
2 changes: 1 addition & 1 deletion event/src/bytes.rs
@@ -6,7 +6,7 @@ use serde::{
};

/// Sequence of byte values.
#[derive(Clone, PartialEq, Default, Debug)]
#[derive(Clone, PartialEq, Eq, Default, Debug)]
pub struct Bytes(Vec<u8>);

impl Bytes {
12 changes: 11 additions & 1 deletion event/src/unvalidated/payload/init.rs
@@ -26,7 +26,7 @@ impl<D> Payload<D> {
}

/// Headers for an init event
#[derive(Default, Serialize, Deserialize)]
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Header {
#[serde(default, skip_serializing_if = "Vec::is_empty")]
@@ -81,4 +81,14 @@ impl Header {
pub fn should_index(&self) -> bool {
self.should_index.unwrap_or(true)
}

/// The unique value for the stream
pub fn unique(&self) -> Option<&[u8]> {
self.unique.as_ref().map(Bytes::as_slice)
}

/// The context value for the stream
pub fn context(&self) -> Option<&[u8]> {
self.context.as_ref().map(Bytes::as_slice)
}
}
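For illustration, the new accessors expose the optional header bytes without cloning; a small sketch (the header value is assumed, not part of this diff):

// Sketch: inspect optional init-header fields via the new accessors
// (`header: &Header` is assumed).
if let Some(unique) = header.unique() {
    tracing::debug!("unique: {}", hex::encode(unique));
}
if let Some(context) = header.context() {
    tracing::debug!("context: {}", hex::encode(context));
}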
2 changes: 2 additions & 0 deletions migrations/sqlite/20240529202212_stream.down.sql
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS "ceramic_one_stream";
8 changes: 8 additions & 0 deletions migrations/sqlite/20240529202212_stream.up.sql
@@ -0,0 +1,8 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS "ceramic_one_stream" (
"cid" BLOB NOT NULL, -- init event cid
"sep" TEXT NOT NULL,
"sep_value" BLOB NOT NULL,
-- we ignore the composeDB/indexing related fields: should_index, unique, context
PRIMARY KEY(cid)
);

Review thread on the "sep" column:

Collaborator: What about controller? Seems like we'll definitely want that for event validation.

Contributor Author: Yeah, I wanted to discuss controller and whether we want to fully normalize it out. It's easy to capture for just the init event here, but we may want to include it for every event as well. I wasn't sure if we wanted many-to-many or just a single one. Having it normalized for every event makes it possible to do "flat recon" things, but it would still be a decent amount of work to implement.

Collaborator: I think for now we should assume that there is only ever a single controller, and that the controller for a stream never changes. Both of those are true today, and if either were to ever change it would require work across the entire stack, so I think requiring a database migration if/when that were to happen would be okay. For now let's keep it simple.
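To make the schema concrete, a sketch of how the write path might populate this table with sqlx (column names come from this migration; the function, pool, and ON CONFLICT policy are assumptions, not part of this diff):

use sqlx::SqlitePool;

// Sketch: record a stream the first time we see its init event. Streams can
// be discovered more than once, so duplicate inits are ignored.
async fn insert_stream(
    pool: &SqlitePool,
    cid: &[u8],
    sep: &str,
    sep_value: &[u8],
) -> Result<(), sqlx::Error> {
    sqlx::query(
        "INSERT INTO ceramic_one_stream (cid, sep, sep_value)
         VALUES (?, ?, ?)
         ON CONFLICT (cid) DO NOTHING",
    )
    .bind(cid)
    .bind(sep)
    .bind(sep_value)
    .execute(pool)
    .await?;
    Ok(())
}

If the single controller column discussed in the thread above lands, it would be one more bound parameter here.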
2 changes: 2 additions & 0 deletions migrations/sqlite/20240530125008_event_metadata.down.sql
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS "ceramic_one_event_metadata";
11 changes: 11 additions & 0 deletions migrations/sqlite/20240530125008_event_metadata.up.sql
@@ -0,0 +1,11 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS "ceramic_one_event_metadata" (
"cid" BLOB NOT NULL, -- event cid
"event_type" INTEGER NOT NULL, -- enum EventType: Init, Data, Time
"stream_cid" BLOB NOT NULL, -- id field in header. can't have FK because stream may not exist until we discover it but should reference ceramic_one_stream(cid)
"prev" BLOB, -- prev event cid. can't have a foreign key because node may not know about prev event but it should reference ceramic_one_event(cid)
PRIMARY KEY(cid),
FOREIGN KEY(cid) REFERENCES ceramic_one_event(cid)
);

CREATE INDEX IF NOT EXISTS "idx_ceramic_one_event_metadata_stream_cid" ON "ceramic_one_event_metadata" ("stream_cid");
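The stream_cid index exists to serve the in-order-delivery (IOD) lookups named in the PR title; one plausible query shape, as a sketch only (the delivery bookkeeping itself is not part of this diff):

use sqlx::{Row, SqlitePool};

// Sketch: load every known event of a stream with its prev pointer so the
// caller can order events before delivery (`stream_cid` is the init CID).
async fn stream_events(
    pool: &SqlitePool,
    stream_cid: &[u8],
) -> Result<Vec<(Vec<u8>, Option<Vec<u8>>)>, sqlx::Error> {
    let rows =
        sqlx::query("SELECT cid, prev FROM ceramic_one_event_metadata WHERE stream_cid = ?")
            .bind(stream_cid)
            .fetch_all(pool)
            .await?;
    Ok(rows.into_iter().map(|r| (r.get("cid"), r.get("prev"))).collect())
}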
2 changes: 2 additions & 0 deletions migrations/sqlite/20240611165721_data_migration.down.sql
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS "ceramic_one_data_migration";
9 changes: 9 additions & 0 deletions migrations/sqlite/20240611165721_data_migration.up.sql
@@ -0,0 +1,9 @@
-- Add up migration script here

CREATE TABLE IF NOT EXISTS ceramic_one_data_migration (
"name" TEXT PRIMARY KEY NOT NULL,
"version" TEXT NOT NULL,
started_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_attempted_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP
);
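The TIMESTAMP columns are why this PR enables sqlx's chrono feature in Cargo.toml: with it they can be read back as chrono::NaiveDateTime. A sketch of recording a migration attempt (names are illustrative; the real migration runner is not in this diff):

use sqlx::SqlitePool;

// Sketch: record a data-migration attempt, leaning on the DEFAULT
// CURRENT_TIMESTAMP columns; re-running the same migration only bumps
// last_attempted_at.
async fn record_attempt(pool: &SqlitePool, name: &str, version: &str) -> Result<(), sqlx::Error> {
    sqlx::query(
        "INSERT INTO ceramic_one_data_migration (name, version)
         VALUES (?, ?)
         ON CONFLICT (name) DO UPDATE SET last_attempted_at = CURRENT_TIMESTAMP",
    )
    .bind(name)
    .bind(version)
    .execute(pool)
    .await?;
    Ok(())
}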
2 changes: 2 additions & 0 deletions migrations/sqlite/20240611183747_version.down.sql
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS "ceramic_one_version";
6 changes: 6 additions & 0 deletions migrations/sqlite/20240611183747_version.up.sql
@@ -0,0 +1,6 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS "ceramic_one_version" (
id INTEGER PRIMARY KEY AUTOINCREMENT,
"version" TEXT NOT NULL UNIQUE,
"installed_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
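A matching sketch for the version table: record the running binary's version at startup, where the UNIQUE constraint makes repeat startups of the same build a no-op (the startup hook is assumed, not part of this diff):

use sqlx::SqlitePool;

// Sketch: record the running version once per upgrade.
async fn record_version(pool: &SqlitePool, version: &str) -> Result<(), sqlx::Error> {
    sqlx::query("INSERT INTO ceramic_one_version (version) VALUES (?) ON CONFLICT (version) DO NOTHING")
        .bind(version)
        .execute(pool)
        .await?;
    Ok(())
}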