From d3db52e3ccba70f83e80b48dea90f30234f536b2 Mon Sep 17 00:00:00 2001 From: Juan Pulido Date: Thu, 9 Nov 2023 11:03:18 -0500 Subject: [PATCH 1/4] Added initial version of validateAppend function --- README.md | 78 ++++++++++++++ .../scala/mrpowers/jodie/DeltaHelpers.scala | 59 +++++++++++ .../mrpowers/jodie/DeltaHelperSpec.scala | 100 ++++++++++++++++++ 3 files changed, 237 insertions(+) diff --git a/README.md b/README.md index 9f1de88..fa367ac 100644 --- a/README.md +++ b/README.md @@ -224,6 +224,84 @@ DeltaHelpers.copyTable(deltaTable = deltaTable, targetTableName = Some(tableName Note the location where the table will be stored in this last function call will be based on the spark conf property `spark.sql.warehouse.dir`. +### Validate append + +The `validateAppend` function provides a mechanism for allowing some columns for schema evolution, but rejecting appends with columns that aren't specifically allowlisted. + +Suppose you have the following Delta table: + +``` ++----+----+----+ +|col1|col2|col3| ++----+----+----+ +| 2| b| B| +| 1| a| A| ++----+----+----+ +``` +Here's an appender function that wraps `validateAppend`: + +```scala +DeltaHelpers.validateAppend( + deltaTable = deltaTable, + appendDF = appendDf, + requiredCols = List("col1", "col2"), + optionalCols = List("col4") +) +``` + +You can append the following DataFrame that contains the required columns and the optional columns: + +``` ++----+----+----+ +|col1|col2|col4| ++----+----+----+ +| 3| c| cat| +| 4| d| dog| ++----+----+----+ +``` + +Here's what the Delta table will contain after that data is appended: + +``` ++----+----+----+----+ +|col1|col2|col3|col4| ++----+----+----+----+ +| 3| c|null| cat| +| 4| d|null| dog| +| 2| b| B|null| +| 1| a| A|null| ++----+----+----+----+ +``` + +You cannot append the following DataFrame which contains the required columns, but also contains another column (`col5`) that's not specified as an optional column. 
+ +``` ++----+----+----+ +|col1|col2|col5| ++----+----+----+ +| 4| b| A| +| 5| y| C| +| 6| z| D| ++----+----+----+ +``` + +Here's the error you'll get when you attempt this write: "" + +You also cannot append the following DataFrame which is missing one of the required columns. + +``` ++----+----+ +|col1|col4| ++----+----+ +| 4| A| +| 5| C| +| 6| D| ++----+----+ +``` + +Here's the error you'll get: "" + + ### Latest Version of Delta Table The function `latestVersion` return the latest version number of a table given its storage path. diff --git a/src/main/scala/mrpowers/jodie/DeltaHelpers.scala b/src/main/scala/mrpowers/jodie/DeltaHelpers.scala index a26d362..f35776a 100644 --- a/src/main/scala/mrpowers/jodie/DeltaHelpers.scala +++ b/src/main/scala/mrpowers/jodie/DeltaHelpers.scala @@ -339,6 +339,65 @@ object DeltaHelpers { .save(storagePath) } + /** + * Validates and appends data to a Delta table. This function ensures that the provided DataFrame can be appended + * to the specified Delta table by checking data type compatibility and column presence. + * + * @param deltaTable The Delta table to which data will be appended. + * @param appendDF The DataFrame containing data to be appended. + * @param requiredCols The list of required columns in the appendDF. + * @param optionalCols The list of optional columns in the appendDF. + * @throws IllegalArgumentException if input arguments have an invalid type, are missing, or are empty. + * @throws IllegalArgumentException if required columns are missing in the provided Delta table. + * @throws IllegalArgumentException if a column in the append DataFrame is not part of the original Delta table. 
+ */ + def validateAppend( + deltaTable: DeltaTable, + appendDF: DataFrame, + requiredCols: List[String], + optionalCols: List[String] + ): Unit = { + // Check if deltaTable is an instance of DeltaTable + if (!deltaTable.isInstanceOf[DeltaTable]) { + throw new IllegalArgumentException("An existing delta table must be specified.") + } + + // Check if appendDF is an instance of DataFrame + if (!appendDF.isInstanceOf[DataFrame]) { + throw new IllegalArgumentException("You must provide a DataFrame that is to be appended.") + } + + val appendDataColumns = appendDF.columns + + // Check if all required columns are present in appendDF + for (requiredColumn <- requiredCols) { + if (!appendDataColumns.contains(requiredColumn)) { + throw new IllegalArgumentException( + s"The base Delta table has these columns $appendDataColumns, but these columns are required $requiredCols" + ) + } + } + + val tableColumns = deltaTable.toDF.columns + + // Check if all columns in appendDF are part of the current Delta table or optional + for (column <- appendDataColumns) { + if (!tableColumns.contains(column) && !optionalCols.contains(column)) { + throw new IllegalArgumentException( + s"The column $column is not part of the current Delta table. If you want to add the column to the table, you must set the optionalCols parameter." 
+ ) + } + } + + val details = deltaTable.toDF.select("location").collect()(0).getString(0) + + // Write the appendDF to the Delta table + appendDF.write.format("delta") + .mode(SaveMode.Append) + .option("mergeSchema", "true") + .save(details) + } + def getStorageLocation(deltaTable: DeltaTable): String = { val row = deltaTable.detail().select("location").collect().head val locationPath = row.getString(0) diff --git a/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala b/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala index 3b95674..4dfae1a 100644 --- a/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala +++ b/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala @@ -439,6 +439,106 @@ class DeltaHelperSpec } } + describe("Validate and append data to a delta table"){ + it("should append dataframes with optional columns"){ + val path = (os.pwd / "tmp" / "delta-lake-validate-append-valid-dataframe").toString() + Seq( + (1, "a", "A"), + (2, "b", "B"), + ) + .toDF("col1", "col2", "col3") + .write + .format("delta") + .mode("overwrite") + .option("delta.logRetentionDuration", "interval 30 days") + .save(path) + + val deltaTable = DeltaTable.forPath(path) + + val appendDf = Seq( + (3, "c", "cat"), + (4, "d", "dog"), + ) + .toDF("col1", "col2", "col4") + + DeltaHelpers.validateAppend(deltaTable, appendDf, ["col1", "col2"], ["col4"] ) + + val expected = Seq( + (1, "a", "A", None), + (2, "b", "B", None), + (3, "c", None, "cat"), + (4, "d", None, "dog"), + ).toDF("col1", "col2", "col3", "col4") + val result = DeltaTable.forPath(path) + assertSmallDataFrameEquality( + result.toDF, + expected, + orderedComparison = false, + ignoreNullable = true + ) + + } + it("should fail to append dataframes with columns that are not on the accept list"){ + val path = (os.pwd / "tmp" / "delta-lake-validate-cols-in-accepted-list").toString() + Seq( + (1, "a", "A"), + (2, "b", "B"), + ) + .toDF("col1", "col2", "col3") + .write + .format("delta") + .mode("overwrite") + 
.option("delta.logRetentionDuration", "interval 30 days") + .save(path) + + val deltaTable = DeltaTable.forPath(path) + + val appendDf = Seq( + (4, "b", "A"), + (5, "y", "C"), + (6, "z", "D"), + ) + .toDF("col1", "col2", "col5") + + + val exceptionMessage = intercept[IllegalArgumentException] { + DeltaHelpers.validateAppend(deltaTable, appendDf, ["col1", "col2"], ["col4"] ) + }.getMessage + + assert(exceptionMessage.contains("but these columns are required")) + } + it("should fail to append dataframes with missing required columns"){ + val path = (os.pwd / "tmp" / "delta-lake-validate-missing-required-cols").toString() + Seq( + (1, "a", "A"), + (2, "b", "B"), + ) + .toDF("col1", "col2", "col3") + .write + .format("delta") + .mode("overwrite") + .option("delta.logRetentionDuration", "interval 30 days") + .save(path) + + val deltaTable = DeltaTable.forPath(path) + + val appendDf = Seq( + (4, "A"), + (5, "C"), + (6, "D"), + ) + .toDF("col1", "col4") + + + val exceptionMessage = intercept[IllegalArgumentException] { + DeltaHelpers.validateAppend(deltaTable, appendDf, ["col1", "col2"], ["col4"]) + }.getMessage + + assert(exceptionMessage.contains("If you want to add the column to the table, you must set the optionalCols parameter")) + + } + } + describe("Append without duplicating data") { it( "should insert data into an existing delta table and not duplicates in case some records already exists" From e02db75ef89649edb1ede3fa4d7556026c238966 Mon Sep 17 00:00:00 2001 From: Juan Pulido Date: Fri, 10 Nov 2023 16:52:30 -0500 Subject: [PATCH 2/4] Fixed tests typos --- src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala b/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala index 4dfae1a..08770af 100644 --- a/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala +++ b/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala @@ -461,7 +461,8 @@ class 
DeltaHelperSpec ) .toDF("col1", "col2", "col4") - DeltaHelpers.validateAppend(deltaTable, appendDf, ["col1", "col2"], ["col4"] ) + DeltaHelpers.validateAppend(deltaTable, appendDf, List("col1", "col2"), + List("col4" )) val expected = Seq( (1, "a", "A", None), @@ -502,7 +503,8 @@ class DeltaHelperSpec val exceptionMessage = intercept[IllegalArgumentException] { - DeltaHelpers.validateAppend(deltaTable, appendDf, ["col1", "col2"], ["col4"] ) + DeltaHelpers.validateAppend(deltaTable, appendDf, List("col1", "col2"), + List("col4") ) }.getMessage assert(exceptionMessage.contains("but these columns are required")) @@ -531,7 +533,8 @@ class DeltaHelperSpec val exceptionMessage = intercept[IllegalArgumentException] { - DeltaHelpers.validateAppend(deltaTable, appendDf, ["col1", "col2"], ["col4"]) + DeltaHelpers.validateAppend(deltaTable, appendDf, List("col1", "col2"), + List("col4")) }.getMessage assert(exceptionMessage.contains("If you want to add the column to the table, you must set the optionalCols parameter")) From 39c12c674a085f163c9bb48dfd8bf6c01336f6b7 Mon Sep 17 00:00:00 2001 From: Juan Pulido Date: Tue, 14 Nov 2023 13:57:33 -0500 Subject: [PATCH 3/4] Fixed typo errors and readme exception descriptions --- README.md | 4 ++-- .../scala/mrpowers/jodie/DeltaHelpers.scala | 4 ++-- .../mrpowers/jodie/DeltaHelperSpec.scala | 20 ++++++++++--------- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index fa367ac..5740f45 100644 --- a/README.md +++ b/README.md @@ -285,7 +285,7 @@ You cannot append the following DataFrame which contains the required columns, b +----+----+----+ ``` -Here's the error you'll get when you attempt this write: "" +Here's the error you'll get when you attempt this write: "The column col5 is not part of the current Delta table. If you want to add the column to the table, you must set the optionalCols parameter" You also cannot append the following DataFrame which is missing one of the required columns. 
@@ -299,7 +299,7 @@ You also cannot append the following DataFrame which is missing one of the requi +----+----+ ``` -Here's the error you'll get: "" +Here's the error you'll get: "The base Delta table has these columns List(col1, col4), but these columns are required List(col1, col2)" ### Latest Version of Delta Table diff --git a/src/main/scala/mrpowers/jodie/DeltaHelpers.scala b/src/main/scala/mrpowers/jodie/DeltaHelpers.scala index f35776a..cf8d1fa 100644 --- a/src/main/scala/mrpowers/jodie/DeltaHelpers.scala +++ b/src/main/scala/mrpowers/jodie/DeltaHelpers.scala @@ -373,7 +373,7 @@ object DeltaHelpers { for (requiredColumn <- requiredCols) { if (!appendDataColumns.contains(requiredColumn)) { throw new IllegalArgumentException( - s"The base Delta table has these columns $appendDataColumns, but these columns are required $requiredCols" + s"The base Delta table has these columns ${appendDataColumns.mkString("List(", ", ", ")")}, but these columns are required $requiredCols" ) } } @@ -389,7 +389,7 @@ object DeltaHelpers { } } - val details = deltaTable.toDF.select("location").collect()(0).getString(0) + val details = deltaTable.detail().select("location").collect().head.getString(0) // Write the appendDF to the Delta table appendDF.write.format("delta") diff --git a/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala b/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala index 08770af..fe813cf 100644 --- a/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala +++ b/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala @@ -461,16 +461,18 @@ class DeltaHelperSpec ) .toDF("col1", "col2", "col4") - DeltaHelpers.validateAppend(deltaTable, appendDf, List("col1", "col2"), - List("col4" )) + DeltaHelpers.validateAppend(deltaTable, appendDf, List("col1", "col2"), List("col4")) val expected = Seq( - (1, "a", "A", None), - (2, "b", "B", None), - (3, "c", None, "cat"), - (4, "d", None, "dog"), - ).toDF("col1", "col2", "col3", "col4") + (1, "a", "A", null), + (2, "b", "B", null), + 
(3, "c", null, "cat"), + (4, "d", null, "dog"), + ) + .toDF("col1", "col2", "col3", "col4") + val result = DeltaTable.forPath(path) + assertSmallDataFrameEquality( result.toDF, expected, @@ -507,7 +509,7 @@ class DeltaHelperSpec List("col4") ) }.getMessage - assert(exceptionMessage.contains("but these columns are required")) + assert(exceptionMessage.contains("The column col5 is not part of the current Delta table. If you want to add the column to the table, you must set the optionalCols parameter")) } it("should fail to append dataframes with missing required columns"){ val path = (os.pwd / "tmp" / "delta-lake-validate-missing-required-cols").toString() @@ -537,7 +539,7 @@ class DeltaHelperSpec List("col4")) }.getMessage - assert(exceptionMessage.contains("If you want to add the column to the table, you must set the optionalCols parameter")) + assert(exceptionMessage.contains("The base Delta table has these columns List(col1, col4), but these columns are required List(col1, col2)")) } } From 3674cf0800e9e96067225a21b88fbe6736559449 Mon Sep 17 00:00:00 2001 From: Juan Pulido Date: Fri, 17 Nov 2023 07:59:05 -0500 Subject: [PATCH 4/4] Changed non-idiomatic expressions in scala --- README.md | 2 +- .../scala/mrpowers/jodie/DeltaHelpers.scala | 30 ++++--------------- .../mrpowers/jodie/DeltaHelperSpec.scala | 2 +- 3 files changed, 7 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index 5740f45..a0089c4 100644 --- a/README.md +++ b/README.md @@ -285,7 +285,7 @@ You cannot append the following DataFrame which contains the required columns, b +----+----+----+ ``` -Here's the error you'll get when you attempt this write: "The column col5 is not part of the current Delta table. If you want to add the column to the table, you must set the optionalCols parameter" +Here's the error you'll get when you attempt this write: "The following columns are not part of the current Delta table. 
If you want to add these columns to the table, you must set the optionalCols parameter: List(col5)" You also cannot append the following DataFrame which is missing one of the required columns. diff --git a/src/main/scala/mrpowers/jodie/DeltaHelpers.scala b/src/main/scala/mrpowers/jodie/DeltaHelpers.scala index cf8d1fa..5987f99 100644 --- a/src/main/scala/mrpowers/jodie/DeltaHelpers.scala +++ b/src/main/scala/mrpowers/jodie/DeltaHelpers.scala @@ -357,37 +357,17 @@ object DeltaHelpers { requiredCols: List[String], optionalCols: List[String] ): Unit = { - // Check if deltaTable is an instance of DeltaTable - if (!deltaTable.isInstanceOf[DeltaTable]) { - throw new IllegalArgumentException("An existing delta table must be specified.") - } - - // Check if appendDF is an instance of DataFrame - if (!appendDF.isInstanceOf[DataFrame]) { - throw new IllegalArgumentException("You must provide a DataFrame that is to be appended.") - } val appendDataColumns = appendDF.columns + val tableColumns = deltaTable.toDF.columns // Check if all required columns are present in appendDF - for (requiredColumn <- requiredCols) { - if (!appendDataColumns.contains(requiredColumn)) { - throw new IllegalArgumentException( - s"The base Delta table has these columns ${appendDataColumns.mkString("List(", ", ", ")")}, but these columns are required $requiredCols" - ) - } - } - - val tableColumns = deltaTable.toDF.columns + val missingColumns = requiredCols.filterNot(appendDataColumns.contains) + require(missingColumns.isEmpty, s"The base Delta table has these columns ${appendDataColumns.mkString("List(", ", ", ")")}, but these columns are required $requiredCols") // Check if all columns in appendDF are part of the current Delta table or optional - for (column <- appendDataColumns) { - if (!tableColumns.contains(column) && !optionalCols.contains(column)) { - throw new IllegalArgumentException( - s"The column $column is not part of the current Delta table. 
If you want to add the column to the table, you must set the optionalCols parameter." - ) - } - } + val invalidColumns = appendDataColumns.filterNot(column => tableColumns.contains(column) || optionalCols.contains(column)) + require(invalidColumns.isEmpty, s"The following columns are not part of the current Delta table. If you want to add these columns to the table, you must set the optionalCols parameter: ${invalidColumns.mkString("List(", ", ", ")")}") val details = deltaTable.detail().select("location").collect().head.getString(0) diff --git a/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala b/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala index fe813cf..351d25d 100644 --- a/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala +++ b/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala @@ -509,7 +509,7 @@ class DeltaHelperSpec List("col4") ) }.getMessage - assert(exceptionMessage.contains("The column col5 is not part of the current Delta table. If you want to add the column to the table, you must set the optionalCols parameter")) + assert(exceptionMessage.contains("The following columns are not part of the current Delta table. If you want to add these columns to the table, you must set the optionalCols parameter: List(col5)")) } it("should fail to append dataframes with missing required columns"){ val path = (os.pwd / "tmp" / "delta-lake-validate-missing-required-cols").toString()