From c2beae434355ce1837355f86abfa6f7a43f3a174 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 21 Nov 2024 16:51:20 -0800 Subject: [PATCH] Make metric work properly & add test --- core/src/telemetry/metrics.rs | 13 ++++----- core/src/worker/workflow/managed_run.rs | 6 ++--- core/src/worker/workflow/run_cache.rs | 20 +++++++++++--- core/src/worker/workflow/workflow_stream.rs | 4 +-- tests/integ_tests/metrics_tests.rs | 30 +++++++++++++++++++++ 5 files changed, 59 insertions(+), 14 deletions(-) diff --git a/core/src/telemetry/metrics.rs b/core/src/telemetry/metrics.rs index 50b971c85..b83488a0b 100644 --- a/core/src/telemetry/metrics.rs +++ b/core/src/telemetry/metrics.rs @@ -55,7 +55,7 @@ struct Instruments { sticky_cache_hit: Arc, sticky_cache_miss: Arc, sticky_cache_size: Arc, - sticky_cache_evictions: Arc, + sticky_cache_forced_evictions: Arc, } impl MetricsContext { @@ -262,10 +262,11 @@ impl MetricsContext { self.instruments.sticky_cache_size.record(size, &self.kvs); } - // TODO: Not on normal completion /// Count a workflow being evicted from the cache - pub(crate) fn cache_eviction(&self) { - self.instruments.sticky_cache_evictions.add(1, &self.kvs); + pub(crate) fn forced_cache_eviction(&self) { + self.instruments + .sticky_cache_forced_evictions + .add(1, &self.kvs); } } @@ -424,7 +425,7 @@ impl Instruments { description: "Current number of cached workflows".into(), unit: "".into(), }), - sticky_cache_evictions: meter.counter(MetricParameters { + sticky_cache_forced_evictions: meter.counter(MetricParameters { name: "sticky_cache_total_forced_eviction".into(), description: "Count of evictions of cached workflows".into(), unit: "".into(), @@ -868,7 +869,7 @@ mod tests { true, ); let mc = MetricsContext::top_level("foo".to_string(), "q".to_string(), &telem_instance); - mc.cache_eviction(); + mc.forced_cache_eviction(); let events = call_buffer.retrieve(); let a1 = assert_matches!( &events[0], diff --git a/core/src/worker/workflow/managed_run.rs b/core/src/worker/workflow/managed_run.rs index 52e9f4c4e..3601f6b34 100644 --- a/core/src/worker/workflow/managed_run.rs +++ b/core/src/worker/workflow/managed_run.rs @@ -156,9 +156,9 @@ impl ManagedRun { self.activation.as_ref() } - /// Returns true if this run has already been told it will be evicted. - pub(super) fn is_trying_to_evict(&self) -> bool { - self.trying_to_evict.is_some() + /// Returns this run's eviction reason if it is going to be evicted + pub(super) fn trying_to_evict(&self) -> Option<&RequestEvictMsg> { + self.trying_to_evict.as_ref() } /// Called whenever a new workflow task is obtained for this run diff --git a/core/src/worker/workflow/run_cache.rs b/core/src/worker/workflow/run_cache.rs index e81fa2bd9..9990168f5 100644 --- a/core/src/worker/workflow/run_cache.rs +++ b/core/src/worker/workflow/run_cache.rs @@ -2,14 +2,17 @@ use crate::{ telemetry::metrics::workflow_type, worker::workflow::{ managed_run::{ManagedRun, RunUpdateAct}, - HistoryUpdate, LocalActivityRequestSink, PermittedWFT, RunBasics, + HistoryUpdate, LocalActivityRequestSink, PermittedWFT, RequestEvictMsg, RunBasics, }, MetricsContext, }; use lru::LruCache; use std::{num::NonZeroUsize, rc::Rc, sync::Arc}; use temporal_sdk_core_api::worker::WorkerConfig; -use temporal_sdk_core_protos::temporal::api::workflowservice::v1::get_system_info_response; +use temporal_sdk_core_protos::{ + coresdk::workflow_activation::remove_from_cache::EvictionReason, + temporal::api::workflowservice::v1::get_system_info_response, +}; pub(super) struct RunCache { worker_config: Arc, @@ -84,7 +87,18 @@ impl RunCache { pub(super) fn remove(&mut self, k: &str) -> Option { let r = self.runs.pop(k); self.metrics.cache_size(self.len() as u64); - self.metrics.cache_eviction(); + if let Some(rh) = &r { + // A workflow completing normally doesn't count as a forced eviction. + if !matches!( + rh.trying_to_evict(), + Some(RequestEvictMsg { + reason: EvictionReason::WorkflowExecutionEnding, + .. + }) + ) { + self.metrics.forced_cache_eviction(); + } + } r } diff --git a/core/src/worker/workflow/workflow_stream.rs b/core/src/worker/workflow/workflow_stream.rs index df556a5a6..a1babfdfb 100644 --- a/core/src/worker/workflow/workflow_stream.rs +++ b/core/src/worker/workflow/workflow_stream.rs @@ -505,7 +505,7 @@ impl WFStream { let num_existing_evictions = self .runs .runs_lru_order() - .filter(|(_, h)| h.is_trying_to_evict()) + .filter(|(_, h)| h.trying_to_evict().is_some()) .count(); let mut num_evicts_needed = num_in_buff.saturating_sub(num_existing_evictions); for (rid, handle) in self.runs.runs_lru_order() { @@ -559,7 +559,7 @@ impl WFStream { if let Some(r) = self.runs.peek(run_id) { info!(run_id, wft=?r.wft(), activation=?r.activation(), buffered_wft=r.has_buffered_wft(), - trying_to_evict=r.is_trying_to_evict(), more_work=r.more_pending_work()); + trying_to_evict=r.trying_to_evict().is_some(), more_work=r.more_pending_work()); } else { info!(run_id, "Run not found"); } diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs index 16477d193..c4f11c8c8 100644 --- a/tests/integ_tests/metrics_tests.rs +++ b/tests/integ_tests/metrics_tests.rs @@ -785,3 +785,33 @@ async fn activity_metrics() { workflow_type=\"{wf_name}\"}} 1" ))); } + +#[tokio::test] +async fn evict_on_complete_does_not_count_as_forced_eviction() { + let (telemopts, addr, _aborter) = prom_metrics(None); + let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); + let wf_name = "evict_on_complete_does_not_count_as_forced_eviction"; + let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt); + starter.worker_config.no_remote_activities(true); + let mut worker = starter.worker().await; + + worker.register_wf( + wf_name.to_string(), + |_: WfContext| async move { Ok(().into()) }, + ); + + worker + .submit_wf( + wf_name.to_owned(), + wf_name.to_owned(), + vec![], + WorkflowOptions::default(), + ) + .await + .unwrap(); + worker.run_until_done().await.unwrap(); + + let body = get_text(format!("http://{addr}/metrics")).await; + // Metric shouldn't show up at all, since it's zero the whole time. + assert!(!body.contains("temporal_sticky_cache_total_forced_eviction")); +}