This repository has been archived by the owner on Mar 27, 2021. It is now read-only.

Add a simple group combiner and additional IT tests. (#744)
* Add distribution query integration test. (Part 2 of 2)

* Addresses PR comments

* Addresses PR comments

* Add a new group combiner that handles tdigest queries without aggregation

* Addresses PR comments and adds boilerplate methods to MetricCollection to handle TdigestPoint creation
ao2017 authored Jan 20, 2021
1 parent e18fc9f commit a055e34
Showing 18 changed files with 560 additions and 108 deletions.
@@ -46,7 +46,9 @@ data class TdigestInstance (
     }

     override fun build(bucket: TDigestBucket): Metric {
-        if ( bucket.value().size() == 0L ) return Metric.invalid
-        else return TdigestPoint.create(bucket.value(), bucket.timestamp)
+        if ( bucket.value().size() == 0L ) {
+            return Metric.invalid
+        }
+        return TdigestPoint.create(bucket.value(), bucket.timestamp)
     }
 }
@@ -38,6 +38,10 @@ public static AtomicReference<TDigest> buildAtomicReference() {
         return new AtomicReference<>(tDigest);
     }

+    public static TDigest inital() {
+        return MergingDigest.createDigest(TDIGEST_COMPRESSION_LEVEL);
+    }
+
     public static UnaryOperator<TDigest> getOp(final ByteBuffer serializedTDigest) {
         TDigest input = MergingDigest.fromBytes(serializedTDigest);
         return t -> {
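For context, here is a minimal standalone sketch (not part of this commit) of how a digest produced by a helper like inital() can be serialized and read back with MergingDigest.fromBytes, which is what getOp() does with the ByteBuffer it receives. The compression value 100 and the class name TdigestRoundTripSketch are illustrative assumptions, standing in for TDIGEST_COMPRESSION_LEVEL and real Heroic code.

import com.tdunning.math.stats.MergingDigest;
import com.tdunning.math.stats.TDigest;
import java.nio.ByteBuffer;

public class TdigestRoundTripSketch {
    public static void main(String[] args) {
        // Build a digest roughly the way inital() would (compression 100 is assumed).
        TDigest digest = MergingDigest.createDigest(100);
        for (int i = 0; i < 1000; i++) {
            digest.add(Math.random());
        }

        // Serialize to a ByteBuffer, the form getOp() receives.
        ByteBuffer buffer = ByteBuffer.allocate(digest.byteSize());
        digest.asBytes(buffer);
        buffer.flip();

        // Deserialize the same way getOp() does, then merge into a fresh digest.
        TDigest restored = MergingDigest.fromBytes(buffer);
        TDigest merged = MergingDigest.createDigest(100);
        merged.add(restored);

        System.out.println("restored size = " + restored.size());
        System.out.println("p99 estimate  = " + merged.quantile(0.99));
    }
}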
@@ -26,8 +26,9 @@ import com.spotify.heroic.aggregation.TDigestBucket
 import com.spotify.heroic.metric.DistributionPoint
 import com.spotify.heroic.metric.HeroicDistribution
 import com.spotify.heroic.metric.TdigestPoint
-import java.util.concurrent.atomic.AtomicReference
+import com.tdunning.math.stats.MergingDigest
+import com.tdunning.math.stats.TDigest;


 /**
 *
@@ -36,20 +37,36 @@ import java.util.concurrent.atomic.AtomicReference
 *
 */
 data class TdigestMergingBucket(override val timestamp: Long) : AbstractBucket(), TDigestBucket {
-    private val datasketch : AtomicReference<TDigest> = TdigestInstanceUtils.buildAtomicReference()
+    private val datasketch : TDigest = TdigestInstanceUtils.inital()

     override fun updateDistributionPoint(key: Map<String, String>, sample : DistributionPoint) {
         val heroicDistribution : HeroicDistribution = HeroicDistribution.create(sample.value().value)
         val serializedDatasketch = heroicDistribution.toByteBuffer()
-        datasketch.getAndUpdate(TdigestInstanceUtils.getOp(serializedDatasketch))
+        val input: TDigest = MergingDigest.fromBytes(serializedDatasketch)
+        if ( input.size() > 0) {
+            update(input)
+        }
     }

+    @Synchronized fun update(input : TDigest) {
+        //This is a temp fix to handle corrupted datapoint.
+        try {
+            datasketch.add(input)
+        }catch(ignore: Exception) {
+
+        }
+
+    }


     override fun updateTDigestPoint(key: Map<String, String>, sample : TdigestPoint) {
-        datasketch.getAndUpdate(TdigestInstanceUtils.getOp(sample.value()))
+        val input: TDigest = sample.value()
+        if ( input.size() > 0) {
+            update(input)
+        }
     }

-    override fun value(): TDigest {
-        return datasketch.get()
+    @Synchronized override fun value(): TDigest {
+        return datasketch
     }
 }
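Below is a minimal Java analogue (not Heroic code) of the pattern the Kotlin bucket now uses: one mutable TDigest guarded by a lock, with concurrent producers merging their digests into it, instead of the previous AtomicReference plus getAndUpdate approach. The compression value 100 and all class and variable names are assumptions for illustration.

import com.tdunning.math.stats.MergingDigest;
import com.tdunning.math.stats.TDigest;

public class SynchronizedDigestSketch {
    // Single mutable digest per bucket, mirroring the new Kotlin field
    // `private val datasketch : TDigest = TdigestInstanceUtils.inital()`.
    private final TDigest datasketch = MergingDigest.createDigest(100);

    // Counterpart of the @Synchronized update(input) above: callers skip empty
    // inputs, and a corrupted digest must not poison the whole bucket.
    public synchronized void update(TDigest input) {
        try {
            datasketch.add(input);
        } catch (RuntimeException ignore) {
            // temporary guard against corrupted data points
        }
    }

    public synchronized TDigest value() {
        return datasketch;
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedDigestSketch bucket = new SynchronizedDigestSketch();
        Runnable producer = () -> {
            TDigest sample = MergingDigest.createDigest(100);
            for (int i = 0; i < 1000; i++) {
                sample.add(Math.random());
            }
            bucket.update(sample);
        };
        Thread first = new Thread(producer);
        Thread second = new Thread(producer);
        first.start();
        second.start();
        first.join();
        second.join();
        // Both producers' samples end up in one digest: size() should be 2000.
        System.out.println("merged size = " + bucket.value().size());
    }
}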
@@ -0,0 +1,26 @@
package com.spotify.heroic.aggregation.simple;


import com.google.common.collect.ImmutableList;
import com.spotify.heroic.aggregation.DoubleBucket;
import com.spotify.heroic.aggregation.TDigestBucket;
import java.util.Collection;
import java.util.List;

public class TdigestBucketIntegrationTest extends ValueBucketIntegrationTest {


    public TdigestBucketIntegrationTest() {
        super(Double.NEGATIVE_INFINITY, null);
    }

    @Override
    public Collection<DoubleBucket> buckets() {
        return List.of();
    }

    @Override
    public Collection<? extends TDigestBucket> tDigestBuckets(){
        return ImmutableList.<TDigestBucket>of(new TdigestMergingBucket(0L));
    }
}
@@ -26,6 +26,14 @@ public void testZeroValue(){
         Assert.assertEquals(0,val.size());
     }

+    @Test(expected = java.nio.BufferUnderflowException.class)
+    public void testNullValue(){
+        final TdigestMergingBucket b = new TdigestMergingBucket(timeStamp);
+        DistributionPoint dp = DistributionPoint.create(HeroicDistribution.create(ByteString.EMPTY),
+            System.currentTimeMillis());
+        b.updateDistributionPoint(TAGS, dp);
+    }
+

     @Test
     public void testCount() {
@@ -4,7 +4,13 @@

 import com.google.common.collect.ImmutableMap;
 import com.spotify.heroic.aggregation.DoubleBucket;
+import com.spotify.heroic.aggregation.TDigestBucket;
+import com.spotify.heroic.metric.DistributionPoint;
+import com.spotify.heroic.metric.HeroicDistribution;
 import com.spotify.heroic.metric.Point;
+import com.tdunning.math.stats.MergingDigest;
+import com.tdunning.math.stats.TDigest;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -19,6 +25,7 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import com.google.protobuf.ByteString;

 public abstract class ValueBucketIntegrationTest {
     private static final int NCPU = Runtime.getRuntime().availableProcessors();
@@ -63,6 +70,62 @@ public void teardown() {

     public abstract Collection<? extends DoubleBucket> buckets();

+    public Collection<? extends TDigestBucket> tDigestBuckets(){
+        return List.of();
+    }
+
+
+    @Test(timeout = 10000)
+    public void testTDigestBucket() throws InterruptedException, ExecutionException {
+        final Random rnd = new Random();
+
+
+
+        for (final TDigestBucket bucket : tDigestBuckets()) {
+            final List<Future<Void>> futures = new ArrayList<>();
+
+            TDigest expected = TDigest.createDigest(100.0);
+
+            for (int iteration = 0; iteration < iterations; iteration++) {
+                final List<DistributionPoint> updates = new ArrayList<>();
+                for (int i = 0; i< 10; i++) {
+                    int count = 5;
+                    double [] data = new double[count];
+                    while (count-- > 0) {
+                        double sample = rnd.nextDouble();
+                        data[count] = sample;
+                        expected.add(sample);
+                    }
+                    DistributionPoint dp = DistributionPointUtils
+                        .createDistributionPoint(data, System.currentTimeMillis());
+                    updates.add(dp);
+                }
+
+                for (int thread = 0; thread < threadCount; thread++) {
+                    futures.add(service.submit(new Callable<Void>() {
+                        @Override
+                        public Void call() throws Exception {
+                            for (final DistributionPoint d : updates) {
+                                bucket.updateDistributionPoint(tags, d);
+                            }
+
+                            return null;
+                        }
+                    }));
+                }
+
+                for (final Future<Void> f : futures) {
+                    f.get();
+                }
+            }
+
+            assertEquals(bucket.getClass().getSimpleName(), expected.size()*threadCount,
+                bucket.value().size());
+
+        }
+    }
+
+
     @Test(timeout = 10000)
     public void testExpectedValue() throws InterruptedException, ExecutionException {
         final Random rnd = new Random();
@@ -22,24 +22,11 @@
 package com.spotify.heroic.aggregation;

 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.spotify.heroic.common.Series;
-import com.spotify.heroic.metric.MetricCollection;
-import com.spotify.heroic.metric.MetricType;
-import com.spotify.heroic.metric.Point;
 import com.spotify.heroic.metric.ShardedResultGroup;

-import com.spotify.heroic.metric.TdigestPoint;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;


 public interface AggregationCombiner {
-    ImmutableList<MetricType> TDIGEST_TYPE = ImmutableList.of(MetricType.TDIGEST_POINT,
-        MetricType.DISTRIBUTION_POINTS);

     List<ShardedResultGroup> combine(List<List<ShardedResultGroup>> all);

@@ -63,41 +50,18 @@ public String toString() {
         }
     };

-    default void compute(final ImmutableList.Builder<ShardedResultGroup> groups,
-                         final AggregationOutput out,
-                         final long cadence) {
-        final List<TdigestPoint> metrics = out.getMetrics().getDataAs(TdigestPoint.class);
-        final Map<ComputeDistributionStat.Percentile, List<Point>> resMap = new HashMap<>();
-        for (TdigestPoint tdigestPoint : metrics) {
-            ComputeDistributionStat
-                .Percentile
-                .DEFAULT
-                .forEach(p -> compute(tdigestPoint, resMap, p));
-        }
-        for (Map.Entry<ComputeDistributionStat.Percentile,
-            List<Point>> entry : resMap.entrySet()) {
-            Set<Series> newSet = new HashSet<>();
-            out.getSeries().forEach(s -> updateMetadata(s, entry.getKey(), newSet));
-            groups.add(new ShardedResultGroup(ImmutableMap.of(), out.getKey(), newSet,
-                MetricCollection.points(entry.getValue()), cadence));
-        }
-    }

-    private void updateMetadata(final Series s,
-                                final ComputeDistributionStat.Percentile percentile,
-                                final Set<Series> newSet) {
-        Map<String, String> tags = new HashMap<>(s.getTags());
-        tags.put("tdigeststat", percentile.getName());
-        Series newSeries = Series.of(s.getKey(), tags, s.getResource());
-        newSet.add(newSeries);
-    }
+    AggregationCombiner TDIGEST_DEFAULT = new AggregationCombiner() {
+        @Override
+        public List<ShardedResultGroup> combine(
+            final List<List<ShardedResultGroup>> all
+        ) {
+            return TDigestAggregationCombiner.simpleCombine(all);
+        }

-    private void compute(final TdigestPoint tdigestPoint,
-                         final Map<ComputeDistributionStat.Percentile, List<Point>> resMap,
-                         final ComputeDistributionStat.Percentile percentile) {
-        Point point = ComputeDistributionStat.computePercentile(tdigestPoint, percentile);
-        List<Point> points = resMap.getOrDefault(percentile, new ArrayList<>());
-        points.add(point);
-        resMap.put(percentile, points);
-    }
+        @Override
+        public String toString() {
+            return "TDIGEST_DEFAULT";
+        }
+    };
 }
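For background on what the removed compute() path produced and what the new TDigestAggregationCombiner ultimately relies on, here is a small standalone sketch of reading percentiles straight from a TDigest via quantile(); this is roughly the work behind ComputeDistributionStat.computePercentile. The percentile set (p50/p75/p99), the compression value, and the class name are illustrative assumptions, not Heroic code.

import com.tdunning.math.stats.MergingDigest;
import com.tdunning.math.stats.TDigest;

public class DigestPercentileSketch {
    public static void main(String[] args) {
        TDigest digest = MergingDigest.createDigest(100);
        for (int i = 1; i <= 10000; i++) {
            digest.add(i);
        }

        // quantile(q) returns the estimated value below which a fraction q of
        // the samples fall; this is the core of any percentile stat on a digest.
        double[] quantiles = {0.50, 0.75, 0.99};
        for (double q : quantiles) {
            System.out.printf("p%.0f ~= %.1f%n", q * 100, digest.quantile(q));
        }
    }
}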
@@ -78,14 +78,9 @@ public List<ShardedResultGroup> combine(
         final AggregationResult result = session.result();

         for (final AggregationOutput out : result.getResult()) {
-            if (TDIGEST_TYPE.contains(out.getMetrics().getType())) {
-                compute(groups, out, cadence);
-            } else {
-                groups.add(new ShardedResultGroup(ImmutableMap.of(), out.getKey(), out.getSeries(),
-                    out.getMetrics(), cadence));
-            }
+            groups.add(new ShardedResultGroup(ImmutableMap.of(), out.getKey(), out.getSeries(),
+                out.getMetrics(), cadence));
         }

         return groups.build();
     }
 }
@@ -179,6 +179,11 @@ public AggregationResult result() {
             MetricCollection::distributionPoints));
         }

+        if (!sub.tDigestPoints.isEmpty()) {
+            groups.add(collectGroup(group, sub.tDigestPoints,
+                MetricCollection::tdigestPoints));
+        }
+
         if (!sub.spreads.isEmpty()) {
             groups.add(collectGroup(group, sub.spreads, MetricCollection::spreads));
         }