From 636fcb1c4b037a257d0e7d722c6722e4f2094377 Mon Sep 17 00:00:00 2001
From: David Estes
Date: Thu, 13 Jun 2024 13:02:45 -0600
Subject: [PATCH] fix: clean up data migration code for testing

---
 Cargo.lock                    |   1 -
 one/src/lib.rs                |   2 +-
 service/Cargo.toml            |   1 -
 store/src/migration.rs        | 203 +++++++++++++++++++++++-----------
 store/src/sql/entities/mod.rs |   1 -
 store/src/sql/query.rs        |   3 +
 6 files changed, 142 insertions(+), 69 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0cfeb61f6..a3d182581 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1336,7 +1336,6 @@ dependencies = [
  "hex",
  "ipld-core",
  "iroh-bitswap",
- "itertools 0.12.1",
  "multibase 0.9.1",
  "multihash-codetable",
  "paste",
diff --git a/one/src/lib.rs b/one/src/lib.rs
index 67ef1521f..48a465214 100644
--- a/one/src/lib.rs
+++ b/one/src/lib.rs
@@ -404,7 +404,7 @@ impl Daemon {
             info!("applying data migrations");
             migrator.run_all().await?;
         } else {
-            warn!("data migrations are required, but --migrate-data is not set");
+            warn!("Data migrations are required, but --migrate-data is not set. Run with --migrate-data to apply migrations. Before doing so, you should back up your sqlite files from {:?}", dir);
             return Ok(());
         }
     }
diff --git a/service/Cargo.toml b/service/Cargo.toml
index 7fd148464..b68033ba9 100644
--- a/service/Cargo.toml
+++ b/service/Cargo.toml
@@ -19,7 +19,6 @@ cid.workspace = true
 hex.workspace = true
 ipld-core.workspace = true
 iroh-bitswap.workspace = true
-itertools = "0.12"
 multihash-codetable.workspace = true
 recon.workspace = true
 thiserror.workspace = true
diff --git a/store/src/migration.rs b/store/src/migration.rs
index 73a1631e8..4d08643d7 100644
--- a/store/src/migration.rs
+++ b/store/src/migration.rs
@@ -1,15 +1,16 @@
 use std::sync::OnceLock;
 
 use sqlx::{prelude::FromRow, types::chrono};
-use tracing::info;
+use tracing::{debug, info};
 
 use crate::{
-    sql::entities::{EventCid, EventHeader, ReconEventBlockRaw},
+    sql::entities::{EventHeader, ReconEventBlockRaw},
     CeramicOneStream, Error, EventInsertableBody, Result, SqlitePool,
 };
 
 static MIGRATIONS: OnceLock<Vec<Migration>> = OnceLock::new();
 
+#[derive(Debug)]
 struct Migration {
     /// The name of the migration.
     name: &'static str,
@@ -19,16 +20,18 @@ struct Migration {
 
 /// The list of migrations that need to be run in order to get the database up to date and start the server.
 /// Add new migrations to the end of the list.
-fn required_migrations() -> &'static Vec<Migration> {
+fn data_migrations() -> &'static Vec<Migration> {
     MIGRATIONS.get_or_init(|| {
         vec![Migration {
             name: "events_to_streams",
-            _version: "0.23.0",
+            _version: "0.22.0",
         }]
     })
 }
 
 #[derive(Debug, Clone, sqlx::FromRow)]
+// We want to retrieve these fields for logging but we don't refer to them directly
+#[allow(dead_code)]
 struct Version {
     id: i64,
     version: String,
@@ -43,6 +46,7 @@ pub struct DataMigrator {
 }
 
 impl DataMigrator {
+    /// Create a new data migrator. This updates the version table with the current version and determines which migrations need to be run.
     pub async fn try_new(pool: SqlitePool) -> Result<Self> {
         let current_version = env!("CARGO_PKG_VERSION").to_string();
 
@@ -60,8 +64,9 @@ impl DataMigrator {
             .await?;
 
         let applied_migrations = DataMigration::fetch_all(&pool).await?;
+        debug!(?prev_version, %current_version, ?applied_migrations, "Current data migration status");
         // In the future, we can filter out migrations that are not required based on the version as well
-        let required_migrations = required_migrations()
+        let required_migrations = data_migrations()
             .iter()
             .flat_map(|candidate| {
                 if applied_migrations
@@ -77,6 +82,8 @@ impl DataMigrator {
             })
             .collect();
 
+        tracing::debug!("required migrations: {:?}", required_migrations);
+
         Ok(Self {
             prev_version,
             required_migrations,
@@ -103,10 +110,10 @@ impl DataMigrator {
     /// the new feature that tracks versions, or if it is a fresh install. We use the presence of recon event data to indicate it's not new.
     async fn is_new_install(&self) -> Result<bool> {
         if self.prev_version.is_none() {
-            let x = sqlx::query(r#"SELECT cid from ceramic_one_event limit 1;"#)
+            let row = sqlx::query(r#"SELECT cid as rowid from ceramic_one_event;"#)
                 .fetch_optional(self.pool.reader())
                 .await?;
-            Ok(x.is_some())
+            Ok(row.is_none())
         } else {
             Ok(false)
         }
@@ -115,6 +122,7 @@ impl DataMigrator {
     /// Run all migrations that have not been run yet.
     pub async fn run_all(&self) -> Result<()> {
         for migration_info in &self.required_migrations {
+            info!("Starting migration: {}", migration_info.name);
             DataMigration::upsert(&self.pool, migration_info.name).await?;
             self.run_migration_by_name(migration_info.name).await?;
             DataMigration::mark_completed(&self.pool, migration_info.name).await?;
@@ -123,84 +131,139 @@ impl DataMigrator {
     }
 
     async fn run_migration_by_name(&self, name: &str) -> Result<()> {
-        info!("Starting migration: {}", name);
-        match name {
+        let res = match name {
             "events_to_streams" => self.migrate_events_to_streams().await,
-            _ => Err(Error::new_fatal(anyhow::anyhow!(
-                "Unknown migration: {}",
-                name
-            ))),
+            _ => {
+                return Err(Error::new_fatal(anyhow::anyhow!(
+                    "Unknown migration: {}",
+                    name
+                )))
+            }
+        };
+        match res {
+            Ok(_) => {
+                info!("Migration {} completed successfully", name);
+                Ok(())
+            }
+            Err(e) => {
+                tracing::error!("Migration encountered error: {:?}", e);
+                match e {
+                    Error::Fatal { error } => Err(Error::new_fatal(anyhow::anyhow!(
+                        "Migration {} failed in irrecoverable way: {}",
+                        name,
+                        error
+                    ))),
+
+                    e => {
+                        let err = e.context("Migration failed but can be retried");
+                        Err(err)
+                    }
+                }
+            }
         }
     }
 
     // This isn't the most efficient approach but it's simple and we should only run it once.
    // It isn't expected to ever be run on something that isn't a sqlite database upgrading from version 0.22.0 or below.
     async fn migrate_events_to_streams(&self) -> Result<()> {
-        let mut cid_cursor = Some(EventCid::default());
+        let mut cid_cursor = Some(0);
 
-        while let Some(last_cid) = cid_cursor {
-            cid_cursor = self.migrate_events_to_streams_batch(last_cid).await?;
+        struct EventToStreamRow {
+            row_blocks: ReconEventBlockRaw,
+            row_id: i64,
+        }
+        use sqlx::Row;
+        impl sqlx::FromRow<'_, sqlx::sqlite::SqliteRow> for EventToStreamRow {
+            fn from_row(row: &sqlx::sqlite::SqliteRow) -> std::result::Result<Self, sqlx::Error> {
+                let row_id: i64 = row.try_get("rowid")?;
+                let row_blocks = ReconEventBlockRaw::from_row(row)?;
+                Ok(Self { row_id, row_blocks })
+            }
         }
-        Ok(())
-    }
-
-    async fn migrate_events_to_streams_batch(
-        &self,
-        last_cid: EventCid,
-    ) -> Result<Option<EventCid>> {
-        let all_blocks: Vec<ReconEventBlockRaw> = sqlx::query_as(
-            r#"SELECT
-                    key.order_key, key.event_cid, eb.codec, eb.root, eb.idx, b.multihash, b.bytes
-                FROM (
-                    SELECT
-                        e.cid as event_cid, e.order_key
-                    FROM ceramic_one_event e
-                    WHERE
-                        EXISTS (SELECT 1 FROM ceramic_one_event_block where event_cid = e.cid)
-                        AND NOT EXISTS (SELECT 1 from ceramic_one_event_metadata where cid = e.cid)
-                        AND e.cid > $1
-                    ORDER BY e.cid
-                    LIMIT 1000
-                ) key
-                JOIN
-                    ceramic_one_event_block eb ON key.event_cid = eb.event_cid
-                JOIN ceramic_one_block b on b.multihash = eb.block_multihash
-                ORDER BY key.order_key, eb.idx;"#,
-        )
-        .bind(last_cid.to_bytes())
-        .fetch_all(self.pool.reader())
-        .await?;
+        let limit = 1000;
+        let mut total_migrated = 0;
+        while let Some(last_cid) = cid_cursor {
+            let mut tx = self.pool.begin_tx().await?;
+
+            // RowID starts from 1 so we can just use greater than 0 to start
+            let all_blocks: Vec<EventToStreamRow> = sqlx::query_as(
+                r#"WITH key AS (
+                    SELECT
+                        e.cid AS event_cid, e.order_key, e.rowid
+                    FROM ceramic_one_event e
+                    WHERE
+                        EXISTS (
+                            SELECT 1
+                            FROM ceramic_one_event_block
+                            WHERE event_cid = e.cid
+                        )
+                        AND NOT EXISTS (
+                            SELECT 1
+                            FROM ceramic_one_event_metadata
+                            WHERE cid = e.cid
+                        )
+                        AND e.rowid > $1
+                    ORDER BY e.rowid
+                    LIMIT $2
+                )
+                SELECT
+                    key.order_key, key.event_cid, eb.codec, eb.root, eb.idx, b.multihash, b.bytes, key.rowid
+                FROM key
+                JOIN ceramic_one_event_block eb ON key.event_cid = eb.event_cid
+                JOIN ceramic_one_block b ON b.multihash = eb.block_multihash
+                ORDER BY key.order_key, eb.idx;"#,
+            )
+            .bind(last_cid)
+            .bind(limit)
+            .fetch_all(&mut **tx.inner())
+            .await?;
 
-        let values = ReconEventBlockRaw::into_carfiles(all_blocks).await?;
+            let new_max = all_blocks.iter().map(|v| v.row_id).max();
+            if all_blocks.is_empty() || new_max.is_none() {
+                tx.commit().await?;
+                return Ok(());
+            } else {
+                cid_cursor = new_max;
+            }
 
-        let last_cid = values.last().and_then(|(id, _)| id.cid());
-        if last_cid.is_none() {
-            return Ok(None);
-        }
-        let mut tx = self.pool.begin_tx().await?;
-        for (event_id, payload) in values {
-            // should we log and continue? this shouldn't be possible unless something bad happened
-            // if we error, will require manual intervention to recover
-            let event_cid = event_id.cid().ok_or_else(|| {
-                Error::new_fatal(anyhow::anyhow!("Event ID is missing a CID: {}", event_id))
-            })?;
-
-            let insertable = EventInsertableBody::try_from_carfile(event_cid, &payload).await?;
-
-            if let EventHeader::Init { header, .. } = &insertable.header {
-                CeramicOneStream::insert_tx(&mut tx, insertable.cid, header).await?;
+            let values = ReconEventBlockRaw::into_carfiles(
+                all_blocks.into_iter().map(|b| b.row_blocks).collect(),
+            )
+            .await?;
+            total_migrated += values.len();
+            if total_migrated % 10000 == 0 {
+                debug!("Migrated {} events to the stream format", total_migrated);
             }
+            tracing::trace!("found {} values to migrate", values.len());
 
-            CeramicOneStream::insert_event_header_tx(&mut tx, &insertable.header).await?;
+            for (event_id, payload) in values {
+                tracing::trace!("Migrating event: {}", event_id);
+                // should we log and continue if anything fails? It shouldn't be possible unless something bad happened
+                // and we allowed unexpected data into the system, but if we error, it will require manual intervention to recover
+                // as the bad data will need to be deleted by hand. Temporary failures to write can be retried.
+                let event_cid = event_id.cid().ok_or_else(|| {
+                    Error::new_fatal(anyhow::anyhow!("Event ID is missing a CID: {}", event_id))
+                })?;
+
+                let insertable = EventInsertableBody::try_from_carfile(event_cid, &payload).await?;
+
+                if let EventHeader::Init { header, .. } = &insertable.header {
+                    CeramicOneStream::insert_tx(&mut tx, insertable.cid, header).await?;
+                }
+
+                CeramicOneStream::insert_event_header_tx(&mut tx, &insertable.header).await?;
+            }
+            tx.commit().await?;
         }
-        tx.commit().await?;
 
-        Ok(last_cid)
+        Ok(())
     }
 }
 
 #[derive(Debug, Clone, FromRow)]
+// we want to retrieve these fields for logging but we don't refer to them directly
++#[allow(dead_code)]
 struct DataMigration {
     name: String,
     version: String,
@@ -241,3 +304,13 @@ impl DataMigration {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod test {
+    #[tokio::test]
+    async fn correctly_skips_new_install() {
+        let pool = crate::sql::SqlitePool::connect_in_memory().await.unwrap();
+        let migrator = crate::migration::DataMigrator::try_new(pool).await.unwrap();
+        assert!(!migrator.needs_migration().await.unwrap());
+    }
+}
diff --git a/store/src/sql/entities/mod.rs b/store/src/sql/entities/mod.rs
index 12a2bda20..f03d7c9ce 100644
--- a/store/src/sql/entities/mod.rs
+++ b/store/src/sql/entities/mod.rs
@@ -16,4 +16,3 @@ pub use stream::{IncompleteStream, StreamEventRow, StreamRow};
 pub use utils::{CountRow, DeliveredEventRow, OrderKey};
 
 pub type StreamCid = cid::Cid;
-pub type EventCid = cid::Cid;
diff --git a/store/src/sql/query.rs b/store/src/sql/query.rs
index df631d80c..81b517bc7 100644
--- a/store/src/sql/query.rs
+++ b/store/src/sql/query.rs
@@ -97,6 +97,9 @@ impl EventQuery {
     }
 
     /// Updates the delivered column in the event table so it can be set to the client
+    /// Requires 2 parameters:
+    ///     $1 = delivered (i64)
+    ///     $2 = cid (bytes)
     pub fn mark_ready_to_deliver() -> &'static str {
         "UPDATE ceramic_one_event SET delivered = $1 WHERE cid = $2 and delivered is NULL;"
     }
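
Reviewer note: the rewritten migrate_events_to_streams pages through ceramic_one_event using sqlite's implicit rowid as a keyset cursor (rowids start at 1, so a cursor of 0 picks up the first row) and commits each batch in its own transaction. Below is a minimal, standalone sketch of that pagination pattern, not the patch's actual code: the items table, its columns, and the batch size are hypothetical stand-ins, and it assumes a crate with sqlx (sqlite and runtime-tokio features) and tokio as dependencies.

use sqlx::{Row, SqlitePool};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // In-memory database standing in for the store's sqlite file.
    let pool = SqlitePool::connect("sqlite::memory:").await?;
    sqlx::query("CREATE TABLE items (data TEXT)")
        .execute(&pool)
        .await?;
    for i in 0..25 {
        sqlx::query("INSERT INTO items (data) VALUES ($1)")
            .bind(format!("item-{i}"))
            .execute(&pool)
            .await?;
    }

    // Keyset pagination over the implicit rowid: a cursor of 0 picks up the
    // first row, mirroring `let mut cid_cursor = Some(0)` in the migration.
    let limit: i64 = 10;
    let mut cursor: i64 = 0;
    loop {
        let rows =
            sqlx::query("SELECT rowid, data FROM items WHERE rowid > $1 ORDER BY rowid LIMIT $2")
                .bind(cursor)
                .bind(limit)
                .fetch_all(&pool)
                .await?;
        // An empty batch means every row has been visited; the migration's
        // while loop ends the same way, when new_max is None.
        let Some(max) = rows.iter().map(|r| r.get::<i64, _>("rowid")).max() else {
            break;
        };
        cursor = max;
        println!("processed batch ending at rowid {cursor} ({} rows)", rows.len());
    }
    Ok(())
}

The design choice this mirrors: an integer rowid cursor is cheap to compare and always advances, whereas the previous cursor paged on the raw CID bytes and so depended on the lexical ordering of encoded CIDs.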