Skip to content

Commit

Permalink
feat: new crate stac-object-store-cache and import from it in core
Browse files Browse the repository at this point in the history
  • Loading branch information
alekzvik committed Dec 21, 2024
1 parent 3e2f3e7 commit a0a4a87
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 43 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"crates/derive",
"crates/duckdb",
"crates/extensions",
"crates/object-store-cache",
"crates/pgstac",
"crates/server",
]
Expand Down Expand Up @@ -76,6 +77,7 @@ stac-api = { version = "0.6.2", path = "crates/api" }
stac-derive = { version = "0.1.0", path = "crates/derive" }
stac-duckdb = { version = "0.0.3", path = "crates/duckdb" }
stac-server = { version = "0.3.2", path = "crates/server" }
stac-object-store-cache = { version = "0.1.0", path = "crates/object-store-cache" }
syn = "2.0"
tempfile = "3.13"
thiserror = "2.0"
Expand Down
4 changes: 2 additions & 2 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ geoparquet-compression = [
"parquet/lz4",
"parquet/zstd",
]
object-store = ["dep:object_store", "dep:tokio", "dep:once_cell"]
object-store = ["dep:object_store", "dep:tokio", "dep:stac-object-store-cache"]
object-store-aws = ["object-store", "object_store/aws"]
object-store-azure = ["object-store", "object_store/azure"]
object-store-gcp = ["object-store", "object_store/gcp"]
Expand Down Expand Up @@ -63,12 +63,12 @@ jsonschema = { workspace = true, optional = true }
log.workspace = true
mime.workspace = true
object_store = { workspace = true, optional = true }
once_cell = { workspace = true, optional = true }
parquet = { workspace = true, optional = true }
reqwest = { workspace = true, features = ["json", "blocking"], optional = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, features = ["preserve_order"] }
stac-derive.workspace = true
stac-object-store-cache = { workspace = true, optional = true }
thiserror.workspace = true
tokio = { workspace = true, optional = true }
tracing.workspace = true
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ pub enum Error {
#[error("json value is not an object")]
NotAnObject(serde_json::Value),

/// [stac-object-store-cache::Error]
#[error(transparent)]
#[cfg(feature = "object-store")]
ObjectStoreCache(#[from] stac_object_store_cache::Error),

/// [object_store::Error]
#[error(transparent)]
#[cfg(feature = "object-store")]
Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ impl Format {
let href = href.into();
match href.realize() {
RealizedHref::Url(url) => {
let (object_store, path) = crate::parse_url_opts(&url, options).await?;
let (object_store, path) =
stac_object_store_cache::parse_url_opts(&url, options).await?;
let get_result = object_store.get(&path).await?;
let mut value: T = self.from_bytes(get_result.bytes().await?)?;
*value.self_href_mut() = Some(Href::Url(url));
Expand Down Expand Up @@ -235,9 +236,8 @@ impl Format {
{
let href = href.to_string();
if let Ok(url) = url::Url::parse(&href) {
use object_store::ObjectStore;

let (object_store, path) = object_store::parse_url_opts(&url, options)?;
let (object_store, path) =
stac_object_store_cache::parse_url_opts(&url, options).await?;
let bytes = self.into_vec(value)?;
let put_result = object_store.put(&path, bytes.into()).await?;
Ok(Some(put_result))
Expand Down
5 changes: 0 additions & 5 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,6 @@ pub mod mime;
mod ndjson;
mod node;
#[cfg(feature = "object-store")]
mod object_store;
#[cfg(feature = "object-store")]
mod resolver;
mod statistics;
#[cfg(feature = "validate")]
Expand All @@ -189,9 +187,6 @@ mod version;

use std::fmt::Display;

#[cfg(feature = "object-store")]
pub use object_store::parse_url_opts;

#[cfg(feature = "object-store")]
pub use resolver::Resolver;
#[cfg(feature = "validate-blocking")]
Expand Down
35 changes: 35 additions & 0 deletions crates/object-store-cache/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "stac-object-store-cache"
description = "Create and cache object stores based on url in stac."
version = "0.1.0"
authors.workspace = true
edition.workspace = true
homepage.workspace = true
repository.workspace = true
license.workspace = true
categories.workspace = true
rust-version.workspace = true

[features]
object-store-aws = ["object_store/aws"]
object-store-azure = ["object_store/azure"]
object-store-gcp = ["object_store/gcp"]
object-store-http = ["object_store/http"]
object-store-all = [
"object-store-aws",
"object-store-azure",
"object-store-gcp",
"object-store-http",
]


[dependencies]
object_store = { workspace = true }
once_cell = { workspace = true }
thiserror.workspace = true
tokio = { workspace = true }
url = { workspace = true, features = ["serde"] }

[dev-dependencies]
tokio = { workspace = true, features = ["macros"] }
tokio-test.workspace = true
Empty file.
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
use std::{collections::HashMap, sync::Arc};

use crate::{Error, Result};

use object_store::{local::LocalFileSystem, path::Path, DynObjectStore, ObjectStoreScheme};
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
use url::Url;

// To avoid memory leaks, we clear the cache when it grows too big.
// The value does not have any meaning, other than polars use the same.
const CACHE_SIZE: usize = 8;

static OBJECT_STORE_CACHE: Lazy<RwLock<HashMap<ObjectStoreIdentifier, Arc<DynObjectStore>>>> =
Lazy::new(Default::default);

/// Parameter set to identify and cache an object Storage
#[derive(PartialEq, Eq, Hash)]
/// Parameter set to identify and cache an object store
#[derive(PartialEq, Eq, Hash, Debug)]
struct ObjectStoreIdentifier {
/// A base url to the bucket.
// should be enough to identify cloud provider and bucket
base_url: Url,

/// Object Store options
Expand Down Expand Up @@ -40,6 +45,11 @@ impl ObjectStoreIdentifier {
}
}

#[cfg(any(
feature = "object-store-aws",
feature = "object-store-gcp",
feature = "object-store-azure"
))]
macro_rules! builder_env_opts {
($builder:ty, $url:expr, $options:expr) => {{
let builder = $options.into_iter().fold(
Expand All @@ -53,11 +63,24 @@ macro_rules! builder_env_opts {
}};
}

/// This was yanked from [object_store::parse_url_opts] with the following changes:
///
/// - Build [object_store::ObjectStore] with environment variables
/// - Return [Arc] instead of [Box]
#[cfg_attr(
not(any(
feature = "object-store-aws",
feature = "object-store-gcp",
feature = "object-store-azure",
feature = "object-store-http"
)),
allow(unused_variables)
)]
fn create_object_store<I, K, V>(
scheme: ObjectStoreScheme,
url: &Url,
options: I,
) -> Result<Arc<DynObjectStore>, object_store::Error>
) -> Result<Arc<DynObjectStore>>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
Expand Down Expand Up @@ -89,61 +112,49 @@ where
);
Arc::new(builder.build()?)
}
s => {
return Err(object_store::Error::Generic {
store: "parse_url",
source: format!("feature for {s:?} not enabled").into(),
})
}
s => return Err(Error::ObjectStoreCreate { scheme: s }),
};
Ok(store)
}

