diff --git a/Cargo.toml b/Cargo.toml index 658acc9b..bbf1e8a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/derive", "crates/duckdb", "crates/extensions", + "crates/object-store-cache", "crates/pgstac", "crates/server", ] @@ -76,6 +77,7 @@ stac-api = { version = "0.6.2", path = "crates/api" } stac-derive = { version = "0.1.0", path = "crates/derive" } stac-duckdb = { version = "0.0.3", path = "crates/duckdb" } stac-server = { version = "0.3.2", path = "crates/server" } +stac-object-store-cache = { version = "0.1.0", path = "crates/object-store-cache" } syn = "2.0" tempfile = "3.13" thiserror = "2.0" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 28218856..ff479e1b 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -31,7 +31,7 @@ geoparquet-compression = [ "parquet/lz4", "parquet/zstd", ] -object-store = ["dep:object_store", "dep:tokio", "dep:once_cell"] +object-store = ["dep:object_store", "dep:tokio", "dep:stac-object-store-cache"] object-store-aws = ["object-store", "object_store/aws"] object-store-azure = ["object-store", "object_store/azure"] object-store-gcp = ["object-store", "object_store/gcp"] @@ -63,12 +63,12 @@ jsonschema = { workspace = true, optional = true } log.workspace = true mime.workspace = true object_store = { workspace = true, optional = true } -once_cell = { workspace = true, optional = true } parquet = { workspace = true, optional = true } reqwest = { workspace = true, features = ["json", "blocking"], optional = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true, features = ["preserve_order"] } stac-derive.workspace = true +stac-object-store-cache = { workspace = true, optional = true } thiserror.workspace = true tokio = { workspace = true, optional = true } tracing.workspace = true diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index e597618f..633db317 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -86,6 +86,11 @@ pub enum Error { #[error("json value is not an object")] NotAnObject(serde_json::Value), + /// [stac-object-store-cache::Error] + #[error(transparent)] + #[cfg(feature = "object-store")] + ObjectStoreCache(#[from] stac_object_store_cache::Error), + /// [object_store::Error] #[error(transparent)] #[cfg(feature = "object-store")] diff --git a/crates/core/src/format.rs b/crates/core/src/format.rs index aae86a7d..87703d88 100644 --- a/crates/core/src/format.rs +++ b/crates/core/src/format.rs @@ -156,7 +156,8 @@ impl Format { let href = href.into(); match href.realize() { RealizedHref::Url(url) => { - let (object_store, path) = crate::parse_url_opts(&url, options).await?; + let (object_store, path) = + stac_object_store_cache::parse_url_opts(&url, options).await?; let get_result = object_store.get(&path).await?; let mut value: T = self.from_bytes(get_result.bytes().await?)?; *value.self_href_mut() = Some(Href::Url(url)); @@ -235,9 +236,8 @@ impl Format { { let href = href.to_string(); if let Ok(url) = url::Url::parse(&href) { - use object_store::ObjectStore; - - let (object_store, path) = object_store::parse_url_opts(&url, options)?; + let (object_store, path) = + stac_object_store_cache::parse_url_opts(&url, options).await?; let bytes = self.into_vec(value)?; let put_result = object_store.put(&path, bytes.into()).await?; Ok(Some(put_result)) diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 101abc43..6bea547d 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -178,8 +178,6 @@ pub mod mime; mod ndjson; mod node; #[cfg(feature = "object-store")] -mod object_store; -#[cfg(feature = "object-store")] mod resolver; mod statistics; #[cfg(feature = "validate")] @@ -189,9 +187,6 @@ mod version; use std::fmt::Display; -#[cfg(feature = "object-store")] -pub use object_store::parse_url_opts; - #[cfg(feature = "object-store")] pub use resolver::Resolver; #[cfg(feature = "validate-blocking")] diff --git a/crates/object-store-cache/Cargo.toml b/crates/object-store-cache/Cargo.toml new file mode 100644 index 00000000..3ef4816e --- /dev/null +++ b/crates/object-store-cache/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "stac-object-store-cache" +description = "Create and cache object stores based on url in stac." +version = "0.1.0" +authors.workspace = true +edition.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true +categories.workspace = true +rust-version.workspace = true + +[features] +object-store-aws = ["object_store/aws"] +object-store-azure = ["object_store/azure"] +object-store-gcp = ["object_store/gcp"] +object-store-http = ["object_store/http"] +object-store-all = [ + "object-store-aws", + "object-store-azure", + "object-store-gcp", + "object-store-http", +] + + +[dependencies] +object_store = { workspace = true } +once_cell = { workspace = true } +thiserror.workspace = true +tokio = { workspace = true } +url = { workspace = true, features = ["serde"] } + +[dev-dependencies] +tokio = { workspace = true, features = ["macros"] } +tokio-test.workspace = true diff --git a/crates/object-store-cache/README.md b/crates/object-store-cache/README.md new file mode 100644 index 00000000..e69de29b diff --git a/crates/core/src/object_store.rs b/crates/object-store-cache/src/cache.rs similarity index 61% rename from crates/core/src/object_store.rs rename to crates/object-store-cache/src/cache.rs index 5f9247f1..5bfa23b2 100644 --- a/crates/core/src/object_store.rs +++ b/crates/object-store-cache/src/cache.rs @@ -1,18 +1,23 @@ use std::{collections::HashMap, sync::Arc}; +use crate::{Error, Result}; + use object_store::{local::LocalFileSystem, path::Path, DynObjectStore, ObjectStoreScheme}; use once_cell::sync::Lazy; use tokio::sync::RwLock; use url::Url; +// To avoid memory leaks, we clear the cache when it grows too big. +// The value does not have any meaning, other than polars use the same. +const CACHE_SIZE: usize = 8; + static OBJECT_STORE_CACHE: Lazy>>> = Lazy::new(Default::default); -/// Parameter set to identify and cache an object Storage -#[derive(PartialEq, Eq, Hash)] +/// Parameter set to identify and cache an object store +#[derive(PartialEq, Eq, Hash, Debug)] struct ObjectStoreIdentifier { /// A base url to the bucket. - // should be enough to identify cloud provider and bucket base_url: Url, /// Object Store options @@ -40,6 +45,11 @@ impl ObjectStoreIdentifier { } } +#[cfg(any( + feature = "object-store-aws", + feature = "object-store-gcp", + feature = "object-store-azure" +))] macro_rules! builder_env_opts { ($builder:ty, $url:expr, $options:expr) => {{ let builder = $options.into_iter().fold( @@ -53,11 +63,24 @@ macro_rules! builder_env_opts { }}; } +/// This was yanked from [object_store::parse_url_opts] with the following changes: +/// +/// - Build [object_store::ObjectStore] with environment variables +/// - Return [Arc] instead of [Box] +#[cfg_attr( + not(any( + feature = "object-store-aws", + feature = "object-store-gcp", + feature = "object-store-azure", + feature = "object-store-http" + )), + allow(unused_variables) +)] fn create_object_store( scheme: ObjectStoreScheme, url: &Url, options: I, -) -> Result, object_store::Error> +) -> Result> where I: IntoIterator, K: AsRef, @@ -89,61 +112,49 @@ where ); Arc::new(builder.build()?) } - s => { - return Err(object_store::Error::Generic { - store: "parse_url", - source: format!("feature for {s:?} not enabled").into(), - }) - } + s => return Err(Error::ObjectStoreCreate { scheme: s }), }; Ok(store) } -/// Modified version of object_store::parse_url_opts that also parses env +/// Drop-in replacement for [object_store::parse_url_opts] with caching and env vars. /// -/// It does the same, except we start from env vars, then apply url and then overrides from options -/// -/// This is POC. To improve on this idea, maybe it's good to "cache" a box with dynamic ObjectStore for each bucket we access, since ObjectStore have some logic inside tied to a bucket level, like connection pooling, credential caching +/// It will create or retrieve object store based on passed `url` and `options`. +/// Keeps global cache pub async fn parse_url_opts( url: &Url, options: I, -) -> Result<(Arc, Path), crate::Error> +) -> crate::Result<(Arc, Path)> where I: IntoIterator, K: AsRef, V: Into, { - // TODO: Handle error properly - let (scheme, path) = ObjectStoreScheme::parse(url).unwrap(); - - let path_string: String = path.clone().into(); - let path_str = path_string.as_str(); - // TODO: Handle error properly - let base_url = url[..] - .strip_suffix(path_str) + let (scheme, path) = ObjectStoreScheme::parse(url).map_err(object_store::Error::from)?; + + let base_url = url + .as_ref() + .strip_suffix(path.as_ref()) .unwrap_or_default() - .try_into() - .unwrap(); + .try_into()?; let object_store_id = ObjectStoreIdentifier::new(base_url, options); let options = object_store_id.get_options(); { let cache = OBJECT_STORE_CACHE.read().await; - if let Some(store) = cache.get(&object_store_id) { + if let Some(store) = (*cache).get(&object_store_id) { return Ok((store.clone(), path)); } } - let store = create_object_store(scheme, url, options)?; { let mut cache = OBJECT_STORE_CACHE.write().await; - // TODO: Do we need this cache clean? What is a reasonable cache size here? - if cache.len() >= 8 { - cache.clear() + if cache.len() >= CACHE_SIZE { + (*cache).clear() } - _ = cache.insert(object_store_id, store.clone()); + _ = (*cache).insert(object_store_id, store.clone()); } Ok((store.clone(), path)) @@ -155,18 +166,58 @@ mod tests { use super::*; + #[tokio::test] + async fn file_different_path() { + let options: Vec<(String, String)> = Vec::new(); + + let url = Url::parse("file:///some/path").unwrap(); + let (store, path) = parse_url_opts(&url, options.clone()).await.unwrap(); + + let url2 = Url::parse("file:///other/path").unwrap(); + let (store2, _) = parse_url_opts(&url2, options.clone()).await.unwrap(); + + { + let cache = OBJECT_STORE_CACHE.read().await; + println!("{cache:#?}") + } + + assert!(Arc::ptr_eq(&store, &store2)); + assert!(std::ptr::addr_eq(Arc::as_ptr(&store), Arc::as_ptr(&store2))); + // assert_eq!(store.as_ref(), store2.as_ref()); + // assert_eq!(Arc::as_ptr(&store), Arc::as_ptr(&store2)); + assert_eq!(path.as_ref(), "some/path"); + } + + #[tokio::test] + async fn file_different_options() { + let options: Vec<(String, String)> = Vec::new(); + + let url = Url::parse("file:///some/path").unwrap(); + let (store, _) = parse_url_opts(&url, options).await.unwrap(); + + let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))]; + let url2 = Url::parse("file:///some/path").unwrap(); + let (store2, _) = parse_url_opts(&url2, options2).await.unwrap(); + + assert!(!Arc::ptr_eq(&store, &store2)); + } + + #[cfg(feature = "object-store-aws")] #[tokio::test] async fn cache_works() { let url = Url::parse("s3://bucket/item").unwrap(); let options: Vec<(String, String)> = Vec::new(); - let (store1, _path) = parse_url_opts(&url, options.clone()).await.unwrap(); + let (store1, path) = parse_url_opts(&url, options.clone()).await.unwrap(); let url2 = Url::parse("s3://bucket/item2").unwrap(); let (store2, _path) = parse_url_opts(&url2, options.clone()).await.unwrap(); assert!(Arc::ptr_eq(&store1, &store2)); + assert_eq!(path.as_ref(), "item"); } + + #[cfg(feature = "object-store-aws")] #[tokio::test] async fn different_options() { let url = Url::parse("s3://bucket/item").unwrap(); @@ -180,6 +231,8 @@ mod tests { assert!(!Arc::ptr_eq(&store, &store2)); } + + #[cfg(feature = "object-store-aws")] #[tokio::test] async fn different_urls() { let url = Url::parse("s3://bucket/item").unwrap(); diff --git a/crates/object-store-cache/src/error.rs b/crates/object-store-cache/src/error.rs new file mode 100644 index 00000000..d71f547e --- /dev/null +++ b/crates/object-store-cache/src/error.rs @@ -0,0 +1,26 @@ +use object_store::ObjectStoreScheme; +use thiserror::Error; + +/// Error enum for crate-specific errors. +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum Error { + /// TODO: Better error description + #[error("Failed to create object_store for {scheme:?}. Check if required feature is enabled.")] + ObjectStoreCreate { + /// feature + scheme: ObjectStoreScheme, + }, + + /// [url::ParseError] + #[error(transparent)] + UrlParse(#[from] url::ParseError), + + /// [object_store::Error] + #[error(transparent)] + ObjectStore(#[from] object_store::Error), + + /// [object_store::path::Error] + #[error(transparent)] + ObjectStorePath(#[from] object_store::path::Error), +} diff --git a/crates/object-store-cache/src/lib.rs b/crates/object-store-cache/src/lib.rs new file mode 100644 index 00000000..ebe3c379 --- /dev/null +++ b/crates/object-store-cache/src/lib.rs @@ -0,0 +1,16 @@ +//! Work with [ObjectStore](object_store::ObjectStore) in STAC. +//! +//! Features: +//! - cache used objects_stores based on url and options +//! - read cloud creadentials from env +//! + +mod cache; +mod error; + +pub use cache::parse_url_opts; + +pub use error::Error; + +/// Custom [Result](std::result::Result) type for this crate. +pub type Result = std::result::Result;