Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] trim_left investigation #67

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
317 changes: 298 additions & 19 deletions lib/ddsketch-agent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,20 @@ pub struct DDSketch {
}

impl DDSketch {
fn with_config(config: Config) -> Self {
let initial_bins = cmp::min(INITIAL_BINS, config.bin_limit) as usize;

Self {
config,
bins: Vec::with_capacity(initial_bins),
count: 0,
min: f64::MAX,
max: f64::MIN,
sum: 0.0,
avg: 0.0,
}
}

#[cfg(test)]
fn bin_count(&self) -> usize {
self.bins.len()
Expand Down Expand Up @@ -434,16 +448,31 @@ impl DDSketch {

let key = self.config.key(v);

// Fast path for adding to an existing bin.
for b in &mut self.bins {
if b.k == key && b.n < MAX_BIN_WIDTH {
b.n += 1;
return;
let mut insert_at = None;

for (bin_idx, b) in self.bins.iter_mut().enumerate() {
if b.k == key {
if b.n < MAX_BIN_WIDTH {
// Fast path for adding to an existing bin without overflow.
b.n += 1;
return;
} else {
insert_at = Some(bin_idx);
break;
}
}
if b.k > key {
insert_at = Some(bin_idx);
break;
}
}

// Slow path could be also optimized.
self.insert_keys(vec![key]);
if let Some(bin_idx) = insert_at {
self.bins.insert(bin_idx, Bin { k: key, n: 1 });
} else {
self.bins.push(Bin { k: key, n: 1 });
}
trim_left(&mut self.bins, self.config.bin_limit);
}

/// Inserts many values into the sketch.
Expand Down Expand Up @@ -709,17 +738,8 @@ impl PartialEq for DDSketch {
impl Default for DDSketch {
fn default() -> Self {
let config = Config::default();
let initial_bins = cmp::min(INITIAL_BINS, config.bin_limit) as usize;

Self {
config,
bins: Vec::with_capacity(initial_bins),
count: 0,
min: f64::MAX,
max: f64::MIN,
sum: 0.0,
avg: 0.0,
}
Self::with_config(config)
}
}

Expand Down Expand Up @@ -821,6 +841,8 @@ mod tests {
use rand::thread_rng;
use rand_distr::{Distribution, Pareto};

use crate::AGENT_DEFAULT_MIN_VALUE;

use super::{Bucket, Config, DDSketch, AGENT_DEFAULT_EPS, MAX_KEY};

const FLOATING_POINT_ACCEPTABLE_ERROR: f64 = 1.0e-10;
Expand Down Expand Up @@ -1096,11 +1118,12 @@ mod tests {
test_relative_accuracy(config, AGENT_DEFAULT_EPS, min_value, max_value)
}

fn parse_sketch_from_string_bins(layout: &str) -> DDSketch {
fn parse_sketch_from_string_bins_with_custom_blank_ddsketch(blank_sketch: DDSketch, layout: &str) -> DDSketch {
layout
.split(' ')
.filter(|v| !v.is_empty())
.map(|pair| pair.split(':').map(ToOwned::to_owned).collect::<Vec<_>>())
.fold(DDSketch::default(), |mut sketch, mut kn| {
.fold(blank_sketch, |mut sketch, mut kn| {
let k = kn.remove(0).parse::<i16>().unwrap();
let n = kn.remove(0).parse::<u16>().unwrap();

Expand All @@ -1109,6 +1132,17 @@ mod tests {
})
}

fn parse_sketch_from_string_bins(layout: &str) -> DDSketch {
parse_sketch_from_string_bins_with_custom_blank_ddsketch(DDSketch::default(), layout)
}

fn parse_sketch_from_string_bins_with_bin_limit(layout: &str, bin_limit: u16) -> DDSketch {
let config = Config::new(AGENT_DEFAULT_EPS, AGENT_DEFAULT_MIN_VALUE, bin_limit);
let sketch = DDSketch::with_config(config);

parse_sketch_from_string_bins_with_custom_blank_ddsketch(sketch, layout)
}

fn compare_sketches(actual: &DDSketch, expected: &DDSketch, allowed_err: f64) {
let actual_sum = actual.sum().unwrap();
let expected_sum = expected.sum().unwrap();
Expand All @@ -1124,6 +1158,251 @@ mod tests {
assert_eq!(actual.bins(), expected.bins());
}

#[test]
fn test_sketch_trimleft() {
/// values to insert into a sketch
#[allow(dead_code)]
enum Value {
Float(f64),
Vec(Vec<f64>),
NFloats(u32, f64),
}
/// ways to insert values into a sketch
#[derive(Debug)]
enum InsertFn {
Insert,
InsertMany,
InsertN,
}
struct Case {
description: &'static str,
start: &'static str,
insert: Value,
expected: &'static str,
max_bins: u16,
}

let cases = &[
Case {
description: "baseline: inserting from empty up to bin limit",
start: "",
insert: Value::Vec(vec![0.0, 0.5, 1.0, 1.5]),
expected: "0:1 1293:1 1338:1 1364:1",
max_bins: 4,
},
Case {
description: "inserting from empty to over bin limit",
start: "",
insert: Value::Vec(vec![0.0, 0.5, 1.0, 1.5]),
expected: "1293:2 1338:1 1364:1",
max_bins: 3,
},
Case {
description: "inserting from empty to well over bin limit",
start: "",
insert: Value::Vec(vec![0.0, 0.5, 1.0, 1.5, 0.0, 0.0]),
expected: "1293:4 1338:1 1364:1",
max_bins: 3,
},
Case {
description: "inserting from empty to over bin limit with overflow",
start: "",
insert: Value::NFloats(65535 * 5, 0.0),
// longstanding trimLeft bug
expected: "0:65535 0:65535 0:65535 0:65535 0:65535",
// actual expected: "0:65535 0:65535 0:65535"
max_bins: 3,
},
Case {
description: "inserting early bin over the bin limit",
start: "0:65535 0:65535 1338:65535",
insert: Value::Float(0.0),
// longstanding trimLeft bug
expected: "0:1 0:65535 0:65535 1338:65535",
// actual expected: "0:65535 0:65535 1338:65535"
max_bins: 3,
},
Case {
description: "inserting last bin over the bin limit",
start: "0:65535 0:65535 1338:65535",
insert: Value::Float(1.0),
// This is a bug. I'm not sure what ought to happen here. Need to review the DDSketch paper.
expected: "0:65535 0:65535 1338:1 1338:65535",
// actual expected: something like "1338:65535 1338:65535 1338:65535" ?
max_bins: 3,
},
];

for case in cases {
for insert_fn in &[InsertFn::Insert, InsertFn::InsertMany, InsertFn::InsertN] {
let mut sketch = parse_sketch_from_string_bins_with_bin_limit(case.start, case.max_bins);

match insert_fn {
InsertFn::Insert => match &case.insert {
Value::Float(v) => sketch.insert(*v),
Value::Vec(vs) => {
for v in vs {
sketch.insert(*v);
}
}
Value::NFloats(n, v) => {
for _ in 0..*n {
sketch.insert(*v);
}
}
},
InsertFn::InsertMany => match &case.insert {
Value::Float(v) => sketch.insert_many(&[*v]),
Value::Vec(vs) => sketch.insert_many(vs),
Value::NFloats(n, v) => {
for _ in 0..*n {
sketch.insert_many(&[*v]);
}
}
},
InsertFn::InsertN => match &case.insert {
Value::Float(v) => sketch.insert_n(*v, 1),
Value::Vec(vs) => {
for v in vs {
sketch.insert_n(*v, 1);
}
}
Value::NFloats(n, v) => sketch.insert_n(*v, *n),
},
}

let expected = parse_sketch_from_string_bins_with_bin_limit(case.expected, case.max_bins);
assert_eq!(expected.bins(), sketch.bins(), "{:?}: {}", insert_fn, case.description);
}
}
}

#[test]
fn test_sketch_insert_and_overflow() {
/// values to insert into a sketch
#[allow(dead_code)]
enum Value {
Float(f64),
Vec(Vec<f64>),
NFloats(u32, f64),
}
/// ways to insert values into a sketch
#[derive(Debug)]
enum InsertFn {
Insert,
InsertMany,
InsertN,
}
struct Case {
description: &'static str,
start: &'static str,
insert: Value,
expected: &'static str,
}

let cases = &[
Case {
description: "baseline: inserting into an empty sketch",
start: "",
insert: Value::Float(0.0),
expected: "0:1",
},
Case {
description: "inserting a value into an existing bin",
start: "0:1",
insert: Value::Float(0.0),
expected: "0:2",
},
Case {
description: "inserting a value into a new bin at the start",
start: "1338:1",
insert: Value::Float(0.0),
expected: "0:1 1338:1",
},
Case {
description: "inserting a value into a new bin in the middle",
start: "0:1 1338:1",
insert: Value::Float(0.5),
expected: "0:1 1293:1 1338:1",
},
Case {
description: "inserting a value into a new bin at the end",
start: "0:1",
insert: Value::Float(1.0),
expected: "0:1 1338:1",
},
Case {
description: "inserting a value into an existing bin and filling it",
start: "0:65534",
insert: Value::Float(0.0),
expected: "0:65535",
},
Case {
description: "inserting a value into an existing bin and causing an overflow",
start: "0:65535",
insert: Value::Float(0.0),
expected: "0:1 0:65535",
},
Case {
description: "inserting many values into a sketch and causing an overflow",
start: "0:100",
insert: Value::NFloats(65535, 0.0),
expected: "0:100 0:65535",
},
Case {
description: "inserting a value into a new bin in the middle and causing an overflow",
start: "0:1 1338:1",
insert: Value::NFloats(65536, 0.5),
expected: "0:1 1293:1 1293:65535 1338:1",
},
];

for case in cases {
for insert_fn in &[InsertFn::Insert, InsertFn::InsertMany, InsertFn::InsertN] {
// Insert each value every way possible.

let mut sketch = parse_sketch_from_string_bins(case.start);

match insert_fn {
InsertFn::Insert => match &case.insert {
Value::Float(v) => sketch.insert(*v),
Value::Vec(vs) => {
for v in vs {
sketch.insert(*v);
}
}
Value::NFloats(n, v) => {
for _ in 0..*n {
sketch.insert(*v);
}
}
},
InsertFn::InsertMany => match &case.insert {
Value::Float(v) => sketch.insert_many(&[*v]),
Value::Vec(vs) => sketch.insert_many(vs),
Value::NFloats(n, v) => {
for _ in 0..*n {
sketch.insert_many(&[*v]);
}
}
},
InsertFn::InsertN => match &case.insert {
Value::Float(v) => sketch.insert_n(*v, 1),
Value::Vec(vs) => {
for v in vs {
sketch.insert_n(*v, 1);
}
}
Value::NFloats(n, v) => sketch.insert_n(*v, *n),
},
}

let expected = parse_sketch_from_string_bins(case.expected);
assert_eq!(expected.bins(), sketch.bins(), "{:?}: {}", insert_fn, case.description);
}
}
}

#[test]
fn test_histogram_interpolation_agent_similarity() {
#[derive(Clone)]
Expand Down
Loading