Skip to content

Commit

Permalink
Add more logging and error handling to partition pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhilsimha committed Feb 1, 2024
1 parent e663aef commit 5cb59f3
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 1 deletion.
2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/Analyzer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ class Analyzer(tableUtils: TableUtils,
val gbStartPartition = part.groupBy.sources.toScala
.map(_.query.startPartition)
.filter(_ != null)
if (!gbStartPartition.isEmpty)
if (gbStartPartition.nonEmpty)
gbStartPartitions += (part.groupBy.metaData.name -> gbStartPartition)
}
val noAccessTables = runTablePermissionValidation((gbTables.toList ++ List(joinConf.left.table)).toSet)
Expand Down
4 changes: 4 additions & 0 deletions spark/src/main/scala/ai/chronon/spark/Extensions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@ object Extensions {
// Total row count across all partitions (sum of the per-partition counts).
val count: Long = partitionCounts.values.sum

/** Restricts this DfWithStats to the overlap of its own partition range with `range`.
  *
  * Logs the requested range, the original range, and the original per-partition counts,
  * then returns None when the two ranges do not overlap (intersection not well defined)
  * or when no partition counts survive the intersection; otherwise returns a new
  * DfWithStats built from the pruned DataFrame and the surviving counts.
  *
  * NOTE(review): the DataFrame is pruned with the caller's `range` while the counts are
  * filtered by the intersected range — confirm this asymmetry is intentional.
  */
def prunePartitions(range: PartitionRange): Option[DfWithStats] = {
  println(
    s"Pruning down to new range $range, original range: $partitionRange." +
      s"\nOriginal partition counts: $partitionCounts")
  val overlap = partitionRange.intersect(range)
  if (!overlap.wellDefined) {
    None
  } else {
    // Keep only the per-partition counts whose partition key falls inside the overlap.
    val survivingCounts = partitionCounts.filter { case (partition, _) => overlap.partitions.contains(partition) }
    if (survivingCounts.isEmpty) None
    else Some(DfWithStats(df.prunePartition(range), survivingCounts))
  }
}
// Summary stats for this DataFrame: total row count paired with its full partition range.
def stats: DfStats = DfStats(count, partitionRange)
Expand Down

0 comments on commit 5cb59f3

Please sign in to comment.