From e46e101cbf29880b94099ac827201556d989d191 Mon Sep 17 00:00:00 2001 From: David Estes Date: Fri, 14 Jun 2024 15:35:45 -0600 Subject: [PATCH] fix: determine new install status on init and take exclusive access while running - looks like more than it is. pulled some queries into the version struct and adjusted signatures accordingly (&self/&mut self/Self, removed async) - made things more retry friendly. fixed previous version query to exclude the current version and not error if inserting migration as completed a second time --- one/src/lib.rs | 4 +- store/src/migration.rs | 109 +++++++++++++++++++++++++---------------- 2 files changed, 69 insertions(+), 44 deletions(-) diff --git a/one/src/lib.rs b/one/src/lib.rs index 5b4587908..159be9e01 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -398,8 +398,8 @@ impl Daemon { ); }); - let migrator = service.data_migrator().await?; - if migrator.needs_migration().await? { + let mut migrator = service.data_migrator().await?; + if migrator.needs_migration() { if opts.migrate_data { migrator.run_all().await?; } else { diff --git a/store/src/migration.rs b/store/src/migration.rs index 9dbbcecbd..0664fb2b4 100644 --- a/store/src/migration.rs +++ b/store/src/migration.rs @@ -30,9 +30,33 @@ struct Version { installed_at: chrono::NaiveDateTime, } +impl Version { + async fn fetch_previous(pool: &SqlitePool, current_version: &str) -> Result> { + Ok(sqlx::query_as( + "SELECT id, version, installed_at + FROM ceramic_one_version + WHERE version <> $1 + ORDER BY installed_at DESC limit 1;", + ) + .bind(current_version) + .fetch_optional(pool.reader()) + .await?) + } + + async fn insert_current(pool: &SqlitePool, current_version: &str) -> Result<()> { + sqlx::query( + "INSERT INTO ceramic_one_version (version) VALUES ($1) ON CONFLICT DO NOTHING;", + ) + .bind(current_version) + .execute(pool.writer()) + .await?; + Ok(()) + } +} + /// The data migrator is responsible for running data migrations the node requires as part of the verison upgrade. +#[derive(Debug)] pub struct DataMigrator { - prev_version: Option, required_migrations: Vec<&'static Migration>, pool: SqlitePool, } @@ -41,24 +65,30 @@ impl DataMigrator { /// Create a new data migrator. This updates the version table with the current version and determine which migrations need to be run. pub async fn try_new(pool: SqlitePool) -> Result { let current_version = env!("CARGO_PKG_VERSION").to_string(); + let prev_version = Version::fetch_previous(&pool, ¤t_version).await?; - let prev_version: Option = sqlx::query_as( - "SELECT id, version, installed_at FROM ceramic_one_version ORDER BY installed_at DESC limit 1;", - ) - .fetch_optional(pool.reader()) - .await?; + Version::insert_current(&pool, ¤t_version).await?; - sqlx::query( - "INSERT INTO ceramic_one_version (version) VALUES ($1) ON CONFLICT DO NOTHING;", - ) - .bind(¤t_version) - .execute(pool.writer()) - .await?; + let required_migrations = + Self::determine_required_migrations(&pool, prev_version, ¤t_version).await?; - let applied_migrations = DataMigration::fetch_all(&pool).await?; + Ok(Self { + required_migrations, + pool, + }) + } + + async fn determine_required_migrations( + pool: &SqlitePool, + prev_version: Option, + current_version: &str, + ) -> Result> { + let new_install = Self::is_new_install(pool, &prev_version, current_version).await?; + + let applied_migrations = DataMigration::fetch_all(pool).await?; debug!(?prev_version, %current_version, ?applied_migrations, "Current data migration status"); // In the future, we can filter out migrations that are not required based on the version as well - let required_migrations = data_migrations() + let unapplied_migrations = data_migrations() .iter() .flat_map(|candidate| { if applied_migrations @@ -72,38 +102,33 @@ impl DataMigrator { Some(candidate) } }) - .collect(); - - tracing::debug!("required migrations: {:?}", required_migrations); - - Ok(Self { - prev_version, - required_migrations, - pool, - }) - } - - /// Determines whether migrations are needed. Will mark all migrations as "complete" on a fresh install. - pub async fn needs_migration(&self) -> Result { - let new_install = self.is_new_install().await?; - + .collect::>(); if new_install { debug!("Setting up new node... data migrations are not required."); - for migration_info in &self.required_migrations { - DataMigration::insert_completed(&self.pool, migration_info.name).await?; + for migration_info in &unapplied_migrations { + DataMigration::insert_completed(pool, migration_info.name).await?; } - return Ok(false); + Ok(Vec::new()) + } else { + Ok(unapplied_migrations) } + } - Ok(!self.required_migrations.is_empty()) + /// Determines whether migrations are needed + pub fn needs_migration(&self) -> bool { + !self.required_migrations.is_empty() } /// In the future, we can check the version and run migrations based on that. For now, we need to know if this is just getting /// the new feature that tracks versions, or if it is a fresh install. We use the presence of recon event data to indicate it's not new. - async fn is_new_install(&self) -> Result { - if self.prev_version.is_none() { + async fn is_new_install( + pool: &SqlitePool, + prev_version: &Option, + _current_version: &str, + ) -> Result { + if prev_version.is_none() { let row = sqlx::query(r#"SELECT cid as rowid from ceramic_one_event;"#) - .fetch_optional(self.pool.reader()) + .fetch_optional(pool.reader()) .await?; Ok(row.is_none()) } else { @@ -112,10 +137,10 @@ impl DataMigrator { } /// Run all migrations that have not been run yet. - pub async fn run_all(&self) -> Result<()> { + pub async fn run_all(&mut self) -> Result<()> { for migration_info in &self.required_migrations { info!("Starting migration: {}", migration_info.name); - DataMigration::upsert(&self.pool, migration_info.name).await?; + DataMigration::upsert_as_started(&self.pool, migration_info.name).await?; self.run_migration_by_name(migration_info.name).await?; DataMigration::mark_completed(&self.pool, migration_info.name).await?; } @@ -177,7 +202,7 @@ impl DataMigration { .await?) } - async fn upsert(pool: &SqlitePool, name: &str) -> Result<()> { + async fn upsert_as_started(pool: &SqlitePool, name: &str) -> Result<()> { sqlx::query("INSERT INTO ceramic_one_data_migration (name, version) VALUES ($1, $2) on conflict (name) do update set last_attempted_at = CURRENT_TIMESTAMP;") .bind(name) .bind(env!("CARGO_PKG_VERSION")) @@ -187,7 +212,7 @@ impl DataMigration { } async fn insert_completed(pool: &SqlitePool, name: &str) -> Result<()> { - sqlx::query("INSERT INTO ceramic_one_data_migration (name, version, completed_at) VALUES ($1, $2, CURRENT_TIMESTAMP);") + sqlx::query("INSERT INTO ceramic_one_data_migration (name, version, completed_at) VALUES ($1, $2, CURRENT_TIMESTAMP) on conflict (name) do nothing;") .bind(name) .bind(env!("CARGO_PKG_VERSION")) .execute(pool.writer()) @@ -196,7 +221,7 @@ impl DataMigration { } async fn mark_completed(pool: &SqlitePool, name: &str) -> Result<()> { - sqlx::query("UPDATE ceramic_one_data_migration SET completed_at = CURRENT_TIMESTAMP WHERE name = $1;").bind(name).execute(pool.writer()).await?; + sqlx::query("UPDATE ceramic_one_data_migration SET completed_at = CURRENT_TIMESTAMP WHERE name = $1 and completed_at IS NULL;").bind(name).execute(pool.writer()).await?; Ok(()) } } @@ -207,6 +232,6 @@ mod test { async fn correctly_skips_new_install() { let pool = crate::sql::SqlitePool::connect_in_memory().await.unwrap(); let migrator = crate::migration::DataMigrator::try_new(pool).await.unwrap(); - assert!(!migrator.needs_migration().await.unwrap()); + assert!(!migrator.needs_migration()); } }