Commit d7e194b
chore: write migration to populate new tables from blockstore
dav1do committed Jun 14, 2024
1 parent 100185b commit d7e194b
Showing 1 changed file with 110 additions and 6 deletions.
store/src/migration.rs
@@ -3,7 +3,10 @@ use std::sync::OnceLock;
use sqlx::{prelude::FromRow, types::chrono};
use tracing::{debug, info};

-use crate::{Error, Result, SqlitePool};
+use crate::{
+    sql::entities::{EventHeader, ReconEventBlockRaw},
+    CeramicOneStream, Error, EventInsertableBody, Result, SqlitePool,
+};

static MIGRATIONS: OnceLock<Vec<Migration>> = OnceLock::new();

@@ -18,7 +21,12 @@ struct Migration {
/// The list of migrations that need to be run in order to get the database up to date and start the server.
/// Add new migrations to the end of the list.
fn data_migrations() -> &'static Vec<Migration> {
-    MIGRATIONS.get_or_init(Vec::new)
+    MIGRATIONS.get_or_init(|| {
+        vec![Migration {
+            name: "events_to_streams",
+            _version: "0.22.0",
+        }]
+    })
}

#[derive(Debug, Clone, sqlx::FromRow)]
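As an aside, the `OnceLock` registry that this hunk populates follows the standard lazy-initialization pattern: `get_or_init` runs its closure at most once, and every later call returns the same `&'static Vec`. A minimal standalone sketch, with a single-field `Migration` that is illustrative rather than the crate's real type:

```rust
use std::sync::OnceLock;

#[derive(Debug)]
struct Migration {
    name: &'static str, // illustrative; the real struct also carries a version
}

static MIGRATIONS: OnceLock<Vec<Migration>> = OnceLock::new();

fn data_migrations() -> &'static Vec<Migration> {
    // The closure runs at most once; later calls return the cached Vec.
    MIGRATIONS.get_or_init(|| vec![Migration { name: "events_to_streams" }])
}

fn main() {
    let first = data_migrations();
    let second = data_migrations();
    // Both calls hand back the same 'static allocation.
    assert!(std::ptr::eq(first, second));
    println!("{} migration(s) registered", first.len());
}
```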
@@ -123,17 +131,16 @@ impl DataMigrator {
    }

    async fn run_migration_by_name(&self, name: &str) -> Result<()> {
-        #[allow(clippy::match_single_binding)]
-        let _res: Result<()> = match name {
+        let res = match name {
            "events_to_streams" => self.migrate_events_to_streams().await,
            _ => {
                return Err(Error::new_fatal(anyhow::anyhow!(
                    "Unknown migration: {}",
                    name
                )))
            }
        };
-        #[allow(unreachable_code)]
-        match _res {
+        match res {
            Ok(_) => {
                info!("Migration {} completed successfully", name);
                Ok(())
@@ -155,6 +162,103 @@ impl DataMigrator {
            }
        }
    }

+    // This isn't the most efficient approach, but it's simple and we should only run it once.
+    // It isn't expected to ever be run on anything that isn't a sqlite database upgrading from version 0.22.0 or below.
+    async fn migrate_events_to_streams(&self) -> Result<()> {
+        let mut cid_cursor = Some(0);
+
+        struct EventToStreamRow {
+            row_blocks: ReconEventBlockRaw,
+            row_id: i64,
+        }
+        use sqlx::Row;
+        impl sqlx::FromRow<'_, sqlx::sqlite::SqliteRow> for EventToStreamRow {
+            fn from_row(row: &sqlx::sqlite::SqliteRow) -> std::result::Result<Self, sqlx::Error> {
+                let row_id: i64 = row.try_get("rowid")?;
+                let row_blocks = ReconEventBlockRaw::from_row(row)?;
+                Ok(Self { row_id, row_blocks })
+            }
+        }
+
+        let limit = 1000;
+        let mut total_migrated = 0;
+        while let Some(last_cid) = cid_cursor {
+            let mut tx = self.pool.begin_tx().await?;
+
+            // RowID starts from 1, so we can just use greater than 0 to start
+            let all_blocks: Vec<EventToStreamRow> = sqlx::query_as(
+                r#"WITH key AS (
+                    SELECT
+                        e.cid AS event_cid, e.order_key, e.rowid
+                    FROM ceramic_one_event e
+                    WHERE
+                        EXISTS (
+                            SELECT 1
+                            FROM ceramic_one_event_block
+                            WHERE event_cid = e.cid
+                        )
+                        AND NOT EXISTS (
+                            SELECT 1
+                            FROM ceramic_one_event_metadata
+                            WHERE cid = e.cid
+                        )
+                        AND e.rowid > $1
+                    ORDER BY e.rowid
+                    LIMIT $2
+                )
+                SELECT
+                    key.order_key, key.event_cid, eb.codec, eb.root, eb.idx, b.multihash, b.bytes, key.rowid
+                FROM key
+                JOIN ceramic_one_event_block eb ON key.event_cid = eb.event_cid
+                JOIN ceramic_one_block b ON b.multihash = eb.block_multihash
+                ORDER BY key.order_key, eb.idx;"#,
+            )
+            .bind(last_cid)
+            .bind(limit)
+            .fetch_all(&mut **tx.inner())
+            .await?;
+
+            let new_max = all_blocks.iter().map(|v| v.row_id).max();
+            if all_blocks.is_empty() || new_max.is_none() {
+                tx.commit().await?;
+                return Ok(());
+            } else {
+                cid_cursor = new_max;
+            }
+
+            let values = ReconEventBlockRaw::into_carfiles(
+                all_blocks.into_iter().map(|b| b.row_blocks).collect(),
+            )
+            .await?;
+            total_migrated += values.len();
+            if total_migrated % 10000 == 0 {
+                debug!("Migrated {} events to the stream format", total_migrated);
+            }
+            tracing::trace!("found {} values to migrate", values.len());
+
+            for (event_id, payload) in values {
+                tracing::trace!("Migrating event: {}", event_id);
+                // Should we log and continue if anything fails? It shouldn't be possible unless something bad happened
+                // and we allowed unexpected data into the system, but if we error, it will require manual intervention to recover
+                // as the bad data will need to be deleted by hand. Temporary failures to write can be retried.
+                let event_cid = event_id.cid().ok_or_else(|| {
+                    Error::new_fatal(anyhow::anyhow!("Event ID is missing a CID: {}", event_id))
+                })?;
+
+                let insertable = EventInsertableBody::try_from_carfile(event_cid, &payload).await?;
+
+                if let EventHeader::Init { header, .. } = &insertable.header {
+                    CeramicOneStream::insert_tx(&mut tx, insertable.cid, header).await?;
+                }
+
+                CeramicOneStream::insert_event_header_tx(&mut tx, &insertable.header).await?;
+            }
+            tx.commit().await?;
+        }
+
+        Ok(())
+    }
}

#[derive(Debug, Clone, FromRow)]
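The backfill loop added above leans on two small patterns worth noting: a hand-rolled `sqlx::FromRow` impl that tacks the SQLite `rowid` onto an existing row type, and keyset pagination that uses that `rowid` as the cursor so each batch runs in its own transaction. A minimal, self-contained sketch of the same idea, assuming `sqlx` with the `sqlite` and `runtime-tokio` features plus `tokio`; the `item` table and `CursorRow` type are illustrative, not part of this commit:

```rust
use sqlx::sqlite::{SqlitePoolOptions, SqliteRow};
use sqlx::{FromRow, Row};

// Pairs a payload column with its SQLite rowid, mirroring the EventToStreamRow adapter above.
struct CursorRow {
    row_id: i64,
    val: String,
}

impl FromRow<'_, SqliteRow> for CursorRow {
    fn from_row(row: &SqliteRow) -> Result<Self, sqlx::Error> {
        Ok(Self {
            row_id: row.try_get("rowid")?,
            val: row.try_get("val")?,
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // A single-connection in-memory database keeps the example self-contained.
    let pool = SqlitePoolOptions::new()
        .max_connections(1)
        .connect("sqlite::memory:")
        .await?;
    sqlx::query("CREATE TABLE item (val TEXT NOT NULL)")
        .execute(&pool)
        .await?;
    for i in 0..25 {
        sqlx::query("INSERT INTO item (val) VALUES ($1)")
            .bind(format!("value-{i}"))
            .execute(&pool)
            .await?;
    }

    let limit: i64 = 10;
    // rowids start at 1, so a cursor of 0 selects from the beginning.
    let mut cursor = Some(0i64);
    while let Some(last) = cursor {
        let batch: Vec<CursorRow> =
            sqlx::query_as("SELECT rowid, val FROM item WHERE rowid > $1 ORDER BY rowid LIMIT $2")
                .bind(last)
                .bind(limit)
                .fetch_all(&pool)
                .await?;
        // Advance to the largest rowid seen; an empty batch yields None and ends the loop.
        cursor = batch.iter().map(|r| r.row_id).max();
        println!("processed batch of {} rows", batch.len());
    }
    Ok(())
}
```

Resuming from the max `rowid` of each batch keeps the query indexed and cheap, and committing per batch bounds transaction size, which is why the migration above follows the same shape.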
