fix: clean up data migration code for testing
dav1do committed Jun 13, 2024
1 parent 49c1f66 commit 636fcb1
Showing 6 changed files with 142 additions and 69 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion one/src/lib.rs
@@ -404,7 +404,7 @@ impl Daemon {
info!("applying data migrations");
migrator.run_all().await?;
} else {
warn!("data migrations are required, but --migrate-data is not set");
warn!("Data migrations are required, but --migrate-data is not set. Run with --migrate-data to apply migrations. Before doing so, you should back up your sqlite files from {:?}", dir);
return Ok(());
}
}
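
The expanded warning tells operators to back up their sqlite files before rerunning the daemon with --migrate-data. A minimal sketch of that backup step, assuming the store directory holds the sqlite database plus its -wal/-shm sidecars; the .bak naming is illustrative, not part of this commit:

use std::{fs, io, path::Path};

// Copy the sqlite files (main db plus -wal/-shm sidecars) into a sibling
// backup directory before rerunning the daemon with --migrate-data.
fn backup_sqlite_files(store_dir: &Path) -> io::Result<()> {
    let backup_dir = store_dir.with_extension("bak");
    fs::create_dir_all(&backup_dir)?;
    for entry in fs::read_dir(store_dir)? {
        let entry = entry?;
        let name = entry.file_name();
        if name.to_string_lossy().contains(".sqlite") {
            fs::copy(entry.path(), backup_dir.join(&name))?;
        }
    }
    Ok(())
}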
1 change: 0 additions & 1 deletion service/Cargo.toml
@@ -19,7 +19,6 @@ cid.workspace = true
hex.workspace = true
ipld-core.workspace = true
iroh-bitswap.workspace = true
itertools = "0.12"
multihash-codetable.workspace = true
recon.workspace = true
thiserror.workspace = true
203 changes: 138 additions & 65 deletions store/src/migration.rs
@@ -1,15 +1,16 @@
use std::sync::OnceLock;

use sqlx::{prelude::FromRow, types::chrono};
use tracing::info;
use tracing::{debug, info};

use crate::{
sql::entities::{EventCid, EventHeader, ReconEventBlockRaw},
sql::entities::{EventHeader, ReconEventBlockRaw},
CeramicOneStream, Error, EventInsertableBody, Result, SqlitePool,
};

static MIGRATIONS: OnceLock<Vec<Migration>> = OnceLock::new();

#[derive(Debug)]
struct Migration {
/// The name of the migration.
name: &'static str,
@@ -19,16 +20,18 @@ struct Migration {

/// The list of migrations that need to be run in order to get the database up to date and start the server.
/// Add new migrations to the end of the list.
fn required_migrations() -> &'static Vec<Migration> {
fn data_migrations() -> &'static Vec<Migration> {
MIGRATIONS.get_or_init(|| {
vec![Migration {
name: "events_to_streams",
_version: "0.23.0",
_version: "0.22.0",
}]
})
}
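
Stripped of diff context, the migration registry is a OnceLock that builds the ordered list once and hands out a shared reference afterwards; new migrations are appended so they run after everything already listed. A self-contained restatement of the pattern:

use std::sync::OnceLock;

struct Migration {
    name: &'static str,
    _version: &'static str,
}

static MIGRATIONS: OnceLock<Vec<Migration>> = OnceLock::new();

// get_or_init runs the closure at most once; later calls return the same
// cached Vec, so the registry is built exactly one time per process.
fn data_migrations() -> &'static Vec<Migration> {
    MIGRATIONS.get_or_init(|| {
        vec![Migration {
            name: "events_to_streams",
            _version: "0.22.0",
        }]
    })
}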

#[derive(Debug, Clone, sqlx::FromRow)]
// We want to retrieve these fields for logging but we don't refer to them directly
#[allow(dead_code)]
struct Version {
id: i64,
version: String,
@@ -43,6 +46,7 @@ pub struct DataMigrator {
}

impl DataMigrator {
/// Create a new data migrator. This updates the version table with the current version and determines which migrations need to be run.
pub async fn try_new(pool: SqlitePool) -> Result<Self> {
let current_version = env!("CARGO_PKG_VERSION").to_string();

@@ -60,8 +64,9 @@ impl DataMigrator {
.await?;

let applied_migrations = DataMigration::fetch_all(&pool).await?;
debug!(?prev_version, %current_version, ?applied_migrations, "Current data migration status");
// In the future, we can filter out migrations that are not required based on the version as well
let required_migrations = required_migrations()
let required_migrations = data_migrations()
.iter()
.flat_map(|candidate| {
if applied_migrations
@@ -77,6 +82,8 @@ })
})
.collect();

tracing::debug!("required migrations: {:?}", required_migrations);

Ok(Self {
prev_version,
required_migrations,
@@ -103,10 +110,10 @@ impl DataMigrator {
/// the new feature that tracks versions, or if it is a fresh install. We use the presence of recon event data to indicate it's not new.
async fn is_new_install(&self) -> Result<bool> {
if self.prev_version.is_none() {
let x = sqlx::query(r#"SELECT cid from ceramic_one_event limit 1;"#)
let row = sqlx::query(r#"SELECT cid as rowid from ceramic_one_event;"#)
.fetch_optional(self.pool.reader())
.await?;
Ok(x.is_some())
Ok(row.is_none())
} else {
Ok(false)
}
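
The corrected check treats the database as a fresh install only when no version row has ever been recorded and the event table is empty; a recorded version, or pre-existing recon events without one, both mean data migrations may apply. The decision restated as a pure function (a sketch of the logic, not the code above):

// version recorded            -> not new: we have run with version tracking before
// no version, events present  -> pre-0.22.0 install that needs data migrations
// no version, no events       -> genuinely fresh install with nothing to migrate
fn is_fresh_install(version_recorded: bool, has_events: bool) -> bool {
    !version_recorded && !has_events
}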
@@ -115,6 +122,7 @@ impl DataMigrator {
/// Run all migrations that have not been run yet.
pub async fn run_all(&self) -> Result<()> {
for migration_info in &self.required_migrations {
info!("Starting migration: {}", migration_info.name);
DataMigration::upsert(&self.pool, migration_info.name).await?;
self.run_migration_by_name(migration_info.name).await?;
DataMigration::mark_completed(&self.pool, migration_info.name).await?;
@@ -123,84 +131,139 @@ impl DataMigrator {
}

async fn run_migration_by_name(&self, name: &str) -> Result<()> {
info!("Starting migration: {}", name);
match name {
let res = match name {
"events_to_streams" => self.migrate_events_to_streams().await,
_ => Err(Error::new_fatal(anyhow::anyhow!(
"Unknown migration: {}",
name
))),
_ => {
return Err(Error::new_fatal(anyhow::anyhow!(
"Unknown migration: {}",
name
)))
}
};
match res {
Ok(_) => {
info!("Migration {} completed successfully", name);
Ok(())
}
Err(e) => {
tracing::error!("Migration encountered error: {:?}", e);
match e {
Error::Fatal { error } => Err(Error::new_fatal(anyhow::anyhow!(
"Migration {} failed in irrecoverable way: {}",
name,
error
))),

e => {
let err = e.context("Migration failed but can be retried");
Err(err)
}
}
}
}
}
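
The rewritten error path separates Error::Fatal (bad data that needs manual cleanup) from everything else, which is wrapped with context and left retryable. A hypothetical caller-side policy built on that split; the retry loop itself is an assumption, not part of this commit:

// Retry transient migration failures, but surface fatal ones immediately.
async fn run_with_retries(migrator: &DataMigrator, attempts: u32) -> Result<()> {
    let mut last_err = None;
    for attempt in 1..=attempts {
        match migrator.run_all().await {
            Ok(()) => return Ok(()),
            // Fatal means bad data was let in; retrying cannot help.
            Err(e @ Error::Fatal { .. }) => return Err(e),
            Err(e) => {
                tracing::warn!("retryable migration error on attempt {attempt}: {e:?}");
                last_err = Some(e);
            }
        }
    }
    Err(last_err.expect("attempts must be > 0"))
}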

// This isn't the most efficient approach but it's simple and we should only run it once.
// It isn't expected to ever be run on something that isn't a sqlite database upgrading from version 0.22.0 or below.
async fn migrate_events_to_streams(&self) -> Result<()> {
let mut cid_cursor = Some(EventCid::default());
let mut cid_cursor = Some(0);

while let Some(last_cid) = cid_cursor {
cid_cursor = self.migrate_events_to_streams_batch(last_cid).await?;
struct EventToStreamRow {
row_blocks: ReconEventBlockRaw,
row_id: i64,
}
use sqlx::Row;
impl sqlx::FromRow<'_, sqlx::sqlite::SqliteRow> for EventToStreamRow {
fn from_row(row: &sqlx::sqlite::SqliteRow) -> std::result::Result<Self, sqlx::Error> {
let row_id: i64 = row.try_get("rowid")?;
let row_blocks = ReconEventBlockRaw::from_row(row)?;
Ok(Self { row_id, row_blocks })
}
}

Ok(())
}

async fn migrate_events_to_streams_batch(
&self,
last_cid: EventCid,
) -> Result<Option<EventCid>> {
let all_blocks: Vec<ReconEventBlockRaw> = sqlx::query_as(
r#"SELECT
key.order_key, key.event_cid, eb.codec, eb.root, eb.idx, b.multihash, b.bytes
FROM (
SELECT
e.cid as event_cid, e.order_key
FROM ceramic_one_event e
WHERE
EXISTS (SELECT 1 FROM ceramic_one_event_block where event_cid = e.cid)
AND NOT EXISTS (SELECT 1 from ceramic_one_event_metadata where cid = e.cid)
AND e.cid > $1
ORDER BY e.cid
LIMIT 1000
) key
JOIN
ceramic_one_event_block eb ON key.event_cid = eb.event_cid
JOIN ceramic_one_block b on b.multihash = eb.block_multihash
ORDER BY key.order_key, eb.idx;"#,
)
.bind(last_cid.to_bytes())
.fetch_all(self.pool.reader())
.await?;
let limit = 1000;
let mut total_migrated = 0;
while let Some(last_cid) = cid_cursor {
let mut tx = self.pool.begin_tx().await?;

// RowID starts from 1 so we can just use greater than 0 to start
let all_blocks: Vec<EventToStreamRow> = sqlx::query_as(
r#"WITH key AS (
SELECT
e.cid AS event_cid, e.order_key, e.rowid
FROM ceramic_one_event e
WHERE
EXISTS (
SELECT 1
FROM ceramic_one_event_block
WHERE event_cid = e.cid
)
AND NOT EXISTS (
SELECT 1
FROM ceramic_one_event_metadata
WHERE cid = e.cid
)
AND e.rowid > $1
ORDER BY e.rowid
LIMIT $2
)
SELECT
key.order_key, key.event_cid, eb.codec, eb.root, eb.idx, b.multihash, b.bytes, key.rowid
FROM key
JOIN ceramic_one_event_block eb ON key.event_cid = eb.event_cid
JOIN ceramic_one_block b ON b.multihash = eb.block_multihash
ORDER BY key.order_key, eb.idx;"#,
)
.bind(last_cid)
.bind(limit)
.fetch_all(&mut **tx.inner())
.await?;

let values = ReconEventBlockRaw::into_carfiles(all_blocks).await?;
let new_max = all_blocks.iter().map(|v| v.row_id).max();
if all_blocks.is_empty() || new_max.is_none() {
tx.commit().await?;
return Ok(());
} else {
cid_cursor = new_max;
}

let last_cid = values.last().and_then(|(id, _)| id.cid());
if last_cid.is_none() {
return Ok(None);
}
let mut tx = self.pool.begin_tx().await?;
for (event_id, payload) in values {
// should we log and continue? this shouldn't be possible unless something bad happened
// if we error, will require manual intervention to recover
let event_cid = event_id.cid().ok_or_else(|| {
Error::new_fatal(anyhow::anyhow!("Event ID is missing a CID: {}", event_id))
})?;

let insertable = EventInsertableBody::try_from_carfile(event_cid, &payload).await?;

if let EventHeader::Init { header, .. } = &insertable.header {
CeramicOneStream::insert_tx(&mut tx, insertable.cid, header).await?;
let values = ReconEventBlockRaw::into_carfiles(
all_blocks.into_iter().map(|b| b.row_blocks).collect(),
)
.await?;
total_migrated += values.len();
if total_migrated % 10000 == 0 {
debug!("Migrated {} events to the stream format", total_migrated);
}
tracing::trace!("found {} values to migrate", values.len());

CeramicOneStream::insert_event_header_tx(&mut tx, &insertable.header).await?;
for (event_id, payload) in values {
tracing::trace!("Migrating event: {}", event_id);
// should we log and continue if anything fails? It shouldn't be possible unless something bad happened
// and we allowed unexpected data into the system, but if we error, it will require manual intervention to recover
// as the bad data will need to be deleted by hand. temporary failures to write can be retried.
let event_cid = event_id.cid().ok_or_else(|| {
Error::new_fatal(anyhow::anyhow!("Event ID is missing a CID: {}", event_id))
})?;

let insertable = EventInsertableBody::try_from_carfile(event_cid, &payload).await?;

if let EventHeader::Init { header, .. } = &insertable.header {
CeramicOneStream::insert_tx(&mut tx, insertable.cid, header).await?;
}

CeramicOneStream::insert_event_header_tx(&mut tx, &insertable.header).await?;
}
tx.commit().await?;
}
tx.commit().await?;

Ok(last_cid)
Ok(())
}
}
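
The batch loop above walks ceramic_one_event by sqlite rowid rather than by CID: each pass binds the last rowid seen plus a LIMIT, advances the cursor to the largest rowid in the batch, and stops when a batch comes back empty. The same keyset-pagination pattern in isolation (table and column names as in the migration; everything else is assumed):

use sqlx::{Row, SqlitePool};

// Keyset pagination over sqlite's implicit rowid: fetch a batch of rows,
// advance the cursor to the largest rowid seen, stop on an empty batch.
async fn for_each_event_batch(pool: &SqlitePool, limit: i64) -> sqlx::Result<()> {
    let mut cursor: i64 = 0; // rowid starts at 1, so 0 precedes every row
    loop {
        let rows = sqlx::query(
            "SELECT rowid, cid FROM ceramic_one_event WHERE rowid > $1 ORDER BY rowid LIMIT $2",
        )
        .bind(cursor)
        .bind(limit)
        .fetch_all(pool)
        .await?;
        let Some(max) = rows.iter().map(|r| r.get::<i64, _>("rowid")).max() else {
            return Ok(()); // empty batch: every row has been visited
        };
        cursor = max;
        // ... process the batch here ...
    }
}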

#[derive(Debug, Clone, FromRow)]
// we want to retrieve these fields for logging but we don't refer to them directly
#[allow(dead_code)]
struct DataMigration {
name: String,
version: String,
@@ -241,3 +304,13 @@ impl DataMigration {
Ok(())
}
}

#[cfg(test)]
mod test {
#[tokio::test]
async fn correctly_skips_new_install() {
let pool = crate::sql::SqlitePool::connect_in_memory().await.unwrap();
let migrator = crate::migration::DataMigrator::try_new(pool).await.unwrap();
assert!(!migrator.needs_migration().await.unwrap());
}
}
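
Taken together with the one/src/lib.rs hunk above, the intended startup flow is: build the migrator (which records the current version), ask whether migrations are needed, and only run them when the operator opted in. A sketch against the APIs shown in this diff, with the flag plumbing assumed:

async fn startup(pool: SqlitePool, migrate_data: bool) -> Result<()> {
    let migrator = DataMigrator::try_new(pool).await?;
    if migrator.needs_migration().await? {
        if migrate_data {
            migrator.run_all().await?;
        } else {
            // Refuse to proceed until the operator opts in (after backing up).
            return Ok(());
        }
    }
    Ok(())
}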
1 change: 0 additions & 1 deletion store/src/sql/entities/mod.rs
@@ -16,4 +16,3 @@ pub use stream::{IncompleteStream, StreamEventRow, StreamRow};
pub use utils::{CountRow, DeliveredEventRow, OrderKey};

pub type StreamCid = cid::Cid;
pub type EventCid = cid::Cid;
3 changes: 3 additions & 0 deletions store/src/sql/query.rs
@@ -97,6 +97,9 @@ impl EventQuery {
}

/// Updates the delivered column in the event table so the event can be sent to the client
/// Requires 2 parameters:
/// $1 = delivered (i64)
/// $2 = cid (bytes)
pub fn mark_ready_to_deliver() -> &'static str {
"UPDATE ceramic_one_event SET delivered = $1 WHERE cid = $2 and delivered is NULL;"
}
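
The new doc comment pins down the bind order. A minimal usage sketch with sqlx; the wrapper function and pool argument are assumptions:

async fn mark_delivered(
    pool: &sqlx::SqlitePool,
    delivered: i64,
    cid_bytes: &[u8],
) -> sqlx::Result<()> {
    sqlx::query(EventQuery::mark_ready_to_deliver())
        .bind(delivered) // $1 = delivered (i64)
        .bind(cid_bytes) // $2 = cid (bytes)
        .execute(pool)
        .await?;
    Ok(())
}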
