Skip to content

Commit

Permalink
Added skipChangeCommits support for Delta Sharing (#285)
Browse files Browse the repository at this point in the history
  • Loading branch information
bogao007 authored Mar 29, 2023
1 parent 44f5d8d commit c5585be
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 15 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ Once the provider shares a table with history, the recipient can perform a strea
val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"
val df = spark.readStream.format("deltaSharing")
.option("startingVersion", "1")
.option("skipChangeCommits", "true")
.load(tablePath)
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ object DeltaSharingErrors {
def deltaSharingSourceIgnoreChangesError(version: Long): Throwable = {
new UnsupportedOperationException("Detected a data update in the source table at version " +
s"$version. This is currently not supported. If you'd like to ignore updates, set the " +
s"option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, " +
s"option 'skipChangeCommits' to 'true'. If you would like the data update to be reflected, " +
s"please restart the query from latest snapshot with a fresh checkpoint directory.")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser {

val ignoreDeletes = options.get(IGNORE_DELETES_OPTION).exists(toBoolean(_, IGNORE_DELETES_OPTION))

// Whether commits containing data changes should be skipped by the streaming
// source. Absent option => false (`exists` on an empty Option); present value
// is parsed via `toBoolean`, which rejects non-boolean strings.
val skipChangeCommits = options.get(SKIP_CHANGE_COMMITS_OPTION)
.exists(toBoolean(_, SKIP_CHANGE_COMMITS_OPTION))

val readChangeFeed = options.get(CDF_READ_OPTION).exists(toBoolean(_, CDF_READ_OPTION)) ||
options.get(CDF_READ_OPTION_LEGACY).exists(toBoolean(_, CDF_READ_OPTION_LEGACY))

Expand Down Expand Up @@ -154,6 +157,7 @@ object DeltaSharingOptions extends Logging {
// User-facing option keys accepted by the Delta Sharing stream reader.
val MAX_BYTES_PER_TRIGGER_OPTION = "maxBytesPerTrigger"
val IGNORE_CHANGES_OPTION = "ignoreChanges"
val IGNORE_DELETES_OPTION = "ignoreDeletes"
// Skips commits that contain data changes instead of failing the stream.
val SKIP_CHANGE_COMMITS_OPTION = "skipChangeCommits"

// Options selecting where a streaming read begins.
val STARTING_VERSION_OPTION = "startingVersion"
val STARTING_TIMESTAMP_OPTION = "startingTimestamp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,8 @@ case class DeltaSharingSource(
// This is checked before creating DeltaSharingSource
assert(schema.nonEmpty, "schema cannot be empty in DeltaSharingSource.")

/** A check on the source table that disallows deletes on the source data. */
private val ignoreChanges = options.ignoreChanges

/** A check on the source table that disallows commits that only include deletes to the data. */
private val ignoreDeletes = options.ignoreDeletes || ignoreChanges
/** A check on the source table that skips commits that contain removes from the set of files. */
private val skipChangeCommits = options.skipChangeCommits

private val tableId = initSnapshot.metadata.id

Expand Down Expand Up @@ -277,7 +274,7 @@ case class DeltaSharingSource(
a.id -> a.url
}.toMap
}
val allAddFiles = verifyStreamHygieneAndFilterAddFiles(tableFiles).groupBy(a => a.version)
val allAddFiles = validateCommitAndFilterAddFiles(tableFiles).groupBy(a => a.version)
for (v <- fromVersion to currentLatestVersion) {

val vAddFiles = allAddFiles.getOrElse(v, ArrayBuffer[AddFileForCDF]())
Expand Down Expand Up @@ -631,7 +628,7 @@ case class DeltaSharingSource(
}
}

private def verifyStreamHygieneAndFilterAddFiles(
private def validateCommitAndFilterAddFiles(
tableFiles: DeltaTableFiles): Seq[AddFileForCDF] = {
(Seq(tableFiles.metadata) ++ tableFiles.additionalMetadatas).foreach { m =>
val schemaToCheck = DeltaTableUtils.toSchema(m.schemaString)
Expand All @@ -640,20 +637,31 @@ case class DeltaSharingSource(
}
}

if (!tableFiles.removeFiles.isEmpty) {
val addFiles = tableFiles.addFiles
if (tableFiles.removeFiles.nonEmpty) {
/** A check on the source table that disallows changes on the source data. */
val shouldAllowChanges = options.ignoreChanges || skipChangeCommits
/** A check on the source table that disallows commits that only include deletes or
* contain changes on the source data. */
val shouldAllowDeletes = shouldAllowChanges || options.ignoreDeletes
val versionsWithRemoveFiles = tableFiles.removeFiles.map(r => r.version).toSet
val versionsWithAddFiles = tableFiles.addFiles.map(a => a.version).toSet
versionsWithRemoveFiles.foreach{
case version =>
if (versionsWithAddFiles.contains(version) && !ignoreChanges) {
if (skipChangeCommits) {
// Filter out addFiles that have the same version in versionsWithRemoveFiles and directly
// return the result. Below verification is not needed if skipChangeCommits is true.
return addFiles.filter(addFile => !versionsWithRemoveFiles.contains(addFile.version))
}
versionsWithRemoveFiles.foreach {
version =>
if (versionsWithAddFiles.contains(version) && !shouldAllowChanges) {
throw DeltaSharingErrors.deltaSharingSourceIgnoreChangesError(version)
} else if (!versionsWithAddFiles.contains(version) && !ignoreDeletes) {
} else if (!versionsWithAddFiles.contains(version) && !shouldAllowDeletes) {
throw DeltaSharingErrors.deltaSharingSourceIgnoreDeleteError(version)
}
}
}

tableFiles.addFiles
addFiles
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ class DeltaSharingSourceSuite extends QueryTest
.option("ignoreChanges", "true")
}

/**
 * Builds a deltaSharing `DataStreamReader` with `skipChangeCommits` enabled.
 *
 * @param path the table path to read (defaults to the suite's `tablePath`)
 * @param startingVersion the version the stream starts from (defaults to "0")
 */
def withSkipChangeCommitsStreamReaderAtVersion(
    path: String = tablePath,
    startingVersion: String = "0"): DataStreamReader = {
  val reader = spark.readStream.format("deltaSharing")
  reader
    .option("path", path)
    .option("startingVersion", startingVersion)
    .option("skipChangeCommits", "true")
}

/**
* Test defaultReadLimit
*/
Expand Down Expand Up @@ -487,6 +495,8 @@ class DeltaSharingSourceSuite extends QueryTest
query.processAllAvailable()
}.getMessage
assert(message.contains("Detected a data update in the source table at version 3"))
assert(message.contains("If you'd like to ignore updates, set the option 'skipChangeCommits'" +
" to 'true'."))
}

/**
Expand Down Expand Up @@ -618,4 +628,92 @@ class DeltaSharingSourceSuite extends QueryTest
query.stop()
}
}

/**
* Test basic streaming functionality with 'skipChangeCommits' as true.
*/
// Basic streaming read with skipChangeCommits=true: exactly one non-empty
// micro-batch carrying 3 input rows is expected from the fixture table.
integrationTest("skipChangeCommits - basic - success") {
  val stream = withSkipChangeCommitsStreamReaderAtVersion()
    .load()
    .writeStream
    .format("console")
    .start()

  try {
    stream.processAllAvailable()
    val nonEmptyBatches = stream.recentProgress.filter(_.numInputRows != 0)
    assert(nonEmptyBatches.length === 1)
    nonEmptyBatches.foreach(batch => assert(batch.numInputRows === 3))
  } finally {
    stream.stop()
  }
}

// Same scenario as the basic test, but sinks into an in-memory table so the
// actual row contents can be checked, not just the input-row count.
integrationTest("skipChangeCommits - basic memory - success") {
val query = withSkipChangeCommitsStreamReaderAtVersion()
.load().writeStream.format("memory").queryName("streamMemoryOutput").start()

try {
query.processAllAvailable()
// Exactly one micro-batch is expected to carry data.
val progress = query.recentProgress.filter(_.numInputRows != 0)
assert(progress.length === 1)
progress.foreach { p =>
assert(p.numInputRows === 3)
}

// Expected surviving rows — presumably the snapshot rows after change
// commits are skipped; fixture-dependent, confirm against the shared table.
val expected = Seq(
Row("3", 3, sqlDate("2020-01-01")),
Row("2", 2, sqlDate("2020-01-01")),
Row("1", 1, sqlDate("2020-01-01"))
)
checkAnswer(sql("SELECT * FROM streamMemoryOutput"), expected)
} finally {
query.stop()
}
}

// Writes the skipChangeCommits stream to a parquet sink with a checkpoint and
// verifies both the progress counters and the persisted output rows.
integrationTest("skipChangeCommits - outputDataframe - success") {
withTempDirs { (checkpointDir, outputDir) =>
val query = withSkipChangeCommitsStreamReaderAtVersion()
.load().writeStream.format("parquet")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.start(outputDir.getCanonicalPath)

try {
query.processAllAvailable()
// Exactly one micro-batch is expected to carry data.
val progress = query.recentProgress.filter(_.numInputRows != 0)
assert(progress.length === 1)
progress.foreach { p =>
assert(p.numInputRows === 3)
}
} finally {
query.stop()
}

// Read the parquet output back after the query stops and compare it against
// the expected rows (same fixture data as the memory-sink test).
val expected = Seq(
Row("3", 3, sqlDate("2020-01-01")),
Row("2", 2, sqlDate("2020-01-01")),
Row("1", 1, sqlDate("2020-01-01"))
)
checkAnswer(spark.read.format("parquet").load(outputDir.getCanonicalPath), expected)
}
}

// Without startingVersion the latest snapshot is read.
integrationTest("skipChangeCommits - no startingVersion - success") {
  // cdf_table_cdf_enabled snapshot at version 5 is queried, with 2 files and 2 rows of data
  val stream = spark.readStream
    .format("deltaSharing")
    .option("skipChangeCommits", "true")
    .load(tablePath)
    .select("birthday", "name", "age")
    .writeStream
    .format("console")
    .start()

  try {
    stream.processAllAvailable()
    val nonEmptyBatches = stream.recentProgress.filter(_.numInputRows != 0)
    assert(nonEmptyBatches.length === 1)
    nonEmptyBatches.foreach(batch => assert(batch.numInputRows === 2))
  } finally {
    stream.stop()
  }
}
}

0 comments on commit c5585be

Please sign in to comment.