/// Modified version of object_store::parse_url_opts that also parses env
/// Drop-in replacement for [object_store::parse_url_opts] with caching and env vars.
///
/// It does the same, except we start from env vars, then apply url and then overrides from options
///
/// This is POC. To improve on this idea, maybe it's good to "cache" a box with dynamic ObjectStore for each bucket we access, since ObjectStore have some logic inside tied to a bucket level, like connection pooling, credential caching
/// It will create or retrieve object store based on passed `url` and `options`.
/// Keeps global cache
pub async fn parse_url_opts<I, K, V>(
url: &Url,
options: I,
) -> Result<(Arc<DynObjectStore>, Path), crate::Error>
) -> crate::Result<(Arc<DynObjectStore>, Path)>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
// TODO: Handle error properly
let (scheme, path) = ObjectStoreScheme::parse(url).unwrap();

let path_string: String = path.clone().into();
let path_str = path_string.as_str();
// TODO: Handle error properly
let base_url = url[..]
.strip_suffix(path_str)
let (scheme, path) = ObjectStoreScheme::parse(url).map_err(object_store::Error::from)?;

let base_url = url
.as_ref()
.strip_suffix(path.as_ref())
.unwrap_or_default()
.try_into()
.unwrap();
.try_into()?;

let object_store_id = ObjectStoreIdentifier::new(base_url, options);
let options = object_store_id.get_options();

