Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Feb 2, 2024
1 parent 83a97c6 commit 9271789
Show file tree
Hide file tree
Showing 20 changed files with 97 additions and 233 deletions.
1 change: 0 additions & 1 deletion server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ if (!isEclipse) {
}
}


dependencies {

api project(':libs:opensearch-common')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
* @opensearch.internal
*/
public class Lucene {
public static final String LATEST_CODEC = "Lucene95";
public static final String LATEST_CODEC = "StarTreeCodec"; // TODO : this is a hack

public static final String SOFT_DELETES_FIELD = "__soft_deletes";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ public CodecService(@Nullable MapperService mapperService, IndexSettings indexSe
assert null != indexSettings;
if (mapperService == null) {
/**
* Todo : currently we don't have a single field with which we handle aggregation ( binary field etc )
* So there is no better way to test the changes than to change the default codec - this should be changed.
* TODO: we currently don't have a single field for which we could use a per-field codec to handle aggregation,
* so there is no better way to test the changes than to change the default codec. This should be changed.
*
* This approach has issues: restarting the process and reloading the indices results in errors, because
* Lucene95Codec is read when reloading the indices (solved for now by using StarTreeCodec as the latest codec).
*/
codecs.put(DEFAULT_CODEC, new StarTreeCodec());
codecs.put(LZ4, new StarTreeCodec());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.codec.freshstartree.codec.StarTreeCodec;
import org.opensearch.index.codec.freshstartree.codec.StarTreeDocValuesFormat;
import org.opensearch.index.mapper.CompletionFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
Expand All @@ -54,7 +55,7 @@
*
* @opensearch.internal
*/
public class PerFieldMappingPostingFormatCodec extends Lucene95Codec {
public class PerFieldMappingPostingFormatCodec extends StarTreeCodec { // TODO : this is a hack , can't extend startreecodec
private final Logger logger;
private final MapperService mapperService;
private final DocValuesFormat dvFormat = new StarTreeDocValuesFormat();
Expand All @@ -64,8 +65,8 @@ public class PerFieldMappingPostingFormatCodec extends Lucene95Codec {
: "PerFieldMappingPostingFormatCodec must subclass the latest " + "lucene codec: " + Lucene.LATEST_CODEC;
}

public PerFieldMappingPostingFormatCodec(Mode compressionMode, MapperService mapperService, Logger logger) {
super(compressionMode);
public PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode compressionMode, MapperService mapperService, Logger logger) {
super();
this.mapperService = mapperService;
this.logger = logger;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ private Map<Long, StarTreeBuilderUtils.TreeNode> constructNonStarNodes(int start
long nodeDimensionValue = getDimensionValue(startDocId, dimensionId);
for (int i = startDocId + 1; i < endDocId; i++) {
long dimensionValue = getDimensionValue(i, dimensionId);
// System.out.println("Dim value : " + dimensionValue );
if (dimensionValue != nodeDimensionValue) {
StarTreeBuilderUtils.TreeNode child = getNewNode();
child._dimensionId = dimensionId;
Expand Down Expand Up @@ -227,11 +226,6 @@ public void build()
long startTime = System.currentTimeMillis();
Iterator<Record> recordIterator = sortAndAggregateSegmentRecords(numSegmentRecords);
logger.info("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime));
// System.out.println(
// "== =============Finished sorting and aggregating star-tree in ms : " +
// (System.currentTimeMillis()
// - startTime));

build(recordIterator, false);
}

Expand Down Expand Up @@ -443,7 +437,7 @@ private Record createAggregatedDocs(StarTreeBuilderUtils.TreeNode node)
assert aggregatedRecord != null;
for (int i = node._dimensionId + 1; i < _numDimensions; i++) {
aggregatedRecord._dimensions[i] =
STAR_IN_DOC_VALUES_INDEX; // StarTreeV2Constants.STAR_IN_FORWARD_INDEX;
STAR_IN_DOC_VALUES_INDEX;
}
node._aggregatedDocId = _numDocs;
appendToStarTree(aggregatedRecord);
Expand All @@ -469,7 +463,7 @@ private Record createAggregatedDocs(StarTreeBuilderUtils.TreeNode node)
assert aggregatedRecord != null;
for (int i = node._dimensionId + 1; i < _numDimensions; i++) {
aggregatedRecord._dimensions[i] =
STAR_IN_DOC_VALUES_INDEX; // StarTreeV2Constants.STAR_IN_FORWARD_INDEX;
STAR_IN_DOC_VALUES_INDEX;
}
node._aggregatedDocId = _numDocs;
appendToStarTree(aggregatedRecord);
Expand Down Expand Up @@ -591,7 +585,6 @@ public void close()
success = true;
} catch (Exception e) {
throw new RuntimeException(e);
// System.out.println(e.getMessage());
} finally {
if (success) {
IOUtils.close(indexOutput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.index.IndexFileNames;
Expand All @@ -44,15 +46,31 @@


/**
* Off heap implementation of star tree builder Segment records are sorted and aggregated completely
* off heap Star tree records are using mixed approach where we have a buffer of hashmap to doc ids
* and also a temp file This is done since star tree records file needs to be read and written at
* same time, sometimes latest changes are not present during read
* Off-heap implementation of the star tree builder.
*
* Segment records are stored in a single file - segment.record - for sorting and aggregation (we create a doc id
* array and swap doc ids in the array during sorting, based on the actual segment record contents in the file).
*
* Star tree records are stored in multiple files, because the algorithm is:
* 1. Initially create a set of aggregated records based on the segment records.
* 2. Read the above set of records, create aggregated star records and append them.
* 3. Repeat until we have all combinations.
*
* We cannot append newly created aggregated records to an existing file, as Lucene doesn't allow appending to
* closed files. We also cannot keep the 'IndexOutput' open and create an 'IndexInput' to read the content, as some
* of the most recent content will not be visible to the reader. So we need to close the 'IndexOutput' before we
* create an 'IndexInput'.
*
* And since we cannot reopen an 'IndexOutput', the only option is to create a new file for new appends until the
* next read.
*
* So we keep a set of files and maintain a tracker array to track the start doc id for each file.
*
*/
public class OffHeapBufferedSingleTreeBuilder extends BaseSingleTreeBuilder {
private static final Logger logger = LogManager.getLogger(OffHeapBufferedSingleTreeBuilder.class);
private static final String SEGMENT_RECORD_FILE_NAME = "segment.record";
private static final String STAR_TREE_RECORD_FILE_NAME = "star-tree.record";


private final List<Long> _starTreeRecordOffsets;

private int _numReadableStarTreeRecords;
Expand Down Expand Up @@ -88,16 +106,8 @@ public OffHeapBufferedSingleTreeBuilder(IndexOutput output, List<String> dimensi
// TODO : create temp output
starTreeRecordFileOutput = state.directory.createOutput(starTreeRecordFileName, state.context);
starTreeFileCount++;
// CodecUtil.writeIndexHeader(
// starTreeRecordFileOutput,
// "STARTreeCodec",
// 0,
// state.segmentInfo.getId(),
// state.segmentSuffix);
segmentRecordFileOutput = state.directory.createOutput(segmentRecordFileName, state.context);

_starTreeRecordOffsets = new ArrayList<>();
//_starTreeRecordOffsets.add(0L);
}

@Override
Expand Down Expand Up @@ -233,9 +243,7 @@ private Record deserializeStarTreeRecord(RandomAccessInput buffer, long offset)
/**
 * Serializes the given record, appends it to the current star-tree record output, and remembers the
 * offset at which it was written.
 *
 * @param record the star-tree record to append
 * @throws IOException if writing to the underlying output fails
 */
void appendRecord(Record record) throws IOException {
    final byte[] serialized = serializeStarTreeRecord(record);
    final int length = serialized.length;
    starTreeRecordFileOutput.writeBytes(serialized, length);
    // The offset is recorded only after a successful write; it is the position of this record
    // within the current file, tracked through currBytes.
    _starTreeRecordOffsets.add(currBytes);
    currBytes += length;
}
Expand All @@ -244,19 +252,13 @@ void appendRecord(Record record)
/**
 * Returns the star-tree record stored for the given doc id.
 *
 * @param docId doc id of the star-tree record to read
 * @return the record deserialized from the currently readable star-tree input
 * @throws IOException if the record cannot be read
 */
Record getStarTreeRecord(int docId) throws IOException {
    // Ensure the readable input covers docId before looking up its offset.
    ensureBufferReadable(docId);
    final long recordOffset = _starTreeRecordOffsets.get(docId);
    return deserializeStarTreeRecord(starTreeRecordRandomInput, recordOffset);
}

/**
 * Reads a single dimension value of a star-tree record straight from the random-access input,
 * without deserializing the whole record.
 *
 * @param docId doc id of the star-tree record
 * @param dimensionId index of the dimension within the record
 * @return the long stored at the record's offset plus {@code dimensionId * Long.BYTES}
 * @throws IOException if the underlying input cannot be read
 */
@Override
long getDimensionValue(int docId, int dimensionId)
    throws IOException {
    // Make the input readable exactly once. The previous code invoked ensureBufferReadable twice
    // (three-argument and two-argument forms); the extra flag was only used for debug output, and a
    // second call that misses the fast path may needlessly close the output and roll a new temp file.
    ensureBufferReadable(docId, false);
    // Assumes dimensions are laid out as consecutive longs at the start of each record - TODO confirm
    // against serializeStarTreeRecord. Offset arithmetic is done in long to avoid int overflow.
    final long recordOffset = _starTreeRecordOffsets.get(docId);
    return starTreeRecordRandomInput.readLong(recordOffset + (long) dimensionId * Long.BYTES);
}
Expand All @@ -266,8 +268,6 @@ Iterator<Record> sortAndAggregateSegmentRecords(int numDocs)
throws IOException {
// Write all dimensions for segment records into the buffer, and sort all records using an int
// array
// PinotDataBuffer dataBuffer;
// long bufferSize = (long) numDocs * _numDimensions * Integer.BYTES;
int recordBytesLength = 0;
Integer[] sortedDocIds = new Integer[numDocs];
for (int i = 0; i < numDocs; i++) {
Expand Down Expand Up @@ -295,12 +295,10 @@ private Iterator<Record> sortRecords(Integer[] sortedDocIds, int numDocs, int re
IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, SEGMENT_RECORD_FILE_NAME),
state.context);
final long recordBytes = recordBytesLength;
logger.info("Segment record is of length : {}", segmentRecordFileInput.length());
segmentRandomInput = segmentRecordFileInput.randomAccessSlice(0, segmentRecordFileInput.length());

try {
// ArrayUtil.introSort(sortedDocIds, comparator);
// Arrays.sort(sortedDocIds, comparator);

QuickSorter.quickSort(0, numDocs, (i1, i2) -> {
long offset1 = (long) sortedDocIds[i1] * recordBytes;
long offset2 = (long) sortedDocIds[i2] * recordBytes;
Expand All @@ -321,8 +319,6 @@ private Iterator<Record> sortRecords(Integer[] sortedDocIds, int numDocs, int re
sortedDocIds[i1] = sortedDocIds[i2];
sortedDocIds[i2] = temp;
});

// System.out.println("Sorted doc ids : " + Arrays.toString(sortedDocIds));
} finally {
// segmentRecordFileInput.close();
// state.directory.deleteFile(IndexFileNames.segmentFileName(state.segmentInfo.name,
Expand All @@ -333,9 +329,9 @@ private Iterator<Record> sortRecords(Integer[] sortedDocIds, int numDocs, int re
// SEGMENT_RECORD_FILE_NAME)));
}
if(sortedDocIds != null)
System.out.println("Sorted doc ids length" + sortedDocIds.length);
logger.info("Sorted doc ids length" + sortedDocIds.length);
else
System.out.println("Sorted doc ids array is null");
logger.info("Sorted doc ids array is null");

// Create an iterator for aggregated records
return new Iterator<Record>() {
Expand Down Expand Up @@ -380,7 +376,6 @@ public Record getSegmentRecord(int docID, long recordBytes)
@Override
Iterator<Record> generateRecordsForStarNode(int startDocId, int endDocId, int dimensionId)
throws IOException {
//System.out.println("End doc id " + endDocId);
ensureBufferReadable(endDocId, true);

// Sort all records using an int array
Expand Down Expand Up @@ -460,53 +455,37 @@ private void ensureBufferReadable(int docId) throws IOException {
ensureBufferReadable(docId, false);
}

/**
 * Convenience overload that delegates to the three-argument form, passing {@code false} as the
 * third argument.
 *
 * @param docId doc id that must become readable
 * @param endDoc forwarded unchanged as the second argument of the three-argument form
 * @throws IOException if the delegate fails
 */
private void ensureBufferReadable(int docId, boolean endDoc) throws IOException {
ensureBufferReadable(docId, endDoc, false);
}

private void ensureBufferReadable(int docId, boolean endDocCheck, boolean dimIdCheck)
private void ensureBufferReadable(int docId, boolean endDocCheck)
throws IOException {

if (docId >= prevStartDocId && (( endDocCheck && docId <= _numReadableStarTreeRecords )
|| (!endDocCheck && docId < _numReadableStarTreeRecords)) ) {
return;
}
//System.out.println("want doc : " + docId + " dim : " + dimIdCheck);
IndexInput in = null;
if(docId < _numDocs ) {
try {
int prevStartDocId = 0;
for(Map.Entry<String, Integer> entry : fileToByteSizeMap.entrySet()) {
if(docId < entry.getValue() - 1) {
in = state.directory.openInput(entry.getKey(), state.context);
starTreeRecordRandomInput =
in.randomAccessSlice(in.getFilePointer(), in.length() - in.getFilePointer());
_numReadableStarTreeRecords = entry.getValue();
break;
}
prevStartDocId = entry.getValue();
int prevStartDocId = 0;
for(Map.Entry<String, Integer> entry : fileToByteSizeMap.entrySet()) {
if(docId < entry.getValue() - 1) {
in = state.directory.openInput(entry.getKey(), state.context);
starTreeRecordRandomInput =
in.randomAccessSlice(in.getFilePointer(), in.length() - in.getFilePointer());
_numReadableStarTreeRecords = entry.getValue();
break;
}
//System.out.println("First loop Current start : " + prevStartDocId + " - Current end : " + _numReadableStarTreeRecords);
this.prevStartDocId = prevStartDocId;
} finally {
// if (in != null) {
// in.close();
// }
prevStartDocId = entry.getValue();
}
this.prevStartDocId = prevStartDocId;
}

if(in != null) return;



//System.out.println("want doc 1 : " + docId + " num docs : " + _numDocs);
starTreeRecordFileOutput.close();
logger.info("Created a file : {} of size : {}" , segmentRecordFileOutput.getName(), segmentRecordFileOutput.getFilePointer());
fileToByteSizeMap.put(starTreeRecordFileOutput.getName(),
_numDocs);

//System.out.println("Star tree record file size : " + starTreeRecordFileOutput.getFilePointer());
//System.out.println("Star tree record file name : " + starTreeRecordFileOutput.getName());

starTreeRecordFileOutput.close();

String starTreeRecordFileName =
IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, STAR_TREE_RECORD_FILE_NAME) +
Expand All @@ -517,31 +496,22 @@ private void ensureBufferReadable(int docId, boolean endDocCheck, boolean dimIdC
starTreeFileCount++;

currBytes = 0;
// state.directory.sync(Collections.singleton(starTreeRecordFileOutput.getName()));
if (starTreeRecordRandomInput != null) {
starTreeRecordRandomInput = null;
}

try {
int prevStartDocId = 0;
for(Map.Entry<String, Integer> entry : fileToByteSizeMap.entrySet()) {
if(docId <= entry.getValue() - 1) {
in = state.directory.openInput(entry.getKey(), state.context);
starTreeRecordRandomInput =
in.randomAccessSlice(in.getFilePointer(), in.length() - in.getFilePointer());
_numReadableStarTreeRecords = entry.getValue();
break;
}
//System.out.println("Setting start value : " + entry.getValue());
prevStartDocId = entry.getValue();
int prevStartDocId = 0;
for(Map.Entry<String, Integer> entry : fileToByteSizeMap.entrySet()) {
if(docId <= entry.getValue() - 1) {
in = state.directory.openInput(entry.getKey(), state.context);
starTreeRecordRandomInput =
in.randomAccessSlice(in.getFilePointer(), in.length() - in.getFilePointer());
_numReadableStarTreeRecords = entry.getValue();
break;
}
//System.out.println("Current start : " + prevStartDocId + " - Current end : " + _numReadableStarTreeRecords);
this.prevStartDocId = prevStartDocId;
} finally {
// if (in != null) {
// in.close();
// }
prevStartDocId = entry.getValue();
}
this.prevStartDocId = prevStartDocId;

}

Expand All @@ -551,21 +521,18 @@ public void close()
boolean success = false;
try {
if (starTreeRecordFileOutput != null) {
starTreeRecordFileOutput.close();
IOUtils.deleteFilesIgnoringExceptions(state.directory, starTreeRecordFileOutput.getName());
}
success = true;
} catch (Exception e) {
throw new RuntimeException(e);
// System.out.println(e.getMessage());
} finally {
if (success) {
IOUtils.close(starTreeRecordFileOutput);
} else {
IOUtils.closeWhileHandlingException(starTreeRecordFileOutput);
}
// starTreeRecordFileOutput = null;
IOUtils.closeWhileHandlingException(starTreeRecordFileOutput);
}
// Delete all temporary segment record files
IOUtils.deleteFilesIgnoringExceptions(state.directory, segmentRecordFileOutput.getName());
// Delete all temporary star tree record files
IOUtils.deleteFilesIgnoringExceptions(state.directory, fileToByteSizeMap.keySet());
super.close();
}
Expand Down
Loading

0 comments on commit 9271789

Please sign in to comment.