Skip to content

Commit

Permalink
fix: determine new install status on init and take exclusive access w…
Browse files Browse the repository at this point in the history
…hile running

- looks like more than it is. pulled some queries into the version struct and adjusted signatures accordingly (&self/&mut self/Self, removed async)
- made things more retry friendly. fixed previous version query to exclude the current version and not error if inserting migration as completed a second time
  • Loading branch information
dav1do committed Jun 14, 2024
1 parent b2fbe70 commit e46e101
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 44 deletions.
4 changes: 2 additions & 2 deletions one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,8 @@ impl Daemon {
);
});

let migrator = service.data_migrator().await?;
if migrator.needs_migration().await? {
let mut migrator = service.data_migrator().await?;
if migrator.needs_migration() {
if opts.migrate_data {
migrator.run_all().await?;
} else {
Expand Down
109 changes: 67 additions & 42 deletions store/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,33 @@ struct Version {
installed_at: chrono::NaiveDateTime,
}

impl Version {
async fn fetch_previous(pool: &SqlitePool, current_version: &str) -> Result<Option<Self>> {
Ok(sqlx::query_as(
"SELECT id, version, installed_at
FROM ceramic_one_version
WHERE version <> $1
ORDER BY installed_at DESC limit 1;",
)
.bind(current_version)
.fetch_optional(pool.reader())
.await?)
}

async fn insert_current(pool: &SqlitePool, current_version: &str) -> Result<()> {
sqlx::query(
"INSERT INTO ceramic_one_version (version) VALUES ($1) ON CONFLICT DO NOTHING;",
)
.bind(current_version)
.execute(pool.writer())
.await?;
Ok(())
}
}

/// The data migrator is responsible for running data migrations the node requires as part of the verison upgrade.
#[derive(Debug)]
pub struct DataMigrator {
prev_version: Option<Version>,
required_migrations: Vec<&'static Migration>,
pool: SqlitePool,
}
Expand All @@ -41,24 +65,30 @@ impl DataMigrator {
/// Create a new data migrator. This updates the version table with the current version and determine which migrations need to be run.
pub async fn try_new(pool: SqlitePool) -> Result<Self> {
let current_version = env!("CARGO_PKG_VERSION").to_string();
let prev_version = Version::fetch_previous(&pool, &current_version).await?;

let prev_version: Option<Version> = sqlx::query_as(
"SELECT id, version, installed_at FROM ceramic_one_version ORDER BY installed_at DESC limit 1;",
)
.fetch_optional(pool.reader())
.await?;
Version::insert_current(&pool, &current_version).await?;

sqlx::query(
"INSERT INTO ceramic_one_version (version) VALUES ($1) ON CONFLICT DO NOTHING;",
)
.bind(&current_version)
.execute(pool.writer())
.await?;
let required_migrations =
Self::determine_required_migrations(&pool, prev_version, &current_version).await?;

let applied_migrations = DataMigration::fetch_all(&pool).await?;
Ok(Self {
required_migrations,
pool,
})
}

async fn determine_required_migrations(
pool: &SqlitePool,
prev_version: Option<Version>,
current_version: &str,
) -> Result<Vec<&'static Migration>> {
let new_install = Self::is_new_install(pool, &prev_version, current_version).await?;

let applied_migrations = DataMigration::fetch_all(pool).await?;
debug!(?prev_version, %current_version, ?applied_migrations, "Current data migration status");
// In the future, we can filter out migrations that are not required based on the version as well
let required_migrations = data_migrations()
let unapplied_migrations = data_migrations()
.iter()
.flat_map(|candidate| {
if applied_migrations
Expand All @@ -72,38 +102,33 @@ impl DataMigrator {
Some(candidate)
}
})
.collect();

tracing::debug!("required migrations: {:?}", required_migrations);

Ok(Self {
prev_version,
required_migrations,
pool,
})
}

/// Determines whether migrations are needed. Will mark all migrations as "complete" on a fresh install.
pub async fn needs_migration(&self) -> Result<bool> {
let new_install = self.is_new_install().await?;

.collect::<Vec<_>>();
if new_install {
debug!("Setting up new node... data migrations are not required.");
for migration_info in &self.required_migrations {
DataMigration::insert_completed(&self.pool, migration_info.name).await?;
for migration_info in &unapplied_migrations {
DataMigration::insert_completed(pool, migration_info.name).await?;
}
return Ok(false);
Ok(Vec::new())
} else {
Ok(unapplied_migrations)
}
}

Ok(!self.required_migrations.is_empty())
/// Determines whether migrations are needed
pub fn needs_migration(&self) -> bool {
!self.required_migrations.is_empty()
}

/// In the future, we can check the version and run migrations based on that. For now, we need to know if this is just getting
/// the new feature that tracks versions, or if it is a fresh install. We use the presence of recon event data to indicate it's not new.
async fn is_new_install(&self) -> Result<bool> {
if self.prev_version.is_none() {
async fn is_new_install(
pool: &SqlitePool,
prev_version: &Option<Version>,
_current_version: &str,
) -> Result<bool> {
if prev_version.is_none() {
let row = sqlx::query(r#"SELECT cid as rowid from ceramic_one_event;"#)
.fetch_optional(self.pool.reader())
.fetch_optional(pool.reader())
.await?;
Ok(row.is_none())
} else {
Expand All @@ -112,10 +137,10 @@ impl DataMigrator {
}

/// Run all migrations that have not been run yet.
pub async fn run_all(&self) -> Result<()> {
pub async fn run_all(&mut self) -> Result<()> {
for migration_info in &self.required_migrations {
info!("Starting migration: {}", migration_info.name);
DataMigration::upsert(&self.pool, migration_info.name).await?;
DataMigration::upsert_as_started(&self.pool, migration_info.name).await?;
self.run_migration_by_name(migration_info.name).await?;
DataMigration::mark_completed(&self.pool, migration_info.name).await?;
}
Expand Down Expand Up @@ -177,7 +202,7 @@ impl DataMigration {
.await?)
}

async fn upsert(pool: &SqlitePool, name: &str) -> Result<()> {
async fn upsert_as_started(pool: &SqlitePool, name: &str) -> Result<()> {
sqlx::query("INSERT INTO ceramic_one_data_migration (name, version) VALUES ($1, $2) on conflict (name) do update set last_attempted_at = CURRENT_TIMESTAMP;")
.bind(name)
.bind(env!("CARGO_PKG_VERSION"))
Expand All @@ -187,7 +212,7 @@ impl DataMigration {
}

async fn insert_completed(pool: &SqlitePool, name: &str) -> Result<()> {
sqlx::query("INSERT INTO ceramic_one_data_migration (name, version, completed_at) VALUES ($1, $2, CURRENT_TIMESTAMP);")
sqlx::query("INSERT INTO ceramic_one_data_migration (name, version, completed_at) VALUES ($1, $2, CURRENT_TIMESTAMP) on conflict (name) do nothing;")
.bind(name)
.bind(env!("CARGO_PKG_VERSION"))
.execute(pool.writer())
Expand All @@ -196,7 +221,7 @@ impl DataMigration {
}

async fn mark_completed(pool: &SqlitePool, name: &str) -> Result<()> {
sqlx::query("UPDATE ceramic_one_data_migration SET completed_at = CURRENT_TIMESTAMP WHERE name = $1;").bind(name).execute(pool.writer()).await?;
sqlx::query("UPDATE ceramic_one_data_migration SET completed_at = CURRENT_TIMESTAMP WHERE name = $1 and completed_at IS NULL;").bind(name).execute(pool.writer()).await?;
Ok(())
}
}
Expand All @@ -207,6 +232,6 @@ mod test {
async fn correctly_skips_new_install() {
let pool = crate::sql::SqlitePool::connect_in_memory().await.unwrap();
let migrator = crate::migration::DataMigrator::try_new(pool).await.unwrap();
assert!(!migrator.needs_migration().await.unwrap());
assert!(!migrator.needs_migration());
}
}

0 comments on commit e46e101

Please sign in to comment.