Num Shuffle Files : Gets the count of shuffle files pulled into memory for a filter condition (#76)

* num shuffle files : gets the count of shuffle files for a filter condition

* num shuffle files : gets the count of shuffle files for a filter condition

* num shuffle files : added readable tags for output and some more unit tests

* num shuffle files : added readable tags for output and some more unit tests

* num shuffle files : updated documentation

* num shuffle files : updated documentation with min-max

* num shuffle files : updated documentation with getShuffleFileMetadata
joydeepbroy-zeotap authored Sep 12, 2023
1 parent 340ae9f commit 96060b2
Showing 4 changed files with 485 additions and 84 deletions.
39 changes: 39 additions & 0 deletions README.md
@@ -438,6 +438,45 @@ DeltaHelpers.deltaNumRecordDistribution(path)
+------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------------------------------------------------+
```

## Number of Shuffle Files in Merge & Other Filter Conditions

The function `getNumShuffleFiles` returns the number of shuffle files (think of Parquet part files) that will be pulled into memory for a given filter condition. This is particularly useful for estimating memory requirements in a Delta Merge operation, where the number of shuffle files can be a bottleneck.
To better tune your jobs, you can use this function to get the number of shuffle files for different kinds of filter conditions, then run operations like merge, Z-Order, or compaction and re-check whether you have reached the desired number of shuffle files.


For example, if the condition is "snapshot.id = update.id and country = 'GBR' and age >= 30 and age <= 40 and firstname like '%Jo%'" and country is the partition column,
```scala
DeltaHelpers.getNumShuffleFiles(path, "snapshot.id = update.id and country = 'GBR' and age >= 30 and age <= 40 and firstname like '%Jo%'")
```

then the output might look like the following (each key in the `Map` describes one part of the condition, and the value is the file count):
```scala
Map(
  // number of files that will be pulled into memory for the entire provided condition
  "OVERALL RESOLVED CONDITION => [ (country = 'GBR') and (age >= 30) and" +
    " (age <= 40) and firstname LIKE '%Jo%' ]" -> 18,
  // number of files signifying the greater than/less than part => "age >= 30 and age <= 40"
  "GREATER THAN / LESS THAN PART => [ (age >= 30) and (age <= 40) ]" -> 100,
  // number of files signifying the equals part => "country = 'GBR'"
  "EQUALS/EQUALS NULL SAFE PART => [ (country = 'GBR') ]" -> 300,
  // number of files signifying the like (or any other) part => "firstname like '%Jo%'"
  "LEFT OVER PART => [ firstname LIKE '%Jo%' ]" -> 600,
  // number of files signifying any unresolved part. This is mostly a failsafe
  // 1. to capture any other condition that might have been missed
  // 2. to surface wrong attribute names or conditions, like snapshot.id = update.id
  //    (usually found in merge conditions)
  "UNRESOLVED PART => [ (snapshot.id = update.id) ]" -> 800,
  // total number of files in the Delta Table
  "TOTAL_NUM_FILES_IN_DELTA_TABLE =>" -> 800,
  // list of unresolved columns/attributes in the provided condition;
  // empty if all columns are resolved
  "UNRESOLVED_COLUMNS =>" -> List("snapshot.id", "update.id"))
```
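
Since the keys embed the resolved condition text, matching on a key prefix is a simple way to pull a single count out of the result. A minimal sketch, assuming the key prefixes shown above:
```scala
val counts = DeltaHelpers.getNumShuffleFiles(
  path,
  "snapshot.id = update.id and country = 'GBR' and age >= 30 and age <= 40 and firstname like '%Jo%'")

// Grab the file count for the overall resolved condition by key prefix
val overallFiles = counts.collectFirst {
  case (key, count) if key.startsWith("OVERALL RESOLVED CONDITION") => count
}
```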

Another important use case this method can help with is checking min-max range overlap. Adding a min-max filter on a high-cardinality column like id, say `id >= 900 and id <= 5000`, can actually reduce the number of shuffle files Delta Lake pulls into memory. However, such an operation is not guaranteed to help, and you can verify its effect by running this method, as in the sketch below.
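
A hypothetical before-and-after comparison (the `id` column and its clustering are assumptions):
```scala
// Same equality filter, with and without a min-max range on a high-cardinality column
val withoutRange = DeltaHelpers.getNumShuffleFiles(path, "country = 'GBR'")
val withRange    = DeltaHelpers.getNumShuffleFiles(path, "country = 'GBR' and id >= 900 and id <= 5000")
// If the table is well clustered on id (e.g. after Z-Ordering), the
// OVERALL RESOLVED CONDITION count in withRange should come out smaller
```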

This function works only on the Delta Log and does not scan any data in the Delta Table.

If you want more information about these individual files and their metadata, consider using the `getShuffleFileMetadata` function.
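A sketch of unpacking its result, based on the seven-element tuple returned by the implementation shown later in this diff:
```scala
val (overallFiles, minMaxFiles, equalsFiles, leftOverFiles, unresolvedFiles, allFilesDF, unresolvedCols) =
  DeltaHelpers.getShuffleFileMetadata(path, "country = 'GBR' and age >= 30 and age <= 40")

// Each Seq[AddFile] exposes per-file metadata straight from the Delta Log
overallFiles.foreach(file => println(s"${file.path} -> ${file.size} bytes"))
```
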
## Change Data Feed Helpers

### CASE I - When Delta aka Transaction Log gets purged
126 changes: 124 additions & 2 deletions src/main/scala/mrpowers/jodie/DeltaHelpers.scala
@@ -2,10 +2,13 @@ package mrpowers.jodie

import io.delta.tables._
import mrpowers.jodie.delta.DeltaConstants._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.expressions.Window.partitionBy
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

import scala.collection.mutable

@@ -71,6 +74,125 @@ object DeltaHelpers {
def deltaNumRecordDistribution(path: String, condition: Option[String] = None): DataFrame =
getAllPartitionStats(deltaFileStats(path, condition), statsPartitionColumn, numRecordsColumn).toDF(numRecordsDFColumns: _*)

/**
* Gets the number of shuffle files (part files for parquet) that will be pulled into memory for a given filter condition.
* This is particularly useful in a Delta Merge operation where the number of shuffle files can be a bottleneck. Running
* the merge condition through this method can give an idea about the amount of memory resources required to run the merge.
*
* For example, if the condition is "snapshot.id = update.id and country = 'GBR' and age >= 30 and age <= 40 and firstname like '%Jo%' "
* and country is the partition column, then the output might look like =>
* Map(
* OVERALL RESOLVED CONDITION => [ (country = 'GBR') and (age >= 30) and (age <= 40) and firstname LIKE '%Jo%' ] -> 18,
* GREATER THAN / LESS THAN PART => [ (age >= 30) and (age <= 40) ] -> 100,
* EQUALS/EQUALS NULL SAFE PART => [ (country = 'GBR') ] -> 300,
* LEFT OVER PART => [ firstname LIKE '%Jo%' ] -> 600,
* UNRESOLVED PART => [ (snapshot.id = update.id) ] -> 800,
* TOTAL_NUM_FILES_IN_DELTA_TABLE => -> 800,
* UNRESOLVED_COLUMNS => -> List(snapshot.id, update.id))
*
* 18 - number of files that will be pulled into memory for the entire provided condition
* 100 - number of files signifying the greater than/less than part => "age >= 30 and age <= 40"
* 300 - number of files signifying the equals part => "country = 'GBR'"
* 600 - number of files signifying the like (or any other) part => "firstname like '%Jo%'"
* 800 - number of files signifying any unresolved part. This is mostly a failsafe
* 1. to capture any other condition that might have been missed
* 2. to surface wrong attribute names or conditions, like snapshot.id = update.id (usually found in merge conditions)
* 800 - total number of files in the Delta Table without any filter condition or partitions
* List(snapshot.id, update.id) - list of unresolved columns/attributes in the provided condition
* Note: whenever a resolved condition part comes back as Empty, its count falls back to the number of files in the entire Delta Table and can be ignored
* This function works only on the Delta Log and does not scan any data in the Delta Table.
*
* @param path path of the Delta table
* @param condition filter (or merge) condition to evaluate against the Delta Log
* @return a Map from a readable tag for each part of the condition to the number of files matching that part
*/
def getNumShuffleFiles(path: String, condition: String): Map[String, Any] = {
val (deltaLog, unresolvedColumns, targetOnlyPredicates, minMaxOnlyExpressions, equalOnlyExpressions,
otherExpressions, removedPredicates) = getResolvedExpressions(path, condition)
deltaLog.withNewTransaction { deltaTxn =>
Map(s"$OVERALL [ ${formatSQL(targetOnlyPredicates).getOrElse("Empty")} ]" ->
deltaTxn.filterFiles(targetOnlyPredicates).count(a => true),
s"$MIN_MAX [ ${formatSQL(minMaxOnlyExpressions).getOrElse("Empty")} ]" ->
deltaTxn.filterFiles(minMaxOnlyExpressions).count(a => true),
s"$EQUALS [ ${formatSQL(equalOnlyExpressions).getOrElse("Empty")} ]" ->
deltaTxn.filterFiles(equalOnlyExpressions).count(a => true),
s"$LEFT_OVER [ ${formatSQL(otherExpressions).getOrElse("Empty")} ]" ->
deltaTxn.filterFiles(otherExpressions).count(a => true),
s"$UNRESOLVED [ ${formatSQL(removedPredicates).getOrElse("Empty")} ]" ->
deltaTxn.filterFiles(removedPredicates).count(a => true),
TOTAL_NUM_FILES -> deltaLog.snapshot.filesWithStatsForScan(Nil).count(),
UNRESOLVED_COLS -> unresolvedColumns)
}
}

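/**
 * Same breakdown as [[getNumShuffleFiles]], but returns the matching [[AddFile]] actions
 * themselves (plus the full file-stats DataFrame and the list of unresolved columns) instead
 * of counts, so individual shuffle files and their metadata can be inspected.
 */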
def getShuffleFileMetadata(path: String, condition: String):
(Seq[AddFile], Seq[AddFile], Seq[AddFile], Seq[AddFile], Seq[AddFile], DataFrame, Seq[String]) = {
val (deltaLog, unresolvedColumns, targetOnlyPredicates, minMaxOnlyExpressions, equalOnlyExpressions, otherExpressions, removedPredicates) = getResolvedExpressions(path, condition)
deltaLog.withNewTransaction { deltaTxn =>
(deltaTxn.filterFiles(targetOnlyPredicates),
deltaTxn.filterFiles(minMaxOnlyExpressions),
deltaTxn.filterFiles(equalOnlyExpressions),
deltaTxn.filterFiles(otherExpressions),
deltaTxn.filterFiles(removedPredicates),
deltaLog.snapshot.filesWithStatsForScan(Nil),
unresolvedColumns)
}
}

  private def getResolvedExpressions(path: String, condition: String) = {
val spark = SparkSession.active
val deltaTable = DeltaTable.forPath(path)
val deltaLog = DeltaLog.forTable(spark, path)

val expression = functions.expr(condition).expr
val targetPlan = deltaTable.toDF.queryExecution.analyzed
val resolvedExpression: Expression = spark.sessionState.analyzer.resolveExpressionByPlanOutput(expression, targetPlan, true)
    val unresolvedColumns = if (!resolvedExpression.childrenResolved) {
      // Collect attributes the analyzer could not bind to the target table's schema
      resolvedExpression.references.collect { case a: UnresolvedAttribute => a.sql }.toSeq
    } else Seq()

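    // Split the resolved expression on top-level ANDs so each conjunct can be
    // classified (range, equality, or left-over) independently.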
def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
condition match {
case And(cond1, cond2) =>
splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
case other => other :: Nil
}
}

val splitExpressions = splitConjunctivePredicates(resolvedExpression)
val targetOnlyPredicates = splitExpressions.filter(_.references.subsetOf(targetPlan.outputSet))

    // Range predicates, which can leverage the min/max column stats in the Delta Log
    val minMaxOnlyExpressions = targetOnlyPredicates.filter {
      case _: GreaterThanOrEqual | _: LessThanOrEqual | _: LessThan | _: GreaterThan => true
      case _ => false
    }

    // Equality predicates, e.g. partition filters or exact matches against stats
    val equalOnlyExpressions = targetOnlyPredicates.filter {
      case _: EqualTo | _: EqualNullSafe => true
      case _ => false
    }

    // Everything that is neither a range nor an equality predicate, e.g. LIKE
    val otherExpressions = targetOnlyPredicates.filter {
      case _: EqualTo | _: EqualNullSafe | _: GreaterThanOrEqual | _: LessThanOrEqual |
          _: LessThan | _: GreaterThan => false
      case _ => true
    }

    // Predicates referencing columns outside the target table, e.g. the source side of a merge
    val removedPredicates = splitExpressions.filterNot(_.references.subsetOf(targetPlan.outputSet))

(deltaLog, unresolvedColumns, targetOnlyPredicates, minMaxOnlyExpressions, equalOnlyExpressions, otherExpressions, removedPredicates)
}


private def getAllPartitionStats(filteredDF: DataFrame, groupByCol: String, aggCol: String) = filteredDF
.groupBy(map_entries(col(groupByCol)))
@@ -92,7 +214,7 @@ object DeltaHelpers {
val snapshot = tableLog.snapshot
condition match {
case None => snapshot.filesWithStatsForScan(Nil)
case Some(value) => snapshot.filesWithStatsForScan(Seq(expr(value).expr))
case Some(value) => snapshot.filesWithStatsForScan(Seq(functions.expr(value).expr))
}
}

14 changes: 14 additions & 0 deletions src/main/scala/mrpowers/jodie/delta/DeltaConstants.scala
@@ -1,5 +1,7 @@
package mrpowers.jodie.delta

import org.apache.spark.sql.catalyst.expressions.Expression

object DeltaConstants {
val sizeColumn = "size"
val numRecordsColumn = "stats.numRecords"
@@ -27,4 +29,16 @@ object DeltaConstants {
"max_file_size",
percentileCol
)
val OVERALL = "OVERALL RESOLVED CONDITION =>"
val MIN_MAX = "GREATER THAN / LESS THAN PART =>"
val EQUALS = "EQUALS/EQUALS NULL SAFE PART =>"
val LEFT_OVER = "LEFT OVER PART =>"
val UNRESOLVED = "UNRESOLVED PART =>"
val TOTAL_NUM_FILES = "TOTAL_NUM_FILES_IN_DELTA_TABLE =>"
val UNRESOLVED_COLS = "UNRESOLVED_COLUMNS =>"

  def formatSQL(expressions: Seq[Expression]): Option[String] =
    if (expressions.isEmpty) None
    else Some(expressions.map(_.sql).mkString(" and "))
}
