I can't append to a table indexed with float columns: it is not possible to define columnStats for float columns #515

Open
cugni opened this issue Dec 12, 2024 · 1 comment · May be fixed by #522

Comments

@cugni
Member

cugni commented Dec 12, 2024

What went wrong?

I want to index a table on a few float columns. The float transformer expects a Float as input, but in JSON floats and doubles are encoded the same way and are mapped to Double when parsed, causing a ClassCastException.

How to reproduce?

It is impossible to define the column's min/max as Float instead of Double, because JSON does not distinguish between the two types. So if I do:

spark.range(10).map(_.toFloat).toDF("f").write.format("qbeast").option("columnsToIndex","f").option("columnStats","""{"f_max":0.0,"f_min":0.0}""").mode("overwrite").save("/tmp/double")

I get this:

java.lang.ClassCastException: class java.lang.Double cannot be cast to class java.lang.Float (java.lang.Double and java.lang.Float are in module java.base of loader 'bootstrap')
  at scala.runtime.BoxesRunTime.unboxToFloat(BoxesRunTime.java:111)
  at scala.math.Numeric$FloatIsFractional$.lteq(Numeric.scala:144)
  at io.qbeast.core.transform.IdentityTransformation.merge(IdentityTransformation.scala:56)
  at io.qbeast.spark.index.DoublePassOTreeDataAnalyzer$.$anonfun$calculateRevisionChanges$4(OTreeDataAnalyzer.scala:101)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at io.qbeast.spark.index.DoublePassOTreeDataAnalyzer$.calculateRevisionChanges(OTreeDataAnalyzer.scala:98)
  at io.qbeast.spark.index.DoublePassOTreeDataAnalyzer$.analyze(OTreeDataAnalyzer.scala:280)
  at io.qbeast.spark.index.SparkOTreeManager$.index(SparkOTreeManager.scala:51)
  at io.qbeast.table.IndexedTableImpl.$anonfun$doWrite$2(IndexedTable.scala:457)
  at io.qbeast.spark.delta.DeltaMetadataWriter.$anonfun$writeWithTransaction$5(DeltaMetadataWriter.scala:182)
  at io.qbeast.spark.delta.DeltaMetadataWriter.$anonfun$writeWithTransaction$5$adapted(DeltaMetadataWriter.scala:175)
  at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:223)
  at io.qbeast.spark.delta.DeltaMetadataWriter.writeWithTransaction(DeltaMetadataWriter.scala:175)
  at io.qbeast.spark.delta.DeltaMetadataManager$.updateWithTransaction(DeltaMetadataManager.scala:42)
  at io.qbeast.table.IndexedTableImpl.doWrite(IndexedTable.scala:456)
  at io.qbeast.table.IndexedTableImpl.write(IndexedTable.scala:429)
  at io.qbeast.table.IndexedTableImpl.save(IndexedTable.scala:388)
  at io.qbeast.sources.QbeastDataSource.createRelation(QbeastDataSource.scala:122)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
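
The root cause can be reproduced outside qbeast. Below is a minimal sketch, assuming an active spark-shell session (so spark and its implicits are available): without an explicit schema, Spark's JSON reader infers the numeric fields as DoubleType, and unboxing the parsed value to a Scala Float fails exactly as in the stack trace above.

import spark.implicits._

// The JSON reader infers 0.0 as DoubleType, so the stats row carries java.lang.Double values
val statsRow = spark.read.json(Seq("""{"f_max":0.0,"f_min":0.0}""").toDS()).first()
statsRow.schema.foreach(f => println(s"${f.name}: ${f.dataType}")) // both fields: DoubleType
val min: Float = statsRow.getAs[Float]("f_min") // ClassCastException: Double cannot be cast to Float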

If I try to force the number to be a float (at least in Scala terms) by adding an f suffix to the literal (0.0f), I get another error, because the JSON is then malformed.

scala> spark.range(10).map(_.toFloat).toDF("f").write.format("qbeast").option("columnsToIndex","f").option("columnStats","""{"f_max":0.0f,"f_min":1.0f}""").mode("overwrite").save("/tmp/doube")
                                                                                
scala> spark.range(10).map(_.toFloat).toDF("f").write.format("qbeast").option("columnsToIndex","f").option("columnStats","""{"f_max":0.0f,"f_min":1.0f}""").mode("append").save("/tmp/doube")
java.lang.IllegalArgumentException: f_min does not exist. Available: _corrupt_record
  at org.apache.spark.sql.types.StructType.$anonfun$fieldIndex$1(StructType.scala:310)
  at scala.collection.immutable.Map$Map1.getOrElse(Map.scala:168)
  at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:309)
  at org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema.fieldIndex(rows.scala:48)
  at org.apache.spark.sql.Row.getAs(Row.scala:372)
  at org.apache.spark.sql.Row.getAs$(Row.scala:372)
  at org.apache.spark.sql.catalyst.expressions.GenericRow.getAs(rows.scala:27)
  at io.qbeast.table.IndexedTableImpl.$anonfun$isNewRevision$2(IndexedTable.scala:295)
  at io.qbeast.core.transform.LinearTransformer.makeTransformation(LinearTransformer.scala:60)
  at io.qbeast.table.IndexedTableImpl.$anonfun$isNewRevision$3(IndexedTable.scala:297)
  at scala.collection.immutable.List.map(List.scala:293)
  at io.qbeast.table.IndexedTableImpl.isNewRevision(IndexedTable.scala:297)
  at io.qbeast.table.IndexedTableImpl.save(IndexedTable.scala:346)
  at io.qbeast.sources.QbeastDataSource.createRelation(QbeastDataSource.scala:121)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:307)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
  ... 47 elided
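
The malformed JSON alone explains the message above. Here is a small sketch, again assuming an active spark-shell session: since 0.0f is not valid JSON, the reader cannot parse the record and, in the default PERMISSIVE mode, the inferred schema contains only _corrupt_record, which is why the later lookup of f_min fails.

import spark.implicits._

// Parsing the malformed stats string on its own shows where _corrupt_record comes from
val badStats = spark.read.json(Seq("""{"f_max":0.0f,"f_min":1.0f}""").toDS())
badStats.printSchema()
// root
//  |-- _corrupt_record: string (nullable = true)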

2. Branch and commit id:

main ea4bcd8
and also cugni:next-in-line-rebased.

3. Spark version:

Spark 3.5.3, running the main version (commit ea4bcd8)

5. How are you running Spark?

Both distributed and in a local terminal.

@cugni cugni added the type: bug Something isn't working label Dec 12, 2024
@osopardo1 osopardo1 self-assigned this Dec 13, 2024
@osopardo1 osopardo1 linked a pull request Dec 16, 2024 that will close this issue
@osopardo1
Member

osopardo1 commented Dec 16, 2024

This issue is related to how we manage Revision Changes when appending data.
Currently, we load the Column Stats in two different ways, depending on whether the operation is an append or an overwrite.

For the first operation (overwrite), we use the code in SparkRevisionFactory:

override def createNewRevision(
    qtableID: QTableID,
    schema: StructType,
    options: QbeastOptions): Revision = {
  val desiredCubeSize = options.cubeSize
  val columnsToIndex = options.columnsToIndexParsed
  val transformers = columnsToIndex.map(_.toTransformer(schema)).toVector
  // Check if the columns to index are present in the schema
  var shouldCreateNewSpace = true
  val manualDefinedColumnStats = options.columnStats.isDefined
  val columnStats = if (manualDefinedColumnStats) {
    val spark = SparkSession.active
    import spark.implicits._
    spark.read
      .option("inferTimestamp", "true")
      .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS'Z'")
      .json(Seq(options.columnStats.get).toDS())
      .first()
  } else Row.empty
  val transformations = {
    val builder = Vector.newBuilder[Transformation]
    builder.sizeHint(transformers.size)
    transformers.foreach(transformer => {
      // A Transformer needs manual column stats if:
      // - its type is ManualColumnStats
      val needManualColumnStats = transformer.stats match {
        case _: ManualColumnStats => true
        case _ => false
      }
      val hasManualColumnStats = manualDefinedColumnStats &&
        columnStats.schema.exists(_.name.contains(transformer.columnName))
      if (hasManualColumnStats) {
        // If manual column stats are provided
        // Create transformation with boundaries
        builder += transformer.makeTransformation(columnName =>
          columnStats.getAs[Object](columnName))
      } else if (needManualColumnStats) {
        // If no column stats are provided, and manual stats are required
        // Use a ManualPlaceholderTransformation which will throw an error when indexing
        builder += ManualPlaceholderTransformation(
          transformer.columnName,
          transformer.stats.statsNames)
        shouldCreateNewSpace = false
      } else {
        // If no column stats are provided, and no manual stats are required
        // Use an EmptyTransformation which will always be superseded
        builder += EmptyTransformation()
        shouldCreateNewSpace = false
      }
    })
    builder.result()
  }
  // ...

Overwrite doesn't throw an error because it checks whether the columnStats contain any reference to the indexed column. If none is found (which is the case here), it returns an EmptyTransformation, which is later superseded by a LinearTransformation once the DataFrame is analyzed in OTreeDataAnalyzer.

For the second case, we use this code in IndexedTable:

private def isNewRevision(qbeastOptions: QbeastOptions): Boolean = {
  // TODO feature: columnsToIndex may change between revisions
  val columnsToIndex = qbeastOptions.columnsToIndex
  val currentColumnsToIndex = latestRevision.columnTransformers.map(_.columnName)
  val isNewColumns = !latestRevision.matchColumns(columnsToIndex)
  if (isNewColumns) {
    throw AnalysisExceptionFactory.create(
      s"Columns to index '${columnsToIndex.mkString(",")}' do not match " +
        s"existing index ${currentColumnsToIndex.mkString(",")}.")
  }
  // Checks if the desiredCubeSize is different from the existing one
  val isNewCubeSize = latestRevision.desiredCubeSize != qbeastOptions.cubeSize
  // Checks if the user-provided column boundaries would trigger the creation of
  // a new revision.
  val isNewSpace = qbeastOptions.columnStats match {
    case None => false
    case Some(statsString) =>
      val spark = SparkSession.active
      import spark.implicits._
      val columnStatsRow = spark.read
        .option("inferTimestamp", "true")
        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS'Z'")
        .json(Seq(statsString).toDS())
        .first()
      val statsFunc = (statsName: String) => columnStatsRow.getAs[Object](statsName)
      val newPossibleTransformations =
        latestRevision.columnTransformers.map(_.makeTransformation(statsFunc))
      latestRevision.transformations
        .zip(newPossibleTransformations)
        .forall(t => {
          t._1.isSupersededBy(t._2)
        })
  }
  isNewCubeSize || isNewSpace
}

The append path does not make the same check: it looks up f_min and f_max directly, and they are not found because the malformed JSON is parsed into a schema that only contains _corrupt_record.

So, there are two errors here:

  1. Incorrect flow of Revision Changes (solved in Refactor Revision (flow) and Metadata Changes #223).
  2. Incorrect JSON parsing of the stats.

For the second problem, I am trying a solution in #522 that adds a schema to the ColumnStats parsing, built from the Transformers and the data schema.
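
A rough sketch of that idea (hypothetical, not the actual #522 code): build an explicit stats schema from the columns to index and the data schema, and pass it to the JSON reader so that a Float column keeps FloatType in the parsed stats row. The statsSchemaFor helper below is illustrative only.

import org.apache.spark.sql.types._
import spark.implicits._

// Hypothetical helper: derive the columnStats schema from the data schema so that
// min/max values keep the indexed column's own type instead of the inferred DoubleType
def statsSchemaFor(columnsToIndex: Seq[String], dataSchema: StructType): StructType =
  StructType(columnsToIndex.flatMap { col =>
    val dt = dataSchema(col).dataType
    Seq(StructField(s"${col}_min", dt), StructField(s"${col}_max", dt))
  })

val statsSchema = statsSchemaFor(Seq("f"), StructType(Seq(StructField("f", FloatType))))
val stats = spark.read.schema(statsSchema).json(Seq("""{"f_max":9.0,"f_min":0.0}""").toDS()).first()
val fMin: Float = stats.getAs[Float]("f_min") // now a Float, no ClassCastException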
