Added initial version of validateAppend function #81

Merged: 4 commits, Nov 17, 2023
Changes from 3 commits
78 changes: 78 additions & 0 deletions README.md
@@ -224,6 +224,84 @@ DeltaHelpers.copyTable(deltaTable = deltaTable, targetTableName = Some(tableName
Note the location where the table will be stored in this last function call
will be based on the spark conf property `spark.sql.warehouse.dir`.

### Validate append

The `validateAppend` function allows schema evolution for a specific set of columns, while rejecting appends that contain columns that aren't specifically allowlisted.

Suppose you have the following Delta table:

```
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   2|   b|   B|
|   1|   a|   A|
+----+----+----+
```

Here's a call to `validateAppend` that requires `col1` and `col2` and allows `col4` as an optional column:

```scala
DeltaHelpers.validateAppend(
  deltaTable = deltaTable,
  appendDF = appendDf,
  requiredCols = List("col1", "col2"),
  optionalCols = List("col4")
)
```

You can append the following DataFrame that contains the required columns and the optional columns:

```
+----+----+----+
|col1|col2|col4|
+----+----+----+
|   3|   c| cat|
|   4|   d| dog|
+----+----+----+
```

Here's what the Delta table will contain after that data is appended:

```
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   3|   c|null| cat|
|   4|   d|null| dog|
|   2|   b|   B|null|
|   1|   a|   A|null|
+----+----+----+----+
```
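
The null-filled result follows from the write in this PR setting the `mergeSchema` option to `"true"`. As a rough, Spark-free illustration of that behavior, the sketch below models rows as plain maps and fills absent columns with `None`. The names `SchemaMergeSketch` and `appendWithMerge` are hypothetical and exist only for this example; it assumes new columns are added after the existing ones, as in the output above.

```scala
// Illustrative sketch only: SchemaMergeSketch and appendWithMerge are hypothetical
// names, and rows are modeled as Map[String, String] rather than Spark Rows.
object SchemaMergeSketch {
  type Row = Map[String, String]

  /** Appends newRows to existing and returns the merged column list together with
    * the rows, where a column absent from a row is represented as None (a null). */
  def appendWithMerge(
      existingCols: Seq[String],
      existing: Seq[Row],
      newCols: Seq[String],
      newRows: Seq[Row]
  ): (Seq[String], Seq[Seq[Option[String]]]) = {
    // Existing columns keep their order; columns seen for the first time go last.
    val mergedCols = existingCols ++ newCols.filterNot(c => existingCols.contains(c))
    // Look every merged column up in every row; a missing key becomes None.
    val rows = (existing ++ newRows).map(row => mergedCols.map(c => row.get(c)))
    (mergedCols, rows)
  }
}
```

Calling `appendWithMerge` with the `col1`/`col2`/`col3` rows as the existing data and the `col1`/`col2`/`col4` rows as the new data yields four columns, with `None` in `col4` for the old rows and in `col3` for the new ones.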

You cannot append the following DataFrame, which contains the required columns but also contains another column (`col5`) that's not specified as an optional column:

```
+----+----+----+
|col1|col2|col5|
+----+----+----+
|   4|   b|   A|
|   5|   y|   C|
|   6|   z|   D|
+----+----+----+
```

Here's the error you'll get when you attempt this write: "The column col5 is not part of the current Delta table. If you want to add the column to the table, you must set the optionalCols parameter"
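
Since a rejected append surfaces as an `IllegalArgumentException`, callers may prefer to turn it into a value instead of letting it propagate. Here's one possible way to do that. `AppendAttemptSketch` and `attempt` are hypothetical helpers for this example, not part of the library.

```scala
import scala.util.{Failure, Success, Try}

// Illustrative sketch only: AppendAttemptSketch and attempt are hypothetical names.
object AppendAttemptSketch {

  /** Runs the given append and returns Left(message) if it was rejected with an
    * IllegalArgumentException. Any other exception is rethrown unchanged. */
  def attempt(append: => Unit): Either[String, Unit] =
    Try(append) match {
      case Success(_)                           => Right(())
      case Failure(e: IllegalArgumentException) => Left(e.getMessage)
      case Failure(other)                       => throw other
    }
}
```

The `validateAppend` call shown earlier could be passed as the `append` argument, so that a validation failure comes back as a `Left` holding the error message.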

You also cannot append the following DataFrame, which is missing one of the required columns (`col2`):

```
+----+----+
|col1|col4|
+----+----+
|   4|   A|
|   5|   C|
|   6|   D|
+----+----+
```

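Here's the error you'll get: "The base Delta table has these columns List(col1, col4), but these columns are required List(col1, col2)". Despite the wording, the first list holds the columns of the DataFrame being appended, not those of the Delta table.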
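
The two checks behind these errors can be sketched without Spark as a pure function over column names. This is an illustration of the rules described above, not the library's code: `ColumnCheckSketch` and `checkColumns` are hypothetical names, and the messages are simplified.

```scala
// Illustrative sketch only: ColumnCheckSketch and checkColumns are hypothetical names.
// The real function works on a DeltaTable and a DataFrame and throws on failure.
object ColumnCheckSketch {

  /** Returns Right(()) if the append would be accepted, or Left(reason) if rejected. */
  def checkColumns(
      tableCols: Seq[String],
      appendCols: Seq[String],
      requiredCols: List[String],
      optionalCols: List[String]
  ): Either[String, Unit] = {
    // Check 1: every required column must be present in the data being appended.
    val missing = requiredCols.filterNot(c => appendCols.contains(c))
    // Check 2: every appended column must already be in the table or be allowlisted.
    val unexpected =
      appendCols.filterNot(c => tableCols.contains(c) || optionalCols.contains(c))

    if (missing.nonEmpty) Left(s"missing required columns: ${missing.mkString(", ")}")
    else if (unexpected.nonEmpty) Left(s"columns not allowed: ${unexpected.mkString(", ")}")
    else Right(())
  }
}
```

With the table columns `col1`, `col2`, `col3`, required columns `col1`, `col2`, and optional column `col4`, the three DataFrames in this section map to `Right(())`, a `Left` naming `col5`, and a `Left` naming `col2`, respectively.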


### Latest Version of Delta Table
The function `latestVersion` returns the latest version number of a table given its storage path.

59 changes: 59 additions & 0 deletions src/main/scala/mrpowers/jodie/DeltaHelpers.scala
@@ -339,6 +339,65 @@ object DeltaHelpers {
.save(storagePath)
}

  /**
   * Validates a DataFrame against a Delta table and, if validation passes, appends it. Validation checks
   * column presence only: every required column must be in the DataFrame, and every DataFrame column must
   * either already exist in the Delta table or be listed as optional. This function does not compare data types itself.
   *
   * @param deltaTable   The Delta table to which data will be appended.
   * @param appendDF     The DataFrame containing data to be appended.
   * @param requiredCols The columns that must be present in appendDF.
   * @param optionalCols The columns that appendDF may add to the table.
   * @throws IllegalArgumentException if deltaTable or appendDF is null.
   * @throws IllegalArgumentException if a required column is missing from appendDF.
   * @throws IllegalArgumentException if a column in appendDF is neither part of the Delta table nor listed in optionalCols.
   */
  def validateAppend(
      deltaTable: DeltaTable,
      appendDF: DataFrame,
      requiredCols: List[String],
      optionalCols: List[String]
  ): Unit = {
    // deltaTable is statically typed, so this check only fails when it is null
    // (isInstanceOf returns false for null)
    if (!deltaTable.isInstanceOf[DeltaTable]) {
      throw new IllegalArgumentException("An existing delta table must be specified.")
    }

    // Likewise, this check only fails when appendDF is null
    if (!appendDF.isInstanceOf[DataFrame]) {
      throw new IllegalArgumentException("You must provide a DataFrame that is to be appended.")
    }

    val appendDataColumns = appendDF.columns

    // Check if all required columns are present in appendDF
    for (requiredColumn <- requiredCols) {
      if (!appendDataColumns.contains(requiredColumn)) {
        // Note: the columns listed first in this message are those of appendDF
        throw new IllegalArgumentException(
          s"The base Delta table has these columns ${appendDataColumns.mkString("List(", ", ", ")")}, but these columns are required $requiredCols"
        )
      }
    }

    val tableColumns = deltaTable.toDF.columns

    // Check if all columns in appendDF are part of the current Delta table or optional
    for (column <- appendDataColumns) {
      if (!tableColumns.contains(column) && !optionalCols.contains(column)) {
        throw new IllegalArgumentException(
          s"The column $column is not part of the current Delta table. If you want to add the column to the table, you must set the optionalCols parameter."
        )
      }
    }

    // Storage location of the Delta table, read from its detail() output
    val details = deltaTable.detail().select("location").collect().head.getString(0)

    // Write the appendDF to the Delta table, letting allowed new columns extend the schema
    appendDF.write.format("delta")
      .mode(SaveMode.Append)
      .option("mergeSchema", "true")
      .save(details)
  }

  def getStorageLocation(deltaTable: DeltaTable): String = {
    val row = deltaTable.detail().select("location").collect().head
    val locationPath = row.getString(0)
105 changes: 105 additions & 0 deletions src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala
@@ -439,6 +439,111 @@ class DeltaHelperSpec
}
}

  describe("Validate and append data to a delta table") {
    it("should append dataframes with optional columns") {
      val path = (os.pwd / "tmp" / "delta-lake-validate-append-valid-dataframe").toString()
      Seq(
        (1, "a", "A"),
        (2, "b", "B"),
      )
        .toDF("col1", "col2", "col3")
        .write
        .format("delta")
        .mode("overwrite")
        .option("delta.logRetentionDuration", "interval 30 days")
        .save(path)

      val deltaTable = DeltaTable.forPath(path)

      val appendDf = Seq(
        (3, "c", "cat"),
        (4, "d", "dog"),
      )
        .toDF("col1", "col2", "col4")

      DeltaHelpers.validateAppend(deltaTable, appendDf, List("col1", "col2"), List("col4"))

      val expected = Seq(
        (1, "a", "A", null),
        (2, "b", "B", null),
        (3, "c", null, "cat"),
        (4, "d", null, "dog"),
      )
        .toDF("col1", "col2", "col3", "col4")

      val result = DeltaTable.forPath(path)

      assertSmallDataFrameEquality(
        result.toDF,
        expected,
        orderedComparison = false,
        ignoreNullable = true
      )
    }

    it("should fail to append dataframes with columns that are not on the accept list") {
      val path = (os.pwd / "tmp" / "delta-lake-validate-cols-in-accepted-list").toString()
      Seq(
        (1, "a", "A"),
        (2, "b", "B"),
      )
        .toDF("col1", "col2", "col3")
        .write
        .format("delta")
        .mode("overwrite")
        .option("delta.logRetentionDuration", "interval 30 days")
        .save(path)

      val deltaTable = DeltaTable.forPath(path)

      val appendDf = Seq(
        (4, "b", "A"),
        (5, "y", "C"),
        (6, "z", "D"),
      )
        .toDF("col1", "col2", "col5")

      val exceptionMessage = intercept[IllegalArgumentException] {
        DeltaHelpers.validateAppend(deltaTable, appendDf, List("col1", "col2"),
          List("col4"))
      }.getMessage

      assert(exceptionMessage.contains("The column col5 is not part of the current Delta table. If you want to add the column to the table, you must set the optionalCols parameter"))
    }

    it("should fail to append dataframes with missing required columns") {
      val path = (os.pwd / "tmp" / "delta-lake-validate-missing-required-cols").toString()
      Seq(
        (1, "a", "A"),
        (2, "b", "B"),
      )
        .toDF("col1", "col2", "col3")
        .write
        .format("delta")
        .mode("overwrite")
        .option("delta.logRetentionDuration", "interval 30 days")
        .save(path)

      val deltaTable = DeltaTable.forPath(path)

      val appendDf = Seq(
        (4, "A"),
        (5, "C"),
        (6, "D"),
      )
        .toDF("col1", "col4")

      val exceptionMessage = intercept[IllegalArgumentException] {
        DeltaHelpers.validateAppend(deltaTable, appendDf, List("col1", "col2"),
          List("col4"))
      }.getMessage

      assert(exceptionMessage.contains("The base Delta table has these columns List(col1, col4), but these columns are required List(col1, col2)"))
    }
  }

describe("Append without duplicating data") {
it(
"should insert data into an existing delta table and not duplicates in case some records already exists"