Skip to content

Commit

Permalink
wip: run data migrations on daemon start with cli flag
Browse files Browse the repository at this point in the history
  • Loading branch information
dav1do committed Jun 13, 2024
1 parent 4201fa8 commit 6ee824a
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 5 deletions.
26 changes: 23 additions & 3 deletions one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use ceramic_core::{EventId, Interest};
use ceramic_kubo_rpc::Multiaddr;
use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle};
use ceramic_p2p::{load_identity, DiskStorage, Keychain, Libp2pConfig};
use ceramic_service::{CeramicEventService, CeramicInterestService};
use ceramic_service::{CeramicEventService, CeramicInterestService, CeramicService};
use clap::{Args, Parser, Subcommand, ValueEnum};
use futures::StreamExt;
use multibase::Base;
Expand Down Expand Up @@ -157,6 +157,10 @@ struct DaemonOpts {
/// The default is to use `db.sqlite3` in the store directory.
#[arg(long, env = "CERAMIC_ONE_DATABASE_URL")]
database_url: Option<String>,

#[arg(long, default_value_t = false, env = "CERAMIC_ONE_MIGRATE_DATA")]
/// Whether to apply any required data migrations at startup. If false and migrations are required, the server will exit.
migrate_data: bool,
}

#[derive(ValueEnum, Debug, Clone, Default)]
Expand Down Expand Up @@ -280,11 +284,13 @@ impl DaemonOpts {
async fn build_sqlite_dbs(path: &str) -> Result<Databases> {
let sql_pool =
ceramic_store::SqlitePool::connect(path, ceramic_store::Migrations::Apply).await?;
let interest_store = Arc::new(CeramicInterestService::new(sql_pool.clone()));
let event_store = Arc::new(CeramicEventService::new(sql_pool).await?);
let ceramic_service = CeramicService::try_new(sql_pool).await?;
let interest_store = ceramic_service.interest_service().to_owned();
let event_store = ceramic_service.event_service().to_owned();
println!("Connected to sqlite database: {}", path);

Ok(Databases::Sqlite(SqliteBackend {
ceramic_service,
event_store,
interest_store,
}))
Expand All @@ -295,6 +301,7 @@ enum Databases {
Sqlite(SqliteBackend),
}
struct SqliteBackend {
ceramic_service: CeramicService,
interest_store: Arc<CeramicInterestService>,
event_store: Arc<CeramicEventService>,
}
Expand All @@ -312,6 +319,7 @@ impl Daemon {
Databases::Sqlite(db) => {
Daemon::run_int(
opts,
db.ceramic_service,
db.interest_store.clone(),
db.interest_store,
db.event_store.clone(),
Expand All @@ -325,6 +333,7 @@ impl Daemon {

async fn run_int<I1, I2, E1, E2, E3>(
opts: DaemonOpts,
service: CeramicService,
interest_api_store: Arc<I1>,
interest_recon_store: Arc<I2>,
model_api_store: Arc<E1>,
Expand Down Expand Up @@ -389,6 +398,17 @@ impl Daemon {
);
});

let migrator = service.data_migrator().await?;
if migrator.needs_migration().await? {
if opts.migrate_data {
info!("applying data migrations");
migrator.run_all().await?;
} else {
warn!("data migrations are required, but --migrate-data is not set");
return Ok(());
}
}

let p2p_config = Libp2pConfig {
mdns: false,
bitswap_server: false,
Expand Down
1 change: 1 addition & 0 deletions service/src/interest/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use ceramic_store::SqlitePool;

/// A Service that understands how to process and store Ceramic Interests.
/// Implements the [`recon::Store`], [`iroh_bitswap::Store`], and [`ceramic_api::EventStore`] traits for [`ceramic_core::Interest`].
#[derive(Debug, Clone)]
pub struct CeramicInterestService {
pub(crate) pool: SqlitePool,
}
Expand Down
37 changes: 37 additions & 0 deletions service/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,48 @@
mod error;
mod event;
mod interest;

#[cfg(test)]
mod tests;

use std::sync::Arc;

use ceramic_store::{DataMigrator, SqlitePool};
pub use error::Error;
pub use event::CeramicEventService;
pub use interest::CeramicInterestService;

pub(crate) type Result<T> = std::result::Result<T, Error>;

/// The ceramic service holds the logic needed by the other components (e.g. api, recon) to access the store and process events
/// in a way that makes sense to the ceramic protocol, and not just as raw bytes.
#[derive(Debug)]
pub struct CeramicService {
pub(crate) interest: Arc<CeramicInterestService>,
pub(crate) event: Arc<CeramicEventService>,
}

impl CeramicService {
/// Create a new CeramicService
pub async fn try_new(pool: SqlitePool) -> Result<Self> {
let interest = Arc::new(CeramicInterestService::new(pool.clone()));
let event = Arc::new(CeramicEventService::new(pool).await?);
Ok(Self { interest, event })
}

/// Get the interest service
pub fn interest_service(&self) -> &Arc<CeramicInterestService> {
&self.interest
}

/// Get the event service
pub fn event_service(&self) -> &Arc<CeramicEventService> {
&self.event
}

/// Get the data migrator
pub async fn data_migrator(&self) -> Result<DataMigrator> {
let m = DataMigrator::try_new(self.event.pool.clone()).await?;
Ok(m)
}
}
4 changes: 2 additions & 2 deletions store/src/sql/access/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ impl CeramicOneEvent {
///
/// IMPORTANT:
/// It is the caller's responsibility to order events marked deliverable correctly.
/// That is, events will be processed in the order they are given so earlier events are given a lower global ordering
/// and will be returned earlier in the feed. Events can be intereaved with different streams, but if two events
/// That is, events will be processed in the order they are given so earlier events are given a lower global ordering
/// and will be returned earlier in the feed. Events can be intereaved with different streams, but if two events
/// depend on each other, the `prev` must come first in the list to ensure the correct order for indexers and consumers.
pub async fn insert_many(
pool: &SqlitePool,
Expand Down

0 comments on commit 6ee824a

Please sign in to comment.