Skip to content

Commit

Permalink
feat(bottlecap): generate file descriptor and threads enhanced metrics (
Browse files Browse the repository at this point in the history
#453)

* add fd and threads enhanced metrics

* clippy fixes

* fixes

* rename var
  • Loading branch information
shreyamalpani authored Nov 15, 2024
1 parent 56306a5 commit 72181b1
Show file tree
Hide file tree
Showing 25 changed files with 566 additions and 20 deletions.
2 changes: 2 additions & 0 deletions bottlecap/src/lifecycle/invocation/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,14 @@ mod tests {

let uptime_offset = Some(50.0);
let (tmp_chan_tx, _) = watch::channel(());
let (process_chan_tx, _) = watch::channel(());

let enhanced_metric_data = Some(EnhancedMetricData {
network_offset,
cpu_offset,
uptime_offset,
tmp_chan_tx,
process_chan_tx,
});

buffer.add_enhanced_metric_data(&request_id, enhanced_metric_data.clone());
Expand Down
8 changes: 8 additions & 0 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,17 @@ impl Processor {
let (tmp_chan_tx, tmp_chan_rx) = watch::channel(());
self.enhanced_metrics.set_tmp_enhanced_metrics(tmp_chan_rx);

// Start a channel for monitoring file descriptor and thread count
let (process_chan_tx, process_chan_rx) = watch::channel(());
self.enhanced_metrics
.set_process_enhanced_metrics(process_chan_rx);

let enhanced_metric_offsets = Some(EnhancedMetricData {
network_offset,
cpu_offset,
uptime_offset,
tmp_chan_tx,
process_chan_tx,
});
self.context_buffer
.add_enhanced_metric_data(&request_id, enhanced_metric_offsets);
Expand Down Expand Up @@ -196,6 +202,8 @@ impl Processor {
);
// Send the signal to stop monitoring tmp
_ = offsets.tmp_chan_tx.send(());
// Send the signal to stop monitoring file descriptors and threads
_ = offsets.process_chan_tx.send(());
}
}

Expand Down
4 changes: 4 additions & 0 deletions bottlecap/src/metrics/enhanced/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,9 @@ pub const CPU_MIN_UTILIZATION_METRIC: &str = "aws.lambda.enhanced.cpu_min_utiliz
pub const TMP_MAX_METRIC: &str = "aws.lambda.enhanced.tmp_max";
pub const TMP_USED_METRIC: &str = "aws.lambda.enhanced.tmp_used";
pub const TMP_FREE_METRIC: &str = "aws.lambda.enhanced.tmp_free";
pub const FD_MAX_METRIC: &str = "aws.lambda.enhanced.fd_max";
pub const FD_USE_METRIC: &str = "aws.lambda.enhanced.fd_use";
pub const THREADS_MAX_METRIC: &str = "aws.lambda.enhanced.threads_max";
pub const THREADS_USE_METRIC: &str = "aws.lambda.enhanced.threads_use";
//pub const ASM_INVOCATIONS_METRIC: &str = "aws.lambda.enhanced.asm.invocations";
pub const ENHANCED_METRICS_ENV_VAR: &str = "DD_ENHANCED_METRICS";
202 changes: 202 additions & 0 deletions bottlecap/src/metrics/enhanced/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,115 @@ impl Lambda {
});
}

pub fn generate_fd_enhanced_metrics(
fd_max: f64,
fd_use: f64,
aggr: &mut std::sync::MutexGuard<Aggregator>,
) {
let metric = Metric::new(
constants::FD_MAX_METRIC.into(),
MetricValue::distribution(fd_max),
None,
);
if let Err(e) = aggr.insert(metric) {
error!("Failed to insert fd_max metric: {}", e);
}

// Check if fd_use value is valid before inserting metric
if fd_use > 0.0 {
let metric = Metric::new(
constants::FD_USE_METRIC.into(),
MetricValue::distribution(fd_use),
None,
);
if let Err(e) = aggr.insert(metric) {
error!("Failed to insert fd_use metric: {}", e);
}
}
}

pub fn generate_threads_enhanced_metrics(
threads_max: f64,
threads_use: f64,
aggr: &mut std::sync::MutexGuard<Aggregator>,
) {
let metric = Metric::new(
constants::THREADS_MAX_METRIC.into(),
MetricValue::distribution(threads_max),
None,
);
if let Err(e) = aggr.insert(metric) {
error!("Failed to insert threads_max metric: {}", e);
}

// Check if threads_use value is valid before inserting metric
if threads_use > 0.0 {
let metric = Metric::new(
constants::THREADS_USE_METRIC.into(),
MetricValue::distribution(threads_use),
None,
);
if let Err(e) = aggr.insert(metric) {
error!("Failed to insert threads_use metric: {}", e);
}
}
}

pub fn set_process_enhanced_metrics(&self, mut send_metrics: Receiver<()>) {
if !self.config.enhanced_metrics {
return;
}

let aggr = Arc::clone(&self.aggregator);

tokio::spawn(async move {
// get list of all process ids
let pids = proc::get_pid_list();

// Set fd_max and initial value for fd_use to -1
let fd_max = proc::get_fd_max_data(&pids);
let mut fd_use = -1_f64;

// Set threads_max and initial value for threads_use to -1
let threads_max = proc::get_threads_max_data(&pids);
let mut threads_use = -1_f64;

let mut interval = interval(Duration::from_millis(1));
loop {
tokio::select! {
biased;
// When the stop signal is received, generate final metrics
_ = send_metrics.changed() => {
let mut aggr: std::sync::MutexGuard<Aggregator> =
aggr.lock().expect("lock poisoned");
Self::generate_fd_enhanced_metrics(fd_max, fd_use, &mut aggr);
Self::generate_threads_enhanced_metrics(threads_max, threads_use, &mut aggr);
return;
}
// Otherwise keep monitoring file descriptor and thread usage periodically
_ = interval.tick() => {
match proc::get_fd_use_data(&pids) {
Ok(fd_use_curr) => {
fd_use = fd_use.max(fd_use_curr);
},
Err(_) => {
debug!("Could not update file descriptor use enhanced metric.");
}
};
match proc::get_threads_use_data(&pids) {
Ok(threads_use_curr) => {
threads_use = threads_use.max(threads_use_curr);
},
Err(_) => {
debug!("Could not update threads use enhanced metric.");
}
};
}
}
}
});
}

