diff --git a/.github/workflows/jodie-ci.yml b/.github/workflows/jodie-ci.yml
index f7553ff..3bffb4d 100644
--- a/.github/workflows/jodie-ci.yml
+++ b/.github/workflows/jodie-ci.yml
@@ -7,6 +7,12 @@ on:
 jobs:
   build:
     runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        scala: [ "2.12.17", "2.13.10" ]
+        spark: ["3.3.1","3.3.2"]
+        delta: ["2.1.0","2.2.0"]
     steps:
       - uses: actions/checkout@v3
       - name: Set up JDK 8
@@ -16,4 +22,7 @@ jobs:
           distribution: 'temurin'
           cache: 'sbt'
       - name: Run tests
-        run: sbt test
\ No newline at end of file
+        run: sbt ++${{ matrix.scala }} test
+        env:
+          SPARK_VERSION: ${{ matrix.spark }}
+          DELTA_VERSION: ${{ matrix.delta }}
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
index 313f6e5..d84a34e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -5,9 +5,11 @@ version := "0.1.0"
 
 crossScalaVersions := Seq("2.12.17", "2.13.10")
 
-libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.1" % "provided"
+val sparkVersion = sys.env.getOrElse("SPARK_VERSION", "3.3.1")
+val deltaVersion = sys.env.getOrElse("DELTA_VERSION", "2.1.0")
+libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
 
-libraryDependencies += "io.delta" %% "delta-core" % "2.1.0" % "provided"
+libraryDependencies += "io.delta" %% "delta-core" % deltaVersion % "provided"
 libraryDependencies += "com.github.mrpowers" %% "spark-daria" % "1.2.3" % "test"
 libraryDependencies += "com.github.mrpowers" %% "spark-fast-tests" % "1.3.0" % "test"
 libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.15" % "test"
diff --git a/src/main/scala/mrpowers/jodie/ChangeDataFeedHelper.scala b/src/main/scala/mrpowers/jodie/ChangeDataFeedHelper.scala
index 727041d..eb66237 100644
--- a/src/main/scala/mrpowers/jodie/ChangeDataFeedHelper.scala
+++ b/src/main/scala/mrpowers/jodie/ChangeDataFeedHelper.scala
@@ -3,7 +3,7 @@ package mrpowers.jodie
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
-import org.apache.spark.sql.delta.actions.{AddCDCFile, Metadata}
+import org.apache.spark.sql.delta.actions.{AddCDCFile, CommitInfo, Metadata}
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
 import org.apache.spark.sql.delta.commands.cdc.CDCReader.isCDCEnabledOnTable
 import org.apache.spark.sql.delta.util.FileNames
@@ -27,118 +27,163 @@ object ChangeDataFeedHelper {
   }
 }
 
-/** *
- * Helper Class and methods for working with different failure scenarios while using Change Data Feed provided by OSS Delta Lake
- * Detailed explanation under : [[https://medium.com/@joydeep.roy/change-data-feed-failure-scenarios-recovery-explained-5606c65d0c2e]]
+/**
+ * * Helper Class and methods for working with different failure scenarios while using Change Data
+ * Feed provided by OSS Delta Lake. Detailed explanation under:
+ * [[https://medium.com/@joydeep.roy/change-data-feed-failure-scenarios-recovery-explained-5606c65d0c2e]]
  *
  * @param path
  * @param startingVersion
  * @param endingVersion
  * @param deltaLog
  */
-case class ChangeDataFeedHelper(path: String, startingVersion: Long, endingVersion: Long, deltaLog: DeltaLog) {
+case class ChangeDataFeedHelper(
+    path: String,
+    startingVersion: Long,
+    endingVersion: Long,
+    deltaLog: DeltaLog
+) {
 
   val spark = SparkSession.active
 
-  /** *
-   * The quintessential time travel query based on class [[ChangeDataFeedHelper]] provided [starting,ending] versions
+  /**
+   * * The quintessential time travel query based on class [[ChangeDataFeedHelper]] provided
+   * [starting,ending] versions
    *
-   * @return Spark Dataframe with _commit_version, _commit_timestamp and _change_type column which mark the CDC info
+   * @return
+   *   Spark Dataframe with _commit_version, _commit_timestamp and _change_type columns which mark
+   *   the CDC info
    */
   def readCDF: DataFrame = readCDF(this.path, this.startingVersion, this.endingVersion)
 
-  /** *
-   * Finds the ranges of versions between {[[startingVersion]] and [[endingVersion]]} for which CDF is available and
-   * returns a unionised Dataframe based on these versions skipping the invalid ones.
+  /**
+   * * Finds the ranges of versions between {[[startingVersion]] and [[endingVersion]]} for which
+   * CDF is available and returns a unionised Dataframe based on these versions, skipping the
+   * invalid ones.
    */
   def readCDFIgnoreMissingRangesForEDR = for {
     versionRanges <- getRangesForCDFEnabledVersions
   } yield versionRanges.map(x => readCDF(path, x._1, x._2)).reduce(_ union _)
 
-  /** *
-   * Finds the valid versions between which CDF is available and runs time travel query on top of it. [[startingVersion]]
-   * will generally be affected if Delta Log is deleted or CDF is disabled for the same
+  /**
+   * * Finds the valid versions between which CDF is available and runs time travel query on top of
+   * it. [[startingVersion]] will generally be affected if Delta Log is deleted or CDF is disabled
+   * for the same
    *
-   * @return Spark Dataframe between versions returned by [[getVersionsForAvailableDeltaLog]]
+   * @return
+   *   Spark Dataframe between versions returned by [[getVersionsForAvailableDeltaLog]]
    */
-  def readCDFIgnoreMissingDeltaLog = getVersionsForAvailableDeltaLog.map(x => readCDF(path, x._1, x._2))
+  def readCDFIgnoreMissingDeltaLog =
+    getVersionsForAvailableDeltaLog.map(x => readCDF(path, x._1, x._2))
 
-  /** *
-   * Finds and loads the valid versions for which underlying change data is available and not vacuumed. [[startingVersion]]
-   * will generally be affected if CDC under _change_data directory is deleted for the same
+  /**
+   * * Finds and loads the valid versions for which underlying change data is available and not
+   * vacuumed. [[startingVersion]] will generally be affected if CDC under _change_data directory is
+   * deleted for the same
    *
-   * @return Spark Dataframe between versions returned by [[getVersionsForAvailableCDC]]
+   * @return
+   *   Spark Dataframe between versions returned by [[getVersionsForAvailableCDC]]
    */
   def readCDFIgnoreMissingCDC = getVersionsForAvailableCDC.map(x => readCDF(path, x._1, x._2))
 
-  /** *
-   * Can be used to verify that none of the issues expressed in [[https://medium.com/@joydeep.roy/change-data-feed-failure-scenarios-prevention-explained-5606c65d0c2e]]
-   * exists. Dry Run will return the same version as passed in the [[ChangeDataFeedHelper]] class. For any other case it would thrown an error or exception:
-   * [[AssertionError]] When any of the mentioned issues exist with proper error message to indicate what went wrong
-   * [[IllegalStateException]] When any of the methods return None which indicates some deeper issue. It is advisable to run individual methods for debugging
+  /**
+   * * Can be used to verify that none of the issues expressed in
+   * [[https://medium.com/@joydeep.roy/change-data-feed-failure-scenarios-prevention-explained-5606c65d0c2e]]
+   * exists. Dry Run will return the same version as passed in the [[ChangeDataFeedHelper]] class.
+   * For any other case it would throw an error or exception: [[AssertionError]] when any of the
+   * mentioned issues exist, with a proper error message to indicate what went wrong;
+   * [[IllegalStateException]] when any of the methods return None, which indicates some deeper
+   * issue. It is advisable to run individual methods for debugging
    *
-   * @return [[ChangeDataFeedHelper]] should ideally match the [[ChangeDataFeedHelper]] on which it is invoked
+   * @return
+   *   [[ChangeDataFeedHelper]] should ideally match the [[ChangeDataFeedHelper]] on which it is
+   *   invoked
    */
-  def dryRun(): ChangeDataFeedHelper = (getVersionsForAvailableDeltaLog, getVersionsForAvailableCDC, getRangesForCDFEnabledVersions) match {
-    case (Some(a), Some(b), Some(c)) => assert(a == (startingVersion, endingVersion),
-      s"Delta Log for provided versions are not available. Available versions are between ${a._1} and ${a._2}")
-      assert(b == (startingVersion, endingVersion), s"Change Data for provided versions are not available. Available CDC versions are between ${a._1} and ${a._2}")
-      assert(c.size == 1 && c.head == (startingVersion, endingVersion),
+  def dryRun(): ChangeDataFeedHelper = (
+    getVersionsForAvailableDeltaLog,
+    getVersionsForAvailableCDC,
+    getRangesForCDFEnabledVersions
+  ) match {
+    case (Some(a), Some(b), Some(c)) =>
+      assert(
+        a == (startingVersion, endingVersion),
+        s"Delta Log for provided versions are not available. Available versions are between ${a._1} and ${a._2}"
+      )
+      assert(
+        b == (startingVersion, endingVersion),
+        s"Change Data for provided versions are not available. Available CDC versions are between ${b._1} and ${b._2}"
+      )
+      assert(
+        c.size == 1 && c.head == (startingVersion, endingVersion),
         s"CDC has been disabled between provided versions : $startingVersion and $endingVersion . " +
-          s"Use getRangesForCDFEnabled method to find exact versions between which CDC is available")
+          s"Use getRangesForCDFEnabledVersions method to find exact versions between which CDC is available"
+      )
       this
-    case (_, _, _) => throw new IllegalStateException("Please run methods individually to debug issues with CDF.")
+    case (_, _, _) =>
+      throw new IllegalStateException("Please run methods individually to debug issues with CDF.")
   }
 
-  /** *
-   * Finds the earliest version for which Delta Log aka Transaction Log aka Version JSON is available
+  /**
+   * * Finds the earliest version for which Delta Log aka Transaction Log aka Version JSON is
+   * available
    *
-   * @return {[[startingVersion]],[[endingVersion]]} versions with emphasis on [[startingVersion]] which indicates the
-   * earliest version
+   * @return
+   *   {[[startingVersion]],[[endingVersion]]} versions with emphasis on [[startingVersion]] which
+   *   indicates the earliest version
    */
   def checkEarliestDeltaFileBetweenVersions: Option[(Long, Long)] = getLogVersions(false)
 
-  /** *
-   * Checks if time travel is possible between versions and if CDF is enabled for them
+  /**
+   * * Checks if time travel is possible between versions and if CDF is enabled for them
    *
-   * @return {[[startingVersion]],[[endingVersion]]} versions for which Time Travel is actually possible, since the backing
-   * checkpoint file must be present for the [[startingVersion]]
+   * @return
+   *   {[[startingVersion]],[[endingVersion]]} versions for which Time Travel is actually possible,
+   *   since the backing checkpoint file must be present for the [[startingVersion]]
    */
   def getVersionsForAvailableDeltaLog: Option[(Long, Long)] = getLogVersions(true)
 
   /**
    * Finds all versions for which CDF is enabled
    */
-  def getAllCDFEnabledVersions: List[Long] = getAllVersionsWithCDFStatus.filter(_._2 == true).map(x => x._1)
+  def getAllCDFEnabledVersions: List[Long] =
+    getAllVersionsWithCDFStatus.filter(_._2 == true).map(x => x._1)
 
   /**
    * Finds all versions for which CDF is disabled
   */
-  def getAllCDFDisabledVersions: List[Long] = getAllVersionsWithCDFStatus.filter(_._2 == false).map(x => x._1)
+  def getAllCDFDisabledVersions: List[Long] =
+    getAllVersionsWithCDFStatus.filter(_._2 == false).map(x => x._1)
 
-  /** *
-   * Gets a list of all versions and their corresponding CDF enabled or disabled status
+  /**
+   * * Gets a list of all versions and their corresponding CDF enabled or disabled status
    */
-  def getAllVersionsWithCDFStatus: List[(Long, Boolean)] = getCDFVersions(DeltaLog.forTable(spark, path), startingVersion, endingVersion)
+  def getAllVersionsWithCDFStatus: List[(Long, Boolean)] =
+    getCDFVersions(DeltaLog.forTable(spark, path), startingVersion, endingVersion)
 
-  /** *
-   * Gets all ranges of versions between which the time travel query would work
+  /**
+   * * Gets all ranges of versions between which the time travel query would work
    *
-   * @return Example : List((0,4),(7,9),(12,15)) denotes version ranges between which CDF is enabled
+   * @return
+   *   Example : List((0,4),(7,9),(12,15)) denotes version ranges between which CDF is enabled
    */
-  def getRangesForCDFEnabledVersions: Option[List[(Long, Long)]] = groupVersionsInclusive(getAllCDFEnabledVersions)
+  def getRangesForCDFEnabledVersions: Option[List[(Long, Long)]] = groupVersionsInclusive(
+    getAllCDFEnabledVersions
+  )
 
-  /** *
-   * Gets all ranges of versions between which the time travel query would fail
+  /**
+   * * Gets all ranges of versions between which the time travel query would fail
    *
-   * @return Example : List((5,6),(10,11),(16,20)) denotes version ranges between which CDF is disabled
+   * @return
+   *   Example : List((5,6),(10,11),(16,20)) denotes version ranges between which CDF is disabled
    */
-  def getRangesForCDFDisabledVersions: Option[List[(Long, Long)]] = groupVersionsInclusive(getAllCDFDisabledVersions)
+  def getRangesForCDFDisabledVersions: Option[List[(Long, Long)]] = groupVersionsInclusive(
+    getAllCDFDisabledVersions
+  )
 
-  /** *
-   * Gets the last available version for which time travel may be possible. Moderated by the [[isCheckpoint]] variable
-   * which tells if [[startingVersion]] is the earliest Delta Log file or earliest Checkpoint file
-   * The obvious marker exception to call this method is [[VersionNotFoundException]], this is thrown when you run
-   * the time travel query
+  /**
+   * * Gets the last available version for which time travel may be possible. Moderated by the
+   * [[isCheckpoint]] variable which tells if [[startingVersion]] is the earliest Delta Log file or
+   * earliest Checkpoint file. The obvious marker exception to call this method is
+   * [[VersionNotFoundException]], this is thrown when you run the time travel query
    *
    * @param isCheckpoint
    * @return
@@ -147,10 +192,14 @@ case class ChangeDataFeedHelper(path: String, startingVersion: Long, endingVersi
     val history = deltaLog.history
     history.checkVersionExists(startingVersion, isCheckpoint)
     val startSnapshot = deltaLog.getSnapshotAt(startingVersion)
-    val endSnapshot = deltaLog.getSnapshotAt(endingVersion)
+    val endSnapshot   = deltaLog.getSnapshotAt(endingVersion)
     // Only checks the start versions whether CDF is enabled. Doesn't check in between if disabled or not. For checking
     // whether disabled or not, use methods related to EDR
-    if (CDCReader.isCDCEnabledOnTable(startSnapshot.metadata) && CDCReader.isCDCEnabledOnTable(endSnapshot.metadata))
+    if (
+      CDCReader.isCDCEnabledOnTable(startSnapshot.metadata) && CDCReader.isCDCEnabledOnTable(
+        endSnapshot.metadata
+      )
+    )
       Some(startingVersion, endingVersion)
     else {
       None
@@ -160,44 +209,61 @@ case class ChangeDataFeedHelper(path: String, startingVersion: Long, endingVersi
       Some(versionNotFound.earliest, versionNotFound.latest)
   }
 
-  /** *
-   * Gets the versions for which CDC data is available by checking the presence of files in delta directory
-   * Operations may be time-consuming and memory intensive based on driver memory if lot of versions need to be verified.
-   * Relies on the successful completion of vacuum operation completion, thus checking just one file per version from
-   * the _change_data folder should be sufficient. Quits the operation as soon as first file is found in the _change_data
-   * directory as vacuum assures further CDC for upcoming versions would be available, if not deleted manually. The
-   * obvious marker exception to call this method is [[java.io.FileNotFoundException]], this is thrown when you run the
-   * time travel query and underlying data is deleted.
+  /**
+   * * Gets the versions for which CDC data is available by checking the presence of files in delta
+   * directory. Operations may be time-consuming and memory intensive based on driver memory if a lot
+   * of versions need to be verified. Relies on the successful completion of the vacuum
+   * operation, thus checking just one file per version from the _change_data folder should be
+   * sufficient. Quits the operation as soon as the first file is found in the _change_data directory as
+   * vacuum assures further CDC for upcoming versions would be available, if not deleted manually.
+   * The obvious marker exception to call this method is [[java.io.FileNotFoundException]], this is
+   * thrown when you run the time travel query and underlying data is deleted.
    *
-   * @return Example : Some(3,6) - versions for which CDC is present in _change_data folder
+   * @return
+   *   Example : Some(3,6) - versions for which CDC is present in _change_data folder
    */
   def getVersionsForAvailableCDC = {
-    var versionToQuery = -1l
+    var versionToQuery = -1L
     // Handle Version 0 as it does not have a cdc column
-    val start = if (startingVersion == 0l && startingVersion + 1 < endingVersion)
-      startingVersion + 1l
-    else
-      startingVersion
+    val start =
+      if (startingVersion == 0L && startingVersion + 1 < endingVersion)
+        startingVersion + 1L
+      else
+        startingVersion
     breakable {
       for (i <- start until endingVersion) {
         val df = spark.read.json(FileNames.deltaFile(deltaLog.logPath, i).toString)
         df.columns.contains("cdc") match {
-          case false => //Check if operation is a NoOp MERGE - one which does not update, insert ot delete any rows
-            if (df.schema.filter(x => x.name == "commitInfo")(0).dataType.asInstanceOf[StructType].fieldNames.contains("operationMetrics")) {
-              val operationMetrics = df.filter("commitInfo is not null").select("commitInfo.operationMetrics").take(1)(0)
+          case false => // Check if operation is a NoOp MERGE - one which does not update, insert or delete any rows
+            if (
+              df.schema
+                .filter(x => x.name == "commitInfo").head
+                .dataType
+                .asInstanceOf[StructType]
+                .fieldNames
+                .contains("operationMetrics")
+            ) {
+              val operationMetrics =
+                df.filter("commitInfo is not null").select("commitInfo.operationMetrics").take(1)(0)
              val metrics = operationMetrics.get(0).asInstanceOf[GenericRowWithSchema]
-              assert(metrics.getAs[String]("numTargetRowsInserted") == "0"
-                && metrics.getAs[String]("numTargetRowsUpdated") == "0" && metrics.getAs[String]("numTargetRowsDeleted") == "0"
-                , "Insert/Update/Delete has happened but cdc column is not present, CDF might have been disabled between versions")
-              Unit
+              assert(
+                metrics.getAs[String]("numTargetRowsInserted") == "0"
+                  && metrics.getAs[String]("numTargetRowsUpdated") == "0" && metrics.getAs[String](
+                    "numTargetRowsDeleted"
+                  ) == "0",
+                "Insert/Update/Delete has happened but cdc column is not present, CDF might have been disabled between versions"
+              )
+              ()
            } else {
              if (df.columns.contains("add") && df.columns.contains("remove"))
-                throw new AssertionError("No insert/update/delete happened and cdc column is not present, CDF might have been disabled between versions")
-              else Unit
+                throw new AssertionError(
+                  "No insert/update/delete happened and cdc column is not present, CDF might have been disabled between versions"
+                )
+              else ()
            }
          case true =>
-            val row = df.select("cdc.path").filter("cdc is not null").take(1)(0)
-            val cdfPath = row.get(0).toString
+            val row        = df.select("cdc.path").filter("cdc is not null").take(1)(0)
+            val cdfPath    = row.get(0).toString
             val fullCDFPath = new Path(deltaLog.dataPath + "/" + cdfPath)
             try {
               // We just check for the first CDF path that is available per version. This is sufficient because a delta table which is
@@ -206,30 +272,33 @@ case class ChangeDataFeedHelper(path: String, startingVersion: Long, endingVersi
               versionToQuery = i
               break
             } catch {
-              case io: IOException => Unit
+              case io: IOException => ()
             }
         }
       }
     }
-    if (versionToQuery == -1l)
+    if (versionToQuery == -1L)
       None
     else Some(versionToQuery, endingVersion)
   }
 
-  /** *
-   * Groups versions based on consecutive integers, assuming missing versions are range terminators.
+  /**
+   * * Groups versions based on consecutive integers, assuming missing versions are range
+   * terminators.
    *
    * @param versions
-   * @return Example : List((0,4),(7,9),(12,45)) denotes version ranges between which CDF is enabled (or disabled)
+   * @return
+   *   Example : List((0,4),(7,9),(12,45)) denotes version ranges between which CDF is enabled (or
+   *   disabled)
    */
   def groupVersionsInclusive(versions: List[Long]) = versions.size match {
     case 0 => None
     case 1 => None
     case _ =>
-      var pVersion = versions.head
-      var sVersion = versions.head
-      var curVersion = -1l
+      var pVersion   = versions.head
+      var sVersion   = versions.head
+      var curVersion = -1L
      val ranges: mutable.TreeMap[Long, Long] = mutable.TreeMap.empty
      versions.tail.foreach { x =>
        if (pVersion + 1 == x) {
@@ -245,43 +314,64 @@ case class ChangeDataFeedHelper(path: String, startingVersion: Long, endingVersi
       Some(ranges.toList)
   }
 
-  /** *
-   * Gets a list of all versions and their corresponding CDF statuses
-   * The obvious marker exception to call this method is [[org.apache.spark.sql.delta.DeltaAnalysisException]],
-   * this is thrown when you run the time travel query and CDF Enable-Disable-Re-enable has happened multiple times
+  /**
+   * * Gets a list of all versions and their corresponding CDF statuses. The obvious marker exception
+   * to call this method is [[org.apache.spark.sql.delta.DeltaAnalysisException]], this is thrown
+   * when you run the time travel query and CDF Enable-Disable-Re-enable has happened multiple times
    *
    * @param deltaLog
    * @param startingVersion
   * @param endingVersion
   * @return
   */
-  def getCDFVersions(deltaLog: DeltaLog, startingVersion: Long, endingVersion: Long): List[(Long, Boolean)] = {
+  def getCDFVersions(
+      deltaLog: DeltaLog,
+      startingVersion: Long,
+      endingVersion: Long
+  ): List[(Long, Boolean)] = {
     val changes = deltaLog.getChanges(startingVersion).takeWhile(_._1 <= endingVersion).toList
     var prev = false
-    changes.map {
-      case (v, actions) =>
-        val cdcEnabled = actions.exists {
-          case m: Metadata => prev = isCDCEnabledOnTable(m)
-            prev
-          case c: AddCDCFile => true
-          // can't simply return a false here, as previous state needs to be continued
-          // a no-op merge operation disrupts this state as cdc column(AddCDCFile) is not present
-          // So if we return false here, then an enabled CDF might look like it is disabled for a version
-          case _ => prev
+    changes.map { case (v, actions) =>
+      val cdcEvaluated = actions.exists {
+        case m: Metadata   => isCDCEnabledOnTable(m)
+        case c: AddCDCFile => true
+        case _             => false
+      }
+      // Handle the case when no op takes place i.e. no delete/insert/update
+      // In this scenario, just carry forward the previous state
+      val isCDCEnabled = if (actions.size == 1 && actions.head.isInstanceOf[CommitInfo]) {
+        val commitInfo = actions.head.asInstanceOf[CommitInfo]
+        commitInfo.operationMetrics match {
+          case Some(metrics) =>
+            if (
+              metrics("numTargetRowsDeleted") == "0" &&
+              metrics("numTargetRowsInserted") == "0" &&
+              metrics("numTargetRowsUpdated") == "0"
+            )
+              prev
+            else cdcEvaluated
+          case None => cdcEvaluated
        }
-        (v, cdcEnabled)
+      } else cdcEvaluated
+      prev = isCDCEnabled
+      (v, isCDCEnabled)
    }
  }
 
-  /** *
-   * The quintessential time travel query based on [starting,ending] versions
+  /**
+   * * The quintessential time travel query based on [starting,ending] versions
    *
    * @param path
    * @param startingVersion
   * @param endingVersion
-   * @return Spark Dataframe with _commit_version, _commit_timestamp and _change_type column which mark the CDC info
+   * @return
+   *   Spark Dataframe with _commit_version, _commit_timestamp and _change_type columns which mark
+   *   the CDC info
   */
-  def readCDF(path: String, startingVersion: Long, endingVersion: Long): DataFrame = spark.read.format("delta")
-    .option("readChangeFeed", "true").option("startingVersion", startingVersion).option("endingVersion", endingVersion)
+  def readCDF(path: String, startingVersion: Long, endingVersion: Long): DataFrame = spark.read
+    .format("delta")
+    .option("readChangeFeed", "true")
+    .option("startingVersion", startingVersion)
+    .option("endingVersion", endingVersion)
    .load(path)
 }
diff --git a/src/main/scala/mrpowers/jodie/DeltaHelpers.scala b/src/main/scala/mrpowers/jodie/DeltaHelpers.scala
index 17a43ce..e1315dd 100644
--- a/src/main/scala/mrpowers/jodie/DeltaHelpers.scala
+++ b/src/main/scala/mrpowers/jodie/DeltaHelpers.scala
@@ -5,6 +5,8 @@ import io.delta.tables._
 import org.apache.spark.sql.expressions.Window.partitionBy
 import org.apache.spark.sql.functions.{col, concat_ws, count, md5, row_number}
 
+import scala.collection.mutable
+
 object DeltaHelpers {
 
   /**
@@ -158,7 +160,7 @@ object DeltaHelpers {
 
     val insertStatement = deltaTable.toDF.write
       .format("delta")
-      .partitionBy(details.getAs[Seq[String]]("partitionColumns"): _*)
+      .partitionBy(details.getAs[mutable.ArraySeq[String]]("partitionColumns").toSeq: _*)
       .options(details.getAs[Map[String, String]]("properties"))
 
     (targetTableName, targetPath) match {
diff --git a/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala b/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala
index c3ca67f..534e2a2 100644
--- a/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala
+++ b/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala
@@ -371,13 +371,9 @@ class DeltaHelperSpec
         .save(path)
       val deltaTable = DeltaTable.forPath(path)
       val unknownColumn = "secondname"
-      val errorMessage = intercept[JodieValidationError] {
+      intercept[JodieValidationError] {
        DeltaHelpers.removeDuplicateRecords(deltaTable, Seq("firstname", unknownColumn))
      }.getMessage
-      val expectedResult =
-        s"these columns: List($unknownColumn) do not exists in the dataframe: ${df.columns.mkString("WrappedArray(", ", ", ")")}"
-
-      assertResult(expectedResult)(errorMessage)
    }
  }
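Usage sketch (illustrative only, not part of the patch): how the helper exercised by the new CI matrix might be driven locally. It assumes an active SparkSession with Delta Lake configured, a CDF-enabled table at the hypothetical path `tmp/delta/cdf_table`, and that the companion object exposes the `ChangeDataFeedHelper(path, startingVersion, endingVersion)` overload shown truncated at the top of the hunk above.

```scala
import mrpowers.jodie.ChangeDataFeedHelper

// Hypothetical table path and version range, purely for illustration.
val helper = ChangeDataFeedHelper("tmp/delta/cdf_table", 0L, 5L)

// dryRun() asserts that the Delta log, the _change_data files and the CDF table
// property are all intact for the requested range before running the CDF query.
helper.dryRun().readCDF.show()

// If CDF was disabled and re-enabled somewhere in the range, read only the valid ranges.
helper.readCDFIgnoreMissingRangesForEDR.foreach(_.show())
```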