diff --git a/README.md b/README.md
index 4b2128c..e9b4186 100644
--- a/README.md
+++ b/README.md
@@ -376,6 +376,68 @@ Map("size_in_bytes" -> 1320, "average_file_size_in_bytes" -> 660)
 ```
 
+## Delta Table File Size Distribution
+The function `deltaFileSizeDistributionInMB` returns a `DataFrame` containing the following stats, in megabytes, about the file sizes in a Delta Table:
+### `No. of Parquet Files, Mean File Size, Standard Deviation, Minimum File Size, Maximum File Size, 10th Percentile, 25th Percentile, Median, 75th Percentile, 90th Percentile, 95th Percentile.`
+
+This function also accepts a partition condition. For example, if you have a Delta Table partitioned by `country` and you want to know the file size distribution for `country = 'Australia'`, you can run the following:
+```scala
+DeltaHelpers.deltaFileSizeDistributionInMB(path, Some("country='Australia'"))
+```
+This returns a `DataFrame` like:
+```scala
++------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
+|partitionValues                                 |num_of_parquet_files|mean_size_of_files|stddev              |min_file_size       |max_file_size     |Percentile[10th, 25th, Median, 75th, 90th, 95th]                                                                       |
++------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
+|[{country, Australia}]                          |1429                |30.205616120778238|0.3454942220373272  |17.376179695129395  |30.377344131469727|[30.132079124450684, 30.173019409179688, 30.215540885925293, 30.25797176361084, 30.294878005981445, 30.318415641784668]|
++------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
+```
+If no partition condition is provided, the function returns the file size distribution for the whole Delta Table, grouped by partition values when the table is partitioned:
+```scala
+DeltaHelpers.deltaFileSizeDistributionInMB(path)
+
++------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
+|partitionValues                                 |num_of_parquet_files|mean_size_of_files|stddev              |min_file_size       |max_file_size     |Percentile[10th, 25th, Median, 75th, 90th, 95th]                                                                       |
++------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
+|[{country, Mauritius}]                          |2502                |28.14731636093103 |0.7981461034111957  |0.005436897277832031|28.37139320373535 |[28.098042488098145, 28.12824249267578, 28.167524337768555, 28.207666397094727, 28.246790885925293, 28.265881538391113]|
+|[{country, Malaysia}]                           |3334                |34.471798611888644|0.4018671378261647  |11.515838623046875  |34.700727462768555|[34.40602779388428, 34.43935298919678, 34.47779560089111, 34.51614856719971, 34.55129528045654, 34.57488822937012]     |
+|[{country, GrandDuchyofLuxembourg}]             |808                 |2.84647535569597  |0.5369371124495063  |0.006397247314453125|3.0397253036499023|[2.8616743087768555, 2.8840208053588867, 2.9723005294799805, 2.992110252380371, 3.0045957565307617, 3.0115060806274414]|
+|[{country, Argentina}]                          |3372                |36.82978148392511 |5.336511210904255   |0.010506629943847656|99.95287132263184 |[36.29576301574707, 36.33060932159424, 36.369083404541016, 36.406826972961426, 36.442559242248535, 36.4655065536499]   |
+|[{country, Australia}]                          |1429                |30.205616120778238|0.3454942220373272  |17.376179695129395  |30.377344131469727|[30.132079124450684, 30.173019409179688, 30.215540885925293, 30.25797176361084, 30.294878005981445, 30.318415641784668]|
++------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
+```
+A similar function, `deltaFileSizeDistribution`, returns the same stats in bytes.
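+
+As a quick sketch of how this output can be consumed (the 10 MB threshold below is an arbitrary assumption, not a recommendation), you can flag partitions whose mean file size is small enough to be compaction candidates:
+```scala
+import org.apache.spark.sql.functions.col
+
+val distribution = DeltaHelpers.deltaFileSizeDistributionInMB(path)
+// Partitions averaging under 10 MB per file may be worth compacting with OPTIMIZE.
+distribution.filter(col("mean_size_of_files") < 10.0).show(false)
+```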
+## Delta Table Number of Records Distribution
+The function `deltaNumRecordDistribution` returns a `DataFrame` containing the following stats about the number of records in the parquet files of a Delta Table:
+### `No. of Parquet Files, Mean Num Records, Standard Deviation, Minimum & Maximum Number of Records in a File, 10th Percentile, 25th Percentile, Median, 75th Percentile, 90th Percentile, 95th Percentile.`
+
+This function also accepts a partition condition. For example, if you have a Delta Table partitioned by `country` and you want to know the numRecords distribution for `country = 'Australia'`, you can run the following:
+```scala
+DeltaHelpers.deltaNumRecordDistribution(path, Some("country='Australia'"))
+```
+This returns a `DataFrame` like:
+```scala
++------------------------------------------------+--------------------+-------------------------+------------------+---------------+---------------+------------------------------------------------------------+
+|partitionValues                                 |num_of_parquet_files|mean_num_records_in_files|stddev            |min_num_records|max_num_records|Percentile[10th, 25th, Median, 75th, 90th, 95th]            |
++------------------------------------------------+--------------------+-------------------------+------------------+---------------+---------------+------------------------------------------------------------+
+|[{country, Australia}]                          |1429                |354160.2757172848        |4075.503669047513 |201823.0       |355980.0       |[353490.0, 353907.0, 354262.0, 354661.0, 355024.0, 355246.0]|
++------------------------------------------------+--------------------+-------------------------+------------------+---------------+---------------+------------------------------------------------------------+
+```
+If no partition condition is provided, the function returns the number of records distribution for the whole Delta Table, grouped by partition values when the table is partitioned:
+```scala
+DeltaHelpers.deltaNumRecordDistribution(path)
+
++------------------------------------------------+--------------------+-------------------------+------------------+---------------+---------------+------------------------------------------------------------+
+|partitionValues                                 |num_of_parquet_files|mean_num_records_in_files|stddev            |min_num_records|max_num_records|Percentile[10th, 25th, Median, 75th, 90th, 95th]            |
++------------------------------------------------+--------------------+-------------------------+------------------+---------------+---------------+------------------------------------------------------------+
+|[{country, Mauritius}]                          |2502                |433464.051558753         |12279.532110752265|1.0            |436195.0       |[432963.0, 433373.0, 433811.0, 434265.0, 434633.0, 434853.0]|
+|[{country, Malaysia}]                           |3334                |411151.4946010798        |4797.137407595447 |136777.0       |413581.0       |[410390.0, 410794.0, 411234.0, 411674.0, 412063.0, 412309.0]|
+|[{country, GrandDuchyofLuxembourg}]             |808                 |26462.003712871287       |5003.8118076056935|6.0            |28256.0        |[26605.0, 26811.0, 27635.0, 27822.0, 27937.0, 28002.0]      |
+|[{country, Argentina}]                          |3372                |461765.5604982206        |79874.3727926887  |61.0           |1403964.0      |[453782.0, 454174.0, 454646.0, 455103.0, 455543.0, 455818.0]|
+|[{country, Australia}]                          |1429                |354160.2757172848        |4075.503669047513 |201823.0       |355980.0       |[353490.0, 353907.0, 354262.0, 354661.0, 355024.0, 355246.0]|
++------------------------------------------------+--------------------+-------------------------+------------------+---------------+---------------+------------------------------------------------------------+
+```
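+
+As with file sizes, this output can drive simple health checks. A minimal sketch (the 10x ratio is an arbitrary assumption) that flags partitions where the largest file holds far more records than the average, hinting at write skew:
+```scala
+import org.apache.spark.sql.functions.col
+
+val recordDist = DeltaHelpers.deltaNumRecordDistribution(path)
+// Partitions whose largest file has over 10x the mean record count look skewed.
+recordDist.filter(col("max_num_records") > col("mean_num_records_in_files") * 10).show(false)
+```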
+
 ## Change Data Feed Helpers
 
 ### CASE I - When Delta aka Transaction Log gets purged
@@ -573,6 +635,7 @@ To generate artifacts, run
 * Matthew Powers aka [MrPowers](https://github.com/MrPowers)
 * Brayan Jules aka [brayanjuls](https://github.com/brayanjuls)
+* Joydeep Banik Roy aka [joydeepbroy-zeotap](https://github.com/joydeepbroy-zeotap)
 
 ## More about Jodie
 
diff --git a/src/main/scala/mrpowers/jodie/DeltaHelpers.scala b/src/main/scala/mrpowers/jodie/DeltaHelpers.scala
index f6ad13a..2afc5a6 100644
--- a/src/main/scala/mrpowers/jodie/DeltaHelpers.scala
+++ b/src/main/scala/mrpowers/jodie/DeltaHelpers.scala
@@ -1,9 +1,11 @@
 package mrpowers.jodie
 
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
 import io.delta.tables._
+import mrpowers.jodie.delta.DeltaConstants._
+import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.expressions.Window.partitionBy
-import org.apache.spark.sql.functions.{col, concat_ws, count, md5, row_number}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
 
 import scala.collection.mutable
 
@@ -12,13 +14,86 @@ object DeltaHelpers {
   /**
    * Gets the latest version of a Delta lake
    */
-  def latestVersion(path: String): Long = {
-    DeltaTable
-      .forPath(SparkSession.active, path)
-      .history(1)
-      .select("version")
-      .head()(0)
-      .asInstanceOf[Long]
+  def latestVersion(path: String): Long =
+    DeltaLog.forTable(SparkSession.active, path).snapshot.version
+
+
+  /**
+   * Gets the file size distribution in megabytes of a Delta table. Works at a partition level when partition
+   * information is provided. Provides a more human-readable version of the file size distribution. Provided columns are:
+   * +------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
+   * |partitionValues                                 |num_of_parquet_files|mean_size_of_files|stddev              |min_file_size       |max_file_size     |Percentile[10th, 25th, Median, 75th, 90th, 95th]                                                                       |
+   * +------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
+   * |[{country, Mauritius}]                          |2502                |28.14731636093103 |0.7981461034111957  |0.005436897277832031|28.37139320373535 |[28.098042488098145, 28.12824249267578, 28.167524337768555, 28.207666397094727, 28.246790885925293, 28.265881538391113]|
+   * |[{country, Malaysia}]                           |3334                |34.471798611888644|0.4018671378261647  |11.515838623046875  |34.700727462768555|[34.40602779388428, 34.43935298919678, 34.47779560089111, 34.51614856719971, 34.55129528045654, 34.57488822937012]     |
+   * |[{country, GrandDuchyofLuxembourg}]             |808                 |2.84647535569597  |0.5369371124495063  |0.006397247314453125|3.0397253036499023|[2.8616743087768555, 2.8840208053588867, 2.9723005294799805, 2.992110252380371, 3.0045957565307617, 3.0115060806274414]|
+   * |[{country, Argentina}]                          |3372                |36.82978148392511 |5.336511210904255   |0.010506629943847656|99.95287132263184 |[36.29576301574707, 36.33060932159424, 36.369083404541016, 36.406826972961426, 36.442559242248535, 36.4655065536499]   |
+   * |[{country, Australia}]                          |1429                |30.205616120778238|0.3454942220373272  |17.376179695129395  |30.377344131469727|[30.132079124450684, 30.173019409179688, 30.215540885925293, 30.25797176361084, 30.294878005981445, 30.318415641784668]|
+   * +------------------------------------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+
+   *
+   * @param path      path of the Delta table
+   * @param condition an optional partition condition, e.g. "country='Australia'"
+   * @return [[DataFrame]]
+   */
+  def deltaFileSizeDistributionInMB(path: String, condition: Option[String] = None): DataFrame =
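+    // Derive a size_in_mb column from the raw `size` (bytes), then aggregate per partition.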
+    getAllPartitionStats(deltaFileStats(path, condition)
+      .withColumn("size_in_mb", col(sizeColumn).divide(1024 * 1024)), statsPartitionColumn, "size_in_mb")
+      .toDF(sizeDFColumns: _*)
+
+  /**
+   * Gets the file size distribution in bytes of a Delta table. Works at a partition level when partition
+   * information is provided. Provided columns are the same as in [[deltaFileSizeDistributionInMB]].
+   *
+   * @param path      path of the Delta table
+   * @param condition an optional partition condition, e.g. "country='Australia'"
+   * @return [[DataFrame]]
+   */
+  def deltaFileSizeDistribution(path: String, condition: Option[String] = None): DataFrame =
+    getAllPartitionStats(deltaFileStats(path, condition), statsPartitionColumn, sizeColumn).toDF(sizeDFColumns: _*)
+
+  /**
+   * Gets the file-wise number of records distribution of a Delta table. Works at a partition level when partition
+   * information is provided. Provided columns are:
+   * +------------------------------------------------+--------------------+-------------------------+------------------+---------------+---------------+------------------------------------------------------------+
+   * |partitionValues                                 |num_of_parquet_files|mean_num_records_in_files|stddev            |min_num_records|max_num_records|Percentile[10th, 25th, Median, 75th, 90th, 95th]            |
+   * +------------------------------------------------+--------------------+-------------------------+------------------+---------------+---------------+------------------------------------------------------------+
+   * |[{country, Mauritius}]                          |2502                |433464.051558753         |12279.532110752265|1.0            |436195.0       |[432963.0, 433373.0, 433811.0, 434265.0, 434633.0, 434853.0]|
+   * |[{country, Malaysia}]                           |3334                |411151.4946010798        |4797.137407595447 |136777.0       |413581.0       |[410390.0, 410794.0, 411234.0, 411674.0, 412063.0, 412309.0]|
+   * |[{country, GrandDuchyofLuxembourg}]             |808                 |26462.003712871287       |5003.8118076056935|6.0            |28256.0        |[26605.0, 26811.0, 27635.0, 27822.0, 27937.0, 28002.0]      |
+   * |[{country, Argentina}]                          |3372                |461765.5604982206        |79874.3727926887  |61.0           |1403964.0      |[453782.0, 454174.0, 454646.0, 455103.0, 455543.0, 455818.0]|
+   * |[{country, Australia}]                          |1429                |354160.2757172848        |4075.503669047513 |201823.0       |355980.0       |[353490.0, 353907.0, 354262.0, 354661.0, 355024.0, 355246.0]|
+   * +------------------------------------------------+--------------------+-------------------------+------------------+---------------+---------------+------------------------------------------------------------+
+   *
+   * @param path      path of the Delta table
+   * @param condition an optional partition condition, e.g. "country='Australia'"
+   * @return [[DataFrame]]
+   */
+  def deltaNumRecordDistribution(path: String, condition: Option[String] = None): DataFrame =
+    getAllPartitionStats(deltaFileStats(path, condition), statsPartitionColumn, numRecordsColumn).toDF(numRecordsDFColumns: _*)
+
+
+  private def getAllPartitionStats(filteredDF: DataFrame, groupByCol: String, aggCol: String) = filteredDF
+    .groupBy(map_entries(col(groupByCol)))
+    .agg(
+      count(aggCol),
+      mean(aggCol),
+      stddev(aggCol),
+      min(aggCol),
+      max(aggCol),
+      percentile_approx(
+        col(aggCol),
+        lit(Array(0.1, 0.25, 0.50, 0.75, 0.90, 0.95)),
+        lit(Int.MaxValue)
+      )
+    )
+
+  private def deltaFileStats(path: String, condition: Option[String] = None): DataFrame = {
+    val tableLog = DeltaLog.forTable(SparkSession.active, path)
+    val snapshot = tableLog.snapshot
+    condition match {
+      case None        => snapshot.filesWithStatsForScan(Nil)
+      case Some(value) => snapshot.filesWithStatsForScan(Seq(expr(value).expr))
+    }
   }
 
   def deltaFileSizes(deltaTable: DeltaTable) = {
@@ -26,7 +101,11 @@ object DeltaHelpers {
     val (sizeInBytes, numberOfFiles) =
(details.getAs[Long]("sizeInBytes"), details.getAs[Long]("numFiles")) val avgFileSizeInBytes = if (numberOfFiles == 0) 0 else Math.round(sizeInBytes / numberOfFiles) - Map("size_in_bytes" -> sizeInBytes, "number_of_files" -> numberOfFiles, "average_file_size_in_bytes" -> avgFileSizeInBytes) + Map( + "size_in_bytes" -> sizeInBytes, + "number_of_files" -> numberOfFiles, + "average_file_size_in_bytes" -> avgFileSizeInBytes + ) } /** @@ -202,7 +281,7 @@ object DeltaHelpers { if (compositeKey.isEmpty) throw new NoSuchElementException("The attribute compositeKey must not be empty") - val mergeCondition = compositeKey.map(c => s"old.$c = new.$c").mkString(" AND ") + val mergeCondition = compositeKey.map(c => s"old.$c = new.$c").mkString(" AND ") val appendDataCleaned = appendData.dropDuplicates(compositeKey) deltaTable .alias("old") @@ -237,17 +316,17 @@ object DeltaHelpers { } def withMD5Columns( - dataFrame: DataFrame, - cols: List[String], - newColName: String = "" - ): DataFrame = { + dataFrame: DataFrame, + cols: List[String], + newColName: String = "" + ): DataFrame = { val outputCol = if (newColName.isEmpty) cols.mkString("_md5", "", "") else newColName dataFrame.withColumn(outputCol, md5(concat_ws("||", cols.map(c => col(c)): _*))) } def withMD5Columns( - deltaTable: DeltaTable, - cols: List[String], - newColName: String - ): DataFrame = withMD5Columns(deltaTable.toDF, cols, newColName) + deltaTable: DeltaTable, + cols: List[String], + newColName: String + ): DataFrame = withMD5Columns(deltaTable.toDF, cols, newColName) } diff --git a/src/main/scala/mrpowers/jodie/delta/DeltaConstants.scala b/src/main/scala/mrpowers/jodie/delta/DeltaConstants.scala new file mode 100644 index 0000000..110e27d --- /dev/null +++ b/src/main/scala/mrpowers/jodie/delta/DeltaConstants.scala @@ -0,0 +1,30 @@ +package mrpowers.jodie.delta + +object DeltaConstants { + val sizeColumn = "size" + val numRecordsColumn = "stats.numRecords" + val statsPartitionColumn = "partitionValues" + private val percentileCol = "Percentile[10th, 25th, Median, 75th, 90th, 95th]" + private val num_of_parquet_files = "num_of_parquet_files" + val numRecordsDFColumns = + Array( + statsPartitionColumn, + num_of_parquet_files, + "mean_num_records_in_files", + "stddev", + "min_num_records", + "max_num_records", + percentileCol + ) + + val sizeDFColumns = + Array( + statsPartitionColumn, + num_of_parquet_files, + "mean_size_of_files", + "stddev", + "min_file_size", + "max_file_size", + percentileCol + ) +} diff --git a/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala b/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala index 808dd0f..7085235 100644 --- a/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala +++ b/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala @@ -3,11 +3,14 @@ package mrpowers.jodie import com.github.mrpowers.spark.daria.sql.SparkSessionExt.SparkSessionMethods import com.github.mrpowers.spark.fast.tests.DataFrameComparer import io.delta.tables.DeltaTable +import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.{IntegerType, StringType} import org.scalatest.BeforeAndAfterEach import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, equal} +import scala.collection.mutable + class DeltaHelperSpec extends AnyFunSpec with SparkSessionTestWrapper @@ -24,16 +27,7 @@ class DeltaHelperSpec describe("When Delta table is queried for file sizes") { it("should provide delta file sizes successfully") { val path = (os.pwd / "tmp" / "delta-table").toString() - 
val df = Seq( - (1, "Benito", "Jackson"), - (2, "Maria", "Willis"), - (3, "Jose", "Travolta"), - (4, "Benito", "Jackson"), - (5, "Jose", "Travolta"), - (6, "Jose", "Travolta"), - (7, "Maria", "Pitt") - ).toDF("id", "firstname", "lastname") - df.write.format("delta").mode("overwrite").save(path) + createBaseDeltaTable(path) val deltaTable = DeltaTable.forPath(path) val actual = DeltaHelpers.deltaFileSizes(deltaTable) @@ -59,16 +53,7 @@ class DeltaHelperSpec describe("remove duplicate records from delta table") { it("should remove duplicates successful") { val path = (os.pwd / "tmp" / "delta-duplicate").toString() - val df = Seq( - (1, "Benito", "Jackson"), - (2, "Maria", "Willis"), - (3, "Jose", "Travolta"), - (4, "Benito", "Jackson"), - (5, "Jose", "Travolta"), - (6, "Jose", "Travolta"), - (7, "Maria", "Pitt") - ).toDF("id", "firstname", "lastname") - df.write.format("delta").mode("overwrite").save(path) + createBaseDeltaTable(path) val deltaTable = DeltaTable.forPath(path) val duplicateColumns = Seq("firstname", "lastname") @@ -165,16 +150,7 @@ class DeltaHelperSpec describe("remove duplicate records from delta table using primary key") { it("should remove duplicates given a primary key and duplicate columns") { val path = (os.pwd / "tmp" / "delta-duplicate-pk").toString() - val df = Seq( - (1, "Benito", "Jackson"), - (2, "Maria", "Willis"), - (3, "Jose", "Travolta"), - (4, "Benito", "Jackson"), - (5, "Jose", "Travolta"), - (6, "Jose", "Travolta"), - (7, "Maria", "Pitt") - ).toDF("id", "firstname", "lastname") - df.write.format("delta").mode("overwrite").save(path) + createBaseDeltaTable(path) val deltaTable = DeltaTable.forPath(path) val duplicateColumns = Seq("lastname") @@ -197,16 +173,7 @@ class DeltaHelperSpec it("should fail to remove duplicates when not duplicate columns is provided") { val path = (os.pwd / "tmp" / "delta-pk-not-duplicate-columns").toString() - val df = Seq( - (1, "Benito", "Jackson"), - (2, "Maria", "Willis"), - (3, "Jose", "Travolta"), - (4, "Benito", "Jackson"), - (5, "Jose", "Travolta"), - (6, "Jose", "Travolta"), - (7, "Maria", "Pitt") - ).toDF("id", "firstname", "lastname") - df.write.format("delta").mode("overwrite").save(path) + createBaseDeltaTable(path) val deltaTable = DeltaTable.forPath(path) val primaryKey = "id" @@ -253,16 +220,7 @@ class DeltaHelperSpec it("should fail to remove duplicate when not primary key is provided") { val path = (os.pwd / "tmp" / "delta-duplicate-no-pk").toString() - val df = Seq( - (1, "Benito", "Jackson"), - (2, "Maria", "Willis"), - (3, "Jose", "Travolta"), - (4, "Benito", "Jackson"), - (5, "Jose", "Travolta"), - (6, "Jose", "Travolta"), - (7, "Maria", "Pitt") - ).toDF("id", "firstname", "lastname") - df.write.format("delta").mode("overwrite").save(path) + createBaseDeltaTable(path) val deltaTable = DeltaTable.forPath(path) val primaryKey = "" @@ -275,16 +233,7 @@ class DeltaHelperSpec it("should fail to remove duplicate when duplicateColumns does not exist in table") { val path = (os.pwd / "tmp" / "delta-duplicate-cols-no-exists").toString() - val df = Seq( - (1, "Benito", "Jackson"), - (2, "Maria", "Willis"), - (3, "Jose", "Travolta"), - (4, "Benito", "Jackson"), - (5, "Jose", "Travolta"), - (6, "Jose", "Travolta"), - (7, "Maria", "Pitt") - ).toDF("id", "firstname", "lastname") - df.write.format("delta").mode("overwrite").save(path) + createBaseDeltaTable(path) val deltaTable = DeltaTable.forPath(path) val primaryKey = "id" @@ -430,21 +379,7 @@ class DeltaHelperSpec it("should create a new delta table from an existing one 
using path") { val path = (os.pwd / "tmp" / "delta-copy-from-existing-path").toString() - val df = Seq( - (1, "Benito", "Jackson"), - (2, "Maria", "Willis"), - (3, "Jose", "Travolta"), - (4, "Patricia", "Jackson"), - (5, "Jose", "Travolta"), - (6, "Gabriela", "Travolta"), - (7, "Maria", "Pitt") - ).toDF("id", "firstname", "lastname") - df.write - .format("delta") - .mode("overwrite") - .partitionBy("lastname", "firstname") - .option("delta.logRetentionDuration", "interval 30 days") - .save(path) + val df = createBaseDeltaTableWithPartitions(path, Seq("lastname", "firstname")) val deltaTable = DeltaTable.forPath(path) val targetPath = (os.pwd / "tmp" / "delta-copy-from-existing-target-path").toString() DeltaHelpers.copyTable(deltaTable, targetPath = Some(targetPath)) @@ -460,21 +395,7 @@ class DeltaHelperSpec it("should copy table from existing one using table name") { val path = (os.pwd / "tmp" / "delta-copy-from-existing-tb-name").toString() - val df = Seq( - (1, "Benito", "Jackson"), - (2, "Maria", "Willis"), - (3, "Jose", "Travolta"), - (4, "Patricia", "Jackson"), - (5, "Jose", "Travolta"), - (6, "Gabriela", "Travolta"), - (7, "Maria", "Pitt") - ).toDF("id", "firstname", "lastname") - df.write - .format("delta") - .mode("overwrite") - .partitionBy("lastname") - .option("delta.logRetentionDuration", "interval 30 days") - .save(path) + val df: DataFrame = createBaseDeltaTableWithPartitions(path,Seq("lastname")) val deltaTable = DeltaTable.forPath(path) val tableName = "students" DeltaHelpers.copyTable(deltaTable, targetTableName = Some(tableName)) @@ -489,21 +410,7 @@ class DeltaHelperSpec it("should fail to copy when no table name or target path is set") { val path = (os.pwd / "tmp" / "delta-copy-non-destination").toString() - val df = Seq( - (1, "Benito", "Jackson"), - (2, "Maria", "Willis"), - (3, "Jose", "Travolta"), - (4, "Patricia", "Jackson"), - (5, "Jose", "Travolta"), - (6, "Gabriela", "Travolta"), - (7, "Maria", "Pitt") - ).toDF("id", "firstname", "lastname") - df.write - .format("delta") - .mode("overwrite") - .partitionBy("lastname") - .option("delta.logRetentionDuration", "interval 30 days") - .save(path) + val df: DataFrame = createBaseDeltaTableWithPartitions(path,Seq("lastname")) val deltaTable = DeltaTable.forPath(path) val exceptionMessage = intercept[JodieValidationError] { DeltaHelpers.copyTable(deltaTable) @@ -514,21 +421,7 @@ class DeltaHelperSpec it("should fail to copy when both table name and target path are set") { val path = (os.pwd / "tmp" / "delta-copy-two-destination").toString() - val df = Seq( - (1, "Benito", "Jackson"), - (2, "Maria", "Willis"), - (3, "Jose", "Travolta"), - (4, "Patricia", "Jackson"), - (5, "Jose", "Travolta"), - (6, "Gabriela", "Travolta"), - (7, "Maria", "Pitt") - ).toDF("id", "firstname", "lastname") - df.write - .format("delta") - .mode("overwrite") - .partitionBy("lastname") - .option("delta.logRetentionDuration", "interval 30 days") - .save(path) + val df: DataFrame = createBaseDeltaTableWithPartitions(path,Seq("lastname")) val deltaTable = DeltaTable.forPath(path) val tableName = "students" val tablePath = (os.pwd / "tmp" / "delta-copy-from-existing-target-path").toString() @@ -759,4 +652,97 @@ class DeltaHelperSpec orderedComparison = false) } } + + describe("Generate metrics for optimize functions on Delta Table") { + it("should return valid file sizes and num records for non partitioned tables") { + val path = (os.pwd / "tmp" / "delta-table-non-partitioned").toString() + createBaseDeltaTable(path) + val fileSizeDF = 
DeltaHelpers.deltaFileSizeDistribution(path)
+      val numRecordsDF = DeltaHelpers.deltaNumRecordDistribution(path)
+      fileSizeDF.count() should equal(1L)
+      assertDistributionCount(fileSizeDF, (0, 1L, 1088.0, null, 1088L, 1088L, Array(1088, 1088, 1088, 1088, 1088, 1088)))
+      numRecordsDF.count() should equal(1L)
+      assertDistributionCount(numRecordsDF, (0, 1L, 7.0, null, 7L, 7L, Array(7, 7, 7, 7, 7, 7)))
+    }
+    it("should return valid file sizes and num records for single partitioned tables") {
+      val path = (os.pwd / "tmp" / "delta-table-single-partition").toString()
+      createBaseDeltaTableWithPartitions(path, Seq("lastname"))
+      val fileSizeDF = DeltaHelpers.deltaFileSizeDistribution(path, Some("lastname='Travolta'"))
+      val numRecordsDF = DeltaHelpers.deltaNumRecordDistribution(path, Some("lastname='Travolta'"))
+      fileSizeDF.count() should equal(1L)
+      assertDistributionCount(fileSizeDF, (1, 1L, 756.0, null, 756, 756, Array(756, 756, 756, 756, 756, 756)))
+      numRecordsDF.count() should equal(1L)
+      assertDistributionCount(numRecordsDF, (1, 1L, 3.0, null, 3, 3, Array(3, 3, 3, 3, 3, 3)))
+    }
+    it("should return valid file sizes and num records for multiple partitioned tables") {
+      val path = (os.pwd / "tmp" / "delta-table-multi-partition").toString()
+      createBaseDeltaTableWithPartitions(path, Seq("lastname", "firstname"))
+      val fileSizeDF = DeltaHelpers.deltaFileSizeDistribution(path, Some("lastname='Travolta' and firstname='Jose'"))
+      val numRecordsDF = DeltaHelpers.deltaNumRecordDistribution(path, Some("lastname='Travolta' and firstname='Jose'"))
+      fileSizeDF.count() should equal(1L)
+      assertDistributionCount(fileSizeDF, (2, 1L, 456.0, null, 456, 456, Array(456, 456, 456, 456, 456, 456)))
+      numRecordsDF.count() should equal(1L)
+      assertDistributionCount(numRecordsDF, (2, 1L, 2.0, null, 2, 2, Array(2, 2, 2, 2, 2, 2)))
+    }
+
+    it("should return valid file sizes in megabytes") {
+      val path = (os.pwd / "tmp" / "delta-table-multi-files").toString()
+      def getDF(partition: String) = {
+        (1 to 10000).toDF("id")
+          .collect()
+          .map(_.getInt(0))
+          .map(id => (id, partition, id + 10))
+          .toSeq
+      }
+      (getDF("dog") ++ getDF("cat") ++ getDF("bird"))
+        .toDF("id", "animal", "age").write.mode("overwrite")
+        .format("delta").partitionBy("animal").save(path)
+      val fileSizeDF = DeltaHelpers.deltaFileSizeDistributionInMB(path)
+      val size = 0.07698249816894531
+      fileSizeDF.count() should equal(3)
+      assertDistributionCount(fileSizeDF, (1, 1L, size, null, size, size, Array(size, size, size, size, size, size)))
+    }
+  }
+
+  private def assertDistributionCount(df: DataFrame, expected: (Int, Long, Double, Any, Any, Any, Array[Double])) = {
+    val actual = df.take(1)(0)
+    actual.getAs[mutable.WrappedArray[(String, String)]](0).length should equal(expected._1)
+    actual.getAs[Long](1) should equal(expected._2)
+    actual.getAs[Double](2) should equal(expected._3)
+    actual.getAs[Double](3) should equal(expected._4)
+    actual.getAs[Long](4) should equal(expected._5)
+    actual.getAs[Long](5) should equal(expected._6)
+    actual.getAs[Array[Double]](6) should equal(expected._7)
+  }
+
+  private def createBaseDeltaTable(path: String): Unit = {
+    val df = Seq(
+      (1, "Benito", "Jackson"),
+      (2, "Maria", "Willis"),
+      (3, "Jose", "Travolta"),
+      (4, "Benito", "Jackson"),
+      (5, "Jose", "Travolta"),
+      (6, "Jose", "Travolta"),
+      (7, "Maria", "Pitt")
+    ).toDF("id", "firstname", "lastname")
+    df.write.format("delta").mode("overwrite").save(path)
+  }
+  private def createBaseDeltaTableWithPartitions(path: String, partitionBy: Seq[String]) = {
+    val df =
Seq( + (1, "Benito", "Jackson"), + (2, "Maria", "Willis"), + (3, "Jose", "Travolta"), + (4, "Patricia", "Jackson"), + (5, "Jose", "Travolta"), + (6, "Gabriela", "Travolta"), + (7, "Maria", "Pitt") + ).toDF("id", "firstname", "lastname") + df.write + .format("delta") + .mode("overwrite") + .partitionBy(partitionBy: _*) + .option("delta.logRetentionDuration", "interval 30 days") + .save(path) + df + } }