From e9b39d303747f8b5f1c98581535e6f9744e33b70 Mon Sep 17 00:00:00 2001 From: Damir Abdullin Date: Thu, 9 Mar 2023 14:32:57 +0300 Subject: [PATCH] Metrics bugfix (#1266) --- .../metric/MetricPointRepository.java | 4 +- .../metric/MetricPointRepositoryImpl.java | 15 +++-- .../InternalIngestionMetricsServiceImpl.java | 66 +++++++++++++++++-- .../api/ingestion/MetricsIngestionTest.java | 2 +- 4 files changed, 75 insertions(+), 12 deletions(-) diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/metric/MetricPointRepository.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/metric/MetricPointRepository.java index aa7705ecb..b6b946e58 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/metric/MetricPointRepository.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/metric/MetricPointRepository.java @@ -5,7 +5,9 @@ import reactor.core.publisher.Flux; public interface MetricPointRepository { - Flux deletePointsWithLessTime(final List newPoints); + Flux getPointsBySeriesId(final List seriesIds); + + Flux deletePoints(final List pointsToDelete); Flux createOrUpdatePoints(final List metricPoints); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/metric/MetricPointRepositoryImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/metric/MetricPointRepositoryImpl.java index cba9e5410..488cfd0cf 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/metric/MetricPointRepositoryImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/metric/MetricPointRepositoryImpl.java @@ -20,12 +20,19 @@ public class MetricPointRepositoryImpl implements MetricPointRepository { private final JooqReactiveOperations jooqReactiveOperations; @Override - public Flux deletePointsWithLessTime(final List newPoints) { - return jooqReactiveOperations.executeInPartitionReturning(newPoints, points -> { + public Flux getPointsBySeriesId(final List seriesIds) { + final var query = DSL.selectFrom(METRIC_POINT) + .where(METRIC_POINT.SERIES_ID.in(seriesIds)); + return jooqReactiveOperations.flux(query) + .map(r -> r.into(MetricPointPojo.class)); + } + + @Override + public Flux deletePoints(final List pointsToDelete) { + return jooqReactiveOperations.executeInPartitionReturning(pointsToDelete, points -> { final Condition condition = points.stream() .map(p -> METRIC_POINT.SERIES_ID.eq(p.getSeriesId()) - .and(METRIC_POINT.LABEL_VALUES_IDS.ne(p.getLabelValuesIds())) - .and(METRIC_POINT.TIMESTAMP.lessThan(p.getTimestamp()))) + .and(METRIC_POINT.LABEL_VALUES_IDS.eq(p.getLabelValuesIds()))) .reduce(Condition::or) .orElseThrow(() -> new RuntimeException("Can't build delete condition for points")); return jooqReactiveOperations.flux(DSL.deleteFrom(METRIC_POINT) diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/metric/InternalIngestionMetricsServiceImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/metric/InternalIngestionMetricsServiceImpl.java index 845b21db6..059d22831 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/metric/InternalIngestionMetricsServiceImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/metric/InternalIngestionMetricsServiceImpl.java @@ -23,7 +23,10 @@ import org.opendatadiscovery.oddplatform.dto.ingestion.IngestionMetricLabelsDto; import org.opendatadiscovery.oddplatform.dto.ingestion.IngestionMetricPointDto; import org.opendatadiscovery.oddplatform.dto.ingestion.IngestionMetricsRequest; +import org.opendatadiscovery.oddplatform.dto.metric.MetricLabelValueDto; import org.opendatadiscovery.oddplatform.dto.metric.MetricSeriesDto; +import org.opendatadiscovery.oddplatform.dto.metric.MetricSeriesValueType; +import org.opendatadiscovery.oddplatform.dto.metric.SystemMetricLabel; import org.opendatadiscovery.oddplatform.ingestion.contract.model.Label; import org.opendatadiscovery.oddplatform.ingestion.contract.model.Metric; import org.opendatadiscovery.oddplatform.ingestion.contract.model.MetricFamily; @@ -51,10 +54,12 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; +import reactor.function.TupleUtils; +import reactor.util.function.Tuples; import static java.util.function.Function.identity; +import static org.opendatadiscovery.oddplatform.dto.metric.MetricSeriesValueType.BUCKET; import static org.opendatadiscovery.oddplatform.dto.metric.SystemMetricLabel.BUCKET_UPPER_BOUND; -import static org.opendatadiscovery.oddplatform.dto.metric.SystemMetricLabel.QUANTILE; import static org.opendatadiscovery.oddplatform.utils.MetricUtils.buildMetricFamilyKey; import static reactor.function.TupleUtils.function; @@ -125,7 +130,8 @@ private IngestionMetricsRequest buildIngestionMetricsRequest(final MetricSetList if (metricFamily.getType() == MetricType.SUMMARY) { validateQuantileValues(metricPoint.getSummaryValue().getQuantile()); metricPoint.getSummaryValue().getQuantile() - .forEach(q -> labels.put(QUANTILE.getLabelName(), q.getQuantile().toString())); + .forEach( + q -> labels.put(SystemMetricLabel.QUANTILE.getLabelName(), q.getQuantile().toString())); } final IngestionMetricPointDto pointDto = new IngestionMetricPointDto(metricSet.getOddrn(), @@ -174,19 +180,59 @@ private Mono createMetricSeriesAndMetricPoints(final List { - final List metricPointPojos = metricSeriesList.stream() + final List bucketAndQuantileSeries = getBucketAndQuantileSeries(series); + final List pointsToIngest = metricSeriesList.stream() .flatMap(dto -> { final Integer createdSeriesId = getCreatedSeriesId(series, dto.series()); return dto.points().stream() .peek(point -> point.setSeriesId(createdSeriesId)); }).toList(); - return metricPointRepository.deletePointsWithLessTime(metricPointPojos) - .then(metricPointRepository.createOrUpdatePoints(metricPointPojos) - .collectList()); + return metricPointRepository.getPointsBySeriesId(bucketAndQuantileSeries) + .collectList() + .flatMap(bucketQuantilePoints -> { + final Set labelValueIds = bucketQuantilePoints.stream() + .flatMap(p -> Arrays.stream(p.getLabelValuesIds())) + .collect(Collectors.toSet()); + return metricLabelValueRepository.getDtoByIds(labelValueIds) + .map(labelDtos -> { + final Set systemLabelValues = getSystemLabelValues(labelDtos); + return Tuples.of(bucketQuantilePoints, systemLabelValues); + }); + }).flatMap(TupleUtils.function((bucketQuantilePoints, systemLabelValues) -> { + final List pointsToDelete = bucketQuantilePoints.stream() + .filter(p -> needToDeletePoint(p, pointsToIngest, systemLabelValues)) + .toList(); + return metricPointRepository.deletePoints(pointsToDelete) + .then(metricPointRepository.createOrUpdatePoints(pointsToIngest).collectList()); + })); }) .then(); } + private boolean needToDeletePoint(final MetricPointPojo pointPojo, + final List pointsToIngest, + final Set systemLabelIds) { + final List existingLabelValues = new ArrayList<>(Arrays.asList(pointPojo.getLabelValuesIds())).stream() + .filter(id -> !systemLabelIds.contains(id)) + .toList(); + return pointsToIngest.stream() + .anyMatch(p -> { + final List ingestedLabelValues = new ArrayList<>(Arrays.asList(p.getLabelValuesIds())).stream() + .filter(id -> !systemLabelIds.contains(id)) + .toList(); + return CollectionUtils.isEqualCollection(existingLabelValues, ingestedLabelValues) + && pointPojo.getTimestamp().isBefore(p.getTimestamp()); + }); + } + + private Set getSystemLabelValues(final List labelValueDtos) { + return labelValueDtos.stream() + .filter(lv -> lv.label().getName().equals(BUCKET_UPPER_BOUND.getLabelName()) + || lv.label().getName().equals(SystemMetricLabel.QUANTILE.getLabelName())) + .map(lv -> lv.labelValue().getId()) + .collect(Collectors.toSet()); + } + private List extractSeriesAndPoints(final List pointDtoList, final Map oddrnsMap, final Map familiesMap, @@ -228,6 +274,14 @@ private Integer getCreatedSeriesId(final List createdSeries, .orElseThrow(); } + private List getBucketAndQuantileSeries(final List series) { + return series.stream() + .filter(s -> List.of(BUCKET.getCode(), MetricSeriesValueType.QUANTILE.getCode()) + .contains(s.getValueType())) + .map(MetricSeriesPojo::getId) + .toList(); + } + private Comparator timestampComparator() { return (mp1, mp2) -> { final Integer firstTime = Optional.ofNullable(mp1.getTimestamp()) diff --git a/odd-platform-api/src/test/java/org/opendatadiscovery/oddplatform/api/ingestion/MetricsIngestionTest.java b/odd-platform-api/src/test/java/org/opendatadiscovery/oddplatform/api/ingestion/MetricsIngestionTest.java index d8821dd0e..e5a5f823c 100644 --- a/odd-platform-api/src/test/java/org/opendatadiscovery/oddplatform/api/ingestion/MetricsIngestionTest.java +++ b/odd-platform-api/src/test/java/org/opendatadiscovery/oddplatform/api/ingestion/MetricsIngestionTest.java @@ -60,7 +60,7 @@ public void gaugeAndCounterTest() throws IOException { final MetricPoint metricPointToUpdate = updatedMetrics.getItems().get(0).getMetricFamilies().get(0) .getMetrics().get(0) .getMetricPoints().get(0); - metricPointToUpdate.setTimestamp(metricPointToUpdate.getTimestamp() - 100); + metricPointToUpdate.setTimestamp(metricPointToUpdate.getTimestamp() - 1000); metricPointToUpdate.getGaugeValue().setValue(BigDecimal.ONE); ingestMetrics(updatedMetrics);