Skip to content

Commit

Permalink
use only moka
Browse files Browse the repository at this point in the history
  • Loading branch information
IDX GitLab Automation committed Jul 2, 2024
1 parent 62f4e39 commit f1cfcc3
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 43 deletions.
17 changes: 1 addition & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ vector-lib = { git = "https://github.com/blind-oracle/vector.git" }
webpki-roots = "0.26"
x509-parser = "0.16"
zeroize = { version = "1.8", features = ["derive"] }
dashmap = "6.0.1"

[dev-dependencies]
hex-literal = "0.4"
Expand Down
49 changes: 23 additions & 26 deletions src/routing/middleware/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,10 @@ const KEY_HASH_BYTES: usize = 20;
#[derive(Clone, Hash, PartialEq, Eq)]
pub struct CacheKey([u8; KEY_HASH_BYTES]);

use dashmap::DashMap;

type LockMap = Arc<DashMap<CacheKey, (Arc<Mutex<()>>, Arc<Notify>)>>;

pub struct Cache {
store: MokaCache<CacheKey, FullResponse, RandomState>,
lock_map: MokaCache<CacheKey, (Arc<Mutex<()>>, Arc<Notify>)>,
max_item_size: u64,
lock_map: LockMap,
}

fn weigh_entry(_k: &CacheKey, v: &FullResponse) -> u32 {
Expand All @@ -101,7 +97,7 @@ impl Cache {
.time_to_live(ttl)
.weigher(weigh_entry)
.build_with_hasher(RandomState::default()),
lock_map: Arc::new(DashMap::new()),
lock_map: MokaCacheBuilder::new(cache_size).time_to_live(ttl).build(),
})
}

Expand Down Expand Up @@ -152,32 +148,33 @@ pub async fn middleware(
return Ok(CacheStatus::Hit.with_response(from_full_response(full_response)));
}

// Get (or insert) a synchronization entry in the concurrent map.
// Get (or insert) a synchronization entry atomically into the concurrent map.
let sync_entry = cache
.lock_map
.entry(cache_key.clone())
.or_insert_with(|| (Arc::new(Mutex::new(())), Arc::new(Notify::new())));
.get_with(cache_key.clone(), async {
(Arc::new(Mutex::new(())), Arc::new(Notify::new()))
})
.await;

let (mutex, notify) = sync_entry;

// Register for notification, in case another request acquires the lock and executes the request.
let notify = sync_entry.1.clone();
let notified = notify.notified();

loop {
select! {
_ = sync_entry.0.lock() => {
// check again if key is cached, yes => remove sync_entry from the map, notify.notify_waiters(), return the response.
// no => executed response and cache it
// remove sync_entry from the map, notify.notify_waiters(), return the response.
}
_ = notified => {
// Another parallel request finished earlier and cached the response.
// Get cached response
}
_ = timeout(PROXY_LOCK_TIMEOUT, async {
sleep(2 * PROXY_LOCK_TIMEOUT).await;
}) => {
// Execute the request and cache the response.
}
select! {
_ = mutex.lock() => {
// check again if key is cached, yes => remove sync_entry from the map, notify.notify_waiters(), return the response.
// no => executed response and cache it
// remove sync_entry from the map, notify.notify_waiters(), return the response.
}
_ = notified => {
// Another parallel request finished earlier and cached the response.
// Get cached response
}
_ = timeout(PROXY_LOCK_TIMEOUT, async {
sleep(2 * PROXY_LOCK_TIMEOUT).await;
}) => {
// Execute the request and cache the response.
}
}

Expand Down

0 comments on commit f1cfcc3

Please sign in to comment.