Skip to content

Commit

Permalink
fixing merge aggregation
Browse files (browse the repository at this point in the history)
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Feb 9, 2024
1 parent c0c8a28 commit 252cdec
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public abstract class BaseSingleTreeBuilder {
}

// TODO: Remove this hardcoded value and use builderConfig.getMaxLeafRecords() instead
_maxLeafRecords = 100; // builderConfig.getMaxLeafRecords();
_maxLeafRecords = 10000; // builderConfig.getMaxLeafRecords();
}

private void constructStarTree(StarTreeBuilderUtils.TreeNode node, int startDocId, int endDocId) throws IOException {
Expand Down Expand Up @@ -381,6 +381,7 @@ private void appendToStarTree(Record record) throws IOException {
// if(star) {
// System.out.println("======Overall sum =====" + (long) record._metrics[0]);
// }
//logger.info("Record : {}", record.toString());
appendRecord(record);
_numDocs++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
* We don't create aggregated doc value fields in the traditional add/update doc workflow, where fieldInfo gets populated
*/
public class Lucene90DocValuesProducerCopy extends DocValuesProducer {

private final Map<String, NumericEntry> numerics;
private final Map<String, SortedNumericEntry> sortedNumerics;
private final IndexInput data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.opensearch.index.codec.freshstartree.codec;

import java.util.LinkedHashMap;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.BinaryDocValues;
Expand All @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Map;


/** Custom star tree doc values reader */
public class StarTreeDocValuesReader extends DocValuesProducer {
private DocValuesProducer delegate;
Expand All @@ -59,7 +61,7 @@ public StarTreeDocValuesReader(DocValuesProducer producer, SegmentReadState stat
CodecUtil.checkIndexHeader(data, "STARTreeCodec", 0, 0, state.segmentInfo.getId(), state.segmentSuffix);
starTree = new OffHeapStarTree(data);
valuesProducer = new Lucene90DocValuesProducerCopy(state, DATA_CODEC, "sttd", META_CODEC, "sttm", starTree.getDimensionNames());
dimensionValues = new HashMap<>();
dimensionValues = new LinkedHashMap<>();
}

@Override
Expand All @@ -69,7 +71,6 @@ public NumericDocValues getNumeric(FieldInfo field) throws IOException {

@Override
public StarTreeAggregatedValues getAggregatedDocValues() throws IOException {
// starTree.printTree(new HashMap<>());
List<String> dimensionsSplitOrder = starTree.getDimensionNames();
for (int i = 0; i < dimensionsSplitOrder.size(); i++) {
dimensionValues.put(dimensionsSplitOrder.get(i), valuesProducer.getNumeric(dimensionsSplitOrder.get(i) + "_dim"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.opensearch.index.codec.freshstartree.codec;

import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesConsumer;
Expand Down Expand Up @@ -60,7 +62,12 @@ public StarTreeDocValuesWriter(DocValuesConsumer delegate, SegmentWriteState seg
this.state = segmentWriteState;
dimensionReaders = new HashMap<>();
dimensionsSplitOrder = new ArrayList<>();

// dimensionsSplitOrder.add("minute");
// dimensionsSplitOrder.add("hour");
// dimensionsSplitOrder.add("day");
// dimensionsSplitOrder.add("month");
// // dimensionsSplitOrder.add("year");
// dimensionsSplitOrder.add("status");
docValuesConsumer = new Lucene90DocValuesConsumerCopy(state, DATA_CODEC, "sttd", META_CODEC, "sttm");
}

Expand Down Expand Up @@ -100,7 +107,7 @@ public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProdu
} else {
// logger.info("Adding field : " + field.name);
dimensionReaders.put(field.name + "_dim", valuesProducer.getSortedNumeric(field));
dimensionsSplitOrder.add(field.name);
//dimensionsSplitOrder.add(field.name);
}
if (field.name.contains("status")) {
// TODO : change this metric type
Expand All @@ -121,16 +128,18 @@ public void merge(MergeState mergeState) throws IOException {

public void mergeAggregatedValues(MergeState mergeState) throws IOException {
List<StarTreeAggregatedValues> aggrList = new ArrayList<>();
List<String> dimNames = new ArrayList<>();
for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
DocValuesProducer producer = mergeState.docValuesProducers[i];
Object obj = producer.getAggregatedDocValues();
StarTreeAggregatedValues starTree = (StarTreeAggregatedValues) obj;
dimNames = starTree.dimensionValues.keySet().stream().collect(Collectors.toList());
aggrList.add(starTree);
}
long startTime = System.currentTimeMillis();
builder = new OffHeapBufferedSingleTreeBuilder(
data,
dimensionsSplitOrder,
dimNames,
dimensionReaders,
state.segmentInfo.maxDoc(),
docValuesConsumer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.opensearch.index.codec.freshstartree.node;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RandomAccessInput;

Expand All @@ -29,6 +31,7 @@

/** Off heap implementation of star tree. */
public class OffHeapStarTree implements StarTree {
private static final Logger logger = LogManager.getLogger(OffHeapStarTree.class);
public static final long MAGIC_MARKER = 0xBADDA55B00DAD00DL;
public static final int VERSION = 1;
private final OffHeapStarTreeNode _root;
Expand Down Expand Up @@ -108,6 +111,7 @@ private void printTreeHelper(Map<String, Map<String, String>> dictionaryMap, Off
.toString();

stringBuilder.append(formattedOutput);
logger.info(stringBuilder.toString());

if (!node.isLeaf()) {
Iterator<OffHeapStarTreeNode> childrenIterator = node.getChildrenIterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ public void collect(int doc, long bucket) throws IOException {
for (String field : fieldCols) {
fieldColToDocValuesMap.put(field, aggrVals.dimensionValues.get(field));
}

NumericDocValues dv = aggrVals.metricValues.get("status_sum");
if (dv.advanceExact(doc)) {

Expand Down

0 comments on commit 252cdec

Please sign in to comment.