From d7e194b8b58d959e201e09ce891e143b4112cc33 Mon Sep 17 00:00:00 2001
From: David Estes
Date: Thu, 13 Jun 2024 19:53:01 -0600
Subject: [PATCH] chore: write migration to populate new tables from blockstore

---
 store/src/migration.rs | 116 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 110 insertions(+), 6 deletions(-)

diff --git a/store/src/migration.rs b/store/src/migration.rs
index 9dbbcecbd..9205297c4 100644
--- a/store/src/migration.rs
+++ b/store/src/migration.rs
@@ -3,7 +3,10 @@ use std::sync::OnceLock;
 use sqlx::{prelude::FromRow, types::chrono};
 use tracing::{debug, info};
 
-use crate::{Error, Result, SqlitePool};
+use crate::{
+    sql::entities::{EventHeader, ReconEventBlockRaw},
+    CeramicOneStream, Error, EventInsertableBody, Result, SqlitePool,
+};
 
 static MIGRATIONS: OnceLock<Vec<Migration>> = OnceLock::new();
 
@@ -18,7 +21,12 @@ struct Migration {
 /// The list of migrations that need to be run in order to get the database up to date and start the server.
 /// Add new migrations to the end of the list.
 fn data_migrations() -> &'static Vec<Migration> {
-    MIGRATIONS.get_or_init(Vec::new)
+    MIGRATIONS.get_or_init(|| {
+        vec![Migration {
+            name: "events_to_streams",
+            _version: "0.22.0",
+        }]
+    })
 }
 
 #[derive(Debug, Clone, sqlx::FromRow)]
@@ -123,8 +131,8 @@ impl DataMigrator {
     }
 
     async fn run_migration_by_name(&self, name: &str) -> Result<()> {
-        #[allow(clippy::match_single_binding)]
-        let _res: Result<()> = match name {
+        let res = match name {
+            "events_to_streams" => self.migrate_events_to_streams().await,
             _ => {
                 return Err(Error::new_fatal(anyhow::anyhow!(
                     "Unknown migration: {}",
@@ -132,8 +140,7 @@ impl DataMigrator {
                 )))
             }
         };
-        #[allow(unreachable_code)]
-        match _res {
+        match res {
             Ok(_) => {
                 info!("Migration {} completed successfully", name);
                 Ok(())
@@ -155,6 +162,103 @@ impl DataMigrator {
             }
         }
     }
+
+    // This isn't the most efficient approach, but it's simple and we should only need to run it once.
+    // It is only expected to run against a sqlite database upgrading from version 0.22.0 or below.
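+    // Approach: page through ceramic_one_event by rowid in fixed-size batches, rebuild each
+    // event's CAR file from its stored blocks, and backfill the stream tables inside one
+    // transaction per batch. Already-migrated events are skipped by the query below, so an
+    // interrupted migration can safely be re-run to resume where it left off.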
+    async fn migrate_events_to_streams(&self) -> Result<()> {
+        let mut rowid_cursor = Some(0);
+
+        struct EventToStreamRow {
+            row_blocks: ReconEventBlockRaw,
+            row_id: i64,
+        }
+        use sqlx::Row;
+        impl sqlx::FromRow<'_, sqlx::sqlite::SqliteRow> for EventToStreamRow {
+            fn from_row(row: &sqlx::sqlite::SqliteRow) -> std::result::Result<Self, sqlx::Error> {
+                let row_id: i64 = row.try_get("rowid")?;
+                let row_blocks = ReconEventBlockRaw::from_row(row)?;
+                Ok(Self { row_id, row_blocks })
+            }
+        }
+
+        let limit = 1000;
+        let mut total_migrated = 0;
+        while let Some(last_rowid) = rowid_cursor {
+            let mut tx = self.pool.begin_tx().await?;
+
+            // RowID starts from 1, so "greater than 0" selects everything on the first pass.
+            let all_blocks: Vec<EventToStreamRow> = sqlx::query_as(
+                r#"WITH key AS (
+                        SELECT
+                            e.cid AS event_cid, e.order_key, e.rowid
+                        FROM ceramic_one_event e
+                        WHERE
+                            EXISTS (
+                                SELECT 1
+                                FROM ceramic_one_event_block
+                                WHERE event_cid = e.cid
+                            )
+                            AND NOT EXISTS (
+                                SELECT 1
+                                FROM ceramic_one_event_metadata
+                                WHERE cid = e.cid
+                            )
+                            AND e.rowid > $1
+                        ORDER BY e.rowid
+                        LIMIT $2
+                    )
+                    SELECT
+                        key.order_key, key.event_cid, eb.codec, eb.root, eb.idx, b.multihash, b.bytes, key.rowid
+                    FROM key
+                        JOIN ceramic_one_event_block eb ON key.event_cid = eb.event_cid
+                        JOIN ceramic_one_block b ON b.multihash = eb.block_multihash
+                    ORDER BY key.order_key, eb.idx;"#,
+            )
+            .bind(last_rowid)
+            .bind(limit)
+            .fetch_all(&mut **tx.inner())
+            .await?;
+
+            let new_max = all_blocks.iter().map(|v| v.row_id).max();
+            if all_blocks.is_empty() || new_max.is_none() {
+                tx.commit().await?;
+                return Ok(());
+            } else {
+                rowid_cursor = new_max;
+            }
+
+            let values = ReconEventBlockRaw::into_carfiles(
+                all_blocks.into_iter().map(|b| b.row_blocks).collect(),
+            )
+            .await?;
+            total_migrated += values.len();
+            if total_migrated % 10000 == 0 {
+                debug!("Migrated {} events to the stream format", total_migrated);
+            }
+            tracing::trace!("found {} values to migrate", values.len());
+
+            for (event_id, payload) in values {
+                tracing::trace!("Migrating event: {}", event_id);
+                // We error rather than log-and-continue: bad data shouldn't be possible here
+                // unless something unexpected was allowed into the system, and skipping it would
+                // leave it to be deleted by hand later. Transient write failures are recovered
+                // by simply re-running the migration.
+                let event_cid = event_id.cid().ok_or_else(|| {
+                    Error::new_fatal(anyhow::anyhow!("Event ID is missing a CID: {}", event_id))
+                })?;
+
+                let insertable = EventInsertableBody::try_from_carfile(event_cid, &payload).await?;
+
+                if let EventHeader::Init { header, .. } = &insertable.header {
+                    CeramicOneStream::insert_tx(&mut tx, insertable.cid, header).await?;
+                }
+
+                CeramicOneStream::insert_event_header_tx(&mut tx, &insertable.header).await?;
+            }
+            tx.commit().await?;
+        }
+
+        Ok(())
+    }
 }
 
 #[derive(Debug, Clone, FromRow)]
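
---

Note on the batching pattern above: the migration pages through ceramic_one_event
using SQLite's implicit rowid as a keyset cursor rather than OFFSET pagination, so
each batch costs the same no matter how far the scan has progressed, and committing
once per batch keeps any single transaction small. A minimal, self-contained sketch
of the same pattern follows; the `items` table, its `value` column, and the
`process_in_batches` name are hypothetical stand-ins, not the repo's schema or API:

    use sqlx::{Row, SqlitePool};

    // Page through a hypothetical `items` table in fixed-size batches,
    // using SQLite's implicit rowid as a keyset cursor instead of OFFSET.
    async fn process_in_batches(pool: &SqlitePool) -> Result<(), sqlx::Error> {
        let limit: i64 = 1000;
        // rowid numbering starts at 1, so 0 means "before the first row".
        let mut cursor: i64 = 0;

        loop {
            let rows = sqlx::query(
                "SELECT rowid, value FROM items WHERE rowid > $1 ORDER BY rowid LIMIT $2",
            )
            .bind(cursor)
            .bind(limit)
            .fetch_all(pool)
            .await?;

            // No rows past the cursor means the scan is complete.
            if rows.is_empty() {
                return Ok(());
            }

            for row in &rows {
                let rowid: i64 = row.try_get("rowid")?;
                // ... transform/migrate the row here ...
                // Advance the cursor past everything processed so far.
                cursor = cursor.max(rowid);
            }
        }
    }

Because the real migration filters on NOT EXISTS against ceramic_one_event_metadata,
rows written in earlier committed batches are never selected again, which is what
makes re-running after a crash safe.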