Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Feb 2, 2024
1 parent 83a97c6 commit 382f6b2
Show file tree
Hide file tree
Showing 19 changed files with 75 additions and 218 deletions.
1 change: 0 additions & 1 deletion server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ if (!isEclipse) {
}
}


dependencies {

api project(':libs:opensearch-common')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
* @opensearch.internal
*/
public class Lucene {
public static final String LATEST_CODEC = "Lucene95";
public static final String LATEST_CODEC = "StarTreeCodec"; // TODO : this is a hack

public static final String SOFT_DELETES_FIELD = "__soft_deletes";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ public CodecService(@Nullable MapperService mapperService, IndexSettings indexSe
assert null != indexSettings;
if (mapperService == null) {
/**
* Todo : currently we don't have a single field with which we handle aggregation ( binary field etc )
* So no better way to test the changes then to change the default codec - this should be changed.
* TODO : currently we don't have a single field with which to drive a per-field codec for aggregation,
* so there is no better way to test the changes than to change the default codec - this should be changed.
*
* There are issues with this as restarting the process and reloading the indices results in errors
* Lucene95Codec is read when reloading the indices ( Solved now by using StarTreeCodec as the latest codec )
*/
codecs.put(DEFAULT_CODEC, new StarTreeCodec());
codecs.put(LZ4, new StarTreeCodec());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.codec.freshstartree.codec.StarTreeCodec;
import org.opensearch.index.codec.freshstartree.codec.StarTreeDocValuesFormat;
import org.opensearch.index.mapper.CompletionFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
Expand All @@ -54,7 +55,7 @@
*
* @opensearch.internal
*/
public class PerFieldMappingPostingFormatCodec extends Lucene95Codec {
public class PerFieldMappingPostingFormatCodec extends StarTreeCodec { // TODO : this is a hack , can't extend startreecodec
private final Logger logger;
private final MapperService mapperService;
private final DocValuesFormat dvFormat = new StarTreeDocValuesFormat();
Expand All @@ -64,8 +65,8 @@ public class PerFieldMappingPostingFormatCodec extends Lucene95Codec {
: "PerFieldMappingPostingFormatCodec must subclass the latest " + "lucene codec: " + Lucene.LATEST_CODEC;
}

public PerFieldMappingPostingFormatCodec(Mode compressionMode, MapperService mapperService, Logger logger) {
super(compressionMode);
public PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode compressionMode, MapperService mapperService, Logger logger) {
super();
this.mapperService = mapperService;
this.logger = logger;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ private Map<Long, StarTreeBuilderUtils.TreeNode> constructNonStarNodes(int start
long nodeDimensionValue = getDimensionValue(startDocId, dimensionId);
for (int i = startDocId + 1; i < endDocId; i++) {
long dimensionValue = getDimensionValue(i, dimensionId);
// System.out.println("Dim value : " + dimensionValue );
if (dimensionValue != nodeDimensionValue) {
StarTreeBuilderUtils.TreeNode child = getNewNode();
child._dimensionId = dimensionId;
Expand Down Expand Up @@ -227,11 +226,6 @@ public void build()
long startTime = System.currentTimeMillis();
Iterator<Record> recordIterator = sortAndAggregateSegmentRecords(numSegmentRecords);
logger.info("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime));
// System.out.println(
// "== =============Finished sorting and aggregating star-tree in ms : " +
// (System.currentTimeMillis()
// - startTime));

build(recordIterator, false);
}

Expand Down Expand Up @@ -591,7 +585,6 @@ public void close()
success = true;
} catch (Exception e) {
throw new RuntimeException(e);
// System.out.println(e.getMessage());
} finally {
if (success) {
IOUtils.close(indexOutput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.index.IndexFileNames;
Expand All @@ -50,9 +52,11 @@
* same time, sometimes latest changes are not present during read
*/
public class OffHeapBufferedSingleTreeBuilder extends BaseSingleTreeBuilder {
private static final Logger logger = LogManager.getLogger(OffHeapBufferedSingleTreeBuilder.class);
private static final String SEGMENT_RECORD_FILE_NAME = "segment.record";
private static final String STAR_TREE_RECORD_FILE_NAME = "star-tree.record";


private final List<Long> _starTreeRecordOffsets;

private int _numReadableStarTreeRecords;
Expand Down Expand Up @@ -88,16 +92,8 @@ public OffHeapBufferedSingleTreeBuilder(IndexOutput output, List<String> dimensi
// TODO : create temp output
starTreeRecordFileOutput = state.directory.createOutput(starTreeRecordFileName, state.context);
starTreeFileCount++;
// CodecUtil.writeIndexHeader(
// starTreeRecordFileOutput,
// "STARTreeCodec",
// 0,
// state.segmentInfo.getId(),
// state.segmentSuffix);
segmentRecordFileOutput = state.directory.createOutput(segmentRecordFileName, state.context);

_starTreeRecordOffsets = new ArrayList<>();
//_starTreeRecordOffsets.add(0L);
}

@Override
Expand Down Expand Up @@ -233,9 +229,7 @@ private Record deserializeStarTreeRecord(RandomAccessInput buffer, long offset)
void appendRecord(Record record)
throws IOException {
byte[] bytes = serializeStarTreeRecord(record);
// System.out.println("Appending record : " + record.toString());
starTreeRecordFileOutput.writeBytes(bytes, bytes.length);
//System.out.println("Appending doc : " + _numDocs + "curr bytes : " + currBytes + " offset: " + _starTreeRecordOffsets.size());
_starTreeRecordOffsets.add(currBytes);
currBytes += bytes.length;
}
Expand All @@ -244,19 +238,13 @@ void appendRecord(Record record)
Record getStarTreeRecord(int docId)
throws IOException {
ensureBufferReadable(docId);
//System.out.println("Want star record of id : " + docId);
return deserializeStarTreeRecord(starTreeRecordRandomInput, _starTreeRecordOffsets.get(docId));
}

@Override
long getDimensionValue(int docId, int dimensionId)
throws IOException {
// System.out.println("doc id : " + docId + " _numReadableStarTreeRecords : " +
// _numReadableStarTreeRecords);
//System.out.println("Want dimension value record of id : " + docId);
ensureBufferReadable(docId, false, true);
// System.out.println("want offset : " + (_starTreeRecordOffsets.get(docId) + (dimensionId *
// Integer.BYTES)));
return starTreeRecordRandomInput.readLong(
(_starTreeRecordOffsets.get(docId) + (dimensionId * Long.BYTES)));
}
Expand All @@ -266,8 +254,6 @@ Iterator<Record> sortAndAggregateSegmentRecords(int numDocs)
throws IOException {
// Write all dimensions for segment records into the buffer, and sort all records using an int
// array
// PinotDataBuffer dataBuffer;
// long bufferSize = (long) numDocs * _numDimensions * Integer.BYTES;
int recordBytesLength = 0;
Integer[] sortedDocIds = new Integer[numDocs];
for (int i = 0; i < numDocs; i++) {
Expand Down Expand Up @@ -295,12 +281,10 @@ private Iterator<Record> sortRecords(Integer[] sortedDocIds, int numDocs, int re
IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, SEGMENT_RECORD_FILE_NAME),
state.context);
final long recordBytes = recordBytesLength;
logger.info("Segment record is of length : {}", segmentRecordFileInput.length());
segmentRandomInput = segmentRecordFileInput.randomAccessSlice(0, segmentRecordFileInput.length());

try {
// ArrayUtil.introSort(sortedDocIds, comparator);
// Arrays.sort(sortedDocIds, comparator);

QuickSorter.quickSort(0, numDocs, (i1, i2) -> {
long offset1 = (long) sortedDocIds[i1] * recordBytes;
long offset2 = (long) sortedDocIds[i2] * recordBytes;
Expand All @@ -321,8 +305,6 @@ private Iterator<Record> sortRecords(Integer[] sortedDocIds, int numDocs, int re
sortedDocIds[i1] = sortedDocIds[i2];
sortedDocIds[i2] = temp;
});

// System.out.println("Sorted doc ids : " + Arrays.toString(sortedDocIds));
} finally {
// segmentRecordFileInput.close();
// state.directory.deleteFile(IndexFileNames.segmentFileName(state.segmentInfo.name,
Expand All @@ -333,9 +315,9 @@ private Iterator<Record> sortRecords(Integer[] sortedDocIds, int numDocs, int re
// SEGMENT_RECORD_FILE_NAME)));
}
if(sortedDocIds != null)
System.out.println("Sorted doc ids length" + sortedDocIds.length);
logger.info("Sorted doc ids length" + sortedDocIds.length);
else
System.out.println("Sorted doc ids array is null");
logger.info("Sorted doc ids array is null");

// Create an iterator for aggregated records
return new Iterator<Record>() {
Expand Down Expand Up @@ -380,7 +362,6 @@ public Record getSegmentRecord(int docID, long recordBytes)
@Override
Iterator<Record> generateRecordsForStarNode(int startDocId, int endDocId, int dimensionId)
throws IOException {
//System.out.println("End doc id " + endDocId);
ensureBufferReadable(endDocId, true);

// Sort all records using an int array
Expand Down Expand Up @@ -471,42 +452,30 @@ private void ensureBufferReadable(int docId, boolean endDocCheck, boolean dimIdC
|| (!endDocCheck && docId < _numReadableStarTreeRecords)) ) {
return;
}
//System.out.println("want doc : " + docId + " dim : " + dimIdCheck);
IndexInput in = null;
if(docId < _numDocs ) {
try {
int prevStartDocId = 0;
for(Map.Entry<String, Integer> entry : fileToByteSizeMap.entrySet()) {
if(docId < entry.getValue() - 1) {
in = state.directory.openInput(entry.getKey(), state.context);
starTreeRecordRandomInput =
in.randomAccessSlice(in.getFilePointer(), in.length() - in.getFilePointer());
_numReadableStarTreeRecords = entry.getValue();
break;
}
prevStartDocId = entry.getValue();
int prevStartDocId = 0;
for(Map.Entry<String, Integer> entry : fileToByteSizeMap.entrySet()) {
if(docId < entry.getValue() - 1) {
in = state.directory.openInput(entry.getKey(), state.context);
starTreeRecordRandomInput =
in.randomAccessSlice(in.getFilePointer(), in.length() - in.getFilePointer());
_numReadableStarTreeRecords = entry.getValue();
break;
}
//System.out.println("First loop Current start : " + prevStartDocId + " - Current end : " + _numReadableStarTreeRecords);
this.prevStartDocId = prevStartDocId;
} finally {
// if (in != null) {
// in.close();
// }
prevStartDocId = entry.getValue();
}
this.prevStartDocId = prevStartDocId;
}

if(in != null) return;



//System.out.println("want doc 1 : " + docId + " num docs : " + _numDocs);
starTreeRecordFileOutput.close();
logger.info("Created a file : {} of size : {}" , segmentRecordFileOutput.getName(), segmentRecordFileOutput.getFilePointer());
fileToByteSizeMap.put(starTreeRecordFileOutput.getName(),
_numDocs);

//System.out.println("Star tree record file size : " + starTreeRecordFileOutput.getFilePointer());
//System.out.println("Star tree record file name : " + starTreeRecordFileOutput.getName());

starTreeRecordFileOutput.close();

String starTreeRecordFileName =
IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, STAR_TREE_RECORD_FILE_NAME) +
Expand All @@ -517,31 +486,22 @@ private void ensureBufferReadable(int docId, boolean endDocCheck, boolean dimIdC
starTreeFileCount++;

currBytes = 0;
// state.directory.sync(Collections.singleton(starTreeRecordFileOutput.getName()));
if (starTreeRecordRandomInput != null) {
starTreeRecordRandomInput = null;
}

try {
int prevStartDocId = 0;
for(Map.Entry<String, Integer> entry : fileToByteSizeMap.entrySet()) {
if(docId <= entry.getValue() - 1) {
in = state.directory.openInput(entry.getKey(), state.context);
starTreeRecordRandomInput =
in.randomAccessSlice(in.getFilePointer(), in.length() - in.getFilePointer());
_numReadableStarTreeRecords = entry.getValue();
break;
}
//System.out.println("Setting start value : " + entry.getValue());
prevStartDocId = entry.getValue();
int prevStartDocId = 0;
for(Map.Entry<String, Integer> entry : fileToByteSizeMap.entrySet()) {
if(docId <= entry.getValue() - 1) {
in = state.directory.openInput(entry.getKey(), state.context);
starTreeRecordRandomInput =
in.randomAccessSlice(in.getFilePointer(), in.length() - in.getFilePointer());
_numReadableStarTreeRecords = entry.getValue();
break;
}
//System.out.println("Current start : " + prevStartDocId + " - Current end : " + _numReadableStarTreeRecords);
this.prevStartDocId = prevStartDocId;
} finally {
// if (in != null) {
// in.close();
// }
prevStartDocId = entry.getValue();
}
this.prevStartDocId = prevStartDocId;

}

Expand All @@ -551,21 +511,18 @@ public void close()
boolean success = false;
try {
if (starTreeRecordFileOutput != null) {
starTreeRecordFileOutput.close();
IOUtils.deleteFilesIgnoringExceptions(state.directory, starTreeRecordFileOutput.getName());
}
success = true;
} catch (Exception e) {
throw new RuntimeException(e);
// System.out.println(e.getMessage());
} finally {
if (success) {
IOUtils.close(starTreeRecordFileOutput);
} else {
IOUtils.closeWhileHandlingException(starTreeRecordFileOutput);
}
// starTreeRecordFileOutput = null;
IOUtils.closeWhileHandlingException(starTreeRecordFileOutput);
}
// Delete all temporary segment record files
IOUtils.deleteFilesIgnoringExceptions(state.directory, segmentRecordFileOutput.getName());
// Delete all temporary star tree record files
IOUtils.deleteFilesIgnoringExceptions(state.directory, fileToByteSizeMap.keySet());
super.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ private Iterator<Record> mergeRecords(List<StarTreeAggregatedValues> aggrList)
i++;
}
BaseSingleTreeBuilder.Record record = new BaseSingleTreeBuilder.Record(dims, metrics);
// System.out.println("Adding : " + record.toString());
records.add(record);
}
}
Expand All @@ -90,7 +89,6 @@ private Iterator<Record> mergeRecords(List<StarTreeAggregatedValues> aggrList)
@Override
void appendRecord(Record record)
throws IOException {
// System.out.println("Appending record : " + record.toString());
_records.add(record);
}

Expand All @@ -103,8 +101,6 @@ Record getStarTreeRecord(int docId)
@Override
long getDimensionValue(int docId, int dimensionId)
throws IOException {
// System.out.println("doc id : " + docId + " dim id : " + dimensionId + " size : " +
// _records.size());
return _records.get(docId)._dimensions[dimensionId];
}

Expand All @@ -114,9 +110,6 @@ Iterator<Record> sortAndAggregateSegmentRecords(int numDocs)
Record[] records = new Record[numDocs];
for (int i = 0; i < numDocs; i++) {
records[i] = getNextSegmentRecord();
// System.out.println("Step 3 : " + records[i]._dimensions[0] + " | " +
// records[i]._dimensions[1] + " | " +
// records[i]._metrics[0]);
}
return sortAndAggregateSegmentRecords(records);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
/**
* writer for {@link Lucene90DocValuesFormat}
*
* TODO : anyway to reuse original lucene doc values consumer ?
* TODO : This copy exists because Lucene90DocValuesConsumer is not public.
* An alternative would be to make it public upstream.
*
* */
public final class Lucene90DocValuesConsumerCopy extends DocValuesConsumer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
/**
* Created a copy to initialize producer without field info stored in state which is the case for
* aggregated doc values fields
*
* We don't create aggregated doc-value fields in the traditional add/update document workflow, where FieldInfo gets populated.
*/
public class Lucene90DocValuesProducerCopy extends DocValuesProducer {
private final Map<String, NumericEntry> numerics;
Expand Down Expand Up @@ -160,7 +162,6 @@ public FieldInfo[] getFieldInfoArr() {
private void readFields(IndexInput meta, FieldInfo[] infos)
throws IOException {
for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = meta.readInt()) {
// System.out.println("Field number : " + fieldNumber);
FieldInfo info = infos[fieldNumber];
if (info == null) {
throw new CorruptIndexException("Invalid field number: " + fieldNumber, meta);
Expand Down
Loading

0 comments on commit 382f6b2

Please sign in to comment.