Skip to content

Commit

Permalink
Onheap impl startree (#28)
Browse files Browse the repository at this point in the history
* OnHeap Star Tree Implementation

Signed-off-by: Sarthak Aggarwal <[email protected]>

---------

Signed-off-by: Sarthak Aggarwal <[email protected]>
Co-authored-by: Sarthak Aggarwal <[email protected]>
  • Loading branch information
bharath-techie and sarthakaggarwal97 authored Jul 2, 2024
1 parent d9d75be commit 248191d
Show file tree
Hide file tree
Showing 26 changed files with 3,053 additions and 0 deletions.
646 changes: 646 additions & 0 deletions server/src/main/java/org/apache/lucene/index/BaseStarTreeBuilder.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.compositeindex.datacube.startree;

import org.opensearch.common.annotation.ExperimentalApi;

import java.util.Arrays;

/**
* Star tree document holding the dimension values and the metric values of a single document.
*
* @opensearch.experimental
*/
@ExperimentalApi
public class StarTreeDocument {
    /** Dimension values of this document; the array is stored by reference, not copied. */
    public final Long[] dimensions;
    /** Metric values of this document; the array is stored by reference, not copied. */
    public final Object[] metrics;

    /**
     * Creates a document wrapping the given arrays.
     *
     * @param dimensions dimension values
     * @param metrics metric values
     */
    public StarTreeDocument(Long[] dimensions, Object[] metrics) {
        this.metrics = metrics;
        this.dimensions = dimensions;
    }

    /**
     * @return the dimensions followed by the metrics, each rendered with
     *         {@link Arrays#toString(Object[])} and separated by {@code " | "}
     */
    @Override
    public String toString() {
        final String dimensionText = Arrays.toString(dimensions);
        final String metricText = Arrays.toString(metrics);
        return String.join(" | ", dimensionText, metricText);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;

/**
 * Count value aggregator for star tree.
 * <p>
 * Every segment doc value contributes exactly one to the count; the doc value itself and the
 * numeric type passed alongside it are not read by this aggregator.
 *
 * @opensearch.experimental
 */
public class CountValueAggregator implements ValueAggregator<Long> {
    public static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.LONG;

    /** @return {@link MetricStat#COUNT} */
    @Override
    public MetricStat getAggregationType() {
        return MetricStat.COUNT;
    }

    /** @return {@link #VALUE_AGGREGATOR_TYPE} */
    @Override
    public StarTreeNumericType getAggregatedValueType() {
        return CountValueAggregator.VALUE_AGGREGATOR_TYPE;
    }

    /** @return always one, regardless of the arguments */
    @Override
    public Long getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
        return Long.valueOf(1L);
    }

    /** @return the running count {@code value} incremented by one */
    @Override
    public Long mergeAggregatedValueAndSegmentValue(Long value, Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
        final long runningCount = value;
        return runningCount + 1L;
    }

    /** @return the sum of the two counts */
    @Override
    public Long mergeAggregatedValues(Long value, Long aggregatedValue) {
        return Long.sum(value, aggregatedValue);
    }

    /** @return {@code value} unchanged */
    @Override
    public Long getInitialAggregatedValue(Long value) {
        return value;
    }

    /** @return {@link Long#BYTES} */
    @Override
    public int getMaxAggregatedValueByteSize() {
        return Long.BYTES;
    }

    /** @return {@code value} unchanged; the count is already a long */
    @Override
    public Long toLongValue(Long value) {
        return value;
    }

    /** @return {@code value} unchanged; {@code type} is not consulted */
    @Override
    public Long toStarTreeNumericTypeValue(Long value, StarTreeNumericType type) {
        return value;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
import org.opensearch.index.fielddata.IndexNumericFieldData;

import java.util.Comparator;
import java.util.Objects;

/**
 * Builds aggregation function and doc values field pair to support various aggregations.
 * <p>
 * Identity ({@link #equals(Object)}, {@link #hashCode()} and {@link #compareTo(MetricAggregatorInfo)})
 * is defined by the {@code field} and the {@code metricStat} only; the star field name takes part
 * in the generated field name but not in equality.
 *
 * @opensearch.experimental
 */
public class MetricAggregatorInfo implements Comparable<MetricAggregatorInfo> {

    public static final String DELIMITER = "_";

    /** Ordering by field, then by metric stat; built once instead of on every comparison. */
    private static final Comparator<MetricAggregatorInfo> FIELD_THEN_STAT = Comparator.comparing((MetricAggregatorInfo o) -> o.field)
        .thenComparing((MetricAggregatorInfo o) -> o.metricStat);

    private final String metric;
    private final String starFieldName;
    private final MetricStat metricStat;
    private final String field;
    private final ValueAggregator valueAggregators;
    private final StarTreeNumericType starTreeNumericType;
    private final SequentialDocValuesIterator metricStatReader;

    /**
     * Constructor for MetricAggregatorInfo
     *
     * @param metricStat the metric type; also used to look up the value aggregator from {@link ValueAggregatorFactory}
     * @param field the field the metric is computed on
     * @param starFieldName the star field name used as prefix of the generated field name
     * @param numericType numeric type of the field, converted through {@link StarTreeNumericType#fromNumericType}
     * @param metricStatReader iterator used to read the metric values
     */
    public MetricAggregatorInfo(
        MetricStat metricStat,
        String field,
        String starFieldName,
        IndexNumericFieldData.NumericType numericType,
        SequentialDocValuesIterator metricStatReader
    ) {
        this.metricStat = metricStat;
        this.valueAggregators = ValueAggregatorFactory.getValueAggregator(metricStat);
        this.starTreeNumericType = StarTreeNumericType.fromNumericType(numericType);
        this.metricStatReader = metricStatReader;
        this.field = field;
        this.starFieldName = starFieldName;
        // Must come last: toFieldName() reads starFieldName, field and metricStat assigned above.
        this.metric = toFieldName();
    }

    /**
     * @return metric type
     */
    public MetricStat getMetricStat() {
        return metricStat;
    }

    /**
     * @return field Name
     */
    public String getField() {
        return field;
    }

    /**
     * @return the metric stat name, i.e. the value of {@link #toFieldName()} captured at construction
     */
    public String getMetric() {
        return metric;
    }

    /**
     * @return aggregator for the field value
     */
    public ValueAggregator getValueAggregators() {
        return valueAggregators;
    }

    /**
     * @return star tree aggregated value type
     */
    public StarTreeNumericType getAggregatedValueType() {
        return starTreeNumericType;
    }

    /**
     * @return metric value reader iterator
     */
    public SequentialDocValuesIterator getMetricStatReader() {
        return metricStatReader;
    }

    /**
     * @return star field name, field and metric type name joined with {@link #DELIMITER}
     */
    public String toFieldName() {
        return starFieldName + DELIMITER + field + DELIMITER + metricStat.getTypeName();
    }

    /**
     * Hashes exactly the fields compared by {@link #equals(Object)} so that equal instances always
     * share a hash code. Hashing the full field name (which also contains the star field name)
     * would give equal objects different hash codes.
     */
    @Override
    public int hashCode() {
        return Objects.hash(field, metricStat);
    }

    /**
     * Two instances are equal when they have the same metric stat and the same field.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj instanceof MetricAggregatorInfo) {
            MetricAggregatorInfo anotherPair = (MetricAggregatorInfo) obj;
            return metricStat == anotherPair.metricStat && Objects.equals(field, anotherPair.field);
        }
        return false;
    }

    @Override
    public String toString() {
        return toFieldName();
    }

    /**
     * Orders by field and then by metric stat, consistent with {@link #equals(Object)}.
     */
    @Override
    public int compareTo(MetricAggregatorInfo other) {
        return FIELD_THEN_STAT.compare(this, other);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.apache.lucene.util.NumericUtils;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.search.aggregations.metrics.CompensatedSum;

/**
 * Sum value aggregator for star tree.
 * <p>
 * This aggregator keeps mutable instance state: {@code sum} and {@code compensation} hold the
 * value and delta last read from the {@link CompensatedSum}. The merge methods compute their
 * result from that state; the previously aggregated argument is only checked by an assertion.
 *
 * @opensearch.experimental
 */
public class SumValueAggregator implements ValueAggregator<Double> {

    public static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.DOUBLE;
    private double sum = 0;
    private double compensation = 0;
    private CompensatedSum kahanSummation = new CompensatedSum(0, 0);

    /** @return {@link MetricStat#SUM} */
    @Override
    public MetricStat getAggregationType() {
        return MetricStat.SUM;
    }

    /** @return {@link #VALUE_AGGREGATOR_TYPE} */
    @Override
    public StarTreeNumericType getAggregatedValueType() {
        return VALUE_AGGREGATOR_TYPE;
    }

    /**
     * Restarts the summation from zero and adds the decoded segment doc value.
     */
    @Override
    public Double getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
        kahanSummation.reset(0, 0);
        kahanSummation.add(starTreeNumericType.getDoubleValue(segmentDocValue));
        return rememberAndGet();
    }

    /**
     * Adds the decoded segment doc value to the remembered running sum; {@code value} is only
     * asserted to match the current summation value.
     */
    @Override
    public Double mergeAggregatedValueAndSegmentValue(Double value, Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
        assert kahanSummation.value() == value;
        kahanSummation.reset(sum, compensation);
        kahanSummation.add(starTreeNumericType.getDoubleValue(segmentDocValue));
        return rememberAndGet();
    }

    /**
     * Adds {@code value} to the remembered running sum; {@code aggregatedValue} is only asserted
     * to match the current summation value.
     */
    @Override
    public Double mergeAggregatedValues(Double value, Double aggregatedValue) {
        assert kahanSummation.value() == aggregatedValue;
        kahanSummation.reset(sum, compensation);
        kahanSummation.add(value);
        return rememberAndGet();
    }

    /**
     * Restarts the summation from zero and adds {@code value}.
     */
    @Override
    public Double getInitialAggregatedValue(Double value) {
        kahanSummation.reset(0, 0);
        kahanSummation.add(value);
        return rememberAndGet();
    }

    /** @return {@link Double#BYTES} */
    @Override
    public int getMaxAggregatedValueByteSize() {
        return Double.BYTES;
    }

    /**
     * Encodes the double as a sortable long.
     *
     * @throws IllegalStateException if the conversion fails for any reason
     */
    @Override
    public Long toLongValue(Double value) {
        try {
            return NumericUtils.doubleToSortableLong(value);
        } catch (Exception conversionFailure) {
            throw new IllegalStateException("Cannot convert " + value + " to sortable long", conversionFailure);
        }
    }

    /**
     * Decodes a long into a double using the given numeric type.
     *
     * @throws IllegalStateException if the conversion fails for any reason
     */
    @Override
    public Double toStarTreeNumericTypeValue(Long value, StarTreeNumericType type) {
        try {
            return type.getDoubleValue(value);
        } catch (Exception conversionFailure) {
            throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", conversionFailure);
        }
    }

    /**
     * Copies the summation's current delta and value into the instance fields (in that order)
     * and returns the current value.
     */
    private Double rememberAndGet() {
        compensation = kahanSummation.delta();
        sum = kahanSummation.value();
        return kahanSummation.value();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;

/**
* A value aggregator that pre-aggregates on the input values for a specific type of aggregation.
*
* @param <A> type of the aggregated value produced and consumed by this aggregator
*
* @opensearch.experimental
*/
public interface ValueAggregator<A> {

/**
* Returns the type of the aggregation.
*
* @return the metric stat this aggregator computes
*/
MetricStat getAggregationType();

/**
* Returns the data type of the aggregated value.
*
* @return the star tree numeric type of the aggregated value
*/
StarTreeNumericType getAggregatedValueType();

/**
* Returns the initial aggregated value for a single segment doc value.
*
* @param segmentDocValue the segment doc value, passed as a Long
* @param starTreeNumericType numeric type passed alongside the doc value; implementations may use it to decode the doc value
* @return the aggregated value to start from
*/
A getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue, StarTreeNumericType starTreeNumericType);

/**
* Applies a segment doc value to the current aggregated value.
*
* @param value the current aggregated value
* @param segmentDocValue the segment doc value to apply, passed as a Long
* @param starTreeNumericType numeric type passed alongside the doc value; implementations may use it to decode the doc value
* @return the aggregated value after applying the doc value
*/
A mergeAggregatedValueAndSegmentValue(A value, Long segmentDocValue, StarTreeNumericType starTreeNumericType);

/**
* Applies an aggregated value to the current aggregated value.
*
* @param value the aggregated value to apply
* @param aggregatedValue the current aggregated value
* @return the merged aggregated value
*/
A mergeAggregatedValues(A value, A aggregatedValue);

/**
* Returns the initial aggregated value derived from an already aggregated value.
*
* @param value an aggregated value
* @return the aggregated value to start from
*/
A getInitialAggregatedValue(A value);

/**
* Returns the maximum size in bytes of the aggregated values seen so far.
*
* @return size in bytes
*/
int getMaxAggregatedValueByteSize();

/**
* Converts an aggregated value into a Long type.
*
* @param value the aggregated value
* @return the value represented as a Long
*/
Long toLongValue(A value);

/**
* Converts an aggregated value from a Long type.
*
* @param rawValue the value represented as a Long
* @param type numeric type passed alongside the raw value; implementations may use it for the conversion
* @return the aggregated value
*/
A toStarTreeNumericTypeValue(Long rawValue, StarTreeNumericType type);
}
Loading

0 comments on commit 248191d

Please sign in to comment.