Skip to content

Commit

Permalink
Fixes issue with data access in VAFCursor.
Browse files Browse the repository at this point in the history
  • Loading branch information
ppanopticon committed Dec 10, 2023
1 parent 6a2127b commit ef332bd
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ sealed class CachedVAFCursor<T: ProximityPredicate>(protected val partition: Lon
/**
* An (experimental) [VAFCursor] implementation for range search.
*/
class ENN(partition: LongRange, predicate: ProximityPredicate.ENN, index: VAFIndex.Tx) : VAFCursor<ProximityPredicate.ENN>(partition, predicate, index) {
class ENN(partition: LongRange, predicate: ProximityPredicate.ENN, index: VAFIndex.Tx) : CachedVAFCursor<ProximityPredicate.ENN>(partition, predicate, index) {

override fun moveNext(): Boolean {
while (this.boc.compareAndExchange(true, false) || (this.cursor.next && this.cursor.key < this.endKey)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import org.vitrivr.cottontail.core.queries.sort.SortOrder
import org.vitrivr.cottontail.core.tuple.StandaloneTuple
import org.vitrivr.cottontail.core.tuple.Tuple
import org.vitrivr.cottontail.core.types.RealVectorValue
import org.vitrivr.cottontail.core.types.Value
import org.vitrivr.cottontail.core.types.VectorValue
import org.vitrivr.cottontail.core.values.DoubleValue
import org.vitrivr.cottontail.dbms.catalogue.toKey
import org.vitrivr.cottontail.dbms.column.ColumnTx
import org.vitrivr.cottontail.dbms.entity.EntityTx
import org.vitrivr.cottontail.dbms.execution.operators.sort.RecordComparator
import org.vitrivr.cottontail.dbms.index.va.bounds.Bounds
Expand All @@ -41,19 +41,19 @@ sealed class VAFCursor<T: ProximityPredicate>(protected val partition: LongRange
protected val bounds: Bounds

/** The sub-transaction used by this [VAFCursor]. */
protected val subTx = this.index.context.txn.xodusTx.readonlySnapshot
private val subTx = this.index.context.txn.xodusTx.readonlySnapshot

/** Cursor backing this [VAFCursor]. */
protected val cursor = this.index.dataStore.openCursor(this.subTx)
protected val indexCursor = this.index.dataStore.openCursor(this.subTx)

/** The [Cursor] used to access underlying column values. */
protected val columnCursor: Cursor<Value?>

/** A begin of cursor (BoC) flag. */
protected val boc = AtomicBoolean(true)

/** Internal [ColumnTx] used to access actual values. */
protected val columnTx: ColumnTx<*>

/** The [TupleId] to start with. */
protected val startKey = this.partition.first.toKey()
private val startKey = this.partition.first.toKey()

/** The [TupleId] to end at. */
protected val endKey = this.partition.last.toKey()
Expand Down Expand Up @@ -86,10 +86,10 @@ sealed class VAFCursor<T: ProximityPredicate>(protected val partition: LongRange

/* Obtain Tx object for column. */
val entityTx: EntityTx = this.index.dbo.parent.newTx(this.index.context)
this.columnTx = entityTx.columnForName(this.predicate.column.name).newTx(this.index.context)
this.columnCursor = entityTx.columnForName(this.predicate.column.name).newTx(this.index.context).cursor() as Cursor<Value?>

/* Move cursors to correct position. */
if (this.cursor.getSearchKeyRange(this.startKey) == null) {
if (this.indexCursor.getSearchKeyRange(this.startKey) == null) {
this.boc.set(false)
}
}
Expand All @@ -98,7 +98,8 @@ sealed class VAFCursor<T: ProximityPredicate>(protected val partition: LongRange
* Closes this [Cursor]
*/
override fun close() {
this.cursor.close()
this.columnCursor.close()
this.indexCursor.close()
this.subTx.abort()
}

Expand Down Expand Up @@ -175,6 +176,7 @@ sealed class VAFCursor<T: ProximityPredicate>(protected val partition: LongRange
* A [VAFCursor] implementation for nearest neighbour search.
*/
class NNS(partition: LongRange, predicate: ProximityPredicate.NNS, index: VAFIndex.Tx) : KLimited<ProximityPredicate.NNS>(partition, predicate, index) {

/**
* Prepares the result set using the [VAFIndex] and the VA-SSA algorithm described in [1].
*
Expand All @@ -186,19 +188,22 @@ sealed class VAFCursor<T: ProximityPredicate>(protected val partition: LongRange
/* First phase: Just add entries until we have k-results. */
var threshold: Double
do {
val tupleId = LongBinding.compressedEntryToLong(cursor.key)
val value = this.columnTx.read(tupleId)
val distance = this.predicate.distance(this.query, value)!!
localSelection.offer(StandaloneTuple(tupleId, this.produces, arrayOf(distance, value)))
} while (localSelection.size < localSelection.k && this.cursor.next && this.cursor.key <= this.endKey)
val tupleId = LongBinding.compressedEntryToLong(indexCursor.key)
if (this.columnCursor.moveTo(tupleId)) {
val value = this.columnCursor.value()
val distance = this.predicate.distance(this.query, value)!!
localSelection.offer(StandaloneTuple(tupleId, this.produces, arrayOf(distance, value)))
}
} while (localSelection.size < localSelection.k && this.indexCursor.next && this.indexCursor.key <= this.endKey)

/* Second phase: Use lower-bound to decide whether entry should be added. */
threshold = (localSelection.peek()!![0] as DoubleValue).value
while (this.cursor.next && this.cursor.key <= this.endKey) {
val signature = VAFSignature.fromEntry(cursor.value)
while (this.indexCursor.next && this.indexCursor.key <= this.endKey) {
val signature = VAFSignature.fromEntry(indexCursor.value)
if (this.bounds.lb(signature, threshold) < threshold) {
val tupleId = LongBinding.compressedEntryToLong(cursor.key)
val value = this.columnTx.read(tupleId)
val tupleId = LongBinding.compressedEntryToLong(indexCursor.key)
this.columnCursor.moveTo(tupleId)
val value = this.columnCursor.value()
val distance = this.predicate.distance(this.query, value)!!
threshold = (localSelection.offer(StandaloneTuple(tupleId, this.produces, arrayOf(distance, value)))[0] as DoubleValue).value
}
Expand Down Expand Up @@ -229,19 +234,21 @@ sealed class VAFCursor<T: ProximityPredicate>(protected val partition: LongRange
/* First phase: Just add entries until we have k-results. */
var threshold: Double
do {
val tupleId = LongBinding.compressedEntryToLong(cursor.key)
val value = this.columnTx.read(tupleId)
val tupleId = LongBinding.compressedEntryToLong(indexCursor.key)
this.columnCursor.moveTo(tupleId)
val value = this.columnCursor.value()
val distance = this.predicate.distance(this.query, value)!!
localSelection.offer(StandaloneTuple(tupleId, this.produces, arrayOf(distance, value)))
} while (localSelection.size < localSelection.k && this.cursor.next && this.cursor.key <= this.endKey)
} while (localSelection.size < localSelection.k && this.indexCursor.next && this.indexCursor.key <= this.endKey)

/* Second phase: Use lower-bound to decide whether entry should be added. */
threshold = (localSelection.peek()!![0] as DoubleValue).value
while (this.cursor.next && this.cursor.key <= this.endKey) {
val signature = VAFSignature.fromEntry(cursor.value)
while (this.indexCursor.next && this.indexCursor.key <= this.endKey) {
val signature = VAFSignature.fromEntry(indexCursor.value)
if (this.bounds.ub(signature, threshold) < threshold) {
val tupleId = LongBinding.compressedEntryToLong(cursor.key)
val value = this.columnTx.read(tupleId)
val tupleId = LongBinding.compressedEntryToLong(indexCursor.key)
this.columnCursor.moveTo(tupleId)
val value = this.columnCursor.value()
val distance = this.predicate.distance(this.query, value)!!
threshold = (localSelection.offer(StandaloneTuple(tupleId, this.produces, arrayOf(distance, value)))[0] as DoubleValue).value
}
Expand All @@ -263,8 +270,8 @@ sealed class VAFCursor<T: ProximityPredicate>(protected val partition: LongRange
class ENN(partition: LongRange, predicate: ProximityPredicate.ENN, index: VAFIndex.Tx) : VAFCursor<ProximityPredicate.ENN>(partition, predicate, index) {

override fun moveNext(): Boolean {
while (this.boc.compareAndExchange(true, false) || (this.cursor.next && this.cursor.key < this.endKey)) {
val signature = VAFSignature.fromEntry(this.cursor.value)
while (this.boc.compareAndExchange(true, false) || (this.indexCursor.next && this.indexCursor.key < this.endKey)) {
val signature = VAFSignature.fromEntry(this.indexCursor.value)
val (lb,ub) = this.bounds.bounds(signature)
if (this.predicate.eMin.value >= lb && this.predicate.eMax.value < ub) {
return true
Expand All @@ -273,10 +280,11 @@ sealed class VAFCursor<T: ProximityPredicate>(protected val partition: LongRange
return false
}

override fun key(): TupleId = LongBinding.compressedEntryToLong(this.cursor.key)
override fun key(): TupleId = LongBinding.compressedEntryToLong(this.indexCursor.key)
override fun value(): Tuple {
val tupleId = LongBinding.compressedEntryToLong(this.cursor.key)
val value = this.columnTx.read(tupleId)
val tupleId = LongBinding.compressedEntryToLong(this.indexCursor.key)
this.columnCursor.moveTo(tupleId)
val value = this.columnCursor.value()
val distance = this.predicate.distance(this.query, value)!!
return StandaloneTuple(tupleId, this.produces, arrayOf(distance, value))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class VAFIndexRebuilder(index: VAFIndex, context: QueryContext): AbstractIndexRe

/* Data is flushed every once in a while. */
if ((counter ++) % 1_000_000 == 0) {
LOGGER.debug("Rebuilding index ${this.index.name} (${this.index.type}) still running ($counter / $count)...")
LOGGER.debug("Rebuilding index {} ({}) still running ({} / {})...", this.index.name, this.index.type, counter, count)
if (!this.context.txn.xodusTx.flush()) {
return false
}
Expand Down

0 comments on commit ef332bd

Please sign in to comment.