feat: add support for local file object store
Testing and developing ceramic-one is much easier with fewer
dependencies. With this change an S3 API-compatible object store is no
longer required; objects can instead be stored on local disk.
nathanielc committed Dec 3, 2024
1 parent a390db6 commit d9968c0
Showing 7 changed files with 86 additions and 56 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion flight/tests/server.rs
@@ -33,7 +33,6 @@ async fn start_server(feed: MockFeed) -> FlightSqlServiceClient<Channel> {
     let ctx = ceramic_pipeline::session_from_config(ceramic_pipeline::Config {
         conclusion_feed: feed.into(),
         object_store: Arc::new(object_store::memory::InMemory::new()),
-        object_store_bucket_name: "test_bucket".to_string(),
     })
     .await
     .unwrap();
1 change: 1 addition & 0 deletions one/Cargo.toml
@@ -59,6 +59,7 @@ tokio-stream = { workspace = true, features = ["io-util"] }
 tokio.workspace = true
 tonic.workspace = true
 tracing.workspace = true
+url.workspace = true
 
 [target.'cfg(not(target_env = "msvc"))'.dependencies]
 tikv-jemallocator.workspace = true
128 changes: 81 additions & 47 deletions one/src/daemon.rs
@@ -17,6 +17,7 @@ use ceramic_p2p::{load_identity, DiskStorage, Keychain, Libp2pConfig};
 use ceramic_sql::sqlite::SqlitePool;
 use clap::Args;
 use object_store::aws::AmazonS3Builder;
+use object_store::local::LocalFileSystem;
 use recon::{FullInterests, Recon, ReconInterestProvider};
 use signal_hook::consts::signal::*;
 use signal_hook_tokio::Signals;
@@ -175,7 +176,8 @@ pub struct DaemonOpts {
     #[arg(
         long,
         env = "CERAMIC_ONE_FLIGHT_SQL_BIND_ADDRESS",
-        requires = "experimental_features"
+        requires = "experimental_features",
+        requires = "object_store_url"
     )]
     flight_sql_bind_address: Option<String>,
 
@@ -239,19 +241,24 @@ pub struct DaemonOpts {
     )]
     ethereum_rpc_urls: Vec<String>,
 
-    /// Enable the aggregator, requires Flight SQL and S3 bucket to be defined.
+    /// Enable the aggregator, requires Flight SQL and object store to be defined.
     #[arg(
         long,
         requires = "flight_sql_bind_address",
-        requires = "s3_bucket",
+        requires = "object_store_url",
         env = "CERAMIC_ONE_AGGREGATOR"
     )]
     aggregator: Option<bool>,
 
-    /// Name of the S3 bucket where Ceramic stores published data tables.
-    /// Requires using the experimental-features flag
+    /// Location of the object store bucket, of the form:
+    ///
+    /// * s3://<bucket_name>
+    /// * file:///absolute/path/to/object_store/directory
+    /// * file://./relative/path/from/store_dir
     ///
-    /// Credentials are read from the environment:
+    /// When an s3:// URL is used the following environment variables are used
+    /// to configure the s3 API access:
     ///
     /// * AWS_ACCESS_KEY_ID -> access_key_id
     /// * AWS_SECRET_ACCESS_KEY -> secret_access_key
@@ -260,12 +267,13 @@ pub struct DaemonOpts {
     /// * AWS_SESSION_TOKEN -> token
     /// * AWS_ALLOW_HTTP -> set to "true" to permit HTTP connections without TLS
     ///
+    /// Requires using the experimental-features flag
     #[arg(
         long,
         requires = "experimental_features",
-        env = "CERAMIC_ONE_S3_BUCKET"
+        env = "CERAMIC_ONE_OBJECT_STORE_URL"
     )]
-    s3_bucket: Option<String>,
+    object_store_url: Option<url::Url>,
 }
 
 async fn get_eth_rpc_providers(
@@ -562,54 +570,80 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
     })?;
 
     // Start Flight server
-    let (pipeline_ctx, flight_handle, aggregator_handle) =
-        if let Some(addr) = opts.flight_sql_bind_address {
-            let addr = addr.parse()?;
-            let feed = event_svc.clone();
-            let bucket = opts
-                .s3_bucket
-                .ok_or_else(|| anyhow!("s3_bucket option is required when exposing flight sql"))?;
-            let ctx = ceramic_pipeline::session_from_config(ceramic_pipeline::Config {
-                conclusion_feed: feed.into(),
-                object_store: Arc::new(
-                    AmazonS3Builder::from_env()
-                        .with_bucket_name(&bucket)
-                        .build()?,
-                ),
-                object_store_bucket_name: bucket,
-            })
-            .await?;
-
-            // Start aggregator
-            let aggregator_handle = if opts.aggregator.unwrap_or_default() {
-                let mut ss = shutdown_signal.resubscribe();
-                let ctx = ctx.clone();
-                Some(tokio::spawn(async move {
-                    if let Err(err) = ceramic_pipeline::aggregator::run(ctx, async move {
-                        let _ = ss.recv().await;
-                    })
-                    .await
-                    {
-                        error!(%err, "aggregator task failed");
-                    }
-                }))
-            } else {
-                None
-            };
-
-            let mut ss = shutdown_signal.resubscribe();
-            let pipeline_ctx = ctx.clone();
-            let flight_handle = tokio::spawn(async move {
-                ceramic_flight::server::run(ctx, addr, async move {
-                    let _ = ss.recv().await;
-                })
-                .await
-            });
-            (Some(pipeline_ctx), aggregator_handle, Some(flight_handle))
-        } else {
-            (None, None, None)
-        };
+    let (pipeline_ctx, flight_handle, aggregator_handle) = if let Some(addr) =
+        opts.flight_sql_bind_address
+    {
+        let addr = addr.parse()?;
+        let feed = event_svc.clone();
+        let object_store_url = opts.object_store_url.ok_or_else(|| {
+            anyhow!("object_store_url option is required when exposing flight sql")
+        })?;
+        let object_store: Arc<dyn object_store::ObjectStore> = match object_store_url.scheme() {
+            "s3" => Arc::new(
+                AmazonS3Builder::from_env()
+                    .with_bucket_name(object_store_url.host_str().ok_or_else(|| {
+                        anyhow!("object_store_url must have a bucket name in the host position")
+                    })?)
+                    .build()?,
+            ),
+            "file" => {
+                debug!(url=?object_store_url, host=?object_store_url.host_str(), path=?object_store_url.path(), "object_store_url");
+                let path = if let Some(host) = object_store_url.host_str() {
+                    debug!(store_dir=%store_dir.display(), host, "is relative");
+                    if host == "." {
+                        store_dir.join(object_store_url.path().trim_start_matches('/'))
+                    } else {
+                        bail!("object_store_url must have a relative or absolute path");
+                    }
+                } else {
+                    object_store_url.path().into()
+                };
+                // Create object_store dir if it does not exist.
+                if !tokio::fs::try_exists(&path).await? {
+                    tokio::fs::create_dir(&path).await?;
+                }
+                debug!(path = %path.display(), "object store path");
+                Arc::new(LocalFileSystem::new_with_prefix(path)?)
+            }
+            scheme => bail!("unsupported object_store_url scheme {scheme}"),
+        };
+
+        let ctx = ceramic_pipeline::session_from_config(ceramic_pipeline::Config {
+            conclusion_feed: feed.into(),
+            object_store,
+        })
+        .await?;
+
+        // Start aggregator
+        let aggregator_handle = if opts.aggregator.unwrap_or_default() {
+            let mut ss = shutdown_signal.resubscribe();
+            let ctx = ctx.clone();
+            Some(tokio::spawn(async move {
+                if let Err(err) = ceramic_pipeline::aggregator::run(ctx, async move {
+                    let _ = ss.recv().await;
+                })
+                .await
+                {
+                    error!(%err, "aggregator task failed");
+                }
+            }))
+        } else {
+            None
+        };
+
+        let mut ss = shutdown_signal.resubscribe();
+        let pipeline_ctx = ctx.clone();
+        let flight_handle = tokio::spawn(async move {
+            ceramic_flight::server::run(ctx, addr, async move {
+                let _ = ss.recv().await;
+            })
+            .await
+        });
+        (Some(pipeline_ctx), aggregator_handle, Some(flight_handle))
+    } else {
+        (None, None, None)
+    };
 
     // Start anchoring if remote anchor service URL is provided
     let anchor_service_handle =
         if let Some(remote_anchor_service_url) = opts.remote_anchor_service_url {
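
Note on the file:// forms accepted above: the url crate parses the authority after file:// as a host, so file://./relative puts "." in host_str() while file:///absolute has no host at all; the "file" match arm keys off exactly that. A minimal sketch of this behavior (not part of the commit; the example paths are made up):

use url::Url;

fn main() {
    // Absolute form: no host, the location is entirely in the path.
    let abs = Url::parse("file:///var/ceramic/objects").unwrap();
    assert_eq!(abs.host_str(), None);
    assert_eq!(abs.path(), "/var/ceramic/objects");

    // Relative form: "." lands in the host position; the path (leading
    // '/' trimmed) is then joined onto the daemon's store_dir.
    let rel = Url::parse("file://./objects").unwrap();
    assert_eq!(rel.host_str(), Some("."));
    assert_eq!(rel.path(), "/objects");

    // S3 form: the bucket name occupies the host position.
    let s3 = Url::parse("s3://my-bucket").unwrap();
    assert_eq!(s3.host_str(), Some("my-bucket"));
}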
2 changes: 0 additions & 2 deletions pipeline/src/aggregator/mod.rs
@@ -283,7 +283,6 @@ mod tests {
     async fn init_ctx() -> anyhow::Result<SessionContext> {
         session_from_config(Config {
             conclusion_feed: MockConclusionFeed::new().into(),
-            object_store_bucket_name: "test_bucket".to_string(),
             object_store: Arc::new(InMemory::new()),
         })
         .await
@@ -294,7 +293,6 @@ mod tests {
             conclusion_feed: ConclusionFeedSource::<MockConclusionFeed>::InMemory(
                 MemTable::try_new(schemas::conclusion_events(), vec![vec![conclusion_events]])?,
             ),
-            object_store_bucket_name: "test_bucket".to_string(),
             object_store: Arc::new(InMemory::new()),
         })
         .await
3 changes: 0 additions & 3 deletions pipeline/src/config.rs
@@ -7,9 +7,6 @@ pub struct Config<F> {
     /// Define how the conclusion feed will be accessed.
     pub conclusion_feed: ConclusionFeedSource<F>,
 
-    /// Bucket name in which to store objects.
-    pub object_store_bucket_name: String,
-
     /// Access to an object store.
     pub object_store: Arc<dyn ObjectStore>,
 }
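
With object_store_bucket_name gone, a Config is just a feed source plus a ready-made store that already knows where its data lives. A sketch of building one against a local store, assuming Config, ConclusionFeedSource, and ConclusionFeed are exported from ceramic_pipeline as the rest of this diff suggests (the directory path is a placeholder):

use std::sync::Arc;

use ceramic_pipeline::{ConclusionFeed, ConclusionFeedSource, Config};
use object_store::{local::LocalFileSystem, ObjectStore};

fn build_config<F: ConclusionFeed>(feed: ConclusionFeedSource<F>) -> anyhow::Result<Config<F>> {
    let dir = "/tmp/ceramic-objects";
    // LocalFileSystem::new_with_prefix canonicalizes its prefix, so the
    // directory must exist first; the daemon does the same via tokio::fs.
    std::fs::create_dir_all(dir)?;
    let object_store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new_with_prefix(dir)?);
    Ok(Config {
        conclusion_feed: feed,
        object_store,
    })
}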
6 changes: 3 additions & 3 deletions pipeline/src/lib.rs
@@ -82,9 +82,9 @@ pub async fn session_from_config<F: ConclusionFeed + 'static>(
     // Register JSON functions
     datafusion_functions_json::register_all(&mut ctx)?;
 
-    // Register s3 object store
-    let mut url = Url::parse("s3://")?;
-    url.set_host(Some(&config.object_store_bucket_name))?;
+    // Register s3 object store, use hardcoded bucket name `pipeline` as the actual bucket name is
+    // already known by the object store.
+    let mut url = Url::parse("s3://pipeline")?;
     ctx.register_object_store(&url, config.object_store);
 
     // Configure event_states listing table
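
The hardcoded URL works because DataFusion treats a registered URL purely as a routing key: any table location under s3://pipeline/ is dispatched to the registered store instance, and that store resolves real paths itself, so the backing store need not be S3 at all. A sketch of the same registration with a local store standing in for S3 (the prefix path is illustrative):

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use object_store::local::LocalFileSystem;
use url::Url;

fn register_store(ctx: &SessionContext) -> anyhow::Result<()> {
    std::fs::create_dir_all("/tmp/pipeline")?;
    // The logical URL never touches the network; it only has to match the
    // s3://pipeline/... table paths the pipeline registers later.
    let url = Url::parse("s3://pipeline")?;
    let store = Arc::new(LocalFileSystem::new_with_prefix("/tmp/pipeline")?);
    ctx.register_object_store(&url, store);
    Ok(())
}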
