Skip to content

Commit

Permalink
Metrics bugfix (#1266)
Browse files Browse the repository at this point in the history
  • Loading branch information
damirabdul authored Mar 9, 2023
1 parent 113bcaf commit e9b39d3
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import reactor.core.publisher.Flux;

public interface MetricPointRepository {
Flux<MetricPointPojo> deletePointsWithLessTime(final List<MetricPointPojo> newPoints);
Flux<MetricPointPojo> getPointsBySeriesId(final List<Integer> seriesIds);

Flux<MetricPointPojo> deletePoints(final List<MetricPointPojo> pointsToDelete);

Flux<MetricPointPojo> createOrUpdatePoints(final List<MetricPointPojo> metricPoints);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@ public class MetricPointRepositoryImpl implements MetricPointRepository {
private final JooqReactiveOperations jooqReactiveOperations;

@Override
public Flux<MetricPointPojo> deletePointsWithLessTime(final List<MetricPointPojo> newPoints) {
return jooqReactiveOperations.executeInPartitionReturning(newPoints, points -> {
public Flux<MetricPointPojo> getPointsBySeriesId(final List<Integer> seriesIds) {
final var query = DSL.selectFrom(METRIC_POINT)
.where(METRIC_POINT.SERIES_ID.in(seriesIds));
return jooqReactiveOperations.flux(query)
.map(r -> r.into(MetricPointPojo.class));
}

@Override
public Flux<MetricPointPojo> deletePoints(final List<MetricPointPojo> pointsToDelete) {
return jooqReactiveOperations.executeInPartitionReturning(pointsToDelete, points -> {
final Condition condition = points.stream()
.map(p -> METRIC_POINT.SERIES_ID.eq(p.getSeriesId())
.and(METRIC_POINT.LABEL_VALUES_IDS.ne(p.getLabelValuesIds()))
.and(METRIC_POINT.TIMESTAMP.lessThan(p.getTimestamp())))
.and(METRIC_POINT.LABEL_VALUES_IDS.eq(p.getLabelValuesIds())))
.reduce(Condition::or)
.orElseThrow(() -> new RuntimeException("Can't build delete condition for points"));
return jooqReactiveOperations.flux(DSL.deleteFrom(METRIC_POINT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import org.opendatadiscovery.oddplatform.dto.ingestion.IngestionMetricLabelsDto;
import org.opendatadiscovery.oddplatform.dto.ingestion.IngestionMetricPointDto;
import org.opendatadiscovery.oddplatform.dto.ingestion.IngestionMetricsRequest;
import org.opendatadiscovery.oddplatform.dto.metric.MetricLabelValueDto;
import org.opendatadiscovery.oddplatform.dto.metric.MetricSeriesDto;
import org.opendatadiscovery.oddplatform.dto.metric.MetricSeriesValueType;
import org.opendatadiscovery.oddplatform.dto.metric.SystemMetricLabel;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.Label;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.Metric;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.MetricFamily;
Expand Down Expand Up @@ -51,10 +54,12 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.function.TupleUtils;
import reactor.util.function.Tuples;

import static java.util.function.Function.identity;
import static org.opendatadiscovery.oddplatform.dto.metric.MetricSeriesValueType.BUCKET;
import static org.opendatadiscovery.oddplatform.dto.metric.SystemMetricLabel.BUCKET_UPPER_BOUND;
import static org.opendatadiscovery.oddplatform.dto.metric.SystemMetricLabel.QUANTILE;
import static org.opendatadiscovery.oddplatform.utils.MetricUtils.buildMetricFamilyKey;
import static reactor.function.TupleUtils.function;

Expand Down Expand Up @@ -125,7 +130,8 @@ private IngestionMetricsRequest buildIngestionMetricsRequest(final MetricSetList
if (metricFamily.getType() == MetricType.SUMMARY) {
validateQuantileValues(metricPoint.getSummaryValue().getQuantile());
metricPoint.getSummaryValue().getQuantile()
.forEach(q -> labels.put(QUANTILE.getLabelName(), q.getQuantile().toString()));
.forEach(
q -> labels.put(SystemMetricLabel.QUANTILE.getLabelName(), q.getQuantile().toString()));
}
final IngestionMetricPointDto pointDto =
new IngestionMetricPointDto(metricSet.getOddrn(),
Expand Down Expand Up @@ -174,19 +180,59 @@ private Mono<Void> createMetricSeriesAndMetricPoints(final List<IngestionMetricP
return metricSeriesRepository.createOrUpdateMetricSeries(metricSeriesPojos)
.collectList()
.flatMap(series -> {
final List<MetricPointPojo> metricPointPojos = metricSeriesList.stream()
final List<Integer> bucketAndQuantileSeries = getBucketAndQuantileSeries(series);
final List<MetricPointPojo> pointsToIngest = metricSeriesList.stream()
.flatMap(dto -> {
final Integer createdSeriesId = getCreatedSeriesId(series, dto.series());
return dto.points().stream()
.peek(point -> point.setSeriesId(createdSeriesId));
}).toList();
return metricPointRepository.deletePointsWithLessTime(metricPointPojos)
.then(metricPointRepository.createOrUpdatePoints(metricPointPojos)
.collectList());
return metricPointRepository.getPointsBySeriesId(bucketAndQuantileSeries)
.collectList()
.flatMap(bucketQuantilePoints -> {
final Set<Integer> labelValueIds = bucketQuantilePoints.stream()
.flatMap(p -> Arrays.stream(p.getLabelValuesIds()))
.collect(Collectors.toSet());
return metricLabelValueRepository.getDtoByIds(labelValueIds)
.map(labelDtos -> {
final Set<Integer> systemLabelValues = getSystemLabelValues(labelDtos);
return Tuples.of(bucketQuantilePoints, systemLabelValues);
});
}).flatMap(TupleUtils.function((bucketQuantilePoints, systemLabelValues) -> {
final List<MetricPointPojo> pointsToDelete = bucketQuantilePoints.stream()
.filter(p -> needToDeletePoint(p, pointsToIngest, systemLabelValues))
.toList();
return metricPointRepository.deletePoints(pointsToDelete)
.then(metricPointRepository.createOrUpdatePoints(pointsToIngest).collectList());
}));
})
.then();
}

private boolean needToDeletePoint(final MetricPointPojo pointPojo,
final List<MetricPointPojo> pointsToIngest,
final Set<Integer> systemLabelIds) {
final List<Integer> existingLabelValues = new ArrayList<>(Arrays.asList(pointPojo.getLabelValuesIds())).stream()
.filter(id -> !systemLabelIds.contains(id))
.toList();
return pointsToIngest.stream()
.anyMatch(p -> {
final List<Integer> ingestedLabelValues = new ArrayList<>(Arrays.asList(p.getLabelValuesIds())).stream()
.filter(id -> !systemLabelIds.contains(id))
.toList();
return CollectionUtils.isEqualCollection(existingLabelValues, ingestedLabelValues)
&& pointPojo.getTimestamp().isBefore(p.getTimestamp());
});
}

private Set<Integer> getSystemLabelValues(final List<MetricLabelValueDto> labelValueDtos) {
return labelValueDtos.stream()
.filter(lv -> lv.label().getName().equals(BUCKET_UPPER_BOUND.getLabelName())
|| lv.label().getName().equals(SystemMetricLabel.QUANTILE.getLabelName()))
.map(lv -> lv.labelValue().getId())
.collect(Collectors.toSet());
}

private List<MetricSeriesDto> extractSeriesAndPoints(final List<IngestionMetricPointDto> pointDtoList,
final Map<String, MetricEntityPojo> oddrnsMap,
final Map<String, MetricFamilyPojo> familiesMap,
Expand Down Expand Up @@ -228,6 +274,14 @@ private Integer getCreatedSeriesId(final List<MetricSeriesPojo> createdSeries,
.orElseThrow();
}

private List<Integer> getBucketAndQuantileSeries(final List<MetricSeriesPojo> series) {
return series.stream()
.filter(s -> List.of(BUCKET.getCode(), MetricSeriesValueType.QUANTILE.getCode())
.contains(s.getValueType()))
.map(MetricSeriesPojo::getId)
.toList();
}

private Comparator<MetricPoint> timestampComparator() {
return (mp1, mp2) -> {
final Integer firstTime = Optional.ofNullable(mp1.getTimestamp())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void gaugeAndCounterTest() throws IOException {
final MetricPoint metricPointToUpdate = updatedMetrics.getItems().get(0).getMetricFamilies().get(0)
.getMetrics().get(0)
.getMetricPoints().get(0);
metricPointToUpdate.setTimestamp(metricPointToUpdate.getTimestamp() - 100);
metricPointToUpdate.setTimestamp(metricPointToUpdate.getTimestamp() - 1000);
metricPointToUpdate.getGaugeValue().setValue(BigDecimal.ONE);
ingestMetrics(updatedMetrics);

Expand Down

0 comments on commit e9b39d3

Please sign in to comment.