helper methods - delta partition file size and numRecords distribution (#73)

* helper methods - delta partition size and numRecords distribution

* delta partition size : added percentiles

* delta partition size : fix build

* delta partition size : added unit tests + fixed failing tests

* delta partition size : added unit tests for filesizesInMB and updated README.md

* delta partition size : reducing record size and updated README.md
joydeepbroy-zeotap authored Jun 9, 2023
1 parent 798c647 commit e51fb12
Showing 4 changed files with 297 additions and 139 deletions.
63 changes: 63 additions & 0 deletions README.md
@@ -376,6 +376,68 @@ Map("size_in_bytes" -> 1320,
"average_file_size_in_bytes" -> 660)
```

## Delta Table File Size Distribution
The function `deltaFileSizeDistributionInMB` returns a `DataFrame` that contains the following stats in megabytes about file sizes in a Delta Table:
### `No. of Parquet Files, Mean File Size, Standard Deviation, Minimum File Size, Maximum File Size, 10th Percentile, 25th Percentile, Median, 75th Percentile, 90th Percentile, 95th Percentile.`

This function also accepts a partition condition. For example, if you have a Delta Table partitioned by `country` and you want to know the file size distribution for `country = 'Australia'`, you can run the following:
```scala
DeltaHelpers.deltaFileSizeDistributionInMB(path, Some("country='Australia'"))
```
This will return a `DataFrame` with the following columns:
```scala
+------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
|partitionValues |num_of_parquet_files|mean_size_of_files|stddev |min_file_size |max_file_size |Percentile[10th, 25th, Median, 75th, 90th, 95th] |
+------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
|[{country, Australia}] |1429 |30.205616120778238|0.3454942220373272 |17.376179695129395 |30.377344131469727|[30.132079124450684, 30.173019409179688, 30.215540885925293, 30.25797176361084, 30.294878005981445, 30.318415641784668]|
+------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
```
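
The condition is passed as a SQL predicate string wrapped in `Some(...)`. As a minimal sketch, a condition for one or more partition columns could be assembled as follows. `PartitionConditions.equalsAll` is a hypothetical helper and not part of Jodie; it rejects values containing quotes or backslashes rather than attempting to escape them.

```scala
object PartitionConditions {
  // Hypothetical helper (not part of Jodie): builds "col1='v1' AND col2='v2'"
  // from (column, value) pairs, or None when no pairs are given.
  def equalsAll(partitions: Seq[(String, String)]): Option[String] =
    if (partitions.isEmpty) None
    else {
      val clauses = partitions.map { case (column, value) =>
        // Keep the sketch simple: refuse values that would need SQL escaping.
        require(
          !value.contains("'") && !value.contains("\\"),
          s"unsupported character in partition value: $value"
        )
        s"$column='$value'"
      }
      Some(clauses.mkString(" AND "))
    }
}
```

The result can be passed directly as the second argument, for example `DeltaHelpers.deltaFileSizeDistributionInMB(path, PartitionConditions.equalsAll(Seq("country" -> "Australia")))`.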
If no partition condition is provided, the function returns the file size distribution for every partition of the Delta Table (or for the table as a whole if it is not partitioned).
```scala
DeltaHelpers.deltaFileSizeDistributionInMB(path)

+------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
|partitionValues |num_of_parquet_files|mean_size_of_files|stddev |min_file_size |max_file_size |Percentile[10th, 25th, Median, 75th, 90th, 95th] |
+------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
|[{country, Mauritius}] |2502 |28.14731636093103 |0.7981461034111957 |0.005436897277832031|28.37139320373535 |[28.098042488098145, 28.12824249267578, 28.167524337768555, 28.207666397094727, 28.246790885925293, 28.265881538391113]|
|[{country, Malaysia}] |3334 |34.471798611888644|0.4018671378261647 |11.515838623046875 |34.700727462768555|[34.40602779388428, 34.43935298919678, 34.47779560089111, 34.51614856719971, 34.55129528045654, 34.57488822937012] |
|[{country, GrandDuchyofLuxembourg}] |808 |2.84647535569597 |0.5369371124495063 |0.006397247314453125|3.0397253036499023|[2.8616743087768555, 2.8840208053588867, 2.9723005294799805, 2.992110252380371, 3.0045957565307617, 3.0115060806274414]|
|[{country, Argentina}] |3372 |36.82978148392511 |5.336511210904255 |0.010506629943847656|99.95287132263184 |[36.29576301574707, 36.33060932159424, 36.369083404541016, 36.406826972961426, 36.442559242248535, 36.4655065536499] |
|[{country, Australia}] |1429 |30.205616120778238|0.3454942220373272 |17.376179695129395 |30.377344131469727|[30.132079124450684, 30.173019409179688, 30.215540885925293, 30.25797176361084, 30.294878005981445, 30.318415641784668]|
+------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
```
A similar function, `deltaFileSizeDistribution`, returns the same stats in bytes.
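
The two variants differ only in units: in this commit the MB variant divides the file size in bytes by `1024 * 1024`. A small sketch of that conversion in plain Scala (`SizeUnits` is an illustrative name, not part of Jodie):

```scala
object SizeUnits {
  // One megabyte is treated as 1024 * 1024 bytes, the same divisor
  // that deltaFileSizeDistributionInMB applies to the size column.
  val BytesPerMB: Long = 1024L * 1024L

  // Converts a size in bytes to (fractional) megabytes.
  def bytesToMB(sizeInBytes: Long): Double = sizeInBytes.toDouble / BytesPerMB
}
```

This is handy for comparing the byte-based output of `deltaFileSizeDistribution` against a target file size expressed in megabytes.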
## Delta Table Number of Records Distribution
The function `deltaNumRecordDistribution` returns a `DataFrame` that contains the following stats about number of records in parquet files in a Delta Table:
### `No. of Parquet Files, Mean Num Records, Standard Deviation, Minimum & Maximum Number of Records in a File, 10th Percentile, 25th Percentile, Median, 75th Percentile, 90th Percentile, 95th Percentile.`

This function also accepts a partition condition. For example, if you have a Delta Table partitioned by `country` and you want to know the numRecords distribution for `country = 'Australia'`, you can run the following:
```scala
DeltaHelpers.deltaNumRecordDistribution(path, Some("country='Australia'"))
```
This will return a `DataFrame` with the following columns:
```scala
+------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------------------------------------------------+
|partitionValues |num_of_parquet_files|mean_num_records_in_files|stddev |min_num_records|max_num_records|Percentile[10th, 25th, Median, 75th, 90th, 95th] |
+------------------------------------------------+--------------------+-------------------------+------------------+---------------+---------------+------------------------------------------------------------+
|[{country, Australia}] |1429 |354160.2757172848 |4075.503669047513 |201823.0 |355980.0 |[353490.0, 353907.0, 354262.0, 354661.0, 355024.0, 355246.0]|
+------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------------------------------------------------+
```
If no partition condition is provided, the function returns the number of records distribution for every partition of the Delta Table (or for the table as a whole if it is not partitioned).
```scala
DeltaHelpers.deltaNumRecordDistribution(path)

+------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------------------------------------------------+
|partitionValues |num_of_parquet_files|mean_num_records_in_files|stddev |min_num_records|max_num_records|Percentile[10th, 25th, Median, 75th, 90th, 95th] |
+------------------------------------------------+--------------------+-------------------------+------------------+---------------+---------------+------------------------------------------------------------+
|[{country, Mauritius}] |2502 |433464.051558753 |12279.532110752265|1.0 |436195.0 |[432963.0, 433373.0, 433811.0, 434265.0, 434633.0, 434853.0]|
|[{country, Malaysia}] |3334 |411151.4946010798 |4797.137407595447 |136777.0 |413581.0 |[410390.0, 410794.0, 411234.0, 411674.0, 412063.0, 412309.0]|
|[{country, GrandDuchyofLuxembourg}] |808 |26462.003712871287 |5003.8118076056935|6.0 |28256.0 |[26605.0, 26811.0, 27635.0, 27822.0, 27937.0, 28002.0] |
|[{country, Argentina}] |3372 |461765.5604982206 |79874.3727926887 |61.0 |1403964.0 |[453782.0, 454174.0, 454646.0, 455103.0, 455543.0, 455818.0]|
|[{country, Australia}] |1429 |354160.2757172848 |4075.503669047513 |201823.0 |355980.0 |[353490.0, 353907.0, 354262.0, 354661.0, 355024.0, 355246.0]|
+------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------------------------------------------------+
```
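
To make the columns concrete, the following is a rough local sketch of the per-partition summary in plain Scala, without Spark. It is not the library's implementation: the library aggregates with Spark functions (`count`, `mean`, `stddev`, `min`, `max`, `percentile_approx`), while this sketch assumes a sample standard deviation and uses a nearest-rank percentile as a stand-in, so values may differ slightly from Spark's. `DistributionSketch` and `Summary` are illustrative names.

```scala
object DistributionSketch {
  final case class Summary(
      count: Int,
      mean: Double,
      stddev: Double,
      min: Double,
      max: Double,
      percentiles: Seq[Double]
  )

  // Same percentile levels as the README columns: 10th, 25th, median, 75th, 90th, 95th.
  val Levels: Seq[Double] = Seq(0.1, 0.25, 0.50, 0.75, 0.90, 0.95)

  // Nearest-rank percentile: the smallest value with at least p * n values at or below it.
  // Assumption: a stand-in for Spark's percentile_approx, not an exact replica.
  def percentile(sorted: Vector[Double], p: Double): Double = {
    val rank = math.ceil(p * sorted.size).toInt
    sorted(math.max(rank, 1) - 1)
  }

  // Summarizes one partition's per-file values (file sizes or record counts).
  def summarize(values: Seq[Double]): Summary = {
    require(values.nonEmpty, "need at least one value")
    val sorted = values.sorted.toVector
    val n = sorted.size
    val mean = sorted.sum / n
    // Sample standard deviation (n - 1 denominator); this sketch returns NaN for a single value.
    val stddev =
      if (n > 1) math.sqrt(sorted.map(v => (v - mean) * (v - mean)).sum / (n - 1))
      else Double.NaN
    Summary(n, mean, stddev, sorted.head, sorted.last, Levels.map(percentile(sorted, _)))
  }
}
```

A large gap between the minimum and the 10th percentile, as in the `Mauritius` and `Argentina` rows above, suggests a handful of unusually small files in an otherwise uniform partition.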

## Change Data Feed Helpers

### CASE I - When Delta aka Transaction Log gets purged
@@ -573,6 +635,7 @@ To generate artifacts, run

* Matthew Powers aka [MrPowers](https://github.com/MrPowers)
* Brayan Jules aka [brayanjuls](https://github.com/brayanjuls)
* Joydeep Banik Roy aka [joydeepbroy-zeotap](https://github.com/joydeepbroy-zeotap)

## More about Jodie

117 changes: 98 additions & 19 deletions src/main/scala/mrpowers/jodie/DeltaHelpers.scala
@@ -1,9 +1,11 @@
package mrpowers.jodie

import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import io.delta.tables._
import mrpowers.jodie.delta.DeltaConstants._
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.expressions.Window.partitionBy
import org.apache.spark.sql.functions.{col, concat_ws, count, md5, row_number}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

import scala.collection.mutable

@@ -12,21 +14,98 @@ object DeltaHelpers {
/**
* Gets the latest version of a Delta lake
*/
def latestVersion(path: String): Long = {
DeltaTable
.forPath(SparkSession.active, path)
.history(1)
.select("version")
.head()(0)
.asInstanceOf[Long]
def latestVersion(path: String): Long =
DeltaLog.forTable(SparkSession.active, path).snapshot.version


/**
* Gets the file size distribution in megabytes of a Delta table. Works at a partition level when partition
* information is provided. Provides a more human readable version of the file size distribution. Provided columns are:
* +------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
* |partitionValues |num_of_parquet_files|mean_size_of_files|stddev |min_file_size |max_file_size |Percentile[10th, 25th, Median, 75th, 90th, 95th] |
* +------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
* |[{country, Mauritius}] |2502 |28.14731636093103 |0.7981461034111957 |0.005436897277832031|28.37139320373535 |[28.098042488098145, 28.12824249267578, 28.167524337768555, 28.207666397094727, 28.246790885925293, 28.265881538391113]|
* |[{country, Malaysia}] |3334 |34.471798611888644|0.4018671378261647 |11.515838623046875 |34.700727462768555|[34.40602779388428, 34.43935298919678, 34.47779560089111, 34.51614856719971, 34.55129528045654, 34.57488822937012] |
* |[{country, GrandDuchyofLuxembourg}] |808 |2.84647535569597 |0.5369371124495063 |0.006397247314453125|3.0397253036499023|[2.8616743087768555, 2.8840208053588867, 2.9723005294799805, 2.992110252380371, 3.0045957565307617, 3.0115060806274414]|
* |[{country, Argentina}] |3372 |36.82978148392511 |5.336511210904255 |0.010506629943847656|99.95287132263184 |[36.29576301574707, 36.33060932159424, 36.369083404541016, 36.406826972961426, 36.442559242248535, 36.4655065536499] |
* |[{country, Australia}] |1429 |30.205616120778238|0.3454942220373272 |17.376179695129395 |30.377344131469727|[30.132079124450684, 30.173019409179688, 30.215540885925293, 30.25797176361084, 30.294878005981445, 30.318415641784668]|
* +------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
*
* @param path
* @param condition
* @return [[DataFrame]]
*/
def deltaFileSizeDistributionInMB(path: String, condition: Option[String] = None): DataFrame =
getAllPartitionStats(deltaFileStats(path, condition)
.withColumn("size_in_mb", col(sizeColumn).divide(1024 * 1024)), statsPartitionColumn, "size_in_mb")
.toDF(sizeDFColumns: _*)

/**
* Gets the file size distribution in bytes of a Delta table. Works at a partition level when partition
* information is provided. Provided columns are same as [[deltaFileSizeDistributionInMB]]
*
* @param path
* @param condition
* @return [[DataFrame]]
*/
def deltaFileSizeDistribution(path: String, condition: Option[String] = None): DataFrame =
getAllPartitionStats(deltaFileStats(path, condition), statsPartitionColumn, sizeColumn).toDF(sizeDFColumns: _*)

/**
* Gets the file-wise number of records distribution of a Delta table. Works at a partition level when partition
* information is provided. Provided columns are:
* +------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------------------------------------------------+
* |partitionValues |num_of_parquet_files|mean_num_records_in_files|stddev |min_num_records|max_num_records|Percentile[10th, 25th, Median, 75th, 90th, 95th] |
* +------------------------------------------------+--------------------+-------------------------+------------------+---------------+---------------+------------------------------------------------------------+
* |[{country, Mauritius}] |2502 |433464.051558753 |12279.532110752265|1.0 |436195.0 |[432963.0, 433373.0, 433811.0, 434265.0, 434633.0, 434853.0]|
* |[{country, Malaysia}] |3334 |411151.4946010798 |4797.137407595447 |136777.0 |413581.0 |[410390.0, 410794.0, 411234.0, 411674.0, 412063.0, 412309.0]|
* |[{country, GrandDuchyofLuxembourg}] |808 |26462.003712871287 |5003.8118076056935|6.0 |28256.0 |[26605.0, 26811.0, 27635.0, 27822.0, 27937.0, 28002.0] |
* |[{country, Argentina}] |3372 |461765.5604982206 |79874.3727926887 |61.0 |1403964.0 |[453782.0, 454174.0, 454646.0, 455103.0, 455543.0, 455818.0]|
* |[{country, Australia}] |1429 |354160.2757172848 |4075.503669047513 |201823.0 |355980.0 |[353490.0, 353907.0, 354262.0, 354661.0, 355024.0, 355246.0]|
* +------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------------------------------------------------+
*
* @param path
* @param condition
* @return [[DataFrame]]
*/
def deltaNumRecordDistribution(path: String, condition: Option[String] = None): DataFrame =
getAllPartitionStats(deltaFileStats(path, condition), statsPartitionColumn, numRecordsColumn).toDF(numRecordsDFColumns: _*)


private def getAllPartitionStats(filteredDF: DataFrame, groupByCol: String, aggCol: String) = filteredDF
.groupBy(map_entries(col(groupByCol)))
.agg(
count(aggCol),
mean(aggCol),
stddev(aggCol),
min(aggCol),
max(aggCol),
percentile_approx(
col(aggCol),
lit(Array(0.1, 0.25, 0.50, 0.75, 0.90, 0.95)),
lit(Int.MaxValue)
)
)

private def deltaFileStats(path: String, condition: Option[String] = None): DataFrame = {
val tableLog = DeltaLog.forTable(SparkSession.active, path)
val snapshot = tableLog.snapshot
condition match {
case None => snapshot.filesWithStatsForScan(Nil)
case Some(value) => snapshot.filesWithStatsForScan(Seq(expr(value).expr))
}
}

def deltaFileSizes(deltaTable: DeltaTable) = {
val details: Row = deltaTable.detail().select("numFiles", "sizeInBytes").collect()(0)
val (sizeInBytes, numberOfFiles) =
(details.getAs[Long]("sizeInBytes"), details.getAs[Long]("numFiles"))
val avgFileSizeInBytes = if (numberOfFiles == 0) 0 else Math.round(sizeInBytes / numberOfFiles)
Map("size_in_bytes" -> sizeInBytes, "number_of_files" -> numberOfFiles, "average_file_size_in_bytes" -> avgFileSizeInBytes)
Map(
"size_in_bytes" -> sizeInBytes,
"number_of_files" -> numberOfFiles,
"average_file_size_in_bytes" -> avgFileSizeInBytes
)
}

/**
@@ -202,7 +281,7 @@ object DeltaHelpers {
if (compositeKey.isEmpty)
throw new NoSuchElementException("The attribute compositeKey must not be empty")

val mergeCondition = compositeKey.map(c => s"old.$c = new.$c").mkString(" AND ")
val mergeCondition = compositeKey.map(c => s"old.$c = new.$c").mkString(" AND ")
val appendDataCleaned = appendData.dropDuplicates(compositeKey)
deltaTable
.alias("old")
@@ -237,17 +316,17 @@ object DeltaHelpers {
}

def withMD5Columns(
dataFrame: DataFrame,
cols: List[String],
newColName: String = ""
): DataFrame = {
dataFrame: DataFrame,
cols: List[String],
newColName: String = ""
): DataFrame = {
val outputCol = if (newColName.isEmpty) cols.mkString("_md5", "", "") else newColName
dataFrame.withColumn(outputCol, md5(concat_ws("||", cols.map(c => col(c)): _*)))
}

def withMD5Columns(
deltaTable: DeltaTable,
cols: List[String],
newColName: String
): DataFrame = withMD5Columns(deltaTable.toDF, cols, newColName)
deltaTable: DeltaTable,
cols: List[String],
newColName: String
): DataFrame = withMD5Columns(deltaTable.toDF, cols, newColName)
}
30 changes: 30 additions & 0 deletions src/main/scala/mrpowers/jodie/delta/DeltaConstants.scala
@@ -0,0 +1,30 @@
package mrpowers.jodie.delta

object DeltaConstants {
val sizeColumn = "size"
val numRecordsColumn = "stats.numRecords"
val statsPartitionColumn = "partitionValues"
private val percentileCol = "Percentile[10th, 25th, Median, 75th, 90th, 95th]"
private val num_of_parquet_files = "num_of_parquet_files"
val numRecordsDFColumns =
Array(
statsPartitionColumn,
num_of_parquet_files,
"mean_num_records_in_files",
"stddev",
"min_num_records",
"max_num_records",
percentileCol
)

val sizeDFColumns =
Array(
statsPartitionColumn,
num_of_parquet_files,
"mean_size_of_files",
"stddev",
"min_file_size",
"max_file_size",
percentileCol
)
}