Skip to content

Commit

Permalink
refactor: refactor lru cache heap size reporter
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx committed Mar 28, 2024
1 parent 407e407 commit 61d0949
Showing 1 changed file with 67 additions and 94 deletions.
161 changes: 67 additions & 94 deletions src/stream/src/cache/managed_lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,13 @@ where
/// Should only be updated by the `MemoryManager`.
_watermark_epoch: Arc<AtomicU64>,
evict_sequence: Arc<AtomicSequence>,
/// The heap size of keys/values
kv_heap_size: usize,
/// The metrics of memory usage
memory_usage_metrics: LabelGuardedIntGauge<3>,

// The metrics of evicted watermark time
_lru_evicted_watermark_time_ms: LabelGuardedIntGauge<3>,
// Metrics info
_metrics_info: MetricsInfo,
/// The size reported last time
last_reported_size_bytes: usize,
}

impl<K, V, S, A> Drop for ManagedLruCache<K, V, S, A>
where
K: Hash + Eq,
S: BuildHasher + Send + Sync + 'static,
A: Clone + Allocator,
{
fn drop(&mut self) {
self.memory_usage_metrics.set(0.into());
}
reporter: HeapSizeReporter,
}

impl<K, V, S, A> ManagedLruCache<K, V, S, A>
Expand Down Expand Up @@ -100,15 +86,15 @@ where
&metrics_info.desc,
]);

let reporter = HeapSizeReporter::new(memory_usage_metrics, 0, 0);

Self {
inner,
_watermark_epoch: watermark_epoch,
evict_sequence,
kv_heap_size: 0,
memory_usage_metrics,
_lru_evicted_watermark_time_ms: lru_evicted_watermark_time_ms,
_metrics_info: metrics_info,
last_reported_size_bytes: 0,
reporter,
}
}

Expand All @@ -117,30 +103,23 @@ where
let sequence = self.evict_sequence.load(Ordering::Relaxed);
while let Some((key, value, _)) = self.inner.pop_with_sequence(sequence) {
let charge = key.estimated_size() + value.estimated_size();
self.kv_heap_size_dec(charge);
self.reporter.dec(charge);
}
}

pub fn put(&mut self, k: K, v: V) -> Option<V> {
let key_size = k.estimated_size();
self.kv_heap_size_inc(key_size + v.estimated_size());
self.reporter.inc(key_size + v.estimated_size());
let old_val = self.inner.put(k, v);
if let Some(old_val) = &old_val {
self.kv_heap_size_dec(key_size + old_val.estimated_size());
self.reporter.dec(key_size + old_val.estimated_size());
}
old_val
}

pub fn get_mut(&mut self, k: &K) -> Option<MutGuard<'_, V>> {
let v = self.inner.get_mut(k);
v.map(|inner| {
MutGuard::new(
inner,
&mut self.kv_heap_size,
&mut self.last_reported_size_bytes,
&mut self.memory_usage_metrics,
)
})
v.map(|inner| MutGuard::new(inner, &mut self.reporter))
}

pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
Expand All @@ -161,14 +140,7 @@ where

pub fn peek_mut(&mut self, k: &K) -> Option<MutGuard<'_, V>> {
let v = self.inner.peek_mut(k);
v.map(|inner| {
MutGuard::new(
inner,
&mut self.kv_heap_size,
&mut self.last_reported_size_bytes,
&mut self.memory_usage_metrics,
)
})
v.map(|inner| MutGuard::new(inner, &mut self.reporter))
}

pub fn contains<Q>(&self, k: &Q) -> bool
Expand All @@ -190,28 +162,6 @@ where
pub fn clear(&mut self) {
self.inner.clear();
}

fn kv_heap_size_inc(&mut self, size: usize) {
self.kv_heap_size = self.kv_heap_size.saturating_add(size);
self.report_memory_usage();
}

fn kv_heap_size_dec(&mut self, size: usize) {
self.kv_heap_size = self.kv_heap_size.saturating_sub(size);
self.report_memory_usage();
}

fn report_memory_usage(&mut self) -> bool {
if self.kv_heap_size.abs_diff(self.last_reported_size_bytes)
> REPORT_SIZE_EVERY_N_KB_CHANGE << 10
{
self.memory_usage_metrics.set(self.kv_heap_size as _);
self.last_reported_size_bytes = self.kv_heap_size;
true
} else {
false
}
}
}

impl<K, V> ManagedLruCache<K, V>
Expand Down Expand Up @@ -261,51 +211,28 @@ where

pub struct MutGuard<'a, V: EstimateSize> {
inner: &'a mut V,
// The size of the original value
original_val_size: usize,
// The total size of a collection
total_size: &'a mut usize,
last_reported_size_bytes: &'a mut usize,
memory_usage_metrics: &'a mut LabelGuardedIntGauge<3>,
reporter: &'a mut HeapSizeReporter,
old_value_size: usize,
}

impl<'a, V: EstimateSize> MutGuard<'a, V> {
pub fn new(
inner: &'a mut V,
total_size: &'a mut usize,
last_reported_size_bytes: &'a mut usize,
memory_usage_metrics: &'a mut LabelGuardedIntGauge<3>,
) -> Self {
let original_val_size = inner.estimated_size();
fn new(inner: &'a mut V, reporter: &'a mut HeapSizeReporter) -> Self {
let old_value_size = inner.estimated_size();
Self {
inner,
original_val_size,
total_size,
last_reported_size_bytes,
memory_usage_metrics,
}
}

fn report_memory_usage(&mut self) -> bool {
if self.total_size.abs_diff(*self.last_reported_size_bytes)
> REPORT_SIZE_EVERY_N_KB_CHANGE << 10
{
self.memory_usage_metrics.set(*self.total_size as _);
*self.last_reported_size_bytes = *self.total_size;
true
} else {
false
reporter,
old_value_size,
}
}
}

impl<'a, V: EstimateSize> Drop for MutGuard<'a, V> {
fn drop(&mut self) {
*self.total_size = self
.total_size
.saturating_sub(self.original_val_size)
.saturating_add(self.inner.estimated_size());
self.report_memory_usage();
let new_value_size = self.inner.estimated_size();
if new_value_size != self.old_value_size {
self.reporter.dec(self.old_value_size);
self.reporter.inc(new_value_size);
}
}
}

Expand All @@ -322,3 +249,49 @@ impl<'a, V: EstimateSize> DerefMut for MutGuard<'a, V> {
self.inner
}
}

struct HeapSizeReporter {
metrics: LabelGuardedIntGauge<3>,
heap_size: usize,
last_reported: usize,
}

impl HeapSizeReporter {
pub fn new(
heap_size_metrics: LabelGuardedIntGauge<3>,
heap_size: usize,
last_reported: usize,
) -> Self {
Self {
metrics: heap_size_metrics,
heap_size,
last_reported,
}
}

pub fn inc(&mut self, size: usize) {
self.heap_size = self.heap_size.saturating_add(size);
self.try_report();
}

pub fn dec(&mut self, size: usize) {
self.heap_size = self.heap_size.saturating_sub(size);
self.try_report();
}

pub fn try_report(&mut self) -> bool {
if self.heap_size.abs_diff(self.last_reported) >= REPORT_SIZE_EVERY_N_KB_CHANGE << 10 {
self.metrics.set(self.heap_size as _);
self.last_reported = self.heap_size;
true
} else {
false
}
}
}

impl Drop for HeapSizeReporter {
fn drop(&mut self) {
self.metrics.set(0);
}
}

0 comments on commit 61d0949

Please sign in to comment.