Support SQL syntax delta.`path` and qbeast.`path` when using the QbeastCatalog #412

Open

cugni opened this issue Sep 13, 2024 · 1 comment

Labels: priority: normal (This issue has normal priority), type: bug (Something isn't working)

@cugni (Member)

cugni commented Sep 13, 2024

What went wrong?

When the QbeastCatalog is configured as spark_catalog, path-based SQL queries such as SELECT * FROM delta.`/tmp/test1` or SELECT * FROM qbeast.`/tmp/test2` fail with SCHEMA_NOT_FOUND; with the DeltaCatalog, the qbeast.`path` syntax is also rejected. The expected behavior is that both path syntaxes resolve the table at the given location.

How to reproduce?

Start a spark-shell with the configurations below and run the snippets: the first session uses the QbeastCatalog as spark_catalog, the second uses the DeltaCatalog for comparison.

1. Code that triggered the bug, or steps to reproduce:

$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.7.0,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog

spark.range(10).write.format("delta").save("/tmp/test1")
spark.sql("SELECT * FROM delta.`/tmp/test1`")
// org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: [SCHEMA_NOT_FOUND] The schema `delta` cannot be found. Verify the spelling and correctness of the schema and catalog.

// I get the same result if I create the table with the qbeast format
spark.range(10).write.format("qbeast").option("columnsToIndex","id").save("/tmp/test2")
spark.sql("SELECT * FROM qbeast.`/tmp/test2`")
 // org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: [SCHEMA_NOT_FOUND] The schema `delta` cannot be found. Verify the spelling and correctness of the schema and catalog.

On the other hand, if we use the DeltaCatalog:

$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.7.0,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

spark.range(10).write.format("delta").save("/tmp/test3")
spark.sql("SELECT * FROM delta.`/tmp/test3`")
// This works 
// I get the same result if I create the table with the qbeast format
spark.range(10).write.format("qbeast").option("columnsToIndex","id").save("/tmp/test4")
spark.sql("SELECT * FROM qbeast.`/tmp/test4`")
// 24/09/13 11:41:59 WARN ObjectStore: Failed to get database qbeast, returning NoSuchObjectException
org.apache.spark.sql.AnalysisException: [UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY] Unsupported data source type for direct query on files: qbeast; line 1 pos 14
// but it works when reading the table as Delta:
spark.sql("SELECT * FROM delta.`/tmp/test4`")

2. Branch and commit id:

0.7.0

3. Spark version:

3.5.0

4. Hadoop version:

3.3.4

5. How are you running Spark?

local computer

6. Stack trace:

Trace of the log/error messages when using QbeastCatalog

24/09/13 11:34:12 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/09/13 11:34:12 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/09/13 11:34:13 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
24/09/13 11:34:13 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore cesare@127.0.0.1
24/09/13 11:34:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
24/09/13 11:34:14 WARN ObjectStore: Failed to get database delta, returning NoSuchObjectException
24/09/13 11:34:14 WARN ObjectStore: Failed to get database delta, returning NoSuchObjectException
org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: [SCHEMA_NOT_FOUND] The schema `delta` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a catalog, verify the current_schema() output, or qualify the name with the correct catalog.
To tolerate the error on drop use DROP SCHEMA IF EXISTS.
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireDbExists(SessionCatalog.scala:250)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableRawMetadata(SessionCatalog.scala:540)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:526)
  at io.qbeast.spark.internal.sources.v2.QbeastTableImpl.table$lzycompute(QbeastTableImpl.scala:82)
  at io.qbeast.spark.internal.sources.v2.QbeastTableImpl.table(QbeastTableImpl.scala:77)
  at io.qbeast.spark.internal.sources.v2.QbeastTableImpl.schema(QbeastTableImpl.scala:88)
  at org.apache.spark.sql.connector.catalog.Table.columns(Table.java:65)
  at io.qbeast.spark.internal.sources.v2.QbeastTableImpl.columns(QbeastTableImpl.scala:56)
  
Trace of the log/error messages when using DeltaCatalog

24/09/13 11:41:59 WARN ObjectStore: Failed to get database qbeast, returning NoSuchObjectException
org.apache.spark.sql.AnalysisException: [UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY] Unsupported data source type for direct query on files: qbeast; line 1 pos 14
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:52)
  at org.apache.spark.sql.execution.datasources.ResolveSQLOnFile.org$apache$spark$sql$execution$datasources$ResolveSQLOnFile$$resolveDataSource(rules.scala:58)
  at org.apache.spark.sql.execution.datasources.ResolveSQLOnFile$$anonfun$apply$1.applyOrElse(rules.scala:78)
  at org.apache.spark.sql.execution.datasources.ResolveSQLOnFile$$anonfun$apply$1.applyOrElse(rules.scala:63)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$4(AnalysisHelper.scala:175)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1215)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1214)
  at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:71)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:175)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:32)
  at org.apache.spark.sql.execution.datasources.ResolveSQLOnFile.apply(rules.scala:63)
  at org.apache.spark.sql.execution.datasources.ResolveSQLOnFile.apply(rules.scala:43)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:91)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:226)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:222)
  at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:222)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
  at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:691)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
  ... 47 elided
@cugni cugni added type: bug Something isn't working priority: normal This issue has normal priority labels Sep 13, 2024
@osopardo1 osopardo1 self-assigned this Sep 13, 2024
@osopardo1 (Member)

osopardo1 commented Sep 13, 2024

In the second example, the QbeastCatalog is not registered as a secondary catalog, and I think that's why the qbeast format is not recognized when querying a path; registering it under its own catalog name might help, as sketched below.
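
A minimal sketch of that configuration. The catalog name qbeast_catalog is only an example and is not specified in the issue; any secondary catalog name should do:

# qbeast_catalog is an arbitrary secondary-catalog name (assumption, not from the issue)
$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.7.0,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
--conf spark.sql.catalog.qbeast_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog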

In the first one, I will look into the QbeastCatalog.loadTable method. It seems it is not parsing the TableIdentifier properly when a path is specified:

  override def loadTable(ident: Identifier): Table = {
    try {
      getSessionCatalog().loadTable(ident) match {
        case table
            if QbeastCatalogUtils.isQbeastProvider(table.properties().asScala.get("provider")) =>
          QbeastCatalogUtils.loadQbeastTable(table, tableFactory)
        case o => o
      }
    } catch {
      case _: NoSuchDatabaseException | _: NoSuchNamespaceException | _: NoSuchTableException
          if QbeastCatalogUtils.isPathTable(ident) =>
        QbeastTableImpl(
          TableIdentifier(ident.name(), ident.namespace().headOption),
          new Path(ident.name()),
          Map.empty,
          tableFactory = tableFactory)
    }
  }
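
Based on the first stack trace (QbeastTableImpl.schema -> table -> getTableMetadata -> requireDbExists), one possible direction, sketched here only as an illustration and not as the actual fix, is to short-circuit path identifiers before touching the session catalog, so the format name ("delta"/"qbeast") never becomes the database of the TableIdentifier. For this to fully work, QbeastTableImpl would likely also need to read the schema from the path rather than the metastore, and the delta.`path` case would have to be delegated to Delta instead of being wrapped as a Qbeast table:

  // Hypothetical sketch only: handle path-like identifiers (e.g. qbeast.`/tmp/test2`)
  // up front instead of relying on the NoSuchDatabaseException fallback.
  override def loadTable(ident: Identifier): Table = {
    if (QbeastCatalogUtils.isPathTable(ident)) {
      // Resolve directly from the file-system location, with no database,
      // to avoid the SCHEMA_NOT_FOUND lookup seen in the stack trace.
      QbeastTableImpl(
        TableIdentifier(ident.name(), None),
        new Path(ident.name()),
        Map.empty,
        tableFactory = tableFactory)
    } else {
      getSessionCatalog().loadTable(ident) match {
        case table
            if QbeastCatalogUtils.isQbeastProvider(table.properties().asScala.get("provider")) =>
          QbeastCatalogUtils.loadQbeastTable(table, tableFactory)
        case o => o
      }
    }
  }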

@osopardo1 osopardo1 removed their assignment Sep 16, 2024
@fpj fpj assigned fpj and osopardo1 and unassigned fpj Nov 18, 2024