Skip to content

Commit

Permalink
Feat: support adaptive flush strategy again (#482)
Browse files Browse the repository at this point in the history
* Feat: support adaptive flush strategy again

* fix: clippy

* fix: clippy
  • Loading branch information
astuyve authored Nov 26, 2024
1 parent c57fe70 commit b72d570
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 13 deletions.
6 changes: 3 additions & 3 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

use bottlecap::{
base_url,
config::{self, AwsConfig, Config},
config::{self, flush_strategy::FlushStrategy, AwsConfig, Config},
event_bus::bus::EventBus,
events::Event,
lifecycle::{
Expand Down Expand Up @@ -356,7 +356,7 @@ async fn extension_loop_active(
let telemetry_listener_cancel_token =
setup_telemetry_client(&r.extension_id, logs_agent_channel).await?;

let flush_control = FlushControl::new(config.serverless_flush_strategy);
let mut flush_control = FlushControl::new(config.serverless_flush_strategy);
let mut shutdown = false;

let mut flush_interval = flush_control.get_flush_interval();
Expand Down Expand Up @@ -501,7 +501,7 @@ async fn extension_loop_active(
trace_flusher.manual_flush(),
stats_flusher.manual_flush()
);
if !flush_control.should_flush_end() {
if matches!(flush_control.flush_strategy, FlushStrategy::Periodically(_)) {
break;
}
}
Expand Down
83 changes: 73 additions & 10 deletions bottlecap/src/lifecycle/flush_control.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use crate::config::flush_strategy::FlushStrategy;
use std::time;
use tokio::time::Interval;
use tracing::debug;

use crate::lifecycle::invocation_times::InvocationTimes;

const DEFAULT_FLUSH_INTERVAL: u64 = 1000; // 1s
const TWENTY_SECONDS: u64 = 20 * 1000;

#[derive(Clone, Copy, Debug, PartialEq)]
pub struct FlushControl {
flush_strategy: FlushStrategy,
pub last_flush: u64,
pub flush_strategy: FlushStrategy,
invocation_times: InvocationTimes,
}

// 1. Default Strategy
Expand All @@ -17,12 +24,42 @@ pub struct FlushControl {
impl FlushControl {
#[must_use]
pub fn new(flush_strategy: FlushStrategy) -> FlushControl {
FlushControl { flush_strategy }
FlushControl {
flush_strategy,
last_flush: 0,
invocation_times: InvocationTimes::new(),
}
}

#[must_use]
pub fn should_flush_end(&self) -> bool {
!matches!(&self.flush_strategy, FlushStrategy::Periodically(_))
pub fn should_flush_end(&mut self) -> bool {
// previously: would return true if flush_strategy is not Periodically
// !matches!(self.flush_strategy, FlushStrategy::Periodically(_))
let now = match time::SystemTime::now().duration_since(time::UNIX_EPOCH) {
Ok(now) => now.as_secs(),
Err(e) => {
debug!("Failed to get current time: {:?}", e);
return false;
}
};
self.invocation_times.add(now);
match &self.flush_strategy {
FlushStrategy::End | FlushStrategy::EndPeriodically(_) => true,
FlushStrategy::Periodically(_) => false,
FlushStrategy::Default => {
if self.invocation_times.should_adapt_to_periodic(now) {
let should_periodic_flush = self.should_periodic_flush(now, TWENTY_SECONDS);
debug!(
"Adapting over to periodic flush strategy. should_periodic_flush: {}",
should_periodic_flush
);
return should_periodic_flush;
}
debug!("Not enough invocations to adapt to periodic flush, flushing at the end of the invocation");
self.last_flush = now;
true
}
}
}

#[must_use]
Expand All @@ -37,6 +74,16 @@ impl FlushControl {
FlushStrategy::End => tokio::time::interval(tokio::time::Duration::MAX),
}
}

// Only used for default strategy
fn should_periodic_flush(&mut self, now: u64, interval: u64) -> bool {
if now - self.last_flush > (interval / 1000) {
self.last_flush = now;
true
} else {
false
}
}
}

#[cfg(test)]
Expand All @@ -46,23 +93,39 @@ mod tests {

#[test]
fn should_flush_end() {
let flush_control = FlushControl::new(FlushStrategy::Default);
let mut flush_control = FlushControl::new(FlushStrategy::Default);
assert!(flush_control.should_flush_end());

let flush_control = FlushControl::new(FlushStrategy::EndPeriodically(PeriodicStrategy {
interval: 1,
}));
let mut flush_control =
FlushControl::new(FlushStrategy::EndPeriodically(PeriodicStrategy {
interval: 1,
}));
assert!(flush_control.should_flush_end());

let flush_control = FlushControl::new(FlushStrategy::End);
let mut flush_control = FlushControl::new(FlushStrategy::End);
assert!(flush_control.should_flush_end());

let flush_control = FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy {
let mut flush_control = FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy {
interval: 1,
}));
assert!(!flush_control.should_flush_end());
}

#[test]
fn should_flush_default_end() {
let mut flush_control = super::FlushControl::new(FlushStrategy::Default);
assert!(flush_control.should_flush_end());
}
#[test]
fn should_flush_default_periodic() {
const LOOKBACK_COUNT: usize = 20;
let mut flush_control = super::FlushControl::new(FlushStrategy::Default);
for _ in 0..LOOKBACK_COUNT - 1 {
assert!(flush_control.should_flush_end());
}
assert!(!flush_control.should_flush_end());
}

#[tokio::test]
async fn get_flush_interval() {
let flush_control = FlushControl::new(FlushStrategy::Default);
Expand Down
89 changes: 89 additions & 0 deletions bottlecap/src/lifecycle/invocation_times.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
const LOOKBACK_COUNT: usize = 20;
const ONE_TWENTY_SECONDS: f64 = 120.0;

#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) struct InvocationTimes {
times: [u64; LOOKBACK_COUNT],
head: usize,
}

impl InvocationTimes {
pub(crate) fn new() -> InvocationTimes {
InvocationTimes {
times: [0; LOOKBACK_COUNT],
head: 0,
}
}

pub(crate) fn add(&mut self, timestamp: u64) {
self.times[self.head] = timestamp;
self.head = (self.head + 1) % LOOKBACK_COUNT;
}

pub(crate) fn should_adapt_to_periodic(&self, now: u64) -> bool {
let mut count = 0;
let mut last = 0;
for time in &self.times {
if *time != 0 {
count += 1;
last = *time;
}
}
// If we haven't seen enough invocations, we should flush
if count < LOOKBACK_COUNT {
return false;
}
let elapsed = now - last;
(elapsed as f64 / (count - 1) as f64) < ONE_TWENTY_SECONDS
}
}

#[cfg(test)]
mod tests {
use crate::lifecycle::invocation_times;

#[test]
fn new() {
let invocation_times = invocation_times::InvocationTimes::new();
assert_eq!(
invocation_times.times,
[0; invocation_times::LOOKBACK_COUNT]
);
assert_eq!(invocation_times.head, 0);
}

#[test]
fn insertion() {
let mut invocation_times = invocation_times::InvocationTimes::new();
let timestamp = 1;
invocation_times.add(timestamp);
assert_eq!(invocation_times.times[0], timestamp);
assert_eq!(invocation_times.head, 1);
assert!(!invocation_times.should_adapt_to_periodic(1));
}

#[test]
fn insertion_with_full_buffer_fast_invokes() {
let mut invocation_times = invocation_times::InvocationTimes::new();
for i in 0..=invocation_times::LOOKBACK_COUNT {
invocation_times.add(i as u64);
}
// should wrap around
assert_eq!(invocation_times.times[0], 20);
assert_eq!(invocation_times.head, 1);
assert!(invocation_times.should_adapt_to_periodic(21));
}

#[test]
fn insertion_with_full_buffer_slow_invokes() {
let mut invocation_times = invocation_times::InvocationTimes::new();
invocation_times.add(1_u64);
for i in 0..invocation_times::LOOKBACK_COUNT {
invocation_times.add((i + 5000) as u64);
}
// should wrap around
assert_eq!(invocation_times.times[0], 5019);
assert_eq!(invocation_times.head, 1);
assert!(!invocation_times.should_adapt_to_periodic(10000));
}
}
1 change: 1 addition & 0 deletions bottlecap/src/lifecycle/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod flush_control;
pub mod invocation;
pub mod invocation_times;
pub mod listener;

0 comments on commit b72d570

Please sign in to comment.