feat: capture stream metadata in database and use for IOD #375
base: feat/aes-130-data-migrations
Conversation
Overall looking good
@@ -0,0 +1,9 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS "ceramic_one_event_header" ( |
How does this table get populated for existing dbs?
I'm going to tackle that part today 😅. I'm planning to follow the kubo approach: add a CLI flag to skip the prompt, and otherwise ask whether the migrations should be applied (or exit). It will require reading every event, parsing it, and storing the headers, so it can't (easily) be done via a SQL statement. I'm open to other approaches... I was thinking I'd create a table like the one below and then query/update it at start.
create table ceramic_one_data_migrations (
    version text unique not null,
    started_at timestamp not null default current_timestamp,
    completed_at timestamp
);
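At start-up, that table could be queried and updated roughly like this (a sketch only; the `event-header-v1` version label is made up):

```sql
-- Has this migration already completed? (no row, or a NULL completed_at,
-- means it still needs to run)
SELECT completed_at FROM ceramic_one_data_migrations WHERE version = 'event-header-v1';

-- Record the start, run the backfill, then mark it done.
INSERT INTO ceramic_one_data_migrations (version) VALUES ('event-header-v1');
UPDATE ceramic_one_data_migrations
    SET completed_at = current_timestamp
    WHERE version = 'event-header-v1';
```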
Haven't finished reviewing `ordering_task.rs` or `service.rs` yet, but there are a lot of comments already that I want to get out, and my brain is starting to turn mushy reading all this code, so I think this is all I can get through today. Will plan to revisit tomorrow.
-- Add up migration script here
CREATE TABLE IF NOT EXISTS "ceramic_one_stream" (
    "cid" BLOB NOT NULL, -- init event cid
    "sep" TEXT NOT NULL,
what about controller? Seems like we'll definitely want that for event validation.
yeah, I wanted to discuss controller and whether we want to fully normalize it out. It's easy to capture for just the init event here, but we may want to include it for every event as well. I wasn't sure if we wanted many-to-many or just a single one. Having it normalized for every event makes it possible to do "flat recon" things, but it would still be a decent amount of work to implement.
I think for now we should assume that there is only ever a single controller, and that the controller for a stream never changes. Both of those are true today, and if either were to ever change it would require work across the entire stack, so I think requiring a database migration if/when that were to happen would be okay. For now let's keep it simple.
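Under that assumption, the schema change could be as small as one column on the stream table; a sketch only, not the actual migration:

```sql
-- Sketch: a single controller per stream, captured from the init event
-- header. Multi-controller streams would require their own migration.
ALTER TABLE ceramic_one_stream ADD COLUMN "controller" TEXT NOT NULL DEFAULT '';
```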
"stream_cid" BLOB NOT NULL, -- id field in header. can't have FK because stream may not exist until we discover it but should reference ceramic_one_stream(cid) | ||
"prev" BLOB, -- prev event cid (can't have FK because node may not know about prev event) | ||
PRIMARY KEY(cid), | ||
FOREIGN KEY(cid) REFERENCES ceramic_one_event(cid) |
why have this metadata in a separate table at all instead of directly in the `ceramic_one_event` table?
I didn't want to make that table any wider, since we do large scans and have to load the entire row off disk even if we only read some of the columns. Since these types are pretty small it probably doesn't matter, but I kind of expected `event_metadata` (probably going with that, since `event_header` is silly) to keep growing, and I didn't want to end up with a 25-column event table.
If I could redefine these tables (wow, that was quick), I'd rename `event` to `event_hash`, make `event_header`/`event_metadata` the event table, and put `delivered` and all the new data in there.
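For illustration, that restructure might look roughly like the following (hypothetical, not a planned migration):

```sql
-- Hypothetical: a narrow table for the large scans...
CREATE TABLE "ceramic_one_event_hash" (
    "cid" BLOB NOT NULL,
    PRIMARY KEY(cid)
);

-- ...and a wider event table holding delivered plus the header/metadata.
CREATE TABLE "ceramic_one_event" (
    "cid" BLOB NOT NULL,
    "stream_cid" BLOB NOT NULL,
    "prev" BLOB,
    "delivered" INTEGER, -- moved in from the old event table
    PRIMARY KEY(cid),
    FOREIGN KEY(cid) REFERENCES ceramic_one_event_hash(cid)
);
```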
okay, fair enough. I was also just talking to @smrz2001 about the fact that for self-anchoring we'll probably want to track whether we learned of an event via the API or via Recon. That might be something we want to add to this metadata table as well.
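If that lands, it could be one more column on the metadata table (illustrative only; the column name is an assumption):

```sql
-- Hypothetical: record how the node learned of the event ('api' or 'recon').
ALTER TABLE ceramic_one_event_metadata ADD COLUMN "source" TEXT;
```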
service/src/event/ordering_task.rs
deliverable.push_back(start_with);
self.remove_by_event_cid(&start_with);
let mut tip = start_with;
// technically, could be in deliverable set as well if the stream is forking?
we should definitely make sure we have tests that cover the case of stream forks and multiple histories each with gaps that get filled in at different times.
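Not the service's actual test, but the scenario is easy to model: an event is deliverable only once its `prev` is delivered, so a fork with gaps needs repeated passes as the gaps fill in. A self-contained toy (all names illustrative):

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Toy in-order delivery: an event is deliverable once its `prev` has been
/// delivered (init events have none). A fork means two events share a prev,
/// and each branch must make progress independently.
fn deliverable_order(prevs: &HashMap<&str, Option<&str>>) -> Vec<String> {
    let mut delivered: HashSet<&str> = HashSet::new();
    let mut out = Vec::new();
    let mut pending: VecDeque<&str> = prevs.keys().copied().collect();
    let mut stalled = 0;
    while let Some(e) = pending.pop_front() {
        let ready = match prevs[e] {
            None => true,                     // init event
            Some(p) => delivered.contains(p), // prev already delivered?
        };
        if ready {
            delivered.insert(e);
            out.push(e.to_string());
            stalled = 0;
        } else {
            pending.push_back(e);
            stalled += 1;
            if stalled >= pending.len() {
                break; // a gap: everything left is undeliverable for now
            }
        }
    }
    out
}

fn main() {
    // A fork: `a1` and `b1` both build on `init`, then each branch extends.
    let prevs = HashMap::from([
        ("init", None),
        ("a1", Some("init")),
        ("b1", Some("init")), // the fork
        ("a2", Some("a1")),
        ("b2", Some("b1")),
    ]);
    let order = deliverable_order(&prevs);
    assert_eq!(order[0], "init"); // init always delivers first
    assert_eq!(order.len(), 5);   // both branches fully deliver
    println!("{order:?}");
}
```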
Thanks for the thorough review Spencer! I pretty much agree with everything you mentioned. Here's my summary of what I think I should do:
- fix the bugs 😄
- rename `ceramic_one_event_header` to `ceramic_one_event_metadata` (or possibly just put the data in the event table, but I'd rather not, as I wrote in that comment); the rename itself is sketched after this list
- remove `commit` from the codebase and just refer to events
- review and rename/consolidate types. A number of them were specific to queries (for serialization, or for grouping a few CIDs and useful data without relying on a tuple) just for a bit of sanity, but it could be a lot cleaner. Most of them aren't public outside the module and may not even be necessary.
  - this includes relying on something from the event crate and possibly sharing CAR file parsing with the API server
- clean up comments. Some were just missed cruft/typos; some were me reasoning about why something was correct/okay, but they don't add any value and are a bit confusing now.
And there's possibly more I missed but I'll go over each one as I change things tomorrow.
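For the table rename called out in the list above, the rename itself is trivial in SQLite; the part that needs the data migration is backfilling metadata for pre-existing events:

```sql
ALTER TABLE ceramic_one_event_header RENAME TO ceramic_one_event_metadata;
```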
store/src/sql/access/event.rs
unreachable!("Init events should have been filtered out")
}
EventHeader::Data { cid, prev, .. } | EventHeader::Time { cid, prev, .. } => {
    // check for prev in this set and fallback to database
Hmm, I think you're right. Good catch! I should probably write a better test to make sure 😄. This would fail if we somehow got 3 writes from the API for a single stream out of order. For recon-discovered events, we would miss marking them immediately (but wouldn't error); the stream would get sent to the task, which would find them, but it shouldn't need to.
store/src/sql/access/event.rs
events: &'a [EventInsertable],
pool: &SqlitePool,
require_history: bool,
) -> Result<Vec<(bool, &'a EventInsertable)>> {
yeah, I 100% agree with this. I discussed it a bit with Nathaniel and we want to make the event type from the event crate part of the public interface, but some of that comes out of event validation and the migrations stuff he's working on right now.
I made a lot of types to wrap specific behavior (like a group of CIDs rather than a tuple) or a specific database query result but it merits more thought and clean up.
store/src/sql/access/event.rs
let new_key = Self::insert_key(&mut tx, &item.order_key).await?;
for (idx, (deliverable, item)) in to_add.iter().enumerate() {
    let new_key = Self::insert_key(&mut tx, &item.order_key, *deliverable).await?;
    let candiadate = CandidateEvent::new(item.cid(), item.stream_cid());
thanks :). I renamed this locally in the migration to populate the new tables and will push it tonight even though it's not quite ready (i.e. working)
};

/// Access to the stream and related tables. Generally querying events as a stream.
pub struct CeramicOneStream {}
Yeah, that makes sense. These access structs were intended to group higher-level functionality rather than just single table/entity writes, but the names are pretty much copied straight from the tables, which is a misnomer for sure.
    }
} else {
    undelivered.push(candiadate);
}
if new_key {
    for block in item.body.blocks.iter() {
        CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?;
everything is wrapped in a single (large) transaction so it gets rolled back together
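For context, the shape of that with sqlx is roughly the following; a sketch only, with illustrative statements standing in for the real `insert_key`/`CeramicOneBlock::insert`:

```rust
use sqlx::SqlitePool;

/// Sketch: insert an event key and all of its blocks atomically. If any
/// statement fails, the transaction is dropped without commit and SQLite
/// rolls everything back together.
async fn insert_event(
    pool: &SqlitePool,
    order_key: &[u8],
    blocks: &[(Vec<u8>, Vec<u8>)], // (multihash, bytes) pairs
) -> sqlx::Result<()> {
    let mut tx = pool.begin().await?;
    sqlx::query("INSERT INTO ceramic_one_event (order_key) VALUES (?)")
        .bind(order_key)
        .execute(&mut *tx)
        .await?;
    for (multihash, bytes) in blocks {
        sqlx::query("INSERT INTO ceramic_one_block (multihash, bytes) VALUES (?, ?)")
            .bind(multihash.as_slice())
            .bind(bytes.as_slice())
            .execute(&mut *tx)
            .await?;
    }
    tx.commit().await?; // nothing is visible to readers until this succeeds
    Ok(())
}
```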
IPLD impls `Debug`, `PartialEq`, and `Eq`, so unless we allow true floats I think this is okay.
- populated when discovering events
- rewrote IOD using it (been running `recon_lots_of_streams` in a loop and it keeps passing)
- created crate-specific InsertResult structs (api, recon, service, store). If any api batch write fails because of something in the service (i.e. no prev), it won't fail other writes in the batch
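The per-crate result shape might look something like this (purely illustrative; the field names are assumptions):

```rust
/// Sketch of a batch insert result: accepted items succeed even when other
/// items in the same batch are rejected (e.g. for a missing prev).
pub struct InsertResult {
    pub accepted: Vec<Vec<u8>>,           // CIDs (as bytes) that were stored
    pub rejected: Vec<(Vec<u8>, String)>, // CID plus a reason, e.g. "missing prev"
}
```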
We now store stream metadata in the database so we can find all events for a stream. This simplifies the in-order delivery behavior. We do make more database queries now, but we use less memory keeping track of events. This could be more efficient, but for now I went with simple and hopefully good enough, especially while streams are short. If streams start getting longer, this may not be a very good approach.
We run a task to update deliverable status for streams. It gets notified when a recon-discovered event is inserted and is currently deliverable, and it tries to sort out any other pending pieces and update them. It also includes a start-up operation to load streams and mark their events deliverable. Finally, a data migration runs on daemon start to populate the new tables.
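A minimal sketch of that notification flow, assuming a tokio mpsc channel and hypothetical types (the real task also handles the start-up scan):

```rust
use tokio::sync::mpsc;

/// Hypothetical notification: a deliverable event was just inserted.
struct DeliverableEvent {
    stream_cid: Vec<u8>, // stand-in for the stream's init event CID
}

/// Ordering task: for each notification, look up undelivered events in the
/// same stream whose prev chain is now complete and mark them deliverable.
async fn ordering_task(mut rx: mpsc::Receiver<DeliverableEvent>) {
    while let Some(ev) = rx.recv().await {
        // Query undelivered events for ev.stream_cid, walk prev pointers,
        // and update deliverable status (elided in this sketch).
        let _ = ev.stream_cid;
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(1024);
    tokio::spawn(ordering_task(rx));
    // After inserting a deliverable recon event, notify the task:
    tx.send(DeliverableEvent { stream_cid: vec![0x01] })
        .await
        .expect("ordering task stopped");
}
```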
This should fix the test in js-ceramic. It requires #381 and #382.