{
let cache = OBJECT_STORE_CACHE.read().await;
if let Some(store) = cache.get(&object_store_id) {
if let Some(store) = (*cache).get(&object_store_id) {
return Ok((store.clone(), path));
}
}

let store = create_object_store(scheme, url, options)?;
{
let mut cache = OBJECT_STORE_CACHE.write().await;

// TODO: Do we need this cache clean? What is a reasonable cache size here?
if cache.len() >= 8 {
cache.clear()
if cache.len() >= CACHE_SIZE {
(*cache).clear()
}
_ = cache.insert(object_store_id, store.clone());
_ = (*cache).insert(object_store_id, store.clone());
}

Ok((store.clone(), path))
Expand All @@ -155,18 +166,58 @@ mod tests {

use super::*;

#[tokio::test]
async fn file_different_path() {
let options: Vec<(String, String)> = Vec::new();

let url = Url::parse("file:///some/path").unwrap();
let (store, path) = parse_url_opts(&url, options.clone()).await.unwrap();

let url2 = Url::parse("file:///other/path").unwrap();
let (store2, _) = parse_url_opts(&url2, options.clone()).await.unwrap();

{
let cache = OBJECT_STORE_CACHE.read().await;
println!("{cache:#?}")
}

assert!(Arc::ptr_eq(&store, &store2));
assert!(std::ptr::addr_eq(Arc::as_ptr(&store), Arc::as_ptr(&store2)));
// assert_eq!(store.as_ref(), store2.as_ref());
// assert_eq!(Arc::as_ptr(&store), Arc::as_ptr(&store2));
assert_eq!(path.as_ref(), "some/path");
}

#[tokio::test]
async fn file_different_options() {
let options: Vec<(String, String)> = Vec::new();

let url = Url::parse("file:///some/path").unwrap();
let (store, _) = parse_url_opts(&url, options).await.unwrap();

let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))];
let url2 = Url::parse("file:///some/path").unwrap();
let (store2, _) = parse_url_opts(&url2, options2).await.unwrap();

assert!(!Arc::ptr_eq(&store, &store2));
}

#[cfg(feature = "object-store-aws")]
#[tokio::test]
async fn cache_works() {
let url = Url::parse("s3://bucket/item").unwrap();
let options: Vec<(String, String)> = Vec::new();

let (store1, _path) = parse_url_opts(&url, options.clone()).await.unwrap();
let (store1, path) = parse_url_opts(&url, options.clone()).await.unwrap();

let url2 = Url::parse("s3://bucket/item2").unwrap();
let (store2, _path) = parse_url_opts(&url2, options.clone()).await.unwrap();

assert!(Arc::ptr_eq(&store1, &store2));
assert_eq!(path.as_ref(), "item");
}

#[cfg(feature = "object-store-aws")]
#[tokio::test]
async fn different_options() {
let url = Url::parse("s3://bucket/item").unwrap();
Expand All @@ -180,6 +231,8 @@ mod tests {

assert!(!Arc::ptr_eq(&store, &store2));
}

#[cfg(feature = "object-store-aws")]
#[tokio::test]
async fn different_urls() {
let url = Url::parse("s3://bucket/item").unwrap();
Expand Down
26 changes: 26 additions & 0 deletions crates/object-store-cache/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use object_store::ObjectStoreScheme;
use thiserror::Error;

/// Error enum for crate-specific errors.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Error {
/// TODO: Better error description
#[error("Failed to create object_store for {scheme:?}. Check if required feature is enabled.")]
ObjectStoreCreate {
/// feature
scheme: ObjectStoreScheme,
},

/// [url::ParseError]
#[error(transparent)]
UrlParse(#[from] url::ParseError),

/// [object_store::Error]
#[error(transparent)]
ObjectStore(#[from] object_store::Error),

/// [object_store::path::Error]
#[error(transparent)]
ObjectStorePath(#[from] object_store::path::Error),
}
16 changes: 16 additions & 0 deletions crates/object-store-cache/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//! Work with [ObjectStore](object_store::ObjectStore) in STAC.
//!
//! Features:
//! - cache used objects_stores based on url and options
//! - read cloud creadentials from env
//!
mod cache;
mod error;

pub use cache::parse_url_opts;

pub use error::Error;

/// Custom [Result](std::result::Result) type for this crate.
pub type Result<T> = std::result::Result<T, Error>;

0 comments on commit a0a4a87

Please sign in to comment.