fn calculate_estimated_cost_usd(billed_duration_ms: u64, memory_size_mb: u64) -> f64 {
let gb_seconds = (billed_duration_ms as f64 * constants::MS_TO_SEC)
* (memory_size_mb as f64 / constants::MB_TO_GB);
Expand Down Expand Up @@ -503,6 +612,7 @@ pub struct EnhancedMetricData {
pub cpu_offset: Option<CPUData>,
pub uptime_offset: Option<f64>,
pub tmp_chan_tx: Sender<()>,
pub process_chan_tx: Sender<()>,
}

impl PartialEq for EnhancedMetricData {
Expand Down Expand Up @@ -669,6 +779,18 @@ mod tests {
assert!(aggr
.get_entry_by_id(constants::TMP_FREE_METRIC.into(), &None)
.is_none());
assert!(aggr
.get_entry_by_id(constants::FD_MAX_METRIC.into(), &None)
.is_none());
assert!(aggr
.get_entry_by_id(constants::FD_USE_METRIC.into(), &None)
.is_none());
assert!(aggr
.get_entry_by_id(constants::THREADS_MAX_METRIC.into(), &None)
.is_none());
assert!(aggr
.get_entry_by_id(constants::THREADS_USE_METRIC.into(), &None)
.is_none());
}

#[test]
Expand Down Expand Up @@ -818,4 +940,84 @@ mod tests {
assert_sketch(&metrics_aggr, constants::TMP_USED_METRIC, 12165120.0);
assert_sketch(&metrics_aggr, constants::TMP_FREE_METRIC, 538296320.0);
}

#[test]
fn test_set_fd_enhanced_metrics_valid_fd_use() {
let (metrics_aggr, my_config) = setup();
let lambda = Lambda::new(metrics_aggr.clone(), my_config);

let fd_max = 1024.0;
let fd_use = 175.0;

Lambda::generate_fd_enhanced_metrics(
fd_max,
fd_use,
&mut lambda.aggregator.lock().expect("lock poisoned"),
);

assert_sketch(&metrics_aggr, constants::FD_MAX_METRIC, 1024.0);
assert_sketch(&metrics_aggr, constants::FD_USE_METRIC, 175.0);
}

#[test]
fn test_set_fd_enhanced_metrics_invalid_fd_use() {
let (metrics_aggr, my_config) = setup();
let lambda = Lambda::new(metrics_aggr.clone(), my_config);

let fd_max = 1024.0;
let fd_use = -1.0;

Lambda::generate_fd_enhanced_metrics(
fd_max,
fd_use,
&mut lambda.aggregator.lock().expect("lock poisoned"),
);

assert_sketch(&metrics_aggr, constants::FD_MAX_METRIC, 1024.0);

let aggr = lambda.aggregator.lock().expect("lock poisoned");
assert!(aggr
.get_entry_by_id(constants::FD_USE_METRIC.into(), &None)
.is_none());
}

#[test]
fn test_set_threads_enhanced_metrics_valid_threads_use() {
let (metrics_aggr, my_config) = setup();
let lambda = Lambda::new(metrics_aggr.clone(), my_config);

let threads_max = 1024.0;
let threads_use = 40.0;

Lambda::generate_threads_enhanced_metrics(
threads_max,
threads_use,
&mut lambda.aggregator.lock().expect("lock poisoned"),
);

assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0);
assert_sketch(&metrics_aggr, constants::THREADS_USE_METRIC, 40.0);
}

#[test]
fn test_set_threads_enhanced_metrics_invalid_threads_use() {
let (metrics_aggr, my_config) = setup();
let lambda = Lambda::new(metrics_aggr.clone(), my_config);

let threads_max = 1024.0;
let threads_use = -1.0;

Lambda::generate_threads_enhanced_metrics(
threads_max,
threads_use,
&mut lambda.aggregator.lock().expect("lock poisoned"),
);

assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0);

let aggr = lambda.aggregator.lock().expect("lock poisoned");
assert!(aggr
.get_entry_by_id(constants::THREADS_USE_METRIC.into(), &None)
.is_none());
}
}
3 changes: 3 additions & 0 deletions bottlecap/src/proc/constants.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
pub const PROC_NET_DEV_PATH: &str = "/proc/net/dev";
pub const PROC_STAT_PATH: &str = "/proc/stat";
pub const PROC_UPTIME_PATH: &str = "/proc/uptime";
pub const PROC_PATH: &str = "/proc";

pub const LAMDBA_NETWORK_INTERFACE: &str = "vinternal_1";
pub const LAMBDA_FILE_DESCRIPTORS_DEFAULT_LIMIT: f64 = 1024.0;
pub const LAMBDA_EXECUTION_PROCESSES_DEFAULT_LIMIT: f64 = 1024.0;
Loading

0 comments on commit 72181b1

Please sign in to comment.