Skip to content

Commit

Permalink
Make metric work properly & add test
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Nov 22, 2024
1 parent 117a08c commit c2beae4
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 14 deletions.
13 changes: 7 additions & 6 deletions core/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct Instruments {
sticky_cache_hit: Arc<dyn Counter>,
sticky_cache_miss: Arc<dyn Counter>,
sticky_cache_size: Arc<dyn Gauge>,
sticky_cache_evictions: Arc<dyn Counter>,
sticky_cache_forced_evictions: Arc<dyn Counter>,
}

impl MetricsContext {
Expand Down Expand Up @@ -262,10 +262,11 @@ impl MetricsContext {
self.instruments.sticky_cache_size.record(size, &self.kvs);
}

// TODO: Not on normal completion
/// Count a workflow being evicted from the cache
pub(crate) fn cache_eviction(&self) {
self.instruments.sticky_cache_evictions.add(1, &self.kvs);
pub(crate) fn forced_cache_eviction(&self) {
self.instruments
.sticky_cache_forced_evictions
.add(1, &self.kvs);
}
}

Expand Down Expand Up @@ -424,7 +425,7 @@ impl Instruments {
description: "Current number of cached workflows".into(),
unit: "".into(),
}),
sticky_cache_evictions: meter.counter(MetricParameters {
sticky_cache_forced_evictions: meter.counter(MetricParameters {
name: "sticky_cache_total_forced_eviction".into(),
description: "Count of evictions of cached workflows".into(),
unit: "".into(),
Expand Down Expand Up @@ -868,7 +869,7 @@ mod tests {
true,
);
let mc = MetricsContext::top_level("foo".to_string(), "q".to_string(), &telem_instance);
mc.cache_eviction();
mc.forced_cache_eviction();
let events = call_buffer.retrieve();
let a1 = assert_matches!(
&events[0],
Expand Down
6 changes: 3 additions & 3 deletions core/src/worker/workflow/managed_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ impl ManagedRun {
self.activation.as_ref()
}

/// Returns true if this run has already been told it will be evicted.
pub(super) fn is_trying_to_evict(&self) -> bool {
self.trying_to_evict.is_some()
/// Returns this run's eviction reason if it is going to be evicted
pub(super) fn trying_to_evict(&self) -> Option<&RequestEvictMsg> {
self.trying_to_evict.as_ref()
}

/// Called whenever a new workflow task is obtained for this run
Expand Down
20 changes: 17 additions & 3 deletions core/src/worker/workflow/run_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ use crate::{
telemetry::metrics::workflow_type,
worker::workflow::{
managed_run::{ManagedRun, RunUpdateAct},
HistoryUpdate, LocalActivityRequestSink, PermittedWFT, RunBasics,
HistoryUpdate, LocalActivityRequestSink, PermittedWFT, RequestEvictMsg, RunBasics,
},
MetricsContext,
};
use lru::LruCache;
use std::{num::NonZeroUsize, rc::Rc, sync::Arc};
use temporal_sdk_core_api::worker::WorkerConfig;
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::get_system_info_response;
use temporal_sdk_core_protos::{
coresdk::workflow_activation::remove_from_cache::EvictionReason,
temporal::api::workflowservice::v1::get_system_info_response,
};

pub(super) struct RunCache {
worker_config: Arc<WorkerConfig>,
Expand Down Expand Up @@ -84,7 +87,18 @@ impl RunCache {
pub(super) fn remove(&mut self, k: &str) -> Option<ManagedRun> {
let r = self.runs.pop(k);
self.metrics.cache_size(self.len() as u64);
self.metrics.cache_eviction();
if let Some(rh) = &r {
// A workflow completing normally doesn't count as a forced eviction.
if !matches!(
rh.trying_to_evict(),
Some(RequestEvictMsg {
reason: EvictionReason::WorkflowExecutionEnding,
..
})
) {
self.metrics.forced_cache_eviction();
}
}
r
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/worker/workflow/workflow_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ impl WFStream {
let num_existing_evictions = self
.runs
.runs_lru_order()
.filter(|(_, h)| h.is_trying_to_evict())
.filter(|(_, h)| h.trying_to_evict().is_some())
.count();
let mut num_evicts_needed = num_in_buff.saturating_sub(num_existing_evictions);
for (rid, handle) in self.runs.runs_lru_order() {
Expand Down Expand Up @@ -559,7 +559,7 @@ impl WFStream {
if let Some(r) = self.runs.peek(run_id) {
info!(run_id, wft=?r.wft(), activation=?r.activation(),
buffered_wft=r.has_buffered_wft(),
trying_to_evict=r.is_trying_to_evict(), more_work=r.more_pending_work());
trying_to_evict=r.trying_to_evict().is_some(), more_work=r.more_pending_work());
} else {
info!(run_id, "Run not found");
}
Expand Down
30 changes: 30 additions & 0 deletions tests/integ_tests/metrics_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,3 +785,33 @@ async fn activity_metrics() {
workflow_type=\"{wf_name}\"}} 1"
)));
}

#[tokio::test]
async fn evict_on_complete_does_not_count_as_forced_eviction() {
let (telemopts, addr, _aborter) = prom_metrics(None);
let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
let wf_name = "evict_on_complete_does_not_count_as_forced_eviction";
let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt);
starter.worker_config.no_remote_activities(true);
let mut worker = starter.worker().await;

worker.register_wf(
wf_name.to_string(),
|_: WfContext| async move { Ok(().into()) },
);

worker
.submit_wf(
wf_name.to_owned(),
wf_name.to_owned(),
vec![],
WorkflowOptions::default(),
)
.await
.unwrap();
worker.run_until_done().await.unwrap();

let body = get_text(format!("http://{addr}/metrics")).await;
// Metric shouldn't show up at all, since it's zero the whole time.
assert!(!body.contains("temporal_sticky_cache_total_forced_eviction"));
}

0 comments on commit c2beae4

Please sign in to comment.