Commit
Added initial version of validateAppend function (#81)
* Added initial version of validateAppend function

* Fixed tests typos

* Fixed typo errors and readme exception descriptions

* Changed non-idiomatic expressions in scala

---------

Co-authored-by: Juan Pulido <[email protected]>
jjpulidos and Juan Pulido authored Nov 17, 2023
1 parent b6bff43 commit d265948
Showing 3 changed files with 222 additions and 0 deletions.
78 changes: 78 additions & 0 deletions README.md
@@ -224,6 +224,84 @@ DeltaHelpers.copyTable(deltaTable = deltaTable, targetTableName = Some(tableName
Note the location where the table will be stored in this last function call
will be based on the spark conf property `spark.sql.warehouse.dir`.
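
The warehouse location can be set when building the session. A minimal, illustrative configuration sketch (the path is an example, not a recommendation):

```scala
import org.apache.spark.sql.SparkSession

// Example only: point managed tables at a custom warehouse directory
val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
  .getOrCreate()
```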

### Validate append

The `validateAppend` function allows schema evolution for specific allowlisted columns, while rejecting appends that contain any column that isn't explicitly allowlisted.

Suppose you have the following Delta table:

```
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 2| b| B|
| 1| a| A|
+----+----+----+
```
Here's how to call `validateAppend` with required and optional column lists:

```scala
DeltaHelpers.validateAppend(
deltaTable = deltaTable,
appendDF = appendDf,
requiredCols = List("col1", "col2"),
optionalCols = List("col4")
)
```

You can append the following DataFrame, which contains the required columns and one of the optional columns:

```
+----+----+----+
|col1|col2|col4|
+----+----+----+
| 3| c| cat|
| 4| d| dog|
+----+----+----+
```

Here's what the Delta table will contain after that data is appended:

```
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 3| c|null| cat|
| 4| d|null| dog|
| 2| b| B|null|
| 1| a| A|null|
+----+----+----+----+
```

You cannot append the following DataFrame, which contains the required columns but also another column (`col5`) that isn't allowlisted as optional.

```
+----+----+----+
|col1|col2|col5|
+----+----+----+
| 4| b| A|
| 5| y| C|
| 6| z| D|
+----+----+----+
```

Here's the error you'll get when you attempt this write: "The following columns are not part of the current Delta table. If you want to add these columns to the table, you must set the optionalCols parameter: List(col5)"
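
Under the hood, these checks are simple list operations. Here's a standalone sketch of the rules (a hypothetical `checkAppendColumns` helper, not the library's actual implementation — the real function raises `IllegalArgumentException` via `require`):

```scala
// Hypothetical sketch of the column rules that validateAppend enforces.
// Returns the first validation error, or None if the append is allowed.
def checkAppendColumns(
    tableCols: List[String],
    appendCols: List[String],
    requiredCols: List[String],
    optionalCols: List[String]
): Option[String] = {
  // Every required column must appear in the append data
  val missing = requiredCols.filterNot(appendCols.contains)
  // Every append column must already exist in the table or be allowlisted
  val invalid = appendCols.filterNot(c => tableCols.contains(c) || optionalCols.contains(c))
  if (missing.nonEmpty) Some(s"missing required columns: $missing")
  else if (invalid.nonEmpty) Some(s"columns not allowlisted: $invalid")
  else None
}
```

For the `col5` example above, `checkAppendColumns(List("col1", "col2", "col3"), List("col1", "col2", "col5"), List("col1", "col2"), List("col4"))` returns an error mentioning `col5`.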

You also cannot append the following DataFrame, which is missing one of the required columns.

```
+----+----+
|col1|col4|
+----+----+
| 4| A|
| 5| C|
| 6| D|
+----+----+
```

Here's the error you'll get: "The base Delta table has these columns List(col1, col4), but these columns are required List(col1, col2)"


### Latest Version of Delta Table
The `latestVersion` function returns the latest version number of a table, given its storage path.

39 changes: 39 additions & 0 deletions src/main/scala/mrpowers/jodie/DeltaHelpers.scala
@@ -339,6 +339,45 @@ object DeltaHelpers {
.save(storagePath)
}

/**
 * Validates and appends data to a Delta table. This function ensures that the provided DataFrame can be appended
 * to the specified Delta table by checking column presence: every required column must appear in the append
 * DataFrame, and every append column must either already exist in the table or be explicitly allowlisted.
 *
 * @param deltaTable   The Delta table to which data will be appended.
 * @param appendDF     The DataFrame containing data to be appended.
 * @param requiredCols The columns that must be present in appendDF.
 * @param optionalCols The columns that may be added to the table via schema evolution.
 * @throws IllegalArgumentException if a required column is missing from appendDF.
 * @throws IllegalArgumentException if a column in appendDF is neither part of the Delta table nor listed in optionalCols.
 */
def validateAppend(
deltaTable: DeltaTable,
appendDF: DataFrame,
requiredCols: List[String],
optionalCols: List[String]
): Unit = {

val appendDataColumns = appendDF.columns
val tableColumns = deltaTable.toDF.columns

// Check if all required columns are present in appendDF
val missingColumns = requiredCols.filterNot(appendDataColumns.contains)
require(missingColumns.isEmpty, s"The base Delta table has these columns ${appendDataColumns.mkString("List(", ", ", ")")}, but these columns are required $requiredCols")

// Check if all columns in appendDF are part of the current Delta table or optional
val invalidColumns = appendDataColumns.filterNot(column => tableColumns.contains(column) || optionalCols.contains(column))
require(invalidColumns.isEmpty, s"The following columns are not part of the current Delta table. If you want to add these columns to the table, you must set the optionalCols parameter: ${invalidColumns.mkString("List(", ", ", ")")}")

// Resolve the table's storage location from its Delta metadata
val location = deltaTable.detail().select("location").collect().head.getString(0)

// Append with mergeSchema enabled so allowlisted optional columns can be added
appendDF.write
  .format("delta")
  .mode(SaveMode.Append)
  .option("mergeSchema", "true")
  .save(location)
}

def getStorageLocation(deltaTable: DeltaTable): String = {
val row = deltaTable.detail().select("location").collect().head
val locationPath = row.getString(0)
105 changes: 105 additions & 0 deletions src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala
@@ -440,6 +440,111 @@ class DeltaHelperSpec
}
}

describe("Validate and append data to a delta table") {
it("should append dataframes with optional columns") {
val path = (os.pwd / "tmp" / "delta-lake-validate-append-valid-dataframe").toString()
Seq(
(1, "a", "A"),
(2, "b", "B"),
)
.toDF("col1", "col2", "col3")
.write
.format("delta")
.mode("overwrite")
.option("delta.logRetentionDuration", "interval 30 days")
.save(path)

val deltaTable = DeltaTable.forPath(path)

val appendDf = Seq(
(3, "c", "cat"),
(4, "d", "dog"),
)
.toDF("col1", "col2", "col4")

DeltaHelpers.validateAppend(deltaTable, appendDf, List("col1", "col2"), List("col4"))

val expected = Seq(
(1, "a", "A", null),
(2, "b", "B", null),
(3, "c", null, "cat"),
(4, "d", null, "dog"),
)
.toDF("col1", "col2", "col3", "col4")

val result = DeltaTable.forPath(path)

assertSmallDataFrameEquality(
result.toDF,
expected,
orderedComparison = false,
ignoreNullable = true
)

}
it("should fail to append dataframes with columns that are not on the accept list") {
val path = (os.pwd / "tmp" / "delta-lake-validate-cols-in-accepted-list").toString()
Seq(
(1, "a", "A"),
(2, "b", "B"),
)
.toDF("col1", "col2", "col3")
.write
.format("delta")
.mode("overwrite")
.option("delta.logRetentionDuration", "interval 30 days")
.save(path)

val deltaTable = DeltaTable.forPath(path)

val appendDf = Seq(
(4, "b", "A"),
(5, "y", "C"),
(6, "z", "D"),
)
.toDF("col1", "col2", "col5")


val exceptionMessage = intercept[IllegalArgumentException] {
DeltaHelpers.validateAppend(deltaTable, appendDf, List("col1", "col2"),
List("col4") )
}.getMessage

assert(exceptionMessage.contains("The following columns are not part of the current Delta table. If you want to add these columns to the table, you must set the optionalCols parameter: List(col5)"))
}
it("should fail to append dataframes with missing required columns") {
val path = (os.pwd / "tmp" / "delta-lake-validate-missing-required-cols").toString()
Seq(
(1, "a", "A"),
(2, "b", "B"),
)
.toDF("col1", "col2", "col3")
.write
.format("delta")
.mode("overwrite")
.option("delta.logRetentionDuration", "interval 30 days")
.save(path)

val deltaTable = DeltaTable.forPath(path)

val appendDf = Seq(
(4, "A"),
(5, "C"),
(6, "D"),
)
.toDF("col1", "col4")


val exceptionMessage = intercept[IllegalArgumentException] {
DeltaHelpers.validateAppend(deltaTable, appendDf, List("col1", "col2"),
List("col4"))
}.getMessage

assert(exceptionMessage.contains("The base Delta table has these columns List(col1, col4), but these columns are required List(col1, col2)"))

}
}

describe("Append without duplicating data") {
it(
"should insert data into an existing delta table without duplicating records that already exist"
