Delta file sizes (#69)
joydeepbroy-zeotap authored Mar 28, 2023
1 parent 54b9f40 commit 1f8f119
Showing 3 changed files with 77 additions and 11 deletions.
24 changes: 24 additions & 0 deletions README.md
@@ -352,6 +352,30 @@ The result will be the following:
Seq("firstname","lastname")
```

## Delta File Sizes

The `deltaFileSizes` function returns a `Map[String,Long]` that contains the total size in bytes, the number of files, and the
average file size in bytes for a given Delta table.

Suppose you have the following Delta Table, partitioned by `col1`:

```
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| A| A|
| 2| A| B|
+----+----+----+
```

Running `DeltaHelpers.deltaFileSizes(deltaTable)` on that table will return:

```scala
Map("size_in_bytes" -> 1320,
"number_of_files" -> 2,
"average_file_size_in_bytes" -> 660)
```
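
The returned map can be consumed like any other Scala `Map`. Below is a minimal sketch of how the metrics might be used to spot tables with many small files; the `/tmp/delta-table` path, the in-scope `spark` session, and the 128 MB threshold are illustrative assumptions, not part of the library:

```scala
import io.delta.tables.DeltaTable
import mrpowers.jodie.DeltaHelpers

// Assumes an existing SparkSession `spark` and a Delta table at this (hypothetical) path.
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
val sizes = DeltaHelpers.deltaFileSizes(deltaTable)

// Illustrative threshold: flag tables whose average file is smaller than 128 MB.
val smallFileThreshold = 128L * 1024 * 1024
if (sizes("average_file_size_in_bytes") < smallFileThreshold) {
  println(
    s"${sizes("number_of_files")} files, ${sizes("size_in_bytes")} bytes total, " +
      s"average ${sizes("average_file_size_in_bytes")} bytes per file; consider compacting."
  )
}
```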

## Change Data Feed Helpers

### CASE I - When Delta aka Transaction Log gets purged
10 changes: 9 additions & 1 deletion src/main/scala/mrpowers/jodie/DeltaHelpers.scala
@@ -1,6 +1,6 @@
package mrpowers.jodie

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import io.delta.tables._
import org.apache.spark.sql.expressions.Window.partitionBy
import org.apache.spark.sql.functions.{col, concat_ws, count, md5, row_number}
@@ -21,6 +21,14 @@ object DeltaHelpers {
.asInstanceOf[Long]
}

def deltaFileSizes(deltaTable: DeltaTable): Map[String, Long] = {
  // Read the file count and total size from the table's detail() DataFrame.
  val details: Row = deltaTable.detail().select("numFiles", "sizeInBytes").collect()(0)
  val (sizeInBytes, numberOfFiles) =
    (details.getAs[Long]("sizeInBytes"), details.getAs[Long]("numFiles"))
  // Avoid division by zero for empty tables; divide as Double so the average is rounded, not truncated.
  val avgFileSizeInBytes = if (numberOfFiles == 0) 0L else Math.round(sizeInBytes.toDouble / numberOfFiles)
  Map("size_in_bytes" -> sizeInBytes, "number_of_files" -> numberOfFiles, "average_file_size_in_bytes" -> avgFileSizeInBytes)
}

/**
* This function remove all duplicate records from a delta table. Duplicate records means all rows
* that have more than one occurrence of the value of the columns provided in the input parameter
54 changes: 44 additions & 10 deletions src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala
@@ -6,8 +6,7 @@ import io.delta.tables.DeltaTable
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funspec.AnyFunSpec

import scala.util.{Failure, Success}
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, equal}

class DeltaHelperSpec
extends AnyFunSpec
@@ -22,6 +21,41 @@ class DeltaHelperSpec

import spark.implicits._

describe("When Delta table is queried for file sizes") {
it("should provide delta file sizes successfully") {
val path = (os.pwd / "tmp" / "delta-table").toString()
val df = Seq(
(1, "Benito", "Jackson"),
(2, "Maria", "Willis"),
(3, "Jose", "Travolta"),
(4, "Benito", "Jackson"),
(5, "Jose", "Travolta"),
(6, "Jose", "Travolta"),
(7, "Maria", "Pitt")
).toDF("id", "firstname", "lastname")
df.write.format("delta").mode("overwrite").save(path)

val deltaTable = DeltaTable.forPath(path)
val actual = DeltaHelpers.deltaFileSizes(deltaTable)

actual("size_in_bytes") should equal(1088L)
actual("number_of_files") should equal(1L)
actual("average_file_size_in_bytes") should equal(1088L)
}

it("should not fail if the table is empty") {
val emptyDeltaTable = DeltaTable.create(spark)
.tableName("delta_empty_table")
.addColumn("id", dataType = "INT")
.addColumn("firstname", dataType = "STRING")
.addColumn("lastname", dataType = "STRING")
.execute()
val actual = DeltaHelpers.deltaFileSizes(emptyDeltaTable)
actual("size_in_bytes") should equal(0)
actual("number_of_files") should equal(0)
actual("average_file_size_in_bytes") should equal(0)
}
}
describe("remove duplicate records from delta table") {
it("should remove duplicates successful") {
val path = (os.pwd / "tmp" / "delta-duplicate").toString()
@@ -387,7 +421,7 @@ class DeltaHelperSpec
).toDF("id", "firstname", "lastname")
df.write.format("delta").mode("overwrite").save(path)
val deltaTable = DeltaTable.forPath(path)
val result = DeltaHelpers.getStorageLocation(deltaTable)
val result = DeltaHelpers.getStorageLocation(deltaTable)
assertResult(s"file:$path")(result)
}
}
@@ -609,8 +643,8 @@ class DeltaHelperSpec

}

describe("find composite key in a table"){
it("should not find the composite key in the table"){
describe("find composite key in a table") {
it("should not find the composite key in the table") {
val path = (os.pwd / "tmp" / "delta-tbl").toString()
Seq(
(1, "Benito", "Jackson"),
@@ -625,12 +659,12 @@
.save(path)

val deltaTable = DeltaTable.forPath(path)
val result = DeltaHelpers.findCompositeKeyCandidate(deltaTable,Seq("id"))
val result = DeltaHelpers.findCompositeKeyCandidate(deltaTable, Seq("id"))

assertResult(Nil)(result)
}

it("should find the composite key in the table"){
it("should find the composite key in the table") {
val path = (os.pwd / "tmp" / "delta-tbl").toString()
Seq(
(1, "Benito", "Jackson"),
@@ -644,12 +678,12 @@
.mode("overwrite")
.save(path)
val deltaTable = DeltaTable.forPath(path)
val result = DeltaHelpers.findCompositeKeyCandidate(deltaTable,Seq("id"))
val expected = Seq("firstname","lastname")
val result = DeltaHelpers.findCompositeKeyCandidate(deltaTable, Seq("id"))
val expected = Seq("firstname", "lastname")
assertResult(expected)(result)
}

it("should find the composite key in the table when cols are excluded"){
it("should find the composite key in the table when cols are excluded") {
val path = (os.pwd / "tmp" / "delta-tbl").toString()
Seq(
(1, "Benito", "Jackson"),
